Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 83 additions & 15 deletions py_gasbuddy/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,31 @@ def parse_distance(value: Any) -> float | None:
return None


def build_discount_map(offers: list[dict[str, Any]]) -> dict[str, float]:
def build_discount_map(offers: list[Any]) -> dict[str, float]:
"""Return fuel product key to total pwgbDiscount mapping across all offers."""
discount_map: dict[str, float] = {}
for offer in offers:
for disc in offer.get("discounts") or []:
if not isinstance(offer, dict):
continue
discounts = offer.get("discounts")
if not isinstance(discounts, list):
continue
for disc in discounts:
if not isinstance(disc, dict):
continue
raw = disc.get("pwgbDiscount")
if raw is None:
continue
try:
amount = float(raw)
except (ValueError, TypeError):
continue
for grade in disc.get("grades") or []:
grades = disc.get("grades")
if not isinstance(grades, list):
continue
for grade in grades:
if not isinstance(grade, str):
continue
discount_map[grade] = discount_map.get(grade, 0.0) + amount
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return discount_map

Expand All @@ -46,8 +58,12 @@ def format_price_node(
price_node: dict[str, Any], deal_discount: float | None = None
) -> PriceNode:
"""Format a single price node."""
credit_data = price_node.get("credit") or {}
cash_data = price_node.get("cash") or {}
credit_data = price_node.get("credit")
if not isinstance(credit_data, dict):
credit_data = {}
cash_data = price_node.get("cash")
if not isinstance(cash_data, dict):
cash_data = {}

credit_price = credit_data.get("price", 0)
cash_price = cash_data.get("price", 0)
Expand All @@ -67,15 +83,39 @@ def format_price_node(
)


def _stations_block(response: dict[str, Any]) -> dict[str, Any]:
"""Return the ``stations`` dict from a locationBySearchTerm response.

Defensive against the GraphQL spec's allowance of partial responses
(``data: null``, missing keys, ``stations`` being null or a non-dict).
Returns an empty dict when the expected shape isn't present so callers
can downgrade to "no results" rather than crash.
"""
data = response.get("data")
if not isinstance(data, dict):
return {}
location = data.get("locationBySearchTerm")
if not isinstance(location, dict):
return {}
stations = location.get("stations")
return stations if isinstance(stations, dict) else {}

Comment thread
coderabbitai[bot] marked this conversation as resolved.

def parse_cursor(response: dict[str, Any]) -> str | None:
"""Extract the next-page cursor from a locationBySearchTerm response."""
stations = response["data"]["locationBySearchTerm"]["stations"]
return (stations.get("cursor") or {}).get("next")
stations = _stations_block(response)
cursor = stations.get("cursor")
if not isinstance(cursor, dict):
return None
return cursor.get("next")


def parse_location_results(response: dict[str, Any]) -> LocationSearchResult:
"""Parse location search results into a LocationSearchResult."""
stations = response["data"]["locationBySearchTerm"]["stations"]
stations = _stations_block(response)
raw_results = stations.get("results") or []
if not isinstance(raw_results, list):
raw_results = []
results = [
cast(
StationSummary,
Expand All @@ -91,7 +131,8 @@ def parse_location_results(response: dict[str, Any]) -> LocationSearchResult:
"price_unit": r.get("priceUnit"),
},
)
for r in stations["results"]
for r in raw_results
if isinstance(r, dict) and "id" in r
]
return cast(
LocationSearchResult,
Expand All @@ -102,7 +143,15 @@ def parse_location_results(response: dict[str, Any]) -> LocationSearchResult:
def parse_results(response: dict[str, Any], limit: int) -> list[StationPrice]:
"""Parse price-service API results into a StationPrice list."""
result_list: list[StationPrice] = []
results = response["data"]["locationBySearchTerm"]["stations"]["results"]
raw_results = _stations_block(response).get("results") or []
if not isinstance(raw_results, list):
raw_results = []
required_station_keys = {"id", "priceUnit", "currency", "latitude", "longitude"}
results = [
r
for r in raw_results
if isinstance(r, dict) and required_station_keys.issubset(r)
]

for result in results[:limit]:
raw: dict[str, Any] = {
Expand All @@ -128,21 +177,26 @@ def parse_results(response: dict[str, Any], limit: int) -> list[StationPrice]:
"phone": result.get("phone") or None,
}
pay_status_obj = result.get("payStatus")
is_pay_available = (pay_status_obj is None) or bool(
(pay_status_obj or {}).get("isPayAvailable", False)
is_pay_available = (pay_status_obj is None) or (
isinstance(pay_status_obj, dict)
and bool(pay_status_obj.get("isPayAvailable", False))
)
raw["pay_status"] = is_pay_available
offers = result.get("offers") or []
offers = [o for o in (result.get("offers") or []) if isinstance(o, dict)]
discount_map = build_discount_map(offers) if is_pay_available else {}
for price in result.get("prices") or []:
if not isinstance(price, dict) or "fuelProduct" not in price:
continue
fuel_key = price["fuelProduct"]
raw[fuel_key] = format_price_node(price, discount_map.get(fuel_key))
result_list.append(cast(StationPrice, raw))
return result_list


def parse_ev_stations(stations_data: list[dict[str, Any]]) -> list[EvStation]:
def parse_ev_stations(stations_data: Any) -> list[EvStation]:
"""Parse raw EV station dicts into EvStation list."""
if not isinstance(stations_data, list):
return []
return [
cast(
EvStation,
Expand Down Expand Up @@ -179,16 +233,30 @@ def parse_ev_stations(stations_data: list[dict[str, Any]]) -> list[EvStation]:
},
)
for s in stations_data
if isinstance(s, dict) and "id" in s
]


def parse_trends(response: dict[str, Any]) -> list[TrendData]:
"""Parse price-service API results into a TrendData list."""
data = response.get("data")
if not isinstance(data, dict):
return []
location = data.get("locationBySearchTerm")
if not isinstance(location, dict):
return []
trends = location.get("trends") or []
if not isinstance(trends, list):
return []
return [
TrendData(
average_price=trend["today"],
lowest_price=trend["todayLow"],
area=trend["areaName"],
)
for trend in response["data"]["locationBySearchTerm"]["trends"]
for trend in trends
if isinstance(trend, dict)
and "today" in trend
and "todayLow" in trend
and "areaName" in trend
]
12 changes: 9 additions & 3 deletions tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,10 @@ async def test_retry_logic(mock_aioclient, caplog):
)
# Patch asyncio.sleep used by backoff so the test doesn't actually
# wait through the exponential delay between retries.
with caplog.at_level(logging.DEBUG), patch("backoff._async.asyncio.sleep", new=AsyncMock()):
with (
caplog.at_level(logging.DEBUG),
patch("backoff._async.asyncio.sleep", new=AsyncMock()),
):
with pytest.raises(py_gasbuddy.LibraryError):
manager = py_gasbuddy.GasBuddy(station_id=205033)
await manager.price_lookup()
Expand All @@ -358,11 +361,14 @@ async def test_retry_succeeds_on_second_attempt(mock_aioclient, caplog):
mock_aioclient.post(
TEST_URL,
status=403,
body='<!DOCTYPE html><html><title>Just a moment...</title></html>',
body="<!DOCTYPE html><html><title>Just a moment...</title></html>",
)
mock_aioclient.post(TEST_URL, status=200, body=load_fixture("station.json"))

with caplog.at_level(logging.DEBUG), patch("backoff._async.asyncio.sleep", new=AsyncMock()):
with (
caplog.at_level(logging.DEBUG),
patch("backoff._async.asyncio.sleep", new=AsyncMock()),
):
manager = py_gasbuddy.GasBuddy(station_id=205033)
data = await manager.price_lookup()

Expand Down
Loading
Loading