diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ed51ac5..77c2068 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,7 +37,7 @@ jobs: TELEGRAM_PROJECTS_ROOT: ${{ github.workspace }} run: | python -m pip install --upgrade pip - python -m pip install pytest + python -m pip install pytest pyyaml python -m pytest -q python -m compileall -q src tests diff --git a/control-plane/AGENTS.md b/control-plane/AGENTS.md index 32c984e..c03ba58 100644 --- a/control-plane/AGENTS.md +++ b/control-plane/AGENTS.md @@ -1,107 +1,70 @@ # Telegram Control-Plane Rules -- This directory is the local Telegram control-plane, not a Telegram runtime repo. -- Default operation is read-only toward external Telegram components. -- Do not move repos, refresh plugin cache, sync skill-index, rewrite LaunchAgents, - start mirror jobs, or copy sessions from here without an explicit later plan. -- `generated/` may be rewritten by local doctor/status commands. -- The first milestone is allowed to fail closed on known defects. - -## Agent Entry (read this first) - -### Codex (live read — hot path) - -Do **not** load the full telegram skill for «что нового / прочитай чат за сегодня». - -1. `telegram://docs/routing` **or** `tg read today --limit 30 --json` -2. Fallback: `bin/telegram-fast-read-today` → MCP `telegram_read` `mode="fast"` -3. Forbidden until read fails: mcporter, tool_search, plugin README, doctor, launchd +This directory is the local Telegram control-plane, not a Telegram runtime repo. +Default operation is read-only toward external Telegram components. -`tg` on PATH: `./bin/telegram-kit --local` +## First calls (agents start here) -### All hosts +- `./bin/tgc next --json` — doctor triage as prioritized actions with exact commands. +- `./bin/tgc commands --json` — machine-readable registry of every command + (purpose, level, safety, example). Same data as `tests/test_command_registry.py` + enforces, so it cannot drift from `bin/`. -For live Telegram work, read MCP resources first (smaller than the full skill): +## Intent → command -- `telegram://docs/routing`, `telegram://docs/tools`, `telegram://docs/sources` - -Full skill: `$HOME/.agents/skills/telegram` (symlink to -`generated/telegram-plugin-package/skills/telegram`). Do not improvise Telethon -calls or browse `telegram-mcp` unless debugging. - -### Speed path (low-stakes "что нового / за сегодня") - -1. Classify: current/today/recent → **live only** (never mirror/archive). -2. On this host, run first: - `tg read today --limit 30 --json` (or `bin/telegram-fast-read-today` alias). -3. If that fails, call MCP `telegram_read` with `mode="fast"`, not legacy - `read_today_dialog` (not on default allowlist). -4. Skip `mcporter list`, doctor, launchd, and plugin README until a real failure. - -### Quality path (complete / media / send) - -| User intent | Tool | Notes | +| Intent | Command | Notes | | --- | --- | --- | -| Keyword in dialog | `telegram_search` | Then fetch context only for hits | -| Full today, nothing missed | `telegram_read` `mode="full"` + page | Report `truncated` / `has_more_before` | -| Draft reply | `telegram_prepare_reply` | No send without explicit user text | -| Send | `telegram_confirmed_send` | Fresh `confirmation_token` from preview | -| Photos/video | `telegram_inspect_media` / downloads | Never answer from captions only | +| Что нового / прочитай чат за сегодня | `tg read today --limit 30 --json` | Live only; never mirror/archive. Fallback: MCP `telegram_read` `mode="fast"` | +| Keyword in dialog | MCP `telegram_search` | Then fetch context only for hits | +| Full today, nothing missed | MCP `telegram_read` `mode="full"` + page | Report `truncated` / `has_more_before` | +| Draft reply | MCP `telegram_prepare_reply` | No send without explicit user text | +| Send | MCP `telegram_confirmed_send` | Fresh `confirmation_token` from preview | +| Photos/video | MCP `telegram_inspect_media` / downloads | Never answer from captions only | | Historical (allowlist) | `telegram-local-mirror` skill | Not for today/latest | +| Mirror status/read/search | `./bin/telegram-mirror-fast …` | Local exports only; promotion is maintenance | +| Control-plane health | `./bin/telegram-status`, `./bin/telegram-doctor --json` | Core profile, fails closed | +| Low-stakes today smoke | `./bin/telegram-fast-read-today me --limit 1` | Read-only fast path | +| Anything else | `./bin/tgc commands` | Pick by level/safety from the registry | -### Default MCP surface (16 tools) - -Only these are exposed to agents via plugin allowlist: `telegram_read`, -`telegram_search`, `telegram_prepare_reply`, `telegram_confirmed_send`, -`telegram_inspect_media`, `telegram_export_members`, `resolve_dialog`, -`find_dialog`, `collect_dialog_context`, `collect_context`, `download_media`, -`download_media_batch`, `download_dialog_media`, `prepare_media_inspection_manifest`, -`get_me`, `doctor_check`. Legacy aliases (`read_today_dialog`, `prepare_dialog_reply`, -`draft_reply`, `search_dialog_messages`, …) and raw `send_dialog_message` / -`reply_in_dialog` are **not** on the default surface (full/admin profile only). +Forbidden until a read actually fails: mcporter, tool_search, plugin README, +doctor, launchd. `tg` on PATH: `./bin/telegram-kit --local`. For live Telegram +work read MCP resources first (`telegram://docs/routing`, `telegram://docs/tools`, +`telegram://docs/sources`); full skill: `$HOME/.agents/skills/telegram`. Do not +improvise Telethon calls or browse `telegram-mcp` unless debugging. -### Doc sync (skill ↔ MCP resources) +## Hard rules -Edit `generated/telegram-plugin-package/skills/telegram/references/`, then: - -```bash -./bin/telegram-agent-docs-sync -``` - -Restarts local MCP HTTP daemons automatically after sync. CI uses `--check --no-restart`. -`build-plugin-package` runs the same sync automatically. Manifest: -`skills/telegram/agent-docs/manifest.json`. - -### Telemetry (local) - -- Daily JSONL: `~/telegram-mcp/telemetry/daily/YYYY-MM-DD.jsonl` (30-day retention). -- Symlink: `~/telegram-mcp/telemetry.jsonl` → today’s file. -- Snapshot: `~/telegram-mcp/telemetry-stats.json` (runtime_stats + scheduler, ~60s). -- Prometheus: `http://127.0.0.1:9109/metrics` (set `TELEGRAM_TELEMETRY_METRICS_PORT`; use `9110` for PL profile). -- Policy: `policy/telemetry/` (Prometheus scrape, alert rules, Grafana dashboard JSON). -- Summarize: `./bin/telegram-telemetry-status --json` or MCP `bin/telemetry-summary --json`. -- Event `source`: `mcp_tool`, `fast_read_cli`, `mcp_server` — only paths through MCP/fast-read are tracked. -- `doctor_check` includes `telemetry_summary` for the last 24h. +- Do not move repos, refresh plugin cache, sync skill-index, rewrite LaunchAgents, + start mirror jobs, or copy sessions from here without an explicit later plan. +- `generated/` may be rewritten by local doctor/status commands. +- Blocking doctor findings: stop and run `./bin/telegram-repair-plan --json` + (dry-run) before proposing changes. +- Maintenance/release actions (`telegram-maintenance-doctor`, + `telegram-release-gate`, `telegram-repair-plan-apply`, plugin cache + materialization, adapter installs, docs sync) require an explicit + maintenance/release task. + +## Deep docs (read on demand) + +- Doctor warn triage and command levels: `docs/agents/doctor-triage.md` +- Default MCP surface (16 tools) and release-gate naming: `docs/agents/mcp-surface.md` +- Telemetry locations and thresholds: `docs/agents/telemetry.md` +- Doc sync skill ↔ MCP resources: `docs/agents/doc-sync.md` +- Human map: `MAP.md`; roadmap: `TELEGRAM_AGENT_KIT_ROADMAP.md` +- Live MCP backend location: `policy/managed-systems.json` → `telegram-mcp` +- Portable plugin: `generated/telegram-plugin-package` -### Verification on this host +## Verification on this host ```bash -./bin/telegram-fast-read-today me --limit 1 -./bin/telegram-golden-read-smoke --json -./bin/telegram-mcp-surface --json +tg read today me --limit 1 --json # payload.data_source == "live_telegram" +./bin/tgc next ./bin/telegram-doctor --json -./bin/telegram-telemetry-status --json ``` -Fast read must return `payload.data_source == "live_telegram"`, not -`Unknown tool`. Golden smoke covers five dialogs in `policy/golden-dialogs.json` -(me, Конспекты, three DMs). Use `TELEGRAM_GOLDEN_READ_SKIP=1` only in CI/offline. - -### Runtime locations - -- Live MCP backend: `policy/managed-systems.json` → `telegram-mcp` -- Portable plugin: `generated/telegram-plugin-package` -- Human map: `MAP.md`, roadmap: `TELEGRAM_AGENT_KIT_ROADMAP.md` +Use `./bin/telegram-golden-read-smoke --json` only for release or live-smoke +verification (five dialogs from `policy/golden-dialogs.json`). +`TELEGRAM_GOLDEN_READ_SKIP=1` is CI/offline only. ## Agent skills @@ -115,4 +78,4 @@ Five canonical triage roles (`needs-triage`, `needs-info`, `ready-for-agent`, `r ### Domain docs -Single-context layout: `CONTEXT.md` at the repo root and `docs/adr/` for decisions. See `docs/agents/domain.md`. \ No newline at end of file +Single-context layout: `CONTEXT.md` at the repo root and `docs/adr/` for decisions. See `docs/agents/domain.md`. diff --git a/control-plane/PLAN.md b/control-plane/PLAN.md index 1f9f4c1..86c3a0c 100644 --- a/control-plane/PLAN.md +++ b/control-plane/PLAN.md @@ -44,25 +44,28 @@ Roadmap milestones 1–7 are implemented in code and gates: ## Verification -Run these before calling the control-plane healthy: +For day-to-day health, use: ```bash -./bin/telegram-release-gate +./bin/telegram-status ./bin/telegram-doctor --json -python3 -m pytest -q -m integration +./bin/telegram-mirror-fast status --json ``` Expected current shape: -- `telegram-release-gate`: exit `0` -- `telegram-managed-systems`: `ok` -- `telegram-mcp-surface`: `ok` -- `telegram-docs-audit`: `ok` -- `telegram-doctor`: `warn` with `0` blocking findings -- known warnings only for `telegram-mirror` recovery state and telecrawl archive - gaps +- `telegram-doctor`: fast core profile only +- core covers live Telegram routing/surface safety, fast read, minimal live + runtime state, and lightweight mirror status +- mirror fast path reads existing local exports with + `telegram-mirror-fast read/search`; it does not start watchers or backfills +- release/archive/telemetry/plugin/mirror-promotion checks are not core + +For release/maintenance work, run `./bin/telegram-maintenance-doctor --json` or +`./bin/telegram-release-gate`. Use component audits such as +`telegram-mcp-surface`, `telegram-docs-audit`, or `telegram-telemetry-status` +only as drill-down from doctor findings. ## karpathy-kb Canonical entity: `research/karpathy-kb/wiki/entities/telegram-control-plane.md` - diff --git a/control-plane/README.md b/control-plane/README.md index 4603f5c..c25557d 100644 --- a/control-plane/README.md +++ b/control-plane/README.md @@ -6,28 +6,44 @@ This is not a monorepo migration and not a new source of truth. It observes the existing live components and fails closed when their state disagrees with the desired policy. -## Commands +## Quick Start ```bash -./bin/telegram-doctor --json +./bin/tgc next --json # doctor triage as prioritized actions with exact commands +./bin/tgc commands --json # machine-readable registry of every command ./bin/telegram-status --json -./bin/telegram-fast-read-today me --limit 1 -./bin/telegram-managed-systems --json -./bin/telegram-plugin-drift --json -./bin/telegram-mcp-surface --json -./bin/telegram-launchd-audit --json -./bin/telegram-session-audit --json -./bin/telegram-mirror-preflight --json -./bin/telegram-telecrawl-status --json -./bin/telegram-repair-plan --json -./bin/telegram-repair-plan-apply --json -./bin/telegram-telemetry-status --json -./bin/telegram-docs-audit --json -./bin/telegram-release-gates --json -./bin/telegram-install-adapters --json -./bin/telegram-release-gate +./bin/telegram-doctor --json +./bin/telegram-maintenance-doctor --json +``` + +`tgc` is the agent entrypoint: `next` answers "what should I do right now", +`commands` lists every public command with purpose, level +(daily/live/mirror/drilldown/maintenance/release), and safety class. The +registry is unit-tested against `bin/`, so it cannot drift. + +Use `telegram-status`/`telegram-doctor` for quick local health. They run the +single-user core profile by default. For low-stakes current reads, use the live +Telegram path instead: + +```bash +tg read today --limit 30 --json +``` + +For local mirror work, use the mirror fast path/status first; full mirror +promotion, export completeness, and recovery checks belong to maintenance. + +```bash +./bin/telegram-mirror-fast status --json +./bin/telegram-mirror-fast read --limit 30 --json +./bin/telegram-mirror-fast search --target --limit 30 --json ``` +Run component commands such as `telegram-mcp-surface`, +`telegram-telemetry-status`, `telegram-telecrawl-status`, `telegram-docs-audit`, +or `telegram-managed-systems` only as drill-down after a core or maintenance +check points at that component. `telegram-release-gate` is the release path, not +the daily path. + ## Plugin Packaging The canonical portable Telegram plugin package is: @@ -65,7 +81,8 @@ The builder fails closed if the package would contain private paths, `.env`, enters through `/Users/sereja/plugins/telegram`, but that path is now a symlink alias to the portable package root, not the canonical artifact source. -After rebuilding the package, materialize Codex's local plugin cache with: +After rebuilding the package, materialize Codex's local plugin cache only as an +explicit maintenance/release step: ```bash codex plugin remove telegram@sereja-local && codex plugin add telegram@sereja-local @@ -81,17 +98,22 @@ codex plugin remove telegram@sereja-local && codex plugin add telegram@sereja-lo - `docs/telegram-kit-explainer.html` is a self-contained Russian explainer for how the Telegram agent kit fits together (open locally in a browser). -`telegram-doctor --json` writes the runtime-only +`telegram-doctor --json` writes the runtime-only core-profile `generated/observed-registry.json` snapshot and exits non-zero while blocking defects are present. The snapshot is intentionally ignored by git because it contains live PIDs, timestamps, and host inventory state. +Use `telegram-doctor --profile maintenance --json` or +`telegram-maintenance-doctor --json` for the broad estate audit that includes +release, plugin, archive, telemetry, and recovery checks. + `telegram-repair-plan --json` is dry-run planning only. It describes ordered repair steps, touched paths, verification commands, and rollback notes without applying changes. `telegram-repair-plan-apply --json` runs only allowlisted safe apply steps (today: -`plugin-cache-materialize` when drift reports installer-ready cache lag). +`plugin-cache-materialize` when drift reports installer-ready cache lag). Do not +run it from a general status/read task. `telegram-telemetry-status --json` summarizes daily JSONL logs, checks Prometheus `/metrics` targets (9109/9110), and applies thresholds from @@ -100,15 +122,17 @@ into Grafana and include `policy/telemetry/prometheus-scrape.yml` in Prometheus. ## Surface Contract -The default Telegram MCP endpoint is read-only toward external Telegram state. -It may resolve, read, search, collect context, and prepare send/reply previews, -but it must not send, reply, edit, delete, mark, create, invite, promote, or -otherwise mutate Telegram. +The default Telegram MCP endpoint is read-mostly toward external Telegram state. +It may resolve, read, search, collect context, prepare send/reply previews, and +perform confirmed sends through `telegram_confirmed_send` after a fresh preview +token. It must not expose raw send/reply, edit, delete, mark, create, invite, +promote, or other direct mutation tools. Write-capable tools such as `send_dialog_message`, `reply_in_dialog`, and `reply_message` are allowed only in an explicit `full` or `admin` tool profile. -The control-plane treats any write-capable tool in the default profile or plugin -allowlist as a blocking defect. +The control-plane treats any raw write-capable tool in the default profile or +plugin allowlist as a blocking defect. `telegram_confirmed_send` is the approved +confirmed-write facade tool. For simple low-stakes "read today" tasks, `bin/telegram-fast-read-today` is the supported first path on this host. It talks directly to the local MCP HTTP daemon @@ -138,15 +162,38 @@ tests, and live smokes. Use `./bin/telegram-release-gate --ci` in GitHub Actions (agent-docs check, docs audit, pytest only). Integration smokes stay manual: `python3 -m pytest -q -m integration`. -`telegram-doctor` includes the docs audit via the `docs` registry component. +`telegram-maintenance-doctor` includes the docs audit via the `docs` registry +component. + +## Command Levels + +- Daily: `telegram-status`, `telegram-doctor`. +- Live read: `tg read today --limit 30 --json`. +- Mirror fast path: `telegram-mirror-fast status/read/search`; full mirror + preflight is maintenance only. +- Drill-down: component audits such as `telegram-mcp-surface`, + `telegram-telemetry-status`, `telegram-telecrawl-status`, + `telegram-managed-systems`, and `telegram-docs-audit`. +- Release/maintenance: `telegram-maintenance-doctor`, `telegram-release-gate`, + `telegram-agent-docs-sync`, `telegram-install-adapters`, plugin cache + materialization, `telegram-repair-plan`, and `telegram-repair-plan-apply`. ## Current Status -- Healthy control-plane target: `telegram-doctor --json` returns `warn` with - `0` blocking findings; any blocking finding is a release blocker (`exit 1`). -- Expected operational warning: `telecrawl_known_gaps` when the default archive +- Healthy core target: `telegram-doctor --json` returns `ok` with + `0` blocking findings and does not run release/archive/telemetry checks. +- Healthy maintenance target: `telegram-maintenance-doctor --json` returns + `warn` with `0` blocking findings; any blocking finding is a release blocker + (`exit 1`). +- A maintenance `warn` with `0` blocking findings is an operational warning, not + a reason to start repair. Read `findings[].component` first, then run + `telegram-repair-plan --json` only when a concrete repair is needed. +- Expected maintenance warning: `telecrawl_known_gaps` when the default archive still has retryable import gaps (`TimeoutError` backlog). This is documented - in `policy/telecrawl.json` and is not a release blocker. + in `policy/telecrawl.json` and is not a core/live Telegram blocker. +- Expected maintenance warning: `mcp_telemetry` when recent tool errors or error + rate cross local telemetry thresholds. Check `telegram-telemetry-status --json` + before changing runtime routing. - `telegram-fast-read-today me --limit 1` is the local fast smoke for the supported simple-read shortcut. - `telegram-managed-systems --json` is the canonical inventory of Telegram @@ -156,6 +203,10 @@ tests, and live smokes. Use `./bin/telegram-release-gate --ci` in GitHub Actions aligned at local Telegram plugin version `0.1.10`. - The default MCP tool profile is the restricted facade profile. Admin/channel management tools require an explicit full/admin profile. +- `telegram-mcp-surface --json` may report the broader live backend tool count; + that is not drift by itself. The default profile is healthy when the approved + 16 facade tools are the only `default_surface_tools` and + `unexpected_write_or_destructive_tools` is empty. - Active MCP LaunchAgent plists no longer contain Telegram API secrets; they load credentials through `TELEGRAM_MCP_ENV_FILE` pointing at private `0600` env files under the MCP session directories. @@ -172,6 +223,11 @@ tests, and live smokes. Use `./bin/telegram-release-gate --ci` in GitHub Actions - Do not move repos. - Do not delete Telegram-related paths directly. Start from `policy/managed-systems.json` and create a dry-run repair/cleanup plan first. +- For any doctor warning that looks actionable, run + `./bin/telegram-repair-plan --json` and inspect the dry-run before applying + anything. +- Do not run plugin cache materialization, adapter installs, docs sync restarts, + or `telegram-repair-plan-apply` without an explicit maintenance/release task. - Do not start mirror watchers, backfills, sync jobs, or LaunchAgents. - Do not copy Telegram sessions into this tree. - Do not store secrets, session strings, subscriber exports, media payloads, or diff --git a/control-plane/bin/telegram-maintenance-doctor b/control-plane/bin/telegram-maintenance-doctor new file mode 100755 index 0000000..27bf2e1 --- /dev/null +++ b/control-plane/bin/telegram-maintenance-doctor @@ -0,0 +1,5 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +PYTHONPATH="${ROOT}/src" python3 -m telegram_control_plane doctor --profile maintenance "$@" diff --git a/control-plane/bin/telegram-mirror-fast b/control-plane/bin/telegram-mirror-fast new file mode 100755 index 0000000..4163523 --- /dev/null +++ b/control-plane/bin/telegram-mirror-fast @@ -0,0 +1,5 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +PYTHONPATH="${ROOT}/src" python3 -m telegram_control_plane.mirror_fast "$@" diff --git a/control-plane/bin/tgc b/control-plane/bin/tgc new file mode 100755 index 0000000..fe3e892 --- /dev/null +++ b/control-plane/bin/tgc @@ -0,0 +1,5 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +PYTHONPATH="${ROOT}/src" exec python3 -m telegram_control_plane "$@" diff --git a/control-plane/docs/agents/doc-sync.md b/control-plane/docs/agents/doc-sync.md new file mode 100644 index 0000000..bf6075c --- /dev/null +++ b/control-plane/docs/agents/doc-sync.md @@ -0,0 +1,11 @@ +# Doc Sync (skill ↔ MCP resources) + +Edit `generated/telegram-plugin-package/skills/telegram/references/`, then: + +```bash +./bin/telegram-agent-docs-sync +``` + +Restarts local MCP HTTP daemons automatically after sync. CI uses `--check --no-restart`. +`build-plugin-package` runs the same sync automatically. Manifest: +`skills/telegram/agent-docs/manifest.json`. diff --git a/control-plane/docs/agents/doctor-triage.md b/control-plane/docs/agents/doctor-triage.md new file mode 100644 index 0000000..a5ad3fd --- /dev/null +++ b/control-plane/docs/agents/doctor-triage.md @@ -0,0 +1,46 @@ +# Doctor Warn Triage + +Use doctor for control-plane health, not for ordinary live reads. Interpret +`./bin/telegram-doctor --json` by severity, not by the top-level word alone. +`telegram-doctor` is the fast core profile by default; use +`./bin/telegram-maintenance-doctor --json` or +`./bin/telegram-doctor --profile maintenance --json` only for broad +release/archive/recovery checks: + +1. `status=ok`: control-plane checks are clean. +2. `status=warn` with `summary.blocking_findings=0`: operational warning, not a + release blocker. Read `findings[].component` before acting. +3. `status=fail` or any blocking finding: stop and use + `./bin/telegram-repair-plan --json` before proposing changes. + +`./bin/tgc next --json` automates this triage: it maps each finding component +to the exact drill-down command and appends the repair-plan step when blocking +findings are present. + +## Common non-blocking maintenance warnings + +- `mcp_telemetry`: recent MCP tool errors or high error rate. Check + `./bin/telegram-telemetry-status --json`; do not rewrite runtime routing from + this signal alone. +- `telecrawl_known_gaps`: archive import backlog or terminal archive gaps. + Telecrawl is archive evidence, not live/current Telegram truth. +- `plugin_cache_needs_materialization`: plugin/cache install lag. This is + distinct from default MCP surface health. + +Surface health and maintenance health are separate layers: a green +`telegram-mcp-surface --json` can coexist with maintenance `warn`, and plugin +drift can be green while another runtime layer warns. + +## Operator command levels + +- Daily health: `./bin/telegram-status` for a human summary, or + `./bin/telegram-doctor --json` for machine-readable core output. +- Mirror fast path: use `./bin/telegram-mirror-fast status/read/search` first + when the task explicitly asks for mirror; it reads local export/ledger files + only. Mirror promotion/preflight is maintenance, not daily mirror use. +- Drill-down: run `telegram-mcp-surface`, `telegram-docs-audit`, + `telegram-telemetry-status`, `telegram-telecrawl-status`, or other component + checks only after doctor points at that component. +- Maintenance/release: `telegram-maintenance-doctor`, `telegram-release-gate`, + plugin cache materialization, adapter installs, and docs sync require an + explicit maintenance/release task. diff --git a/control-plane/docs/agents/domain.md b/control-plane/docs/agents/domain.md index ccbbcec..48b4b90 100644 --- a/control-plane/docs/agents/domain.md +++ b/control-plane/docs/agents/domain.md @@ -4,9 +4,8 @@ How the engineering skills should consume this repo's domain documentation when ## Before exploring, read these -- **`CONTEXT.md`** at the repo root, or -- **`CONTEXT-MAP.md`** at the repo root if it exists — it points at one `CONTEXT.md` per context. Read each one relevant to the topic. -- **`docs/adr/`** — read ADRs that touch the area you're about to work in. In multi-context repos, also check `src//docs/adr/` for context-scoped decisions. +- **`CONTEXT.md`** at the repo root. +- **`docs/adr/`** if an ADR exists for the area you're about to work in. If any of these files don't exist, **proceed silently**. Don't flag their absence; don't suggest creating them upfront. The producer skill (`/grill-with-docs`) creates them lazily when terms or decisions actually get resolved. @@ -39,4 +38,4 @@ If the concept you need isn't in the glossary yet, that's a signal — either yo If your output contradicts an existing ADR, surface it explicitly rather than silently overriding: -> _Contradicts ADR-0007 (event-sourced orders) — but worth reopening because…_ \ No newline at end of file +> _Contradicts ADR-0007 (event-sourced orders) — but worth reopening because…_ diff --git a/control-plane/docs/agents/mcp-surface.md b/control-plane/docs/agents/mcp-surface.md new file mode 100644 index 0000000..0acbfc2 --- /dev/null +++ b/control-plane/docs/agents/mcp-surface.md @@ -0,0 +1,20 @@ +# Default MCP Surface (16 tools) + +Only these are exposed to agents via plugin allowlist: `telegram_read`, +`telegram_search`, `telegram_prepare_reply`, `telegram_confirmed_send`, +`telegram_inspect_media`, `telegram_export_members`, `resolve_dialog`, +`find_dialog`, `collect_dialog_context`, `collect_context`, `download_media`, +`download_media_batch`, `download_dialog_media`, `prepare_media_inspection_manifest`, +`get_me`, `doctor_check`. Legacy aliases (`read_today_dialog`, `prepare_dialog_reply`, +`draft_reply`, `search_dialog_messages`, …) and raw `send_dialog_message` / +`reply_in_dialog` are **not** on the default surface (full/admin profile only). + +If `telegram-mcp-surface --json` reports a larger backend `tool_count`, do not +treat that alone as drift. The default profile is healthy when +`default_surface_tools` matches these 16 approved tools and +`unexpected_write_or_destructive_tools` is empty. + +## Naming note + +- `telegram-release-gate` **runs** the bundled pre-release gates. +- `telegram-release-gates` **audits** the gate configuration only. diff --git a/control-plane/docs/agents/telemetry.md b/control-plane/docs/agents/telemetry.md new file mode 100644 index 0000000..793ac96 --- /dev/null +++ b/control-plane/docs/agents/telemetry.md @@ -0,0 +1,10 @@ +# Local Telemetry + +- Daily JSONL: `~/telegram-mcp/telemetry/daily/YYYY-MM-DD.jsonl` (30-day retention). +- Symlink: `~/telegram-mcp/telemetry.jsonl` → today’s file. +- Snapshot: `~/telegram-mcp/telemetry-stats.json` (runtime_stats + scheduler, ~60s). +- Prometheus: `http://127.0.0.1:9109/metrics` (set `TELEGRAM_TELEMETRY_METRICS_PORT`; use `9110` for PL profile). +- Policy: `policy/telemetry/` (Prometheus scrape, alert rules, Grafana dashboard JSON). +- Summarize: `./bin/telegram-telemetry-status --json` or MCP `bin/telemetry-summary --json`. +- Event `source`: `mcp_tool`, `fast_read_cli`, `mcp_server` — only paths through MCP/fast-read are tracked. +- `doctor_check` includes `telemetry_summary` for the last 24h. diff --git a/control-plane/policy/registry-schema.json b/control-plane/policy/registry-schema.json index 878daa1..e31e4a2 100644 --- a/control-plane/policy/registry-schema.json +++ b/control-plane/policy/registry-schema.json @@ -47,6 +47,16 @@ "runtime_inventory": ["status", "findings", "summary", "children"], "launchd": ["status", "findings", "jobs", "loaded_jobs", "policy"], "sessions": ["status", "findings", "summary", "policy_summary"], + "mirror_fast_status": [ + "status", + "findings", + "classification", + "runtime_root_exists", + "runtime_exports_exists", + "ledger_count", + "fast_command", + "maintenance_command" + ], "telegram_mirror": ["status", "classification", "findings", "policy", "runtime_state_summary"], "telecrawl": ["status", "findings", "wrapper", "gap_policy", "account_summary", "freshness"] } diff --git a/control-plane/policy/telecrawl.json b/control-plane/policy/telecrawl.json index 0f2a369..9e76f41 100644 --- a/control-plane/policy/telecrawl.json +++ b/control-plane/policy/telecrawl.json @@ -15,6 +15,5 @@ "InviteHashInvalidError" ], "retry_terminal_access_errors": false, - "negative_results_claim": "no matches in this archive coverage", - "route_current_latest_today_send_reply_media_to": "live_mcp" + "source_evidence_owner": "policy/source-routing.json" } diff --git a/control-plane/pyproject.toml b/control-plane/pyproject.toml index d257571..1103c2c 100644 --- a/control-plane/pyproject.toml +++ b/control-plane/pyproject.toml @@ -2,6 +2,7 @@ name = "telegram-control-plane" version = "0.1.0" requires-python = ">=3.11" +dependencies = ["pyyaml>=6"] [tool.pytest.ini_options] pythonpath = ["src"] diff --git a/control-plane/src/telegram_control_plane/audit_remediation.py b/control-plane/src/telegram_control_plane/audit_remediation.py index 5dca2bd..e5f5af9 100644 --- a/control-plane/src/telegram_control_plane/audit_remediation.py +++ b/control-plane/src/telegram_control_plane/audit_remediation.py @@ -6,7 +6,7 @@ from pathlib import Path from typing import Any, Iterable -from .doctor import build_registry +from .doctor import ControlPlaneDoctor from .paths import CONTROL_ROOT, MCP_REPO, MIRROR_ROOT, PLUGIN_CACHE, PLUGIN_CACHE_ROOT, PLUGIN_SOURCE, POLICY_DIR from .util import load_json @@ -33,6 +33,10 @@ def load_remediation_policy(path: str = str(AUDIT_REMEDIATION_PATH)) -> dict[str return load_json(Path(path)) or {} +def build_registry() -> dict[str, Any]: + return ControlPlaneDoctor(profile="maintenance").build_registry() + + def clear_policy_cache() -> None: load_remediation_policy.cache_clear() diff --git a/control-plane/src/telegram_control_plane/audits.py b/control-plane/src/telegram_control_plane/audits.py index 382939e..12ff43d 100644 --- a/control-plane/src/telegram_control_plane/audits.py +++ b/control-plane/src/telegram_control_plane/audits.py @@ -483,13 +483,16 @@ def audit_fast_read_adapter() -> dict[str, Any]: } ) else: - completed = subprocess.run( - command, - capture_output=True, - text=True, - timeout=5, - check=False, - ) + try: + completed = subprocess.run( + command, + capture_output=True, + text=True, + timeout=20, + check=False, + ) + except subprocess.TimeoutExpired: + completed = subprocess.CompletedProcess(command, 124, "", "help probe timed out") help_probe = { "ran": True, "exit_code": completed.returncode, @@ -1211,6 +1214,32 @@ def audit_mirror() -> dict[str, Any]: } +def audit_mirror_fast_status() -> dict[str, Any]: + mirror_policy = load_json(POLICY_DIR / "mirror.json") or {} + runtime_exports = MIRROR_RUNTIME_ROOT / "runtime/ingest/telegram/exports" + ledgers_root = MIRROR_RUNTIME_ROOT / "data/telegram_sync" + ledgers = sorted(ledgers_root.glob("*.json")) if ledgers_root.exists() else [] + findings: list[dict[str, Any]] = [] + if not MIRROR_RUNTIME_ROOT.exists(): + findings.append( + { + "id": "mirror_runtime_root_missing", + "severity": "warn", + "message": "Mirror runtime root is missing.", + } + ) + return { + "status": status_from_findings(findings), + "findings": findings, + "classification": mirror_policy.get("classification") or "mirror-recovery", + "runtime_root_exists": MIRROR_RUNTIME_ROOT.exists(), + "runtime_exports_exists": runtime_exports.exists(), + "ledger_count": len(ledgers), + "fast_command": "telegram-mirror-fast status", + "maintenance_command": "telegram-mirror-audit", + } + + def _mirror_export_coverage(export_root: Path) -> dict[str, Any]: allowlist_report = run_json( ["python3", str(MIRROR_ROOT / "scripts/telegram_mirror_allowlist_report.py"), "--json"], @@ -1393,7 +1422,7 @@ def audit_telecrawl() -> dict[str, Any]: def _collect_components() -> dict[str, dict[str, Any]]: from .doctor import ControlPlaneDoctor - return ControlPlaneDoctor().collect_components() + return ControlPlaneDoctor(profile="maintenance").collect_components() def build_registry() -> dict[str, Any]: diff --git a/control-plane/src/telegram_control_plane/cli.py b/control-plane/src/telegram_control_plane/cli.py index 69b88c3..85c9371 100644 --- a/control-plane/src/telegram_control_plane/cli.py +++ b/control-plane/src/telegram_control_plane/cli.py @@ -24,6 +24,9 @@ from .doctor import ControlPlaneDoctor from .paths import OBSERVED_REGISTRY from .audit_remediation import apply_repair_plan, build_repair_plan +from .doctor_profiles import PROFILE_COMPONENTS +from .command_registry import registry_report +from .next_actions import build_next_actions, render_next_actions from .runtime_inventory import audit_runtime_inventory from .source_routing import audit_source_routing, recommend_route @@ -54,7 +57,7 @@ def parse_args(argv: list[str]) -> argparse.Namespace: parser = argparse.ArgumentParser(description="Read-only Telegram control-plane") parser.add_argument( "command", - choices=["doctor", "status", "route", *COMMANDS], + choices=["doctor", "status", "route", "commands", "next", *COMMANDS], help="Audit command", ) parser.add_argument("--json", action="store_true", help="Emit JSON") @@ -63,6 +66,12 @@ def parse_args(argv: list[str]) -> argparse.Namespace: action="store_true", help="Do not write generated/observed-registry.json for doctor/status", ) + parser.add_argument( + "--profile", + choices=sorted(PROFILE_COMPONENTS), + default="core", + help="Doctor/status component profile (default: core)", + ) return parser.parse_args(argv) @@ -97,8 +106,27 @@ def main(argv: list[str] | None = None) -> int: return 0 args = parse_args(raw_argv) + if args.command == "commands": + report = registry_report() + if args.json: + print(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)) + else: + for entry in report["commands"]: + print( + f"{entry['name']:<32} {entry['level']:<12} {entry['safety']:<10} " + f"{entry['purpose']}" + ) + return 0 + if args.command == "next": + doctor = ControlPlaneDoctor(profile=args.profile) + report = build_next_actions(doctor.build_registry()) + if args.json: + print(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)) + else: + print(render_next_actions(report)) + return 1 if report.get("status") == "fail" else 0 if args.command in {"doctor", "status"}: - doctor = ControlPlaneDoctor() + doctor = ControlPlaneDoctor(profile=args.profile) report = doctor.build_registry() if not args.no_write_registry: doctor.write_registry(OBSERVED_REGISTRY, report) diff --git a/control-plane/src/telegram_control_plane/command_registry.py b/control-plane/src/telegram_control_plane/command_registry.py new file mode 100644 index 0000000..ec703fb --- /dev/null +++ b/control-plane/src/telegram_control_plane/command_registry.py @@ -0,0 +1,277 @@ +"""Single source of truth for every public control-plane command. + +Every executable in ``bin/`` (except sourced helpers) must be registered here. +``tests/test_command_registry.py`` fails closed when a wrapper and the registry +drift apart, and when AGENTS.md stops documenting a daily/live command. +""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass +from typing import Any + +LEVELS = ("daily", "live", "mirror", "drilldown", "maintenance", "release") +SAFETIES = ("read-only", "mutating", "guarded") + + +@dataclass(frozen=True) +class CommandSpec: + name: str + purpose: str + level: str + safety: str + example: str + # Doctor component this command drills into, if any. + component: str | None = None + + +COMMAND_REGISTRY: tuple[CommandSpec, ...] = ( + # Daily health. + CommandSpec( + name="telegram-status", + purpose="Human-readable core health summary", + level="daily", + safety="read-only", + example="./bin/telegram-status", + ), + CommandSpec( + name="telegram-doctor", + purpose="Core-profile doctor; fails closed on blocking defects", + level="daily", + safety="read-only", + example="./bin/telegram-doctor --json", + ), + CommandSpec( + name="tgc", + purpose="Agent entrypoint: `tgc next` (what to do now), `tgc commands --json` (this registry)", + level="daily", + safety="read-only", + example="./bin/tgc next --json", + ), + # Live Telegram reads. + CommandSpec( + name="tg", + purpose="Live Telegram CLI (read/search/today); first path for current reads", + level="live", + safety="read-only", + example="tg read today --limit 30 --json", + ), + CommandSpec( + name="telegram-fast-read-today", + purpose="Direct MCP HTTP fast read for low-stakes 'today' tasks", + level="live", + safety="read-only", + example="./bin/telegram-fast-read-today me --limit 1", + component="fast_read_adapter", + ), + # Mirror fast path. + CommandSpec( + name="telegram-mirror-fast", + purpose="Mirror fast path: status/read/search over local exports only", + level="mirror", + safety="read-only", + example="./bin/telegram-mirror-fast status --json", + component="mirror_fast_status", + ), + # Drill-down audits (run only after doctor points at the component). + CommandSpec( + name="telegram-mcp-surface", + purpose="Audit default MCP tool surface vs approved facade allowlist", + level="drilldown", + safety="read-only", + example="./bin/telegram-mcp-surface --json", + component="mcp_surface", + ), + CommandSpec( + name="telegram-mcp-profiles", + purpose="Audit MCP tool profiles (default/full/admin) configuration", + level="drilldown", + safety="read-only", + example="./bin/telegram-mcp-profiles --json", + component="mcp_profiles", + ), + CommandSpec( + name="telegram-source-routing-audit", + purpose="Audit live/mirror/archive source routing policy", + level="drilldown", + safety="read-only", + example="./bin/telegram-source-routing-audit --json", + component="source_routing", + ), + CommandSpec( + name="telegram-source-route", + purpose="Recommend live/mirror/archive source for a task intent", + level="drilldown", + safety="read-only", + example="./bin/telegram-source-route 'что нового за сегодня' --json", + ), + CommandSpec( + name="telegram-launchd-audit", + purpose="Audit Telegram LaunchAgents (no secrets in plists, expected state)", + level="drilldown", + safety="read-only", + example="./bin/telegram-launchd-audit --json", + component="launchd", + ), + CommandSpec( + name="telegram-session-audit", + purpose="Audit Telegram session file locations and permissions", + level="drilldown", + safety="read-only", + example="./bin/telegram-session-audit --json", + component="sessions", + ), + CommandSpec( + name="telegram-plugin-drift", + purpose="Audit portable plugin package vs marketplace alias vs installed cache", + level="drilldown", + safety="read-only", + example="./bin/telegram-plugin-drift --json", + component="plugin_drift", + ), + CommandSpec( + name="telegram-telemetry-status", + purpose="Summarize MCP telemetry JSONL and Prometheus targets vs thresholds", + level="drilldown", + safety="read-only", + example="./bin/telegram-telemetry-status --json", + component="mcp_telemetry", + ), + CommandSpec( + name="telegram-telecrawl-status", + purpose="Audit telecrawl archive gaps (archive evidence, not live truth)", + level="drilldown", + safety="read-only", + example="./bin/telegram-telecrawl-status --json", + component="telecrawl", + ), + CommandSpec( + name="telegram-docs-audit", + purpose="Audit agent docs/skill references for drift", + level="drilldown", + safety="read-only", + example="./bin/telegram-docs-audit --json", + component="docs", + ), + CommandSpec( + name="telegram-managed-systems", + purpose="Canonical inventory of Telegram repos, surfaces, and data roots", + level="drilldown", + safety="read-only", + example="./bin/telegram-managed-systems --json", + component="managed_systems", + ), + CommandSpec( + name="telegram-mirror-audit", + purpose="Audit mirror recovery candidate state", + level="drilldown", + safety="read-only", + example="./bin/telegram-mirror-audit --json", + component="telegram_mirror", + ), + CommandSpec( + name="telegram-runtime-inventory", + purpose="Audit runtime processes/daemons related to Telegram", + level="drilldown", + safety="read-only", + example="./bin/telegram-runtime-inventory --json", + component="runtime_inventory", + ), + # Maintenance / release. + CommandSpec( + name="telegram-maintenance-doctor", + purpose="Broad estate audit (release/plugin/archive/telemetry/recovery)", + level="maintenance", + safety="read-only", + example="./bin/telegram-maintenance-doctor --json", + ), + CommandSpec( + name="telegram-repair-plan", + purpose="Dry-run ordered repair plan; never applies changes", + level="maintenance", + safety="read-only", + example="./bin/telegram-repair-plan --json", + ), + CommandSpec( + name="telegram-repair-plan-apply", + purpose="Apply only allowlisted safe repair steps; explicit maintenance task only", + level="maintenance", + safety="guarded", + example="./bin/telegram-repair-plan-apply --json", + ), + CommandSpec( + name="telegram-mirror-preflight", + purpose="Gate before promoting mirror from recovery to runtime", + level="maintenance", + safety="read-only", + example="./bin/telegram-mirror-preflight --json", + ), + CommandSpec( + name="telegram-golden-read-smoke", + purpose="Live read smoke over golden dialogs; release/live-smoke only", + level="release", + safety="read-only", + example="./bin/telegram-golden-read-smoke --json", + component="golden_read_smoke", + ), + CommandSpec( + name="telegram-release-gate", + purpose="RUN the bundled pre-release gates (tests, audits, smokes)", + level="release", + safety="read-only", + example="./bin/telegram-release-gate", + ), + CommandSpec( + name="telegram-release-gates", + purpose="AUDIT release-gate configuration (does not run the gates)", + level="release", + safety="read-only", + example="./bin/telegram-release-gates --json", + component="release_gates", + ), + CommandSpec( + name="telegram-agent-docs-sync", + purpose="Sync skill references into MCP agent docs; restarts local daemons", + level="maintenance", + safety="mutating", + example="./bin/telegram-agent-docs-sync --check", + component="agent_docs_sync", + ), + CommandSpec( + name="telegram-install-adapters", + purpose="Audit portable adapter install state", + level="maintenance", + safety="read-only", + example="./bin/telegram-install-adapters --json", + component="install_adapters", + ), + CommandSpec( + name="telegram-kit", + purpose="Install/check local kit symlinks (tg on PATH)", + level="maintenance", + safety="mutating", + example="./bin/telegram-kit --local", + ), +) + +_BY_COMPONENT = { + spec.component: spec for spec in COMMAND_REGISTRY if spec.component is not None +} +_BY_NAME = {spec.name: spec for spec in COMMAND_REGISTRY} + + +def command_for_component(component: str) -> CommandSpec | None: + return _BY_COMPONENT.get(component) + + +def command_by_name(name: str) -> CommandSpec | None: + return _BY_NAME.get(name) + + +def registry_report() -> dict[str, Any]: + return { + "status": "ok", + "levels": list(LEVELS), + "safeties": list(SAFETIES), + "commands": [asdict(spec) for spec in COMMAND_REGISTRY], + } diff --git a/control-plane/src/telegram_control_plane/doctor.py b/control-plane/src/telegram_control_plane/doctor.py index 4e42c4f..c22fa60 100644 --- a/control-plane/src/telegram_control_plane/doctor.py +++ b/control-plane/src/telegram_control_plane/doctor.py @@ -6,6 +6,7 @@ from pathlib import Path from typing import Any +from .doctor_profiles import collect_profile_components, doctor_profile from . import registry_redaction, runtime_inventory, source_routing from .util import status_from_findings @@ -15,8 +16,14 @@ class ControlPlaneDoctor: """Build and persist the read-only Telegram control-plane registry.""" - def __init__(self, component_collector: Callable[[], ComponentReports] | None = None) -> None: + def __init__( + self, + component_collector: Callable[[], ComponentReports] | None = None, + *, + profile: str = "core", + ) -> None: self._component_collector = component_collector + self.profile = doctor_profile(profile) def collect_components(self) -> ComponentReports: if self._component_collector is not None: @@ -24,32 +31,51 @@ def collect_components(self) -> ComponentReports: from . import audits - launchd = audits.audit_launchd() - sessions = audits.audit_sessions() - mirror = audits.audit_mirror() - return { - "managed_systems": audits.audit_managed_systems(), - "docs": audits.audit_docs(), - "plugin_drift": audits.audit_plugin_drift(), - "mcp_telemetry": audits.audit_mcp_telemetry(), - "fast_read_adapter": audits.audit_fast_read_adapter(), - "golden_read_smoke": audits.audit_golden_read_smoke(), - "agent_docs_sync": audits.audit_agent_docs_sync(), - "release_gates": audits.audit_release_gates(), - "install_adapters": audits.audit_install_adapters(), - "mcp_surface": audits.audit_mcp_surface(), - "mcp_profiles": audits.audit_mcp_profiles(), - "source_routing": source_routing.audit_source_routing(), + cached: dict[str, dict[str, Any]] = {} + + def launchd() -> dict[str, Any]: + if "launchd" not in cached: + cached["launchd"] = audits.audit_launchd() + return cached["launchd"] + + def sessions() -> dict[str, Any]: + if "sessions" not in cached: + cached["sessions"] = audits.audit_sessions() + return cached["sessions"] + + def mirror() -> dict[str, Any]: + if "telegram_mirror" not in cached: + cached["telegram_mirror"] = audits.audit_mirror() + return cached["telegram_mirror"] + + def runtime_inventory_report() -> dict[str, Any]: + return runtime_inventory.audit_runtime_inventory( + launchd_report=launchd(), + sessions_report=sessions(), + mirror_report=mirror(), + ) + + collectors = { + "managed_systems": audits.audit_managed_systems, + "docs": audits.audit_docs, + "plugin_drift": audits.audit_plugin_drift, + "mcp_telemetry": audits.audit_mcp_telemetry, + "fast_read_adapter": audits.audit_fast_read_adapter, + "golden_read_smoke": audits.audit_golden_read_smoke, + "agent_docs_sync": audits.audit_agent_docs_sync, + "release_gates": audits.audit_release_gates, + "install_adapters": audits.audit_install_adapters, + "mcp_surface": audits.audit_mcp_surface, + "mcp_profiles": audits.audit_mcp_profiles, + "source_routing": source_routing.audit_source_routing, "launchd": launchd, "sessions": sessions, "telegram_mirror": mirror, - "runtime_inventory": runtime_inventory.audit_runtime_inventory( - launchd_report=launchd, - sessions_report=sessions, - mirror_report=mirror, - ), - "telecrawl": audits.audit_telecrawl(), + "mirror_fast_status": audits.audit_mirror_fast_status, + "runtime_inventory": runtime_inventory_report, + "telecrawl": audits.audit_telecrawl, } + return collect_profile_components(collectors, profile_name=self.profile.name) def build_registry(self, raw_components: ComponentReports | None = None) -> dict[str, Any]: components_input = raw_components if raw_components is not None else self.collect_components() @@ -67,6 +93,7 @@ def build_registry(self, raw_components: ComponentReports | None = None) -> dict registry = { "schema_version": 1, "generated_at": datetime.now(UTC).isoformat().replace("+00:00", "Z"), + "profile": self.profile.name, "read_only_external_state": True, "status": status_from_findings(findings), "summary": { diff --git a/control-plane/src/telegram_control_plane/doctor_profiles.py b/control-plane/src/telegram_control_plane/doctor_profiles.py new file mode 100644 index 0000000..e0df18e --- /dev/null +++ b/control-plane/src/telegram_control_plane/doctor_profiles.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Callable + +ComponentCollector = Callable[[], dict[str, Any]] + +CORE_COMPONENTS = ( + "fast_read_adapter", + "mcp_surface", + "source_routing", + "launchd", + "sessions", + "mirror_fast_status", +) + +MAINTENANCE_COMPONENTS = ( + "managed_systems", + "docs", + "plugin_drift", + "mcp_telemetry", + "fast_read_adapter", + "golden_read_smoke", + "agent_docs_sync", + "release_gates", + "install_adapters", + "mcp_surface", + "mcp_profiles", + "source_routing", + "launchd", + "sessions", + "telegram_mirror", + "runtime_inventory", + "telecrawl", +) + +PROFILE_COMPONENTS = { + "core": CORE_COMPONENTS, + "maintenance": MAINTENANCE_COMPONENTS, +} + + +@dataclass(frozen=True) +class DoctorProfile: + name: str + components: tuple[str, ...] + + +def doctor_profile(name: str = "core") -> DoctorProfile: + components = PROFILE_COMPONENTS.get(name) + if components is None: + known = ", ".join(sorted(PROFILE_COMPONENTS)) + raise ValueError(f"Unknown doctor profile {name!r}; expected one of: {known}") + return DoctorProfile(name=name, components=components) + + +def collect_profile_components( + collectors: dict[str, ComponentCollector], + *, + profile_name: str = "core", +) -> dict[str, dict[str, Any]]: + profile = doctor_profile(profile_name) + reports: dict[str, dict[str, Any]] = {} + for component in profile.components: + collector = collectors[component] + reports[component] = collector() + return reports diff --git a/control-plane/src/telegram_control_plane/mirror_fast.py b/control-plane/src/telegram_control_plane/mirror_fast.py new file mode 100644 index 0000000..3f902ef --- /dev/null +++ b/control-plane/src/telegram_control_plane/mirror_fast.py @@ -0,0 +1,284 @@ +from __future__ import annotations + +import argparse +import json +import sys +from collections import deque +from datetime import date, datetime +from pathlib import Path +from typing import Any + +import yaml + +from .paths import MIRROR_RUNTIME_ROOT + +EXPORT_ROOT = MIRROR_RUNTIME_ROOT / "runtime/ingest/telegram/exports" +CONFIG_ROOT = MIRROR_RUNTIME_ROOT / "config" +LEDGER_ROOT = MIRROR_RUNTIME_ROOT / "data/telegram_sync" + + +def _parse_date(value: str | None) -> date | None: + if not value: + return None + return date.fromisoformat(value.strip()) + + +def _row_date(row: dict[str, Any]) -> date | None: + raw = str(row.get("date") or row.get("timestamp") or "").strip() + if not raw: + return None + try: + return datetime.fromisoformat(raw.replace("Z", "+00:00")).date() + except ValueError: + try: + return date.fromisoformat(raw[:10]) + except ValueError: + return None + + +def _row_text(row: dict[str, Any]) -> str: + text = str(row.get("text_markdown") or row.get("text_raw") or row.get("message") or "").strip() + if text: + return text + media_type = str(row.get("media_type") or "").strip() + return f"[media:{media_type}]" if media_type else "[empty]" + + +def _message_payload(row: dict[str, Any], *, source: dict[str, Any]) -> dict[str, Any]: + return { + "id": int(row.get("id") or row.get("message_id") or 0), + "date": row.get("date") or row.get("timestamp"), + "text": _row_text(row), + "media_type": row.get("media_type"), + "views": row.get("views"), + "forwards": row.get("forwards"), + "source": source, + } + + +def _load_jsonl(path: Path) -> list[dict[str, Any]]: + rows: list[dict[str, Any]] = [] + if not path.exists(): + return rows + for line in path.read_text(encoding="utf-8").splitlines(): + if not line.strip(): + continue + try: + payload = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(payload, dict): + rows.append(payload) + return rows + + +def _config_rows(config_root: Path = CONFIG_ROOT) -> list[dict[str, Any]]: + rows: list[dict[str, Any]] = [] + if not config_root.exists(): + return rows + for path in sorted(config_root.glob("telegram_channels*.yaml")): + try: + payload = yaml.safe_load(path.read_text(encoding="utf-8")) or {} + except (OSError, yaml.YAMLError): + continue + for row in payload.get("channels", []): + if not isinstance(row, dict): + continue + item = dict(row) + item["_config_path"] = str(path) + rows.append(item) + return rows + + +def _norm(value: Any) -> str: + return str(value or "").strip().lower().lstrip("@") + + +def _matches(row: dict[str, Any], query: str) -> bool: + needle = _norm(query) + if not needle: + return False + fields = ( + row.get("name"), + row.get("username"), + row.get("channel_id"), + row.get("author_id"), + row.get("speaker_name"), + row.get("mirror_scope"), + row.get("export_folder"), + ) + return any(needle in _norm(value) for value in fields) + + +def _source_for(row: dict[str, Any], messages_path: Path) -> dict[str, Any]: + return { + "channel_id": row.get("channel_id"), + "name": row.get("name"), + "username": row.get("username"), + "mirror_scope": row.get("mirror_scope"), + "export_folder": row.get("export_folder"), + "messages_path": str(messages_path), + } + + +def _messages_path(row: dict[str, Any], export_root: Path = EXPORT_ROOT) -> Path: + return export_root / str(row.get("export_folder") or "").strip() / "messages_raw.jsonl" + + +def build_status(*, export_root: Path = EXPORT_ROOT, ledger_root: Path = LEDGER_ROOT) -> dict[str, Any]: + exports = sorted(export_root.glob("**/messages_raw.jsonl")) if export_root.exists() else [] + ledgers = sorted(ledger_root.glob("*.json")) if ledger_root.exists() else [] + return { + "status": "ok" if MIRROR_RUNTIME_ROOT.exists() else "warn", + "mode": "read_only_fast_mirror", + "runtime_root": str(MIRROR_RUNTIME_ROOT), + "runtime_root_exists": MIRROR_RUNTIME_ROOT.exists(), + "export_root": str(export_root), + "export_root_exists": export_root.exists(), + "export_count": len(exports), + "ledger_root": str(ledger_root), + "ledger_count": len(ledgers), + "config_channel_count": len(_config_rows()), + "commands": { + "read": "telegram-mirror-fast read --limit 30 --json", + "search": "telegram-mirror-fast search --limit 30 --json", + }, + } + + +def read_messages( + *, + query: str, + date_from: str | None = None, + date_to: str | None = None, + limit: int = 30, + config_root: Path = CONFIG_ROOT, + export_root: Path = EXPORT_ROOT, +) -> dict[str, Any]: + matches = [row for row in _config_rows(config_root) if _matches(row, query)] + if not matches: + return {"status": "warn", "error": "mirror_target_not_found", "query": query, "messages": [], "message_count": 0} + + start = _parse_date(date_from) + end = _parse_date(date_to) + if start and end and start > end: + raise ValueError("--date-from must be before or equal to --date-to") + + messages: deque[dict[str, Any]] = deque(maxlen=max(limit, 0) or None) + missing_exports: list[dict[str, Any]] = [] + for row in matches: + path = _messages_path(row, export_root) + source = _source_for(row, path) + if not path.exists(): + missing_exports.append(source) + continue + for raw in _load_jsonl(path): + msg_date = _row_date(raw) + if start and (msg_date is None or msg_date < start): + continue + if end and (msg_date is None or msg_date > end): + continue + messages.append(_message_payload(raw, source=source)) + + result = list(messages) + result.sort(key=lambda row: (str(row.get("date") or ""), int(row.get("id") or 0))) + return { + "status": "ok" if result else "warn", + "query": query, + "range": {"date_from": date_from, "date_to": date_to}, + "matched_targets": len(matches), + "missing_exports": missing_exports, + "message_count": len(result), + "messages": result, + } + + +def search_messages( + *, + text: str, + target: str | None = None, + limit: int = 30, + config_root: Path = CONFIG_ROOT, + export_root: Path = EXPORT_ROOT, +) -> dict[str, Any]: + needle = text.casefold() + rows = _config_rows(config_root) + if target: + rows = [row for row in rows if _matches(row, target)] + hits: list[dict[str, Any]] = [] + for row in rows: + path = _messages_path(row, export_root) + if not path.exists(): + continue + source = _source_for(row, path) + for raw in _load_jsonl(path): + body = _row_text(raw) + if needle in body.casefold(): + hits.append(_message_payload(raw, source=source)) + + hits.sort(key=lambda row: (str(row.get("date") or ""), int(row.get("id") or 0)), reverse=True) + return { + "status": "ok" if hits else "warn", + "query": text, + "target": target, + "message_count": min(len(hits), max(limit, 0)), + "total_hits": len(hits), + "messages": hits[: max(limit, 0)], + } + + +def _render_text(payload: dict[str, Any]) -> str: + lines = [f"status: {payload.get('status')}"] + if payload.get("mode"): + lines.append(f"mode: {payload['mode']}") + for key in ("export_count", "ledger_count", "message_count", "total_hits"): + if key in payload: + lines.append(f"{key}: {payload[key]}") + for msg in payload.get("messages", [])[:10]: + source = msg.get("source") if isinstance(msg.get("source"), dict) else {} + lines.append(f"- {msg.get('date')} {source.get('name')}: {msg.get('text')}") + if payload.get("error"): + lines.append(f"error: {payload['error']}") + return "\n".join(lines) + + +def parse_args(argv: list[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Fast read-only Telegram mirror commands") + subparsers = parser.add_subparsers(dest="command", required=True) + + status = subparsers.add_parser("status", help="Show fast mirror file status") + status.add_argument("--json", action="store_true") + + read = subparsers.add_parser("read", help="Read messages from one mirrored channel/chat") + read.add_argument("query") + read.add_argument("--date-from") + read.add_argument("--date-to") + read.add_argument("--limit", type=int, default=30) + read.add_argument("--json", action="store_true") + + search = subparsers.add_parser("search", help="Search mirrored channel/chat exports") + search.add_argument("text") + search.add_argument("--target", help="Optional channel/chat filter") + search.add_argument("--limit", type=int, default=30) + search.add_argument("--json", action="store_true") + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(sys.argv[1:] if argv is None else argv) + if args.command == "status": + payload = build_status() + elif args.command == "read": + payload = read_messages(query=args.query, date_from=args.date_from, date_to=args.date_to, limit=args.limit) + else: + payload = search_messages(text=args.text, target=args.target, limit=args.limit) + + if args.json: + print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True)) + else: + print(_render_text(payload)) + return 1 if payload.get("status") == "fail" else 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/control-plane/src/telegram_control_plane/next_actions.py b/control-plane/src/telegram_control_plane/next_actions.py new file mode 100644 index 0000000..f653dd2 --- /dev/null +++ b/control-plane/src/telegram_control_plane/next_actions.py @@ -0,0 +1,78 @@ +"""Turn a doctor report into a prioritized, executable next-actions list. + +This is the agent's first call: it answers "what should I do right now" +with exact commands instead of requiring doc archaeology. +""" + +from __future__ import annotations + +from typing import Any + +from .command_registry import command_for_component + +_SEVERITY_ORDER = {"blocking": 0, "warning": 1} + +_FALLBACK_COMMAND = "./bin/telegram-doctor --profile maintenance --json" +_REPAIR_PLAN_COMMAND = "./bin/telegram-repair-plan --json" + + +def build_next_actions(doctor_report: dict[str, Any]) -> dict[str, Any]: + findings = [ + item for item in doctor_report.get("findings", []) if isinstance(item, dict) + ] + findings.sort(key=lambda item: _SEVERITY_ORDER.get(str(item.get("severity")), 9)) + + blocking_actions: list[dict[str, Any]] = [] + warning_actions: list[dict[str, Any]] = [] + for finding in findings: + severity = str(finding.get("severity", "warning")) + component = str(finding.get("component", "unknown")) + spec = command_for_component(component) + command = spec.example if spec is not None else _FALLBACK_COMMAND + action = { + "severity": severity, + "component": component, + "finding_id": finding.get("id"), + "message": finding.get("message"), + "command": command, + } + if severity == "blocking": + blocking_actions.append(action) + else: + warning_actions.append(action) + + if blocking_actions: + blocking_actions.append( + { + "severity": "blocking", + "component": "repair_plan", + "finding_id": "dry_run_repair_plan", + "message": ( + "Blocking findings present: inspect the dry-run repair plan " + "before applying anything" + ), + "command": _REPAIR_PLAN_COMMAND, + } + ) + actions = blocking_actions + warning_actions + + return { + "status": doctor_report.get("status"), + "summary": doctor_report.get("summary"), + "next_actions": actions, + } + + +def render_next_actions(report: dict[str, Any]) -> str: + lines = [f"status: {report.get('status')}"] + actions = report.get("next_actions", []) + if not actions: + lines.append("no action needed") + return "\n".join(lines) + for index, action in enumerate(actions, start=1): + lines.append( + f"{index}. [{action.get('severity')}] {action.get('component')}: " + f"{action.get('message')}" + ) + lines.append(f" run: {action.get('command')}") + return "\n".join(lines) diff --git a/control-plane/src/telegram_control_plane/source_evidence.py b/control-plane/src/telegram_control_plane/source_evidence.py new file mode 100644 index 0000000..6fac42c --- /dev/null +++ b/control-plane/src/telegram_control_plane/source_evidence.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from .paths import POLICY_DIR +from .util import load_json + +SOURCE_ROUTING_PATH = POLICY_DIR / "source-routing.json" +TELECRAWL_POLICY_PATH = POLICY_DIR / "telecrawl.json" + +REQUIRED_SOURCE_IDS = frozenset({"live_mcp", "telecrawl_archive", "telegram_mirror"}) + + +def _dict(value: Any) -> dict[str, Any]: + return value if isinstance(value, dict) else {} + + +def _list(value: Any) -> list[Any]: + return value if isinstance(value, list) else [] + + +@dataclass(frozen=True) +class SourceEvidenceRules: + """Domain rules for choosing live, archive, and mirror evidence sources.""" + + source_routing_policy: dict[str, Any] + telecrawl_policy: dict[str, Any] + + @classmethod + def load( + cls, + *, + source_routing_path: Path = SOURCE_ROUTING_PATH, + telecrawl_policy_path: Path = TELECRAWL_POLICY_PATH, + ) -> "SourceEvidenceRules": + return cls( + source_routing_policy=load_json(source_routing_path) or {}, + telecrawl_policy=load_json(telecrawl_policy_path) or {}, + ) + + @property + def rules(self) -> dict[str, Any]: + return _dict(self.source_routing_policy.get("rules")) + + @property + def sources(self) -> dict[str, Any]: + return _dict(self.source_routing_policy.get("sources")) + + @property + def claims(self) -> dict[str, Any]: + return _dict(self.source_routing_policy.get("claims")) + + @property + def live_route_target(self) -> str: + target = self.rules.get("route_current_latest_today_send_reply_media_to") + return str(target) if isinstance(target, str) and target else "live_mcp" + + @property + def live_blocked_sources(self) -> list[str]: + return [str(item) for item in _list(self.rules.get("never_route_live_intents_to")) if isinstance(item, str)] + + @property + def negative_archive_claim(self) -> str | None: + claim = self.claims.get("negative_archive_results") + return claim if isinstance(claim, str) and claim else None + + @property + def never_infer_absence_from_archive_only(self) -> bool: + return self.claims.get("never_infer_absence_from_archive_only") is True + + @property + def telecrawl_is_archive_evidence(self) -> bool: + return ( + self.telecrawl_policy.get("is_live") is False + and self.telecrawl_policy.get("classification") == "archive_snapshot" + ) + + @property + def telecrawl_blocks_current_claims(self) -> bool: + return self.telecrawl_policy.get("known_gaps_are_blocking_for_current_claims") is True + + def audit_findings(self) -> list[dict[str, Any]]: + findings: list[dict[str, Any]] = [] + missing_sources = sorted(REQUIRED_SOURCE_IDS - set(self.sources)) + if missing_sources: + findings.append( + { + "id": "source_evidence_missing_source", + "severity": "blocking", + "sources": missing_sources, + "message": "Source evidence policy is missing required source definitions.", + } + ) + if self.live_route_target != "live_mcp": + findings.append( + { + "id": "source_evidence_live_route_not_live_mcp", + "severity": "blocking", + "route": self.live_route_target, + "message": "Current/latest/today/send/media claims must route to live_mcp.", + } + ) + if "telecrawl_archive" not in self.live_blocked_sources: + findings.append( + { + "id": "source_evidence_archive_not_blocked_for_live", + "severity": "blocking", + "message": "Live/current intents must explicitly block telecrawl_archive.", + } + ) + if not self.telecrawl_is_archive_evidence: + findings.append( + { + "id": "source_evidence_telecrawl_not_archive", + "severity": "blocking", + "message": "Telecrawl must be classified as non-live archive evidence.", + } + ) + if not self.telecrawl_blocks_current_claims: + findings.append( + { + "id": "source_evidence_telecrawl_allows_current_claims", + "severity": "blocking", + "message": "Telecrawl gaps must block current/latest completeness claims.", + } + ) + if not self.negative_archive_claim: + findings.append( + { + "id": "source_evidence_missing_negative_archive_claim", + "severity": "warn", + "message": "Archive negative-results wording is missing from source evidence claims.", + } + ) + if not self.never_infer_absence_from_archive_only: + findings.append( + { + "id": "source_evidence_archive_absence_not_guarded", + "severity": "warn", + "message": "Archive evidence must not imply global absence from Telegram.", + } + ) + return findings + + +def source_evidence_rules( + *, + source_routing_policy: dict[str, Any] | None = None, + telecrawl_policy: dict[str, Any] | None = None, +) -> SourceEvidenceRules: + return SourceEvidenceRules( + source_routing_policy=( + source_routing_policy + if source_routing_policy is not None + else load_json(SOURCE_ROUTING_PATH) or {} + ), + telecrawl_policy=telecrawl_policy if telecrawl_policy is not None else load_json(TELECRAWL_POLICY_PATH) or {}, + ) diff --git a/control-plane/src/telegram_control_plane/source_routing.py b/control-plane/src/telegram_control_plane/source_routing.py index 117d114..bef14a3 100644 --- a/control-plane/src/telegram_control_plane/source_routing.py +++ b/control-plane/src/telegram_control_plane/source_routing.py @@ -7,7 +7,7 @@ from typing import Any from .paths import CONTROL_ROOT, POLICY_DIR, TELECRAWL_ARCHIVE -from . import telecrawl_gap +from . import source_evidence, telecrawl_gap from .util import load_json, status_from_findings SOURCE_ROUTING_PATH = POLICY_DIR / "source-routing.json" @@ -125,6 +125,10 @@ def recommend_route( archive_score = scores.get("telecrawl_archive", 0) mirror_score = scores.get("telegram_mirror", 0) + evidence_rules = source_evidence.source_evidence_rules( + source_routing_policy=self.payload, + telecrawl_policy=telecrawl_gap.load_telecrawl_policy(), + ) if live_score > 0 and (live_score >= archive_score and live_score >= mirror_score): primary = "live_mcp" elif archive_score > mirror_score and archive_score > 0: @@ -132,21 +136,18 @@ def recommend_route( elif mirror_score > 0: primary = "telegram_mirror" else: - primary = str(self.rules.get("route_current_latest_today_send_reply_media_to") or "live_mcp") + primary = evidence_rules.live_route_target blocked: list[str] = [] warnings: list[str] = [] if primary == "live_mcp": - never = self.rules.get("never_route_live_intents_to") - if isinstance(never, list): - blocked.extend(str(item) for item in never if isinstance(item, str)) + blocked.extend(evidence_rules.live_blocked_sources) if primary == "telecrawl_archive": if archive_ready is False: warnings.append("archive_not_ready") if archive_has_gaps is True: warnings.append("archive_has_known_gaps") - telecrawl_policy = telecrawl_gap.load_telecrawl_policy() - if telecrawl_policy.get("known_gaps_are_blocking_for_current_claims"): + if evidence_rules.telecrawl_blocks_current_claims: warnings.append("do_not_use_for_current_claims") if primary == "telegram_mirror" and mirror_preflight_ok is False: warnings.append("mirror_preflight_required") @@ -163,43 +164,17 @@ def recommend_route( "backend": primary_cfg.get("backend"), "description": primary_cfg.get("description"), "fallback_live_tools": live_cfg.get("tools_first") if isinstance(live_cfg.get("tools_first"), list) else [], - "negative_archive_claim": self.claims.get("negative_archive_results"), + "negative_archive_claim": evidence_rules.negative_archive_claim, "policy_path": str(SOURCE_ROUTING_PATH), } def audit(self) -> dict[str, Any]: telecrawl_policy = telecrawl_gap.load_telecrawl_policy() - findings: list[dict[str, Any]] = [] - live_route = self.rules.get("route_current_latest_today_send_reply_media_to") - telecrawl_route = telecrawl_policy.get("route_current_latest_today_send_reply_media_to") - if live_route and telecrawl_route and live_route != telecrawl_route: - findings.append( - { - "id": "source_routing_live_route_mismatch", - "severity": "blocking", - "message": "source-routing and telecrawl policies disagree on live route target.", - "source_routing": live_route, - "telecrawl": telecrawl_route, - } - ) - if telecrawl_policy.get("negative_results_claim") != self.claims.get("negative_archive_results"): - findings.append( - { - "id": "source_routing_negative_claim_mismatch", - "severity": "warn", - "message": "Archive negative-results claim differs between source-routing and telecrawl policy.", - } - ) - for source_id in ("live_mcp", "telecrawl_archive", "telegram_mirror"): - if source_id not in self.sources: - findings.append( - { - "id": "source_routing_missing_source", - "severity": "blocking", - "source": source_id, - "message": "Source routing policy is missing a required source definition.", - } - ) + evidence_rules = source_evidence.source_evidence_rules( + source_routing_policy=self.payload, + telecrawl_policy=telecrawl_policy, + ) + findings: list[dict[str, Any]] = evidence_rules.audit_findings() for phrase in ( "что нового за сегодня в чате", "прочитай переписку за сегодня", diff --git a/control-plane/src/telegram_control_plane/telecrawl_gap.py b/control-plane/src/telegram_control_plane/telecrawl_gap.py index a348ee8..415e37c 100644 --- a/control-plane/src/telegram_control_plane/telecrawl_gap.py +++ b/control-plane/src/telegram_control_plane/telecrawl_gap.py @@ -6,6 +6,7 @@ from pathlib import Path from typing import Any +from . import source_evidence from .paths import POLICY_DIR, TELECRAWL_DEFAULT_DB from .util import load_json, status_from_findings @@ -200,13 +201,15 @@ def known_gaps_findings( def gap_policy_summary(policy: dict[str, Any] | None = None) -> dict[str, Any]: payload = policy if policy is not None else load_telecrawl_policy() + evidence_rules = source_evidence.source_evidence_rules(telecrawl_policy=payload) return { "classification": payload.get("classification"), "is_live": payload.get("is_live"), "known_gaps_are_blocking_for_archive_search": payload.get("known_gaps_are_blocking_for_archive_search"), "known_gaps_are_blocking_for_current_claims": payload.get("known_gaps_are_blocking_for_current_claims"), - "route_current_latest_today_send_reply_media_to": payload.get("route_current_latest_today_send_reply_media_to"), - "negative_results_claim": payload.get("negative_results_claim"), + "route_current_latest_today_send_reply_media_to": evidence_rules.live_route_target, + "negative_results_claim": evidence_rules.negative_archive_claim, + "never_infer_absence_from_archive_only": evidence_rules.never_infer_absence_from_archive_only, } @@ -233,7 +236,11 @@ def evaluate_archive_readiness( "count": len(active_incomplete), } ) - import_gaps_payload = archive_status.get("import_gaps") if isinstance(archive_status.get("import_gaps"), dict) else {} + import_gaps_payload = ( + archive_status.get("import_gaps") + if isinstance(archive_status.get("import_gaps"), dict) + else {} + ) findings.extend(known_gaps_findings(payload, import_gaps_payload)) if archive_status.get("source_kind") != "archive_snapshot": findings.append( @@ -247,4 +254,4 @@ def evaluate_archive_readiness( "status": status_from_findings(findings), "findings": findings, "gap_policy": gap_policy_summary(payload), - } \ No newline at end of file + } diff --git a/control-plane/tests/test_command_registry.py b/control-plane/tests/test_command_registry.py new file mode 100644 index 0000000..e55d559 --- /dev/null +++ b/control-plane/tests/test_command_registry.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from telegram_control_plane.command_registry import ( + COMMAND_REGISTRY, + LEVELS, + SAFETIES, + command_for_component, + registry_report, +) + +ROOT = Path(__file__).resolve().parents[1] +BIN_DIR = ROOT / "bin" + +# Helper sourced by other wrappers, not an operator command. +NON_COMMAND_BIN = {"telegram-env.sh"} + + +def bin_wrapper_names() -> set[str]: + return { + path.name + for path in BIN_DIR.iterdir() + if path.is_file() and path.name not in NON_COMMAND_BIN + } + + +def test_every_bin_wrapper_is_registered() -> None: + registered = {spec.name for spec in COMMAND_REGISTRY} + assert bin_wrapper_names() == registered + + +def test_registry_entries_are_valid() -> None: + seen: set[str] = set() + for spec in COMMAND_REGISTRY: + assert spec.name not in seen, f"duplicate registry entry: {spec.name}" + seen.add(spec.name) + assert spec.level in LEVELS, spec.name + assert spec.safety in SAFETIES, spec.name + assert spec.purpose, spec.name + assert spec.example.startswith(("./bin/", "tg ")), spec.name + assert (BIN_DIR / spec.name).exists(), spec.name + + +def test_registry_report_shape() -> None: + report = registry_report() + assert report["status"] == "ok" + names = [entry["name"] for entry in report["commands"]] + assert "telegram-doctor" in names + assert "tg" in names + # JSON-serializable end to end. + json.dumps(report) + + +@pytest.mark.parametrize( + ("component", "expected"), + [ + ("mcp_surface", "telegram-mcp-surface"), + ("source_routing", "telegram-source-routing-audit"), + ("launchd", "telegram-launchd-audit"), + ("sessions", "telegram-session-audit"), + ("plugin_drift", "telegram-plugin-drift"), + ("mcp_telemetry", "telegram-telemetry-status"), + ("telecrawl", "telegram-telecrawl-status"), + ("docs", "telegram-docs-audit"), + ("managed_systems", "telegram-managed-systems"), + ("telegram_mirror", "telegram-mirror-audit"), + ("runtime_inventory", "telegram-runtime-inventory"), + ("mirror_fast_status", "telegram-mirror-fast"), + ("golden_read_smoke", "telegram-golden-read-smoke"), + ("release_gates", "telegram-release-gates"), + ("mcp_profiles", "telegram-mcp-profiles"), + ], +) +def test_doctor_components_map_to_drilldown_commands(component: str, expected: str) -> None: + spec = command_for_component(component) + assert spec is not None + assert spec.name == expected + + +def test_all_doctor_profile_components_have_drilldown_mapping() -> None: + from telegram_control_plane.doctor_profiles import PROFILE_COMPONENTS + + for components in PROFILE_COMPONENTS.values(): + for component in components: + assert command_for_component(component) is not None, component + + +def test_agents_md_documents_daily_and_live_commands() -> None: + agents_md = (ROOT / "AGENTS.md").read_text(encoding="utf-8") + for spec in COMMAND_REGISTRY: + if spec.level in {"daily", "live"}: + assert spec.name in agents_md, ( + f"AGENTS.md must mention {spec.level} command {spec.name}" + ) diff --git a/control-plane/tests/test_doctor.py b/control-plane/tests/test_doctor.py index 79c8f1c..34b5b48 100644 --- a/control-plane/tests/test_doctor.py +++ b/control-plane/tests/test_doctor.py @@ -2,7 +2,12 @@ import pytest +import telegram_control_plane.audits as audits +import telegram_control_plane.cli as cli +import telegram_control_plane.runtime_inventory as runtime_inventory +import telegram_control_plane.source_routing as source_routing from telegram_control_plane.doctor import ControlPlaneDoctor +from telegram_control_plane.doctor_profiles import CORE_COMPONENTS, MAINTENANCE_COMPONENTS def test_control_plane_doctor_builds_registry_from_component_reports() -> None: @@ -19,6 +24,7 @@ def test_control_plane_doctor_builds_registry_from_component_reports() -> None: registry = doctor.build_registry() assert registry["read_only_external_state"] is True + assert registry["profile"] == "core" assert registry["status"] == "warn" assert registry["summary"]["components"] == {"mcp_surface": "ok", "telecrawl": "warn"} assert registry["summary"]["blocking_findings"] == 0 @@ -33,3 +39,152 @@ def test_control_plane_doctor_write_registry_fails_closed_on_private_leak(tmp_pa with pytest.raises(ValueError, match="private runtime leaks"): doctor.write_registry(tmp_path / "observed-registry.json", {"note": "Telegram @example"}) + + +def test_core_doctor_collects_only_core_components(monkeypatch) -> None: + calls: list[str] = [] + + def report(name: str): + def collect() -> dict[str, object]: + calls.append(name) + return {"status": "ok", "findings": []} + + return collect + + for name in ( + "audit_fast_read_adapter", + "audit_mcp_surface", + "audit_launchd", + "audit_sessions", + "audit_mirror_fast_status", + ): + monkeypatch.setattr(audits, name, report(name.removeprefix("audit_"))) + monkeypatch.setattr(source_routing, "audit_source_routing", report("source_routing")) + + for name in ( + "audit_managed_systems", + "audit_docs", + "audit_plugin_drift", + "audit_mcp_telemetry", + "audit_golden_read_smoke", + "audit_agent_docs_sync", + "audit_release_gates", + "audit_install_adapters", + "audit_mcp_profiles", + "audit_mirror", + "audit_telecrawl", + ): + monkeypatch.setattr( + audits, + name, + lambda *args, _name=name, **kwargs: (_ for _ in ()).throw(AssertionError(_name)), + ) + monkeypatch.setattr( + runtime_inventory, + "audit_runtime_inventory", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("runtime_inventory")), + ) + + registry = ControlPlaneDoctor(profile="core").build_registry() + + assert registry["profile"] == "core" + assert set(registry["summary"]["components"]) == set(CORE_COMPONENTS) + assert set(calls) == set(CORE_COMPONENTS) + + +def test_maintenance_doctor_collects_maintenance_components(monkeypatch) -> None: + def report(name: str): + def collect(*args, **kwargs) -> dict[str, object]: + return {"status": "ok", "findings": [], "name": name} + + return collect + + for name in ( + "audit_managed_systems", + "audit_docs", + "audit_plugin_drift", + "audit_mcp_telemetry", + "audit_fast_read_adapter", + "audit_golden_read_smoke", + "audit_agent_docs_sync", + "audit_release_gates", + "audit_install_adapters", + "audit_mcp_surface", + "audit_mcp_profiles", + "audit_launchd", + "audit_sessions", + "audit_mirror", + "audit_mirror_fast_status", + "audit_telecrawl", + ): + monkeypatch.setattr(audits, name, report(name.removeprefix("audit_"))) + monkeypatch.setattr(source_routing, "audit_source_routing", report("source_routing")) + monkeypatch.setattr(runtime_inventory, "audit_runtime_inventory", report("runtime_inventory")) + + registry = ControlPlaneDoctor(profile="maintenance").build_registry() + + assert registry["profile"] == "maintenance" + assert set(registry["summary"]["components"]) == set(MAINTENANCE_COMPONENTS) + assert "telecrawl" in registry["summary"]["components"] + assert "release_gates" in registry["summary"]["components"] + assert "runtime_inventory" in registry["summary"]["components"] + + +def test_control_plane_doctor_rejects_unknown_profile() -> None: + with pytest.raises(ValueError, match="Unknown doctor profile"): + ControlPlaneDoctor(profile="everything") + + +def test_cli_doctor_defaults_to_core_profile(monkeypatch, capsys) -> None: + profiles: list[str] = [] + + class FakeDoctor: + def __init__(self, *, profile: str = "core") -> None: + profiles.append(profile) + + def build_registry(self) -> dict[str, object]: + return { + "status": "ok", + "profile": profiles[-1], + "summary": {"components": {}, "blocking_findings": 0, "warning_findings": 0}, + "findings": [], + } + + def write_registry(self, path, registry) -> None: + raise AssertionError("registry should not be written with --no-write-registry") + + monkeypatch.setattr(cli, "ControlPlaneDoctor", FakeDoctor) + + assert cli.main(["doctor", "--json", "--no-write-registry"]) == 0 + payload = capsys.readouterr().out + + assert profiles == ["core"] + assert '"profile": "core"' in payload + + +def test_cli_doctor_accepts_maintenance_profile(monkeypatch, capsys) -> None: + profiles: list[str] = [] + + class FakeDoctor: + def __init__(self, *, profile: str = "core") -> None: + profiles.append(profile) + + def build_registry(self) -> dict[str, object]: + return { + "status": "warn", + "profile": profiles[-1], + "summary": {"components": {"telecrawl": "warn"}, "blocking_findings": 0, "warning_findings": 1}, + "findings": [{"id": "telecrawl_known_gaps", "severity": "warn", "component": "telecrawl"}], + } + + def write_registry(self, path, registry) -> None: + raise AssertionError("registry should not be written with --no-write-registry") + + monkeypatch.setattr(cli, "ControlPlaneDoctor", FakeDoctor) + + assert cli.main(["doctor", "--profile", "maintenance", "--json", "--no-write-registry"]) == 0 + payload = capsys.readouterr().out + + assert profiles == ["maintenance"] + assert '"profile": "maintenance"' in payload + assert "telecrawl_known_gaps" in payload diff --git a/control-plane/tests/test_mirror_fast.py b/control-plane/tests/test_mirror_fast.py new file mode 100644 index 0000000..11ebbab --- /dev/null +++ b/control-plane/tests/test_mirror_fast.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from telegram_control_plane import mirror_fast + + +def _write_fixture(tmp_path: Path) -> tuple[Path, Path]: + config_root = tmp_path / "config" + export_root = tmp_path / "exports" + config_root.mkdir() + (config_root / "telegram_channels.yaml").write_text( + """ +channels: + - channel_id: "1001" + name: PRIME CHAT + username: prime_chat + mirror_scope: prime-chat + export_folder: people/prime/telegram/chats/PRIME CHAT +""".strip(), + encoding="utf-8", + ) + messages_path = export_root / "people/prime/telegram/chats/PRIME CHAT/messages_raw.jsonl" + messages_path.parent.mkdir(parents=True) + messages_path.write_text( + "\n".join( + [ + json.dumps({"id": 1, "date": "2026-06-10T10:00:00+00:00", "text_raw": "old"}), + json.dumps({"id": 2, "date": "2026-06-12T10:00:00+00:00", "text_markdown": "fresh mirror note"}), + ] + ), + encoding="utf-8", + ) + return config_root, export_root + + +def test_read_messages_reads_existing_export_without_recovery_work(tmp_path: Path) -> None: + config_root, export_root = _write_fixture(tmp_path) + + payload = mirror_fast.read_messages( + query="prime-chat", + date_from="2026-06-12", + date_to="2026-06-12", + limit=30, + config_root=config_root, + export_root=export_root, + ) + + assert payload["status"] == "ok" + assert payload["message_count"] == 1 + assert payload["messages"][0]["text"] == "fresh mirror note" + assert payload["messages"][0]["source"]["name"] == "PRIME CHAT" + + +def test_search_messages_can_filter_by_target(tmp_path: Path) -> None: + config_root, export_root = _write_fixture(tmp_path) + + payload = mirror_fast.search_messages( + text="mirror", + target="prime", + limit=10, + config_root=config_root, + export_root=export_root, + ) + + assert payload["status"] == "ok" + assert payload["total_hits"] == 1 + assert payload["messages"][0]["id"] == 2 + + +def test_read_messages_reports_missing_target(tmp_path: Path) -> None: + config_root, export_root = _write_fixture(tmp_path) + + payload = mirror_fast.read_messages( + query="missing", + config_root=config_root, + export_root=export_root, + ) + + assert payload["status"] == "warn" + assert payload["error"] == "mirror_target_not_found" + + +def test_main_uses_provided_argv(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None: + monkeypatch.setattr( + mirror_fast, + "build_status", + lambda: {"status": "ok", "mode": "read_only_fast_mirror", "export_count": 0, "ledger_count": 0}, + ) + + assert mirror_fast.main(["status", "--json"]) == 0 + + assert '"mode": "read_only_fast_mirror"' in capsys.readouterr().out diff --git a/control-plane/tests/test_next_actions.py b/control-plane/tests/test_next_actions.py new file mode 100644 index 0000000..df43acd --- /dev/null +++ b/control-plane/tests/test_next_actions.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +import json + +from telegram_control_plane.next_actions import build_next_actions + + +def _doctor_report(findings: list[dict[str, object]], status: str) -> dict[str, object]: + return { + "status": status, + "findings": findings, + "summary": { + "blocking_findings": sum( + 1 for item in findings if item.get("severity") == "blocking" + ), + "warning_findings": sum( + 1 for item in findings if item.get("severity") == "warning" + ), + }, + } + + +def test_healthy_report_yields_no_actions() -> None: + report = build_next_actions(_doctor_report([], "ok")) + assert report["status"] == "ok" + assert report["next_actions"] == [] + json.dumps(report) + + +def test_warning_maps_component_to_drilldown_command() -> None: + doctor = _doctor_report( + [ + { + "severity": "warning", + "component": "mcp_telemetry", + "id": "tool_errors", + "message": "recent tool errors", + } + ], + "warn", + ) + report = build_next_actions(doctor) + assert report["status"] == "warn" + (action,) = report["next_actions"] + assert action["component"] == "mcp_telemetry" + assert "telegram-telemetry-status" in action["command"] + + +def test_blocking_findings_come_first_and_add_repair_plan() -> None: + doctor = _doctor_report( + [ + { + "severity": "warning", + "component": "telecrawl", + "id": "known_gaps", + "message": "archive gaps", + }, + { + "severity": "blocking", + "component": "mcp_surface", + "id": "unexpected_write_tool", + "message": "raw write tool exposed", + }, + ], + "fail", + ) + report = build_next_actions(doctor) + assert report["status"] == "fail" + severities = [item["severity"] for item in report["next_actions"]] + assert severities == sorted(severities, key=lambda s: 0 if s == "blocking" else 1) + first = report["next_actions"][0] + assert first["component"] == "mcp_surface" + # Blocking findings must route through the dry-run repair plan. + assert any( + "telegram-repair-plan" in item["command"] for item in report["next_actions"] + ) + + +def test_unknown_component_falls_back_to_doctor() -> None: + doctor = _doctor_report( + [ + { + "severity": "warning", + "component": "mystery_component", + "id": "x", + "message": "?", + } + ], + "warn", + ) + report = build_next_actions(doctor) + (action,) = report["next_actions"] + assert "telegram-doctor" in action["command"] diff --git a/control-plane/tests/test_source_routing.py b/control-plane/tests/test_source_routing.py index 35874e4..69e4d6a 100644 --- a/control-plane/tests/test_source_routing.py +++ b/control-plane/tests/test_source_routing.py @@ -6,6 +6,7 @@ recommend_route, score_intent, ) +from telegram_control_plane.source_evidence import source_evidence_rules def test_today_intent_routes_live_mcp() -> None: @@ -75,14 +76,71 @@ def test_source_routing_policy_matchers_are_policy_backed() -> None: def test_route_warnings_include_unready_archive_and_mirror_preflight() -> None: archive = recommend_route("найди в архиве docker", archive_ready=False, archive_has_gaps=True) assert archive["primary_source"] == "telecrawl_archive" - assert {"archive_not_ready", "archive_has_known_gaps"}.issubset(archive["warnings"]) + assert {"archive_not_ready", "archive_has_known_gaps", "do_not_use_for_current_claims"}.issubset( + archive["warnings"] + ) mirror = recommend_route("mirror allowlist export", mirror_preflight_ok=False) assert mirror["primary_source"] == "telegram_mirror" assert "mirror_preflight_required" in mirror["warnings"] -def test_source_routing_audit_flags_live_route_mismatch(monkeypatch) -> None: +def test_source_evidence_rules_own_live_archive_invariant() -> None: + rules = source_evidence_rules() + + assert rules.live_route_target == "live_mcp" + assert "telecrawl_archive" in rules.live_blocked_sources + assert rules.telecrawl_is_archive_evidence is True + assert rules.telecrawl_blocks_current_claims is True + assert rules.negative_archive_claim == "no matches in this archive coverage" + assert rules.audit_findings() == [] + + +def test_ambiguous_route_fallback_uses_source_evidence_rules() -> None: + policy = { + "sources": { + "live_mcp": {"tools_first": ["live"]}, + "telecrawl_archive": {"tools_first": ["archive"]}, + "telegram_mirror": {"tools_first": ["mirror"]}, + }, + "rules": { + "route_current_latest_today_send_reply_media_to": "telegram_mirror", + "never_route_live_intents_to": ["telecrawl_archive"], + }, + "claims": {"negative_archive_results": "fixture", "never_infer_absence_from_archive_only": True}, + } + + route = SourceRoutingPolicy(policy).recommend_route("без явного intent") + + assert route["primary_source"] == source_evidence_rules(source_routing_policy=policy).live_route_target + + +def test_source_routing_audit_uses_single_missing_source_finding() -> None: + policy = SourceRoutingPolicy( + { + "sources": {"live_mcp": {}}, + "rules": { + "route_current_latest_today_send_reply_media_to": "live_mcp", + "never_route_live_intents_to": ["telecrawl_archive"], + }, + "claims": {"negative_archive_results": "same", "never_infer_absence_from_archive_only": True}, + } + ) + + report = policy.audit() + missing = [item for item in report["findings"] if "missing_source" in item["id"]] + + assert missing == [ + { + "id": "source_evidence_missing_source", + "severity": "blocking", + "sources": ["telecrawl_archive", "telegram_mirror"], + "message": "Source evidence policy is missing required source definitions.", + } + ] + + +def test_source_routing_audit_flags_live_route_not_live_mcp() -> None: policy = SourceRoutingPolicy( { "sources": {"live_mcp": {}, "telecrawl_archive": {}, "telegram_mirror": {}}, @@ -90,15 +148,8 @@ def test_source_routing_audit_flags_live_route_mismatch(monkeypatch) -> None: "claims": {"negative_archive_results": "same"}, } ) - monkeypatch.setattr( - "telegram_control_plane.source_routing.telecrawl_gap.load_telecrawl_policy", - lambda: { - "route_current_latest_today_send_reply_media_to": "live_mcp", - "negative_results_claim": "same", - }, - ) report = policy.audit() assert report["status"] == "fail" - assert any(item["id"] == "source_routing_live_route_mismatch" for item in report["findings"]) + assert any(item["id"] == "source_evidence_live_route_not_live_mcp" for item in report["findings"]) diff --git a/control-plane/tests/test_telecrawl_gap.py b/control-plane/tests/test_telecrawl_gap.py index 0d89a39..4b125e4 100644 --- a/control-plane/tests/test_telecrawl_gap.py +++ b/control-plane/tests/test_telecrawl_gap.py @@ -3,7 +3,9 @@ import sqlite3 from pathlib import Path +from telegram_control_plane.source_evidence import source_evidence_rules from telegram_control_plane.telecrawl_gap import ( + gap_policy_summary, import_gaps, known_gaps_findings, load_telecrawl_policy, @@ -15,6 +17,9 @@ def test_telecrawl_policy_declares_expected_gap_warning() -> None: policy = load_telecrawl_policy() assert policy.get("known_gaps_are_blocking_for_archive_search") is False assert "telecrawl_known_gaps" in policy.get("expected_doctor_warning_ids", []) + assert policy.get("source_evidence_owner") == "policy/source-routing.json" + assert "route_current_latest_today_send_reply_media_to" not in policy + assert "negative_results_claim" not in policy def test_import_gaps_split_retryable_and_terminal(tmp_path: Path) -> None: @@ -37,7 +42,10 @@ def test_import_gaps_split_retryable_and_terminal(tmp_path: Path) -> None: def test_known_gaps_finding_is_operational_warn_by_default() -> None: policy = load_telecrawl_policy() - findings = known_gaps_findings(policy, {"has_known_gaps": True, "retryable_error_summary": [], "terminal_error_summary": []}) + findings = known_gaps_findings( + policy, + {"has_known_gaps": True, "retryable_error_summary": [], "terminal_error_summary": []}, + ) assert findings[0]["id"] == "telecrawl_known_gaps" assert findings[0]["severity"] == "warn" assert findings[0]["expected_operational_warning"] is True @@ -46,4 +54,14 @@ def test_known_gaps_finding_is_operational_warn_by_default() -> None: def test_non_retryable_error_types_defaults() -> None: policy = load_telecrawl_policy() types = non_retryable_error_types(policy) - assert "ChannelPrivateError" in types \ No newline at end of file + assert "ChannelPrivateError" in types + + +def test_gap_policy_summary_gets_shared_claims_from_source_evidence_rules() -> None: + summary = gap_policy_summary() + rules = source_evidence_rules() + + assert summary["route_current_latest_today_send_reply_media_to"] == "live_mcp" + assert summary["route_current_latest_today_send_reply_media_to"] == rules.live_route_target + assert summary["negative_results_claim"] == rules.negative_archive_claim + assert summary["never_infer_absence_from_archive_only"] is True diff --git a/scripts/ci-release-gate.sh b/scripts/ci-release-gate.sh index ebe55e6..e14bb5c 100755 --- a/scripts/ci-release-gate.sh +++ b/scripts/ci-release-gate.sh @@ -29,7 +29,7 @@ if [[ ! -x "${PYTHON_BIN}" ]]; then python3 -m venv "${ROOT}/mcp/.venv" fi "${PYTHON_BIN}" -m pip install -q --upgrade pip -"${PYTHON_BIN}" -m pip install -q -e "${ROOT}/mcp" pytest +"${PYTHON_BIN}" -m pip install -q -e "${ROOT}/mcp" pytest pyyaml find "${ROOT}/mcp/src" "${ROOT}/control-plane/src" -type d -name __pycache__ -prune -exec rm -rf {} + 2>/dev/null || true