From ed0312301dd31023553082701d957e53be9ea71a Mon Sep 17 00:00:00 2001 From: Simone Rodigari Date: Wed, 6 Aug 2025 21:23:18 +0100 Subject: [PATCH 1/3] data flow architecture, add debug, fmt --- README.md | 59 ++++++++++ cmd/server/main_test.go | 8 +- internal/api/handlers.go | 62 +++++----- internal/api/handlers_test.go | 108 +++++++++--------- internal/core/types.go | 62 +++++----- internal/core/types_test.go | 51 +++++---- internal/events/events.go | 16 +-- internal/events/events_test.go | 26 ++--- internal/programs/base.go | 73 ++++++------ internal/programs/connection/connection.go | 56 ++++----- .../programs/connection/connection_test.go | 50 ++++---- internal/programs/manager.go | 56 ++++----- internal/programs/packet_drop/packet_drop.go | 40 +++---- .../programs/packet_drop/packet_drop_test.go | 36 +++--- internal/storage/memory.go | 58 ++++++---- internal/storage/memory_test.go | 2 +- internal/system/system.go | 22 ++-- pkg/logger/logger_test.go | 30 ++--- 18 files changed, 445 insertions(+), 370 deletions(-) diff --git a/README.md b/README.md index a174e85..bbc1936 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,65 @@ curl "http://localhost:8080/api/programs" - **API Layer**: HTTP endpoints for querying events and program status - **System Layer**: Top-level coordination and initialization +## Event Flow Architecture + +The system processes events through a real-time streaming pipeline that ensures low latency and high throughput: + +``` +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ eBPF Program │ │ Ring Buffer │ │ Event Parser │ │ Event Stream │ +│ │ │ │ │ │ │ │ +│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │ +│ │ sys_connect │ │───▶│ │ events │ │───▶│ │ Connection │ │───▶│ │ Channel │ │ +│ │ tracepoint │ │ │ │ (16MB) │ │ │ │ Parser │ │ │ │ (buffered) │ │ +│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │ +│ │ │ │ │ │ │ │ +│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │ │ │ +│ │ kfree_skb │ │───▶│ │drop_events │ │───▶│ │ PacketDrop │ │───▶│ │ +│ │ tracepoint │ │ │ │ (256KB) │ │ │ │ Parser │ │ │ │ +│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │ │ │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘ + │ │ │ │ + Kernel Space Ring Buffer Go Application Event Stream + (eBPF Programs) (Temporary) (Event Parsing) (Buffered) + │ │ + ▼ ▼ + ┌─────────────────────┐ ┌─────────────────────┐ + │ Always Empty │ │ ┌─────────────────┐ │ + │ │ │ │ Memory Storage │ │ + │ Events consumed │ │ │ │ │ + │ immediately by │ │ │ • Query Events │ │ + │ userspace readers │ │ │ • Time Filters │ │ + └─────────────────────┘ │ │ • PID Grouping │ │ + │ └─────────────────┘ │ + │ │ │ + │ ▼ │ + │ ┌─────────────────┐ │ + │ │ HTTP API │ │ + │ │ │ │ + │ │ /api/list- │ │ + │ │ connections │ │ + │ │ │ │ + │ │ /api/list- │ │ + │ │ packet-drops │ │ + │ └─────────────────┘ │ + └─────────────────────┘ + + +``` + +### Ring buffers + +Ring buffers in eBPF are designed for real-time streaming: + +1. **eBPF programs** write events to ring buffers using `bpf_ringbuf_reserve()` and `bpf_ringbuf_submit()` +2. **Userspace readers** immediately consume events using `ringbuf.NewReader()` +3. **Events are parsed** and sent to Go event streams +4. **Ring buffers become empty** as events are consumed in real-time +5. **Events are stored** in memory for API queries + +Events flow through the pipeline without accumulating in kernel space. + ## Extending the System 📚 **[Complete Development Guide](docs/program-development.md)** - Detailed guide for creating new eBPF monitoring programs diff --git a/cmd/server/main_test.go b/cmd/server/main_test.go index e3570f9..385b25f 100644 --- a/cmd/server/main_test.go +++ b/cmd/server/main_test.go @@ -16,10 +16,10 @@ import ( func setupTestSystem() *system.System { // Create a test system testSystem := system.NewSystem() - + // Initialize the API with the test system api.Initialize(testSystem) - + return testSystem } @@ -78,7 +78,7 @@ func TestHTTPServerSetup(t *testing.T) { func TestHTTPHealthEndpoint(t *testing.T) { // Setup test system _ = setupTestSystem() - + mux := http.NewServeMux() mux.HandleFunc("/health", api.HandleHealth) @@ -131,7 +131,7 @@ func TestHTTPAPIEndpoints(t *testing.T) { func TestHTTPConnectionSummaryValidation(t *testing.T) { // Setup test system _ = setupTestSystem() - + mux := http.NewServeMux() mux.HandleFunc("/api/connection-summary", api.HandleConnectionSummary) diff --git a/internal/api/handlers.go b/internal/api/handlers.go index 98395d4..f6c3029 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -112,39 +112,39 @@ func HandleEvents(w http.ResponseWriter, r *http.Request) { // Parse query parameters query := core.Query{} - + if eventType := r.URL.Query().Get("type"); eventType != "" { query.EventType = eventType } - + if pidStr := r.URL.Query().Get("pid"); pidStr != "" { if pid, err := strconv.ParseUint(pidStr, 10, 32); err == nil { query.PID = uint32(pid) } } - + if command := r.URL.Query().Get("command"); command != "" { query.Command = command } - + if sinceStr := r.URL.Query().Get("since"); sinceStr != "" { if since, err := time.Parse(time.RFC3339, sinceStr); err == nil { query.Since = since } } - + if untilStr := r.URL.Query().Get("until"); untilStr != "" { if until, err := time.Parse(time.RFC3339, untilStr); err == nil { query.Until = until } } - + if limitStr := r.URL.Query().Get("limit"); limitStr != "" { if limit, err := strconv.Atoi(limitStr); err == nil && limit > 0 { query.Limit = limit } } - + // Default limit to prevent overwhelming responses if query.Limit == 0 { query.Limit = 100 @@ -356,7 +356,7 @@ func HandlePacketDropSummary(w http.ResponseWriter, r *http.Request) { // @Router /api/list-connections [get] func HandleListConnections(w http.ResponseWriter, r *http.Request) { logger.Debugf("🌐 HTTP REQUEST: %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr) - + if globalSystem == nil { http.Error(w, "System not initialized", http.StatusServiceUnavailable) return @@ -412,7 +412,7 @@ func HandleListConnections(w http.ResponseWriter, r *http.Request) { // @Router /api/list-packet-drops [get] func HandleListPacketDrops(w http.ResponseWriter, r *http.Request) { logger.Debugf("🌐 HTTP REQUEST: %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr) - + if globalSystem == nil { http.Error(w, "System not initialized", http.StatusServiceUnavailable) return @@ -459,48 +459,48 @@ func HandleListPacketDrops(w http.ResponseWriter, r *http.Request) { // ConnectionSummaryRequest represents the request body for connection summary type ConnectionSummaryRequest struct { - PID uint32 `json:"pid" example:"1234"` // Process ID - Command string `json:"command" example:"curl"` // Command name - Duration int `json:"duration_seconds" example:"60"` // Duration in seconds + PID uint32 `json:"pid" example:"1234"` // Process ID + Command string `json:"command" example:"curl"` // Command name + Duration int `json:"duration_seconds" example:"60"` // Duration in seconds } // ConnectionSummaryResponse represents the response for connection summary type ConnectionSummaryResponse struct { - Count int `json:"count" example:"5"` // Number of connection events - PID uint32 `json:"pid" example:"1234"` // Process ID - Command string `json:"command" example:"curl"` // Command name - DurationSeconds int `json:"duration_seconds" example:"60"` // Duration in seconds + Count int `json:"count" example:"5"` // Number of connection events + PID uint32 `json:"pid" example:"1234"` // Process ID + Command string `json:"command" example:"curl"` // Command name + DurationSeconds int `json:"duration_seconds" example:"60"` // Duration in seconds QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp } // PacketDropSummaryRequest represents the request body for packet drop summary type PacketDropSummaryRequest struct { - PID uint32 `json:"pid" example:"1234"` // Process ID - Command string `json:"command" example:"nginx"` // Command name - Duration int `json:"duration_seconds" example:"60"` // Duration in seconds + PID uint32 `json:"pid" example:"1234"` // Process ID + Command string `json:"command" example:"nginx"` // Command name + Duration int `json:"duration_seconds" example:"60"` // Duration in seconds } // PacketDropSummaryResponse represents the response for packet drop summary type PacketDropSummaryResponse struct { - Count int `json:"count" example:"3"` // Number of packet drop events - PID uint32 `json:"pid" example:"1234"` // Process ID - Command string `json:"command" example:"nginx"` // Command name - DurationSeconds int `json:"duration_seconds" example:"60"` // Duration in seconds + Count int `json:"count" example:"3"` // Number of packet drop events + PID uint32 `json:"pid" example:"1234"` // Process ID + Command string `json:"command" example:"nginx"` // Command name + DurationSeconds int `json:"duration_seconds" example:"60"` // Duration in seconds QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp } // ConnectionListResponse represents the response for listing connections type ConnectionListResponse struct { - TotalPIDs int `json:"total_pids" example:"3"` // Number of unique PIDs - TotalEvents int `json:"total_events" example:"10"` // Total number of events - EventsByPID map[uint32][]core.Event `json:"events_by_pid"` // Events grouped by PID - QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp + TotalPIDs int `json:"total_pids" example:"3"` // Number of unique PIDs + TotalEvents int `json:"total_events" example:"10"` // Total number of events + EventsByPID map[uint32][]core.Event `json:"events_by_pid"` // Events grouped by PID + QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp } // PacketDropListResponse represents the response for listing packet drops type PacketDropListResponse struct { - TotalPIDs int `json:"total_pids" example:"2"` // Number of unique PIDs - TotalEvents int `json:"total_events" example:"7"` // Total number of events - EventsByPID map[uint32][]core.Event `json:"events_by_pid"` // Events grouped by PID - QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp + TotalPIDs int `json:"total_pids" example:"2"` // Number of unique PIDs + TotalEvents int `json:"total_events" example:"7"` // Total number of events + EventsByPID map[uint32][]core.Event `json:"events_by_pid"` // Events grouped by PID + QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp } diff --git a/internal/api/handlers_test.go b/internal/api/handlers_test.go index df661cb..0a14547 100644 --- a/internal/api/handlers_test.go +++ b/internal/api/handlers_test.go @@ -14,16 +14,16 @@ func TestHandleHealthWithoutSystem(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + req := httptest.NewRequest("GET", "/health", nil) w := httptest.NewRecorder() - + HandleHealth(w, req) - + if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) } - + body := w.Body.String() if body != "System not initialized\n" { t.Errorf("expected error message 'System not initialized', got %s", body) @@ -35,28 +35,28 @@ func TestHandleProgramsWithoutSystem(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + req := httptest.NewRequest("GET", "/api/programs", nil) w := httptest.NewRecorder() - + HandlePrograms(w, req) - + if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) } } -// TestHandleEventsWithoutSystem tests the events endpoint when system is not initialized +// TestHandleEventsWithoutSystem tests the events endpoint when system is not initialized func TestHandleEventsWithoutSystem(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + req := httptest.NewRequest("GET", "/api/events", nil) w := httptest.NewRecorder() - + HandleEvents(w, req) - + if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) } @@ -67,12 +67,12 @@ func TestHandleConnectionSummaryWithoutSystem(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + req := httptest.NewRequest("GET", "/api/connection-summary", nil) w := httptest.NewRecorder() - + HandleConnectionSummary(w, req) - + if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) } @@ -83,12 +83,12 @@ func TestHandlePacketDropSummaryWithoutSystem(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + req := httptest.NewRequest("GET", "/api/packet-drop-summary", nil) w := httptest.NewRecorder() - + HandlePacketDropSummary(w, req) - + if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) } @@ -99,12 +99,12 @@ func TestHandleListConnectionsWithoutSystem(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + req := httptest.NewRequest("GET", "/api/list-connections", nil) w := httptest.NewRecorder() - + HandleListConnections(w, req) - + if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) } @@ -115,12 +115,12 @@ func TestHandleListPacketDropsWithoutSystem(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + req := httptest.NewRequest("GET", "/api/list-packet-drops", nil) w := httptest.NewRecorder() - + HandleListPacketDrops(w, req) - + if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) } @@ -131,13 +131,13 @@ func TestHandleConnectionSummaryPOSTInvalidJSON(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + req := httptest.NewRequest("POST", "/api/connection-summary", bytes.NewReader([]byte("invalid json"))) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() - + HandleConnectionSummary(w, req) - + // Should fail due to no system first, but if we had a system it would fail due to invalid JSON if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) @@ -147,15 +147,15 @@ func TestHandleConnectionSummaryPOSTInvalidJSON(t *testing.T) { // TestHandlePacketDropSummaryPOSTInvalidJSON tests POST with invalid JSON func TestHandlePacketDropSummaryPOSTInvalidJSON(t *testing.T) { originalSystem := globalSystem - globalSystem = nil + globalSystem = nil defer func() { globalSystem = originalSystem }() - + req := httptest.NewRequest("POST", "/api/packet-drop-summary", bytes.NewReader([]byte("invalid json"))) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() - + HandlePacketDropSummary(w, req) - + if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) } @@ -166,7 +166,7 @@ func TestHandleEventsQueryParameterParsing(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + // Test that URL query parameters are parsed correctly, even without a system // The handler should parse the parameters before checking for system availability testCases := []struct { @@ -181,14 +181,14 @@ func TestHandleEventsQueryParameterParsing(t *testing.T) { {"until parameter", "?until=2023-01-01T13:00:00Z"}, {"multiple parameters", "?type=connection&pid=1234&limit=10"}, } - + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { req := httptest.NewRequest("GET", "/api/events"+tc.params, nil) w := httptest.NewRecorder() - + HandleEvents(w, req) - + // Should fail due to no system, but the URL parsing should work if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) @@ -202,7 +202,7 @@ func TestHandleConnectionSummaryGETParameterParsing(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + testCases := []struct { name string params string @@ -212,14 +212,14 @@ func TestHandleConnectionSummaryGETParameterParsing(t *testing.T) { {"duration parameter", "?duration_seconds=30"}, {"all parameters", "?pid=1234&command=curl&duration_seconds=120"}, } - + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { req := httptest.NewRequest("GET", "/api/connection-summary"+tc.params, nil) w := httptest.NewRecorder() - + HandleConnectionSummary(w, req) - + if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) } @@ -232,24 +232,24 @@ func TestHandleConnectionSummaryPOSTValidJSON(t *testing.T) { originalSystem := globalSystem globalSystem = nil defer func() { globalSystem = originalSystem }() - + requestBody := map[string]interface{}{ "pid": 1234, "command": "curl", "duration_seconds": 60, } - + jsonBody, err := json.Marshal(requestBody) if err != nil { t.Fatalf("failed to marshal JSON: %v", err) } - + req := httptest.NewRequest("POST", "/api/connection-summary", bytes.NewReader(jsonBody)) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() - + HandleConnectionSummary(w, req) - + // Should fail due to no system, but JSON parsing should work if w.Code != http.StatusServiceUnavailable { t.Errorf("expected status %d, got %d", http.StatusServiceUnavailable, w.Code) @@ -264,27 +264,27 @@ func TestSwaggerModelStructures(t *testing.T) { Command: "curl", Duration: 60, } - + if req.PID != 1234 { t.Errorf("expected PID 1234, got %d", req.PID) } - + // Test marshaling to JSON jsonData, err := json.Marshal(req) if err != nil { t.Errorf("failed to marshal ConnectionSummaryRequest: %v", err) } - + // Test unmarshaling from JSON var decoded ConnectionSummaryRequest if err := json.Unmarshal(jsonData, &decoded); err != nil { t.Errorf("failed to unmarshal ConnectionSummaryRequest: %v", err) } - + if decoded.Command != "curl" { t.Errorf("expected command 'curl', got %s", decoded.Command) } - + // Test ConnectionSummaryResponse resp := ConnectionSummaryResponse{ Count: 5, @@ -293,23 +293,23 @@ func TestSwaggerModelStructures(t *testing.T) { DurationSeconds: 60, QueryTime: "2023-01-01T12:00:00Z", } - + if resp.Count != 5 { t.Errorf("expected count 5, got %d", resp.Count) } - + // Test PacketDropSummaryRequest dropReq := PacketDropSummaryRequest{ PID: 5678, Command: "nginx", Duration: 120, } - + if dropReq.PID != 5678 { t.Errorf("expected PID 5678, got %d", dropReq.PID) } - - // Test PacketDropSummaryResponse + + // Test PacketDropSummaryResponse dropResp := PacketDropSummaryResponse{ Count: 3, PID: 5678, @@ -317,7 +317,7 @@ func TestSwaggerModelStructures(t *testing.T) { DurationSeconds: 120, QueryTime: "2023-01-01T12:00:00Z", } - + if dropResp.Count != 3 { t.Errorf("expected count 3, got %d", dropResp.Count) } diff --git a/internal/core/types.go b/internal/core/types.go index b4e5223..dca5084 100644 --- a/internal/core/types.go +++ b/internal/core/types.go @@ -13,25 +13,25 @@ import ( type Event interface { // ID returns a unique identifier for this event ID() string - + // Type returns the event type (e.g., "connection", "packet_drop") Type() string - + // PID returns the process ID that generated this event PID() uint32 - + // Command returns the command name of the process Command() string - + // Timestamp returns the kernel timestamp (nanoseconds since boot) Timestamp() uint64 - + // Time returns the wall clock time when the event occurred Time() time.Time - + // Metadata returns event-specific data as a map Metadata() map[string]interface{} - + // JSON serialization json.Marshaler } @@ -40,7 +40,7 @@ type Event interface { type EventParser interface { // Parse converts raw bytes into an Event Parse(data []byte) (Event, error) - + // EventType returns the type of events this parser handles EventType() string } @@ -49,7 +49,7 @@ type EventParser interface { type EventStream interface { // Events returns a channel that delivers events Events() <-chan Event - + // Close stops the event stream and closes the channel Close() error } @@ -58,10 +58,10 @@ type EventStream interface { type EventSink interface { // Store saves an event Store(ctx context.Context, event Event) error - + // Query retrieves events matching the given criteria Query(ctx context.Context, query Query) ([]Event, error) - + // Count returns the number of events matching the criteria Count(ctx context.Context, query Query) (int, error) } @@ -70,28 +70,28 @@ type EventSink interface { type Program interface { // Name returns the program name Name() string - + // Description returns a human-readable description Description() string - + // Load compiles and loads the eBPF program into the kernel Load(ctx context.Context) error - + // Attach attaches the program to appropriate kernel hooks Attach(ctx context.Context) error - + // Detach detaches the program from kernel hooks Detach(ctx context.Context) error - + // IsLoaded returns true if the program is loaded IsLoaded() bool - + // IsAttached returns true if the program is attached IsAttached() bool - + // EventStream returns a stream of events from this program EventStream() EventStream - + // GetStats returns event processing statistics GetStats() (totalEvents, droppedEvents uint64, dropRate float64) } @@ -100,25 +100,25 @@ type Program interface { type Manager interface { // RegisterProgram adds a program to the manager RegisterProgram(program Program) error - + // LoadAll loads all registered programs LoadAll(ctx context.Context) error - + // AttachAll attaches all loaded programs AttachAll(ctx context.Context) error - + // DetachAll detaches all programs DetachAll(ctx context.Context) error - + // Programs returns all registered programs Programs() []Program - + // GetProgramStatus returns status of all programs GetProgramStatus() []ProgramStatus - + // EventStream returns a unified stream of events from all programs EventStream() EventStream - + // IsRunning returns true if the manager is active IsRunning() bool } @@ -127,19 +127,19 @@ type Manager interface { type Query struct { // EventType filters by event type (optional) EventType string - + // PID filters by process ID (optional, 0 means no filter) PID uint32 - + // Command filters by command name (optional) Command string - + // Since filters events after this time (optional) Since time.Time - + // Until filters events before this time (optional) Until time.Time - + // Limit limits the number of results (optional, 0 means no limit) Limit int } diff --git a/internal/core/types_test.go b/internal/core/types_test.go index 66636a1..b9733ba 100644 --- a/internal/core/types_test.go +++ b/internal/core/types_test.go @@ -18,12 +18,12 @@ type MockEvent struct { metadata map[string]interface{} } -func (m *MockEvent) ID() string { return m.id } -func (m *MockEvent) Type() string { return m.eventType } -func (m *MockEvent) PID() uint32 { return m.pid } -func (m *MockEvent) Command() string { return m.command } -func (m *MockEvent) Timestamp() uint64 { return m.timestamp } -func (m *MockEvent) Time() time.Time { return m.time } +func (m *MockEvent) ID() string { return m.id } +func (m *MockEvent) Type() string { return m.eventType } +func (m *MockEvent) PID() uint32 { return m.pid } +func (m *MockEvent) Command() string { return m.command } +func (m *MockEvent) Timestamp() uint64 { return m.timestamp } +func (m *MockEvent) Time() time.Time { return m.time } func (m *MockEvent) Metadata() map[string]interface{} { return m.metadata } func (m *MockEvent) MarshalJSON() ([]byte, error) { @@ -103,15 +103,15 @@ type MockProgram struct { stream EventStream } -func (m *MockProgram) Name() string { return m.name } -func (m *MockProgram) Description() string { return m.description } -func (m *MockProgram) Load(ctx context.Context) error { m.loaded = true; return nil } -func (m *MockProgram) Attach(ctx context.Context) error { m.attached = true; return nil } -func (m *MockProgram) Detach(ctx context.Context) error { m.attached = false; return nil } -func (m *MockProgram) IsLoaded() bool { return m.loaded } -func (m *MockProgram) IsAttached() bool { return m.attached } -func (m *MockProgram) EventStream() EventStream { return m.stream } -func (m *MockProgram) GetStats() (uint64, uint64, float64) { return 0, 0, 0.0 } +func (m *MockProgram) Name() string { return m.name } +func (m *MockProgram) Description() string { return m.description } +func (m *MockProgram) Load(ctx context.Context) error { m.loaded = true; return nil } +func (m *MockProgram) Attach(ctx context.Context) error { m.attached = true; return nil } +func (m *MockProgram) Detach(ctx context.Context) error { m.attached = false; return nil } +func (m *MockProgram) IsLoaded() bool { return m.loaded } +func (m *MockProgram) IsAttached() bool { return m.attached } +func (m *MockProgram) EventStream() EventStream { return m.stream } +func (m *MockProgram) GetStats() (uint64, uint64, float64) { return 0, 0, 0.0 } // MockManager implements Manager for testing type MockManager struct { @@ -119,20 +119,23 @@ type MockManager struct { running bool } -func (m *MockManager) RegisterProgram(program Program) error { m.programs = append(m.programs, program); return nil } -func (m *MockManager) LoadAll(ctx context.Context) error { m.running = true; return nil } -func (m *MockManager) AttachAll(ctx context.Context) error { return nil } -func (m *MockManager) DetachAll(ctx context.Context) error { return nil } -func (m *MockManager) Programs() []Program { return m.programs } -func (m *MockManager) GetProgramStatus() []ProgramStatus { return []ProgramStatus{} } -func (m *MockManager) EventStream() EventStream { return &MockEventStream{events: make(chan Event)} } -func (m *MockManager) IsRunning() bool { return m.running } +func (m *MockManager) RegisterProgram(program Program) error { + m.programs = append(m.programs, program) + return nil +} +func (m *MockManager) LoadAll(ctx context.Context) error { m.running = true; return nil } +func (m *MockManager) AttachAll(ctx context.Context) error { return nil } +func (m *MockManager) DetachAll(ctx context.Context) error { return nil } +func (m *MockManager) Programs() []Program { return m.programs } +func (m *MockManager) GetProgramStatus() []ProgramStatus { return []ProgramStatus{} } +func (m *MockManager) EventStream() EventStream { return &MockEventStream{events: make(chan Event)} } +func (m *MockManager) IsRunning() bool { return m.running } // TestEvent tests the Event interface func TestEvent(t *testing.T) { now := time.Now() timestamp := uint64(now.UnixNano()) - + event := &MockEvent{ id: "test-123", eventType: "connection", diff --git a/internal/events/events.go b/internal/events/events.go index 02882b9..c83500c 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -30,11 +30,11 @@ var ( func calculateSystemBootTime() time.Time { bootTimeMutex.Lock() defer bootTimeMutex.Unlock() - + if bootTimeCalculated { return systemBootTime } - + // Try Linux-specific method first if bootTime, err := getBootTimeLinux(); err == nil { systemBootTime = bootTime @@ -42,7 +42,7 @@ func calculateSystemBootTime() time.Time { logger.Debugf("System boot time calculated (Linux): %v", systemBootTime) return systemBootTime } - + // Fallback for non-Linux systems or when /proc/stat is unavailable // This provides a reasonable approximation for development/testing systemBootTime = time.Now().Add(-time.Hour * 24) // Assume system has been up for less than 24 hours @@ -57,7 +57,7 @@ func getBootTimeLinux() (time.Time, error) { if err != nil { return time.Time{}, err } - + // Parse /proc/stat to find the btime line lines := strings.Split(string(data), "\n") for _, line := range lines { @@ -72,7 +72,7 @@ func getBootTimeLinux() (time.Time, error) { } } } - + return time.Time{}, fmt.Errorf("btime not found in /proc/stat") } @@ -81,7 +81,7 @@ func getBootTimeLinux() (time.Time, error) { // since system boot. To convert to wall-clock time, we add this to the system boot time. func convertEBPFTimestamp(ebpfTimestampNs uint64) time.Time { bootTime := calculateSystemBootTime() - + // Add the eBPF timestamp (nanoseconds since boot) to the boot time return bootTime.Add(time.Duration(ebpfTimestampNs) * time.Nanosecond) } @@ -293,14 +293,14 @@ func (m *MergedStream) Close() error { // Wait for all goroutines to finish before closing the events channel m.wg.Wait() close(m.events) - + return nil } // readFromStream reads events from a source stream and forwards them. func (m *MergedStream) readFromStream(stream core.EventStream) { defer m.wg.Done() - + for { select { case event, ok := <-stream.Events(): diff --git a/internal/events/events_test.go b/internal/events/events_test.go index b7adcd0..43895e0 100644 --- a/internal/events/events_test.go +++ b/internal/events/events_test.go @@ -11,7 +11,7 @@ import ( // TestBaseEvent tests the BaseEvent implementation func TestBaseEvent(t *testing.T) { metadata := map[string]interface{}{ - "dest": "127.0.0.1:80", + "dest": "127.0.0.1:80", "proto": "tcp", } @@ -358,23 +358,23 @@ func TestChannelStreamCapacity(t *testing.T) { func TestTimestampConversion(t *testing.T) { // Reset boot time cache for testing ResetBootTimeCache() - + // Test with a known timestamp testTimestamp := uint64(5000000000) // 5 seconds since boot in nanoseconds - + event := NewBaseEvent("test", 1234, "test", testTimestamp, nil) - + // The converted time should be reasonable (not zero, not in the future) eventTime := event.Time() if eventTime.IsZero() { t.Error("converted time should not be zero") } - + // The event time should be in the past (assuming system has been up for more than 5 seconds) if eventTime.After(time.Now()) { t.Error("converted time should not be in the future") } - + // Test that the timestamp is preserved if event.Timestamp() != testTimestamp { t.Errorf("expected timestamp %d, got %d", testTimestamp, event.Timestamp()) @@ -385,19 +385,19 @@ func TestTimestampConversion(t *testing.T) { func TestBootTimeCalculation(t *testing.T) { // Reset boot time cache ResetBootTimeCache() - + // First calculation bootTime1 := calculateSystemBootTime() if bootTime1.IsZero() { t.Error("boot time should not be zero") } - + // Second calculation should return cached value bootTime2 := calculateSystemBootTime() if !bootTime1.Equal(bootTime2) { t.Error("boot time should be cached and consistent") } - + // Boot time should be in the past if bootTime1.After(time.Now()) { t.Error("boot time should be in the past") @@ -408,20 +408,20 @@ func TestBootTimeCalculation(t *testing.T) { func TestEBPFTimestampConversion(t *testing.T) { // Reset boot time cache ResetBootTimeCache() - + // Test with zero timestamp (should equal boot time) zeroTime := convertEBPFTimestamp(0) bootTime := calculateSystemBootTime() - + // Zero timestamp should equal boot time if !zeroTime.Equal(bootTime) { t.Errorf("zero timestamp should equal boot time, got %v, expected %v", zeroTime, bootTime) } - + // Test with 1 second offset oneSecond := convertEBPFTimestamp(1000000000) // 1 second in nanoseconds expectedTime := bootTime.Add(time.Second) - + if !oneSecond.Equal(expectedTime) { t.Errorf("1 second timestamp incorrect, got %v, expected %v", oneSecond, expectedTime) } diff --git a/internal/programs/base.go b/internal/programs/base.go index 3c4d7fb..5c4e02b 100644 --- a/internal/programs/base.go +++ b/internal/programs/base.go @@ -36,7 +36,7 @@ type BaseProgram struct { loaded bool attached bool mu sync.RWMutex - + // Event processing metrics droppedEvents uint64 totalEvents uint64 @@ -68,21 +68,21 @@ func (p *BaseProgram) Description() string { func (p *BaseProgram) Load(ctx context.Context) error { p.mu.Lock() defer p.mu.Unlock() - + if p.loaded { return nil } - + logger.Debugf("Loading eBPF program %s from %s", p.name, p.objectPath) - + collection, err := ebpf.LoadCollection(p.objectPath) if err != nil { return fmt.Errorf("failed to load eBPF collection: %w", err) } - + p.collection = collection p.loaded = true - + logger.Debugf("Successfully loaded eBPF program %s", p.name) return nil } @@ -110,26 +110,26 @@ func (p *BaseProgram) EventStream() core.EventStream { func (p *BaseProgram) GetStats() (totalEvents, droppedEvents uint64, dropRate float64) { p.mu.RLock() defer p.mu.RUnlock() - + totalEvents = p.totalEvents droppedEvents = p.droppedEvents - + if totalEvents > 0 { dropRate = float64(droppedEvents) / float64(totalEvents) } - + return totalEvents, droppedEvents, dropRate } // checkDropRateAndAlert monitors drop rate and triggers alerts when necessary. func (p *BaseProgram) checkDropRateAndAlert() { total, dropped, dropRate := p.GetStats() - + // Only check if we have enough events to make a meaningful assessment if total < DropRateWindowSize { return } - + // Check if drop rate exceeds threshold and we haven't alerted recently if dropRate > DropRateAlertThreshold { now := time.Now() @@ -138,7 +138,7 @@ func (p *BaseProgram) checkDropRateAndAlert() { "This indicates system overload or insufficient buffer capacity. "+ "Consider increasing buffer sizes or optimizing event processing.", p.name, dropRate*100, dropped, total) - + p.mu.Lock() p.lastAlertTime = now p.mu.Unlock() @@ -150,29 +150,29 @@ func (p *BaseProgram) checkDropRateAndAlert() { func (p *BaseProgram) Detach(ctx context.Context) error { p.mu.Lock() defer p.mu.Unlock() - + if !p.attached { return nil } - + // Close all links for _, l := range p.links { if err := l.Close(); err != nil { logger.Errorf("Error closing link for program %s: %v", p.name, err) } } - + p.links = p.links[:0] p.attached = false - + // Close event stream p.eventStream.Close() - + // Reset event statistics p.droppedEvents = 0 p.totalEvents = 0 p.lastAlertTime = time.Time{} - + logger.Debugf("Detached program %s", p.name) return nil } @@ -198,24 +198,24 @@ func (p *BaseProgram) StartRingBufferReader(mapName string, parser core.EventPar if collection == nil { return fmt.Errorf("program not loaded") } - + ringbufMap := collection.Maps[mapName] if ringbufMap == nil { return fmt.Errorf("ring buffer map %s not found", mapName) } - + logger.Debugf("Starting ring buffer reader for map %s in program %s", mapName, p.name) - + reader, err := ringbuf.NewReader(ringbufMap) if err != nil { return fmt.Errorf("failed to create ring buffer reader: %w", err) } - + // Start reading in a goroutine go func() { defer reader.Close() defer logger.Debugf("Ring buffer reader stopped for %s", p.name) - + for { record, err := reader.Read() if err != nil { @@ -225,20 +225,25 @@ func (p *BaseProgram) StartRingBufferReader(mapName string, parser core.EventPar logger.Errorf("Error reading from ring buffer in %s: %v", p.name, err) continue } - + // Parse the event event, err := parser.Parse(record.RawSample) if err != nil { logger.Errorf("Error parsing event in %s: %v", p.name, err) continue } - + // Track total events p.mu.Lock() p.totalEvents++ currentTotal := p.totalEvents p.mu.Unlock() - + + // Log ring buffer activity (only for debugging) + if currentTotal%100 == 0 { // Log every 100th event to avoid spam + logger.Debugf("📡 RING BUFFER: %s processed %d events", p.name, currentTotal) + } + // Send to event stream with backpressure handling if !p.eventStream.Send(event) { // Event dropped due to full buffer - this is a critical issue @@ -246,19 +251,19 @@ func (p *BaseProgram) StartRingBufferReader(mapName string, parser core.EventPar p.droppedEvents++ droppedCount := p.droppedEvents p.mu.Unlock() - + // Log error (not just debug) since this represents data loss logger.Errorf("Event stream full for %s, DROPPED EVENT (PID: %d, Type: %s). "+ "Total dropped: %d/%d events. This indicates backpressure - "+ "consider increasing buffer size or optimizing downstream processing.", p.name, event.PID(), event.Type(), droppedCount, currentTotal) - + // Check if we need to trigger high drop rate alerts p.checkDropRateAndAlert() } } }() - + return nil } @@ -268,21 +273,21 @@ func (p *BaseProgram) AttachToTracepoint(progName, group, name string) error { if collection == nil { return fmt.Errorf("program not loaded") } - + prog := collection.Programs[progName] if prog == nil { return fmt.Errorf("program %s not found in collection", progName) } - + logger.Debugf("Attaching program %s to tracepoint %s:%s", progName, group, name) - + l, err := link.Tracepoint(group, name, prog, nil) if err != nil { return fmt.Errorf("failed to attach to tracepoint %s:%s: %w", group, name, err) } - + p.AddLink(l) logger.Debugf("Successfully attached program %s to tracepoint %s:%s", progName, group, name) - + return nil } diff --git a/internal/programs/connection/connection.go b/internal/programs/connection/connection.go index 4762771..01010ed 100644 --- a/internal/programs/connection/connection.go +++ b/internal/programs/connection/connection.go @@ -18,11 +18,11 @@ const ( ProgramName = "connection" ProgramDescription = "Monitors network connection attempts via sys_enter_connect tracepoint" ObjectPath = "bpf/connection.o" - + // eBPF program and map names TracepointProgram = "trace_connect" EventsMapName = "events" - + // Tracepoint configuration TracepointGroup = "syscalls" TracepointName = "sys_enter_connect" @@ -46,20 +46,20 @@ func (p *Program) Attach(ctx context.Context) error { if !p.IsLoaded() { return fmt.Errorf("program not loaded") } - + logger.Debugf("Attaching connection monitoring program") - + // Attach to sys_enter_connect tracepoint if err := p.AttachToTracepoint(TracepointProgram, TracepointGroup, TracepointName); err != nil { return fmt.Errorf("failed to attach to tracepoint: %w", err) } - + // Start ring buffer reader parser := NewEventParser() if err := p.StartRingBufferReader(EventsMapName, parser); err != nil { return fmt.Errorf("failed to start ring buffer reader: %w", err) } - + logger.Info("Connection monitoring program attached and active") return nil } @@ -82,7 +82,7 @@ func (p *EventParser) Parse(data []byte) (core.Event, error) { if len(data) != 60 { return nil, fmt.Errorf("invalid connection event size: expected 60 bytes, got %d", len(data)) } - + // Parse binary data based on C struct layout: // struct event_t { // u32 pid; // 0-3 @@ -97,23 +97,23 @@ func (p *EventParser) Parse(data []byte) (core.Event, error) { // u8 sock_type; // 57 // u16 padding; // 58-59 // } - + pid := binary.LittleEndian.Uint32(data[0:4]) timestamp := binary.LittleEndian.Uint64(data[4:12]) ret := int32(binary.LittleEndian.Uint32(data[12:16])) - + // Extract command (null-terminated string) command := extractNullTerminatedString(data[16:32]) - + destIPv4 := binary.LittleEndian.Uint32(data[32:36]) var destIPv6 [16]byte copy(destIPv6[:], data[36:52]) - + destPort := binary.LittleEndian.Uint16(data[52:54]) family := binary.LittleEndian.Uint16(data[54:56]) protocol := data[56] sockType := data[57] - + // Build metadata with parsed fields and derived information metadata := map[string]interface{}{ "return_code": ret, @@ -123,26 +123,26 @@ func (p *EventParser) Parse(data []byte) (core.Event, error) { "address_family": family, "protocol": formatProtocol(protocol), "socket_type": formatSocketType(sockType), - + // Raw values for further processing if needed - "raw_ipv4": destIPv4, - "raw_ipv6": destIPv6, - "raw_protocol": protocol, - "raw_socktype": sockType, + "raw_ipv4": destIPv4, + "raw_ipv6": destIPv6, + "raw_protocol": protocol, + "raw_socktype": sockType, } - + event := events.NewBaseEvent("connection", pid, command, timestamp, metadata) - + // Debug log the parsed connection event destination := formatDestination(family, destIPv4, destIPv6, destPort) if destination != "" { - logger.Debugf("🔗 CONNECTION EVENT: PID=%d cmd=%s dest=%s proto=%s ret=%d", + logger.Debugf("🔗 CONNECTION EVENT: PID=%d cmd=%s dest=%s proto=%s ret=%d", pid, command, destination, formatProtocol(protocol), ret) } else { - logger.Debugf("🔗 CONNECTION EVENT: PID=%d cmd=%s family=%d (local socket) ret=%d", + logger.Debugf("🔗 CONNECTION EVENT: PID=%d cmd=%s family=%d (local socket) ret=%d", pid, command, family, ret) } - + return event, nil } @@ -162,7 +162,7 @@ func formatIP(family uint16, ipv4 uint32, ipv6 [16]byte) string { AF_INET = 2 AF_INET6 = 10 ) - + switch family { case AF_INET: if ipv4 == 0 { @@ -171,7 +171,7 @@ func formatIP(family uint16, ipv4 uint32, ipv6 [16]byte) string { // Convert from little-endian uint32 to IP address ip := net.IPv4(byte(ipv4), byte(ipv4>>8), byte(ipv4>>16), byte(ipv4>>24)) return ip.String() - + case AF_INET6: // Check if IPv6 address is all zeros allZero := true @@ -186,7 +186,7 @@ func formatIP(family uint16, ipv4 uint32, ipv6 [16]byte) string { } ip := net.IP(ipv6[:]) return ip.String() - + default: return "" } @@ -195,17 +195,17 @@ func formatIP(family uint16, ipv4 uint32, ipv6 [16]byte) string { // formatDestination formats the destination as "IP:port". func formatDestination(family uint16, ipv4 uint32, ipv6 [16]byte, port uint16) string { const AF_INET6 = 10 - + ip := formatIP(family, ipv4, ipv6) if ip == "" { return "" } - + // IPv6 addresses need to be wrapped in brackets if family == AF_INET6 { return fmt.Sprintf("[%s]:%d", ip, port) } - + return fmt.Sprintf("%s:%d", ip, port) } diff --git a/internal/programs/connection/connection_test.go b/internal/programs/connection/connection_test.go index 565a834..deea608 100644 --- a/internal/programs/connection/connection_test.go +++ b/internal/programs/connection/connection_test.go @@ -54,7 +54,7 @@ func TestParseValidConnectionEvent(t *testing.T) { // Set test values based on C struct layout: // struct event_t { // u32 pid; // 0-3 - // u64 ts; // 4-11 + // u64 ts; // 4-11 // u32 ret; // 12-15 // char comm[16]; // 16-31 // u32 dest_ip; // 32-35 @@ -68,31 +68,31 @@ func TestParseValidConnectionEvent(t *testing.T) { // pid (offset 0, 4 bytes) binary.LittleEndian.PutUint32(testData[0:4], 1234) - - // timestamp (offset 4, 8 bytes) + + // timestamp (offset 4, 8 bytes) binary.LittleEndian.PutUint64(testData[4:12], 1000000) - + // ret (offset 12, 4 bytes) binary.LittleEndian.PutUint32(testData[12:16], 0) // Success - + // command (offset 16, 16 bytes) copy(testData[16:32], []byte("curl\x00")) - + // dest_ip (offset 32, 4 bytes) - 127.0.0.1 // Need to store as little-endian but the IP extraction expects different byte order binary.LittleEndian.PutUint32(testData[32:36], 0x0100007f) - + // dest_ip6 (offset 36, 16 bytes) - leave as zeros for IPv4 - + // dest_port (offset 52, 2 bytes) binary.LittleEndian.PutUint16(testData[52:54], 80) - + // family (offset 54, 2 bytes) - AF_INET = 2 binary.LittleEndian.PutUint16(testData[54:56], 2) - + // protocol (offset 56, 1 byte) - TCP = 6 testData[56] = 6 - + // sock_type (offset 57, 1 byte) - STREAM = 1 testData[57] = 1 @@ -121,7 +121,7 @@ func TestParseValidConnectionEvent(t *testing.T) { // Check metadata metadata := event.Metadata() - + if metadata["protocol"] != "TCP" { t.Errorf("expected protocol 'TCP', got %v", metadata["protocol"]) } @@ -157,7 +157,7 @@ func TestParseIPv6ConnectionEvent(t *testing.T) { testData := make([]byte, 60) // Set basic fields - binary.LittleEndian.PutUint32(testData[0:4], 5678) // pid + binary.LittleEndian.PutUint32(testData[0:4], 5678) // pid binary.LittleEndian.PutUint64(testData[4:12], 2000000) // timestamp binary.LittleEndian.PutUint32(testData[12:16], 0) // ret copy(testData[16:32], []byte("wget\x00")) // command @@ -169,10 +169,10 @@ func TestParseIPv6ConnectionEvent(t *testing.T) { ipv6 := [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} copy(testData[36:52], ipv6[:]) - binary.LittleEndian.PutUint16(testData[52:54], 443) // dest_port (HTTPS) - binary.LittleEndian.PutUint16(testData[54:56], 10) // family (AF_INET6 = 10) - testData[56] = 6 // protocol (TCP) - testData[57] = 1 // sock_type (STREAM) + binary.LittleEndian.PutUint16(testData[52:54], 443) // dest_port (HTTPS) + binary.LittleEndian.PutUint16(testData[54:56], 10) // family (AF_INET6 = 10) + testData[56] = 6 // protocol (TCP) + testData[57] = 1 // sock_type (STREAM) event, err := parser.Parse(testData) if err != nil { @@ -180,7 +180,7 @@ func TestParseIPv6ConnectionEvent(t *testing.T) { } metadata := event.Metadata() - + if metadata["destination_ip"] != "::1" { t.Errorf("expected destination_ip '::1', got %v", metadata["destination_ip"]) } @@ -200,16 +200,16 @@ func TestParseLocalSocketEvent(t *testing.T) { testData := make([]byte, 60) // Set basic fields - binary.LittleEndian.PutUint32(testData[0:4], 9999) // pid - binary.LittleEndian.PutUint64(testData[4:12], 3000000) // timestamp + binary.LittleEndian.PutUint32(testData[0:4], 9999) // pid + binary.LittleEndian.PutUint64(testData[4:12], 3000000) // timestamp binary.LittleEndian.PutUint32(testData[12:16], 0xFFFFFFFF) // ret (error -1) - copy(testData[16:32], []byte("test\x00")) // command + copy(testData[16:32], []byte("test\x00")) // command // No IP addresses (all zeros) // family = 1 (AF_UNIX), no destination info - binary.LittleEndian.PutUint16(testData[54:56], 1) // family (AF_UNIX) - testData[56] = 0 // protocol - testData[57] = 1 // sock_type (STREAM) + binary.LittleEndian.PutUint16(testData[54:56], 1) // family (AF_UNIX) + testData[56] = 0 // protocol + testData[57] = 1 // sock_type (STREAM) event, err := parser.Parse(testData) if err != nil { @@ -217,7 +217,7 @@ func TestParseLocalSocketEvent(t *testing.T) { } metadata := event.Metadata() - + // For local sockets, IP should be empty if metadata["destination_ip"] != "" { t.Errorf("expected empty destination_ip for local socket, got %v", metadata["destination_ip"]) diff --git a/internal/programs/manager.go b/internal/programs/manager.go index 9985074..9e0f560 100644 --- a/internal/programs/manager.go +++ b/internal/programs/manager.go @@ -30,22 +30,22 @@ func NewManager() *Manager { func (m *Manager) RegisterProgram(program core.Program) error { m.mu.Lock() defer m.mu.Unlock() - + // Check for nil program if program == nil { return fmt.Errorf("cannot register nil program") } - + // Check for duplicate names for _, p := range m.programs { if p.Name() == program.Name() { return fmt.Errorf("program with name %s already registered", program.Name()) } } - + m.programs = append(m.programs, program) logger.Debugf("Registered program: %s", program.Name()) - + return nil } @@ -53,15 +53,15 @@ func (m *Manager) RegisterProgram(program core.Program) error { func (m *Manager) LoadAll(ctx context.Context) error { m.mu.Lock() defer m.mu.Unlock() - + logger.Debugf("Loading %d eBPF programs", len(m.programs)) - + for _, program := range m.programs { if err := program.Load(ctx); err != nil { return fmt.Errorf("failed to load program %s: %w", program.Name(), err) } } - + logger.Info("All eBPF programs loaded successfully") return nil } @@ -70,28 +70,28 @@ func (m *Manager) LoadAll(ctx context.Context) error { func (m *Manager) AttachAll(ctx context.Context) error { m.mu.Lock() defer m.mu.Unlock() - + logger.Debugf("Attaching %d eBPF programs", len(m.programs)) - + // Collect event streams from all programs var streams []core.EventStream - + for _, program := range m.programs { if !program.IsLoaded() { return fmt.Errorf("program %s is not loaded", program.Name()) } - + if err := program.Attach(ctx); err != nil { return fmt.Errorf("failed to attach program %s: %w", program.Name(), err) } - + streams = append(streams, program.EventStream()) } - + // Create merged event stream m.eventStream = events.NewMergedStream(streams) m.running = true - + logger.Info("All eBPF programs attached successfully") return nil } @@ -100,29 +100,29 @@ func (m *Manager) AttachAll(ctx context.Context) error { func (m *Manager) DetachAll(ctx context.Context) error { m.mu.Lock() defer m.mu.Unlock() - + if !m.running { return nil } - + logger.Debugf("Detaching %d eBPF programs", len(m.programs)) - + // Close merged event stream if m.eventStream != nil { m.eventStream.Close() m.eventStream = nil } - + // Detach all programs for _, program := range m.programs { if err := program.Detach(ctx); err != nil { logger.Errorf("Error detaching program %s: %v", program.Name(), err) } } - + m.running = false logger.Info("All eBPF programs detached") - + return nil } @@ -130,11 +130,11 @@ func (m *Manager) DetachAll(ctx context.Context) error { func (m *Manager) Programs() []core.Program { m.mu.RLock() defer m.mu.RUnlock() - + // Return a copy to prevent external modifications programs := make([]core.Program, len(m.programs)) copy(programs, m.programs) - + return programs } @@ -142,7 +142,7 @@ func (m *Manager) Programs() []core.Program { func (m *Manager) EventStream() core.EventStream { m.mu.RLock() defer m.mu.RUnlock() - + return m.eventStream } @@ -150,7 +150,7 @@ func (m *Manager) EventStream() core.EventStream { func (m *Manager) IsRunning() bool { m.mu.RLock() defer m.mu.RUnlock() - + return m.running } @@ -158,12 +158,12 @@ func (m *Manager) IsRunning() bool { func (m *Manager) GetProgramStatus() []core.ProgramStatus { m.mu.RLock() defer m.mu.RUnlock() - + status := make([]core.ProgramStatus, len(m.programs)) - + for i, program := range m.programs { totalEvents, droppedEvents, dropRate := program.GetStats() - + status[i] = core.ProgramStatus{ Name: program.Name(), Description: program.Description(), @@ -174,6 +174,6 @@ func (m *Manager) GetProgramStatus() []core.ProgramStatus { DropRate: dropRate, } } - + return status } diff --git a/internal/programs/packet_drop/packet_drop.go b/internal/programs/packet_drop/packet_drop.go index 7f6fae1..36a58d9 100644 --- a/internal/programs/packet_drop/packet_drop.go +++ b/internal/programs/packet_drop/packet_drop.go @@ -17,11 +17,11 @@ const ( ProgramName = "packet_drop" ProgramDescription = "Monitors packet drops via kfree_skb tracepoint" ObjectPath = "bpf/packet_drop.o" - + // eBPF program and map names TracepointProgram = "trace_kfree_skb" EventsMapName = "drop_events" - + // Tracepoint configuration TracepointGroup = "skb" TracepointName = "kfree_skb" @@ -45,20 +45,20 @@ func (p *Program) Attach(ctx context.Context) error { if !p.IsLoaded() { return fmt.Errorf("program not loaded") } - + logger.Debugf("Attaching packet drop monitoring program") - + // Attach to kfree_skb tracepoint if err := p.AttachToTracepoint(TracepointProgram, TracepointGroup, TracepointName); err != nil { return fmt.Errorf("failed to attach to tracepoint: %w", err) } - + // Start ring buffer reader parser := NewEventParser() if err := p.StartRingBufferReader(EventsMapName, parser); err != nil { return fmt.Errorf("failed to start ring buffer reader: %w", err) } - + logger.Info("Packet drop monitoring program attached and active") return nil } @@ -81,40 +81,40 @@ func (p *EventParser) Parse(data []byte) (core.Event, error) { if len(data) != 44 { return nil, fmt.Errorf("invalid packet drop event size: expected 44 bytes, got %d", len(data)) } - + // Parse binary data based on C struct layout: // struct drop_event_t { // u32 pid; // 0-3 - // u64 ts; // 4-11 + // u64 ts; // 4-11 // char comm[16]; // 12-27 // u32 drop_reason; // 28-31 // u32 skb_len; // 32-35 // u8 padding[8]; // 36-43 // } - + pid := binary.LittleEndian.Uint32(data[0:4]) timestamp := binary.LittleEndian.Uint64(data[4:12]) - + // Extract command (null-terminated string) command := extractNullTerminatedString(data[12:28]) - + dropReason := binary.LittleEndian.Uint32(data[28:32]) skbLen := binary.LittleEndian.Uint32(data[32:36]) - + // Build metadata with parsed fields and derived information metadata := map[string]interface{}{ - "drop_reason_code": dropReason, - "drop_reason": formatDropReason(dropReason), - "skb_length": skbLen, - "packet_size_bytes": skbLen, + "drop_reason_code": dropReason, + "drop_reason": formatDropReason(dropReason), + "skb_length": skbLen, + "packet_size_bytes": skbLen, } - + event := events.NewBaseEvent("packet_drop", pid, command, timestamp, metadata) - + // Debug log the parsed packet drop event - logger.Debugf("📦 PACKET DROP EVENT: PID=%d cmd=%s reason=%s (%d) size=%d bytes", + logger.Debugf("📦 PACKET DROP EVENT: PID=%d cmd=%s reason=%s (%d) size=%d bytes", pid, command, formatDropReason(dropReason), dropReason, skbLen) - + return event, nil } diff --git a/internal/programs/packet_drop/packet_drop_test.go b/internal/programs/packet_drop/packet_drop_test.go index 49c3819..a8d9e1f 100644 --- a/internal/programs/packet_drop/packet_drop_test.go +++ b/internal/programs/packet_drop/packet_drop_test.go @@ -54,7 +54,7 @@ func TestParseValidPacketDropEvent(t *testing.T) { // Set test values based on C struct layout: // struct drop_event_t { // u32 pid; // 0-3 - // u64 ts; // 4-11 + // u64 ts; // 4-11 // char comm[16]; // 12-27 // u32 drop_reason; // 28-31 // u32 skb_len; // 32-35 @@ -63,16 +63,16 @@ func TestParseValidPacketDropEvent(t *testing.T) { // pid (offset 0, 4 bytes) binary.LittleEndian.PutUint32(testData[0:4], 5678) - - // timestamp (offset 4, 8 bytes) + + // timestamp (offset 4, 8 bytes) binary.LittleEndian.PutUint64(testData[4:12], 2000000) - + // command (offset 12, 16 bytes) copy(testData[12:28], []byte("iptables\x00")) - + // drop_reason (offset 28, 4 bytes) binary.LittleEndian.PutUint32(testData[28:32], 2) // TCP_DROP - + // skb_len (offset 32, 4 bytes) binary.LittleEndian.PutUint32(testData[32:36], 1500) @@ -101,7 +101,7 @@ func TestParseValidPacketDropEvent(t *testing.T) { // Check metadata metadata := event.Metadata() - + if metadata["drop_reason_code"] != uint32(2) { t.Errorf("expected drop_reason_code 2, got %v", metadata["drop_reason_code"]) } @@ -139,13 +139,13 @@ func TestParsePacketDropWithDifferentReasons(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { testData := make([]byte, 44) - + // Set basic fields - binary.LittleEndian.PutUint32(testData[0:4], 1234) // pid - binary.LittleEndian.PutUint64(testData[4:12], 1000000) // timestamp - copy(testData[12:28], []byte("test\x00")) // command + binary.LittleEndian.PutUint32(testData[0:4], 1234) // pid + binary.LittleEndian.PutUint64(testData[4:12], 1000000) // timestamp + copy(testData[12:28], []byte("test\x00")) // command binary.LittleEndian.PutUint32(testData[28:32], tc.reasonCode) // drop_reason - binary.LittleEndian.PutUint32(testData[32:36], 500) // skb_len + binary.LittleEndian.PutUint32(testData[32:36], 500) // skb_len event, err := parser.Parse(testData) if err != nil { @@ -182,12 +182,12 @@ func TestParsePacketDropWithDifferentSizes(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { testData := make([]byte, 44) - + // Set basic fields - binary.LittleEndian.PutUint32(testData[0:4], 1234) // pid - binary.LittleEndian.PutUint64(testData[4:12], 1000000) // timestamp - copy(testData[12:28], []byte("test\x00")) // command - binary.LittleEndian.PutUint32(testData[28:32], 1) // drop_reason (SKB_FREE) + binary.LittleEndian.PutUint32(testData[0:4], 1234) // pid + binary.LittleEndian.PutUint64(testData[4:12], 1000000) // timestamp + copy(testData[12:28], []byte("test\x00")) // command + binary.LittleEndian.PutUint32(testData[28:32], 1) // drop_reason (SKB_FREE) binary.LittleEndian.PutUint32(testData[32:36], tc.size) // skb_len event, err := parser.Parse(testData) @@ -346,7 +346,7 @@ func TestPacketDropEventCompleteScenarios(t *testing.T) { for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { testData := make([]byte, 44) - + binary.LittleEndian.PutUint32(testData[0:4], scenario.pid) binary.LittleEndian.PutUint64(testData[4:12], 3000000) copy(testData[12:28], []byte(scenario.command+"\x00")) diff --git a/internal/storage/memory.go b/internal/storage/memory.go index 85ee8bd..928f00f 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -29,13 +29,13 @@ func NewMemoryStorage() *MemoryStorage { func (s *MemoryStorage) Store(ctx context.Context, event core.Event) error { s.mu.Lock() defer s.mu.Unlock() - + s.events = append(s.events, event) - + // Debug log stored events - logger.Debugf("💾 STORED EVENT: type=%s PID=%d cmd=%s total_events=%d", + logger.Debugf("💾 STORED EVENT: type=%s PID=%d cmd=%s total_events=%d", event.Type(), event.PID(), event.Command(), len(s.events)) - + return nil } @@ -43,25 +43,33 @@ func (s *MemoryStorage) Store(ctx context.Context, event core.Event) error { func (s *MemoryStorage) Query(ctx context.Context, query core.Query) ([]core.Event, error) { s.mu.RLock() defer s.mu.RUnlock() - + var results []core.Event - + totalChecked := 0 + for _, event := range s.events { + totalChecked++ if s.matchesQuery(event, query) { results = append(results, event) } } - + + // Log only if we're filtering by time and getting unexpected results + if !query.Since.IsZero() && len(results) == 0 && totalChecked > 0 { + logger.Debugf("🔍 STORAGE QUERY: type=%s since=%s checked=%d matched=%d", + query.EventType, query.Since.Format(time.RFC3339), totalChecked, len(results)) + } + // Sort by timestamp (most recent first) sort.Slice(results, func(i, j int) bool { return results[i].Timestamp() > results[j].Timestamp() }) - + // Apply limit if query.Limit > 0 && len(results) > query.Limit { results = results[:query.Limit] } - + return results, nil } @@ -69,14 +77,14 @@ func (s *MemoryStorage) Query(ctx context.Context, query core.Query) ([]core.Eve func (s *MemoryStorage) Count(ctx context.Context, query core.Query) (int, error) { s.mu.RLock() defer s.mu.RUnlock() - + count := 0 for _, event := range s.events { if s.matchesQuery(event, query) { count++ } } - + return count, nil } @@ -86,17 +94,17 @@ func (s *MemoryStorage) matchesQuery(event core.Event, query core.Query) bool { if query.EventType != "" && event.Type() != query.EventType { return false } - + // Filter by PID if query.PID != 0 && event.PID() != query.PID { return false } - + // Filter by command if query.Command != "" && event.Command() != query.Command { return false } - + // Filter by time range eventTime := event.Time() if !query.Since.IsZero() && eventTime.Before(query.Since) { @@ -105,7 +113,7 @@ func (s *MemoryStorage) matchesQuery(event core.Event, query core.Query) bool { if !query.Until.IsZero() && eventTime.After(query.Until) { return false } - + return true } @@ -113,16 +121,16 @@ func (s *MemoryStorage) matchesQuery(event core.Event, query core.Query) bool { func (s *MemoryStorage) Cleanup(maxAge time.Duration) { s.mu.Lock() defer s.mu.Unlock() - + cutoff := time.Now().Add(-maxAge) var kept []core.Event - + for _, event := range s.events { if event.Time().After(cutoff) { kept = append(kept, event) } } - + s.events = kept } @@ -137,17 +145,17 @@ type StorageWithSink struct { // NewStorageWithSink creates storage that automatically consumes from an event stream. func NewStorageWithSink(storage core.EventSink, stream core.EventStream) *StorageWithSink { ctx, cancel := context.WithCancel(context.Background()) - + s := &StorageWithSink{ storage: storage, stream: stream, ctx: ctx, cancel: cancel, } - + // Start consuming events go s.consumeEvents() - + return s } @@ -180,13 +188,13 @@ func (s *StorageWithSink) consumeEvents() { if !ok { return } - + if err := s.storage.Store(s.ctx, event); err != nil { // Log the storage error - this could indicate memory pressure, // disk space issues, or other critical storage problems - logger.Errorf("Failed to store event (PID: %d, Type: %s): %v", + logger.Errorf("Failed to store event (PID: %d, Type: %s): %v", event.PID(), event.Type(), err) - + // For critical storage failures, we continue processing to avoid // blocking the event stream, but log the error for monitoring // In production, consider implementing: @@ -194,7 +202,7 @@ func (s *StorageWithSink) consumeEvents() { // - Circuit breaker for persistent failures // - Backup storage mechanisms } - + case <-s.ctx.Done(): return } diff --git a/internal/storage/memory_test.go b/internal/storage/memory_test.go index e1c544e..5ff20fe 100644 --- a/internal/storage/memory_test.go +++ b/internal/storage/memory_test.go @@ -184,7 +184,7 @@ func TestMemoryStorageQueryByTimeRange(t *testing.T) { ctx := context.Background() now := time.Now() - + // Store events with different times times := []time.Time{ now.Add(-2 * time.Hour), diff --git a/internal/system/system.go b/internal/system/system.go index b7e7cdc..8750f2b 100644 --- a/internal/system/system.go +++ b/internal/system/system.go @@ -23,7 +23,7 @@ type System struct { func NewSystem() *System { manager := programs.NewManager() memStorage := storage.NewMemoryStorage() - + return &System{ manager: manager, storage: memStorage, @@ -33,21 +33,21 @@ func NewSystem() *System { // Initialize sets up the system with all available programs. func (s *System) Initialize() error { logger.Info("🚀 Initializing eBPF monitoring system") - + // Register connection monitoring program connProgram := connection.NewProgram() if err := s.manager.RegisterProgram(connProgram); err != nil { return fmt.Errorf("failed to register connection program: %w", err) } logger.Debugf("✅ Registered connection monitoring program") - + // Register packet drop monitoring program dropProgram := packet_drop.NewProgram() if err := s.manager.RegisterProgram(dropProgram); err != nil { return fmt.Errorf("failed to register packet drop program: %w", err) } logger.Debugf("✅ Registered packet drop monitoring program") - + logger.Info("🚀 eBPF monitoring system initialized successfully") return nil } @@ -55,26 +55,26 @@ func (s *System) Initialize() error { // Start loads and attaches all programs, then starts event collection. func (s *System) Start(ctx context.Context) error { logger.Info("🔧 Starting eBPF monitoring system") - + // Load all programs if err := s.manager.LoadAll(ctx); err != nil { return fmt.Errorf("failed to load programs: %w", err) } logger.Debugf("✅ All eBPF programs loaded") - + // Attach all programs if err := s.manager.AttachAll(ctx); err != nil { return fmt.Errorf("failed to attach programs: %w", err) } logger.Debugf("✅ All eBPF programs attached to kernel") - + // Start consuming events and storing them eventStream := s.manager.EventStream() if eventStream != nil { s.storage = storage.NewStorageWithSink(s.storage, eventStream) logger.Debugf("✅ Event storage pipeline started") } - + logger.Info("🎯 eBPF monitoring system started and ready to capture events!") return nil } @@ -82,17 +82,17 @@ func (s *System) Start(ctx context.Context) error { // Stop detaches all programs and cleans up resources. func (s *System) Stop(ctx context.Context) error { logger.Info("Stopping eBPF monitoring system") - + // Stop storage sink if storageWithSink, ok := s.storage.(*storage.StorageWithSink); ok { storageWithSink.Close() } - + // Detach all programs if err := s.manager.DetachAll(ctx); err != nil { return fmt.Errorf("failed to detach programs: %w", err) } - + logger.Info("eBPF monitoring system stopped") return nil } diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go index 0675dc4..c43a973 100644 --- a/pkg/logger/logger_test.go +++ b/pkg/logger/logger_test.go @@ -10,20 +10,20 @@ import ( func TestLogLevels(t *testing.T) { // Capture log output var buf bytes.Buffer - + // Create a logger with INFO level logger := New(INFO) logger.logger = log.New(&buf, "", 0) // Remove timestamp for testing - + // Test INFO level logging logger.logger.Print("info message") if !strings.Contains(buf.String(), "info message") { t.Errorf("Expected info message to be logged") } - + // Reset buffer buf.Reset() - + // Test DEBUG level should not show with INFO level if logger.level >= DEBUG { logger.logger.Print("[DEBUG] debug message") @@ -36,11 +36,11 @@ func TestLogLevels(t *testing.T) { func TestDebugLogging(t *testing.T) { // Capture log output var buf bytes.Buffer - + // Create a logger with DEBUG level logger := New(DEBUG) logger.logger = log.New(&buf, "", 0) - + // Test that debug messages appear at DEBUG level if logger.level >= DEBUG { logger.logger.Print("[DEBUG] debug message") @@ -56,30 +56,30 @@ func TestGlobalLogger(t *testing.T) { defer func() { defaultLogger = originalLogger }() - + // Create test logger var buf bytes.Buffer defaultLogger = New(DEBUG) defaultLogger.logger = log.New(&buf, "", 0) - + // Test global functions Info("test info") if !strings.Contains(buf.String(), "test info") { t.Errorf("Global Info should work") } - + buf.Reset() Debug("test debug") if !strings.Contains(buf.String(), "[DEBUG] test debug") { t.Errorf("Global Debug should work") } - + buf.Reset() Infof("formatted %s", "info") if !strings.Contains(buf.String(), "formatted info") { t.Errorf("Global Infof should work") } - + buf.Reset() Debugf("formatted %s", "debug") if !strings.Contains(buf.String(), "[DEBUG] formatted debug") { @@ -93,13 +93,13 @@ func TestSetLevel(t *testing.T) { defer func() { defaultLogger = originalLogger }() - + // Test SetLevel SetLevel(DEBUG) if defaultLogger.level != DEBUG { t.Errorf("SetLevel should change the default logger level") } - + // Test SetDebug SetLevel(INFO) SetDebug() @@ -114,12 +114,12 @@ func TestIsDebugEnabled(t *testing.T) { defer func() { defaultLogger = originalLogger }() - + SetLevel(INFO) if IsDebugEnabled() { t.Errorf("IsDebugEnabled should return false for INFO level") } - + SetLevel(DEBUG) if !IsDebugEnabled() { t.Errorf("IsDebugEnabled should return true for DEBUG level") From fa4eed94c8800bf603f117f279ff98fbe7c9bd83 Mon Sep 17 00:00:00 2001 From: Simone Rodigari Date: Wed, 6 Aug 2025 22:20:46 +0100 Subject: [PATCH 2/3] remove storage debug --- internal/storage/memory.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/internal/storage/memory.go b/internal/storage/memory.go index 928f00f..feae159 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -45,21 +45,13 @@ func (s *MemoryStorage) Query(ctx context.Context, query core.Query) ([]core.Eve defer s.mu.RUnlock() var results []core.Event - totalChecked := 0 for _, event := range s.events { - totalChecked++ if s.matchesQuery(event, query) { results = append(results, event) } } - // Log only if we're filtering by time and getting unexpected results - if !query.Since.IsZero() && len(results) == 0 && totalChecked > 0 { - logger.Debugf("🔍 STORAGE QUERY: type=%s since=%s checked=%d matched=%d", - query.EventType, query.Since.Format(time.RFC3339), totalChecked, len(results)) - } - // Sort by timestamp (most recent first) sort.Slice(results, func(i, j int) bool { return results[i].Timestamp() > results[j].Timestamp() From 669422faad9a0c2798da6cc0c52d14f5ea7b4157 Mon Sep 17 00:00:00 2001 From: Simone Rodigari Date: Wed, 6 Aug 2025 22:24:56 +0100 Subject: [PATCH 3/3] fix ringbuf log --- internal/programs/base.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/internal/programs/base.go b/internal/programs/base.go index 5c4e02b..cef0b44 100644 --- a/internal/programs/base.go +++ b/internal/programs/base.go @@ -41,6 +41,7 @@ type BaseProgram struct { droppedEvents uint64 totalEvents uint64 lastAlertTime time.Time + logCounter uint64 // Counter for efficient logging every N events } // NewBaseProgram creates a new base program. @@ -236,11 +237,16 @@ func (p *BaseProgram) StartRingBufferReader(mapName string, parser core.EventPar // Track total events p.mu.Lock() p.totalEvents++ + p.logCounter++ currentTotal := p.totalEvents + shouldLog := p.logCounter >= 100 // Log every 100th event to avoid spam + if shouldLog { + p.logCounter = 0 // Reset counter + } p.mu.Unlock() // Log ring buffer activity (only for debugging) - if currentTotal%100 == 0 { // Log every 100th event to avoid spam + if shouldLog { logger.Debugf("📡 RING BUFFER: %s processed %d events", p.name, currentTotal) }