Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/anglerfish/cli/_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ def _parse_args(argv: Sequence[str] | None) -> argparse.Namespace:
dest="records_dir",
help=("Directory containing deployment record JSON files. Default: ~/.anglerfish/records/"),
)
list_parser.add_argument(
"--format",
choices=("table", "json"),
default="table",
dest="output_format",
help="Output format: table (default) or json.",
)

monitor_parser = subparsers.add_parser("monitor", help="Monitor audit logs for canary access events.")
monitor_parser.add_argument(
Expand Down Expand Up @@ -424,6 +431,13 @@ def _parse_args(argv: Sequence[str] | None) -> argparse.Namespace:
default=None,
help="Credential type for application auth.",
)
verify_parser.add_argument(
"--format",
choices=("table", "json"),
default="table",
dest="output_format",
help="Output format: table (default) or json.",
)

demo_access_parser = subparsers.add_parser(
"demo-access",
Expand Down
46 changes: 41 additions & 5 deletions src/anglerfish/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import argparse
import dataclasses
import datetime
import json
import os
from pathlib import Path

Expand Down Expand Up @@ -116,8 +117,12 @@ def _print_demo_access_success(console: Console, result: dict[str, str]) -> None
def _run_list(args: argparse.Namespace, console: Console) -> int:
"""List all deployment records in a directory as a Rich table."""
records_dir = Path(args.records_dir)
output_format = getattr(args, "output_format", "table")

if not records_dir.exists():
if output_format == "json":
console.print("[]")
return 0
console.print(f"[yellow]Records directory not found:[/yellow] {records_dir}")
console.print(
"[dim]Deploy with --output-json pointing into this directory, "
Expand All @@ -127,6 +132,9 @@ def _run_list(args: argparse.Namespace, console: Console) -> int:

record_files = sorted(records_dir.glob("*.json"), key=lambda p: p.stat().st_mtime)
if not record_files:
if output_format == "json":
console.print("[]")
return 0
console.print("[yellow]No deployment records found in[/yellow] " + str(records_dir))
return 0

Expand All @@ -144,6 +152,7 @@ def _run_list(args: argparse.Namespace, console: Console) -> int:
table.add_column("Status", no_wrap=True)

row_count = 0
json_records: list[dict] = []
for record_file in record_files:
try:
record = read_deployment_record(record_file)
Expand All @@ -154,6 +163,7 @@ def _run_list(args: argparse.Namespace, console: Console) -> int:
canary_type = str(record.get("canary_type", record.get("type", "unknown"))).strip().lower()
if canary_type != "outlook":
continue
json_records.append(record)
template_name = str(record.get("template_name", "\u2014"))
target = str(record.get("target_user") or "\u2014")
timestamp = str(record.get("timestamp", "\u2014"))
Expand All @@ -175,6 +185,10 @@ def _run_list(args: argparse.Namespace, console: Console) -> int:
table.add_row(record_id, canary_type, template_name, target, deployed_str, status_markup)
row_count += 1

if output_format == "json":
console.print(json.dumps(json_records, indent=2, sort_keys=True))
return 0

if row_count:
console.print(table)
else:
Expand Down Expand Up @@ -466,11 +480,15 @@ def _run_verify(args: argparse.Namespace, console: Console) -> int:
from ..monitor import load_records
from ..verify import VerifyResult, VerifyStatus, run_verify

_print_banner(console)
output_format = getattr(args, "output_format", "table")

if output_format != "json":
_print_banner(console)

if getattr(args, "demo", False):
# Demo mode: show simulated output.
console.print("Verifying [bold]3[/bold] deployment record(s)...\n")
if output_format != "json":
console.print("Verifying [bold]3[/bold] deployment record(s)...\n")
results = [
VerifyResult(
canary_type="outlook",
Expand Down Expand Up @@ -506,10 +524,14 @@ def _run_verify(args: argparse.Namespace, console: Console) -> int:
records = load_records(default_dir)

if not records:
if output_format == "json":
console.print("[]")
return 1
console.print("[yellow]No deployment records found to verify.[/yellow]")
return 1

console.print(f"Verifying [bold]{len(records)}[/bold] deployment record(s)...\n")
if output_format != "json":
console.print(f"Verifying [bold]{len(records)}[/bold] deployment record(s)...\n")
pending_graph_checks: list[tuple[int, tuple[str, dict]]] = []
ordered_results: list[VerifyResult | None] = [None] * len(records)

Expand Down Expand Up @@ -560,20 +582,26 @@ def _run_verify(args: argparse.Namespace, console: Console) -> int:
)
if auth_result is None:
return 130
console.print("Authenticating with Microsoft Graph...")
if output_format != "json":
console.print("Authenticating with Microsoft Graph...")
try:
token = authenticate(auth_mode="application", app_credential_mode=auth_result.credential_mode)
finally:
_clear_prompted_env_values(auth_result)
graph = GraphClient(token)
_print_auth_success(console)
if output_format != "json":
_print_auth_success(console)

graph_results = run_verify([record_item for _index, record_item in pending_graph_checks], graph)
for (index, _record_item), graph_result in zip(pending_graph_checks, graph_results):
ordered_results[index] = graph_result

results = [result for result in ordered_results if result is not None]

if output_format == "json":
console.print(json.dumps([_verify_result_to_json(result) for result in results], indent=2, sort_keys=True))
return 1 if any(result.status != VerifyStatus.OK for result in results) else 0

# Render table.
table = Table(box=box.ROUNDED, title="Canary Verification")
table.add_column("Type", style="dim")
Expand Down Expand Up @@ -604,3 +632,11 @@ def _run_verify(args: argparse.Namespace, console: Console) -> int:
console.print("[bold yellow]Demo mode \u2014 no Graph API calls were made.[/bold yellow]")

return 1 if any_bad else 0


def _verify_result_to_json(result) -> dict[str, str]:
payload = dataclasses.asdict(result)
status = payload.get("status")
if hasattr(status, "value"):
payload["status"] = status.value
return payload
10 changes: 10 additions & 0 deletions tests/test_cli_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ def test_output_json(self):
args = _parse_args(["--output-json", "/tmp/out.json"])
assert args.output_json == "/tmp/out.json"

def test_list_format_json(self):
args = _parse_args(["list", "--format", "json"])
assert args.subcommand == "list"
assert args.output_format == "json"

def test_verify_format_json(self):
args = _parse_args(["verify", "--format", "json"])
assert args.subcommand == "verify"
assert args.output_format == "json"


class TestPrintAuthSuccess:
def test_application_mode_prints_application_success(self):
Expand Down
48 changes: 48 additions & 0 deletions tests/test_cli_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,54 @@ def test_main_list_returns_zero_when_records_dir_missing(tmp_path):
assert main(["list", "--records-dir", str(missing)]) == 0


def test_main_list_format_json_outputs_records(tmp_path, capsys):
(tmp_path / "outlook.json").write_text(
json.dumps(
{
"timestamp": "2026-05-06T18:00:00+00:00",
"canary_type": "outlook",
"template_name": "Outlook",
"target_user": "adele.vance@contoso.com",
"status": "active",
}
),
encoding="utf-8",
)
(tmp_path / "sharepoint.json").write_text(
json.dumps(
{
"timestamp": "2026-05-06T18:00:00+00:00",
"canary_type": "sharepoint",
"template_name": "Legacy",
"status": "active",
}
),
encoding="utf-8",
)

assert main(["list", "--format", "json", "--records-dir", str(tmp_path)]) == 0

records = json.loads(capsys.readouterr().out)
assert records == [
{
"timestamp": "2026-05-06T18:00:00+00:00",
"canary_type": "outlook",
"template_name": "Outlook",
"target_user": "adele.vance@contoso.com",
"status": "active",
}
]


def test_main_verify_format_json_demo_outputs_results(capsys):
assert main(["verify", "--demo", "--format", "json"]) == 1

results = json.loads(capsys.readouterr().out)
assert [result["status"] for result in results] == ["OK", "GONE", "ERROR"]
assert results[0]["target"] == "cfo@contoso.com"
assert all(set(result) == {"canary_type", "template_name", "target", "status", "detail"} for result in results)


def test_main_cleanup_outlook_happy_path(monkeypatch, tmp_path):
record_path = tmp_path / "record.json"
record_path.write_text("{}", encoding="utf-8")
Expand Down