Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,65 @@ curl "http://localhost:8080/api/programs"
- **API Layer**: HTTP endpoints for querying events and program status
- **System Layer**: Top-level coordination and initialization

## Event Flow Architecture

The system processes events through a real-time streaming pipeline that ensures low latency and high throughput:

```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ eBPF Program │ │ Ring Buffer │ │ Event Parser │ │ Event Stream │
│ │ │ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ sys_connect │ │───▶│ │ events │ │───▶│ │ Connection │ │───▶│ │ Channel │ │
│ │ tracepoint │ │ │ │ (16MB) │ │ │ │ Parser │ │ │ │ (buffered) │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
│ │ │ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │ │ │
│ │ kfree_skb │ │───▶│ │drop_events │ │───▶│ │ PacketDrop │ │───▶│ │
│ │ tracepoint │ │ │ │ (256KB) │ │ │ │ Parser │ │ │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │ │
Kernel Space Ring Buffer Go Application Event Stream
(eBPF Programs) (Temporary) (Event Parsing) (Buffered)
│ │
▼ ▼
┌─────────────────────┐ ┌─────────────────────┐
│ Always Empty │ │ ┌─────────────────┐ │
│ │ │ │ Memory Storage │ │
│ Events consumed │ │ │ │ │
│ immediately by │ │ │ • Query Events │ │
│ userspace readers │ │ │ • Time Filters │ │
└─────────────────────┘ │ │ • PID Grouping │ │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ HTTP API │ │
│ │ │ │
│ │ /api/list- │ │
│ │ connections │ │
│ │ │ │
│ │ /api/list- │ │
│ │ packet-drops │ │
│ └─────────────────┘ │
└─────────────────────┘


```

### Ring buffers

Ring buffers in eBPF are designed for real-time streaming:

1. **eBPF programs** write events to ring buffers using `bpf_ringbuf_reserve()` and `bpf_ringbuf_submit()`
2. **Userspace readers** immediately consume events using `ringbuf.NewReader()`
3. **Events are parsed** and sent to Go event streams
4. **Ring buffers become empty** as events are consumed in real-time
5. **Events are stored** in memory for API queries

Events flow through the pipeline without accumulating in kernel space.

## Extending the System

📚 **[Complete Development Guide](docs/program-development.md)** - Detailed guide for creating new eBPF monitoring programs
Expand Down
8 changes: 4 additions & 4 deletions cmd/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
func setupTestSystem() *system.System {
// Create a test system
testSystem := system.NewSystem()

// Initialize the API with the test system
api.Initialize(testSystem)

return testSystem
}

Expand Down Expand Up @@ -78,7 +78,7 @@ func TestHTTPServerSetup(t *testing.T) {
func TestHTTPHealthEndpoint(t *testing.T) {
// Setup test system
_ = setupTestSystem()

mux := http.NewServeMux()
mux.HandleFunc("/health", api.HandleHealth)

Expand Down Expand Up @@ -131,7 +131,7 @@ func TestHTTPAPIEndpoints(t *testing.T) {
func TestHTTPConnectionSummaryValidation(t *testing.T) {
// Setup test system
_ = setupTestSystem()

mux := http.NewServeMux()
mux.HandleFunc("/api/connection-summary", api.HandleConnectionSummary)

Expand Down
62 changes: 31 additions & 31 deletions internal/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,39 +112,39 @@ func HandleEvents(w http.ResponseWriter, r *http.Request) {

// Parse query parameters
query := core.Query{}

if eventType := r.URL.Query().Get("type"); eventType != "" {
query.EventType = eventType
}

if pidStr := r.URL.Query().Get("pid"); pidStr != "" {
if pid, err := strconv.ParseUint(pidStr, 10, 32); err == nil {
query.PID = uint32(pid)
}
}

if command := r.URL.Query().Get("command"); command != "" {
query.Command = command
}

if sinceStr := r.URL.Query().Get("since"); sinceStr != "" {
if since, err := time.Parse(time.RFC3339, sinceStr); err == nil {
query.Since = since
}
}

if untilStr := r.URL.Query().Get("until"); untilStr != "" {
if until, err := time.Parse(time.RFC3339, untilStr); err == nil {
query.Until = until
}
}

if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
if limit, err := strconv.Atoi(limitStr); err == nil && limit > 0 {
query.Limit = limit
}
}

// Default limit to prevent overwhelming responses
if query.Limit == 0 {
query.Limit = 100
Expand Down Expand Up @@ -356,7 +356,7 @@ func HandlePacketDropSummary(w http.ResponseWriter, r *http.Request) {
// @Router /api/list-connections [get]
func HandleListConnections(w http.ResponseWriter, r *http.Request) {
logger.Debugf("🌐 HTTP REQUEST: %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)

if globalSystem == nil {
http.Error(w, "System not initialized", http.StatusServiceUnavailable)
return
Expand Down Expand Up @@ -412,7 +412,7 @@ func HandleListConnections(w http.ResponseWriter, r *http.Request) {
// @Router /api/list-packet-drops [get]
func HandleListPacketDrops(w http.ResponseWriter, r *http.Request) {
logger.Debugf("🌐 HTTP REQUEST: %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)

if globalSystem == nil {
http.Error(w, "System not initialized", http.StatusServiceUnavailable)
return
Expand Down Expand Up @@ -459,48 +459,48 @@ func HandleListPacketDrops(w http.ResponseWriter, r *http.Request) {

// ConnectionSummaryRequest represents the request body for connection summary
type ConnectionSummaryRequest struct {
PID uint32 `json:"pid" example:"1234"` // Process ID
Command string `json:"command" example:"curl"` // Command name
Duration int `json:"duration_seconds" example:"60"` // Duration in seconds
PID uint32 `json:"pid" example:"1234"` // Process ID
Command string `json:"command" example:"curl"` // Command name
Duration int `json:"duration_seconds" example:"60"` // Duration in seconds
}

// ConnectionSummaryResponse represents the response for connection summary
type ConnectionSummaryResponse struct {
Count int `json:"count" example:"5"` // Number of connection events
PID uint32 `json:"pid" example:"1234"` // Process ID
Command string `json:"command" example:"curl"` // Command name
DurationSeconds int `json:"duration_seconds" example:"60"` // Duration in seconds
Count int `json:"count" example:"5"` // Number of connection events
PID uint32 `json:"pid" example:"1234"` // Process ID
Command string `json:"command" example:"curl"` // Command name
DurationSeconds int `json:"duration_seconds" example:"60"` // Duration in seconds
QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp
}

// PacketDropSummaryRequest represents the request body for packet drop summary
type PacketDropSummaryRequest struct {
PID uint32 `json:"pid" example:"1234"` // Process ID
Command string `json:"command" example:"nginx"` // Command name
Duration int `json:"duration_seconds" example:"60"` // Duration in seconds
PID uint32 `json:"pid" example:"1234"` // Process ID
Command string `json:"command" example:"nginx"` // Command name
Duration int `json:"duration_seconds" example:"60"` // Duration in seconds
}

// PacketDropSummaryResponse represents the response for packet drop summary
type PacketDropSummaryResponse struct {
Count int `json:"count" example:"3"` // Number of packet drop events
PID uint32 `json:"pid" example:"1234"` // Process ID
Command string `json:"command" example:"nginx"` // Command name
DurationSeconds int `json:"duration_seconds" example:"60"` // Duration in seconds
Count int `json:"count" example:"3"` // Number of packet drop events
PID uint32 `json:"pid" example:"1234"` // Process ID
Command string `json:"command" example:"nginx"` // Command name
DurationSeconds int `json:"duration_seconds" example:"60"` // Duration in seconds
QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp
}

// ConnectionListResponse represents the response for listing connections
type ConnectionListResponse struct {
TotalPIDs int `json:"total_pids" example:"3"` // Number of unique PIDs
TotalEvents int `json:"total_events" example:"10"` // Total number of events
EventsByPID map[uint32][]core.Event `json:"events_by_pid"` // Events grouped by PID
QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp
TotalPIDs int `json:"total_pids" example:"3"` // Number of unique PIDs
TotalEvents int `json:"total_events" example:"10"` // Total number of events
EventsByPID map[uint32][]core.Event `json:"events_by_pid"` // Events grouped by PID
QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp
}

// PacketDropListResponse represents the response for listing packet drops
type PacketDropListResponse struct {
TotalPIDs int `json:"total_pids" example:"2"` // Number of unique PIDs
TotalEvents int `json:"total_events" example:"7"` // Total number of events
EventsByPID map[uint32][]core.Event `json:"events_by_pid"` // Events grouped by PID
QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp
TotalPIDs int `json:"total_pids" example:"2"` // Number of unique PIDs
TotalEvents int `json:"total_events" example:"7"` // Total number of events
EventsByPID map[uint32][]core.Event `json:"events_by_pid"` // Events grouped by PID
QueryTime string `json:"query_time" example:"2023-01-01T12:00:00Z"` // Query timestamp
}
Loading
Loading