#!/usr/bin/env python3
# Fix for Issue #1: [ BOUNTY] [Python] Add retry mechanism with exponential backoff to health check tool
"""
Health Check Tool with Retry Mechanism and Exponential Backoff

This tool performs health checks on HTTP services and TCP ports with
configurable retry attempts and exponential backoff to prevent false
alarms due to transient network issues.
"""

import argparse
import json
import socket
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

# (success, error_message, response_time_ms) as produced by a single attempt.
AttemptResult = Tuple[bool, Optional[str], Optional[float]]


class HealthStatus(Enum):
    """Health check status codes (also used as the process exit code)."""
    OK = 0
    WARNING = 1
    CRITICAL = 2
    UNKNOWN = 3


def log_message(message: str, json_output: bool = False) -> None:
    """Log a timestamped message to stderr unless JSON mode is active."""
    if not json_output:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] {message}", file=sys.stderr)


def calculate_backoff(attempt: int, base_backoff: float) -> float:
    """
    Calculate exponential backoff delay.

    Args:
        attempt: Current attempt number (0-indexed)
        base_backoff: Base backoff time in seconds; negative values are
            treated as 0 so the result is always safe to pass to time.sleep

    Returns:
        Delay in seconds before next retry (base_backoff * 2 ** attempt)
    """
    return max(0.0, base_backoff) * (2 ** attempt)


def retry_with_backoff(
    func: Callable[[], AttemptResult],
    retries: int,
    backoff: float,
    json_output: bool,
    operation_name: str
) -> AttemptResult:
    """
    Execute a function with retry and exponential backoff.

    Args:
        func: Function to execute, should return
            (success, error_message, response_time)
        retries: Total number of attempts to make. Values below 1 are
            treated as 1 so that the check always runs at least once.
        backoff: Base backoff time in seconds
        json_output: Whether output is in JSON format (suppresses logs)
        operation_name: Name of the operation for logging

    Returns:
        Tuple of (success, error_message, response_time). On failure the
        error and response time are those of the last attempt.
    """
    attempts = max(1, retries)
    last_error = None
    last_response_time = None

    for attempt in range(attempts):
        success, error, response_time = func()
        last_response_time = response_time

        if success:
            if attempt > 0:
                log_message(
                    f"{operation_name}: Succeeded on attempt {attempt + 1}/{attempts}",
                    json_output
                )
            return True, None, response_time

        last_error = error

        # Don't sleep after the last attempt
        if attempt < attempts - 1:
            delay = calculate_backoff(attempt, backoff)
            log_message(
                f"{operation_name}: Attempt {attempt + 1}/{attempts} failed: {error}. "
                f"Retrying in {delay:.2f}s...",
                json_output
            )
            time.sleep(delay)
        else:
            log_message(
                f"{operation_name}: Attempt {attempt + 1}/{attempts} failed: {error}. "
                f"No more retries.",
                json_output
            )

    return False, last_error, last_response_time


def _utc_timestamp() -> str:
    """Return the current UTC time as a naive ISO-8601 string with a 'Z' suffix."""
    # datetime.utcnow() is deprecated; dropping tzinfo keeps the old
    # "YYYY-MM-DDTHH:MM:SS.ffffffZ" shape instead of a "+00:00" suffix.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def check_tcp_port(
    host: str,
    port: int,
    timeout: float = 5.0,
    retries: int = 3,
    backoff: float = 2.0,
    json_output: bool = False
) -> Dict[str, Any]:
    """
    Check if a TCP port is open and accepting connections.

    Args:
        host: Hostname or IP address to check
        port: Port number to check
        timeout: Connection timeout in seconds
        retries: Total number of attempts
        backoff: Base backoff time in seconds for exponential backoff
        json_output: Whether to suppress retry logging for JSON output

    Returns:
        Dictionary containing health check results
    """
    result: Dict[str, Any] = {
        "check_type": "tcp",
        "host": host,
        "port": port,
        "timestamp": _utc_timestamp(),
        "retries_configured": retries,
        "backoff_configured": backoff
    }

    def attempt_connection() -> AttemptResult:
        """Single connection attempt."""
        started = time.perf_counter()
        try:
            # create_connection resolves the host for both IPv4 and IPv6, and
            # the with-block guarantees the socket is closed on every path.
            with socket.create_connection((host, port), timeout=timeout):
                pass
        except socket.timeout:
            return False, "Connection timed out", None
        except socket.gaierror as e:
            return False, f"DNS resolution failed: {e}", None
        except ConnectionRefusedError:
            return False, "Connection refused", None
        except OSError as e:
            return False, f"OS error: {e}", None
        except Exception as e:
            return False, f"Unexpected error: {e}", None
        return True, None, (time.perf_counter() - started) * 1000  # ms

    success, error, response_time = retry_with_backoff(
        attempt_connection,
        retries,
        backoff,
        json_output,
        f"TCP check {host}:{port}"
    )

    if success:
        result["status"] = "OK"
        result["status_code"] = HealthStatus.OK.value
        result["message"] = f"TCP port {port} is open on {host}"
        # "is not None" so a legitimate 0.0 ms reading is not discarded.
        result["response_time_ms"] = (
            round(response_time, 2) if response_time is not None else None
        )
    else:
        result["status"] = "CRITICAL"
        result["status_code"] = HealthStatus.CRITICAL.value
        result["message"] = f"TCP port {port} is not accessible on {host}: {error}"
        result["error"] = error

    return result


def check_http_service(
    url: str,
    timeout: float = 10.0,
    expected_status: int = 200,
    retries: int = 3,
    backoff: float = 2.0,
    json_output: bool = False
) -> Dict[str, Any]:
    """
    Check if an HTTP service is responding.

    Args:
        url: URL to check
        timeout: Request timeout in seconds
        expected_status: Expected HTTP status code
        retries: Total number of attempts
        backoff: Base backoff time in seconds for exponential backoff
        json_output: Whether to suppress retry logging for JSON output

    Returns:
        Dictionary containing health check results
    """
    result: Dict[str, Any] = {
        "check_type": "http",
        "url": url,
        "expected_status": expected_status,
        "timestamp": _utc_timestamp(),
        "retries_configured": retries,
        "backoff_configured": backoff
    }

    def attempt_request() -> AttemptResult:
        """Single HTTP request attempt."""
        started = time.perf_counter()
        try:
            request = urllib.request.Request(
                url,
                headers={"User-Agent": "HealthCheck/1.0"}
            )
            # The with-block closes the response so repeated attempts do not
            # accumulate open connections.
            with urllib.request.urlopen(request, timeout=timeout) as response:
                elapsed_ms = (time.perf_counter() - started) * 1000
                status_code = response.getcode()

            if status_code == expected_status:
                return True, None, elapsed_ms
            return False, f"Unexpected status code: {status_code}", elapsed_ms

        except urllib.error.HTTPError as e:
            # urlopen raises for 4xx/5xx; that still counts as success when
            # the caller explicitly expects that status code.
            elapsed_ms = (time.perf_counter() - started) * 1000
            e.close()
            if e.code == expected_status:
                return True, None, elapsed_ms
            return False, f"HTTP error: {e.code} {e.reason}", elapsed_ms
        except urllib.error.URLError as e:
            return False, f"URL error: {e.reason}", None
        except socket.timeout:
            return False, "Request timed out", None
        except Exception as e:
            return False, f"Unexpected error: {e}", None

    success, error, response_time = retry_with_backoff(
        attempt_request,
        retries,
        backoff,
        json_output,
        f"HTTP check {url}"
    )

    if success:
        result["status"] = "OK"
        result["status_code"] = HealthStatus.OK.value
        result["message"] = f"HTTP service at {url} is healthy"
        result["response_time_ms"] = (
            round(response_time, 2) if response_time is not None else None
        )
    else:
        result["status"] = "CRITICAL"
        result["status_code"] = HealthStatus.CRITICAL.value
        result["message"] = f"HTTP service at {url} is not healthy: {error}"
        result["error"] = error
        if response_time is not None:
            result["response_time_ms"] = round(response_time, 2)

    return result


def parse_tcp_target(target: str) -> Tuple[str, int]:
    """
    Split a HOST:PORT string into its host and numeric port.

    The split is on the last colon; a bracketed host such as "[::1]" has its
    brackets removed.

    Raises:
        ValueError: if the target has no host or colon, or the port is not
            an integer in the range 1-65535.
    """
    host, separator, port_text = target.rpartition(":")
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]
    if not separator or not host:
        raise ValueError(f"Invalid TCP target format: {target}. Use HOST:PORT")
    try:
        port = int(port_text)
    except ValueError:
        raise ValueError(f"Invalid port number in: {target}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"Invalid port number in: {target}")
    return host, port


def _count_status(results: List[Dict[str, Any]], status: HealthStatus) -> int:
    """Count the results whose status_code equals the given status."""
    return sum(1 for r in results if r["status_code"] == status.value)


def _overall_status(results: List[Dict[str, Any]]) -> HealthStatus:
    """Return CRITICAL if any result is critical, else WARNING if any, else OK."""
    codes = {r["status_code"] for r in results}
    if HealthStatus.CRITICAL.value in codes:
        return HealthStatus.CRITICAL
    if HealthStatus.WARNING.value in codes:
        return HealthStatus.WARNING
    return HealthStatus.OK


def _print_report(results: List[Dict[str, Any]], overall_status: HealthStatus) -> None:
    """Print the human-readable report for all results to stdout."""
    rule = "=" * 60
    print(f"\n{rule}")
    print("HEALTH CHECK RESULTS")
    print(rule)

    for result in results:
        status_symbol = "✓" if result["status"] == "OK" else "✗"
        # HTTP results carry "url"; TCP results carry "host" and "port".
        target = result.get("url") or f"{result['host']}:{result['port']}"
        print(
            f"\n{status_symbol} [{result['status']}] "
            f"{result['check_type'].upper()}: {target}"
        )
        print(f"  Message: {result['message']}")
        if result.get("response_time_ms") is not None:
            print(f"  Response time: {result['response_time_ms']}ms")
        if result.get("error"):
            print(f"  Error: {result['error']}")

    print(f"\n{rule}")
    print(f"Overall Status: {overall_status.name}")
    print(f"Checks: {_count_status(results, HealthStatus.OK)}/{len(results)} passed")
    print(f"{rule}\n")


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line argument parser."""
    parser = argparse.ArgumentParser(
        description="Health check tool with retry mechanism and exponential backoff",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Check HTTP service with default retries
  python health_check.py --http https://example.com

  # Check TCP port with custom retries and backoff
  python health_check.py --tcp localhost:5432 --retries 5 --backoff 1.5

  # JSON output for automation
  python health_check.py --http https://api.example.com --json

  # Multiple checks
  python health_check.py --http https://example.com --tcp localhost:6379
        """
    )

    parser.add_argument(
        "--http",
        action="append",
        metavar="URL",
        help="HTTP URL to check (can be specified multiple times)"
    )
    parser.add_argument(
        "--tcp",
        action="append",
        metavar="HOST:PORT",
        help="TCP host:port to check (can be specified multiple times)"
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=10.0,
        help="Timeout in seconds for each connection attempt (default: 10)"
    )
    parser.add_argument(
        "--expected-status",
        type=int,
        default=200,
        help="Expected HTTP status code (default: 200)"
    )
    parser.add_argument(
        "--retries",
        type=int,
        default=3,
        help="Total number of attempts per check (default: 3)"
    )
    parser.add_argument(
        "--backoff",
        type=float,
        default=2.0,
        help="Base backoff time in seconds for exponential backoff (default: 2.0)"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results in JSON format"
    )
    return parser


def main(argv: Optional[Sequence[str]] = None) -> None:
    """
    Main entry point for the health check tool.

    Args:
        argv: Argument list to parse; None means use sys.argv[1:].

    Exits the process with the overall HealthStatus value (0 when every check
    passed, 2 when any check is CRITICAL), or with argparse's usage-error
    status when the arguments are invalid.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    # Validate arguments
    if not args.http and not args.tcp:
        parser.error("At least one --http or --tcp check must be specified")

    if args.retries < 1:
        parser.error("--retries must be at least 1")

    if args.backoff < 0:
        parser.error("--backoff must be non-negative")

    if args.timeout <= 0:
        parser.error("--timeout must be positive")

    # Parse every TCP target before running anything, so a malformed target
    # is reported immediately rather than after the HTTP checks (and their
    # backoff sleeps) have already run.
    tcp_targets: List[Tuple[str, int]] = []
    for raw_target in args.tcp or []:
        try:
            tcp_targets.append(parse_tcp_target(raw_target))
        except ValueError as exc:
            parser.error(str(exc))

    results: List[Dict[str, Any]] = []

    # Perform HTTP checks
    for url in args.http or []:
        results.append(
            check_http_service(
                url=url,
                timeout=args.timeout,
                expected_status=args.expected_status,
                retries=args.retries,
                backoff=args.backoff,
                json_output=args.json
            )
        )

    # Perform TCP checks
    for host, port in tcp_targets:
        results.append(
            check_tcp_port(
                host=host,
                port=port,
                timeout=args.timeout,
                retries=args.retries,
                backoff=args.backoff,
                json_output=args.json
            )
        )

    overall_status = _overall_status(results)

    # Output results
    if args.json:
        output = {
            "overall_status": overall_status.name,
            "overall_status_code": overall_status.value,
            "checks": results,
            "summary": {
                "total": len(results),
                "ok": _count_status(results, HealthStatus.OK),
                "warning": _count_status(results, HealthStatus.WARNING),
                "critical": _count_status(results, HealthStatus.CRITICAL)
            }
        }
        print(json.dumps(output, indent=2))
    else:
        _print_report(results, overall_status)

    # Exit with appropriate status code
    sys.exit(overall_status.value)


if __name__ == "__main__":
    main()