diff --git a/CONTRACTS.md b/CONTRACTS.md index 8361e35..cadd4d1 100644 --- a/CONTRACTS.md +++ b/CONTRACTS.md @@ -96,6 +96,7 @@ Projection fields: - `workflow_ref` - `evidence_ref` - `workflow_evidence_snapshot` +- `subject_ref` - `assessment_ref` - `policy_decision_ref` - `use_approval_ref` diff --git a/README.md b/README.md index 2d2a5cd..c431cda 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,7 @@ You can also set an `organization_name` in `config.yaml` or `config.local.yaml` | Command | Description | |---------|-------------| | `forge log` | Log a new incident with interactive prompts | -| `forge list` | List incidents with `--project`, `--severity`, `--since`, `--tag`, and `--limit` filters | +| `forge list` | List incidents with `--project`, `--severity`, `--since`, `--tag`, structured-axis, and `--limit` filters | | `forge show ` | Show full details of one incident; suffix matches like `forge show 001` work | | `forge ref ` | Print a Proofhouse `IncidentRef` compatibility projection as JSON | | `forge edit ` | Open an incident in your editor | @@ -176,7 +176,7 @@ Each incident is a YAML file in `incidents/YYYY-MM/` with structured fields cove - resolution: `root_cause`, `immediate_fix`, `systemic_takeaway` - metadata: `tags`, `related_incidents`, `playbook_entry` - optional Proofhouse axes: `capability_area`, `lifecycle_stage`, `issue_class`, `workflow_archetype`, `subject_type`, `blocked_use_class`, `observed_state` -- optional pointer refs: `workflow_ref`, `evidence_ref`, `workflow_evidence_snapshot`, `assessment_ref`, `policy_decision_ref`, `use_approval_ref`, `asset_ref`, `derivation_ref`, `transform_ref` +- optional pointer refs: `workflow_ref`, `evidence_ref`, `workflow_evidence_snapshot`, `subject_ref`, `assessment_ref`, `policy_decision_ref`, `use_approval_ref`, `asset_ref`, `derivation_ref`, `transform_ref` Existing incident YAML remains compatible: all Proofhouse axes and pointer refs are optional. Older files that only contain the original classification/event/resolution/metadata fields still load, list, analyze, and emit a compatibility `IncidentRef`. @@ -185,12 +185,14 @@ Existing incident YAML remains compatible: all Proofhouse axes and pointer refs When an incident relates to Proofhouse workflow evidence or Operational Learning, keep Forge records pointer-based: - use structured axes for document-operations and Operational Learning failure classes -- use pointer ref fields for `WorkflowRef`, `EvidenceRef` / `WorkflowEvidenceSnapshot`, `AssessmentRef`, `PolicyDecisionRef`, `UseApprovalRef`, and Operational Learning `AssetRef`, `DerivationRef`, or `TransformRef` placeholders +- use pointer ref fields for `WorkflowRef`, `EvidenceRef` / `WorkflowEvidenceSnapshot`, `SubjectRef`, `AssessmentRef`, `PolicyDecisionRef`, `UseApprovalRef`, and Operational Learning `AssetRef`, `DerivationRef`, or `TransformRef` placeholders - use `context` only for short human-readable summaries - use `tags` as secondary discovery aids, not as the only structure - use `related_incidents` only for Forge incident IDs - do not paste raw customer data, regulated personal data, credentials, or training/eval source material into an incident +Pointer refs and `observed_state` are summary/ref-only fields. Forge rejects obvious raw or sensitive payload keys such as `payload`, `raw_payload`, `document_text`, `claim_text`, `phi`, `ssn`, and `dob`; this is boundary hygiene, not a substitute for upstream redaction. + Governance remains the approval and export-control plane. Forge may record that a handoff or approval issue occurred, but the authoritative rights, redaction, use-approval, manifest, and export state lives outside Forge. Forge emits a Proofhouse V0.1 `IncidentRef` projection through `forge ref ` and the `forge_incident_ref` MCP tool. This projection is generated from saved YAML fields when structured fields exist, and falls back to compatibility inference for older incidents. Until Forge stores structured tenant metadata, the projection uses `organization_id: "unscoped"` and `environment_id: "default"` to avoid implying that `project` is a tenant boundary. diff --git a/forge_cli/cli.py b/forge_cli/cli.py index 07e886f..94ab64a 100644 --- a/forge_cli/cli.py +++ b/forge_cli/cli.py @@ -24,6 +24,8 @@ print_success, ) from forge_cli.incident_store import ( + AmbiguousIncidentLookupError, + DuplicateIncidentError, find_incident, find_incident_path, generate_id, @@ -38,6 +40,7 @@ LIFECYCLE_STAGE_VALUES, PROOFHOUSE_REF_FIELDS, USE_CLASS_VALUES, + WORKFLOW_ARCHETYPE_VALUES, FailureType, Incident, Severity, @@ -186,6 +189,11 @@ def log( "--workflow-evidence-snapshot", help="WorkflowEvidenceSnapshot pointer as JSON object or snapshot id", ), + subject_ref: Optional[str] = typer.Option( + None, + "--subject-ref", + help="SubjectRef pointer as JSON object or ref id", + ), assessment_ref: Optional[str] = typer.Option( None, "--assessment-ref", @@ -267,6 +275,9 @@ def log( "lifecycle_stage", lifecycle_stage, LIFECYCLE_STAGE_VALUES ) issue_class = _validate_optional_choice("issue_class", issue_class, ISSUE_CLASS_VALUES) + workflow_archetype = _validate_optional_choice( + "workflow_archetype", workflow_archetype, WORKFLOW_ARCHETYPE_VALUES + ) blocked_use_class = _validate_optional_choice( "blocked_use_class", blocked_use_class, USE_CLASS_VALUES ) @@ -291,6 +302,7 @@ def log( "workflow_ref": workflow_ref, "evidence_ref": evidence_ref, "workflow_evidence_snapshot": workflow_evidence_snapshot, + "subject_ref": subject_ref, "assessment_ref": assessment_ref, "policy_decision_ref": policy_decision_ref, "use_approval_ref": use_approval_ref, @@ -341,7 +353,11 @@ def log( print_info("Incident discarded.") raise typer.Exit(0) - filepath = save_incident(incident, cfg.incidents_dir) + try: + filepath = save_incident(incident, cfg.incidents_dir) + except DuplicateIncidentError as e: + print_error(str(e)) + raise typer.Exit(1) print_success(f"Saved: {filepath}") @@ -351,6 +367,19 @@ def list_cmd( severity: Optional[str] = typer.Option(None, "--severity", "-s", help="Filter by severity"), since: Optional[str] = typer.Option(None, "--since", help="Filter by date (YYYY-MM-DD)"), tag: Optional[str] = typer.Option(None, "--tag", "-T", help="Filter by tag"), + issue_class: Optional[str] = typer.Option(None, "--issue-class", help="Filter by issue class"), + capability_area: Optional[str] = typer.Option( + None, "--capability-area", help="Filter by capability area" + ), + lifecycle_stage: Optional[str] = typer.Option( + None, "--lifecycle-stage", help="Filter by lifecycle stage" + ), + workflow_archetype: Optional[str] = typer.Option( + None, "--workflow-archetype", help="Filter by workflow archetype" + ), + blocked_use_class: Optional[str] = typer.Option( + None, "--blocked-use-class", help="Filter by blocked use class" + ), limit: int = typer.Option(10, "--limit", "-n", help="Max incidents to show"), ) -> None: """List recent incidents with optional filters.""" @@ -366,6 +395,11 @@ def list_cmd( severity=severity, since=since, tag=tag, + issue_class=issue_class, + capability_area=capability_area, + lifecycle_stage=lifecycle_stage, + workflow_archetype=workflow_archetype, + blocked_use_class=blocked_use_class, limit=limit, ) @@ -383,7 +417,11 @@ def show( print_error(str(e)) raise typer.Exit(1) - incident = find_incident(cfg.incidents_dir, incident_id) + try: + incident = find_incident(cfg.incidents_dir, incident_id) + except AmbiguousIncidentLookupError as e: + print_error(str(e)) + raise typer.Exit(1) if incident is None: print_error(f"No incident found matching '{incident_id}'.") raise typer.Exit(1) @@ -403,7 +441,11 @@ def ref_cmd( print_error(str(e)) raise typer.Exit(1) - incident = find_incident(cfg.incidents_dir, incident_id) + try: + incident = find_incident(cfg.incidents_dir, incident_id) + except AmbiguousIncidentLookupError as e: + print_error(str(e)) + raise typer.Exit(1) if incident is None: print_error(f"No incident found matching '{incident_id}'.") raise typer.Exit(1) @@ -423,7 +465,11 @@ def edit( print_error(str(e)) raise typer.Exit(1) - path = find_incident_path(cfg.incidents_dir, incident_id) + try: + path = find_incident_path(cfg.incidents_dir, incident_id) + except AmbiguousIncidentLookupError as e: + print_error(str(e)) + raise typer.Exit(1) if path is None: print_error(f"No incident found matching '{incident_id}'.") raise typer.Exit(1) @@ -549,6 +595,19 @@ def stats( project: Optional[str] = typer.Option(None, "--project", "-p", help="Filter by project"), severity: Optional[str] = typer.Option(None, "--severity", "-s", help="Filter by severity"), since: Optional[str] = typer.Option(None, "--since", help="Filter by date (YYYY-MM-DD)"), + issue_class: Optional[str] = typer.Option(None, "--issue-class", help="Filter by issue class"), + capability_area: Optional[str] = typer.Option( + None, "--capability-area", help="Filter by capability area" + ), + lifecycle_stage: Optional[str] = typer.Option( + None, "--lifecycle-stage", help="Filter by lifecycle stage" + ), + workflow_archetype: Optional[str] = typer.Option( + None, "--workflow-archetype", help="Filter by workflow archetype" + ), + blocked_use_class: Optional[str] = typer.Option( + None, "--blocked-use-class", help="Filter by blocked use class" + ), ) -> None: """Show summary statistics across all incidents.""" try: @@ -565,6 +624,16 @@ def stats( incidents = [i for i in incidents if i.severity == severity] if since: incidents = [i for i in incidents if i.timestamp >= since] + if issue_class: + incidents = [i for i in incidents if i.issue_class == issue_class] + if capability_area: + incidents = [i for i in incidents if i.capability_area == capability_area] + if lifecycle_stage: + incidents = [i for i in incidents if i.lifecycle_stage == lifecycle_stage] + if workflow_archetype: + incidents = [i for i in incidents if i.workflow_archetype == workflow_archetype] + if blocked_use_class: + incidents = [i for i in incidents if i.blocked_use_class == blocked_use_class] display_stats(incidents) diff --git a/forge_cli/display.py b/forge_cli/display.py index ba80469..53b4fa9 100644 --- a/forge_cli/display.py +++ b/forge_cli/display.py @@ -8,7 +8,7 @@ from rich.table import Table from rich.text import Text -from forge_cli.models import Incident +from forge_cli.models import PROOFHOUSE_REF_FIELDS, Incident console = Console() @@ -140,21 +140,7 @@ def display_incident_detail(incident: Incident) -> None: lines.append("[cyan bold]Observed state:[/]") for key, value in incident.observed_state.items(): lines.append(f" {key}: {value}") - present_refs = [ - field_name - for field_name in [ - "workflow_ref", - "evidence_ref", - "workflow_evidence_snapshot", - "assessment_ref", - "policy_decision_ref", - "use_approval_ref", - "asset_ref", - "derivation_ref", - "transform_ref", - ] - if getattr(incident, field_name) - ] + present_refs = [field_name for field_name in PROOFHOUSE_REF_FIELDS if getattr(incident, field_name)] if present_refs: lines.append("") lines.append(f"[cyan bold]Pointer refs:[/] {', '.join(present_refs)}") @@ -201,6 +187,8 @@ def display_stats(incidents: list[Incident]) -> None: by_project = Counter(i.project for i in incidents) by_platform = Counter(i.platform for i in incidents if i.platform) all_tags = Counter(tag for i in incidents for tag in i.tags) + by_issue_class = Counter(i.issue_class for i in incidents if i.issue_class) + by_capability_area = Counter(i.capability_area for i in incidents if i.capability_area) # Date range timestamps = sorted(i.timestamp for i in incidents if i.timestamp) @@ -235,6 +223,12 @@ def display_stats(incidents: list[Incident]) -> None: console.print(Columns([type_table, plat_table], equal=True)) + if by_issue_class or by_capability_area: + console.print() + issue_table = _counter_table("By Issue Class", by_issue_class, color="yellow") + capability_table = _counter_table("By Capability Area", by_capability_area, color="cyan") + console.print(Columns([issue_table, capability_table], equal=True)) + # Top tags if all_tags: console.print() diff --git a/forge_cli/incident_store.py b/forge_cli/incident_store.py index f61614f..355f738 100644 --- a/forge_cli/incident_store.py +++ b/forge_cli/incident_store.py @@ -1,7 +1,9 @@ from __future__ import annotations from datetime import date, datetime +import os from pathlib import Path +import tempfile import yaml @@ -33,6 +35,14 @@ def _ordered_dict_representer(dumper: yaml.Dumper, data: dict) -> yaml.MappingNo # --- ID generation --- +class DuplicateIncidentError(FileExistsError): + """Raised when saving would overwrite an existing incident file.""" + + +class AmbiguousIncidentLookupError(LookupError): + """Raised when a suffix lookup matches multiple incidents.""" + + def generate_id(incidents_dir: Path, incident_date: date | None = None) -> str: """Generate the next incident ID for the given date (YYYY-MM-DD-NNN).""" if incident_date is None: @@ -63,8 +73,29 @@ def save_incident(incident: Incident, incidents_dir: Path) -> Path: data = incident.to_dict() - with open(filepath, "w") as f: - yaml.dump(data, f, Dumper=_BlockDumper, default_flow_style=False, allow_unicode=True) + if filepath.exists(): + raise DuplicateIncidentError(f"Incident id already exists: {incident.id}") + + tmp_path: Path | None = None + try: + with tempfile.NamedTemporaryFile( + "w", + dir=month_dir, + prefix=f".{incident.id}.", + suffix=".tmp", + delete=False, + ) as f: + tmp_path = Path(f.name) + yaml.dump(data, f, Dumper=_BlockDumper, default_flow_style=False, allow_unicode=True) + f.flush() + os.fsync(f.fileno()) + try: + os.link(tmp_path, filepath) + except FileExistsError as exc: + raise DuplicateIncidentError(f"Incident id already exists: {incident.id}") from exc + finally: + if tmp_path is not None and tmp_path.exists(): + tmp_path.unlink() return filepath @@ -82,6 +113,11 @@ def list_incidents( severity: str | None = None, since: str | None = None, tag: str | None = None, + issue_class: str | None = None, + capability_area: str | None = None, + lifecycle_stage: str | None = None, + workflow_archetype: str | None = None, + blocked_use_class: str | None = None, limit: int = 10, ) -> list[Incident]: """List incidents with optional filtering, most recent first.""" @@ -103,6 +139,16 @@ def list_incidents( continue if tag and tag not in incident.tags: continue + if issue_class and incident.issue_class != issue_class: + continue + if capability_area and incident.capability_area != capability_area: + continue + if lifecycle_stage and incident.lifecycle_stage != lifecycle_stage: + continue + if workflow_archetype and incident.workflow_archetype != workflow_archetype: + continue + if blocked_use_class and incident.blocked_use_class != blocked_use_class: + continue incidents.append(incident) if len(incidents) >= limit: @@ -120,9 +166,12 @@ def find_incident_path(incidents_dir: Path, incident_id: str) -> Path | None: if exact_path.exists(): return exact_path - for filepath in incidents_dir.rglob("*.yml"): - if incident_id in filepath.stem: - return filepath + matches = sorted(filepath for filepath in incidents_dir.rglob("*.yml") if filepath.stem.endswith(incident_id)) + if len(matches) == 1: + return matches[0] + if len(matches) > 1: + ids = ", ".join(match.stem for match in matches) + raise AmbiguousIncidentLookupError(f"Ambiguous incident id '{incident_id}'. Matches: {ids}") return None diff --git a/forge_cli/mcp_server.py b/forge_cli/mcp_server.py index 86b2d25..246dda3 100644 --- a/forge_cli/mcp_server.py +++ b/forge_cli/mcp_server.py @@ -10,6 +10,8 @@ from forge_cli.config import load_config from forge_cli.incident_store import ( + AmbiguousIncidentLookupError, + DuplicateIncidentError, find_incident, generate_id, get_all_incidents, @@ -23,12 +25,14 @@ PROOFHOUSE_AXIS_FIELDS, PROOFHOUSE_REF_FIELDS, USE_CLASS_VALUES, + WORKFLOW_ARCHETYPE_VALUES, FailureType, Incident, Severity, parse_observed_state, parse_pointer_value, ) +from forge_cli.schema_metadata import STRUCTURED_AXIS_METADATA mcp = FastMCP("Forge", json_response=True) @@ -107,6 +111,7 @@ def forge_log( workflow_ref: str = "", evidence_ref: str = "", workflow_evidence_snapshot: str = "", + subject_ref: str = "", assessment_ref: str = "", policy_decision_ref: str = "", use_approval_ref: str = "", @@ -142,6 +147,7 @@ def forge_log( workflow_ref: Optional WorkflowRef pointer as JSON object or ref id. evidence_ref: Optional EvidenceRef pointer as JSON object or ref id. workflow_evidence_snapshot: Optional WorkflowEvidenceSnapshot pointer as JSON object or id. + subject_ref: Optional SubjectRef pointer as JSON object or ref id. assessment_ref: Optional AssessmentRef pointer as JSON object or ref id. policy_decision_ref: Optional PolicyDecisionRef pointer as JSON object or ref id. use_approval_ref: Optional UseApprovalRef pointer as JSON object or ref id. @@ -165,6 +171,7 @@ def forge_log( ("capability_area", capability_area, CAPABILITY_AREA_VALUES), ("lifecycle_stage", lifecycle_stage, LIFECYCLE_STAGE_VALUES), ("issue_class", issue_class, ISSUE_CLASS_VALUES), + ("workflow_archetype", workflow_archetype, WORKFLOW_ARCHETYPE_VALUES), ("blocked_use_class", blocked_use_class, USE_CLASS_VALUES), ]: if value and value not in choices: @@ -177,6 +184,7 @@ def forge_log( "workflow_evidence_snapshot": parse_pointer_value( workflow_evidence_snapshot, "workflow_evidence_snapshot" ), + "subject_ref": parse_pointer_value(subject_ref, "subject_ref"), "assessment_ref": parse_pointer_value(assessment_ref, "assessment_ref"), "policy_decision_ref": parse_pointer_value(policy_decision_ref, "policy_decision_ref"), "use_approval_ref": parse_pointer_value(use_approval_ref, "use_approval_ref"), @@ -222,7 +230,10 @@ def forge_log( **pointer_refs, ) - filepath = save_incident(incident, cfg.incidents_dir) + try: + filepath = save_incident(incident, cfg.incidents_dir) + except DuplicateIncidentError as e: + return str(e) return f"Incident logged: {incident_id}\nSaved to: {filepath}" @@ -232,6 +243,11 @@ def forge_list( severity: str = "", since: str = "", tag: str = "", + issue_class: str = "", + capability_area: str = "", + lifecycle_stage: str = "", + workflow_archetype: str = "", + blocked_use_class: str = "", limit: int = 10, ) -> str: """List forge incidents with optional filters. @@ -241,6 +257,11 @@ def forge_list( severity: Filter by severity (cosmetic, functional, safety-critical) since: Filter by date, showing incidents from this date onward (YYYY-MM-DD) tag: Filter by tag (e.g., "silent-fallback", "hallucination") + issue_class: Filter by structured issue class + capability_area: Filter by structured capability area + lifecycle_stage: Filter by lifecycle stage + workflow_archetype: Filter by workflow archetype + blocked_use_class: Filter by blocked use class limit: Maximum number of incidents to return (default 10) """ cfg = load_config() @@ -250,6 +271,11 @@ def forge_list( severity=severity or None, since=since or None, tag=tag or None, + issue_class=issue_class or None, + capability_area=capability_area or None, + lifecycle_stage=lifecycle_stage or None, + workflow_archetype=workflow_archetype or None, + blocked_use_class=blocked_use_class or None, limit=limit, ) @@ -260,7 +286,8 @@ def forge_list( for inc in incidents: summary = (inc.actual_behavior or "").strip().split("\n")[0][:80] results.append( - f"[{inc.id}] {inc.project}/{inc.platform} | {inc.severity} | {inc.failure_type} | {summary}" + f"[{inc.id}] {inc.project}/{inc.platform} | {inc.severity} | {inc.failure_type}" + f" | {inc.issue_class or '-'} | {summary}" ) return f"Found {len(incidents)} incident(s):\n\n" + "\n".join(results) @@ -274,7 +301,10 @@ def forge_show(incident_id: str) -> str: incident_id: Incident ID (e.g., "2026-03-04-001") or suffix (e.g., "001") """ cfg = load_config() - incident = find_incident(cfg.incidents_dir, incident_id) + try: + incident = find_incident(cfg.incidents_dir, incident_id) + except AmbiguousIncidentLookupError as e: + return str(e) if incident is None: return f"No incident found matching '{incident_id}'." @@ -290,7 +320,10 @@ def forge_incident_ref(incident_id: str) -> str: incident_id: Incident ID (e.g., "2026-03-04-001") or suffix (e.g., "001") """ cfg = load_config() - incident = find_incident(cfg.incidents_dir, incident_id) + try: + incident = find_incident(cfg.incidents_dir, incident_id) + except AmbiguousIncidentLookupError as e: + return str(e) if incident is None: return f"No incident found matching '{incident_id}'." @@ -302,12 +335,16 @@ def forge_incident_ref(incident_id: str) -> str: def forge_stats( project: str = "", severity: str = "", + issue_class: str = "", + capability_area: str = "", ) -> str: """Show aggregate statistics across forge incidents. Args: project: Filter by project name severity: Filter by severity level + issue_class: Filter by structured issue class + capability_area: Filter by capability area """ from collections import Counter @@ -318,6 +355,10 @@ def forge_stats( incidents = [i for i in incidents if i.project == project] if severity: incidents = [i for i in incidents if i.severity == severity] + if issue_class: + incidents = [i for i in incidents if i.issue_class == issue_class] + if capability_area: + incidents = [i for i in incidents if i.capability_area == capability_area] if not incidents: return "No incidents found." @@ -326,6 +367,8 @@ def forge_stats( by_type = Counter(i.failure_type for i in incidents) by_project = Counter(i.project for i in incidents) by_platform = Counter(i.platform for i in incidents if i.platform) + by_issue_class = Counter(i.issue_class for i in incidents if i.issue_class) + by_capability_area = Counter(i.capability_area for i in incidents if i.capability_area) all_tags = Counter(tag for i in incidents for tag in i.tags) lines = [f"Total incidents: {len(incidents)}", ""] @@ -346,6 +389,16 @@ def forge_stats( for plat, count in by_platform.most_common(): lines.append(f" {plat}: {count}") + if by_issue_class: + lines.append("\nBy Issue Class:") + for value, count in by_issue_class.most_common(): + lines.append(f" {value}: {count}") + + if by_capability_area: + lines.append("\nBy Capability Area:") + for value, count in by_capability_area.most_common(): + lines.append(f" {value}: {count}") + if all_tags: lines.append("\nTop Tags:") for tag, count in all_tags.most_common(10): @@ -421,22 +474,29 @@ def forge_schema() -> str: "capability_area", "lifecycle_stage", "issue_class", "workflow_archetype", "subject_type", "blocked_use_class", "observed_state", "workflow_ref", "evidence_ref", "workflow_evidence_snapshot", - "assessment_ref", "policy_decision_ref", "use_approval_ref", + "subject_ref", "assessment_ref", "policy_decision_ref", "use_approval_ref", "asset_ref", "derivation_ref", "transform_ref", ], "structured_axis_fields": PROOFHOUSE_AXIS_FIELDS, "structured_axis_values": { - "capability_area": CAPABILITY_AREA_VALUES, - "lifecycle_stage": LIFECYCLE_STAGE_VALUES, - "issue_class": ISSUE_CLASS_VALUES, - "blocked_use_class": USE_CLASS_VALUES, + field_name: metadata.values + for field_name, metadata in STRUCTURED_AXIS_METADATA.items() + if metadata.values + }, + "structured_axis_metadata": { + field_name: { + "description": metadata.description, + "values": metadata.values, + "required": metadata.required, + } + for field_name, metadata in STRUCTURED_AXIS_METADATA.items() }, "emitted_refs": ["IncidentRef"], "incident_ref_fields": [ "incident_id", "failure_type", "severity", "capability_area", "lifecycle_stage", "issue_class", "workflow_archetype", "subject_type", "blocked_use_class", "workflow_ref", "evidence_ref", - "workflow_evidence_snapshot", "assessment_ref", "policy_decision_ref", + "workflow_evidence_snapshot", "subject_ref", "assessment_ref", "policy_decision_ref", "use_approval_ref", "asset_ref", "derivation_ref", "transform_ref", "playbook_entry", ], diff --git a/forge_cli/models.py b/forge_cli/models.py index 3a56633..b1eef65 100644 --- a/forge_cli/models.py +++ b/forge_cli/models.py @@ -1,20 +1,30 @@ from __future__ import annotations import json +import re from dataclasses import asdict, dataclass, field from enum import Enum from typing import Any +from forge_cli import schema_metadata + PROOFHOUSE_SHARED_CONTRACT_VERSION = "proofhouse-shared-contracts/v0.1" FORGE_PRODUCER_SYSTEM = "proofhouse-forge" FORGE_UNSCOPED_ORGANIZATION_ID = "unscoped" FORGE_DEFAULT_ENVIRONMENT_ID = "default" +CAPABILITY_AREA_VALUES = schema_metadata.CAPABILITY_AREA_VALUES +ISSUE_CLASS_VALUES = schema_metadata.ISSUE_CLASS_VALUES +LIFECYCLE_STAGE_VALUES = schema_metadata.LIFECYCLE_STAGE_VALUES +WORKFLOW_ARCHETYPE_VALUES = schema_metadata.WORKFLOW_ARCHETYPE_VALUES +USE_CLASS_VALUES = schema_metadata.USE_CLASS_VALUES + PROOFHOUSE_REF_FIELDS = [ "workflow_ref", "evidence_ref", "workflow_evidence_snapshot", + "subject_ref", "assessment_ref", "policy_decision_ref", "use_approval_ref", @@ -40,72 +50,11 @@ *PROOFHOUSE_REF_FIELDS, ] -CAPABILITY_AREA_VALUES = [ - "workflow_context", - "readiness", - "governance", - "forge", - "operational_learning", - "analyst", - "external_integration", -] - -LIFECYCLE_STAGE_VALUES = [ - "capture", - "document_review", - "evidence_review", - "assessment", - "policy_decision", - "redaction_review", - "use_approval", - "promotion", - "asset_derivation", - "transform", - "internal_eval", - "internal_training", - "export", - "escalation", - "runtime", - "handoff", -] - -ISSUE_CLASS_VALUES = [ - "redaction_miss", - "rights_ambiguity", - "promotion_failure", - "export_control_failure", - "transform_failure", - "derivation_quality_failure", - "evidence_gap", - "escalation_miss", - "reviewer_disagreement", - "phi_redaction_failure", - "missing_claim_evidence", - "rate_source_ambiguity", - "contract_rate_mismatch", - "allowed_amount_conflict", - "approval_bypass", - "downstream_export_mismatch", - "savings_recognition_dispute", - "use_approval", - "provenance_gap", - "readiness_gap", - "workflow_truth", - "other", -] - -USE_CLASS_VALUES = [ - "evidence_only", - "internal_eval", - "internal_training", - "policy_learning", - "external_export", -] - REF_TYPE_BY_FIELD = { "workflow_ref": "workflow", "evidence_ref": "evidence", "workflow_evidence_snapshot": "workflow_evidence_snapshot", + "subject_ref": "subject", "assessment_ref": "assessment", "policy_decision_ref": "policy_decision", "use_approval_ref": "use_approval", @@ -114,6 +63,58 @@ "transform_ref": "transform", } +FORBIDDEN_SUMMARY_KEY_PARTS = { + "payload", + "raw_payload", + "source_payload", + "document_text", + "claim_text", + "claim_payload", + "raw_claim", + "raw_claim_text", + "source_document_text", + "phi", + "ssn", + "dob", + "date_of_birth", + "member_name", + "patient_name", + "customer_data", + "rate_table_row", + "payment_payload", +} + +SENSITIVE_SUMMARY_KEY_PREFIXES = ( + "phi_", + "ssn_", + "dob_", + "date_of_birth_", + "member_name_", + "patient_name_", +) + +SAFE_SUMMARY_KEY_SUFFIXES = ( + "_class", + "_decision", + "_digest", + "_id", + "_label", + "_mode", + "_owner", + "_policy", + "_present", + "_ref", + "_ref_id", + "_required", + "_scope", + "_state", + "_status", + "_summary", + "_type", +) + +CAMEL_CASE_BOUNDARY = re.compile(r"(?<=[a-z0-9])(?=[A-Z])") + CAPABILITY_AREA_ALIASES = [ ( "governance", @@ -284,11 +285,41 @@ def _has_value(value: Any) -> bool: return True +def _normalize_key(value: Any) -> str: + normalized = CAMEL_CASE_BOUNDARY.sub("_", str(value).strip()) + return normalized.lower().replace("-", "_").replace(" ", "_") + + +def _is_forbidden_summary_key(normalized: str) -> bool: + if normalized in FORBIDDEN_SUMMARY_KEY_PARTS: + return True + if normalized.startswith(SENSITIVE_SUMMARY_KEY_PREFIXES): + return not normalized.endswith(SAFE_SUMMARY_KEY_SUFFIXES) + return False + + +def _validate_summary_only_mapping(value: dict[str, Any], field_name: str) -> None: + for key, nested_value in value.items(): + normalized = _normalize_key(key) + if _is_forbidden_summary_key(normalized): + raise ValueError( + f"{field_name} contains forbidden raw/sensitive payload key '{key}'. " + "Store pointer refs, ids, digests, labels, or short summaries only." + ) + if isinstance(nested_value, dict): + _validate_summary_only_mapping(nested_value, field_name) + elif isinstance(nested_value, list): + for item in nested_value: + if isinstance(item, dict): + _validate_summary_only_mapping(item, field_name) + + def parse_pointer_value(value: Any, field_name: str) -> dict[str, Any] | None: """Parse a pointer ref from YAML data or CLI/MCP text input.""" if value is None: return None if isinstance(value, dict): + _validate_summary_only_mapping(value, field_name) return value or None if isinstance(value, str): stripped = value.strip() @@ -298,6 +329,7 @@ def parse_pointer_value(value: Any, field_name: str) -> dict[str, Any] | None: parsed = json.loads(stripped) if not isinstance(parsed, dict): raise ValueError(f"{field_name} must be a JSON object when JSON is supplied") + _validate_summary_only_mapping(parsed, field_name) return parsed return { "ref_id": stripped, @@ -312,6 +344,7 @@ def parse_observed_state(value: Any) -> dict[str, Any] | None: if value is None: return None if isinstance(value, dict): + _validate_summary_only_mapping(value, "observed_state") return value or None if isinstance(value, str): stripped = value.strip() @@ -321,6 +354,7 @@ def parse_observed_state(value: Any) -> dict[str, Any] | None: parsed = json.loads(stripped) if not isinstance(parsed, dict): raise ValueError("observed_state must be a JSON object when JSON is supplied") + _validate_summary_only_mapping(parsed, "observed_state") return parsed return {"summary": stripped} raise TypeError("observed_state must be a mapping, JSON object string, string summary, or empty") @@ -381,6 +415,7 @@ class IncidentRef: workflow_ref: dict[str, Any] | None = None evidence_ref: dict[str, Any] | None = None workflow_evidence_snapshot: dict[str, Any] | None = None + subject_ref: dict[str, Any] | None = None assessment_ref: dict[str, Any] | None = None policy_decision_ref: dict[str, Any] | None = None use_approval_ref: dict[str, Any] | None = None @@ -421,6 +456,7 @@ class Incident: workflow_ref: dict[str, Any] | None = None evidence_ref: dict[str, Any] | None = None workflow_evidence_snapshot: dict[str, Any] | None = None + subject_ref: dict[str, Any] | None = None assessment_ref: dict[str, Any] | None = None policy_decision_ref: dict[str, Any] | None = None use_approval_ref: dict[str, Any] | None = None @@ -480,6 +516,7 @@ def from_dict(cls, data: dict) -> Incident: workflow_evidence_snapshot=parse_pointer_value( data.get("workflow_evidence_snapshot"), "workflow_evidence_snapshot" ), + subject_ref=parse_pointer_value(data.get("subject_ref"), "subject_ref"), assessment_ref=parse_pointer_value(data.get("assessment_ref"), "assessment_ref"), policy_decision_ref=parse_pointer_value( data.get("policy_decision_ref"), "policy_decision_ref" @@ -537,6 +574,7 @@ def build_incident_ref(incident: Incident) -> IncidentRef: workflow_ref=incident.workflow_ref, evidence_ref=incident.evidence_ref, workflow_evidence_snapshot=incident.workflow_evidence_snapshot, + subject_ref=incident.subject_ref, assessment_ref=incident.assessment_ref, policy_decision_ref=incident.policy_decision_ref, use_approval_ref=incident.use_approval_ref, diff --git a/forge_cli/schema_metadata.py b/forge_cli/schema_metadata.py new file mode 100644 index 0000000..00474d5 --- /dev/null +++ b/forge_cli/schema_metadata.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class AxisMetadata: + field_name: str + description: str + values: list[str] + required: bool = False + + +CAPABILITY_AREA_VALUES = [ + "workflow_context", + "readiness", + "governance", + "forge", + "operational_learning", + "analyst", + "external_integration", +] + +LIFECYCLE_STAGE_VALUES = [ + "capture", + "document_review", + "evidence_review", + "assessment", + "policy_decision", + "redaction_review", + "use_approval", + "promotion", + "asset_derivation", + "transform", + "internal_eval", + "internal_training", + "export", + "escalation", + "runtime", + "handoff", +] + +ISSUE_CLASS_VALUES = [ + "redaction_miss", + "rights_ambiguity", + "promotion_failure", + "export_control_failure", + "transform_failure", + "derivation_quality_failure", + "evidence_gap", + "escalation_miss", + "reviewer_disagreement", + "phi_redaction_failure", + "missing_claim_evidence", + "rate_source_ambiguity", + "contract_rate_mismatch", + "allowed_amount_conflict", + "approval_bypass", + "downstream_export_mismatch", + "savings_recognition_dispute", + "use_approval", + "provenance_gap", + "readiness_gap", + "workflow_truth", + "other", +] + +WORKFLOW_ARCHETYPE_VALUES = [ + "document_operations", + "claims_hybrid_high_dollar_review", + "other", +] + +USE_CLASS_VALUES = [ + "evidence_only", + "internal_eval", + "internal_training", + "policy_learning", + "external_export", +] + +STRUCTURED_AXIS_METADATA = { + "issue_class": AxisMetadata( + field_name="issue_class", + description="Forge-owned failure or boundary issue class for incident-memory filtering.", + values=ISSUE_CLASS_VALUES, + ), + "capability_area": AxisMetadata( + field_name="capability_area", + description="Proofhouse capability area involved in the incident; this is not ownership transfer.", + values=CAPABILITY_AREA_VALUES, + ), + "lifecycle_stage": AxisMetadata( + field_name="lifecycle_stage", + description="Workflow or Operational Learning lifecycle stage where the failure appeared.", + values=LIFECYCLE_STAGE_VALUES, + ), + "workflow_archetype": AxisMetadata( + field_name="workflow_archetype", + description="High-level workflow pattern label used for incident discovery.", + values=WORKFLOW_ARCHETYPE_VALUES, + ), + "blocked_use_class": AxisMetadata( + field_name="blocked_use_class", + description="Use class affected by the incident; Forge records the issue but does not approve or block use.", + values=USE_CLASS_VALUES, + ), +} diff --git a/templates/analysis-prompt.md b/templates/analysis-prompt.md index 08c2142..d90549e 100644 --- a/templates/analysis-prompt.md +++ b/templates/analysis-prompt.md @@ -18,7 +18,7 @@ For document-operations or Operational Learning incidents, prefer the structured - `subject_type` - `blocked_use_class` -Expected document-operations issue classes include `redaction_miss`, `rights_ambiguity`, `promotion_failure`, `export_control_failure`, `transform_failure`, `derivation_quality_failure`, `evidence_gap`, `escalation_miss`, and `reviewer_disagreement`. Treat `workflow_ref`, `evidence_ref`, `workflow_evidence_snapshot`, `assessment_ref`, `policy_decision_ref`, `use_approval_ref`, `asset_ref`, `derivation_ref`, and `transform_ref` as pointer refs only. +Expected document-operations issue classes include `redaction_miss`, `rights_ambiguity`, `promotion_failure`, `export_control_failure`, `transform_failure`, `derivation_quality_failure`, `evidence_gap`, `escalation_miss`, and `reviewer_disagreement`. Treat `workflow_ref`, `evidence_ref`, `workflow_evidence_snapshot`, `subject_ref`, `assessment_ref`, `policy_decision_ref`, `use_approval_ref`, `asset_ref`, `derivation_ref`, and `transform_ref` as pointer refs only. ## Incident Data diff --git a/templates/incident.yml b/templates/incident.yml index 5ba5d64..c245028 100644 --- a/templates/incident.yml +++ b/templates/incident.yml @@ -46,6 +46,7 @@ observed_state: {} # incident-local state summary only workflow_ref: null evidence_ref: null workflow_evidence_snapshot: null +subject_ref: null assessment_ref: null policy_decision_ref: null use_approval_ref: null diff --git a/tests/test_analyzer.py b/tests/test_analyzer.py index 2eeee60..1e52080 100644 --- a/tests/test_analyzer.py +++ b/tests/test_analyzer.py @@ -39,6 +39,13 @@ def test_render_analysis_prompt_substitutes_incident_yaml(): assert rendered.endswith("Footer") +def test_analysis_prompt_mentions_subject_ref_boundary(): + prompt = Path("templates/analysis-prompt.md").read_text(encoding="utf-8") + + assert "subject_ref" in prompt + assert "pointer refs only" in prompt + + def test_next_analysis_output_path_uses_sequence_when_needed(tmp_path): first = next_analysis_output_path(tmp_path, date_prefix="2026-03-28") assert first == tmp_path / "2026-03-28-analysis.md" diff --git a/tests/test_cli.py b/tests/test_cli.py index af553c4..acc25aa 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -144,3 +144,122 @@ def test_log_command_accepts_claims_issue_class(tmp_path): assert incident.issue_class == "rate_source_ambiguity" assert incident.workflow_archetype == "claims_hybrid_high_dollar_review" assert incident.workflow_ref["ref_id"] == "workflow:claims-hybrid-high-dollar-review-v0" + + +def test_log_command_rejects_unknown_workflow_archetype(tmp_path): + data_root = tmp_path / "forge-data" + runner = CliRunner() + result = runner.invoke( + app, + [ + "log", + "--project", + "proofhouse-claims", + "--agent", + "claims-review-fixture", + "--platform", + "codex", + "--severity", + "functional", + "--type", + "integration_failure", + "--workflow-archetype", + "claims_custom_unreviewed", + ], + env={"FORGE_DATA_ROOT": str(data_root)}, + ) + + assert result.exit_code == 1 + assert "Invalid workflow_archetype" in result.output + assert not list((data_root / "incidents").rglob("*.yml")) + + +def test_list_command_filters_structured_axes(tmp_path, sample_data): + data_root = tmp_path / "forge-data" + incidents_dir = data_root / "incidents" + incidents_dir.mkdir(parents=True) + claims_data = sample_data.copy() + claims_data.update( + { + "issue_class": "rate_source_ambiguity", + "capability_area": "workflow_context", + "lifecycle_stage": "evidence_review", + "workflow_archetype": "claims_hybrid_high_dollar_review", + "blocked_use_class": "internal_eval", + } + ) + docs_data = sample_data.copy() + docs_data.update( + { + "id": "2026-03-04-002", + "issue_class": "redaction_miss", + "capability_area": "governance", + "lifecycle_stage": "redaction_review", + "workflow_archetype": "document_operations", + "blocked_use_class": "external_export", + } + ) + save_incident(Incident.from_dict(claims_data), incidents_dir) + save_incident(Incident.from_dict(docs_data), incidents_dir) + + runner = CliRunner() + result = runner.invoke( + app, + [ + "list", + "--issue-class", + "rate_source_ambiguity", + "--capability-area", + "workflow_context", + "--lifecycle-stage", + "evidence_review", + "--workflow-archetype", + "claims_hybrid_high_dollar_review", + "--blocked-use-class", + "internal_eval", + ], + env={"FORGE_DATA_ROOT": str(data_root)}, + ) + + assert result.exit_code == 0 + assert "2026-03-04-001" in result.output + assert "2026-03-04-002" not in result.output + + +def test_log_command_accepts_subject_ref(tmp_path): + data_root = tmp_path / "forge-data" + runner = CliRunner() + result = runner.invoke( + app, + [ + "log", + "--project", + "proofhouse-document-operations", + "--agent", + "document-review-fixture", + "--platform", + "codex", + "--severity", + "functional", + "--type", + "other", + "--subject-ref", + "subject:document-packet:synthetic-demo", + ], + input=( + "Expected behavior\n" + "Actual behavior\n" + "Context\n" + "root-cause\n" + "Immediate fix\n" + "Systemic takeaway\n" + "document-operations\n" + "y\n" + ), + env={"FORGE_DATA_ROOT": str(data_root)}, + ) + + assert result.exit_code == 0 + saved = next((data_root / "incidents").rglob("*.yml")) + incident = Incident.from_dict(yaml.safe_load(saved.read_text())) + assert incident.subject_ref["ref_id"] == "subject:document-packet:synthetic-demo" diff --git a/tests/test_incident_store.py b/tests/test_incident_store.py index 9ebff80..a88ce0b 100644 --- a/tests/test_incident_store.py +++ b/tests/test_incident_store.py @@ -2,6 +2,8 @@ from pathlib import Path from forge_cli.incident_store import ( + AmbiguousIncidentLookupError, + DuplicateIncidentError, find_incident, find_incident_path, generate_id, @@ -156,6 +158,72 @@ def test_list_incidents_tag_filter(tmp_incidents_dir, sample_data): assert len(result) == 0 +def test_list_incidents_filters_structured_axes(tmp_incidents_dir, sample_data): + first = sample_data.copy() + first.update( + { + "issue_class": "rate_source_ambiguity", + "capability_area": "workflow_context", + "lifecycle_stage": "evidence_review", + "workflow_archetype": "claims_hybrid_high_dollar_review", + "blocked_use_class": "internal_eval", + } + ) + second = sample_data.copy() + second.update( + { + "id": "2026-03-04-002", + "issue_class": "redaction_miss", + "capability_area": "governance", + "lifecycle_stage": "redaction_review", + "workflow_archetype": "document_operations", + "blocked_use_class": "external_export", + } + ) + save_incident(Incident.from_dict(first), tmp_incidents_dir) + save_incident(Incident.from_dict(second), tmp_incidents_dir) + + result = list_incidents( + tmp_incidents_dir, + issue_class="rate_source_ambiguity", + capability_area="workflow_context", + lifecycle_stage="evidence_review", + workflow_archetype="claims_hybrid_high_dollar_review", + blocked_use_class="internal_eval", + ) + + assert [incident.id for incident in result] == ["2026-03-04-001"] + + +def test_save_incident_rejects_duplicate_id(tmp_incidents_dir, sample_data): + incident = Incident.from_dict(sample_data) + save_incident(incident, tmp_incidents_dir) + + try: + save_incident(incident, tmp_incidents_dir) + except DuplicateIncidentError as exc: + assert "2026-03-04-001" in str(exc) + else: + raise AssertionError("expected duplicate incident id to be rejected") + + +def test_find_incident_path_rejects_ambiguous_suffix(tmp_incidents_dir, sample_data): + first = Incident.from_dict(sample_data) + second_data = sample_data.copy() + second_data["id"] = "2026-03-05-001" + second_data["timestamp"] = "2026-03-05T14:30:00Z" + save_incident(first, tmp_incidents_dir) + save_incident(Incident.from_dict(second_data), tmp_incidents_dir) + + try: + find_incident_path(tmp_incidents_dir, "001") + except AmbiguousIncidentLookupError as exc: + assert "2026-03-04-001" in str(exc) + assert "2026-03-05-001" in str(exc) + else: + raise AssertionError("expected ambiguous suffix lookup to be rejected") + + def test_document_operations_example_loads_as_structured_stub(): fixture_path = ( Path(__file__).parents[1] diff --git a/tests/test_mcp_http.py b/tests/test_mcp_http.py index 5a799c3..ff099ee 100644 --- a/tests/test_mcp_http.py +++ b/tests/test_mcp_http.py @@ -3,6 +3,12 @@ mcp = pytest.importorskip("mcp") +import json # noqa: E402 +import yaml # noqa: E402 + +from forge_cli.incident_store import save_incident # noqa: E402 +from forge_cli.models import Incident # noqa: E402 +from forge_cli.mcp_server import forge_list, forge_log, forge_schema # noqa: E402 from forge_cli.mcp_http import ( # noqa: E402 MCPHTTPServerOptions, resolve_transport_security, @@ -47,3 +53,109 @@ def test_resolve_transport_security_can_disable_protection_for_private_network_u assert security is not None assert security.enable_dns_rebinding_protection is False + + +def test_forge_schema_exposes_centralized_structured_axis_metadata(): + schema = json.loads(forge_schema()) + + assert "structured_axis_metadata" in schema + assert schema["structured_axis_metadata"]["issue_class"]["values"] + assert "subject_ref" in schema["pointer_ref_fields"] + assert "subject_ref" in schema["incident_ref_fields"] + + +def test_forge_list_filters_structured_axes(tmp_path, monkeypatch, sample_data): + data_root = tmp_path / "forge-data" + incidents_dir = data_root / "incidents" + incidents_dir.mkdir(parents=True) + claims_data = sample_data.copy() + claims_data.update( + { + "issue_class": "rate_source_ambiguity", + "capability_area": "workflow_context", + "lifecycle_stage": "evidence_review", + "workflow_archetype": "claims_hybrid_high_dollar_review", + "blocked_use_class": "internal_eval", + } + ) + docs_data = sample_data.copy() + docs_data.update( + { + "id": "2026-03-04-002", + "issue_class": "redaction_miss", + "capability_area": "governance", + "lifecycle_stage": "redaction_review", + "workflow_archetype": "document_operations", + "blocked_use_class": "external_export", + } + ) + save_incident(Incident.from_dict(claims_data), incidents_dir) + save_incident(Incident.from_dict(docs_data), incidents_dir) + monkeypatch.setenv("FORGE_DATA_ROOT", str(data_root)) + + result = forge_list( + issue_class="rate_source_ambiguity", + capability_area="workflow_context", + lifecycle_stage="evidence_review", + workflow_archetype="claims_hybrid_high_dollar_review", + blocked_use_class="internal_eval", + ) + + assert "2026-03-04-001" in result + assert "2026-03-04-002" not in result + + +def test_forge_log_rejects_raw_payload_pointer_keys(tmp_path, monkeypatch): + data_root = tmp_path / "forge-data" + monkeypatch.setenv("FORGE_DATA_ROOT", str(data_root)) + + result = forge_log( + project="proofhouse-claims", + agent="claims-review-fixture", + severity="functional", + failure_type="other", + expected_behavior="Expected behavior", + actual_behavior="Actual behavior", + workflow_ref=json.dumps({"ref_id": "workflow:demo", "claim_text": "raw claim"}), + ) + + assert "claim_text" in result + assert not list((data_root / "incidents").rglob("*.yml")) + + +def test_forge_log_rejects_unknown_workflow_archetype(tmp_path, monkeypatch): + data_root = tmp_path / "forge-data" + monkeypatch.setenv("FORGE_DATA_ROOT", str(data_root)) + + result = forge_log( + project="proofhouse-claims", + agent="claims-review-fixture", + severity="functional", + failure_type="other", + expected_behavior="Expected behavior", + actual_behavior="Actual behavior", + workflow_archetype="claims_custom_unreviewed", + ) + + assert "Invalid workflow_archetype" in result + assert not list((data_root / "incidents").rglob("*.yml")) + + +def test_forge_log_accepts_subject_ref(tmp_path, monkeypatch): + data_root = tmp_path / "forge-data" + monkeypatch.setenv("FORGE_DATA_ROOT", str(data_root)) + + result = forge_log( + project="proofhouse-document-operations", + agent="document-review-fixture", + severity="functional", + failure_type="other", + expected_behavior="Expected behavior", + actual_behavior="Actual behavior", + subject_ref="subject:document-packet:synthetic-demo", + ) + + assert "Incident logged:" in result + saved = next((data_root / "incidents").rglob("*.yml")) + incident = Incident.from_dict(yaml.safe_load(saved.read_text())) + assert incident.subject_ref["ref_id"] == "subject:document-packet:synthetic-demo" diff --git a/tests/test_models.py b/tests/test_models.py index 84f2e8b..37475f1 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,12 +1,18 @@ +import pytest + from forge_cli.models import ( FORGE_DEFAULT_ENVIRONMENT_ID, FORGE_UNSCOPED_ORGANIZATION_ID, + PROOFHOUSE_REF_FIELDS, PROOFHOUSE_SHARED_CONTRACT_VERSION, FailureType, Incident, ISSUE_CLASS_VALUES, Severity, + parse_observed_state, + parse_pointer_value, ) +from forge_cli.schema_metadata import STRUCTURED_AXIS_METADATA def test_severity_enum_values(): @@ -143,6 +149,78 @@ def test_claims_issue_classes_are_valid_structured_values(): assert expected.issubset(set(ISSUE_CLASS_VALUES)) +def test_structured_axis_metadata_is_single_source_for_claims_axes(): + assert STRUCTURED_AXIS_METADATA["issue_class"].values is ISSUE_CLASS_VALUES + assert STRUCTURED_AXIS_METADATA["capability_area"].description + assert STRUCTURED_AXIS_METADATA["workflow_archetype"].description + assert STRUCTURED_AXIS_METADATA["blocked_use_class"].values + + +def test_subject_ref_is_supported_as_pointer_only(sample_data): + data = sample_data.copy() + data["subject_ref"] = "subject:document-packet:synthetic-demo" + + incident = Incident.from_dict(data) + ref = incident.to_ref() + + assert "subject_ref" in PROOFHOUSE_REF_FIELDS + assert incident.subject_ref["ref_id"] == "subject:document-packet:synthetic-demo" + assert incident.subject_ref["cache_policy"] == "ref_only" + assert ref.subject_ref["ref_id"] == "subject:document-packet:synthetic-demo" + + +def test_pointer_refs_reject_obvious_raw_payload_keys(): + try: + parse_pointer_value({"ref_id": "workflow:demo", "payload": {"claim": "raw"}}, "workflow_ref") + except ValueError as exc: + assert "payload" in str(exc) + else: + raise AssertionError("expected raw payload key to be rejected") + + +def test_pointer_refs_allow_digest_metadata(): + parsed = parse_pointer_value( + {"ref_id": "workflow:demo", "payload_digest": "sha256:placeholder"}, + "workflow_ref", + ) + + assert parsed["payload_digest"] == "sha256:placeholder" + + +def test_observed_state_rejects_obvious_sensitive_keys(): + try: + parse_observed_state({"state": "needs_review", "claim_text": "raw claim body"}) + except ValueError as exc: + assert "claim_text" in str(exc) + else: + raise AssertionError("expected raw observed_state key to be rejected") + + +def test_observed_state_rejects_camel_case_raw_payload_keys(): + for raw_key in [ + "rawPayload", + "claimText", + "documentText", + "dateOfBirth", + "patientName", + "memberName", + ]: + with pytest.raises(ValueError, match=raw_key): + parse_observed_state({raw_key: "raw value"}) + + +def test_observed_state_allows_boundary_safe_phi_status_keys(): + parsed = parse_observed_state( + { + "phi_redaction_status": "passed", + "phi_boundary_state": "summary_only", + "phi_packet_digest": "sha256:placeholder", + } + ) + + assert parsed["phi_redaction_status"] == "passed" + + def test_incident_ref_projection_infers_claims_issue_class_alias(sample_data): data = sample_data.copy() data.update(