diff --git a/src/altdata_brief/config.py b/src/altdata_brief/config.py index 17173dd..e76a2eb 100644 --- a/src/altdata_brief/config.py +++ b/src/altdata_brief/config.py @@ -118,6 +118,24 @@ def public_summary_path(source_key: str, *, root: Path | None = None) -> Path: return base.joinpath(*subpath, leaf) +#: Relative location of super-pricing's narrative-history archive within its +#: repo. Unlike the public summary, this is a *local runtime cache* — it is +#: never committed and therefore absent in CI and fresh clones. +_NARRATIVE_HISTORY_SUBPATH: tuple[str, ...] = ("cache", "alt_data", "narrative_history.jsonl") + + +def narrative_history_path(*, root: Path | None = None) -> Path: + """Resolve super-pricing's ``narrative_history.jsonl`` archive. + + ``root`` overrides the discovered super-pricing repo root (used by tests + to point at an isolated ``tmp_path``). The archive is a local cache, so + callers must treat a missing path as a normal fallback condition rather + than an error. + """ + base = root if root is not None else SOURCE_REPO_DIRS["super_pricing"] + return base.joinpath(*_NARRATIVE_HISTORY_SUBPATH) + + # --------------------------------------------------------------------------- # Preference toggling # --------------------------------------------------------------------------- @@ -215,6 +233,7 @@ def source_mode_to_kwargs(source_mode: str) -> dict[str, object]: "env_flag", "env_value", "load_source_config", + "narrative_history_path", "public_summary_path", "source_mode_to_kwargs", ] diff --git a/src/altdata_brief/synthesis/baseline.py b/src/altdata_brief/synthesis/baseline.py index 337acc5..fc6be20 100644 --- a/src/altdata_brief/synthesis/baseline.py +++ b/src/altdata_brief/synthesis/baseline.py @@ -1,19 +1,32 @@ -"""Mock baseline values used by the observation synthesizer (v0.2). - -A *real* 7-day rolling baseline will be wired in v0.3 by reading the -super-pricing narrative archive directly. For now the values below are -hand-tuned constants derived from eyeballing the last week of caches, -so the observation sentences feel anchored rather than rule-blind. - -When the real archive wiring lands, this module's only job becomes -fallback: if ``load_recent_history()`` returns ``None`` the synthesizer -falls back to these constants instead of producing an ungrounded line. +"""Rolling 7-day baselines for the observation synthesizer. + +v0.2 anchored the "对比近 7 日" context lines on hand-tuned constants. +v0.3 wires the **policy-impact** baseline to super-pricing's *real* +narrative archive (``cache/alt_data/narrative_history.jsonl``): +:func:`load_recent_history` reads the recent entries and +:func:`resolve_baseline` reduces a metric's series to its mean — falling back +to the module-level constants below when the archive is absent (CI, fresh +clones) or carries no usable signal. + +Only ``policy_impact_7d`` is archive-backed today: the narrative archive +records the policy-radar top-industry ``avg_impact`` (embedded in the entry's +prose) but does **not** expose per-day metal-spread / ETF-NAV / industry-heat +numerics, so those three baselines remain constant-fallback until +super-pricing publishes them. The per-metric fallback in +:func:`resolve_baseline` makes adding one later a one-line change: return the +new key from :func:`load_recent_history` and the observation layer picks it up +automatically. """ from __future__ import annotations +import json +import re +from pathlib import Path from typing import Any +from altdata_brief.config import narrative_history_path + # Rolling 7-day average of |avg_impact| for the policy-radar top industry. # Empirically ~0.18 over the last week of super-pricing snapshots. BASELINE_POLICY_IMPACT_7D = 0.18 @@ -31,23 +44,100 @@ # How many trading days a signal must persist to be deemed "actionable". SIGNAL_PERSISTENCE_DAYS = 3 +#: How many recent signal-bearing entries form the rolling window. +_DEFAULT_HISTORY_LIMIT = 7 -def load_recent_history() -> dict[str, Any] | None: - """Future hook: read the last 7 entries from super-pricing's archive. +#: The policy-radar ``avg_impact`` is stored inside the narrative prose, e.g. +#: ``"... 行业影响力 avg_impact=-0.35, 偏空。"`` — pull the number out. +_AVG_IMPACT_RE = re.compile(r"avg_impact\s*=\s*([-+]?\d+(?:\.\d+)?)") - Returns None today (v0.2). v0.3 will return:: - { - "policy_impact_7d": [...], - "metal_spread_7d": [...], - "etf_nav_7d": [...], - "industry_heat_7d": [...], - } +def _extract_avg_impact(entry: dict[str, Any]) -> float | None: + """Pull the policy-radar ``avg_impact`` magnitude from a narrative entry. - The observation module already handles ``None`` by falling back to - the module-level constants above, so callers don't need to special-case. + The figure lives in the human-readable ``summary``/``bullets`` text rather + than a structured field, so we regex it out and return its absolute value + (the baseline is the mean of ``|avg_impact|``). Returns ``None`` when no + figure is present — e.g. the industry-less filler entries the archive + interleaves — so the caller can skip the entry. + """ + haystack = str(entry.get("summary") or "") + bullets = entry.get("bullets") + if isinstance(bullets, list): + haystack = haystack + " " + " ".join(str(b) for b in bullets) + match = _AVG_IMPACT_RE.search(haystack) + if match is None: + return None + try: + return abs(float(match.group(1))) + except (TypeError, ValueError): # pragma: no cover - regex guarantees float shape + return None + + +def load_recent_history( + path: Path | None = None, *, limit: int = _DEFAULT_HISTORY_LIMIT +) -> dict[str, Any] | None: + """Read the recent policy-impact series from super-pricing's archive. + + Reads up to the last ``limit`` *signal-bearing* entries (those carrying a + parseable ``avg_impact``) from ``narrative_history.jsonl`` and returns:: + + {"policy_impact_7d": [0.35, 0.12, ...]} # |avg_impact|, oldest→newest + + Returns ``None`` when the archive is missing/unreadable or contains no + parseable entry — the observation module then falls back to the + module-level constants. Malformed JSONL lines are skipped, not fatal. + + The archive interleaves multiple entries per refresh (and industry-less + filler), so this is "last ``limit`` *signal* entries", not "last ``limit`` + calendar days". Only ``policy_impact_7d`` is populated today (see module + docstring). ``path`` is overridable for tests; it defaults to + :func:`~altdata_brief.config.narrative_history_path`. + """ + history_path = Path(path) if path is not None else narrative_history_path() + try: + raw = history_path.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError): + return None + + impacts: list[float] = [] + for line in raw.splitlines(): + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + except json.JSONDecodeError: + continue + if not isinstance(entry, dict): + continue + value = _extract_avg_impact(entry) + if value is not None: + impacts.append(value) + + if not impacts: + return None + return {"policy_impact_7d": impacts[-limit:]} + + +def resolve_baseline(history: dict[str, Any] | None, key: str, fallback: float) -> float: + """Reduce a metric's recent series to its mean, else return ``fallback``. + + ``history`` is the dict from :func:`load_recent_history` (or ``None``). + When ``history[key]`` is a non-empty list of numbers, returns their mean; + otherwise returns ``fallback``. This is the single fallback seam every + observation baseline flows through, so a missing/empty archive degrades + cleanly to the hand-tuned constants. """ - return None + if not history: + return fallback + series = history.get(key) + if not isinstance(series, list) or not series: + return fallback + values = [float(v) for v in series if isinstance(v, (int, float)) and not isinstance(v, bool)] + if not values: + return fallback + return sum(values) / len(values) def describe_intensity(value: float, baseline: float) -> str: diff --git a/src/altdata_brief/synthesis/observation.py b/src/altdata_brief/synthesis/observation.py index 48afc9e..62054a1 100644 --- a/src/altdata_brief/synthesis/observation.py +++ b/src/altdata_brief/synthesis/observation.py @@ -4,12 +4,14 @@ *framing → contextualize → follow-up* sequence: * sentence 1 — "今日核心信号是 X" (the headline) -* sentence 2 — "对比近 7 日,X …" (context vs. mock baseline) +* sentence 2 — "对比近 7 日,X …" (context vs. the rolling baseline) * sentence 3 — "若该信号延续 N 日,可重点观察 X 板块/品种" (research follow-up) -The wording is still deterministic (no LLM). Baselines come from -:mod:`altdata_brief.synthesis.baseline` constants — v0.3 will swap -those for the real super-pricing narrative archive. +The wording is deterministic (no LLM). Baselines are resolved per +observation from :mod:`altdata_brief.synthesis.baseline`: v0.3 wires the +**policy-impact** baseline to super-pricing's real narrative archive, with +the remaining baselines (and any run without the archive) falling back to +the module constants. """ from __future__ import annotations @@ -27,6 +29,7 @@ SIGNAL_PERSISTENCE_DAYS, describe_intensity, load_recent_history, + resolve_baseline, ) @@ -40,6 +43,37 @@ class _Candidate: action: str +@dataclass(slots=True, frozen=True) +class _Baselines: + """Per-section "近 7 日" baselines, resolved once per observation. + + Built from :func:`load_recent_history` via :func:`resolve_baseline`, so + each field is the real rolling mean when super-pricing's archive supplies + it and the hand-tuned constant otherwise. Today only ``policy_impact`` is + archive-backed (see :mod:`altdata_brief.synthesis.baseline`). + """ + + policy_impact: float + metal_spread: float + etf_nav: float + industry_heat: float + + @classmethod + def from_history(cls, history: dict[str, Any] | None) -> _Baselines: + return cls( + policy_impact=resolve_baseline( + history, "policy_impact_7d", BASELINE_POLICY_IMPACT_7D + ), + metal_spread=resolve_baseline( + history, "metal_spread_7d", BASELINE_METAL_SPREAD_7D + ), + etf_nav=resolve_baseline(history, "etf_nav_7d", BASELINE_ETF_NAV_VOL_7D), + industry_heat=resolve_baseline( + history, "industry_heat_7d", BASELINE_INDUSTRY_HEAT_7D + ), + ) + + _BOLDED_RE = re.compile(r"\*\*([^*]+)\*\*") _NON_INDUSTRY_BOLD_TOKENS = {"良好", "一般", "较弱", "未知", "OK", "WARN", "FAIL"} _SIGNAL_LABELS = { @@ -82,30 +116,32 @@ def synthesize_observation( Degrades gracefully — missing source = fewer candidates, never an exception. """ - history = load_recent_history() # v0.3 hook; today returns None - _ = history # constants below already encode v0.2 baselines + # v0.3: resolve real rolling baselines from super-pricing's narrative + # archive once, then thread them through the candidate builders. Absent + # archive (CI, fresh clones) → each baseline falls back to its constant. + baselines = _Baselines.from_history(load_recent_history()) candidates: list[_Candidate] = [] sources: list[str] = [] if super_payload is not None: sources.append(super_payload.cache_label) - cand = _policy_candidate(super_payload) + cand = _policy_candidate(super_payload, baselines) if cand: candidates.append(cand) - cand = _inventory_candidate(super_payload) + cand = _inventory_candidate(super_payload, baselines) if cand: candidates.append(cand) if etf_payload is not None: sources.append(etf_payload.cache_label) - cand = _etf_candidate(etf_payload) + cand = _etf_candidate(etf_payload, baselines) if cand: candidates.append(cand) if quant_payload is not None: sources.append(quant_payload.cache_label) - cand = _industry_candidate(quant_payload) + cand = _industry_candidate(quant_payload, baselines) if cand: candidates.append(cand) @@ -147,7 +183,7 @@ def synthesize_observation( # -- per-source candidate builders ------------------------------------- -def _policy_candidate(payload: AdapterPayload) -> _Candidate | None: +def _policy_candidate(payload: AdapterPayload, baselines: _Baselines) -> _Candidate | None: policy = payload.data.get("policy_radar", {}) or {} ranked = policy.get("industry_signals", []) or [] if not ranked: @@ -164,9 +200,9 @@ def _policy_candidate(payload: AdapterPayload) -> _Candidate | None: f"今日核心信号是 **{industry}** 的政策口径{direction}收敛" f"(政策影响={impact:+.3f},提及次数={mentions})。" ) - intensity = describe_intensity(abs(impact), BASELINE_POLICY_IMPACT_7D) + intensity = describe_intensity(abs(impact), baselines.policy_impact) context = ( - f"对比近 7 日政策影响基线≈{BASELINE_POLICY_IMPACT_7D:.2f}," + f"对比近 7 日政策影响基线≈{baselines.policy_impact:.2f}," f"该信号强度{intensity},政策雷达当批次累计 {policy.get('policy_count', 0)} 条记录。" ) action = ( @@ -181,7 +217,7 @@ def _policy_candidate(payload: AdapterPayload) -> _Candidate | None: ) -def _inventory_candidate(payload: AdapterPayload) -> _Candidate | None: +def _inventory_candidate(payload: AdapterPayload, baselines: _Baselines) -> _Candidate | None: macro = payload.data.get("macro_hf", {}) or {} metals = macro.get("metals", []) or [] if len(metals) < 2: @@ -198,9 +234,9 @@ def _inventory_candidate(payload: AdapterPayload) -> _Candidate | None: f"({leader[0]:+.2f}%) 与 **{laggard[1].get('name_cn')}** " f"({laggard[0]:+.2f}%) 周价差 {spread:.2f}%。" ) - intensity = describe_intensity(spread, BASELINE_METAL_SPREAD_7D) + intensity = describe_intensity(spread, baselines.metal_spread) context = ( - f"对比近 7 日基线均值 {BASELINE_METAL_SPREAD_7D:.2f}%," + f"对比近 7 日基线均值 {baselines.metal_spread:.2f}%," f"今日分化{intensity},提示上下游需求节奏正在切换。" ) action = ( @@ -216,7 +252,7 @@ def _inventory_candidate(payload: AdapterPayload) -> _Candidate | None: ) -def _etf_candidate(payload: AdapterPayload) -> _Candidate | None: +def _etf_candidate(payload: AdapterPayload, baselines: _Baselines) -> _Candidate | None: nav = payload.data.get("nav", {}) or {} daily = nav.get("daily_return") if daily is None: @@ -232,9 +268,9 @@ def _etf_candidate(payload: AdapterPayload) -> _Candidate | None: f"今日核心信号是 ETF 512400 日内 NAV {direction} {abs(pct):.2f}%," f"数据源评级 **{health}**。" ) - intensity = describe_intensity(abs(daily), BASELINE_ETF_NAV_VOL_7D) + intensity = describe_intensity(abs(daily), baselines.etf_nav) context = ( - f"对比近 7 日波动均值 ≈{BASELINE_ETF_NAV_VOL_7D * 100:.2f}%," + f"对比近 7 日波动均值 ≈{baselines.etf_nav * 100:.2f}%," f"今日波幅{intensity};商品驱动子源 " f"{(payload.data.get('commodity_drivers') or {}).get('ok_count', 0)}" f"/{(payload.data.get('commodity_drivers') or {}).get('total', 0)} 正常。" @@ -251,7 +287,7 @@ def _etf_candidate(payload: AdapterPayload) -> _Candidate | None: ) -def _industry_candidate(payload: AdapterPayload) -> _Candidate | None: +def _industry_candidate(payload: AdapterPayload, baselines: _Baselines) -> _Candidate | None: rows = payload.data.get("industries", []) or [] if not rows: return None @@ -265,9 +301,9 @@ def _industry_candidate(payload: AdapterPayload) -> _Candidate | None: f"今日核心信号是行业热度榜首 **{name}**" f"(热度={heat:.3f},政策口径={_SIGNAL_LABELS.get(str(top.get('policy_signal')), '中性')})。" ) - intensity = describe_intensity(heat, BASELINE_INDUSTRY_HEAT_7D) + intensity = describe_intensity(heat, baselines.industry_heat) context = ( - f"对比近 7 日热度均值 ≈{BASELINE_INDUSTRY_HEAT_7D:.2f}," + f"对比近 7 日热度均值 ≈{baselines.industry_heat:.2f}," f"今日热度{intensity},关注政策叠加是否同向放大。" ) action = ( diff --git a/tests/test_baseline_history.py b/tests/test_baseline_history.py new file mode 100644 index 0000000..449623f --- /dev/null +++ b/tests/test_baseline_history.py @@ -0,0 +1,166 @@ +"""v0.3: real 7-day baseline wired from super-pricing's narrative archive. + +The v0.2 observation section anchored its "对比近 7 日" context lines on +hand-tuned module constants. v0.3 reads super-pricing's +``cache/alt_data/narrative_history.jsonl`` so the policy-impact baseline is +the *actual* rolling mean of recent snapshots, falling back to the constant +when the archive is absent (CI, fresh clones) or carries no usable signal. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from altdata_brief.adapters.base import AdapterPayload +from altdata_brief.synthesis import synthesize_observation +from altdata_brief.synthesis.baseline import ( + BASELINE_POLICY_IMPACT_7D, + load_recent_history, + resolve_baseline, +) + + +def _write_jsonl(path: Path, entries: list[dict]) -> None: + path.write_text( + "\n".join(json.dumps(e, ensure_ascii=False) for e in entries) + "\n", + encoding="utf-8", + ) + + +def _narrative_entry(avg_impact: float | None, industry: str | None = "新能源汽车") -> dict: + """Mirror the real narrative_history.jsonl shape (avg_impact lives in prose).""" + if avg_impact is None: + summary = "宏观高频库存信号(相关金属):SHFE 铜 destocking (库存去化)。" + else: + summary = ( + f"政策雷达本周捕获 9 条相关政策记录,{industry} 行业影响力 " + f"avg_impact={avg_impact}, 偏空。" + ) + return { + "archived_at": "2026-05-20T00:00:00+00:00", + "industry": industry, + "summary": summary, + "bullets": [summary], + } + + +# -- load_recent_history --------------------------------------------------- + + +def test_load_recent_history_extracts_abs_policy_impact_series(tmp_path: Path) -> None: + p = tmp_path / "narrative_history.jsonl" + _write_jsonl(p, [_narrative_entry(0.10), _narrative_entry(-0.30), _narrative_entry(0.20)]) + hist = load_recent_history(p) + assert hist is not None + # absolute values (baseline is "average of |avg_impact|"), original order preserved + assert hist["policy_impact_7d"] == [0.10, 0.30, 0.20] + + +def test_load_recent_history_filters_entries_without_avg_impact(tmp_path: Path) -> None: + p = tmp_path / "narrative_history.jsonl" + _write_jsonl( + p, + [_narrative_entry(0.10), _narrative_entry(None, industry=None), _narrative_entry(0.40)], + ) + hist = load_recent_history(p) + assert hist is not None + assert hist["policy_impact_7d"] == [0.10, 0.40] + + +def test_load_recent_history_keeps_only_last_limit_signal_entries(tmp_path: Path) -> None: + p = tmp_path / "narrative_history.jsonl" + _write_jsonl(p, [_narrative_entry(round(i / 100, 2)) for i in range(1, 11)]) # 0.01..0.10 + hist = load_recent_history(p, limit=7) + assert hist is not None + assert len(hist["policy_impact_7d"]) == 7 + assert hist["policy_impact_7d"][0] == pytest.approx(0.04) + assert hist["policy_impact_7d"][-1] == pytest.approx(0.10) + + +def test_load_recent_history_missing_file_returns_none(tmp_path: Path) -> None: + assert load_recent_history(tmp_path / "does_not_exist.jsonl") is None + + +def test_load_recent_history_no_parseable_entries_returns_none(tmp_path: Path) -> None: + p = tmp_path / "narrative_history.jsonl" + _write_jsonl(p, [_narrative_entry(None, industry=None)]) + assert load_recent_history(p) is None + + +def test_load_recent_history_skips_malformed_lines(tmp_path: Path) -> None: + p = tmp_path / "narrative_history.jsonl" + p.write_text( + json.dumps(_narrative_entry(0.10), ensure_ascii=False) + "\n" + + "{ this is not valid json\n" + + json.dumps(_narrative_entry(0.30), ensure_ascii=False) + "\n", + encoding="utf-8", + ) + hist = load_recent_history(p) + assert hist is not None + assert hist["policy_impact_7d"] == [0.10, 0.30] + + +# -- resolve_baseline ------------------------------------------------------ + + +def test_resolve_baseline_returns_mean_when_present() -> None: + hist = {"policy_impact_7d": [0.30, 0.40]} + assert resolve_baseline(hist, "policy_impact_7d", BASELINE_POLICY_IMPACT_7D) == pytest.approx(0.35) + + +def test_resolve_baseline_falls_back_when_history_none() -> None: + assert resolve_baseline(None, "policy_impact_7d", 0.18) == 0.18 + + +def test_resolve_baseline_falls_back_when_key_missing_or_empty() -> None: + assert resolve_baseline({}, "policy_impact_7d", 0.18) == 0.18 + assert resolve_baseline({"policy_impact_7d": []}, "policy_impact_7d", 0.18) == 0.18 + + +# -- observation integration ---------------------------------------------- + + +def _policy_payload(avg_impact: float = 0.40) -> AdapterPayload: + """Minimal payload where the policy candidate is unambiguously the lead.""" + return AdapterPayload( + source="super-pricing-system", + fetched_at="2026-05-29T00:00:00+00:00", + cache_path=None, + live=False, + data={ + "policy_radar": { + "industry_signals": [ + {"industry": "新能源汽车", "avg_impact": avg_impact, "mentions": 9} + ], + "policy_count": 9, + } + }, + ) + + +def test_observation_uses_real_baseline_when_history_present( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "altdata_brief.synthesis.observation.load_recent_history", + lambda *a, **k: {"policy_impact_7d": [0.30, 0.40]}, + ) + result = synthesize_observation(_policy_payload(0.40), None, None, None) + context = result["sentences"][1] + assert "0.35" in context # real rolling mean of [0.30, 0.40] + assert "0.18" not in context # NOT the v0.2 hand-tuned constant + + +def test_observation_falls_back_to_constant_when_history_absent( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "altdata_brief.synthesis.observation.load_recent_history", + lambda *a, **k: None, + ) + result = synthesize_observation(_policy_payload(0.40), None, None, None) + context = result["sentences"][1] + assert f"{BASELINE_POLICY_IMPACT_7D:.2f}" in context # 0.18 fallback