Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/anglerfish/cli/_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ def _parse_args(argv: Sequence[str] | None) -> argparse.Namespace:
dest="records_dir",
help=("Directory containing deployment record JSON files. Default: ~/.anglerfish/records/"),
)
list_parser.add_argument(
"--format",
choices=("table", "json"),
default="table",
help="Output format (default: table).",
)

monitor_parser = subparsers.add_parser("monitor", help="Monitor audit logs for canary access events.")
monitor_parser.add_argument(
Expand Down Expand Up @@ -424,6 +430,12 @@ def _parse_args(argv: Sequence[str] | None) -> argparse.Namespace:
default=None,
help="Credential type for application auth.",
)
verify_parser.add_argument(
"--format",
choices=("table", "json"),
default="table",
help="Output format (default: table).",
)

demo_access_parser = subparsers.add_parser(
"demo-access",
Expand Down
79 changes: 63 additions & 16 deletions src/anglerfish/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import argparse
import dataclasses
import datetime
import json
import os
from pathlib import Path

Expand Down Expand Up @@ -116,8 +117,12 @@ def _print_demo_access_success(console: Console, result: dict[str, str]) -> None
def _run_list(args: argparse.Namespace, console: Console) -> int:
"""List all deployment records in a directory as a Rich table."""
records_dir = Path(args.records_dir)
output_format = getattr(args, "format", "table")

if not records_dir.exists():
if output_format == "json":
console.print("[]")
return 0
console.print(f"[yellow]Records directory not found:[/yellow] {records_dir}")
console.print(
"[dim]Deploy with --output-json pointing into this directory, "
Expand All @@ -127,9 +132,28 @@ def _run_list(args: argparse.Namespace, console: Console) -> int:

record_files = sorted(records_dir.glob("*.json"), key=lambda p: p.stat().st_mtime)
if not record_files:
if output_format == "json":
console.print("[]")
return 0
console.print("[yellow]No deployment records found in[/yellow] " + str(records_dir))
return 0

records: list[tuple[str, dict]] = []
for record_file in record_files:
try:
record = read_deployment_record(record_file)
except DeploymentError:
continue # Skip malformed records silently

canary_type = str(record.get("canary_type", record.get("type", "unknown"))).strip().lower()
if canary_type != "outlook":
continue
records.append((record_file.stem[:12], record))

if output_format == "json":
console.print(json.dumps([record for _record_id, record in records], indent=2, sort_keys=True), markup=False)
return 0

table = Table(
title=f"Deployed Outlook Canary Artifacts ({records_dir})",
box=box.SIMPLE_HEAVY,
Expand All @@ -144,16 +168,8 @@ def _run_list(args: argparse.Namespace, console: Console) -> int:
table.add_column("Status", no_wrap=True)

row_count = 0
for record_file in record_files:
try:
record = read_deployment_record(record_file)
except DeploymentError:
continue # Skip malformed records silently

record_id = record_file.stem[:12]
for record_id, record in records:
canary_type = str(record.get("canary_type", record.get("type", "unknown"))).strip().lower()
if canary_type != "outlook":
continue
template_name = str(record.get("template_name", "\u2014"))
target = str(record.get("target_user") or "\u2014")
timestamp = str(record.get("timestamp", "\u2014"))
Expand Down Expand Up @@ -466,11 +482,15 @@ def _run_verify(args: argparse.Namespace, console: Console) -> int:
from ..monitor import load_records
from ..verify import VerifyResult, VerifyStatus, run_verify

_print_banner(console)
output_format = getattr(args, "format", "table")

if output_format == "table":
_print_banner(console)

if getattr(args, "demo", False):
# Demo mode: show simulated output.
console.print("Verifying [bold]3[/bold] deployment record(s)...\n")
if output_format == "table":
console.print("Verifying [bold]3[/bold] deployment record(s)...\n")
results = [
VerifyResult(
canary_type="outlook",
Expand Down Expand Up @@ -506,10 +526,14 @@ def _run_verify(args: argparse.Namespace, console: Console) -> int:
records = load_records(default_dir)

if not records:
console.print("[yellow]No deployment records found to verify.[/yellow]")
if output_format == "json":
console.print("[]")
else:
console.print("[yellow]No deployment records found to verify.[/yellow]")
return 1

console.print(f"Verifying [bold]{len(records)}[/bold] deployment record(s)...\n")
if output_format == "table":
console.print(f"Verifying [bold]{len(records)}[/bold] deployment record(s)...\n")
pending_graph_checks: list[tuple[int, tuple[str, dict]]] = []
ordered_results: list[VerifyResult | None] = [None] * len(records)

Expand Down Expand Up @@ -560,28 +584,51 @@ def _run_verify(args: argparse.Namespace, console: Console) -> int:
)
if auth_result is None:
return 130
console.print("Authenticating with Microsoft Graph...")
if output_format == "table":
console.print("Authenticating with Microsoft Graph...")
try:
token = authenticate(auth_mode="application", app_credential_mode=auth_result.credential_mode)
finally:
_clear_prompted_env_values(auth_result)
graph = GraphClient(token)
_print_auth_success(console)
if output_format == "table":
_print_auth_success(console)

graph_results = run_verify([record_item for _index, record_item in pending_graph_checks], graph)
for (index, _record_item), graph_result in zip(pending_graph_checks, graph_results):
ordered_results[index] = graph_result

results = [result for result in ordered_results if result is not None]

any_bad = any(result.status != VerifyStatus.OK for result in results)

if output_format == "json":
console.print(
json.dumps(
[
{
"canary_type": result.canary_type,
"detail": result.detail,
"status": result.status.value,
"target": result.target,
"template_name": result.template_name,
}
for result in results
],
indent=2,
sort_keys=True,
),
markup=False,
)
return 1 if any_bad else 0

# Render table.
table = Table(box=box.ROUNDED, title="Canary Verification")
table.add_column("Type", style="dim")
table.add_column("Template")
table.add_column("Target")
table.add_column("Status")

any_bad = False
for r in results:
if r.status == VerifyStatus.OK:
status_str = "[green]OK[/green]"
Expand Down
58 changes: 58 additions & 0 deletions tests/test_cli_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,26 @@ def test_main_list_returns_zero_when_records_dir_missing(tmp_path):
assert main(["list", "--records-dir", str(missing)]) == 0


def test_main_list_json_emits_records_only(tmp_path, capsys):
records_dir = tmp_path / "records"
records_dir.mkdir()
record = {
"canary_type": "outlook",
"delivery_mode": "draft",
"folder_id": "folder-1",
"target_user": "alice@contoso.com",
"template_name": "Fake Password Reset",
"timestamp": "2026-05-06T14:00:00+00:00",
}
(records_dir / "record-1.json").write_text(json.dumps(record), encoding="utf-8")

assert main(["list", "--format", "json", "--records-dir", str(records_dir)]) == 0

output = capsys.readouterr().out
assert json.loads(output) == [record]
assert "Deployed Outlook Canary Artifacts" not in output


def test_main_cleanup_outlook_happy_path(monkeypatch, tmp_path):
record_path = tmp_path / "record.json"
record_path.write_text("{}", encoding="utf-8")
Expand Down Expand Up @@ -451,6 +471,14 @@ def test_parse_args_verify_subcommand():
assert args.demo is True


def test_parse_args_format_json():
list_args = main_mod._parse_args(["list", "--format", "json"])
verify_args = main_mod._parse_args(["verify", "--format", "json"])

assert list_args.format == "json"
assert verify_args.format == "json"


def test_verify_demo_exits_one():
import subprocess

Expand Down Expand Up @@ -529,6 +557,36 @@ def _fake_prompt_auth_setup(*args, **kwargs):
assert auth_calls == []


def test_verify_json_send_record_is_machine_readable_without_auth(monkeypatch, capsys):
monkeypatch.setattr(
deploy_mod,
"read_deployment_record",
lambda _path: {
"canary_type": "outlook",
"delivery_mode": "send",
"target_user": "alice@contoso.com",
"inbox_message_id": "msg-123",
"template_name": "Fake Password Reset",
},
)
monkeypatch.setattr(deploy_mod, "authenticate", lambda *args, **kwargs: pytest.fail("must not authenticate"))

result = main(["verify", "--format", "json", "record.json"])

output = capsys.readouterr().out
assert result == 1
assert json.loads(output) == [
{
"canary_type": "outlook",
"detail": "Verify only supports draft-mode outlook records",
"status": "ERROR",
"target": "alice@contoso.com",
"template_name": "Fake Password Reset",
}
]
assert "Canary Verification" not in output


def test_verify_unsupported_record_returns_error_without_auth(monkeypatch):
monkeypatch.setattr(
deploy_mod,
Expand Down