diff --git a/LICENSE b/LICENSE index 2643d79..5989c2e 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2025 eBPF Network Monitor +Copyright (c) 2025 Simone Rodigari Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 6ea9f0d..3119e21 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,8 @@ make kind-deploy # Deploy to kind cluster make kind-integration-test # Run comprehensive tests ``` +To get detailed API documentation for the aggregator, available only in Kubernetes mode [see API Aggregator Documentation](https://petstore.swagger.io/?url=https://raw.githubusercontent.com/srodi/ebpf-server/main/docs/swagger-aggregator/swagger.json) + ### VM Deployment (Traditional) For single-server deployments: diff --git a/bpf/connection.o b/bpf/connection.o deleted file mode 100644 index d4d8c38..0000000 Binary files a/bpf/connection.o and /dev/null differ diff --git a/bpf/packet_drop.o b/bpf/packet_drop.o deleted file mode 100644 index fc221dd..0000000 Binary files a/bpf/packet_drop.o and /dev/null differ diff --git a/cmd/aggregator/main.go b/cmd/aggregator/main.go index 843b868..1a66aca 100644 --- a/cmd/aggregator/main.go +++ b/cmd/aggregator/main.go @@ -52,6 +52,13 @@ func main() { mux.HandleFunc("/api/events", agg.HandleEvents) mux.HandleFunc("/api/events/ingest", agg.HandleIngest) mux.HandleFunc("/api/stats", agg.HandleStats) + mux.HandleFunc("/api/programs", agg.HandlePrograms) + + // Connection and packet drop API endpoints (aggregator-specific) + mux.HandleFunc("/api/list-connections", agg.HandleListConnections) + mux.HandleFunc("/api/list-packet-drops", agg.HandleListPacketDrops) + mux.HandleFunc("/api/connection-summary", agg.HandleConnectionSummary) + mux.HandleFunc("/api/packet-drop-summary", agg.HandlePacketDropSummary) // Swagger documentation mux.HandleFunc("/swagger/", httpSwagger.WrapHandler) diff --git a/docs/swagger-aggregator/docs.go b/docs/swagger-aggregator/docs.go index 72f782b..7cc53a3 100644 --- a/docs/swagger-aggregator/docs.go +++ b/docs/swagger-aggregator/docs.go @@ -39,19 +39,19 @@ const docTemplate = `{ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -115,19 +115,19 @@ const docTemplate = `{ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -232,8 +232,7 @@ const docTemplate = `{ "200": { "description": "Filtered events", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/api.EventsResponse" } }, "500": { @@ -285,8 +284,7 @@ const docTemplate = `{ "200": { "description": "Ingestion result", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/internal_aggregator.IngestResponse" } }, "400": { @@ -408,19 +406,19 @@ const docTemplate = `{ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -484,19 +482,19 @@ const docTemplate = `{ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -563,8 +561,7 @@ const docTemplate = `{ "200": { "description": "List of eBPF programs", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/api.ProgramsResponse" } }, "500": { @@ -605,8 +602,7 @@ const docTemplate = `{ "200": { "description": "Aggregation statistics", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/internal_aggregator.AggregationStatsResponse" } }, "405": { @@ -635,8 +631,7 @@ const docTemplate = `{ "200": { "description": "Health status", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/api.HealthResponse" } }, "503": { @@ -731,6 +726,99 @@ const docTemplate = `{ } } }, + "api.EventFilters": { + "type": "object", + "properties": { + "command": { + "description": "Command filter", + "type": "string", + "example": "curl" + }, + "limit": { + "description": "Limit filter", + "type": "integer", + "example": 100 + }, + "pid": { + "description": "Process ID filter", + "type": "integer", + "example": 1234 + }, + "since": { + "description": "Start time filter", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "type": { + "description": "Event type filter", + "type": "string", + "example": "connection" + }, + "until": { + "description": "End time filter", + "type": "string", + "example": "2023-01-01T13:00:00Z" + } + } + }, + "api.EventsResponse": { + "type": "object", + "properties": { + "count": { + "description": "Number of events returned", + "type": "integer", + "example": 25 + }, + "events": { + "description": "List of events", + "type": "array", + "items": {} + }, + "filters": { + "description": "Applied filters", + "allOf": [ + { + "$ref": "#/definitions/api.EventFilters" + } + ] + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of matching events", + "type": "integer", + "example": 150 + } + } + }, + "api.HealthResponse": { + "type": "object", + "properties": { + "component": { + "description": "Component name", + "type": "string", + "example": "eBPF Monitor API" + }, + "status": { + "description": "Service status", + "type": "string", + "example": "healthy" + }, + "uptime": { + "description": "Service uptime", + "type": "string", + "example": "1h30m" + }, + "version": { + "description": "API version", + "type": "string", + "example": "1.0.0" + } + } + }, "api.PacketDropListResponse": { "type": "object", "properties": { @@ -809,6 +897,310 @@ const docTemplate = `{ } } }, + "api.ProgramInfo": { + "type": "object", + "properties": { + "id": { + "description": "Program ID", + "type": "integer", + "example": 123 + }, + "name": { + "description": "Program name", + "type": "string", + "example": "connection_tracer" + }, + "status": { + "description": "Program status", + "type": "string", + "example": "loaded" + }, + "type": { + "description": "Program type", + "type": "string", + "example": "kprobe" + } + } + }, + "api.ProgramsResponse": { + "type": "object", + "properties": { + "programs": { + "description": "List of eBPF programs", + "type": "array", + "items": { + "$ref": "#/definitions/api.ProgramInfo" + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of programs", + "type": "integer", + "example": 2 + } + } + }, + "internal_aggregator.AgentInfo": { + "type": "object", + "properties": { + "event_count": { + "description": "Number of events from this agent", + "type": "integer", + "example": 2500 + }, + "last_seen": { + "description": "Last seen timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "node_name": { + "description": "Node name", + "type": "string", + "example": "worker-1" + }, + "programs": { + "description": "Programs running on this agent", + "type": "array", + "items": { + "$ref": "#/definitions/internal_aggregator.ProgramInfo" + } + }, + "status": { + "description": "Agent status", + "type": "string", + "example": "active" + } + } + }, + "internal_aggregator.AggregatedEventFilters": { + "type": "object", + "properties": { + "limit": { + "description": "Limit filter", + "type": "integer", + "example": 100 + }, + "node": { + "description": "Node name filter", + "type": "string", + "example": "worker-1" + }, + "since": { + "description": "Start time filter", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "type": { + "description": "Event type filter", + "type": "string", + "example": "connection" + }, + "until": { + "description": "End time filter", + "type": "string", + "example": "2023-01-01T13:00:00Z" + } + } + }, + "internal_aggregator.AggregatedEventsResponse": { + "type": "object", + "properties": { + "count": { + "description": "Number of events returned", + "type": "integer", + "example": 50 + }, + "events": { + "description": "List of aggregated events", + "type": "array", + "items": {} + }, + "filters": { + "description": "Applied filters", + "allOf": [ + { + "$ref": "#/definitions/internal_aggregator.AggregatedEventFilters" + } + ] + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of matching events", + "type": "integer", + "example": 1250 + } + } + }, + "internal_aggregator.AggregatedListResponse": { + "type": "object", + "properties": { + "events_by_node": { + "description": "Event count by node", + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "events_by_pid": { + "description": "Events grouped by PID", + "type": "object", + "additionalProperties": { + "type": "array", + "items": {} + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_events": { + "description": "Total number of events", + "type": "integer", + "example": 45 + }, + "total_nodes": { + "description": "Number of nodes with events", + "type": "integer", + "example": 3 + }, + "total_pids": { + "description": "Number of unique PIDs across all nodes", + "type": "integer", + "example": 8 + } + } + }, + "internal_aggregator.AggregatedSummaryResponse": { + "type": "object", + "properties": { + "command": { + "description": "Command name (if filtered)", + "type": "string", + "example": "curl" + }, + "count": { + "description": "Total count across all nodes", + "type": "integer", + "example": 15 + }, + "count_by_node": { + "description": "Count by node", + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "duration_seconds": { + "description": "Duration in seconds", + "type": "integer", + "example": 60 + }, + "pid": { + "description": "Process ID (if filtered)", + "type": "integer", + "example": 1234 + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_nodes": { + "description": "Number of nodes with events", + "type": "integer", + "example": 3 + } + } + }, + "internal_aggregator.AggregationStatsResponse": { + "type": "object", + "properties": { + "aggregation_start": { + "description": "When aggregation started", + "type": "string", + "example": "2023-01-01T10:00:00Z" + }, + "connected_agents": { + "description": "Number of connected agents", + "type": "integer", + "example": 5 + }, + "events_by_node": { + "description": "Events grouped by node", + "type": "object", + "additionalProperties": { + "type": "integer", + "format": "int64" + } + }, + "events_by_type": { + "description": "Events grouped by type", + "type": "object", + "additionalProperties": { + "type": "integer", + "format": "int64" + } + }, + "last_event_time": { + "description": "Timestamp of last event", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_events": { + "description": "Total events stored", + "type": "integer", + "example": 12500 + } + } + }, + "internal_aggregator.AggregatorProgramsResponse": { + "type": "object", + "properties": { + "all_programs": { + "description": "All programs across agents", + "type": "array", + "items": { + "$ref": "#/definitions/internal_aggregator.ProgramInfo" + } + }, + "connected_agents": { + "description": "List of connected agents", + "type": "array", + "items": { + "$ref": "#/definitions/internal_aggregator.AgentInfo" + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_agents": { + "description": "Total number of agents", + "type": "integer", + "example": 3 + }, + "total_programs": { + "description": "Total number of programs", + "type": "integer", + "example": 6 + } + } + }, "internal_aggregator.HealthCheck": { "type": "object", "properties": { @@ -826,6 +1218,61 @@ const docTemplate = `{ "type": "string" } } + }, + "internal_aggregator.IngestResponse": { + "type": "object", + "properties": { + "events_processed": { + "description": "Number of events processed", + "type": "integer", + "example": 25 + }, + "message": { + "description": "Status message", + "type": "string", + "example": "Events ingested successfully" + }, + "success": { + "description": "Ingestion success status", + "type": "boolean", + "example": true + }, + "timestamp": { + "description": "Processing timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + } + } + }, + "internal_aggregator.ProgramInfo": { + "type": "object", + "properties": { + "event_count": { + "description": "Events generated by this program", + "type": "integer", + "example": 1250 + }, + "name": { + "description": "Program name", + "type": "string", + "example": "connection_tracer" + }, + "node": { + "description": "Node where program is running", + "type": "string", + "example": "worker-1" + }, + "status": { + "description": "Program status", + "type": "string", + "example": "active" + }, + "type": { + "description": "Program type", + "type": "string", + "example": "kprobe" + } + } } } }` diff --git a/docs/swagger-aggregator/swagger.json b/docs/swagger-aggregator/swagger.json index 47b9556..db7e831 100644 --- a/docs/swagger-aggregator/swagger.json +++ b/docs/swagger-aggregator/swagger.json @@ -33,19 +33,19 @@ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -109,19 +109,19 @@ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -226,8 +226,7 @@ "200": { "description": "Filtered events", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/api.EventsResponse" } }, "500": { @@ -279,8 +278,7 @@ "200": { "description": "Ingestion result", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/internal_aggregator.IngestResponse" } }, "400": { @@ -402,19 +400,19 @@ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -478,19 +476,19 @@ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -557,8 +555,7 @@ "200": { "description": "List of eBPF programs", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/api.ProgramsResponse" } }, "500": { @@ -599,8 +596,7 @@ "200": { "description": "Aggregation statistics", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/internal_aggregator.AggregationStatsResponse" } }, "405": { @@ -629,8 +625,7 @@ "200": { "description": "Health status", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/api.HealthResponse" } }, "503": { @@ -725,6 +720,99 @@ } } }, + "api.EventFilters": { + "type": "object", + "properties": { + "command": { + "description": "Command filter", + "type": "string", + "example": "curl" + }, + "limit": { + "description": "Limit filter", + "type": "integer", + "example": 100 + }, + "pid": { + "description": "Process ID filter", + "type": "integer", + "example": 1234 + }, + "since": { + "description": "Start time filter", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "type": { + "description": "Event type filter", + "type": "string", + "example": "connection" + }, + "until": { + "description": "End time filter", + "type": "string", + "example": "2023-01-01T13:00:00Z" + } + } + }, + "api.EventsResponse": { + "type": "object", + "properties": { + "count": { + "description": "Number of events returned", + "type": "integer", + "example": 25 + }, + "events": { + "description": "List of events", + "type": "array", + "items": {} + }, + "filters": { + "description": "Applied filters", + "allOf": [ + { + "$ref": "#/definitions/api.EventFilters" + } + ] + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of matching events", + "type": "integer", + "example": 150 + } + } + }, + "api.HealthResponse": { + "type": "object", + "properties": { + "component": { + "description": "Component name", + "type": "string", + "example": "eBPF Monitor API" + }, + "status": { + "description": "Service status", + "type": "string", + "example": "healthy" + }, + "uptime": { + "description": "Service uptime", + "type": "string", + "example": "1h30m" + }, + "version": { + "description": "API version", + "type": "string", + "example": "1.0.0" + } + } + }, "api.PacketDropListResponse": { "type": "object", "properties": { @@ -803,6 +891,310 @@ } } }, + "api.ProgramInfo": { + "type": "object", + "properties": { + "id": { + "description": "Program ID", + "type": "integer", + "example": 123 + }, + "name": { + "description": "Program name", + "type": "string", + "example": "connection_tracer" + }, + "status": { + "description": "Program status", + "type": "string", + "example": "loaded" + }, + "type": { + "description": "Program type", + "type": "string", + "example": "kprobe" + } + } + }, + "api.ProgramsResponse": { + "type": "object", + "properties": { + "programs": { + "description": "List of eBPF programs", + "type": "array", + "items": { + "$ref": "#/definitions/api.ProgramInfo" + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of programs", + "type": "integer", + "example": 2 + } + } + }, + "internal_aggregator.AgentInfo": { + "type": "object", + "properties": { + "event_count": { + "description": "Number of events from this agent", + "type": "integer", + "example": 2500 + }, + "last_seen": { + "description": "Last seen timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "node_name": { + "description": "Node name", + "type": "string", + "example": "worker-1" + }, + "programs": { + "description": "Programs running on this agent", + "type": "array", + "items": { + "$ref": "#/definitions/internal_aggregator.ProgramInfo" + } + }, + "status": { + "description": "Agent status", + "type": "string", + "example": "active" + } + } + }, + "internal_aggregator.AggregatedEventFilters": { + "type": "object", + "properties": { + "limit": { + "description": "Limit filter", + "type": "integer", + "example": 100 + }, + "node": { + "description": "Node name filter", + "type": "string", + "example": "worker-1" + }, + "since": { + "description": "Start time filter", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "type": { + "description": "Event type filter", + "type": "string", + "example": "connection" + }, + "until": { + "description": "End time filter", + "type": "string", + "example": "2023-01-01T13:00:00Z" + } + } + }, + "internal_aggregator.AggregatedEventsResponse": { + "type": "object", + "properties": { + "count": { + "description": "Number of events returned", + "type": "integer", + "example": 50 + }, + "events": { + "description": "List of aggregated events", + "type": "array", + "items": {} + }, + "filters": { + "description": "Applied filters", + "allOf": [ + { + "$ref": "#/definitions/internal_aggregator.AggregatedEventFilters" + } + ] + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of matching events", + "type": "integer", + "example": 1250 + } + } + }, + "internal_aggregator.AggregatedListResponse": { + "type": "object", + "properties": { + "events_by_node": { + "description": "Event count by node", + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "events_by_pid": { + "description": "Events grouped by PID", + "type": "object", + "additionalProperties": { + "type": "array", + "items": {} + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_events": { + "description": "Total number of events", + "type": "integer", + "example": 45 + }, + "total_nodes": { + "description": "Number of nodes with events", + "type": "integer", + "example": 3 + }, + "total_pids": { + "description": "Number of unique PIDs across all nodes", + "type": "integer", + "example": 8 + } + } + }, + "internal_aggregator.AggregatedSummaryResponse": { + "type": "object", + "properties": { + "command": { + "description": "Command name (if filtered)", + "type": "string", + "example": "curl" + }, + "count": { + "description": "Total count across all nodes", + "type": "integer", + "example": 15 + }, + "count_by_node": { + "description": "Count by node", + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "duration_seconds": { + "description": "Duration in seconds", + "type": "integer", + "example": 60 + }, + "pid": { + "description": "Process ID (if filtered)", + "type": "integer", + "example": 1234 + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_nodes": { + "description": "Number of nodes with events", + "type": "integer", + "example": 3 + } + } + }, + "internal_aggregator.AggregationStatsResponse": { + "type": "object", + "properties": { + "aggregation_start": { + "description": "When aggregation started", + "type": "string", + "example": "2023-01-01T10:00:00Z" + }, + "connected_agents": { + "description": "Number of connected agents", + "type": "integer", + "example": 5 + }, + "events_by_node": { + "description": "Events grouped by node", + "type": "object", + "additionalProperties": { + "type": "integer", + "format": "int64" + } + }, + "events_by_type": { + "description": "Events grouped by type", + "type": "object", + "additionalProperties": { + "type": "integer", + "format": "int64" + } + }, + "last_event_time": { + "description": "Timestamp of last event", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_events": { + "description": "Total events stored", + "type": "integer", + "example": 12500 + } + } + }, + "internal_aggregator.AggregatorProgramsResponse": { + "type": "object", + "properties": { + "all_programs": { + "description": "All programs across agents", + "type": "array", + "items": { + "$ref": "#/definitions/internal_aggregator.ProgramInfo" + } + }, + "connected_agents": { + "description": "List of connected agents", + "type": "array", + "items": { + "$ref": "#/definitions/internal_aggregator.AgentInfo" + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_agents": { + "description": "Total number of agents", + "type": "integer", + "example": 3 + }, + "total_programs": { + "description": "Total number of programs", + "type": "integer", + "example": 6 + } + } + }, "internal_aggregator.HealthCheck": { "type": "object", "properties": { @@ -820,6 +1212,61 @@ "type": "string" } } + }, + "internal_aggregator.IngestResponse": { + "type": "object", + "properties": { + "events_processed": { + "description": "Number of events processed", + "type": "integer", + "example": 25 + }, + "message": { + "description": "Status message", + "type": "string", + "example": "Events ingested successfully" + }, + "success": { + "description": "Ingestion success status", + "type": "boolean", + "example": true + }, + "timestamp": { + "description": "Processing timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + } + } + }, + "internal_aggregator.ProgramInfo": { + "type": "object", + "properties": { + "event_count": { + "description": "Events generated by this program", + "type": "integer", + "example": 1250 + }, + "name": { + "description": "Program name", + "type": "string", + "example": "connection_tracer" + }, + "node": { + "description": "Node where program is running", + "type": "string", + "example": "worker-1" + }, + "status": { + "description": "Program status", + "type": "string", + "example": "active" + }, + "type": { + "description": "Program type", + "type": "string", + "example": "kprobe" + } + } } } } \ No newline at end of file diff --git a/docs/swagger-aggregator/swagger.yaml b/docs/swagger-aggregator/swagger.yaml index 6168b51..0d5a17f 100644 --- a/docs/swagger-aggregator/swagger.yaml +++ b/docs/swagger-aggregator/swagger.yaml @@ -59,6 +59,75 @@ definitions: example: "2023-01-01T12:00:00Z" type: string type: object + api.EventFilters: + properties: + command: + description: Command filter + example: curl + type: string + limit: + description: Limit filter + example: 100 + type: integer + pid: + description: Process ID filter + example: 1234 + type: integer + since: + description: Start time filter + example: "2023-01-01T12:00:00Z" + type: string + type: + description: Event type filter + example: connection + type: string + until: + description: End time filter + example: "2023-01-01T13:00:00Z" + type: string + type: object + api.EventsResponse: + properties: + count: + description: Number of events returned + example: 25 + type: integer + events: + description: List of events + items: {} + type: array + filters: + allOf: + - $ref: '#/definitions/api.EventFilters' + description: Applied filters + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_count: + description: Total number of matching events + example: 150 + type: integer + type: object + api.HealthResponse: + properties: + component: + description: Component name + example: eBPF Monitor API + type: string + status: + description: Service status + example: healthy + type: string + uptime: + description: Service uptime + example: 1h30m + type: string + version: + description: API version + example: 1.0.0 + type: string + type: object api.PacketDropListResponse: properties: events_by_pid: @@ -118,6 +187,233 @@ definitions: example: "2023-01-01T12:00:00Z" type: string type: object + api.ProgramInfo: + properties: + id: + description: Program ID + example: 123 + type: integer + name: + description: Program name + example: connection_tracer + type: string + status: + description: Program status + example: loaded + type: string + type: + description: Program type + example: kprobe + type: string + type: object + api.ProgramsResponse: + properties: + programs: + description: List of eBPF programs + items: + $ref: '#/definitions/api.ProgramInfo' + type: array + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_count: + description: Total number of programs + example: 2 + type: integer + type: object + internal_aggregator.AgentInfo: + properties: + event_count: + description: Number of events from this agent + example: 2500 + type: integer + last_seen: + description: Last seen timestamp + example: "2023-01-01T12:00:00Z" + type: string + node_name: + description: Node name + example: worker-1 + type: string + programs: + description: Programs running on this agent + items: + $ref: '#/definitions/internal_aggregator.ProgramInfo' + type: array + status: + description: Agent status + example: active + type: string + type: object + internal_aggregator.AggregatedEventFilters: + properties: + limit: + description: Limit filter + example: 100 + type: integer + node: + description: Node name filter + example: worker-1 + type: string + since: + description: Start time filter + example: "2023-01-01T12:00:00Z" + type: string + type: + description: Event type filter + example: connection + type: string + until: + description: End time filter + example: "2023-01-01T13:00:00Z" + type: string + type: object + internal_aggregator.AggregatedEventsResponse: + properties: + count: + description: Number of events returned + example: 50 + type: integer + events: + description: List of aggregated events + items: {} + type: array + filters: + allOf: + - $ref: '#/definitions/internal_aggregator.AggregatedEventFilters' + description: Applied filters + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_count: + description: Total number of matching events + example: 1250 + type: integer + type: object + internal_aggregator.AggregatedListResponse: + properties: + events_by_node: + additionalProperties: + type: integer + description: Event count by node + type: object + events_by_pid: + additionalProperties: + items: {} + type: array + description: Events grouped by PID + type: object + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_events: + description: Total number of events + example: 45 + type: integer + total_nodes: + description: Number of nodes with events + example: 3 + type: integer + total_pids: + description: Number of unique PIDs across all nodes + example: 8 + type: integer + type: object + internal_aggregator.AggregatedSummaryResponse: + properties: + command: + description: Command name (if filtered) + example: curl + type: string + count: + description: Total count across all nodes + example: 15 + type: integer + count_by_node: + additionalProperties: + type: integer + description: Count by node + type: object + duration_seconds: + description: Duration in seconds + example: 60 + type: integer + pid: + description: Process ID (if filtered) + example: 1234 + type: integer + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_nodes: + description: Number of nodes with events + example: 3 + type: integer + type: object + internal_aggregator.AggregationStatsResponse: + properties: + aggregation_start: + description: When aggregation started + example: "2023-01-01T10:00:00Z" + type: string + connected_agents: + description: Number of connected agents + example: 5 + type: integer + events_by_node: + additionalProperties: + format: int64 + type: integer + description: Events grouped by node + type: object + events_by_type: + additionalProperties: + format: int64 + type: integer + description: Events grouped by type + type: object + last_event_time: + description: Timestamp of last event + example: "2023-01-01T12:00:00Z" + type: string + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_events: + description: Total events stored + example: 12500 + type: integer + type: object + internal_aggregator.AggregatorProgramsResponse: + properties: + all_programs: + description: All programs across agents + items: + $ref: '#/definitions/internal_aggregator.ProgramInfo' + type: array + connected_agents: + description: List of connected agents + items: + $ref: '#/definitions/internal_aggregator.AgentInfo' + type: array + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_agents: + description: Total number of agents + example: 3 + type: integer + total_programs: + description: Total number of programs + example: 6 + type: integer + type: object internal_aggregator.HealthCheck: properties: component: @@ -130,6 +426,48 @@ definitions: uptime: type: string type: object + internal_aggregator.IngestResponse: + properties: + events_processed: + description: Number of events processed + example: 25 + type: integer + message: + description: Status message + example: Events ingested successfully + type: string + success: + description: Ingestion success status + example: true + type: boolean + timestamp: + description: Processing timestamp + example: "2023-01-01T12:00:00Z" + type: string + type: object + internal_aggregator.ProgramInfo: + properties: + event_count: + description: Events generated by this program + example: 1250 + type: integer + name: + description: Program name + example: connection_tracer + type: string + node: + description: Node where program is running + example: worker-1 + type: string + status: + description: Program status + example: active + type: string + type: + description: Program type + example: kprobe + type: string + type: object host: localhost:8081 info: contact: @@ -150,15 +488,15 @@ paths: description: Get count of connection events filtered by PID, command, and time window parameters: - - description: Process ID (GET only) + - description: Process ID in: query name: pid type: integer - - description: Command name (GET only) + - description: Command name in: query name: command type: string - - description: 'Duration in seconds (GET only, default: 60)' + - description: 'Duration in seconds (default: 60)' in: query name: duration_seconds type: integer @@ -201,15 +539,15 @@ paths: description: Get count of connection events filtered by PID, command, and time window parameters: - - description: Process ID (GET only) + - description: Process ID in: query name: pid type: integer - - description: Command name (GET only) + - description: Command name in: query name: command type: string - - description: 'Duration in seconds (GET only, default: 60)' + - description: 'Duration in seconds (default: 60)' in: query name: duration_seconds type: integer @@ -282,8 +620,7 @@ paths: "200": description: Filtered events schema: - additionalProperties: true - type: object + $ref: '#/definitions/api.EventsResponse' "500": description: Internal server error schema: @@ -317,8 +654,7 @@ paths: "200": description: Ingestion result schema: - additionalProperties: true - type: object + $ref: '#/definitions/internal_aggregator.IngestResponse' "400": description: Bad request schema: @@ -395,15 +731,15 @@ paths: description: Get count of packet drop events filtered by PID, command, and time window parameters: - - description: Process ID (GET only) + - description: Process ID in: query name: pid type: integer - - description: Command name (GET only) + - description: Command name in: query name: command type: string - - description: 'Duration in seconds (GET only, default: 60)' + - description: 'Duration in seconds (default: 60)' in: query name: duration_seconds type: integer @@ -446,15 +782,15 @@ paths: description: Get count of packet drop events filtered by PID, command, and time window parameters: - - description: Process ID (GET only) + - description: Process ID in: query name: pid type: integer - - description: Command name (GET only) + - description: Command name in: query name: command type: string - - description: 'Duration in seconds (GET only, default: 60)' + - description: 'Duration in seconds (default: 60)' in: query name: duration_seconds type: integer @@ -502,8 +838,7 @@ paths: "200": description: List of eBPF programs schema: - additionalProperties: true - type: object + $ref: '#/definitions/api.ProgramsResponse' "500": description: Internal server error schema: @@ -531,8 +866,7 @@ paths: "200": description: Aggregation statistics schema: - additionalProperties: true - type: object + $ref: '#/definitions/internal_aggregator.AggregationStatsResponse' "405": description: Method not allowed schema: @@ -551,8 +885,7 @@ paths: "200": description: Health status schema: - additionalProperties: true - type: object + $ref: '#/definitions/api.HealthResponse' "503": description: Service unavailable schema: diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index d5e136a..63eb0aa 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -39,19 +39,19 @@ const docTemplate = `{ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -115,19 +115,19 @@ const docTemplate = `{ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -232,8 +232,7 @@ const docTemplate = `{ "200": { "description": "Filtered events", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/internal_api.EventsResponse" } }, "500": { @@ -257,6 +256,58 @@ const docTemplate = `{ } } }, + "/api/events/ingest": { + "post": { + "description": "Accept events from eBPF agents for aggregation and storage", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "events" + ], + "summary": "Ingest events from agents", + "parameters": [ + { + "description": "Events to ingest", + "name": "events", + "in": "body", + "required": true, + "schema": { + "type": "object" + } + } + ], + "responses": { + "200": { + "description": "Ingestion result", + "schema": { + "$ref": "#/definitions/aggregator.IngestResponse" + } + }, + "400": { + "description": "Bad request", + "schema": { + "type": "string" + } + }, + "405": { + "description": "Method not allowed", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal server error", + "schema": { + "type": "string" + } + } + } + } + }, "/api/list-connections": { "get": { "description": "Get recent connection events grouped by PID", @@ -355,19 +406,19 @@ const docTemplate = `{ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -431,19 +482,19 @@ const docTemplate = `{ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -510,8 +561,7 @@ const docTemplate = `{ "200": { "description": "List of eBPF programs", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/internal_api.ProgramsResponse" } }, "500": { @@ -535,6 +585,35 @@ const docTemplate = `{ } } }, + "/api/stats": { + "get": { + "description": "Retrieve statistics about event aggregation including counts by type and node", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "stats" + ], + "summary": "Get aggregation statistics", + "responses": { + "200": { + "description": "Aggregation statistics", + "schema": { + "$ref": "#/definitions/aggregator.AggregationStatsResponse" + } + }, + "405": { + "description": "Method not allowed", + "schema": { + "type": "string" + } + } + } + } + }, "/health": { "get": { "description": "Get the health status of the eBPF monitoring system", @@ -552,8 +631,7 @@ const docTemplate = `{ "200": { "description": "Health status", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/internal_api.HealthResponse" } }, "503": { @@ -570,6 +648,336 @@ const docTemplate = `{ } }, "definitions": { + "aggregator.AgentInfo": { + "type": "object", + "properties": { + "event_count": { + "description": "Number of events from this agent", + "type": "integer", + "example": 2500 + }, + "last_seen": { + "description": "Last seen timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "node_name": { + "description": "Node name", + "type": "string", + "example": "worker-1" + }, + "programs": { + "description": "Programs running on this agent", + "type": "array", + "items": { + "$ref": "#/definitions/aggregator.ProgramInfo" + } + }, + "status": { + "description": "Agent status", + "type": "string", + "example": "active" + } + } + }, + "aggregator.AggregatedEventFilters": { + "type": "object", + "properties": { + "limit": { + "description": "Limit filter", + "type": "integer", + "example": 100 + }, + "node": { + "description": "Node name filter", + "type": "string", + "example": "worker-1" + }, + "since": { + "description": "Start time filter", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "type": { + "description": "Event type filter", + "type": "string", + "example": "connection" + }, + "until": { + "description": "End time filter", + "type": "string", + "example": "2023-01-01T13:00:00Z" + } + } + }, + "aggregator.AggregatedEventsResponse": { + "type": "object", + "properties": { + "count": { + "description": "Number of events returned", + "type": "integer", + "example": 50 + }, + "events": { + "description": "List of aggregated events", + "type": "array", + "items": {} + }, + "filters": { + "description": "Applied filters", + "allOf": [ + { + "$ref": "#/definitions/aggregator.AggregatedEventFilters" + } + ] + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of matching events", + "type": "integer", + "example": 1250 + } + } + }, + "aggregator.AggregatedListResponse": { + "type": "object", + "properties": { + "events_by_node": { + "description": "Event count by node", + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "events_by_pid": { + "description": "Events grouped by PID", + "type": "object", + "additionalProperties": { + "type": "array", + "items": {} + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_events": { + "description": "Total number of events", + "type": "integer", + "example": 45 + }, + "total_nodes": { + "description": "Number of nodes with events", + "type": "integer", + "example": 3 + }, + "total_pids": { + "description": "Number of unique PIDs across all nodes", + "type": "integer", + "example": 8 + } + } + }, + "aggregator.AggregatedSummaryResponse": { + "type": "object", + "properties": { + "command": { + "description": "Command name (if filtered)", + "type": "string", + "example": "curl" + }, + "count": { + "description": "Total count across all nodes", + "type": "integer", + "example": 15 + }, + "count_by_node": { + "description": "Count by node", + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "duration_seconds": { + "description": "Duration in seconds", + "type": "integer", + "example": 60 + }, + "pid": { + "description": "Process ID (if filtered)", + "type": "integer", + "example": 1234 + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_nodes": { + "description": "Number of nodes with events", + "type": "integer", + "example": 3 + } + } + }, + "aggregator.AggregationStatsResponse": { + "type": "object", + "properties": { + "aggregation_start": { + "description": "When aggregation started", + "type": "string", + "example": "2023-01-01T10:00:00Z" + }, + "connected_agents": { + "description": "Number of connected agents", + "type": "integer", + "example": 5 + }, + "events_by_node": { + "description": "Events grouped by node", + "type": "object", + "additionalProperties": { + "type": "integer", + "format": "int64" + } + }, + "events_by_type": { + "description": "Events grouped by type", + "type": "object", + "additionalProperties": { + "type": "integer", + "format": "int64" + } + }, + "last_event_time": { + "description": "Timestamp of last event", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_events": { + "description": "Total events stored", + "type": "integer", + "example": 12500 + } + } + }, + "aggregator.AggregatorProgramsResponse": { + "type": "object", + "properties": { + "all_programs": { + "description": "All programs across agents", + "type": "array", + "items": { + "$ref": "#/definitions/aggregator.ProgramInfo" + } + }, + "connected_agents": { + "description": "List of connected agents", + "type": "array", + "items": { + "$ref": "#/definitions/aggregator.AgentInfo" + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_agents": { + "description": "Total number of agents", + "type": "integer", + "example": 3 + }, + "total_programs": { + "description": "Total number of programs", + "type": "integer", + "example": 6 + } + } + }, + "aggregator.HealthCheck": { + "type": "object", + "properties": { + "component": { + "type": "string" + }, + "stats": { + "type": "object", + "additionalProperties": true + }, + "status": { + "type": "string" + }, + "uptime": { + "type": "string" + } + } + }, + "aggregator.IngestResponse": { + "type": "object", + "properties": { + "events_processed": { + "description": "Number of events processed", + "type": "integer", + "example": 25 + }, + "message": { + "description": "Status message", + "type": "string", + "example": "Events ingested successfully" + }, + "success": { + "description": "Ingestion success status", + "type": "boolean", + "example": true + }, + "timestamp": { + "description": "Processing timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + } + } + }, + "aggregator.ProgramInfo": { + "type": "object", + "properties": { + "event_count": { + "description": "Events generated by this program", + "type": "integer", + "example": 1250 + }, + "name": { + "description": "Program name", + "type": "string", + "example": "connection_tracer" + }, + "node": { + "description": "Node where program is running", + "type": "string", + "example": "worker-1" + }, + "status": { + "description": "Program status", + "type": "string", + "example": "active" + }, + "type": { + "description": "Program type", + "type": "string", + "example": "kprobe" + } + } + }, "internal_api.ConnectionListResponse": { "type": "object", "properties": { @@ -648,6 +1056,99 @@ const docTemplate = `{ } } }, + "internal_api.EventFilters": { + "type": "object", + "properties": { + "command": { + "description": "Command filter", + "type": "string", + "example": "curl" + }, + "limit": { + "description": "Limit filter", + "type": "integer", + "example": 100 + }, + "pid": { + "description": "Process ID filter", + "type": "integer", + "example": 1234 + }, + "since": { + "description": "Start time filter", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "type": { + "description": "Event type filter", + "type": "string", + "example": "connection" + }, + "until": { + "description": "End time filter", + "type": "string", + "example": "2023-01-01T13:00:00Z" + } + } + }, + "internal_api.EventsResponse": { + "type": "object", + "properties": { + "count": { + "description": "Number of events returned", + "type": "integer", + "example": 25 + }, + "events": { + "description": "List of events", + "type": "array", + "items": {} + }, + "filters": { + "description": "Applied filters", + "allOf": [ + { + "$ref": "#/definitions/internal_api.EventFilters" + } + ] + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of matching events", + "type": "integer", + "example": 150 + } + } + }, + "internal_api.HealthResponse": { + "type": "object", + "properties": { + "component": { + "description": "Component name", + "type": "string", + "example": "eBPF Monitor API" + }, + "status": { + "description": "Service status", + "type": "string", + "example": "healthy" + }, + "uptime": { + "description": "Service uptime", + "type": "string", + "example": "1h30m" + }, + "version": { + "description": "API version", + "type": "string", + "example": "1.0.0" + } + } + }, "internal_api.PacketDropListResponse": { "type": "object", "properties": { @@ -725,6 +1226,53 @@ const docTemplate = `{ "example": "2023-01-01T12:00:00Z" } } + }, + "internal_api.ProgramInfo": { + "type": "object", + "properties": { + "id": { + "description": "Program ID", + "type": "integer", + "example": 123 + }, + "name": { + "description": "Program name", + "type": "string", + "example": "connection_tracer" + }, + "status": { + "description": "Program status", + "type": "string", + "example": "loaded" + }, + "type": { + "description": "Program type", + "type": "string", + "example": "kprobe" + } + } + }, + "internal_api.ProgramsResponse": { + "type": "object", + "properties": { + "programs": { + "description": "List of eBPF programs", + "type": "array", + "items": { + "$ref": "#/definitions/internal_api.ProgramInfo" + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of programs", + "type": "integer", + "example": 2 + } + } } } }` diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 8365979..44661cc 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -33,19 +33,19 @@ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -109,19 +109,19 @@ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -226,8 +226,7 @@ "200": { "description": "Filtered events", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/internal_api.EventsResponse" } }, "500": { @@ -251,6 +250,58 @@ } } }, + "/api/events/ingest": { + "post": { + "description": "Accept events from eBPF agents for aggregation and storage", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "events" + ], + "summary": "Ingest events from agents", + "parameters": [ + { + "description": "Events to ingest", + "name": "events", + "in": "body", + "required": true, + "schema": { + "type": "object" + } + } + ], + "responses": { + "200": { + "description": "Ingestion result", + "schema": { + "$ref": "#/definitions/aggregator.IngestResponse" + } + }, + "400": { + "description": "Bad request", + "schema": { + "type": "string" + } + }, + "405": { + "description": "Method not allowed", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal server error", + "schema": { + "type": "string" + } + } + } + } + }, "/api/list-connections": { "get": { "description": "Get recent connection events grouped by PID", @@ -349,19 +400,19 @@ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -425,19 +476,19 @@ "parameters": [ { "type": "integer", - "description": "Process ID (GET only)", + "description": "Process ID", "name": "pid", "in": "query" }, { "type": "string", - "description": "Command name (GET only)", + "description": "Command name", "name": "command", "in": "query" }, { "type": "integer", - "description": "Duration in seconds (GET only, default: 60)", + "description": "Duration in seconds (default: 60)", "name": "duration_seconds", "in": "query" }, @@ -504,8 +555,7 @@ "200": { "description": "List of eBPF programs", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/internal_api.ProgramsResponse" } }, "500": { @@ -529,6 +579,35 @@ } } }, + "/api/stats": { + "get": { + "description": "Retrieve statistics about event aggregation including counts by type and node", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "stats" + ], + "summary": "Get aggregation statistics", + "responses": { + "200": { + "description": "Aggregation statistics", + "schema": { + "$ref": "#/definitions/aggregator.AggregationStatsResponse" + } + }, + "405": { + "description": "Method not allowed", + "schema": { + "type": "string" + } + } + } + } + }, "/health": { "get": { "description": "Get the health status of the eBPF monitoring system", @@ -546,8 +625,7 @@ "200": { "description": "Health status", "schema": { - "type": "object", - "additionalProperties": true + "$ref": "#/definitions/internal_api.HealthResponse" } }, "503": { @@ -564,6 +642,336 @@ } }, "definitions": { + "aggregator.AgentInfo": { + "type": "object", + "properties": { + "event_count": { + "description": "Number of events from this agent", + "type": "integer", + "example": 2500 + }, + "last_seen": { + "description": "Last seen timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "node_name": { + "description": "Node name", + "type": "string", + "example": "worker-1" + }, + "programs": { + "description": "Programs running on this agent", + "type": "array", + "items": { + "$ref": "#/definitions/aggregator.ProgramInfo" + } + }, + "status": { + "description": "Agent status", + "type": "string", + "example": "active" + } + } + }, + "aggregator.AggregatedEventFilters": { + "type": "object", + "properties": { + "limit": { + "description": "Limit filter", + "type": "integer", + "example": 100 + }, + "node": { + "description": "Node name filter", + "type": "string", + "example": "worker-1" + }, + "since": { + "description": "Start time filter", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "type": { + "description": "Event type filter", + "type": "string", + "example": "connection" + }, + "until": { + "description": "End time filter", + "type": "string", + "example": "2023-01-01T13:00:00Z" + } + } + }, + "aggregator.AggregatedEventsResponse": { + "type": "object", + "properties": { + "count": { + "description": "Number of events returned", + "type": "integer", + "example": 50 + }, + "events": { + "description": "List of aggregated events", + "type": "array", + "items": {} + }, + "filters": { + "description": "Applied filters", + "allOf": [ + { + "$ref": "#/definitions/aggregator.AggregatedEventFilters" + } + ] + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of matching events", + "type": "integer", + "example": 1250 + } + } + }, + "aggregator.AggregatedListResponse": { + "type": "object", + "properties": { + "events_by_node": { + "description": "Event count by node", + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "events_by_pid": { + "description": "Events grouped by PID", + "type": "object", + "additionalProperties": { + "type": "array", + "items": {} + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_events": { + "description": "Total number of events", + "type": "integer", + "example": 45 + }, + "total_nodes": { + "description": "Number of nodes with events", + "type": "integer", + "example": 3 + }, + "total_pids": { + "description": "Number of unique PIDs across all nodes", + "type": "integer", + "example": 8 + } + } + }, + "aggregator.AggregatedSummaryResponse": { + "type": "object", + "properties": { + "command": { + "description": "Command name (if filtered)", + "type": "string", + "example": "curl" + }, + "count": { + "description": "Total count across all nodes", + "type": "integer", + "example": 15 + }, + "count_by_node": { + "description": "Count by node", + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "duration_seconds": { + "description": "Duration in seconds", + "type": "integer", + "example": 60 + }, + "pid": { + "description": "Process ID (if filtered)", + "type": "integer", + "example": 1234 + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_nodes": { + "description": "Number of nodes with events", + "type": "integer", + "example": 3 + } + } + }, + "aggregator.AggregationStatsResponse": { + "type": "object", + "properties": { + "aggregation_start": { + "description": "When aggregation started", + "type": "string", + "example": "2023-01-01T10:00:00Z" + }, + "connected_agents": { + "description": "Number of connected agents", + "type": "integer", + "example": 5 + }, + "events_by_node": { + "description": "Events grouped by node", + "type": "object", + "additionalProperties": { + "type": "integer", + "format": "int64" + } + }, + "events_by_type": { + "description": "Events grouped by type", + "type": "object", + "additionalProperties": { + "type": "integer", + "format": "int64" + } + }, + "last_event_time": { + "description": "Timestamp of last event", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_events": { + "description": "Total events stored", + "type": "integer", + "example": 12500 + } + } + }, + "aggregator.AggregatorProgramsResponse": { + "type": "object", + "properties": { + "all_programs": { + "description": "All programs across agents", + "type": "array", + "items": { + "$ref": "#/definitions/aggregator.ProgramInfo" + } + }, + "connected_agents": { + "description": "List of connected agents", + "type": "array", + "items": { + "$ref": "#/definitions/aggregator.AgentInfo" + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_agents": { + "description": "Total number of agents", + "type": "integer", + "example": 3 + }, + "total_programs": { + "description": "Total number of programs", + "type": "integer", + "example": 6 + } + } + }, + "aggregator.HealthCheck": { + "type": "object", + "properties": { + "component": { + "type": "string" + }, + "stats": { + "type": "object", + "additionalProperties": true + }, + "status": { + "type": "string" + }, + "uptime": { + "type": "string" + } + } + }, + "aggregator.IngestResponse": { + "type": "object", + "properties": { + "events_processed": { + "description": "Number of events processed", + "type": "integer", + "example": 25 + }, + "message": { + "description": "Status message", + "type": "string", + "example": "Events ingested successfully" + }, + "success": { + "description": "Ingestion success status", + "type": "boolean", + "example": true + }, + "timestamp": { + "description": "Processing timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + } + } + }, + "aggregator.ProgramInfo": { + "type": "object", + "properties": { + "event_count": { + "description": "Events generated by this program", + "type": "integer", + "example": 1250 + }, + "name": { + "description": "Program name", + "type": "string", + "example": "connection_tracer" + }, + "node": { + "description": "Node where program is running", + "type": "string", + "example": "worker-1" + }, + "status": { + "description": "Program status", + "type": "string", + "example": "active" + }, + "type": { + "description": "Program type", + "type": "string", + "example": "kprobe" + } + } + }, "internal_api.ConnectionListResponse": { "type": "object", "properties": { @@ -642,6 +1050,99 @@ } } }, + "internal_api.EventFilters": { + "type": "object", + "properties": { + "command": { + "description": "Command filter", + "type": "string", + "example": "curl" + }, + "limit": { + "description": "Limit filter", + "type": "integer", + "example": 100 + }, + "pid": { + "description": "Process ID filter", + "type": "integer", + "example": 1234 + }, + "since": { + "description": "Start time filter", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "type": { + "description": "Event type filter", + "type": "string", + "example": "connection" + }, + "until": { + "description": "End time filter", + "type": "string", + "example": "2023-01-01T13:00:00Z" + } + } + }, + "internal_api.EventsResponse": { + "type": "object", + "properties": { + "count": { + "description": "Number of events returned", + "type": "integer", + "example": 25 + }, + "events": { + "description": "List of events", + "type": "array", + "items": {} + }, + "filters": { + "description": "Applied filters", + "allOf": [ + { + "$ref": "#/definitions/internal_api.EventFilters" + } + ] + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of matching events", + "type": "integer", + "example": 150 + } + } + }, + "internal_api.HealthResponse": { + "type": "object", + "properties": { + "component": { + "description": "Component name", + "type": "string", + "example": "eBPF Monitor API" + }, + "status": { + "description": "Service status", + "type": "string", + "example": "healthy" + }, + "uptime": { + "description": "Service uptime", + "type": "string", + "example": "1h30m" + }, + "version": { + "description": "API version", + "type": "string", + "example": "1.0.0" + } + } + }, "internal_api.PacketDropListResponse": { "type": "object", "properties": { @@ -719,6 +1220,53 @@ "example": "2023-01-01T12:00:00Z" } } + }, + "internal_api.ProgramInfo": { + "type": "object", + "properties": { + "id": { + "description": "Program ID", + "type": "integer", + "example": 123 + }, + "name": { + "description": "Program name", + "type": "string", + "example": "connection_tracer" + }, + "status": { + "description": "Program status", + "type": "string", + "example": "loaded" + }, + "type": { + "description": "Program type", + "type": "string", + "example": "kprobe" + } + } + }, + "internal_api.ProgramsResponse": { + "type": "object", + "properties": { + "programs": { + "description": "List of eBPF programs", + "type": "array", + "items": { + "$ref": "#/definitions/internal_api.ProgramInfo" + } + }, + "query_time": { + "description": "Query timestamp", + "type": "string", + "example": "2023-01-01T12:00:00Z" + }, + "total_count": { + "description": "Total number of programs", + "type": "integer", + "example": 2 + } + } } } } \ No newline at end of file diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 2e84124..5bd2e03 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -1,5 +1,251 @@ basePath: / definitions: + aggregator.AgentInfo: + properties: + event_count: + description: Number of events from this agent + example: 2500 + type: integer + last_seen: + description: Last seen timestamp + example: "2023-01-01T12:00:00Z" + type: string + node_name: + description: Node name + example: worker-1 + type: string + programs: + description: Programs running on this agent + items: + $ref: '#/definitions/aggregator.ProgramInfo' + type: array + status: + description: Agent status + example: active + type: string + type: object + aggregator.AggregatedEventFilters: + properties: + limit: + description: Limit filter + example: 100 + type: integer + node: + description: Node name filter + example: worker-1 + type: string + since: + description: Start time filter + example: "2023-01-01T12:00:00Z" + type: string + type: + description: Event type filter + example: connection + type: string + until: + description: End time filter + example: "2023-01-01T13:00:00Z" + type: string + type: object + aggregator.AggregatedEventsResponse: + properties: + count: + description: Number of events returned + example: 50 + type: integer + events: + description: List of aggregated events + items: {} + type: array + filters: + allOf: + - $ref: '#/definitions/aggregator.AggregatedEventFilters' + description: Applied filters + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_count: + description: Total number of matching events + example: 1250 + type: integer + type: object + aggregator.AggregatedListResponse: + properties: + events_by_node: + additionalProperties: + type: integer + description: Event count by node + type: object + events_by_pid: + additionalProperties: + items: {} + type: array + description: Events grouped by PID + type: object + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_events: + description: Total number of events + example: 45 + type: integer + total_nodes: + description: Number of nodes with events + example: 3 + type: integer + total_pids: + description: Number of unique PIDs across all nodes + example: 8 + type: integer + type: object + aggregator.AggregatedSummaryResponse: + properties: + command: + description: Command name (if filtered) + example: curl + type: string + count: + description: Total count across all nodes + example: 15 + type: integer + count_by_node: + additionalProperties: + type: integer + description: Count by node + type: object + duration_seconds: + description: Duration in seconds + example: 60 + type: integer + pid: + description: Process ID (if filtered) + example: 1234 + type: integer + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_nodes: + description: Number of nodes with events + example: 3 + type: integer + type: object + aggregator.AggregationStatsResponse: + properties: + aggregation_start: + description: When aggregation started + example: "2023-01-01T10:00:00Z" + type: string + connected_agents: + description: Number of connected agents + example: 5 + type: integer + events_by_node: + additionalProperties: + format: int64 + type: integer + description: Events grouped by node + type: object + events_by_type: + additionalProperties: + format: int64 + type: integer + description: Events grouped by type + type: object + last_event_time: + description: Timestamp of last event + example: "2023-01-01T12:00:00Z" + type: string + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_events: + description: Total events stored + example: 12500 + type: integer + type: object + aggregator.AggregatorProgramsResponse: + properties: + all_programs: + description: All programs across agents + items: + $ref: '#/definitions/aggregator.ProgramInfo' + type: array + connected_agents: + description: List of connected agents + items: + $ref: '#/definitions/aggregator.AgentInfo' + type: array + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_agents: + description: Total number of agents + example: 3 + type: integer + total_programs: + description: Total number of programs + example: 6 + type: integer + type: object + aggregator.HealthCheck: + properties: + component: + type: string + stats: + additionalProperties: true + type: object + status: + type: string + uptime: + type: string + type: object + aggregator.IngestResponse: + properties: + events_processed: + description: Number of events processed + example: 25 + type: integer + message: + description: Status message + example: Events ingested successfully + type: string + success: + description: Ingestion success status + example: true + type: boolean + timestamp: + description: Processing timestamp + example: "2023-01-01T12:00:00Z" + type: string + type: object + aggregator.ProgramInfo: + properties: + event_count: + description: Events generated by this program + example: 1250 + type: integer + name: + description: Program name + example: connection_tracer + type: string + node: + description: Node where program is running + example: worker-1 + type: string + status: + description: Program status + example: active + type: string + type: + description: Program type + example: kprobe + type: string + type: object internal_api.ConnectionListResponse: properties: events_by_pid: @@ -59,6 +305,75 @@ definitions: example: "2023-01-01T12:00:00Z" type: string type: object + internal_api.EventFilters: + properties: + command: + description: Command filter + example: curl + type: string + limit: + description: Limit filter + example: 100 + type: integer + pid: + description: Process ID filter + example: 1234 + type: integer + since: + description: Start time filter + example: "2023-01-01T12:00:00Z" + type: string + type: + description: Event type filter + example: connection + type: string + until: + description: End time filter + example: "2023-01-01T13:00:00Z" + type: string + type: object + internal_api.EventsResponse: + properties: + count: + description: Number of events returned + example: 25 + type: integer + events: + description: List of events + items: {} + type: array + filters: + allOf: + - $ref: '#/definitions/internal_api.EventFilters' + description: Applied filters + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_count: + description: Total number of matching events + example: 150 + type: integer + type: object + internal_api.HealthResponse: + properties: + component: + description: Component name + example: eBPF Monitor API + type: string + status: + description: Service status + example: healthy + type: string + uptime: + description: Service uptime + example: 1h30m + type: string + version: + description: API version + example: 1.0.0 + type: string + type: object internal_api.PacketDropListResponse: properties: events_by_pid: @@ -118,6 +433,41 @@ definitions: example: "2023-01-01T12:00:00Z" type: string type: object + internal_api.ProgramInfo: + properties: + id: + description: Program ID + example: 123 + type: integer + name: + description: Program name + example: connection_tracer + type: string + status: + description: Program status + example: loaded + type: string + type: + description: Program type + example: kprobe + type: string + type: object + internal_api.ProgramsResponse: + properties: + programs: + description: List of eBPF programs + items: + $ref: '#/definitions/internal_api.ProgramInfo' + type: array + query_time: + description: Query timestamp + example: "2023-01-01T12:00:00Z" + type: string + total_count: + description: Total number of programs + example: 2 + type: integer + type: object host: localhost:8080 info: contact: @@ -138,15 +488,15 @@ paths: description: Get count of connection events filtered by PID, command, and time window parameters: - - description: Process ID (GET only) + - description: Process ID in: query name: pid type: integer - - description: Command name (GET only) + - description: Command name in: query name: command type: string - - description: 'Duration in seconds (GET only, default: 60)' + - description: 'Duration in seconds (default: 60)' in: query name: duration_seconds type: integer @@ -189,15 +539,15 @@ paths: description: Get count of connection events filtered by PID, command, and time window parameters: - - description: Process ID (GET only) + - description: Process ID in: query name: pid type: integer - - description: Command name (GET only) + - description: Command name in: query name: command type: string - - description: 'Duration in seconds (GET only, default: 60)' + - description: 'Duration in seconds (default: 60)' in: query name: duration_seconds type: integer @@ -270,8 +620,7 @@ paths: "200": description: Filtered events schema: - additionalProperties: true - type: object + $ref: '#/definitions/internal_api.EventsResponse' "500": description: Internal server error schema: @@ -287,6 +636,40 @@ paths: summary: Query events tags: - events + /api/events/ingest: + post: + consumes: + - application/json + description: Accept events from eBPF agents for aggregation and storage + parameters: + - description: Events to ingest + in: body + name: events + required: true + schema: + type: object + produces: + - application/json + responses: + "200": + description: Ingestion result + schema: + $ref: '#/definitions/aggregator.IngestResponse' + "400": + description: Bad request + schema: + type: string + "405": + description: Method not allowed + schema: + type: string + "500": + description: Internal server error + schema: + type: string + summary: Ingest events from agents + tags: + - events /api/list-connections: get: consumes: @@ -348,15 +731,15 @@ paths: description: Get count of packet drop events filtered by PID, command, and time window parameters: - - description: Process ID (GET only) + - description: Process ID in: query name: pid type: integer - - description: Command name (GET only) + - description: Command name in: query name: command type: string - - description: 'Duration in seconds (GET only, default: 60)' + - description: 'Duration in seconds (default: 60)' in: query name: duration_seconds type: integer @@ -399,15 +782,15 @@ paths: description: Get count of packet drop events filtered by PID, command, and time window parameters: - - description: Process ID (GET only) + - description: Process ID in: query name: pid type: integer - - description: Command name (GET only) + - description: Command name in: query name: command type: string - - description: 'Duration in seconds (GET only, default: 60)' + - description: 'Duration in seconds (default: 60)' in: query name: duration_seconds type: integer @@ -455,8 +838,7 @@ paths: "200": description: List of eBPF programs schema: - additionalProperties: true - type: object + $ref: '#/definitions/internal_api.ProgramsResponse' "500": description: Internal server error schema: @@ -472,6 +854,26 @@ paths: summary: List eBPF programs tags: - programs + /api/stats: + get: + consumes: + - application/json + description: Retrieve statistics about event aggregation including counts by + type and node + produces: + - application/json + responses: + "200": + description: Aggregation statistics + schema: + $ref: '#/definitions/aggregator.AggregationStatsResponse' + "405": + description: Method not allowed + schema: + type: string + summary: Get aggregation statistics + tags: + - stats /health: get: consumes: @@ -483,8 +885,7 @@ paths: "200": description: Health status schema: - additionalProperties: true - type: object + $ref: '#/definitions/internal_api.HealthResponse' "503": description: Service unavailable schema: diff --git a/internal/aggregator/aggregator.go b/internal/aggregator/aggregator.go index 0f0611d..29ae8f0 100644 --- a/internal/aggregator/aggregator.go +++ b/internal/aggregator/aggregator.go @@ -26,18 +26,113 @@ import ( "github.com/srodi/ebpf-server/pkg/logger" ) +// Response types for aggregator API endpoints + +// AggregatedEventsResponse represents the response for querying aggregated events +type AggregatedEventsResponse struct { + Events []core.Event `json:"events"` // List of aggregated events + Count int `json:"count" example:"50"` // Number of events returned + TotalCount int `json:"total_count" example:"1250"` // Total number of matching events + QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp + Filters AggregatedEventFilters `json:"filters"` // Applied filters +} + +// AggregatedEventFilters represents the filters applied to aggregated event queries +type AggregatedEventFilters struct { + Type string `json:"type,omitempty" example:"connection"` // Event type filter + Node string `json:"node,omitempty" example:"worker-1"` // Node name filter + Since string `json:"since,omitempty" example:"2023-01-01T12:00:00Z"` // Start time filter + Until string `json:"until,omitempty" example:"2023-01-01T13:00:00Z"` // End time filter + Limit int `json:"limit,omitempty" example:"100"` // Limit filter +} + +// IngestResponse represents the response for event ingestion +type IngestResponse struct { + EventsProcessed int `json:"events_processed" example:"25"` // Number of events processed + Success bool `json:"success" example:"true"` // Ingestion success status + Message string `json:"message" example:"Events ingested successfully"` // Status message + Timestamp string `json:"timestamp" example:"2023-01-01T12:00:00Z"` // Processing timestamp +} + +// AggregationStatsResponse represents the response for aggregation statistics +type AggregationStatsResponse struct { + TotalEvents int64 `json:"total_events" example:"12500"` // Total events stored + EventsByType map[string]int64 `json:"events_by_type"` // Events grouped by type + EventsByNode map[string]int64 `json:"events_by_node"` // Events grouped by node + ConnectedAgents int `json:"connected_agents" example:"5"` // Number of connected agents + LastEventTime string `json:"last_event_time" example:"2023-01-01T12:00:00Z"` // Timestamp of last event + AggregationStart string `json:"aggregation_start" example:"2023-01-01T10:00:00Z"` // When aggregation started + QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp +} + +// AggregatorProgramsResponse represents the response for aggregator programs information +type AggregatorProgramsResponse struct { + ConnectedAgents []AgentInfo `json:"connected_agents"` // List of connected agents + AllPrograms []ProgramInfo `json:"all_programs"` // All programs across agents + TotalAgents int `json:"total_agents" example:"3"` // Total number of agents + TotalPrograms int `json:"total_programs" example:"6"` // Total number of programs + QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp +} + +// AgentInfo represents information about a connected agent +type AgentInfo struct { + NodeName string `json:"node_name" example:"worker-1"` // Node name + LastSeen string `json:"last_seen" example:"2023-01-01T12:00:00Z"` // Last seen timestamp + EventCount int64 `json:"event_count" example:"2500"` // Number of events from this agent + Programs []ProgramInfo `json:"programs"` // Programs running on this agent + Status string `json:"status" example:"active"` // Agent status +} + +// ProgramInfo represents information about an eBPF program +type ProgramInfo struct { + Name string `json:"name" example:"connection_tracer"` // Program name + Type string `json:"type" example:"kprobe"` // Program type + Status string `json:"status" example:"active"` // Program status + Node string `json:"node" example:"worker-1"` // Node where program is running + EventCount int64 `json:"event_count" example:"1250"` // Events generated by this program +} + +// AggregatedListResponse represents the response for listing aggregated connection/packet drop events +type AggregatedListResponse struct { + TotalPIDs int `json:"total_pids" example:"8"` // Number of unique PIDs across all nodes + TotalEvents int `json:"total_events" example:"45"` // Total number of events + TotalNodes int `json:"total_nodes" example:"3"` // Number of nodes with events + EventsByPID map[uint32][]core.Event `json:"events_by_pid"` // Events grouped by PID + EventsByNode map[string]int `json:"events_by_node"` // Event count by node + QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp +} + +// AggregatedSummaryResponse represents the response for aggregated connection/packet drop summaries +type AggregatedSummaryResponse struct { + Count int `json:"count" example:"15"` // Total count across all nodes + CountByNode map[string]int `json:"count_by_node"` // Count by node + PID uint32 `json:"pid,omitempty" example:"1234"` // Process ID (if filtered) + Command string `json:"command,omitempty" example:"curl"` // Command name (if filtered) + DurationSeconds int `json:"duration_seconds" example:"60"` // Duration in seconds + TotalNodes int `json:"total_nodes" example:"3"` // Number of nodes with events + QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp +} + // Config represents aggregator configuration. type Config struct { HTTPAddr string } +// ProgramCache caches program information to avoid expensive queries +type ProgramCache struct { + data *AggregatorProgramsResponse + lastCheck time.Time + mu sync.RWMutex +} + // Aggregator collects and aggregates events from multiple eBPF agents. type Aggregator struct { - config *Config - storage core.EventSink - stats *Stats - mu sync.RWMutex - running bool + config *Config + storage core.EventSink + stats *Stats + programCache *ProgramCache + mu sync.RWMutex + running bool } // Stats represents aggregation statistics. @@ -67,6 +162,7 @@ func New(config *Config) (*Aggregator, error) { EventsByNode: make(map[string]int64), StartTime: time.Now(), }, + programCache: &ProgramCache{}, }, nil } @@ -81,6 +177,10 @@ func (a *Aggregator) Start(ctx context.Context) error { logger.Info("Starting event aggregator") a.running = true + + // Start cleanup routine for memory storage + go a.cleanupRoutine(ctx) + return nil } @@ -116,7 +216,7 @@ func (a *Aggregator) IsRunning() bool { // @Param since query string false "Start time (RFC3339 format)" // @Param until query string false "End time (RFC3339 format)" // @Param limit query int false "Maximum number of events to return" -// @Success 200 {object} map[string]interface{} "Events and count" +// @Success 200 {object} AggregatedEventsResponse "Events and count" // @Failure 405 {string} string "Method not allowed" // @Failure 500 {string} string "Internal server error" // @Router /api/events [get] @@ -155,7 +255,7 @@ func (a *Aggregator) HandleEvents(w http.ResponseWriter, r *http.Request) { // @Accept json // @Produce json // @Param events body object true "Events to ingest" -// @Success 200 {object} map[string]interface{} "Ingestion result" +// @Success 200 {object} IngestResponse "Ingestion result" // @Failure 400 {string} string "Bad request" // @Failure 405 {string} string "Method not allowed" // @Failure 500 {string} string "Internal server error" @@ -189,6 +289,12 @@ func (a *Aggregator) HandleIngest(w http.ResponseWriter, r *http.Request) { // Update stats a.updateStats(int64(processed), requestData.Events) + // Invalidate program cache if we processed events successfully + // This ensures the cache reflects newly ingested data + if processed > 0 { + a.invalidateProgramCache() + } + // Return success response w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(map[string]interface{}{ @@ -207,7 +313,7 @@ func (a *Aggregator) HandleIngest(w http.ResponseWriter, r *http.Request) { // @Tags stats // @Accept json // @Produce json -// @Success 200 {object} map[string]interface{} "Aggregation statistics" +// @Success 200 {object} AggregationStatsResponse "Aggregation statistics" // @Failure 405 {string} string "Method not allowed" // @Router /api/stats [get] func (a *Aggregator) HandleStats(w http.ResponseWriter, r *http.Request) { @@ -226,12 +332,217 @@ func (a *Aggregator) HandleStats(w http.ResponseWriter, r *http.Request) { } a.stats.mu.RUnlock() + // Add current storage info for debugging + if memStorage, ok := a.storage.(*storage.MemoryStorage); ok { + // Get a rough count of current events (last hour) + query := core.Query{ + Since: time.Now().Add(-1 * time.Hour), + } + currentEvents, _ := memStorage.Count(context.Background(), query) + statsData["current_events_last_hour"] = currentEvents + + // Get total events in storage + allEvents, _ := memStorage.Count(context.Background(), core.Query{}) + statsData["total_events_in_storage"] = allEvents + } + w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(statsData); err != nil { logger.Errorf("Failed to encode stats: %v", err) } } +// HandlePrograms handles HTTP requests for program information. +// Since the aggregator doesn't run eBPF programs directly, it returns program status from connected agents. +// This endpoint uses caching to avoid expensive queries on each request. +// +// @Summary Get program information +// @Description Get information about eBPF programs running on connected agents +// @Tags programs +// @Accept json +// @Produce json +// @Success 200 {object} AggregatorProgramsResponse "Program information" +// @Failure 405 {string} string "Method not allowed" +// @Router /api/programs [get] +func (a *Aggregator) HandlePrograms(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + const cacheDuration = 2 * time.Minute // Cache for 2 minutes + + // Check if we have cached data that's still fresh + a.programCache.mu.RLock() + if a.programCache.data != nil && time.Since(a.programCache.lastCheck) < cacheDuration { + response := a.programCache.data + a.programCache.mu.RUnlock() + + logger.Debugf("Serving cached program information (age: %v)", time.Since(a.programCache.lastCheck)) + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Cache-Control", fmt.Sprintf("max-age=%d", int(cacheDuration.Seconds()))) + if err := json.NewEncoder(w).Encode(response); err != nil { + logger.Errorf("Failed to encode cached programs response: %v", err) + } + return + } + a.programCache.mu.RUnlock() + + // Cache is stale or empty, refresh it + response, err := a.refreshProgramCache() + if err != nil { + logger.Errorf("Failed to refresh program cache: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + logger.Debugf("Serving fresh program information (%d agents, %d programs)", + response.TotalAgents, response.TotalPrograms) + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Cache-Control", fmt.Sprintf("max-age=%d", int(cacheDuration.Seconds()))) + if err := json.NewEncoder(w).Encode(response); err != nil { + logger.Errorf("Failed to encode programs response: %v", err) + } +} + +// refreshProgramCache refreshes the program information cache by querying recent events +func (a *Aggregator) refreshProgramCache() (*AggregatorProgramsResponse, error) { + // Query recent events to infer connected agents and their programs + query := core.Query{ + Limit: 1000, // Get a good sample of recent events + Since: time.Now().Add(-10 * time.Minute), // Last 10 minutes + } + + events, err := a.storage.Query(context.Background(), query) + if err != nil { + return nil, fmt.Errorf("failed to query events for program info: %v", err) + } + + // Aggregate information about connected agents and their programs + agents := make(map[string]map[string]interface{}) // node_name -> agent info + eventTypes := make(map[string]bool) // unique event types (indicate programs) + + for _, event := range events { + metadata := event.Metadata() + + // Extract agent information + nodeName, hasNode := metadata["k8s_node_name"].(string) + podName, _ := metadata["k8s_pod_name"].(string) + namespace, _ := metadata["k8s_namespace"].(string) + + if hasNode && nodeName != "" { + if agents[nodeName] == nil { + agents[nodeName] = map[string]interface{}{ + "node_name": nodeName, + "pod_name": podName, + "namespace": namespace, + "event_types": make(map[string]bool), + "last_seen": event.Time(), + "event_count": 0, + } + } + + // Update agent info + agent := agents[nodeName] + eventTypesMap := agent["event_types"].(map[string]bool) + eventTypesMap[event.Type()] = true + agent["event_types"] = eventTypesMap + agent["event_count"] = agent["event_count"].(int) + 1 + + // Update last seen if this event is more recent + if event.Time().After(agent["last_seen"].(time.Time)) { + agent["last_seen"] = event.Time() + } + } + + // Track unique event types across all agents + eventTypes[event.Type()] = true + } + + // Convert agents map to slice and format programs + var connectedAgents []AgentInfo + var allPrograms []ProgramInfo + + for nodeName, agentInfo := range agents { + eventTypesMap := agentInfo["event_types"].(map[string]bool) + var programs []ProgramInfo + eventCount := int64(agentInfo["event_count"].(int)) + + for eventType := range eventTypesMap { + program := ProgramInfo{ + Name: eventType + "_tracer", + Type: eventType, + Status: "active", // Inferred from recent events + Node: nodeName, + EventCount: eventCount, + } + programs = append(programs, program) + allPrograms = append(allPrograms, program) + } + + agentData := AgentInfo{ + NodeName: nodeName, + LastSeen: agentInfo["last_seen"].(time.Time).Format(time.RFC3339), + EventCount: eventCount, + Programs: programs, + Status: "active", + } + connectedAgents = append(connectedAgents, agentData) + } + + response := &AggregatorProgramsResponse{ + ConnectedAgents: connectedAgents, + AllPrograms: allPrograms, + TotalAgents: len(connectedAgents), + TotalPrograms: len(allPrograms), + QueryTime: time.Now().Format(time.RFC3339), + } + + // Update cache + a.programCache.mu.Lock() + a.programCache.data = response + a.programCache.lastCheck = time.Now() + a.programCache.mu.Unlock() + + return response, nil +} + +// invalidateProgramCache invalidates the program information cache +func (a *Aggregator) invalidateProgramCache() { + a.programCache.mu.Lock() + a.programCache.data = nil + a.programCache.lastCheck = time.Time{} + a.programCache.mu.Unlock() +} + +// cleanupRoutine runs periodic cleanup of old events to prevent memory bloat +func (a *Aggregator) cleanupRoutine(ctx context.Context) { + ticker := time.NewTicker(5 * time.Minute) // Cleanup every 5 minutes + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + logger.Debug("Cleanup routine stopping due to context cancellation") + return + case <-ticker.C: + if !a.IsRunning() { + continue + } + + // Clean up events older than 2 hours + maxAge := 2 * time.Hour + + if memStorage, ok := a.storage.(*storage.MemoryStorage); ok { + logger.Debugf("Running cleanup: removing events older than %v", maxAge) + memStorage.Cleanup(maxAge) + logger.Debugf("Cleanup completed") + } + } + } +} + // ingestEvent processes a single event from an agent. func (a *Aggregator) ingestEvent(ctx context.Context, eventData json.RawMessage) error { // Parse event data into a generic event @@ -359,3 +670,359 @@ func (e *SimpleEvent) Metadata() map[string]interface{} { func (e *SimpleEvent) MarshalJSON() ([]byte, error) { return json.Marshal(e.data) } + +// QueryEvents retrieves events matching the criteria (for API compatibility). +func (a *Aggregator) QueryEvents(ctx context.Context, query core.Query) ([]core.Event, error) { + return a.storage.Query(ctx, query) +} + +// CountEvents returns the number of events matching the criteria (for API compatibility). +func (a *Aggregator) CountEvents(ctx context.Context, query core.Query) (int, error) { + return a.storage.Count(ctx, query) +} + +// GetPrograms returns program status (for API compatibility). +// The aggregator doesn't manage eBPF programs directly, so returns empty slice. +func (a *Aggregator) GetPrograms() []core.ProgramStatus { + return []core.ProgramStatus{} +} + +// HandleListConnections returns recent connection events from aggregated data. +// +// @Summary List connection events +// @Description Get recent connection events grouped by PID from aggregated data +// @Tags connections +// @Accept json +// @Produce json +// @Success 200 {object} AggregatedListResponse "Connection events" +// @Failure 500 {object} map[string]string "Internal server error" +// @Failure 503 {object} map[string]string "Service unavailable" +// @Router /api/list-connections [get] +func (a *Aggregator) HandleListConnections(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + query := core.Query{ + EventType: "connection", + Limit: 100, + Since: time.Now().Add(-1 * time.Hour), // Last hour by default + } + + events, err := a.storage.Query(r.Context(), query) + if err != nil { + logger.Errorf("Error querying connection events: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + // Group by PID for compatibility + eventsByPID := make(map[uint32][]core.Event) + eventsByNode := make(map[string]int) + nodeSet := make(map[string]bool) + + for _, event := range events { + pid := event.PID() + eventsByPID[pid] = append(eventsByPID[pid], event) + + // Extract node information from event metadata + if metadata := event.Metadata(); metadata != nil { + if nodeName, ok := metadata["k8s_node_name"].(string); ok && nodeName != "" { + eventsByNode[nodeName]++ + nodeSet[nodeName] = true + } + } + } + + response := AggregatedListResponse{ + TotalPIDs: len(eventsByPID), + TotalEvents: len(events), + TotalNodes: len(nodeSet), + EventsByPID: eventsByPID, + EventsByNode: eventsByNode, + QueryTime: time.Now().Format(time.RFC3339), + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + logger.Errorf("Error encoding list connections response: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + } +} + +// HandleListPacketDrops returns recent packet drop events from aggregated data. +// +// @Summary List packet drop events +// @Description Get recent packet drop events grouped by PID from aggregated data +// @Tags packet_drops +// @Accept json +// @Produce json +// @Success 200 {object} AggregatedListResponse "Packet drop events" +// @Failure 500 {object} map[string]string "Internal server error" +// @Failure 503 {object} map[string]string "Service unavailable" +// @Router /api/list-packet-drops [get] +func (a *Aggregator) HandleListPacketDrops(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + query := core.Query{ + EventType: "packet_drop", + Limit: 100, + Since: time.Now().Add(-1 * time.Hour), // Last hour by default + } + + events, err := a.storage.Query(r.Context(), query) + if err != nil { + logger.Errorf("Error querying packet drop events: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + // Group by PID for compatibility + eventsByPID := make(map[uint32][]core.Event) + eventsByNode := make(map[string]int) + nodeSet := make(map[string]bool) + + for _, event := range events { + pid := event.PID() + eventsByPID[pid] = append(eventsByPID[pid], event) + + // Extract node information from event metadata + if metadata := event.Metadata(); metadata != nil { + if nodeName, ok := metadata["k8s_node_name"].(string); ok && nodeName != "" { + eventsByNode[nodeName]++ + nodeSet[nodeName] = true + } + } + } + + response := AggregatedListResponse{ + TotalPIDs: len(eventsByPID), + TotalEvents: len(events), + TotalNodes: len(nodeSet), + EventsByPID: eventsByPID, + EventsByNode: eventsByNode, + QueryTime: time.Now().Format(time.RFC3339), + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + logger.Errorf("Error encoding list packet drops response: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + } +} + +// HandleConnectionSummary provides connection event summaries from aggregated data. +// +// @Summary Get connection statistics +// @Description Get count of connection events filtered by PID, command, and time window from aggregated data +// @Tags connections +// @Accept json +// @Produce json +// @Param pid query int false "Process ID" +// @Param command query string false "Command name" +// @Param duration_seconds query int false "Duration in seconds (default: 60)" +// @Param request body map[string]interface{} false "Connection summary request (POST only)" +// @Success 200 {object} AggregatedSummaryResponse "Connection statistics" +// @Failure 400 {object} map[string]string "Bad request" +// @Failure 500 {object} map[string]string "Internal server error" +// @Router /api/connection-summary [get] +// @Router /api/connection-summary [post] +func (a *Aggregator) HandleConnectionSummary(w http.ResponseWriter, r *http.Request) { + // Parse request body for POST requests + var request struct { + PID uint32 `json:"pid"` + Command string `json:"command"` + Duration int `json:"duration_seconds"` + } + + if r.Method == "POST" { + if err := json.NewDecoder(r.Body).Decode(&request); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + } else { + // Handle GET request with query parameters + if pidStr := r.URL.Query().Get("pid"); pidStr != "" { + if pid, err := strconv.ParseUint(pidStr, 10, 32); err == nil { + request.PID = uint32(pid) + } + } + request.Command = r.URL.Query().Get("command") + if durationStr := r.URL.Query().Get("duration_seconds"); durationStr != "" { + if duration, err := strconv.Atoi(durationStr); err == nil { + request.Duration = duration + } + } + } + + // Default duration to 60 seconds + if request.Duration == 0 { + request.Duration = 60 + } + + // Build query + query := core.Query{ + EventType: "connection", + PID: request.PID, + Command: request.Command, + Since: time.Now().Add(-time.Duration(request.Duration) * time.Second), + } + + count, err := a.storage.Count(r.Context(), query) + if err != nil { + logger.Errorf("Error counting connection events: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + logger.Debugf("🔍 CONNECTION SUMMARY: query=%+v count=%d", query, count) + + // Get events to analyze by node for detailed response + eventsQuery := query + eventsQuery.Limit = 1000 // Reasonable limit for analysis + events, err := a.storage.Query(r.Context(), eventsQuery) + if err != nil { + logger.Errorf("Error querying connection events for node analysis: %v", err) + // Fall back to basic response without node breakdown + events = nil + } + + // Analyze events by node + countByNode := make(map[string]int) + nodeSet := make(map[string]bool) + + for _, event := range events { + if metadata := event.Metadata(); metadata != nil { + if nodeName, ok := metadata["k8s_node_name"].(string); ok && nodeName != "" { + countByNode[nodeName]++ + nodeSet[nodeName] = true + } + } + } + + response := AggregatedSummaryResponse{ + Count: count, + CountByNode: countByNode, + PID: request.PID, + Command: request.Command, + DurationSeconds: request.Duration, + TotalNodes: len(nodeSet), + QueryTime: time.Now().Format(time.RFC3339), + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + logger.Errorf("Error encoding connection summary response: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + } +} + +// HandlePacketDropSummary provides packet drop event summaries from aggregated data. +// +// @Summary Get packet drop statistics +// @Description Get count of packet drop events filtered by PID, command, and time window from aggregated data +// @Tags packet_drops +// @Accept json +// @Produce json +// @Param pid query int false "Process ID" +// @Param command query string false "Command name" +// @Param duration_seconds query int false "Duration in seconds (default: 60)" +// @Param request body map[string]interface{} false "Packet drop summary request (POST only)" +// @Success 200 {object} AggregatedSummaryResponse "Packet drop statistics" +// @Failure 400 {object} map[string]string "Bad request" +// @Failure 500 {object} map[string]string "Internal server error" +// @Router /api/packet-drop-summary [get] +// @Router /api/packet-drop-summary [post] +func (a *Aggregator) HandlePacketDropSummary(w http.ResponseWriter, r *http.Request) { + // Parse request body for POST requests + var request struct { + PID uint32 `json:"pid"` + Command string `json:"command"` + Duration int `json:"duration_seconds"` + } + + if r.Method == "POST" { + if err := json.NewDecoder(r.Body).Decode(&request); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + } else { + // Handle GET request with query parameters + if pidStr := r.URL.Query().Get("pid"); pidStr != "" { + if pid, err := strconv.ParseUint(pidStr, 10, 32); err == nil { + request.PID = uint32(pid) + } + } + request.Command = r.URL.Query().Get("command") + if durationStr := r.URL.Query().Get("duration_seconds"); durationStr != "" { + if duration, err := strconv.Atoi(durationStr); err == nil { + request.Duration = duration + } + } + } + + // Default duration to 60 seconds + if request.Duration == 0 { + request.Duration = 60 + } + + // Build query + query := core.Query{ + EventType: "packet_drop", + PID: request.PID, + Command: request.Command, + Since: time.Now().Add(-time.Duration(request.Duration) * time.Second), + } + + count, err := a.storage.Count(r.Context(), query) + if err != nil { + logger.Errorf("Error counting packet drop events: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + logger.Debugf("🔍 PACKET DROP SUMMARY: query=%+v count=%d", query, count) + + // Get events to analyze by node for detailed response + eventsQuery := query + eventsQuery.Limit = 1000 // Reasonable limit for analysis + events, err := a.storage.Query(r.Context(), eventsQuery) + if err != nil { + logger.Errorf("Error querying packet drop events for node analysis: %v", err) + // Fall back to basic response without node breakdown + events = nil + } + + // Analyze events by node + countByNode := make(map[string]int) + nodeSet := make(map[string]bool) + + for _, event := range events { + if metadata := event.Metadata(); metadata != nil { + if nodeName, ok := metadata["k8s_node_name"].(string); ok && nodeName != "" { + countByNode[nodeName]++ + nodeSet[nodeName] = true + } + } + } + + response := AggregatedSummaryResponse{ + Count: count, + CountByNode: countByNode, + PID: request.PID, + Command: request.Command, + DurationSeconds: request.Duration, + TotalNodes: len(nodeSet), + QueryTime: time.Now().Format(time.RFC3339), + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + logger.Errorf("Error encoding packet drop summary response: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + } +} diff --git a/internal/aggregator/health.go b/internal/aggregator/health.go index 7541b37..c23dce6 100644 --- a/internal/aggregator/health.go +++ b/internal/aggregator/health.go @@ -10,9 +10,9 @@ import ( // HealthCheck represents the aggregator health status. type HealthCheck struct { - Status string `json:"status"` - Component string `json:"component"` - Uptime string `json:"uptime"` + Status string `json:"status"` + Component string `json:"component"` + Uptime string `json:"uptime"` Stats map[string]interface{} `json:"stats"` } diff --git a/internal/api/handlers.go b/internal/api/handlers.go index f6c3029..3748de1 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -14,6 +14,7 @@ package api import ( "context" + "crypto/sha256" "encoding/json" "net/http" "strconv" @@ -39,8 +40,8 @@ func Initialize(sys *system.System) { // @Tags health // @Accept json // @Produce json -// @Success 200 {object} map[string]interface{} "Health status" -// @Failure 503 {object} map[string]string "Service unavailable" +// @Success 200 {object} HealthResponse "Health status" +// @Failure 503 {object} map[string]string "Service unavailable" // @Router /health [get] func HandleHealth(w http.ResponseWriter, r *http.Request) { if globalSystem == nil { @@ -48,30 +49,33 @@ func HandleHealth(w http.ResponseWriter, r *http.Request) { return } - health := map[string]interface{}{ - "status": "healthy", - "running": globalSystem.IsRunning(), - "time": time.Now().Format(time.RFC3339), + w.Header().Set("Content-Type", "application/json") + + health := HealthResponse{ + Status: "healthy", + Component: "eBPF Monitor API", + Uptime: "active", // Since we don't have access to start time, use a generic status + Version: "1.0.0", } - w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) if err := json.NewEncoder(w).Encode(health); err != nil { logger.Errorf("Error encoding health response: %v", err) http.Error(w, "Internal server error", http.StatusInternalServerError) + return } } // HandlePrograms returns the status of all eBPF programs. -// -// @Summary List eBPF programs -// @Description Get the status and information of all loaded eBPF programs -// @Tags programs -// @Accept json -// @Produce json -// @Success 200 {object} map[string]interface{} "List of eBPF programs" -// @Failure 500 {object} map[string]string "Internal server error" -// @Failure 503 {object} map[string]string "Service unavailable" -// @Router /api/programs [get] +// @Summary List eBPF programs +// @Description Get the status and information of all loaded eBPF programs +// @Tags programs +// @Accept json +// @Produce json +// @Success 200 {object} ProgramsResponse "List of eBPF programs" +// @Failure 500 {object} map[string]string "Internal server error" +// @Failure 503 {object} map[string]string "Service unavailable" +// @Router /api/programs [get] func HandlePrograms(w http.ResponseWriter, r *http.Request) { if globalSystem == nil { http.Error(w, "System not initialized", http.StatusServiceUnavailable) @@ -80,8 +84,39 @@ func HandlePrograms(w http.ResponseWriter, r *http.Request) { programs := globalSystem.GetPrograms() + // Convert to structured response + var programList []ProgramInfo + for _, prog := range programs { + var status string + if prog.Loaded && prog.Attached { + status = "active" + } else if prog.Loaded { + status = "loaded" + } else { + status = "inactive" + } + + // Use a hash of the program name as a unique ID + hash := sha256.Sum256([]byte(prog.Name)) + // Use first 4 bytes of hash as ID for better uniqueness (2^32 possible values) + id := int(hash[0])<<24 | int(hash[1])<<16 | int(hash[2])<<8 | int(hash[3]) + programInfo := ProgramInfo{ + Name: prog.Name, + Type: "eBPF", // Generic type, could be enhanced + Status: status, + ID: id, + } + programList = append(programList, programInfo) + } + + response := ProgramsResponse{ + Programs: programList, + TotalCount: len(programList), + QueryTime: time.Now().Format(time.RFC3339), + } + w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(programs); err != nil { + if err := json.NewEncoder(w).Encode(response); err != nil { logger.Errorf("Error encoding programs response: %v", err) http.Error(w, "Internal server error", http.StatusInternalServerError) } @@ -100,7 +135,7 @@ func HandlePrograms(w http.ResponseWriter, r *http.Request) { // @Param since query string false "Start time (RFC3339 format)" // @Param until query string false "End time (RFC3339 format)" // @Param limit query int false "Maximum number of events to return (default: 100)" -// @Success 200 {object} map[string]interface{} "Filtered events" +// @Success 200 {object} EventsResponse "Filtered events" // @Failure 500 {object} map[string]string "Internal server error" // @Failure 503 {object} map[string]string "Service unavailable" // @Router /api/events [get] @@ -158,10 +193,34 @@ func HandleEvents(w http.ResponseWriter, r *http.Request) { return } - response := map[string]interface{}{ - "events": events, - "count": len(events), - "query": query, + // Get total count for the same query without limit + totalQuery := query + totalQuery.Limit = 0 + totalCount, err := globalSystem.CountEvents(ctx, totalQuery) + if err != nil { + totalCount = len(events) // Fallback to returned count + } + + // Build filters struct for response + filters := EventFilters{ + Type: query.EventType, + PID: query.PID, + Command: query.Command, + Limit: query.Limit, + } + if !query.Since.IsZero() { + filters.Since = query.Since.Format(time.RFC3339) + } + if !query.Until.IsZero() { + filters.Until = query.Until.Format(time.RFC3339) + } + + response := EventsResponse{ + Events: events, + Count: len(events), + TotalCount: totalCount, + QueryTime: time.Now().Format(time.RFC3339), + Filters: filters, } w.Header().Set("Content-Type", "application/json") @@ -178,9 +237,9 @@ func HandleEvents(w http.ResponseWriter, r *http.Request) { // @Tags connections // @Accept json // @Produce json -// @Param pid query int false "Process ID (GET only)" -// @Param command query string false "Command name (GET only)" -// @Param duration_seconds query int false "Duration in seconds (GET only, default: 60)" +// @Param pid query int false "Process ID" +// @Param command query string false "Command name" +// @Param duration_seconds query int false "Duration in seconds (default: 60)" // @Param request body ConnectionSummaryRequest false "Connection summary request (POST only)" // @Success 200 {object} ConnectionSummaryResponse "Connection statistics" // @Failure 400 {object} map[string]string "Bad request" @@ -264,9 +323,9 @@ func HandleConnectionSummary(w http.ResponseWriter, r *http.Request) { // @Tags packet_drops // @Accept json // @Produce json -// @Param pid query int false "Process ID (GET only)" -// @Param command query string false "Command name (GET only)" -// @Param duration_seconds query int false "Duration in seconds (GET only, default: 60)" +// @Param pid query int false "Process ID" +// @Param command query string false "Command name" +// @Param duration_seconds query int false "Duration in seconds (default: 60)" // @Param request body PacketDropSummaryRequest false "Packet drop summary request (POST only)" // @Success 200 {object} PacketDropSummaryResponse "Packet drop statistics" // @Failure 400 {object} map[string]string "Bad request" @@ -504,3 +563,45 @@ type PacketDropListResponse struct { EventsByPID map[uint32][]core.Event `json:"events_by_pid"` // Events grouped by PID QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp } + +// HealthResponse represents the response for health check +type HealthResponse struct { + Status string `json:"status" example:"healthy"` // Service status + Component string `json:"component" example:"eBPF Monitor API"` // Component name + Uptime string `json:"uptime" example:"1h30m"` // Service uptime + Version string `json:"version" example:"1.0.0"` // API version +} + +// ProgramsResponse represents the response for listing eBPF programs +type ProgramsResponse struct { + Programs []ProgramInfo `json:"programs"` // List of eBPF programs + TotalCount int `json:"total_count" example:"2"` // Total number of programs + QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp +} + +// ProgramInfo represents information about an eBPF program +type ProgramInfo struct { + Name string `json:"name" example:"connection_tracer"` // Program name + Type string `json:"type" example:"kprobe"` // Program type + Status string `json:"status" example:"loaded"` // Program status + ID int `json:"id" example:"123"` // Program ID +} + +// EventsResponse represents the response for querying events +type EventsResponse struct { + Events []core.Event `json:"events"` // List of events + Count int `json:"count" example:"25"` // Number of events returned + TotalCount int `json:"total_count" example:"150"` // Total number of matching events + QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp + Filters EventFilters `json:"filters"` // Applied filters +} + +// EventFilters represents the filters applied to the event query +type EventFilters struct { + Type string `json:"type,omitempty" example:"connection"` // Event type filter + PID uint32 `json:"pid,omitempty" example:"1234"` // Process ID filter + Command string `json:"command,omitempty" example:"curl"` // Command filter + Since string `json:"since,omitempty" example:"2023-01-01T12:00:00Z"` // Start time filter + Until string `json:"until,omitempty" example:"2023-01-01T13:00:00Z"` // End time filter + Limit int `json:"limit,omitempty" example:"100"` // Limit filter +} diff --git a/internal/events/events.go b/internal/events/events.go index 68d0896..ebb07c8 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -23,7 +23,7 @@ var ( systemBootTime time.Time bootTimeCalculated bool bootTimeMutex sync.Mutex - + // Global Kubernetes metadata provider with proper synchronization k8sProvider *kubernetes.Provider k8sMutex sync.RWMutex // Protects both k8sProvider and initialization @@ -123,17 +123,17 @@ func getKubernetesProvider() *kubernetes.Provider { return provider } k8sMutex.RUnlock() - + // Slow path: need to initialize, acquire write lock k8sMutex.Lock() defer k8sMutex.Unlock() - + // Double-check after acquiring write lock if !k8sInit { k8sProvider = kubernetes.NewProvider() k8sInit = true } - + return k8sProvider } @@ -142,7 +142,7 @@ func getKubernetesProvider() *kubernetes.Provider { func resetKubernetesProvider() { k8sMutex.Lock() defer k8sMutex.Unlock() - + k8sProvider = nil k8sInit = false } diff --git a/internal/events/kubernetes_integration_test.go b/internal/events/kubernetes_integration_test.go index b190cd7..7260311 100644 --- a/internal/events/kubernetes_integration_test.go +++ b/internal/events/kubernetes_integration_test.go @@ -29,7 +29,7 @@ func TestKubernetesMetadataIntegration(t *testing.T) { os.Unsetenv("NODE_NAME") os.Unsetenv("POD_NAME") os.Unsetenv("POD_NAMESPACE") - + // Reset provider using the safe method resetKubernetesProvider() @@ -63,7 +63,7 @@ func TestKubernetesMetadataIntegration(t *testing.T) { os.Setenv("NODE_NAME", "test-node-1") os.Setenv("POD_NAME", "ebpf-monitor-abcde") os.Setenv("POD_NAMESPACE", "ebpf-system") - + // Reset provider to pick up new env vars using the safe method resetKubernetesProvider() diff --git a/internal/kubernetes/metadata.go b/internal/kubernetes/metadata.go index e6b5af2..531be23 100644 --- a/internal/kubernetes/metadata.go +++ b/internal/kubernetes/metadata.go @@ -25,7 +25,7 @@ func NewProvider() *Provider { p := &Provider{ enabled: isKubernetesEnvironment(), } - + if p.enabled { p.metadata = &Metadata{ NodeName: os.Getenv("NODE_NAME"), @@ -33,7 +33,7 @@ func NewProvider() *Provider { Namespace: os.Getenv("POD_NAMESPACE"), } } - + return p } @@ -48,11 +48,11 @@ func (p *Provider) IsEnabled() bool { func (p *Provider) GetMetadata() *Metadata { p.mu.RLock() defer p.mu.RUnlock() - + if !p.enabled || p.metadata == nil { return nil } - + // Return a copy to avoid race conditions return &Metadata{ NodeName: p.metadata.NodeName, @@ -66,12 +66,12 @@ func (p *Provider) AddToMap(data map[string]interface{}) { if !p.IsEnabled() { return } - + metadata := p.GetMetadata() if metadata == nil { return } - + if metadata.NodeName != "" { data["k8s_node_name"] = metadata.NodeName } @@ -89,16 +89,16 @@ func isKubernetesEnvironment() bool { if os.Getenv("KUBERNETES_SERVICE_HOST") != "" { return true } - + // Check deployment mode environment variable if os.Getenv("DEPLOYMENT_MODE") == "kubernetes" { return true } - + // Check if we can find Kubernetes service account token if _, err := os.Stat("/var/run/secrets/kubernetes.io/serviceaccount/token"); err == nil { return true } - + return false } diff --git a/internal/kubernetes/metadata_test.go b/internal/kubernetes/metadata_test.go index 31b10f6..31dcabd 100644 --- a/internal/kubernetes/metadata_test.go +++ b/internal/kubernetes/metadata_test.go @@ -89,7 +89,7 @@ func TestKubernetesProvider(t *testing.T) { provider := NewProvider() data := make(map[string]interface{}) - + provider.AddToMap(data) if data["k8s_node_name"] != "test-node" { diff --git a/internal/storage/forwarding.go b/internal/storage/forwarding.go index 1581916..a1a0231 100644 --- a/internal/storage/forwarding.go +++ b/internal/storage/forwarding.go @@ -10,8 +10,8 @@ import ( // ForwardingStorage wraps another storage and forwards events to an aggregator. type ForwardingStorage struct { - primary core.EventSink - aggregatorClient *client.AggregatorClient + primary core.EventSink + aggregatorClient *client.AggregatorClient } // NewForwardingStorage creates a new forwarding storage. diff --git a/internal/system/system.go b/internal/system/system.go index 2673fc6..8c4ab19 100644 --- a/internal/system/system.go +++ b/internal/system/system.go @@ -16,9 +16,9 @@ import ( // System is the main orchestrator for the eBPF monitoring system. type System struct { - manager core.Manager - storage core.EventSink - aggregatorClient *client.AggregatorClient + manager core.Manager + storage core.EventSink + aggregatorClient *client.AggregatorClient } // NewSystem creates a new eBPF monitoring system. @@ -26,7 +26,7 @@ func NewSystem() *System { manager := programs.NewManager() memStorage := storage.NewMemoryStorage() aggregatorClient := client.NewAggregatorClient() - + // Wrap storage with forwarding to aggregator forwardingStorage := storage.NewForwardingStorage(memStorage, aggregatorClient) diff --git a/kubernetes/aggregator-deployment.yaml b/kubernetes/aggregator-deployment.yaml index 99add8e..9278f42 100644 --- a/kubernetes/aggregator-deployment.yaml +++ b/kubernetes/aggregator-deployment.yaml @@ -30,7 +30,7 @@ spec: - name: DEPLOYMENT_MODE value: "kubernetes" - name: LOG_LEVEL - value: "info" + value: "debug" resources: requests: memory: "256Mi"