Skip to content

Commit f364bb3

Browse files
authored
Add files via upload
1 parent 77f5339 commit f364bb3

1 file changed

Lines changed: 270 additions & 0 deletions

File tree

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
"""
2+
data-engine/python/adapters/carbon_monitor.py
3+
Prexus Intelligence — Carbon Monitor Adapter
4+
Near-real-time CO₂ emissions by country and sector.
5+
Source: carbonmonitor.org — daily updates, ~5-day lag.
6+
Free API, no key required.
7+
8+
Provides:
9+
- CO₂ emissions intensity by country
10+
- Sector breakdown (power, industry, ground transport, residential, aviation)
11+
- Year-over-year emissions trend
12+
- Transition risk proxy score
13+
"""
14+
15+
import time
16+
import logging
17+
from datetime import datetime, timezone, timedelta
18+
from typing import Optional
19+
20+
import httpx
21+
22+
from .base import BaseAdapter, TelemetryRecord
23+
24+
logger = logging.getLogger("prexus.adapters.carbon_monitor")
25+
26+
27+
# Country-level carbon intensity benchmarks (tCO2 per MWh electricity)
28+
# Used as fallback when API is unavailable
29+
CARBON_INTENSITY_FALLBACK = {
30+
"IND": 0.71, "CHN": 0.62, "USA": 0.38, "GBR": 0.23,
31+
"DEU": 0.35, "FRA": 0.07, "JPN": 0.47, "AUS": 0.65,
32+
"BRA": 0.09, "ZAF": 0.91, "RUS": 0.37, "CAN": 0.15,
33+
"KOR": 0.46, "MEX": 0.44, "IDN": 0.73, "SAU": 0.68,
34+
"TUR": 0.47, "ARG": 0.30, "POL": 0.74, "NGA": 0.41,
35+
}
36+
37+
# Sector transition risk weights (how exposed is each sector to carbon pricing)
38+
SECTOR_TRANSITION_WEIGHTS = {
39+
"Power Industry": 0.92,
40+
"Industry": 0.78,
41+
"Ground Transport": 0.65,
42+
"Residential": 0.42,
43+
"Aviation": 0.70,
44+
"International Aviation": 0.75,
45+
"Shipping": 0.68,
46+
}
47+
48+
# Country → ISO2 mapping for API
49+
COUNTRY_ISO2 = {
50+
"IND": "IN", "CHN": "CN", "USA": "US", "GBR": "GB",
51+
"DEU": "DE", "FRA": "FR", "JPN": "JP", "AUS": "AU",
52+
"BRA": "BR", "ZAF": "ZA", "RUS": "RU", "CAN": "CA",
53+
"KOR": "KR", "MEX": "MX", "IDN": "ID", "SAU": "SA",
54+
"TUR": "TR", "ARG": "AR", "POL": "PL", "NGA": "NG",
55+
"PAK": "PK", "BGD": "BD", "VNM": "VN", "THA": "TH",
56+
"EGY": "EG", "IRN": "IR", "IRQ": "IQ", "ARE": "AE",
57+
}
58+
59+
60+
class CarbonMonitorAdapter(BaseAdapter):
61+
62+
SOURCE_NAME = "Carbon Monitor"
63+
REFRESH_INTERVAL_HOURS = 24.0
64+
65+
BASE_URL = "https://carbonmonitor-gracedb.larc.nasa.gov/api/data"
66+
ALT_URL = "https://api.carbonmonitor.org/v1"
67+
68+
def __init__(self):
69+
super().__init__()
70+
self._country_cache: dict = {} # ISO3 → last API response
71+
72+
async def fetch(
73+
self,
74+
lat: float,
75+
lon: float,
76+
country_code: str = "IND",
77+
**kwargs
78+
) -> list[TelemetryRecord]:
79+
start = time.perf_counter()
80+
try:
81+
raw = await self._fetch_raw(lat, lon, country_code=country_code)
82+
records = self._parse(lat, lon, raw, country_code)
83+
self._mark_success((time.perf_counter() - start) * 1000)
84+
return records
85+
except Exception as e:
86+
self._mark_failure(str(e))
87+
return self._fallback_records(lat, lon, country_code)
88+
89+
async def _fetch_raw(
90+
self,
91+
lat: float,
92+
lon: float,
93+
country_code: str = "IND",
94+
**kwargs
95+
) -> dict:
96+
if country_code in self._country_cache:
97+
cached = self._country_cache[country_code]
98+
age_h = (datetime.now(timezone.utc) - cached["fetched_at"]).total_seconds() / 3600
99+
if age_h < self.REFRESH_INTERVAL_HOURS:
100+
return cached["data"]
101+
102+
iso2 = COUNTRY_ISO2.get(country_code, country_code[:2])
103+
end_date = datetime.now(timezone.utc).date() - timedelta(days=5)
104+
start_date = end_date - timedelta(days=365)
105+
106+
async with httpx.AsyncClient(timeout=20) as client:
107+
# Try primary Carbon Monitor API
108+
try:
109+
url = f"https://carbonmonitor.org/api/data"
110+
params = {
111+
"country": iso2,
112+
"startDate": start_date.isoformat(),
113+
"endDate": end_date.isoformat(),
114+
}
115+
resp = await client.get(url, params=params)
116+
if resp.status_code == 200:
117+
data = resp.json()
118+
self._country_cache[country_code] = {
119+
"data": data,
120+
"fetched_at": datetime.now(timezone.utc),
121+
}
122+
return data
123+
except Exception:
124+
pass
125+
126+
# Try alternative Carbon Monitor endpoint
127+
try:
128+
url = f"https://api.carbonmonitor.org/v2/data/{iso2}"
129+
resp = await client.get(url)
130+
if resp.status_code == 200:
131+
data = resp.json()
132+
self._country_cache[country_code] = {
133+
"data": data,
134+
"fetched_at": datetime.now(timezone.utc),
135+
}
136+
return data
137+
except Exception:
138+
pass
139+
140+
# Both failed — return metadata-only response
141+
raise ConnectionError(f"Carbon Monitor API unavailable for {country_code}")
142+
143+
def _parse(
144+
self,
145+
lat: float,
146+
lon: float,
147+
raw: dict,
148+
country_code: str,
149+
) -> list[TelemetryRecord]:
150+
now = datetime.now(timezone.utc)
151+
152+
# Parse emissions data
153+
emissions_by_sector = {}
154+
total_recent = 0.0
155+
total_prior_year = 0.0
156+
data_points = raw.get("data", raw.get("values", []))
157+
158+
for point in data_points[-365:]: # last year
159+
date_str = point.get("date", point.get("timestamp", ""))
160+
sectors = point.get("sectors", point.get("sector_data", {}))
161+
if isinstance(sectors, dict):
162+
for sector, value in sectors.items():
163+
try:
164+
v = float(value)
165+
emissions_by_sector[sector] = emissions_by_sector.get(sector, 0) + v
166+
total_recent += v
167+
except (ValueError, TypeError):
168+
pass
169+
170+
for point in data_points[-730:-365]: # prior year
171+
sectors = point.get("sectors", point.get("sector_data", {}))
172+
if isinstance(sectors, dict):
173+
for _, value in sectors.items():
174+
try:
175+
total_prior_year += float(value)
176+
except (ValueError, TypeError):
177+
pass
178+
179+
# Year-over-year trend
180+
yoy_change_pct = 0.0
181+
if total_prior_year > 0:
182+
yoy_change_pct = ((total_recent - total_prior_year) / total_prior_year) * 100
183+
184+
# Carbon intensity score (0-1, where 1 = highest emitting)
185+
fallback_intensity = CARBON_INTENSITY_FALLBACK.get(country_code, 0.5)
186+
# Normalize: 0.91 (ZAF) = 1.0, 0.07 (FRA) = 0.0
187+
co2_intensity_norm = min(1.0, fallback_intensity / 0.91)
188+
189+
# Transition risk: carbon intensity × sector concentration in high-risk sectors
190+
high_risk_share = 0.0
191+
total_emissions = max(sum(emissions_by_sector.values()), 1.0)
192+
for sector, amount in emissions_by_sector.items():
193+
weight = SECTOR_TRANSITION_WEIGHTS.get(sector, 0.5)
194+
high_risk_share += (amount / total_emissions) * weight
195+
196+
transition_risk_score = min(1.0, co2_intensity_norm * 0.5 + high_risk_share * 0.5)
197+
198+
# Carbon price trajectory risk (policy tightening signal)
199+
# Trend: if emissions rising → higher regulatory risk
200+
policy_risk = min(1.0, max(0.0, transition_risk_score + (yoy_change_pct / 100.0) * 0.15))
201+
202+
confidence = 0.85 if data_points else 0.40
203+
freshness = 24.0 # Carbon Monitor has ~24h update cycle after 5d lag
204+
205+
def rec(variable, value, unit, conf=None):
206+
return TelemetryRecord(
207+
source = self.SOURCE_NAME,
208+
variable = variable,
209+
lat = lat,
210+
lon = lon,
211+
value = round(float(value), 6),
212+
unit = unit,
213+
timestamp = now,
214+
confidence = conf or confidence,
215+
freshness_hours = freshness,
216+
metadata = {"country": country_code, "iso2": COUNTRY_ISO2.get(country_code, "")},
217+
)
218+
219+
records = [
220+
rec("co2_intensity_norm", co2_intensity_norm, "0-1"),
221+
rec("co2_intensity_tco2_mwh", fallback_intensity, "tCO2/MWh"),
222+
rec("transition_risk_score", transition_risk_score, "0-1"),
223+
rec("carbon_policy_risk", policy_risk, "0-1"),
224+
rec("emissions_yoy_change_pct",yoy_change_pct, "%"),
225+
rec("high_risk_sector_share", high_risk_share, "0-1"),
226+
]
227+
228+
# Per-sector records
229+
for sector, amount in emissions_by_sector.items():
230+
safe_name = sector.lower().replace(" ", "_")
231+
records.append(rec(
232+
f"emissions_{safe_name}_mtco2",
233+
amount / 1000.0, # kt → Mt
234+
"MtCO2",
235+
))
236+
237+
return records
238+
239+
def _fallback_records(
240+
self,
241+
lat: float,
242+
lon: float,
243+
country_code: str,
244+
) -> list[TelemetryRecord]:
245+
"""Fallback using static carbon intensity benchmarks."""
246+
now = datetime.now(timezone.utc)
247+
intensity = CARBON_INTENSITY_FALLBACK.get(country_code, 0.5)
248+
intensity_norm = min(1.0, intensity / 0.91)
249+
transition_risk = intensity_norm * 0.65 # conservative estimate
250+
251+
return [
252+
TelemetryRecord(
253+
source=self.SOURCE_NAME, variable="co2_intensity_norm",
254+
lat=lat, lon=lon, value=intensity_norm, unit="0-1",
255+
timestamp=now, confidence=0.55, freshness_hours=168.0, # 1-week stale
256+
metadata={"country": country_code, "source": "static_benchmark"},
257+
),
258+
TelemetryRecord(
259+
source=self.SOURCE_NAME, variable="transition_risk_score",
260+
lat=lat, lon=lon, value=transition_risk, unit="0-1",
261+
timestamp=now, confidence=0.55, freshness_hours=168.0,
262+
metadata={"country": country_code, "source": "static_benchmark"},
263+
),
264+
TelemetryRecord(
265+
source=self.SOURCE_NAME, variable="co2_intensity_tco2_mwh",
266+
lat=lat, lon=lon, value=intensity, unit="tCO2/MWh",
267+
timestamp=now, confidence=0.55, freshness_hours=168.0,
268+
metadata={"country": country_code, "source": "static_benchmark"},
269+
),
270+
]

0 commit comments

Comments
 (0)