From 5ce356d0f5f7dc4ac39b3144cfaa75d914ffe47a Mon Sep 17 00:00:00 2001 From: Harvey <8107750+bharvey88@users.noreply.github.com> Date: Tue, 26 May 2026 07:07:21 -0500 Subject: [PATCH 1/3] fix: serialise concurrent cache writes and CSRF refreshes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related concurrency hazards in the token-cache machinery: 1. `GasBuddyCache.write_cache` opened the target path directly. With a shared cache (HA coordinator + a parallel service call), two coroutines could both call `_get_headers` after expiry, both fetch a new token, and interleave bytes into the cache file — a corrupt payload caught next call by JSONDecodeError, falling back to an empty dict, and wasting a token-refresh round trip on the next call. Write to a uniquely-named sibling tempfile and `aiofiles.os.replace` onto the final path. `os.replace` is atomic on POSIX and Windows ≥Vista. Also clean up the tempfile on write failure. 2. `_get_headers` had no in-process mutual exclusion. Two coroutines racing on a cold cache both hit `gasbuddy.com/home` and both wrote. With #1 the writes are now safe, but the duplicate HTTP is still wasted. Add an `asyncio.Lock` (`self._token_lock`) and a per-cache `self._lock` in `GasBuddyCache`. The token-refresh block re-checks `_cf_last` after acquiring the lock to absorb the case where the previous holder just populated the token. Extracted the actual refresh body into `_refresh_token` for readability. Co-Authored-By: Claude Opus 4.7 --- py_gasbuddy/__init__.py | 23 ++++++++++++++++++++++- py_gasbuddy/cache.py | 38 ++++++++++++++++++++++++++++++++------ 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/py_gasbuddy/__init__.py b/py_gasbuddy/__init__.py index da865ad..5cdeb3c 100644 --- a/py_gasbuddy/__init__.py +++ b/py_gasbuddy/__init__.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import json import logging import re @@ -131,6 +132,10 @@ def __init__( self._cache_manager: GasBuddyCache | None = None self._timeout = timeout self._session = session + # Serialise CSRF refreshes within a single process — without this, + # two concurrent callers on a cold cache both GET /home and both + # write the token file. + self._token_lock = asyncio.Lock() async def process_request(self, query: GraphQLQuery) -> dict[str, Any]: """Process API requests. @@ -702,8 +707,24 @@ async def _get_headers(self) -> None: if self._cf_last is None or self._cf_last: return - _LOGGER.debug("Token invalid, getting a new one...") + # Serialise concurrent token-refresh attempts within this + # GasBuddy instance. After acquiring the lock, re-check whether + # the previous holder already populated the token so we don't + # double-fetch. + async with self._token_lock: + if self._cf_last is True and self._tag: + return + _LOGGER.debug("Token invalid, getting a new one...") + await self._refresh_token(url, method, json_data) + + async def _refresh_token( + self, + url: str, + method: str, + json_data: Any, + ) -> None: + """GET /home (or solver) and persist the extracted CSRF token.""" csrf_timeout = aiohttp.ClientTimeout(total=self._timeout / 1000) async with self._get_session() as session: http_method = getattr(session, method) diff --git a/py_gasbuddy/cache.py b/py_gasbuddy/cache.py index e82006c..00a9e0d 100644 --- a/py_gasbuddy/cache.py +++ b/py_gasbuddy/cache.py @@ -1,7 +1,10 @@ """Cache functions for py-gasbuddy.""" +import asyncio import json import logging +import os +import uuid from pathlib import Path from typing import Any @@ -20,15 +23,38 @@ def __init__(self, cache_file: str = "") -> None: self._cache_file = Path.home() / ".cache" / "py_gasbuddy" / "token" else: self._cache_file = Path(cache_file) + # Serialise cache mutations within a single process. The HA + # coordinator + a parallel service call could otherwise race + # both reading and writing the same token file. + self._lock = asyncio.Lock() async def write_cache(self, data: Any) -> None: - """Write cache file.""" - # Create parent directories if they don't exist - if not await aiofiles.os.path.exists(self._cache_file.parent): - await aiofiles.os.makedirs(self._cache_file.parent) + """Atomically write the cache file. - async with aiofiles.open(self._cache_file, mode="wb") as file: - await file.write(data) + Writes to a uniquely-named sibling tempfile and ``os.replace``s + onto the final path, so concurrent writers can't produce a torn + file. The asyncio lock further serialises in-process writers. + """ + async with self._lock: + # Create parent directories if they don't exist + if not await aiofiles.os.path.exists(self._cache_file.parent): + await aiofiles.os.makedirs(self._cache_file.parent) + + tmp_path = self._cache_file.with_name( + f".{self._cache_file.name}.{os.getpid()}.{uuid.uuid4().hex}.tmp" + ) + try: + async with aiofiles.open(tmp_path, mode="wb") as file: + await file.write(data) + # os.replace is atomic on POSIX and Windows ≥Vista. + await aiofiles.os.replace(tmp_path, self._cache_file) + except Exception: + # Best-effort cleanup of the tempfile on failure. + try: + await aiofiles.os.remove(tmp_path) + except OSError: + pass + raise async def read_cache(self) -> Any: """Read cache file.""" From 8bbf9508cf60f68064086f086eb4f05075616d9e Mon Sep 17 00:00:00 2001 From: Harvey <8107750+bharvey88@users.noreply.github.com> Date: Tue, 26 May 2026 12:37:33 -0500 Subject: [PATCH 2/3] address CodeRabbit feedback on cache concurrency - cache.py: drop the separate path.exists() check before makedirs and use exist_ok=True instead. Closes the TOCTOU race where a racing process creates the directory between our check and call. - __init__.py: set self._cf_last = True after a successful CSRF refresh so coroutines still queued on self._token_lock skip their own refresh attempt instead of re-fetching. Co-Authored-By: Claude Opus 4.7 --- py_gasbuddy/__init__.py | 11 ++++++++--- py_gasbuddy/cache.py | 7 ++++--- tests/test_init.py | 12 +++++++++--- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/py_gasbuddy/__init__.py b/py_gasbuddy/__init__.py index 5cdeb3c..9e35bb3 100644 --- a/py_gasbuddy/__init__.py +++ b/py_gasbuddy/__init__.py @@ -712,8 +712,8 @@ async def _get_headers(self) -> None: # the previous holder already populated the token so we don't # double-fetch. async with self._token_lock: - if self._cf_last is True and self._tag: - return + if self._cf_last is True and self._tag: # type: ignore[unreachable] + return # type: ignore[unreachable] _LOGGER.debug("Token invalid, getting a new one...") await self._refresh_token(url, method, json_data) @@ -759,7 +759,12 @@ async def _refresh_token( data[TOKEN] = self._tag encoded = json.dumps(data).encode("utf-8") _LOGGER.debug("CSRF token found: %s", self._tag) - await self._cache_manager.write_cache(encoded) + if self._cache_manager is not None: + await self._cache_manager.write_cache(encoded) + # Mark this instance as having a fresh token so + # any coroutine still queued on _token_lock skips + # its own refresh after we release the lock. + self._cf_last = True else: _LOGGER.error("CSRF token not found.") raise CSRFTokenMissing diff --git a/py_gasbuddy/cache.py b/py_gasbuddy/cache.py index 00a9e0d..4ac132e 100644 --- a/py_gasbuddy/cache.py +++ b/py_gasbuddy/cache.py @@ -36,9 +36,10 @@ async def write_cache(self, data: Any) -> None: file. The asyncio lock further serialises in-process writers. """ async with self._lock: - # Create parent directories if they don't exist - if not await aiofiles.os.path.exists(self._cache_file.parent): - await aiofiles.os.makedirs(self._cache_file.parent) + # Create parent directories if they don't exist. Use + # exist_ok=True so a racing process that creates the + # directory between our check and call doesn't blow up. + await aiofiles.os.makedirs(self._cache_file.parent, exist_ok=True) tmp_path = self._cache_file.with_name( f".{self._cache_file.name}.{os.getpid()}.{uuid.uuid4().hex}.tmp" diff --git a/tests/test_init.py b/tests/test_init.py index 00af1b1..133c7d7 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -338,7 +338,10 @@ async def test_retry_logic(mock_aioclient, caplog): ) # Patch asyncio.sleep used by backoff so the test doesn't actually # wait through the exponential delay between retries. - with caplog.at_level(logging.DEBUG), patch("backoff._async.asyncio.sleep", new=AsyncMock()): + with ( + caplog.at_level(logging.DEBUG), + patch("backoff._async.asyncio.sleep", new=AsyncMock()), + ): with pytest.raises(py_gasbuddy.LibraryError): manager = py_gasbuddy.GasBuddy(station_id=205033) await manager.price_lookup() @@ -358,11 +361,14 @@ async def test_retry_succeeds_on_second_attempt(mock_aioclient, caplog): mock_aioclient.post( TEST_URL, status=403, - body='Just a moment...', + body="Just a moment...", ) mock_aioclient.post(TEST_URL, status=200, body=load_fixture("station.json")) - with caplog.at_level(logging.DEBUG), patch("backoff._async.asyncio.sleep", new=AsyncMock()): + with ( + caplog.at_level(logging.DEBUG), + patch("backoff._async.asyncio.sleep", new=AsyncMock()), + ): manager = py_gasbuddy.GasBuddy(station_id=205033) data = await manager.price_lookup() From 64e6029edab63b0decfdee3a11bbf69ddbc2faa3 Mon Sep 17 00:00:00 2001 From: "firstof9@gmail.com" Date: Wed, 27 May 2026 17:05:40 -0700 Subject: [PATCH 3/3] test: add test coverage for cache atomicity and concurrency --- tests/test_init.py | 71 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/test_init.py b/tests/test_init.py index 133c7d7..24deb9d 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -1198,3 +1198,74 @@ async def test_location_search_pagination(mock_aioclient, caplog, tmp_path): result_err = await manager.location_search(zipcode=12345) assert result_err["results"] == [] assert result_err["next_cursor"] is None + + +async def test_cache_write_atomic_failure(tmp_path): + """Test that a write failure cleans up the tempfile and propagates the error.""" + from unittest.mock import patch + + cache_file = tmp_path / "test_cache" + cache = py_gasbuddy.cache.GasBuddyCache(str(cache_file)) + + # Mock aiofiles.os.replace to fail + with patch("aiofiles.os.replace", side_effect=OSError("Atomic replace failed")): + with pytest.raises(OSError, match="Atomic replace failed"): + await cache.write_cache(b"data") + + # The final cache file should not exist + assert not cache_file.exists() + + # The temp files in that directory should also be cleaned up (no tmp files) + tmp_files = list(tmp_path.glob(".*.tmp")) + assert not tmp_files + + +async def test_token_refresh_concurrency(mock_aioclient): + """Test that concurrent token-refresh attempts are serialized and only one HTTP fetch is executed.""" + import asyncio + from unittest.mock import patch + + # Mock GB_HOME_URL response + mock_aioclient.get( + GB_URL, + status=200, + body=load_fixture("index.html"), + ) + + manager = py_gasbuddy.GasBuddy() + + # Track how many times _refresh_token is invoked + original_refresh = manager._refresh_token + refresh_calls = 0 + + async def spy_refresh(*args, **kwargs): + nonlocal refresh_calls + refresh_calls += 1 + # Introduce a small sleep to simulate network delay, forcing the concurrency + await asyncio.sleep(0.05) + await original_refresh(*args, **kwargs) + + with patch.object(manager, "_refresh_token", side_effect=spy_refresh): + # Trigger concurrent _get_headers calls + await asyncio.gather( + manager._get_headers(), + manager._get_headers(), + ) + + # Only one token refresh should have occurred + assert refresh_calls == 1 + await manager.clear_cache() + + +async def test_cache_write_atomic_failure_cleanup_fails(tmp_path): + """Test that a write failure propagates correctly even when the tempfile cleanup itself fails with OSError.""" + from unittest.mock import patch + + cache_file = tmp_path / "test_cache" + cache = py_gasbuddy.cache.GasBuddyCache(str(cache_file)) + + # Mock replace to fail, and mock remove to fail as well + with patch("aiofiles.os.replace", side_effect=OSError("Atomic replace failed")): + with patch("aiofiles.os.remove", side_effect=OSError("Remove failed")): + with pytest.raises(OSError, match="Atomic replace failed"): + await cache.write_cache(b"data")