diff --git a/django-backend/docs/development_tooling.md b/django-backend/docs/development_tooling.md new file mode 100644 index 000000000..81b8ac9c3 --- /dev/null +++ b/django-backend/docs/development_tooling.md @@ -0,0 +1,230 @@ +# SoroScan Development Tooling + +This document covers the new utilities added for database seeding, event replay, bulk import, and backup verification. + +--- + +## Database Seeder (`seed_database`) + +Populates the development database with realistic test data. + +### Commands + +```bash +cd django-backend +``` + +Full seed with default fixture: +```bash +python manage.py seed_database +``` + +Use a custom fixture file: +```bash +python manage.py seed_database --fixture=fixtures/custom.json +``` + +Use a predefined scenario: +```bash +python manage.py seed_database --scenario=minimal +python manage.py seed_database --scenario=webhook +``` + +Clear all seeded data before re-seeding: +```bash +python manage.py seed_database --clear +``` + +### Fixture Format (`fixtures/development.json`) + +```json +{ + "organizations": [...], + "teams": [...], + "users": [...], + "memberships": [...], + "team_memberships": [...], + "contracts": [...], + "events": { + "per_contract": 20, + "contracts": ["CAAAA..."], + "event_types": ["swap", "deposit"], + "days_back": 14 + }, + "webhooks": [...], + "webhook_delivery_logs": { + "per_webhook": 5, + "status_codes": [200, 200, 200, 500, 408] + }, + "alert_rules": [...], + "api_keys": [...] +} +``` + +--- + +## Local Event Replay (`replay_events`) + +Replay ContractEvent records through webhook delivery for debugging and retesting. + +```bash +cd django-backend +``` + +Basic replay (dispatches to all active webhooks): +```bash +python manage.py replay_events --contract=CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZ2KZ --limit=10 +``` + +Dry run preview: +```bash +python manage.py replay_events --contract=CAAAA... --dry-run +``` + +Filter by event type: +```bash +python manage.py replay_events --contract=CAAAA... --event-type=swap +``` + +Filter by ledger range: +```bash +python manage.py replay_events --contract=CAAAA... --from-ledger=100 --to-ledger=1000 +``` + +Filter by date range: +```bash +python manage.py replay_events --contract=CAAAA... --from-date=2025-01-01 --to-date=2025-01-31 +``` + +Replay to a single webhook: +```bash +python manage.py replay_events --contract=CAAAA... --webhook-id=3 +``` + +Write report to JSON: +```bash +python manage.py replay_events --contract=CAAAA... --output-json=replay_report.json +``` + +--- + +## Bulk Metadata Import (`bulk_import_metadata`) + +Import contract metadata from CSV or JSON with validation and rollback support. + +```bash +cd django-backend +``` + +CSV import: +```bash +python manage.py bulk_import_metadata --input=contracts.csv --format=csv +``` + +JSON import: +```bash +python manage.py bulk_import_metadata --input=contracts.json --format=json +``` + +Dry run (validate without changing DB): +```bash +python manage.py bulk_import_metadata --input=contracts.csv --dry-run +``` + +Skip errors instead of rolling back: +```bash +python manage.py bulk_import_metadata --input=contracts.csv --on-error=skip +``` + +Write import report: +```bash +python manage.py bulk_import_metadata --input=contracts.csv --report=import_report.json +``` + +### CSV Format + +``` +contract_id,name,description,tags,documentation_url,github_repo,team_email +CAAAA...,Stablecoin Swap,An AMM,"defi,amm,swap",https://...,https://...,team@acme.example.com +``` + +### JSON Format + +```json +{ + "metadata": [ + { + "contract_id": "CAAAA...", + "name": "Stablecoin Swap", + "description": "An AMM", + "tags": ["defi", "amm", "swap"], + "documentation_url": "https://...", + "github_repo": "https://...", + "team_email": "team@acme.example.com" + } + ] +} +``` + +--- + +## Backup Verification & Testing (`test_backup.py`) + +Automated backup verification and restoration testing. + +```bash +cd scripts/backup +``` + +Full test (verify + restore): +```bash +python test_backup.py --bucket=soroscan-backups --prefix=pg-backups --db-url=postgresql://... +``` + +Or use environment variables: +```bash +export DATABASE_URL=postgresql://user:pass@host:5432/dbname +export S3_BUCKET=soroscan-backups +export S3_PREFIX=pg-backups +python test_backup.py full-test +``` + +Verify only (no restore): +```bash +python test_backup.py verify-only +``` + +Restore only: +```bash +python test_backup.py restore-only --s3-key=pg-backups/20250101.dump.gz +``` + +Print manual runbook: +```bash +python test_backup.py --runbook +``` + +Write test report to JSON: +```bash +python test_backup.py full-test --output=backup_report.json +``` + +Alerting: +- Set `SLACK_WEBHOOK_URL` to receive alerts on test failure. +- Results include RTO (restore duration) and RPO (time since backup) in seconds. + +--- + +## Automation + +### Nightly Backup Verification (cron) + +```cron +0 6 * * * cd /path/to/soroscan/scripts/backup && python test_backup.py verify-only >> /var/log/backup_verify.log 2>&1 +``` + +### Weekly Restoration Test (cron) + +```cron +0 4 * * 0 cd /path/to/soroscan/scripts/backup && python test_backup.py restore-only >> /var/log/backup_restore.log 2>&1 +``` diff --git a/django-backend/fixtures/development.json b/django-backend/fixtures/development.json new file mode 100644 index 000000000..008d07d88 --- /dev/null +++ b/django-backend/fixtures/development.json @@ -0,0 +1,241 @@ +{ + "version": "1.0", + "description": "Development seed data for SoroScan database", + "organizations": [ + { + "name": "Acme DeFi", + "slug": "acme-defi", + "owner_email": "admin@acme.example.com", + "settings": {}, + "quota": 10000 + }, + { + "name": "Beta Labs", + "slug": "beta-labs", + "owner_email": "admin@beta.example.com", + "settings": {}, + "quota": 5000 + }, + { + "name": "Gamma Finance", + "slug": "gamma-finance", + "owner_email": "admin@gamma.example.com", + "settings": {}, + "quota": 0 + } + ], + "teams": [ + { + "name": "DeFi Team", + "organization_slug": "acme-defi", + "created_by_email": "admin@acme.example.com" + }, + { + "name": "NFT Team", + "organization_slug": "acme-defi", + "created_by_email": "admin@acme.example.com" + }, + { + "name": "Core Team", + "organization_slug": "beta-labs", + "created_by_email": "admin@beta.example.com" + } + ], + "users": [ + { + "email": "admin@acme.example.com", + "password": "testpass123", + "first_name": "Alice", + "last_name": "Admin" + }, + { + "email": "dev@acme.example.com", + "password": "testpass123", + "first_name": "Bob", + "last_name": "Developer" + }, + { + "email": "admin@beta.example.com", + "password": "testpass123", + "first_name": "Charlie", + "last_name": "Beta" + }, + { + "email": "analyst@gamma.example.com", + "password": "testpass123", + "first_name": "Diana", + "last_name": "Analyst" + } + ], + "memberships": [ + {"user_email": "admin@acme.example.com", "organization_slug": "acme-defi", "role": "owner"}, + {"user_email": "dev@acme.example.com", "organization_slug": "acme-defi", "role": "admin"}, + {"user_email": "admin@beta.example.com", "organization_slug": "beta-labs", "role": "owner"}, + {"user_email": "analyst@gamma.example.com", "organization_slug": "gamma-finance", "role": "member"} + ], + "team_memberships": [ + {"user_email": "admin@acme.example.com", "team_name": "DeFi Team", "role": "owner"}, + {"user_email": "dev@acme.example.com", "team_name": "DeFi Team", "role": "member"}, + {"user_email": "dev@acme.example.com", "team_name": "NFT Team", "role": "admin"}, + {"user_email": "admin@beta.example.com", "team_name": "Core Team", "role": "owner"} + ], + "contracts": [ + { + "contract_id": "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZ2KZ", + "name": "Stablecoin Swap", + "alias": "", + "description": "Automated market maker for stablecoin pairs", + "owner_email": "admin@acme.example.com", + "organization_slug": "acme-defi", + "team_name": "DeFi Team", + "abi_schema": { + "events": [ + {"name": "swap", "params": ["amount_in", "amount_out", "trader"]}, + {"name": "liquidity_added", "params": ["provider", "amount"]}, + {"name": "liquidity_removed", "params": ["provider", "amount"]} + ] + }, + "event_filter_type": "none", + "event_filter_list": [] + }, + { + "contract_id": "CBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBZ2KZ", + "name": "Lending Protocol", + "alias": "Acme Lending", + "description": "Overcollateralized lending and borrowing", + "owner_email": "admin@acme.example.com", + "organization_slug": "acme-defi", + "team_name": "DeFi Team", + "abi_schema": { + "events": [ + {"name": "deposit", "params": ["user", "asset", "amount"]}, + {"name": "withdraw", "params": ["user", "asset", "amount"]}, + {"name": "borrow", "params": ["user", "asset", "amount"]}, + {"name": "repay", "params": ["user", "asset", "amount"]}, + {"name": "liquidate", "params": ["user", "asset", "amount"]} + ] + }, + "event_filter_type": "whitelist", + "event_filter_list": ["deposit", "withdraw", "borrow", "repay", "liquidate"] + }, + { + "contract_id": "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCZ2KZ", + "name": "NFT Collection", + "alias": "Beta NFTs", + "description": "NFT minting and marketplace contract", + "owner_email": "dev@acme.example.com", + "organization_slug": "acme-defi", + "team_name": "NFT Team", + "abi_schema": { + "events": [ + {"name": "mint", "params": ["token_id", "owner"]}, + {"name": "transfer", "params": ["from", "to", "token_id"]}, + {"name": "sale", "params": ["token_id", "price", "seller"]} + ] + }, + "event_filter_type": "none", + "event_filter_list": [] + }, + { + "contract_id": "CDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDZ2KZ", + "name": "Bridge Contract", + "alias": "", + "description": "Cross-chain asset bridge", + "owner_email": "admin@beta.example.com", + "organization_slug": "beta-labs", + "team_name": "Core Team", + "abi_schema": null, + "event_filter_type": "blacklist", + "event_filter_list": ["dust_event"] + }, + { + "contract_id": "CEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEZ2KZ", + "name": "Governance Token", + "alias": "GOV Token", + "description": "Protocol governance token with voting", + "owner_email": "admin@beta.example.com", + "organization_slug": "beta-labs", + "team_name": "Core Team", + "abi_schema": { + "events": [ + {"name": "transfer", "params": ["from", "to", "amount"]}, + {"name": "approval", "params": ["owner", "spender", "amount"]}, + {"name": "vote_cast", "params": ["voter", "proposal_id", "support"]} + ] + }, + "event_filter_type": "none", + "event_filter_list": [] + } + ], + "events": { + "per_contract": 20, + "contracts": ["CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZ2KZ", "CBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBZ2KZ"], + "event_types": ["swap", "deposit", "withdraw", "borrow", "liquidity_added"], + "days_back": 14 + }, + "webhooks": [ + { + "contract_id": "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZ2KZ", + "event_type": "swap", + "target_url": "https://example.com/webhook/swaps", + "timeout_seconds": 10, + "retry_backoff_strategy": "exponential", + "retry_backoff_seconds": 60, + "signature_algorithm": "sha256" + }, + { + "contract_id": "CBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBZ2KZ", + "event_type": "liquidation", + "target_url": "https://example.com/webhook/liquidations", + "timeout_seconds": 15, + "retry_backoff_strategy": "linear", + "retry_backoff_seconds": 30, + "signature_algorithm": "sha256" + }, + { + "contract_id": "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCZ2KZ", + "event_type": "", + "target_url": "https://example.com/webhook/nft-events", + "timeout_seconds": 10, + "retry_backoff_strategy": "fixed", + "retry_backoff_seconds": 120, + "signature_algorithm": "sha1" + } + ], + "webhook_delivery_logs": { + "per_webhook": 5, + "status_codes": [200, 200, 200, 500, 408] + }, + "alert_rules": [ + { + "contract_id": "CBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBZ2KZ", + "name": "High Liquidation Alert", + "condition": {"op": "gt", "field": "amount", "value": 100000}, + "channels": [{"type": "slack", "target": "https://hooks.slack.com/services/TEST"}], + "action_type": "webhook", + "action_target": "https://example.com/alerts/liquidations" + }, + { + "contract_id": "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZ2KZ", + "name": "Large Swap Alert", + "condition": {"op": "gt", "field": "amount_in", "value": 50000}, + "channels": [], + "action_type": "email", + "action_target": "ops@acme.example.com" + } + ], + "api_keys": [ + { + "user_email": "admin@acme.example.com", + "team_name": "DeFi Team", + "name": "Production Key", + "tier": "enterprise" + }, + { + "user_email": "dev@acme.example.com", + "team_name": null, + "name": "Dev Key", + "tier": "free" + } + ] +} diff --git a/django-backend/soroscan/ingest/management/commands/bulk_import_metadata.py b/django-backend/soroscan/ingest/management/commands/bulk_import_metadata.py new file mode 100644 index 000000000..781a84e3d --- /dev/null +++ b/django-backend/soroscan/ingest/management/commands/bulk_import_metadata.py @@ -0,0 +1,332 @@ +""" +Management command: bulk_import_metadata + +Import contract metadata in bulk from CSV or JSON files. + +Supported fields: + contract_id Stellar contract address (C...) — REQUIRED + name Contract display name + description Free-text description + tags Comma-separated tags (CSV) or JSON array (JSON) + documentation_url + github_repo + team_email + +CSV format: + contract_id,name,description,tags,documentation_url,github_repo,team_email + CAAAA...,Stablecoin Swap,An AMM,"defi,amm,swap",https://...,https://...,team@acme.example.com + +JSON format: + { + "metadata": [ + { "contract_id": "CAAAA...", "name": "...", "tags": ["defi"] } + ] + } + +Features: + - Validation before import (dry-run) + - Rollback on first error (default) or skip-and-continue + - Detailed import report (created, updated, skipped, errors) + +Usage: + python manage.py bulk_import_metadata --input=contracts.csv --format=csv + python manage.py bulk_import_metadata --input=contracts.json --format=json + python manage.py bulk_import_metadata --input=contracts.csv --dry-run + python manage.py bulk_import_metadata --input=contracts.csv --on-error=skip +""" +import csv +import io +import json +from pathlib import Path +from typing import Any + +from django.core.management.base import BaseCommand, CommandError + +from soroscan.ingest.models import ContractMetadata, TrackedContract + + +class Command(BaseCommand): + help = "Bulk import contract metadata from CSV or JSON." + + def add_arguments(self, parser): + parser.add_argument( + "--input", + required=True, + help="Input file path (use - for stdin)", + ) + parser.add_argument( + "--format", + choices=["csv", "json"], + default=None, + help="Input format (auto-detected from extension if omitted)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Validate all rows without modifying the database", + ) + parser.add_argument( + "--on-error", + choices=["rollback", "skip"], + default="rollback", + help="Behavior on validation error: rollback entire batch or skip row", + ) + parser.add_argument( + "--encoding", + default="utf-8", + help="File encoding (default: utf-8)", + ) + parser.add_argument( + "--report", + default=None, + help="Write import report JSON to this file", + ) + + def handle(self, *args, **options): + input_path = options["input"] + fmt = options["format"] + dry_run = options["dry_run"] + on_error = options["on_error"] + encoding = options["encoding"] + report_path = options["report"] + + fmt = self._detect_format(input_path, fmt) + raw = self._load_input(input_path, encoding) + rows = self._parse_rows(raw, fmt) + + if not rows: + self.stdout.write("No metadata rows to import.") + return + + report = self._import_rows(rows, dry_run, on_error) + + self._print_report(report) + if report_path: + self._write_report(report, report_path) + + def _detect_format(self, input_path: str, fmt: str | None) -> str: + if fmt: + return fmt + if input_path == "-": + raise CommandError("--format is required when reading from stdin (-).") + ext = Path(input_path).suffix.lower() + if ext == ".csv": + return "csv" + if ext == ".json": + return "json" + raise CommandError( + f"Cannot auto-detect format for {input_path}. Use --format=csv or --format=json." + ) + + def _load_input(self, input_path: str, encoding: str) -> str: + if input_path == "-": + return self.stdin.read() + try: + with open(input_path, "r", encoding=encoding, newline="") as f: + return f.read() + except OSError as exc: + raise CommandError(f"Cannot read input file {input_path}: {exc}") + + def _parse_rows(self, raw: str, fmt: str) -> list[dict[str, Any]]: + if fmt == "csv": + return self._parse_csv(raw) + return self._parse_json(raw) + + def _parse_csv(self, raw: str) -> list[dict[str, Any]]: + try: + reader = csv.DictReader(io.StringIO(raw)) + rows = list(reader) + except csv.Error as exc: + raise CommandError(f"CSV parse error: {exc}") + if not rows or "contract_id" not in (reader.fieldnames or []): + raise CommandError("CSV file must have a 'contract_id' column.") + normalized = [] + for row in rows: + contract_id = (row.get("contract_id") or "").strip() + if not contract_id: + continue + tags_raw = (row.get("tags") or "").strip() + tags = [t.strip() for t in tags_raw.split(",") if t.strip()] if tags_raw else [] + normalized.append({ + "contract_id": contract_id, + "name": (row.get("name") or "").strip(), + "description": (row.get("description") or "").strip(), + "tags": tags, + "documentation_url": (row.get("documentation_url") or "").strip(), + "github_repo": (row.get("github_repo") or "").strip(), + "team_email": (row.get("team_email") or "").strip(), + }) + return normalized + + def _parse_json(self, raw: str) -> list[dict[str, Any]]: + try: + data = json.loads(raw) + except json.JSONDecodeError as exc: + raise CommandError(f"JSON parse error: {exc}") + if isinstance(data, list): + items = data + elif isinstance(data, dict): + items = data.get("metadata") or data.get("items") or [] + else: + raise CommandError("JSON must be an array or an object with 'metadata' key.") + if not isinstance(items, list): + raise CommandError("Expected a list of metadata entries in JSON.") + normalized = [] + for item in items: + if not isinstance(item, dict): + continue + contract_id = str(item.get("contract_id") or "").strip() + if not contract_id: + continue + tags = item.get("tags", []) + if isinstance(tags, str): + tags = [t.strip() for t in tags.split(",") if t.strip()] + normalized.append({ + "contract_id": contract_id, + "name": str(item.get("name") or "").strip(), + "description": str(item.get("description") or "").strip(), + "tags": tags, + "documentation_url": str(item.get("documentation_url") or "").strip(), + "github_repo": str(item.get("github_repo") or "").strip(), + "team_email": str(item.get("team_email") or "").strip(), + }) + return normalized + + def _import_rows(self, rows: list[dict[str, Any]], dry_run: bool, on_error: str) -> dict: + report = { + "mode": "dry-run" if dry_run else "live", + "on_error": on_error, + "total_rows": len(rows), + "created": 0, + "updated": 0, + "skipped_no_contract": 0, + "skipped_on_error": 0, + "errors": 0, + "error_details": [], + } + created_objs: list[ContractMetadata] = [] + updated_objs: list[tuple[ContractMetadata, dict]] = [] + + for idx, row in enumerate(rows, start=1): + try: + contract = TrackedContract.objects.filter(contract_id=row["contract_id"]).first() + if not contract: + report["skipped_no_contract"] += 1 + report["error_details"].append({ + "row": idx, + "contract_id": row["contract_id"], + "error": "TrackedContract not found", + }) + if on_error == "rollback": + self._rollback_created(created_objs, updated_objs) + raise CommandError( + f"Row {idx}: TrackedContract not found for {row['contract_id']}. " + "Rolling back." + ) + continue + + validated = self._validate_row(row) + if dry_run: + report["created" if not hasattr(contract, "contractmetadata") else "updated"] += 1 + continue + + defaults = { + "name": validated["name"], + "description": validated["description"], + "tags": validated["tags"], + "documentation_url": validated["documentation_url"] or None, + "github_repo": validated["github_repo"] or None, + "team_email": validated["team_email"] or None, + } + meta, created = ContractMetadata.objects.update_or_create( + contract=contract, + defaults=defaults, + ) + if created: + report["created"] += 1 + created_objs.append(meta) + else: + report["updated"] += 1 + updated_objs.append((meta, defaults)) + except CommandError: + raise + except Exception as exc: + report["errors"] += 1 + report["error_details"].append({ + "row": idx, + "contract_id": row.get("contract_id"), + "error": str(exc), + }) + if on_error == "rollback": + self._rollback_created(created_objs, updated_objs) + raise CommandError( + f"Row {idx}: {exc}. Rolling back import." + ) from exc + + return report + + def _validate_row(self, row: dict[str, Any]) -> dict[str, Any]: + validated = dict(row) + if not validated.get("contract_id"): + raise ValueError("contract_id is required") + if not validated.get("name"): + validated["name"] = validated["contract_id"] + if len(validated.get("name", "")) > 256: + raise ValueError("name exceeds max length of 256") + if len(validated.get("description", "")) > 5000: + raise ValueError("description exceeds max length of 5000") + tags = validated.get("tags", []) + if not isinstance(tags, list): + raise ValueError("tags must be a list") + if len(tags) > 100: + raise ValueError("tags exceeds max length of 100") + for field in ("documentation_url", "github_repo"): + value = validated.get(field, "") + if value and len(value) > 2048: + raise ValueError(f"{field} exceeds max length of 2048") + if validated.get("team_email") and len(validated["team_email"]) > 254: + raise ValueError("team_email exceeds max length of 254") + return validated + + def _rollback_created(self, created_objs: list, updated_objs: list): + for meta in reversed(created_objs): + try: + contract_id = meta.contract.contract_id + meta.delete() + self.stdout.write(self.style.WARNING(f" Rolled back created metadata for {contract_id}")) + except Exception: + pass + for meta, old_values in updated_objs: + try: + ContractMetadata.objects.filter(pk=meta.pk).update(**old_values) + self.stdout.write( + self.style.WARNING(f" Rolled back updated metadata for {meta.contract.contract_id}") + ) + except Exception: + pass + + def _print_report(self, report: dict): + self.stdout.write("") + self.stdout.write(self.style.SUCCESS("=== Import Report ===")) + self.stdout.write(f"Mode: {report['mode']}") + self.stdout.write(f"Total rows: {report['total_rows']}") + self.stdout.write(f"Created: {report['created']}") + self.stdout.write(f"Updated: {report['updated']}") + self.stdout.write(f"Skipped (no contract): {report['skipped_no_contract']}") + self.stdout.write(f"Skipped (on error): {report['skipped_on_error']}") + self.stdout.write(f"Errors: {report['errors']}") + if report["error_details"]: + self.stdout.write("") + self.stdout.write(self.style.ERROR("Errors:")) + for err in report["error_details"]: + self.stdout.write( + f" Row {err['row']} ({err.get('contract_id', '?')}): {err['error']}" + ) + + def _write_report(self, report: dict, path: str): + out = Path(path) + out.parent.mkdir(parents=True, exist_ok=True) + with open(out, "w", encoding="utf-8") as f: + json.dump(report, f, indent=2, default=str) + self.stdout.write("") + self.stdout.write(self.style.SUCCESS(f"Report written to {out}")) diff --git a/django-backend/soroscan/ingest/management/commands/replay_events.py b/django-backend/soroscan/ingest/management/commands/replay_events.py new file mode 100644 index 000000000..bae6ad328 --- /dev/null +++ b/django-backend/soroscan/ingest/management/commands/replay_events.py @@ -0,0 +1,292 @@ +""" +Management command: replay_events + +Replay ContractEvent records through webhook delivery for debugging and retesting. + +Usage: + python manage.py replay_events --contract=CA7N... + python manage.py replay_events --contract=CA7N... --event-type=swap + python manage.py replay_events --contract=CA7N... --from-ledger=100 --to-ledger=200 + python manage.py replay_events --contract=CA7N... --dry-run + python manage.py replay_events --contract=CA7N... --limit=10 + +Options: + --contract Contract ID (required) + --event-type Filter by event type + --from-ledger Start ledger (inclusive) + --to-ledger End ledger (inclusive) + --from-date Start date (ISO format) + --to-date End date (ISO format) + --limit Max events to replay (default: 100, 0=all) + --dry-run Preview without dispatching webhooks + --webhook-id Replay only to specific webhook subscription ID + --output-json Write delivery report to JSON file instead of stdout +""" +import json +from datetime import datetime +from pathlib import Path + +from django.core.management.base import BaseCommand, CommandError +from django.utils import timezone + +from soroscan.ingest.models import ( + ContractEvent, + TrackedContract, + WebhookSubscription, + WebhookDeliveryLog, +) + + +class Command(BaseCommand): + help = "Replay contract events through webhook delivery for debugging." + + def add_arguments(self, parser): + parser.add_argument( + "--contract", + required=True, + help="Contract ID to replay events for", + ) + parser.add_argument( + "--event-type", + default=None, + help="Filter by event type", + ) + parser.add_argument( + "--from-ledger", + type=int, + default=None, + help="Include events from this ledger (inclusive)", + ) + parser.add_argument( + "--to-ledger", + type=int, + default=None, + help="Include events up to this ledger (inclusive)", + ) + parser.add_argument( + "--from-date", + default=None, + help="Include events from this date (ISO format)", + ) + parser.add_argument( + "--to-date", + default=None, + help="Include events up to this date (ISO format)", + ) + parser.add_argument( + "--limit", + type=int, + default=100, + help="Max events to replay (default: 100, 0 = all)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Preview replay plan without dispatching", + ) + parser.add_argument( + "--webhook-id", + type=int, + default=None, + help="Replay only to specific webhook subscription ID", + ) + parser.add_argument( + "--output-json", + default=None, + help="Write delivery report to a JSON file", + ) + + def handle(self, *args, **options): + contract_id = options["contract"] + event_type = options["event_type"] + from_ledger = options["from_ledger"] + to_ledger = options["to_ledger"] + from_date = options["from_date"] + to_date = options["to_date"] + limit = options["limit"] + dry_run = options["dry_run"] + webhook_id = options["webhook_id"] + output_json = options["output_json"] + + if not TrackedContract.objects.filter(contract_id=contract_id).exists(): + raise CommandError(f"No TrackedContract found with contract_id={contract_id!r}") + + qs = ( + ContractEvent.objects.filter(contract__contract_id=contract_id) + .select_related("contract") + .order_by("ledger", "event_index", "timestamp") + ) + + if event_type: + qs = qs.filter(event_type=event_type) + if from_ledger is not None: + qs = qs.filter(ledger__gte=from_ledger) + if to_ledger is not None: + qs = qs.filter(ledger__lte=to_ledger) + if from_date: + try: + from_dt = datetime.fromisoformat(from_date) + if timezone.is_naive(from_dt): + from_dt = timezone.make_aware(from_dt) + qs = qs.filter(timestamp__gte=from_dt) + except ValueError as exc: + raise CommandError(f"Invalid --from-date: {exc}") + if to_date: + try: + to_dt = datetime.fromisoformat(to_date) + if timezone.is_naive(to_dt): + to_dt = timezone.make_aware(to_dt) + qs = qs.filter(timestamp__lte=to_dt) + except ValueError as exc: + raise CommandError(f"Invalid --to-date: {exc}") + + total = qs.count() + if limit > 0: + qs = qs[:limit] + + events = list(qs) + if not events: + self.stdout.write("No events found matching the filters.") + return + + self.stderr.write(f"Found {total} matching events, replaying {len(events)}...") + + if webhook_id: + try: + single_webhook = WebhookSubscription.objects.get(pk=webhook_id) + except WebhookSubscription.DoesNotExist: + raise CommandError(f"No WebhookSubscription found with id={webhook_id}") + if str(single_webhook.contract.contract_id) != str(contract_id): + raise CommandError(f"Webhook {webhook_id} does not belong to contract {contract_id}") + webhooks = [single_webhook] + else: + webhooks = list( + WebhookSubscription.objects.filter( + contract__contract_id=contract_id, + is_active=True, + ) + ) + if not webhooks: + self.stdout.write("No active webhooks found for this contract. Nothing to replay.") + return + + report = { + "contract_id": contract_id, + "mode": "dry-run" if dry_run else "live", + "filters": { + "event_type": event_type, + "from_ledger": from_ledger, + "to_ledger": to_ledger, + "from_date": from_date, + "to_date": to_date, + "limit": limit, + "webhook_id": webhook_id, + }, + "summary": { + "events_processed": 0, + "webhook_dispatches": 0, + "successes": 0, + "failures": 0, + "skipped": 0, + }, + "deliveries": [], + } + + for event in events: + report["summary"]["events_processed"] += 1 + for webhook in webhooks: + if event_type and webhook.event_type and webhook.event_type != event_type: + continue + if not webhook.should_ingest_event(event.event_type): + report["summary"]["skipped"] += 1 + continue + + if dry_run: + self.stdout.write( + f" [DRY-RUN] Would dispatch event {event.id} ({event.event_type}) " + f"to webhook {webhook.id} -> {webhook.target_url}" + ) + report["summary"]["webhook_dispatches"] += 1 + report["deliveries"].append({ + "event_id": event.id, + "event_type": event.event_type, + "ledger": event.ledger, + "timestamp": event.timestamp.isoformat() if event.timestamp else None, + "webhook_id": webhook.id, + "webhook_url": webhook.target_url, + "status": "dry-run", + }) + else: + dispatch_result = self._replay_dispatch(webhook, event) + report["summary"]["webhook_dispatches"] += 1 + if dispatch_result["success"]: + report["summary"]["successes"] += 1 + else: + report["summary"]["failures"] += 1 + report["deliveries"].append({ + "event_id": event.id, + "event_type": event.event_type, + "ledger": event.ledger, + "timestamp": event.timestamp.isoformat() if event.timestamp else None, + "webhook_id": webhook.id, + "webhook_url": webhook.target_url, + **dispatch_result, + }) + + # Print summary + self.stdout.write("") + self.stdout.write(self.style.SUCCESS("=== Replay Summary ===")) + self.stdout.write(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}") + self.stdout.write(f"Events processed: {report['summary']['events_processed']}") + self.stdout.write(f"Webhook dispatches: {report['summary']['webhook_dispatches']}") + self.stdout.write(f"Successes: {report['summary']['successes']}") + self.stdout.write(f"Failures: {report['summary']['failures']}") + self.stdout.write(f"Skipped: {report['summary']['skipped']}") + + if output_json: + self._write_report(report, output_json) + elif not dry_run: + self.stdout.write("") + self.stdout.write("Detailed delivery log (last 10):") + for delivery in report["deliveries"][-10:]: + status = delivery["status"] + status_code = delivery.get("status_code") + if "success" in status: + style = self.style.SUCCESS + elif "fail" in status or (status_code and status_code >= 400): + style = self.style.ERROR + else: + def style(x): return x + self.stdout.write( + f" Event {delivery['event_id']} ({delivery['event_type']}@{delivery['ledger']}) " + f"-> Webhook {delivery['webhook_id']}: {style(status)} {status_code or ''}".rstrip() + ) + + def _replay_dispatch(self, webhook: WebhookSubscription, event: ContractEvent) -> dict: + try: + from soroscan.ingest.tasks import dispatch_webhook + except ImportError: + return {"success": False, "status": "error", "status_code": None, "error": "Celery tasks unavailable"} + + try: + result = dispatch_webhook.apply(args=[webhook.id, event.id]) + if result.successful(): + return {"success": True, "status": "success", "status_code": None, "error": ""} + else: + exc = result.result + error_msg = str(exc) if exc else "Task failed" + attempt = WebhookDeliveryLog.objects.filter( + subscription=webhook, + event=event, + ).order_by("-attempt_number").first() + status_code = getattr(attempt, "status_code", None) + return {"success": False, "status": "failed", "status_code": status_code, "error": error_msg} + except Exception as exc: + return {"success": False, "status": "error", "status_code": None, "error": str(exc)} + + def _write_report(self, report: dict, path: str): + out_path = Path(path) + out_path.parent.mkdir(parents=True, exist_ok=True) + with open(out_path, "w", encoding="utf-8") as f: + json.dump(report, f, indent=2, default=str) + self.stdout.write(self.style.SUCCESS(f"Report written to {out_path}")) diff --git a/django-backend/soroscan/ingest/management/commands/seed_database.py b/django-backend/soroscan/ingest/management/commands/seed_database.py new file mode 100644 index 000000000..ccbe6256b --- /dev/null +++ b/django-backend/soroscan/ingest/management/commands/seed_database.py @@ -0,0 +1,495 @@ +""" +Management command: seed_database + +Populates the development database with realistic test data from fixture files. + +Usage: + python manage.py seed_database + python manage.py seed_database --fixture=fixtures/custom.json + python manage.py seed_database --scenario=minimal + python manage.py seed_database --clear + +Scenarios: + default - Full dataset (organizations, teams, users, contracts, events, webhooks) + minimal - Single user, one organization, three contracts, no events + webhook - Contracts with multiple webhook subscriptions and delivery logs + +Clear mode removes all seeded data (users with is_staff=False and emails ending in @example.com). +""" +import json +import random +import string +from datetime import timedelta +from pathlib import Path + +from django.core.management.base import BaseCommand, CommandError +from django.utils import timezone as django_tz + +from soroscan.ingest.models import ( + AlertExecution, + AlertRule, + ContractEvent, + ContractInvocation, + ContractMetadata, + ContractSource, + ContractVerification, + Organization, + OrganizationMembership, + Team, + TeamMembership, + TrackedContract, + WebhookDeliveryLog, + WebhookSubscription, +) + +FIXTURES_DIR = Path(__file__).resolve().parent.parent.parent.parent / "fixtures" + + +def _random_hex(length=64): + return "".join(random.choices("0123456789abcdef", k=length)) + + +def _random_address(): + return "G" + "".join(random.choices(string.ascii_uppercase + string.digits, k=55)) + + +def _now(): + return django_tz.now() + + +class Command(BaseCommand): + help = "Seed the development database with realistic test data." + + def add_arguments(self, parser): + parser.add_argument( + "--fixture", + default=None, + help="Path to JSON fixture file (default: fixtures/development.json)", + ) + parser.add_argument( + "--scenario", + choices=["default", "minimal", "webhook"], + default="default", + help="Predefined scenario to seed (default: default)", + ) + parser.add_argument( + "--clear", + action="store_true", + help="Remove all seeded data before seeding", + ) + + def handle(self, *args, **options): + fixture_path = options["fixture"] + scenario = options["scenario"] + clear = options["clear"] + + if fixture_path: + data = self._load_fixture(Path(fixture_path)) + else: + fixture_file = FIXTURES_DIR / "development.json" + if fixture_file.exists(): + data = self._load_fixture(fixture_file) + else: + self.stdout.write( + self.style.WARNING( + f"Fixture file not found at {fixture_file}, using built-in {scenario} scenario." + ) + ) + data = self._get_scenario(scenario) + + if clear: + self._clear_seeded_data() + + self.stdout.write("Seeding database...") + self._seed(data) + self.stdout.write(self.style.SUCCESS("Database seeded successfully.")) + + def _load_fixture(self, path: Path) -> dict: + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except json.JSONDecodeError as exc: + raise CommandError(f"Invalid JSON in fixture file {path}: {exc}") + + def _get_scenario(self, scenario: str) -> dict: + if scenario == "minimal": + return { + "organizations": [ + { + "name": "Minimal Org", + "slug": "minimal-org", + "owner_email": "admin@example.com", + "settings": {}, + "quota": 1000, + } + ], + "teams": [], + "users": [ + { + "email": "admin@example.com", + "password": "testpass123", + "first_name": "Admin", + "last_name": "User", + } + ], + "memberships": [ + {"user_email": "admin@example.com", "organization_slug": "minimal-org", "role": "owner"} + ], + "team_memberships": [], + "contracts": [ + { + "contract_id": "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZ2KZ", + "name": "Token Contract", + "alias": "Simple Token", + "description": "Basic token contract", + "owner_email": "admin@example.com", + "organization_slug": "minimal-org", + "team_name": None, + "abi_schema": None, + "event_filter_type": "none", + "event_filter_list": [], + }, + { + "contract_id": "CBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBZ2KZ", + "name": "Exchange", + "alias": "Mini DEX", + "description": "Simple exchange", + "owner_email": "admin@example.com", + "organization_slug": "minimal-org", + "team_name": None, + "abi_schema": {"events": [{"name": "swap", "params": ["in", "out", "trader"]}]}, + "event_filter_type": "none", + "event_filter_list": [], + }, + { + "contract_id": "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCZ2KZ", + "name": "Governor", + "alias": "Voting", + "description": "Governance contract", + "owner_email": "admin@example.com", + "organization_slug": "minimal-org", + "team_name": None, + "abi_schema": None, + "event_filter_type": "none", + "event_filter_list": [], + }, + ], + "events": None, + "webhooks": [], + "webhook_delivery_logs": None, + "alert_rules": [], + "api_keys": [], + } + if scenario == "webhook": + base = self._get_scenario("default") + base["webhooks"] = [ + { + "contract_id": "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZ2KZ", + "event_type": "swap", + "target_url": "https://httpbin.org/post", + "timeout_seconds": 5, + "retry_backoff_strategy": "exponential", + "retry_backoff_seconds": 30, + "signature_algorithm": "sha256", + }, + { + "contract_id": "CBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBZ2KZ", + "event_type": "", + "target_url": "https://httpbin.org/post", + "timeout_seconds": 10, + "retry_backoff_strategy": "fixed", + "retry_backoff_seconds": 60, + "signature_algorithm": "sha256", + }, + ] + base["webhook_delivery_logs"] = {"per_webhook": 8, "status_codes": [200, 200, 201, 500, 408, 200, 500, 200]} + return base + return self._get_scenario("default") + + def _clear_seeded_data(self): + self.stdout.write("Clearing seeded data...") + WebhookDeliveryLog.objects.filter( + subscription__contract__owner__email__endswith="@example.com" + ).delete() + WebhookSubscription.objects.filter( + contract__owner__email__endswith="@example.com" + ).delete() + AlertExecution.objects.filter( + rule__contract__owner__email__endswith="@example.com" + ).delete() + AlertRule.objects.filter( + contract__owner__email__endswith="@example.com" + ).delete() + ContractEvent.objects.filter( + contract__owner__email__endswith="@example.com" + ).delete() + ContractInvocation.objects.filter( + contract__owner__email__endswith="@example.com" + ).delete() + ContractMetadata.objects.filter( + contract__owner__email__endswith="@example.com" + ).delete() + ContractSource.objects.filter( + contract__owner__email__endswith="@example.com" + ).delete() + ContractVerification.objects.filter( + contract__owner__email__endswith="@example.com" + ).delete() + TrackedContract.objects.filter( + owner__email__endswith="@example.com" + ).delete() + TeamMembership.objects.filter( + user__email__endswith="@example.com" + ).delete() + Team.objects.filter( + created_by__email__endswith="@example.com" + ).delete() + OrganizationMembership.objects.filter( + user__email__endswith="@example.com" + ).delete() + Organization.objects.filter( + owner__email__endswith="@example.com" + ).delete() + from django.contrib.auth import get_user_model + + User = get_user_model() + User.objects.filter(email__endswith="@example.com", is_staff=False).delete() + self.stdout.write(self.style.SUCCESS("Cleared seeded data.")) + + def _seed(self, data: dict): + users = self._seed_users(data.get("users", [])) + orgs = self._seed_organizations(data.get("organizations", []), users) + teams = self._seed_teams(data.get("teams", []), orgs, users) + self._seed_memberships(data.get("memberships", []), orgs, users) + self._seed_team_memberships(data.get("team_memberships", []), teams, users) + contracts = self._seed_contracts(data.get("contracts", []), orgs, teams, users) + self._seed_events(data.get("events"), contracts) + self._seed_webhooks(data.get("webhooks", []), contracts) + self._seed_webhook_delivery_logs(data.get("webhook_delivery_logs"), contracts) + self._seed_alert_rules(data.get("alert_rules", []), contracts, users) + + def _seed_users(self, users_data: list) -> dict: + from django.contrib.auth import get_user_model + + User = get_user_model() + users = {} + for ud in users_data: + email = ud["email"] + user, created = User.objects.get_or_create( + email=email, + defaults={ + "username": email, + "first_name": ud.get("first_name", ""), + "last_name": ud.get("last_name", ""), + "is_staff": False, + "is_superuser": False, + }, + ) + if created and ud.get("password"): + user.set_password(ud["password"]) + user.save() + users[email] = user + return users + + def _seed_organizations(self, orgs_data: list, users: dict) -> dict: + orgs = {} + for od in orgs_data: + owner_email = od["owner_email"] + owner = users.get(owner_email) + if not owner: + continue + org, created = Organization.objects.get_or_create( + slug=od["slug"], + defaults={ + "name": od["name"], + "owner": owner, + "settings": od.get("settings", {}), + "quota": od.get("quota", 0), + }, + ) + orgs[od["slug"]] = org + return orgs + + def _seed_teams(self, teams_data: list, orgs: dict, users: dict) -> dict: + teams = {} + for td in teams_data: + org_slug = td.get("organization_slug") + org = orgs.get(org_slug) + if not org: + continue + creator_email = td.get("created_by_email") + creator = users.get(creator_email) if creator_email else None + team, created = Team.objects.get_or_create( + slug=f"{org.slug}-{td['name'].lower().replace(' ', '-')}", + defaults={ + "name": td["name"], + "organization": org, + "created_by": creator, + }, + ) + teams[td["name"]] = team + return teams + + def _seed_memberships(self, memberships_data: list, orgs: dict, users: dict): + for md in memberships_data: + user = users.get(md["user_email"]) + org = orgs.get(md["organization_slug"]) + if not user or not org: + continue + OrganizationMembership.objects.get_or_create( + organization=org, + user=user, + defaults={"role": md.get("role", "member")}, + ) + + def _seed_team_memberships(self, memberships_data: list, teams: dict, users: dict): + for md in memberships_data: + user = users.get(md["user_email"]) + team = teams.get(md["team_name"]) + if not user or not team: + continue + TeamMembership.objects.get_or_create( + team=team, + user=user, + defaults={"role": md.get("role", "member")}, + ) + + def _seed_contracts(self, contracts_data: list, orgs: dict, teams: dict, users: dict) -> dict: + contracts = {} + for cd in contracts_data: + owner = users.get(cd.get("owner_email")) + if not owner: + continue + org = orgs.get(cd.get("organization_slug")) if cd.get("organization_slug") else None + team = teams.get(cd.get("team_name")) if cd.get("team_name") else None + contract, created = TrackedContract.objects.get_or_create( + contract_id=cd["contract_id"], + defaults={ + "name": cd["name"], + "alias": cd.get("alias", ""), + "description": cd.get("description", ""), + "owner": owner, + "organization": org, + "team": team, + "abi_schema": cd.get("abi_schema"), + "event_filter_type": cd.get("event_filter_type", "none"), + "event_filter_list": cd.get("event_filter_list", []), + "is_active": True, + "deprecation_status": "active", + }, + ) + contracts[cd["contract_id"]] = contract + return contracts + + def _seed_events(self, events_config, contracts: dict): + if not events_config: + return + per_contract = events_config.get("per_contract", 20) + contract_ids = events_config.get("contracts", list(contracts.keys())) + event_types = events_config.get("event_types", ["event_default"]) + days_back = events_config.get("days_back", 14) + + for contract_id in contract_ids: + contract = contracts.get(contract_id) + if not contract: + continue + base_time = _now() - timedelta(days=days_back) + for i in range(per_contract): + event_type = random.choice(event_types) + ledger = 100000 + i + random.randint(0, 5000) + timestamp = base_time + timedelta( + minutes=random.randint(0, days_back * 24 * 60) + ) + payload = { + "amount": random.randint(1, 1000000), + "trader": _random_address(), + "token_id": random.randint(1, 1000), + } + try: + event = ContractEvent( + contract=contract, + event_type=event_type, + schema_version=1, + validation_status="passed", + payload=payload, + ledger=ledger, + event_index=i % 5, + timestamp=timestamp, + tx_hash=_random_hex(64), + raw_xdr="", + decoding_status="no_abi", + signature_status="missing", + ) + event.save() + except Exception: + pass + + def _seed_webhooks(self, webhooks_data: list, contracts: dict): + for wd in webhooks_data: + contract = contracts.get(wd["contract_id"]) + if not contract: + continue + try: + WebhookSubscription.objects.create( + contract=contract, + event_type=wd.get("event_type", ""), + target_url=wd["target_url"], + timeout_seconds=wd.get("timeout_seconds", 10), + retry_backoff_strategy=wd.get("retry_backoff_strategy", "exponential"), + retry_backoff_seconds=wd.get("retry_backoff_seconds", 60), + signature_algorithm=wd.get("signature_algorithm", "sha256"), + ) + except Exception: + pass + + def _seed_webhook_delivery_logs(self, logs_config, contracts: dict): + if not logs_config: + return + per_webhook = logs_config.get("per_webhook", 5) + status_codes = logs_config.get("status_codes", [200]) + webhooks = list(WebhookSubscription.objects.filter(contract__in=contracts.values())) + for webhook in webhooks: + events = list( + ContractEvent.objects.filter(contract=webhook.contract).order_by("?")[:per_webhook] + ) + for idx, event in enumerate(events, start=1): + status_code = status_codes[idx % len(status_codes)] + success = 200 <= status_code < 300 + try: + WebhookDeliveryLog.objects.create( + subscription=webhook, + event=event, + attempt_number=idx, + status_code=status_code, + success=success, + error="" if success else f"HTTP {status_code}", + payload_bytes=random.randint(200, 2000), + ) + except Exception: + pass + + def _seed_alert_rules(self, rules_data: list, contracts: dict, users: dict): + for rd in rules_data: + contract = contracts.get(rd["contract_id"]) + if not contract: + continue + try: + rule = AlertRule.objects.create( + contract=contract, + name=rd["name"], + condition=rd.get("condition", {}), + channels=rd.get("channels", []), + action_type=rd.get("action_type", "webhook"), + action_target=rd.get("action_target", ""), + ) + events = list(ContractEvent.objects.filter(contract=contract)[:2]) + for event in events: + AlertExecution.objects.create( + rule=rule, + event=event, + channel=rd.get("action_type", "webhook"), + status=random.choice(["sent", "sent", "failed"]), + response="OK" if random.random() > 0.3 else "Timeout", + ) + except Exception: + pass diff --git a/scripts/backup/README.md b/scripts/backup/README.md index 1de11377a..fb7def414 100644 --- a/scripts/backup/README.md +++ b/scripts/backup/README.md @@ -100,6 +100,21 @@ Or add to your Helm chart's `postgresql.conf` override. --- +## Automated Backup Testing + +`test_backup.py` validates backup integrity and performs restoration tests. + +```bash +python test_backup.py full-test +python test_backup.py verify-only +python test_backup.py restore-only --s3-key=pg-backups/ +python test_backup.py --runbook +``` + +Set `SLACK_WEBHOOK_URL` to receive alerts on failure. + +--- + ## Environment Variables | Variable | Required | Description | diff --git a/scripts/backup/test_backup.py b/scripts/backup/test_backup.py new file mode 100644 index 000000000..68abd4764 --- /dev/null +++ b/scripts/backup/test_backup.py @@ -0,0 +1,374 @@ +#!/usr/bin/env python3 +""" +Backup verification and restoration testing for SoroScan. + +Features: + - Backup verification (checks S3 backup exists, size, pg_dump integrity) + - Restoration testing (downloads backup and verifies it restores) + - RTO/RPO metrics tracking + - Alerting hooks (Slack/webhook) on failure + - Runbook output for manual restore steps + +Usage (locally): + python scripts/backup/test_backup.py --bucket=soroscan-backups --prefix=pg-backups --db-url=postgresql://... + + python scripts/backup/test_backup.py verify-only + python scripts/backup/test_backup.py restore-only --s3-key=pg-backups/20250101.dump.gz + python scripts/backup/test_backup.py full-test + +Environment variables: + DATABASE_URL Target database for restore verification + S3_BUCKET Backup bucket name + S3_PREFIX Backup key prefix + AWS_ACCESS_KEY_ID (optional, uses default credential chain) + AWS_SECRET_ACCESS_KEY + AWS_DEFAULT_REGION + SLACK_WEBHOOK_URL (optional) alert destination +""" +from __future__ import annotations + +import argparse +import json +import os +import subprocess +import sys +import tempfile +import time +from dataclasses import dataclass, field, asdict +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +try: + import boto3 +except ImportError: + boto3 = None + + +REQUIRED_ENV = ["DATABASE_URL", "S3_BUCKET", "S3_PREFIX"] + + +@dataclass +class TestResult: + test_name: str + passed: bool + duration_seconds: float + details: str = "" + metadata: dict = field(default_factory=dict) + + def to_dict(self): + return asdict(self) + + +@dataclass +class BackupTestReport: + timestamp: str + bucket: str + prefix: str + verify_result: Optional[TestResult] = None + restore_result: Optional[TestResult] = None + rto_seconds: Optional[float] = None + rpo_seconds: Optional[float] = None + overall_passed: bool = False + + def to_dict(self): + return { + "timestamp": self.timestamp, + "bucket": self.bucket, + "prefix": self.prefix, + "verify": self.verify_result, + "restore": self.restore_result, + "rto_seconds": self.rto_seconds, + "rpo_seconds": self.rpo_seconds, + "overall_passed": self.overall_passed, + } + + +class BackupTester: + def __init__( + self, + db_url: str, + bucket: str, + prefix: str, + s3_key: str | None = None, + restore_db_url: str | None = None, + slack_webhook: str | None = None, + region: str = "us-east-1", + ): + self.db_url = db_url + self.bucket = bucket + self.prefix = prefix + self.s3_key = s3_key + self.restore_db_url = restore_db_url or db_url + self.slack_webhook = slack_webhook + self.region = region + self.s3_client = None + if boto3: + self.s3_client = boto3.client("s3", region_name=region) + + def latest_backup_key(self) -> str: + if self.s3_key: + return self.s3_key + if not self.s3_client: + raise RuntimeError("boto3 is not installed. Install it or pass --s3-key.") + + response = self.s3_client.list_objects_v2( + Bucket=self.bucket, Prefix=f"{self.prefix}/" + ) + contents = response.get("Contents", []) + if not contents: + raise RuntimeError(f"No backups found in s3://{self.bucket}/{self.prefix}/") + + contents.sort(key=lambda obj: obj["LastModified"], reverse=True) + return contents[0]["Key"] + + def verify_backup(self, s3_key: str) -> TestResult: + start = time.monotonic() + try: + if not self.s3_client: + raise RuntimeError("boto3 not installed") + + head = self.s3_client.head_object(Bucket=self.bucket, Key=s3_key) + size_bytes = head.get("ContentLength", 0) + + if size_bytes == 0: + return TestResult( + test_name="verify_backup", + passed=False, + duration_seconds=time.monotonic() - start, + details="Backup file is 0 bytes.", + metadata={"s3_key": s3_key, "size_bytes": 0}, + ) + + with tempfile.TemporaryDirectory() as tmpdir: + local_path = Path(tmpdir) / Path(s3_key).name + self.s3_client.download_file(self.bucket, s3_key, str(local_path)) + + result = subprocess.run( + ["pg_restore", "--list", str(local_path)], + capture_output=True, + text=True, + ) + if result.returncode != 0: + return TestResult( + test_name="verify_backup", + passed=False, + duration_seconds=time.monotonic() - start, + details=f"pg_restore --list failed: {result.stderr}", + metadata={"s3_key": s3_key, "size_bytes": size_bytes}, + ) + + archive_ok = result.stdout.strip() != "" + + return TestResult( + test_name="verify_backup", + passed=True, + duration_seconds=time.monotonic() - start, + details=f"Backup verified. Size={size_bytes} bytes.", + metadata={"s3_key": s3_key, "size_bytes": size_bytes, "archive_valid": archive_ok}, + ) + except Exception as exc: + return TestResult( + test_name="verify_backup", + passed=False, + duration_seconds=time.monotonic() - start, + details=str(exc), + metadata={"s3_key": s3_key}, + ) + + def restore_backup(self, s3_key: str) -> TestResult: + start = time.monotonic() + try: + self.run_restore_script(s3_key) + elapsed = time.monotonic() - start + return TestResult( + test_name="restore_backup", + passed=True, + duration_seconds=elapsed, + details=f"Restored backup {s3_key} successfully.", + metadata={"s3_key": s3_key}, + ) + except Exception as exc: + return TestResult( + test_name="restore_backup", + passed=False, + duration_seconds=time.monotonic() - start, + details=str(exc), + metadata={"s3_key": s3_key}, + ) + + def run_restore_script(self, s3_key: str): + script_path = Path(__file__).resolve().parent / "pg_restore.sh" + if not script_path.exists(): + raise RuntimeError(f"Restore script not found: {script_path}") + + env = os.environ.copy() + env.update({ + "DATABASE_URL": self.restore_db_url, + "S3_BUCKET": self.bucket, + "S3_PREFIX": self.prefix, + "AWS_DEFAULT_REGION": self.region, + }) + + result = subprocess.run( + ["bash", str(script_path), s3_key], + env=env, + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise RuntimeError( + f"Restore script failed (exit {result.returncode}): {result.stderr}" + ) + + def compute_rpo(self, s3_key: str) -> float: + try: + timestamp_part = Path(s3_key).stem.replace(".dump", "") + for fmt in ("%Y%m%dT%H%M%SZ", "%Y-%m-%dT%H:%M:%SZ"): + try: + backup_time = datetime.strptime(timestamp_part, fmt).replace(tzinfo=timezone.utc) + break + except ValueError: + continue + else: + return 0.0 + return (datetime.now(timezone.utc) - backup_time).total_seconds() + except Exception: + return 0.0 + + def run_full_test(self) -> BackupTestReport: + now = datetime.now(timezone.utc).isoformat() + report = BackupTestReport(timestamp=now, bucket=self.bucket, prefix=self.prefix) + + s3_key = self.latest_backup_key() + report.verify_result = self.verify_backup(s3_key) + if report.verify_result.passed: + report.restore_result = self.restore_backup(s3_key) + if report.restore_result.passed: + report.rto_seconds = report.restore_result.duration_seconds + report.rpo_seconds = self.compute_rpo(s3_key) + + report.overall_passed = ( + report.verify_result.passed and (report.restore_result is None or report.restore_result.passed) + ) + self._maybe_alert(report) + return report + + def _maybe_alert(self, report: BackupTestReport): + if not report.overall_passed and self.slack_webhook: + try: + import requests + + text = ( + f":warning: SoroScan backup test *FAILED*\\n" + f"Bucket: `{report.bucket}`\\n" + f"Prefix: `{report.prefix}`\\n" + f"Verify: {report.verify_result.passed if report.verify_result else 'N/A'}\\n" + f"Restore: {report.restore_result.passed if report.restore_result else 'N/A'}" + ) + requests.post(self.slack_webhook, json={"text": text}, timeout=10) + except Exception: + pass + + def print_runbook(self): + self.stdout.write("") + self.stdout.write(self.style.SUCCESS("=== Manual Restore Runbook ===")) + self.stdout.write("") + self.stdout.write("Prerequisites:") + self.stdout.write(" - AWS CLI configured with access to the backup bucket") + self.stdout.write(" - pg_restore installed") + self.stdout.write(" - Target database credentials (DATABASE_URL)") + self.stdout.write("") + self.stdout.write("Steps:") + self.stdout.write(" 1. Find the latest backup:") + self.stdout.write(f" aws s3 ls s3://{self.bucket}/{self.prefix}/ | sort | tail -n 5") + self.stdout.write("") + self.stdout.write(" 2. Execute the restore script:") + self.stdout.write(f" bash scripts/backup/pg_restore.sh ") + self.stdout.write("") + self.stdout.write(" 3. Verify the database:") + self.stdout.write(" psql $DATABASE_URL -c 'SELECT count(*) FROM ingest_trackedcontract;'") + self.stdout.write("") + self.stdout.write(" 4. Re-run migrations if needed:") + self.stdout.write(" cd django-backend && python manage.py migrate") + self.stdout.write("") + self.stdout.write("Rollback:") + self.stdout.write(" - Restore from the previous backup (repeat step 2 with the prior S3_KEY).") + self.stdout.write("") + + +def main(): + parser = argparse.ArgumentParser(description="SoroScan backup verification and restoration testing") + parser.add_argument("mode", nargs="?", default="full-test", choices=["full-test", "verify-only", "restore-only"]) + parser.add_argument("--bucket", default=os.environ.get("S3_BUCKET", "soroscan-backups")) + parser.add_argument("--prefix", default=os.environ.get("S3_PREFIX", "pg-backups")) + parser.add_argument("--db-url", default=os.environ.get("DATABASE_URL", "")) + parser.add_argument("--restore-db-url", default=None) + parser.add_argument("--s3-key", default=None) + parser.add_argument("--region", default=os.environ.get("AWS_DEFAULT_REGION", "us-east-1")) + parser.add_argument("--slack-webhook", default=os.environ.get("SLACK_WEBHOOK_URL")) + parser.add_argument("--output", default=None, help="Write report JSON to this file") + parser.add_argument("--runbook", action="store_true", help="Print manual restore runbook") + args = parser.parse_args() + + if args.runbook: + tester = BackupTester( + db_url=args.db_url, + bucket=args.bucket, + prefix=args.prefix, + region=args.region, + slack_webhook=args.slack_webhook, + ) + tester.print_runbook() + return + + if not args.db_url: + parser.error("--db-url or DATABASE_URL is required") + + tester = BackupTester( + db_url=args.db_url, + bucket=args.bucket, + prefix=args.prefix, + s3_key=args.s3_key, + restore_db_url=args.restore_db_url, + region=args.region, + slack_webhook=args.slack_webhook, + ) + + if args.mode == "verify-only": + s3_key = tester.latest_backup_key() if not args.s3_key else args.s3_key + result = tester.verify_backup(s3_key) + report = BackupTestReport( + timestamp=datetime.now(timezone.utc).isoformat(), + bucket=args.bucket, + prefix=args.prefix, + verify_result=result, + overall_passed=result.passed, + ) + elif args.mode == "restore-only": + s3_key = args.s3_key or tester.latest_backup_key() + result = tester.restore_backup(s3_key) + report = BackupTestReport( + timestamp=datetime.now(timezone.utc).isoformat(), + bucket=args.bucket, + prefix=args.prefix, + restore_result=result, + overall_passed=result.passed, + ) + else: + report = tester.run_full_test() + + summary = report.to_dict() + json.dump(summary, sys.stdout, indent=2, default=str) + sys.stdout.write("\\n") + + if report.overall_passed: + print("Backup test PASSED", file=sys.stderr) + sys.exit(0) + else: + print("Backup test FAILED", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main()