Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/altdata_brief/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,24 @@ def public_summary_path(source_key: str, *, root: Path | None = None) -> Path:
return base.joinpath(*subpath, leaf)


#: Relative location of super-pricing's narrative-history archive within its
#: repo. Unlike the public summary, this is a *local runtime cache* — it is
#: never committed and therefore absent in CI and fresh clones.
_NARRATIVE_HISTORY_SUBPATH: tuple[str, ...] = ("cache", "alt_data", "narrative_history.jsonl")


def narrative_history_path(*, root: Path | None = None) -> Path:
    """Return the location of super-pricing's ``narrative_history.jsonl``.

    Args:
        root: Optional replacement for the configured super-pricing repo
            root (``SOURCE_REPO_DIRS["super_pricing"]``). Tests pass an
            isolated ``tmp_path`` here.

    Returns:
        ``<root>`` joined with the components of
        ``_NARRATIVE_HISTORY_SUBPATH``. The path is only computed, never
        checked for existence: the archive is a local runtime cache, so a
        missing file is a normal fallback condition for callers, not an error.
    """
    repo_root = SOURCE_REPO_DIRS["super_pricing"] if root is None else root
    return repo_root.joinpath(*_NARRATIVE_HISTORY_SUBPATH)


# ---------------------------------------------------------------------------
# Preference toggling
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -215,6 +233,7 @@ def source_mode_to_kwargs(source_mode: str) -> dict[str, object]:
"env_flag",
"env_value",
"load_source_config",
"narrative_history_path",
"public_summary_path",
"source_mode_to_kwargs",
]
134 changes: 112 additions & 22 deletions src/altdata_brief/synthesis/baseline.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,32 @@
"""Mock baseline values used by the observation synthesizer (v0.2).

A *real* 7-day rolling baseline will be wired in v0.3 by reading the
super-pricing narrative archive directly. For now the values below are
hand-tuned constants derived from eyeballing the last week of caches,
so the observation sentences feel anchored rather than rule-blind.

When the real archive wiring lands, this module's only job becomes
fallback: if ``load_recent_history()`` returns ``None`` the synthesizer
falls back to these constants instead of producing an ungrounded line.
"""Rolling 7-day baselines for the observation synthesizer.

v0.2 anchored the "对比近 7 日" context lines on hand-tuned constants.
v0.3 wires the **policy-impact** baseline to super-pricing's *real*
narrative archive (``cache/alt_data/narrative_history.jsonl``):
:func:`load_recent_history` reads the recent entries and
:func:`resolve_baseline` reduces a metric's series to its mean — falling back
to the module-level constants below when the archive is absent (CI, fresh
clones) or carries no usable signal.

Only ``policy_impact_7d`` is archive-backed today: the narrative archive
records the policy-radar top-industry ``avg_impact`` (embedded in the entry's
prose) but does **not** expose per-day metal-spread / ETF-NAV / industry-heat
numerics, so those three baselines remain constant-fallback until
super-pricing publishes them. The per-metric fallback in
:func:`resolve_baseline` makes adding one later a one-line change: return the
new key from :func:`load_recent_history` and the observation layer picks it up
automatically.
"""

from __future__ import annotations

import json
import re
from pathlib import Path
from typing import Any

from altdata_brief.config import narrative_history_path

# Rolling 7-day average of |avg_impact| for the policy-radar top industry.
# Empirically ~0.18 over the last week of super-pricing snapshots.
BASELINE_POLICY_IMPACT_7D = 0.18
Expand All @@ -31,23 +44,100 @@
# How many trading days a signal must persist to be deemed "actionable".
SIGNAL_PERSISTENCE_DAYS = 3

#: How many recent signal-bearing entries form the rolling window.
_DEFAULT_HISTORY_LIMIT = 7

def load_recent_history() -> dict[str, Any] | None:
"""Future hook: read the last 7 entries from super-pricing's archive.
#: The policy-radar ``avg_impact`` is stored inside the narrative prose, e.g.
#: ``"... 行业影响力 avg_impact=-0.35, 偏空。"`` — pull the number out.
_AVG_IMPACT_RE = re.compile(r"avg_impact\s*=\s*([-+]?\d+(?:\.\d+)?)")

Returns None today (v0.2). v0.3 will return::

{
"policy_impact_7d": [...],
"metal_spread_7d": [...],
"etf_nav_7d": [...],
"industry_heat_7d": [...],
}
def _extract_avg_impact(entry: dict[str, Any]) -> float | None:
    """Return ``|avg_impact|`` parsed from a narrative entry's prose, if any.

    The figure is not a structured field: it is embedded in the entry's
    human-readable ``summary`` text and/or its ``bullets`` list. Both are
    flattened into one string and the first ``_AVG_IMPACT_RE`` match is used.

    Args:
        entry: One decoded JSONL record from the narrative archive.

    Returns:
        The absolute value of the first ``avg_impact=<number>`` found (the
        baseline is a mean of magnitudes), or ``None`` when the entry carries
        no such figure so the caller can skip it.
    """
    fragments = [str(entry.get("summary") or "")]
    bullets = entry.get("bullets")
    if isinstance(bullets, list):
        # Bullets may hold non-string items; stringify each before joining.
        fragments.append(" ".join(str(item) for item in bullets))
    text = " ".join(fragments)

    found = _AVG_IMPACT_RE.search(text)
    if found is not None:
        try:
            return abs(float(found.group(1)))
        except (TypeError, ValueError):  # pragma: no cover - regex guarantees float shape
            pass
    return None


def load_recent_history(
    path: Path | None = None, *, limit: int = _DEFAULT_HISTORY_LIMIT
) -> dict[str, Any] | None:
    """Read the recent policy-impact series from super-pricing's archive.

    Scans ``narrative_history.jsonl`` and keeps the last ``limit``
    *signal-bearing* entries, i.e. those from which
    :func:`_extract_avg_impact` can parse an ``avg_impact``. The window is
    "last ``limit`` signal entries", not "last ``limit`` calendar days".

    Args:
        path: Archive location. Defaults to
            :func:`~altdata_brief.config.narrative_history_path`; overridable
            for tests.
        limit: Maximum number of entries in the returned window. A
            non-positive value requests an empty window and yields ``None``.

    Returns:
        ``{"policy_impact_7d": [...]}`` holding ``|avg_impact|`` values in
        file order (oldest to newest, assuming the archive is append-only),
        or ``None`` when the window is empty: non-positive ``limit``, archive
        missing or unreadable, or no parseable entry. Callers treat ``None``
        as "fall back to the module-level constants".

    Malformed JSONL lines and non-object records are skipped, not fatal.
    """
    if limit <= 0:
        # Guard the slice below: ``impacts[-0:]`` is the *entire* list and a
        # negative limit would slice from the front, so a non-positive limit
        # must short-circuit instead of silently widening the window.
        return None

    history_path = Path(path) if path is not None else narrative_history_path()
    try:
        raw = history_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # Missing/unreadable cache is the normal case in CI and fresh clones.
        return None

    impacts: list[float] = []
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(entry, dict):
            continue
        value = _extract_avg_impact(entry)
        if value is not None:
            impacts.append(value)

    if not impacts:
        return None
    return {"policy_impact_7d": impacts[-limit:]}


def resolve_baseline(history: dict[str, Any] | None, key: str, fallback: float) -> float:
    """Reduce a metric's recent series to its mean, else return ``fallback``.

    Args:
        history: Mapping of metric key to a list of recent values, or
            ``None`` when no archive data is available.
        key: Metric to look up, e.g. ``"policy_impact_7d"``.
        fallback: Value returned whenever no usable series exists.

    Returns:
        The arithmetic mean of the numeric items in ``history[key]``, or
        ``fallback`` when ``history`` is empty/``None``, the key is absent,
        the value is not a non-empty list, or the list holds no numbers.
    """
    if not history:
        return fallback
    series = history.get(key)
    if not isinstance(series, list) or not series:
        return fallback
    # ``bool`` is a subclass of ``int``; exclude it so True/False never count
    # as 1/0 samples. Non-numeric items are dropped rather than raising.
    values = [
        float(v)
        for v in series
        if isinstance(v, (int, float)) and not isinstance(v, bool)
    ]
    if not values:
        return fallback
    return sum(values) / len(values)


def describe_intensity(value: float, baseline: float) -> str:
Expand Down
80 changes: 58 additions & 22 deletions src/altdata_brief/synthesis/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
*framing → contextualize → follow-up* sequence:

* sentence 1 — "今日核心信号是 X" (the headline)
* sentence 2 — "对比近 7 日,X …" (context vs. mock baseline)
* sentence 2 — "对比近 7 日,X …" (context vs. the rolling baseline)
* sentence 3 — "若该信号延续 N 日,可重点观察 X 板块/品种" (research follow-up)

The wording is still deterministic (no LLM). Baselines come from
:mod:`altdata_brief.synthesis.baseline` constants — v0.3 will swap
those for the real super-pricing narrative archive.
The wording is deterministic (no LLM). Baselines are resolved per
observation from :mod:`altdata_brief.synthesis.baseline`: v0.3 wires the
**policy-impact** baseline to super-pricing's real narrative archive, with
the remaining baselines (and any run without the archive) falling back to
the module constants.
"""

from __future__ import annotations
Expand All @@ -27,6 +29,7 @@
SIGNAL_PERSISTENCE_DAYS,
describe_intensity,
load_recent_history,
resolve_baseline,
)


Expand All @@ -40,6 +43,37 @@ class _Candidate:
action: str


@dataclass(slots=True, frozen=True)
class _Baselines:
    """Immutable bundle of the per-section "近 7 日" reference values.

    One instance is built per observation via :meth:`from_history`. Each
    field comes from :func:`resolve_baseline`, which is given the loaded
    history, the metric's key and the matching ``BASELINE_*`` constant as its
    fallback.
    """

    # Resolved from "policy_impact_7d"; fallback BASELINE_POLICY_IMPACT_7D.
    policy_impact: float
    # Resolved from "metal_spread_7d"; fallback BASELINE_METAL_SPREAD_7D.
    metal_spread: float
    # Resolved from "etf_nav_7d"; fallback BASELINE_ETF_NAV_VOL_7D.
    etf_nav: float
    # Resolved from "industry_heat_7d"; fallback BASELINE_INDUSTRY_HEAT_7D.
    industry_heat: float

    @classmethod
    def from_history(cls, history: dict[str, Any] | None) -> _Baselines:
        """Build the bundle from a ``load_recent_history()`` result.

        ``history`` may be ``None``; it is passed unchanged to
        :func:`resolve_baseline` once per field.
        """

        def pick(key: str, default: float) -> float:
            # Single seam for every field: same history, per-metric key/default.
            return resolve_baseline(history, key, default)

        return cls(
            policy_impact=pick("policy_impact_7d", BASELINE_POLICY_IMPACT_7D),
            metal_spread=pick("metal_spread_7d", BASELINE_METAL_SPREAD_7D),
            etf_nav=pick("etf_nav_7d", BASELINE_ETF_NAV_VOL_7D),
            industry_heat=pick("industry_heat_7d", BASELINE_INDUSTRY_HEAT_7D),
        )


_BOLDED_RE = re.compile(r"\*\*([^*]+)\*\*")
_NON_INDUSTRY_BOLD_TOKENS = {"良好", "一般", "较弱", "未知", "OK", "WARN", "FAIL"}
_SIGNAL_LABELS = {
Expand Down Expand Up @@ -82,30 +116,32 @@ def synthesize_observation(
Degrades gracefully — missing source = fewer candidates, never an
exception.
"""
history = load_recent_history() # v0.3 hook; today returns None
_ = history # constants below already encode v0.2 baselines
# v0.3: resolve real rolling baselines from super-pricing's narrative
# archive once, then thread them through the candidate builders. Absent
# archive (CI, fresh clones) → each baseline falls back to its constant.
baselines = _Baselines.from_history(load_recent_history())

candidates: list[_Candidate] = []
sources: list[str] = []

if super_payload is not None:
sources.append(super_payload.cache_label)
cand = _policy_candidate(super_payload)
cand = _policy_candidate(super_payload, baselines)
if cand:
candidates.append(cand)
cand = _inventory_candidate(super_payload)
cand = _inventory_candidate(super_payload, baselines)
if cand:
candidates.append(cand)

if etf_payload is not None:
sources.append(etf_payload.cache_label)
cand = _etf_candidate(etf_payload)
cand = _etf_candidate(etf_payload, baselines)
if cand:
candidates.append(cand)

if quant_payload is not None:
sources.append(quant_payload.cache_label)
cand = _industry_candidate(quant_payload)
cand = _industry_candidate(quant_payload, baselines)
if cand:
candidates.append(cand)

Expand Down Expand Up @@ -147,7 +183,7 @@ def synthesize_observation(
# -- per-source candidate builders -------------------------------------


def _policy_candidate(payload: AdapterPayload) -> _Candidate | None:
def _policy_candidate(payload: AdapterPayload, baselines: _Baselines) -> _Candidate | None:
policy = payload.data.get("policy_radar", {}) or {}
ranked = policy.get("industry_signals", []) or []
if not ranked:
Expand All @@ -164,9 +200,9 @@ def _policy_candidate(payload: AdapterPayload) -> _Candidate | None:
f"今日核心信号是 **{industry}** 的政策口径{direction}收敛"
f"(政策影响={impact:+.3f},提及次数={mentions})。"
)
intensity = describe_intensity(abs(impact), BASELINE_POLICY_IMPACT_7D)
intensity = describe_intensity(abs(impact), baselines.policy_impact)
context = (
f"对比近 7 日政策影响基线≈{BASELINE_POLICY_IMPACT_7D:.2f},"
f"对比近 7 日政策影响基线≈{baselines.policy_impact:.2f},"
f"该信号强度{intensity},政策雷达当批次累计 {policy.get('policy_count', 0)} 条记录。"
)
action = (
Expand All @@ -181,7 +217,7 @@ def _policy_candidate(payload: AdapterPayload) -> _Candidate | None:
)


def _inventory_candidate(payload: AdapterPayload) -> _Candidate | None:
def _inventory_candidate(payload: AdapterPayload, baselines: _Baselines) -> _Candidate | None:
macro = payload.data.get("macro_hf", {}) or {}
metals = macro.get("metals", []) or []
if len(metals) < 2:
Expand All @@ -198,9 +234,9 @@ def _inventory_candidate(payload: AdapterPayload) -> _Candidate | None:
f"({leader[0]:+.2f}%) 与 **{laggard[1].get('name_cn')}** "
f"({laggard[0]:+.2f}%) 周价差 {spread:.2f}%。"
)
intensity = describe_intensity(spread, BASELINE_METAL_SPREAD_7D)
intensity = describe_intensity(spread, baselines.metal_spread)
context = (
f"对比近 7 日基线均值 {BASELINE_METAL_SPREAD_7D:.2f}%,"
f"对比近 7 日基线均值 {baselines.metal_spread:.2f}%,"
f"今日分化{intensity},提示上下游需求节奏正在切换。"
)
action = (
Expand All @@ -216,7 +252,7 @@ def _inventory_candidate(payload: AdapterPayload) -> _Candidate | None:
)


def _etf_candidate(payload: AdapterPayload) -> _Candidate | None:
def _etf_candidate(payload: AdapterPayload, baselines: _Baselines) -> _Candidate | None:
nav = payload.data.get("nav", {}) or {}
daily = nav.get("daily_return")
if daily is None:
Expand All @@ -232,9 +268,9 @@ def _etf_candidate(payload: AdapterPayload) -> _Candidate | None:
f"今日核心信号是 ETF 512400 日内 NAV {direction} {abs(pct):.2f}%,"
f"数据源评级 **{health}**。"
)
intensity = describe_intensity(abs(daily), BASELINE_ETF_NAV_VOL_7D)
intensity = describe_intensity(abs(daily), baselines.etf_nav)
context = (
f"对比近 7 日波动均值 ≈{BASELINE_ETF_NAV_VOL_7D * 100:.2f}%,"
f"对比近 7 日波动均值 ≈{baselines.etf_nav * 100:.2f}%,"
f"今日波幅{intensity};商品驱动子源 "
f"{(payload.data.get('commodity_drivers') or {}).get('ok_count', 0)}"
f"/{(payload.data.get('commodity_drivers') or {}).get('total', 0)} 正常。"
Expand All @@ -251,7 +287,7 @@ def _etf_candidate(payload: AdapterPayload) -> _Candidate | None:
)


def _industry_candidate(payload: AdapterPayload) -> _Candidate | None:
def _industry_candidate(payload: AdapterPayload, baselines: _Baselines) -> _Candidate | None:
rows = payload.data.get("industries", []) or []
if not rows:
return None
Expand All @@ -265,9 +301,9 @@ def _industry_candidate(payload: AdapterPayload) -> _Candidate | None:
f"今日核心信号是行业热度榜首 **{name}**"
f"(热度={heat:.3f},政策口径={_SIGNAL_LABELS.get(str(top.get('policy_signal')), '中性')})。"
)
intensity = describe_intensity(heat, BASELINE_INDUSTRY_HEAT_7D)
intensity = describe_intensity(heat, baselines.industry_heat)
context = (
f"对比近 7 日热度均值 ≈{BASELINE_INDUSTRY_HEAT_7D:.2f},"
f"对比近 7 日热度均值 ≈{baselines.industry_heat:.2f},"
f"今日热度{intensity},关注政策叠加是否同向放大。"
)
action = (
Expand Down
Loading
Loading