This document is your implementation guide. Work through it milestone by milestone. Each milestone builds on the previous one and teaches you core SRE/data-engineering concepts.
Your goal: Build a working data pipeline from scratch, understand why each piece exists, and be able to explain it in an interview.
| Concept | Where You'll Apply It |
|---|---|
| Message brokers & Kafka | Producer, Consumer, Topics |
| Consumer groups & partitions | Consumer offset management |
| At-least-once delivery | Commit strategy + idempotency |
| Backpressure & concurrency | Bounded worker pools |
| Retry patterns | Exponential backoff with jitter |
| Dead Letter Queues | Handling poison messages |
| Prometheus metrics | Counters, Gauges, Histograms |
| Health checks | Liveness vs Readiness probes |
| Observability | Structured logging, dashboards |
┌──────────────┐     ┌─────────────┐     ┌──────────────┐     ┌──────────────┐
│   Producer   │────▶│  Redpanda   │────▶│   Consumer   │────▶│   Postgres   │
│   (Go CLI)   │     │   (Kafka)   │     │   (Go svc)   │     │              │
└──────────────┘     └─────────────┘     └──────────────┘     └──────────────┘
                            │                   │
                            │             ┌─────┴─────┐
                            │             │ /metrics  │
                            │             │ /healthz  │
                            │             │ /readyz   │
                            │             └─────┬─────┘
                            │                   │
                      ┌─────┴─────┐       ┌─────┴─────┐
                      │    DLQ    │       │Prometheus │────▶ Grafana
                      │   Topic   │       └───────────┘
                      └───────────┘
Goal: Get comfortable with the local environment before writing code.
- Run `make up` to start all services
- Open Grafana at http://localhost:3000 (admin/admin)
- Open Prometheus at http://localhost:9090
- Connect to Postgres: `docker exec -it streamsre-postgres psql -U streamsre`
- Explore Redpanda: `docker exec -it streamsre-redpanda rpk cluster info`
Run these commands and understand what they show:
# List all topics
docker exec streamsre-redpanda rpk topic list
# Create your main topic with 6 partitions
docker exec streamsre-redpanda rpk topic create events.main -p 6
# Create DLQ topic
docker exec streamsre-redpanda rpk topic create events.dlq -p 1
# Describe a topic (shows partitions, replicas)
docker exec streamsre-redpanda rpk topic describe events.main
# Produce a test message manually
echo '{"test": "hello"}' | docker exec -i streamsre-redpanda rpk topic produce events.main
# Consume messages (Ctrl+C to exit)
docker exec streamsre-redpanda rpk topic consume events.main --num 1

- What is a partition? Why would you want multiple partitions?
- What happens when you produce a message without a key vs with a key?
- What's the difference between a topic and a consumer group?
Goal: Define your data structures and learn JSON serialization.
- `internal/event/model.go` - Define the `Event` struct
- `internal/event/codec.go` - Implement JSON encode/decode
type Event struct {
EventID uuid.UUID `json:"event_id"` // Unique ID for idempotency
EventType string `json:"event_type"` // e.g., "review_created"
Key string `json:"key"` // Partition key, e.g., "user:123"
Timestamp time.Time `json:"ts"` // When event was created
SchemaVersion int `json:"schema_version"` // For future compatibility
Payload ReviewData `json:"payload"` // The actual data
}
type ReviewData struct {
ReviewID string `json:"review_id"`
Rating int `json:"rating"`
Text string `json:"text"`
}

- Implement the structs above
- Write `Encode(event *Event) ([]byte, error)` using `encoding/json`
- Write `Decode(data []byte) (*Event, error)`
- Write a simple test in `internal/event/codec_test.go`

- Why do we need `EventID` separate from `ReviewID`?
- What's the purpose of `SchemaVersion`?
- Why is `Key` important for Kafka partitioning?
Hints:
- Use `json.Marshal()` and `json.Unmarshal()`
- For uuid, use `github.com/google/uuid`
- Test with: `go test ./internal/event/...`
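If you get stuck on the codec, here is one possible round-trip sketch. To stay dependency-free it declares `EventID` as a plain `string` (an assumption for this example only); the guide's `uuid.UUID` field marshals to the same JSON string form, so the shape of `Encode` and `Decode` should carry over.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ReviewData mirrors the payload struct from this milestone.
type ReviewData struct {
	ReviewID string `json:"review_id"`
	Rating   int    `json:"rating"`
	Text     string `json:"text"`
}

// Event mirrors the guide's struct, except EventID is a string here
// (assumption: avoids the external uuid dependency in this sketch).
type Event struct {
	EventID       string     `json:"event_id"`
	EventType     string     `json:"event_type"`
	Key           string     `json:"key"`
	Timestamp     time.Time  `json:"ts"`
	SchemaVersion int        `json:"schema_version"`
	Payload       ReviewData `json:"payload"`
}

// Encode serializes an event to JSON.
func Encode(e *Event) ([]byte, error) {
	return json.Marshal(e)
}

// Decode parses JSON into an Event and wraps any parse error with context.
func Decode(data []byte) (*Event, error) {
	var e Event
	if err := json.Unmarshal(data, &e); err != nil {
		return nil, fmt.Errorf("decode event: %w", err)
	}
	return &e, nil
}

func main() {
	in := &Event{
		EventID:       "3f1c2a4e-0000-4000-8000-000000000001",
		EventType:     "review_created",
		Key:           "user:123",
		Timestamp:     time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
		SchemaVersion: 1,
		Payload:       ReviewData{ReviewID: "r-1", Rating: 5, Text: "great"},
	}
	raw, err := Encode(in)
	if err != nil {
		panic(err)
	}
	out, err := Decode(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
	fmt.Println(out.Key, out.Payload.Rating, out.Timestamp.Equal(in.Timestamp))
}
```

A round-trip test like the one in `main` is a reasonable first case for `codec_test.go`; a second case that feeds `Decode` invalid bytes and expects an error is worth adding too.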
Goal: Load config from environment variables.
- `internal/config/config.go` - Config structs with env tags

- Define `ProducerConfig` struct with fields: Brokers, Topic, RatePerSec, HotKeyPct, Duration
- Define `ConsumerConfig` struct with fields: Brokers, Topic, GroupID, MaxInflight, DatabaseURL, MetricsAddr
- Use `github.com/caarlos0/env/v11` to parse from environment
type ConsumerConfig struct {
Brokers []string `env:"KAFKA_BROKERS" envSeparator:"," envDefault:"localhost:9092"`
Topic string `env:"KAFKA_TOPIC" envDefault:"events.main"`
GroupID string `env:"KAFKA_GROUP_ID" envDefault:"streamsre-consumer"`
MaxInflight int `env:"MAX_INFLIGHT" envDefault:"64"`
DatabaseURL string `env:"DATABASE_URL"`
MetricsAddr string `env:"METRICS_ADDR" envDefault:":2112"`
}

- Why use environment variables instead of config files for production?
- What's a reasonable default for `MaxInflight`? Why not 1000?
Goal: Connect to Postgres and implement the queries you need.
- `internal/db/db.go` - Connection pool setup
- `internal/db/queries.go` - SQL operations
-- Idempotency: Have we processed this event before?
processed_events(event_id UUID PRIMARY KEY, processed_at TIMESTAMPTZ)
-- Sink: Where processed data lands
reviews(review_id TEXT PRIMARY KEY, user_key TEXT, rating INT, text TEXT, created_at TIMESTAMPTZ)
-- DLQ: Failed events for later analysis
dlq_events(event_id UUID PRIMARY KEY, reason TEXT, original_payload JSONB, failed_at TIMESTAMPTZ)

- Create connection pool using `pgxpool.New(ctx, databaseURL)`
- Implement `IsEventProcessed(ctx, eventID) (bool, error)`
- Implement `MarkEventProcessed(ctx, eventID) error`
- Implement `InsertReview(ctx, review) error`
- Implement `InsertDLQEvent(ctx, eventID, reason, payload) error`
- Implement `ProcessEventTx(ctx, eventID, review) error` - CRITICAL: This must be transactional!
// ProcessEventTx must do these steps atomically (in one transaction):
// 1. Check if event already processed (SELECT FROM processed_events)
// 2. If already processed, return early (skip - idempotent!)
// 3. Insert the review
// 4. Mark event as processed
// 5. Commit transaction
//
// If any step fails, rollback everything.
// This is how you get "exactly-once" effects in the database on top of "at-least-once" delivery.

- Why do we need a transaction here? What could go wrong without one?
- What happens if the consumer crashes after writing to `reviews` but before committing?
- How does `processed_events` prevent duplicate reviews?
Hints:
// Start a transaction
tx, err := pool.Begin(ctx)
if err != nil { return err }
defer tx.Rollback(ctx) // Rollback if we don't commit
// Do your queries using tx.Exec() or tx.QueryRow()
// Commit at the end
return tx.Commit(ctx)

Goal: Produce messages to Kafka with controlled rate and key distribution.
- `internal/kafka/producer.go` - Kafka writer wrapper
- `cmd/producer/main.go` - CLI that produces events

- Create a `kafka.Writer` with your config
- Implement `Produce(ctx, event) error` that encodes and writes
- In `main.go`, implement a loop that:
  - Generates random events at configured rate (use `time.Ticker`)
  - Uses "hot keys" for some percentage (simulates real traffic patterns)
  - Runs for configured duration then exits
Real systems have "hot partitions" - some users are way more active:
// 80% of traffic goes to 20% of keys (configurable via HOT_KEY_PCT)
hotKeys := []string{"user:1", "user:2", "user:3"} // Hot users
coldKeys := []string{"user:100", "user:101", ...} // Everyone else
if rand.Float64() < hotKeyPct {
key = hotKeys[rand.Intn(len(hotKeys))]
} else {
key = coldKeys[rand.Intn(len(coldKeys))]
}

- What happens when all messages have the same key?
- How does Kafka decide which partition gets a message?
- Why would hot keys cause problems in production?
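To build intuition for these questions, the sketch below maps keys to partitions with a hash-and-modulo rule and counts how a hot-key workload lands on 6 partitions. It is a simplified illustration: `partitionFor` is a hypothetical helper, FNV-1a is used only because it is in the standard library, and real clients may hash differently (the Kafka Java client uses murmur2 for keyed messages), so actual partition numbers will not match.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition index by hashing the key and taking
// the result modulo the partition count. Same key always gives same partition.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	const partitions = 6
	counts := make([]int, partitions)

	// 1000 messages: 80% use one of three hot keys, the rest are spread
	// across 200 cold keys (deterministic stand-in for the random producer).
	for i := 0; i < 1000; i++ {
		var key string
		if i%10 < 8 {
			key = fmt.Sprintf("user:%d", 1+i%3)
		} else {
			key = fmt.Sprintf("user:%d", 100+i%200)
		}
		counts[partitionFor(key, partitions)]++
	}
	fmt.Println("messages per partition:", counts)
}
```

Because three keys can occupy at most three partitions, most of the traffic piles onto a few partitions while the rest stay nearly idle, which is the skew the hot-key setting is meant to reproduce.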
# Run producer
KAFKA_TOPIC=events.main RATE_PER_SEC=10 DURATION=30s go run ./cmd/producer
# In another terminal, watch messages arrive
docker exec streamsre-redpanda rpk topic consume events.main

Goal: Read messages from Kafka with a consumer group.
- `internal/kafka/consumer.go` - Kafka reader wrapper
- `cmd/consumer/main.go` - Service that consumes events

- Create a `kafka.Reader` with GroupID (this enables consumer groups!)
- Implement `Consume(ctx, handler func(Message) error) error`
- In `main.go`, start consuming and just log each message for now
Consumer Groups: Multiple consumers with same GroupID share the work. Each partition is assigned to exactly one consumer in the group.
Offsets: Kafka tracks where each consumer group has read to. When you "commit", you're saying "I've processed up to here."
Commit Strategy (implement this):
// WRONG: Commit before processing
reader.CommitMessages(ctx, msg) // <- Danger!
processMessage(msg) // If this fails, message is lost
// RIGHT: Commit after processing
// (with kafka-go, fetch via reader.FetchMessage; reader.ReadMessage auto-commits when a GroupID is set)
processMessage(msg) // Process first
reader.CommitMessages(ctx, msg) // Then commit
// If we crash between process and commit, message replays.
// That's why we need idempotency!

- What happens if two consumers have different GroupIDs and read the same topic?
- What happens if a consumer crashes without committing?
- Why do we need idempotency if we commit after processing?
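The last question is easier to reason about with a toy replay. The sketch below simulates a redelivery (the consumer crashed after processing `e2` but before committing its offset) against an in-memory stand-in for the `reviews` and `processed_events` tables. The `store` type and its `apply` method are hypothetical names for illustration, not part of the project layout.

```go
package main

import "fmt"

// store is an in-memory stand-in for the reviews and processed_events tables.
type store struct {
	processed map[string]bool
	reviews   []string
}

// apply writes the review only if the event ID has not been seen before.
// It reports whether a write actually happened.
func (s *store) apply(eventID, review string) bool {
	if s.processed[eventID] {
		return false // duplicate delivery: skip, nothing is written twice
	}
	s.reviews = append(s.reviews, review)
	s.processed[eventID] = true
	return true
}

func main() {
	s := &store{processed: map[string]bool{}}

	// Deliveries as the consumer sees them. "e2" arrives twice because its
	// offset was never committed before the crash, so Kafka replays it.
	deliveries := []string{"e1", "e2", "e2", "e3"}
	for _, id := range deliveries {
		wrote := s.apply(id, "review for "+id)
		fmt.Printf("event=%s wrote=%v\n", id, wrote)
	}
	fmt.Println("reviews stored:", len(s.reviews))
}
```

Four deliveries produce three stored reviews. Remove the `processed` check and the replayed event is written twice, which is the failure mode that commit-after-processing alone cannot prevent.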
# Terminal 1: Run producer
go run ./cmd/producer
# Terminal 2: Run consumer
go run ./cmd/consumer
# You should see consumer logging each message

Goal: Process messages concurrently with backpressure.
If you process messages one at a time, you're slow. If you spawn unlimited goroutines, you'll OOM or overwhelm the database.
// Bounded worker pool with semaphore
sem := make(chan struct{}, maxInflight) // Limit concurrent workers
for msg := range messages {
sem <- struct{}{} // Acquire slot (blocks if pool is full = backpressure!)
go func(m Message) {
defer func() { <-sem }() // Release slot when done
processMessage(m)
}(msg)
}

- Implement worker pool in your consumer
- Track inflight count as a metric (we'll wire metrics later)
- Handle graceful shutdown (wait for inflight to drain)
- What happens if database is slow and all 64 workers are busy?
- How does this prevent the consumer from falling behind Kafka?
- What's the tradeoff of setting MaxInflight too high vs too low?
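One way to cover the inflight tracking and drain-on-shutdown tasks is to pair the semaphore with a `sync.WaitGroup` and an atomic counter. The sketch below is a minimal version under assumptions: messages are plain `int` values, and `runPool` is a hypothetical helper that returns the peak concurrency it observed, standing in for the `inflight_workers` gauge you will wire later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runPool processes every message with at most maxInflight concurrent workers.
// It returns only after all workers have finished (graceful drain) and reports
// the highest number of workers that were ever running at once.
func runPool(messages <-chan int, maxInflight int, process func(int)) int64 {
	sem := make(chan struct{}, maxInflight)
	var wg sync.WaitGroup
	var inflight, peak atomic.Int64

	for msg := range messages {
		sem <- struct{}{} // blocks while the pool is full: this is the backpressure
		wg.Add(1)
		go func(m int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done

			n := inflight.Add(1)
			for { // record a new peak if this worker raised it
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			process(m)
			inflight.Add(-1)
		}(msg)
	}

	wg.Wait() // drain: wait for inflight work before returning
	return peak.Load()
}

func main() {
	msgs := make(chan int)
	go func() {
		defer close(msgs)
		for i := 0; i < 20; i++ {
			msgs <- i
		}
	}()

	peak := runPool(msgs, 4, func(int) { time.Sleep(10 * time.Millisecond) })
	fmt.Println("peak inflight:", peak)
}
```

In the real consumer, closing the message channel (or cancelling the context that feeds it) on SIGTERM and then letting `wg.Wait()` return gives you the graceful shutdown the task list asks for.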
Goal: Handle transient failures gracefully.
- `internal/service/retry.go` - Retry logic with exponential backoff + jitter
Attempt 1: immediate
Attempt 2: wait 100ms + jitter
Attempt 3: wait 200ms + jitter
Attempt 4: wait 400ms + jitter
...
Attempt 8: give up → send to DLQ
Without jitter, if 1000 requests fail at the same time, they all retry at the same time → thundering herd → fail again → repeat.
Jitter spreads out retries randomly.
- Implement `Retry(ctx, maxAttempts, fn func() error) error`
- Use exponential backoff: `backoff = min(baseDelay * 2^attempt, maxDelay)`
- Add jitter: `actualDelay = backoff * (0.5 + rand.Float64())`
- Respect context cancellation (don't retry if shutting down)
- Which errors should you retry? Which shouldn't you?
- Why cap the backoff at a maximum (e.g., 5 seconds)?
- What happens if you retry forever instead of giving up after N attempts?
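The backoff and jitter formulas above can be sketched as follows. This is one possible shape, not the required implementation: `baseDelay` of 100ms and `maxDelay` of 5s are assumptions taken from the schedule and the question above, and `backoffFor` is a hypothetical helper split out so the un-jittered delay is easy to test.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

const (
	baseDelay = 100 * time.Millisecond // assumed: matches the schedule above
	maxDelay  = 5 * time.Second        // assumed cap
)

// backoffFor returns the un-jittered delay after failed attempt n (n starts at 0):
// min(baseDelay * 2^n, maxDelay).
func backoffFor(n int) time.Duration {
	if n > 20 { // far beyond maxDelay already; also avoids shift overflow
		return maxDelay
	}
	d := baseDelay << n
	if d > maxDelay {
		return maxDelay
	}
	return d
}

// Retry calls fn up to maxAttempts times, sleeping with jittered exponential
// backoff between attempts. It stops early if ctx is cancelled while waiting.
func Retry(ctx context.Context, maxAttempts int, fn func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt == maxAttempts-1 {
			break // no point sleeping after the final attempt
		}
		// Jitter: scale the backoff by a random factor in [0.5, 1.5).
		delay := time.Duration(float64(backoffFor(attempt)) * (0.5 + rand.Float64()))
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := Retry(context.Background(), 5, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

Wrapping the last error with `%w` lets the caller still inspect the underlying cause with `errors.Is` when deciding what reason to record in the DLQ.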
Distinguishing Retryable Errors
// Retryable: transient failures
// - Network timeout
// - Connection refused (database might restart)
// - "too many connections"
// NOT retryable: permanent failures
// - Invalid JSON (will never parse correctly)
// - Constraint violation (duplicate key)
// - 400 Bad Request

Goal: Handle poison messages that can never be processed.
- Invalid payload: Can't decode JSON, missing required fields
- Max retries exceeded: Tried N times, still failing
- Business rule violation: Rating is -5 (impossible value)
- Implement `SendToDLQ(ctx, eventID, reason, originalPayload) error`
- Option A: Write to Kafka topic `events.dlq`
- Option B: Write to Postgres table `dlq_events`
- Wire into your processor: invalid → DLQ immediately, max retries → DLQ
- Why not just drop poison messages?
- How would you "replay" messages from the DLQ after fixing a bug?
- Should DLQ messages be retried automatically? Why or why not?
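The wiring rule (invalid → DLQ immediately, max retries → DLQ) can be expressed as a small decision function. The sketch below is an illustration only: the sentinel errors, the `action` type, and `decide` are hypothetical names, and treating every unrecognized error as retryable is an assumption you may want to tighten.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel errors for permanent failures (hypothetical names for this sketch).
var (
	ErrInvalidPayload = errors.New("invalid payload")
	ErrBusinessRule   = errors.New("business rule violation")

	// errTransient stands in for a network or database hiccup.
	errTransient = errors.New("connection refused")
)

type action string

const (
	actionDone  action = "done"
	actionRetry action = "retry"
	actionDLQ   action = "dlq"
)

// decide maps the outcome of one processing attempt to the next step.
// attempt is 1-based; maxAttempts is the retry budget.
func decide(err error, attempt, maxAttempts int) action {
	switch {
	case err == nil:
		return actionDone
	case errors.Is(err, ErrInvalidPayload), errors.Is(err, ErrBusinessRule):
		return actionDLQ // permanent: retrying can never succeed
	case attempt >= maxAttempts:
		return actionDLQ // transient, but the retry budget is exhausted
	default:
		return actionRetry
	}
}

func main() {
	wrapped := fmt.Errorf("decode: %w", ErrInvalidPayload)
	fmt.Println(decide(nil, 1, 8))
	fmt.Println(decide(wrapped, 1, 8))
	fmt.Println(decide(errTransient, 3, 8))
	fmt.Println(decide(errTransient, 8, 8))
}
```

Using `errors.Is` means the classification still works when lower layers wrap the sentinel with extra context, as the `wrapped` example shows.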
Goal: Expose Prometheus metrics for observability.
- `internal/obs/metrics.go` - Define and register all metrics

| Metric | Type | Labels | Purpose |
|---|---|---|---|
| `events_consumed_total` | Counter | - | How many messages pulled from Kafka |
| `events_processed_total` | Counter | `result` | success/fail/dlq outcomes |
| `processing_duration_seconds` | Histogram | - | How long each event takes |
| `db_query_duration_seconds` | Histogram | `query` | Database latency |
| `retries_total` | Counter | - | How many retries happened |
| `inflight_workers` | Gauge | - | Current active goroutines |
| `consumer_lag` | Gauge | `partition` | Messages behind |

- Create each metric using `prometheus.NewCounter()`, etc.
- Register with `prometheus.MustRegister()`
- Instrument your code to record metrics at the right places
- Expose `/metrics` endpoint using `promhttp.Handler()`
start := time.Now()
err := doSomething()
duration := time.Since(start).Seconds()
metrics.ProcessingDuration.Observe(duration)

- What's the difference between Counter, Gauge, and Histogram?
- Why use a Histogram for latency instead of just an average?
- What labels would be useful vs harmful? (Hint: cardinality)
# Run consumer
go run ./cmd/consumer
# Check metrics
curl localhost:2112/metrics | grep streamsre

Goal: Measure how far behind the consumer is.
Lag = Latest Offset in Partition - Consumer's Committed Offset
If partition has 1000 messages and you've processed 800:
Lag = 1000 - 800 = 200 messages behind
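The same arithmetic, extended to several partitions, might look like the sketch below. `partitionLag` and `totalLag` are hypothetical helpers; how you obtain the two offset maps is up to your lag implementation. Clamping negative values to zero is an assumption, made because the two offsets are sampled at slightly different moments.

```go
package main

import "fmt"

// partitionLag returns how many messages the group is behind on one partition.
// latest is the partition's newest offset; committed is the group's committed offset.
func partitionLag(latest, committed int64) int64 {
	lag := latest - committed
	if lag < 0 {
		return 0 // offsets were sampled at different times; never report negative lag
	}
	return lag
}

// totalLag sums per-partition lag. Both maps are keyed by partition number.
// A partition missing from committed counts as committed offset 0.
func totalLag(latest, committed map[int]int64) int64 {
	var total int64
	for p, hi := range latest {
		total += partitionLag(hi, committed[p])
	}
	return total
}

func main() {
	latest := map[int]int64{0: 1000, 1: 500}
	committed := map[int]int64{0: 800, 1: 500}

	fmt.Println("partition 0 lag:", partitionLag(latest[0], committed[0])) // 200
	fmt.Println("total lag:", totalLag(latest, committed))                 // 200
}
```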
- Implement lag measurement in `internal/kafka/lag.go`
- Run a background goroutine that samples lag every 10 seconds
- Expose as `consumer_lag` gauge metric
// Option 1: Use Reader Stats (simpler)
stats := reader.Stats()
// stats.Lag gives you total lag
// Option 2: Query offsets directly (more detailed)
// Get partition high watermarks via Conn
// Get consumer group committed offsets
// Calculate difference

- Is lag of 1000 always bad? When might it be okay?
- What could cause lag to grow unboundedly?
- How would you alert on lag?
Goal: Implement Kubernetes-style health checks.
- `internal/service/http.go` - HTTP server with health endpoints
/healthz (Liveness)
- Returns 200 if the process is running
- Kubernetes uses this to know if it should restart the container
- Should NOT check dependencies (database, Kafka)
/readyz (Readiness)
- Returns 200 if ready to receive traffic
- Check: Can I reach Kafka? Can I reach Postgres?
- If unhealthy, Kubernetes removes from load balancer (no traffic)
- Implement `/healthz` - just return `{"status": "ok"}`
- Implement `/readyz` - ping Kafka and Postgres, return 200 or 503
- Implement `/metrics` - serve Prometheus metrics

- Why separate liveness from readiness?
- What happens if `/readyz` always returns 503?
- Should health checks have timeouts?
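A minimal sketch of the two probes using only the standard library is shown below. Several pieces are assumptions for illustration: the `check` type, the `alwaysUp`/`alwaysDown` stubs that stand in for real Kafka and Postgres pings, the 2-second timeout, and the `statusOf` helper that exercises the handlers in-process without opening a port.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// check is a dependency probe, e.g. a Postgres ping (hypothetical type).
type check func(ctx context.Context) error

// Stub probes standing in for real Kafka/Postgres pings.
var (
	alwaysUp   check = func(context.Context) error { return nil }
	alwaysDown check = func(context.Context) error { return errors.New("connection refused") }
)

func writeJSON(w http.ResponseWriter, code int, body map[string]string) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	_ = json.NewEncoder(w).Encode(body)
}

// newMux wires /healthz (no dependency checks) and /readyz (all checks must pass).
func newMux(checks map[string]check) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
	})
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
		// Bound the probes so a hung dependency cannot hang the health check.
		ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
		defer cancel()
		for name, c := range checks {
			if err := c(ctx); err != nil {
				writeJSON(w, http.StatusServiceUnavailable,
					map[string]string{"status": "unavailable", "failed": name})
				return
			}
		}
		writeJSON(w, http.StatusOK, map[string]string{"status": "ready"})
	})
	return mux
}

// statusOf issues an in-process GET and returns the response status code.
func statusOf(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	healthy := newMux(map[string]check{"postgres": alwaysUp, "kafka": alwaysUp})
	degraded := newMux(map[string]check{"postgres": alwaysDown, "kafka": alwaysUp})

	fmt.Println("healthy  /readyz :", statusOf(healthy, "/readyz"))
	fmt.Println("degraded /readyz :", statusOf(degraded, "/readyz"))
	fmt.Println("degraded /healthz:", statusOf(degraded, "/healthz"))
}
```

Note that `/healthz` stays at 200 even when Postgres is down: restarting the consumer would not fix the database, so only readiness should reflect dependency health.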
Goal: Visualize your metrics.
- Open Grafana at http://localhost:3000
- Create a new dashboard
- Add panels for:
- Consumer lag (timeseries)
- Events processed per second (rate)
- Processing latency p50/p95/p99 (histogram_quantile)
- Error rate (events failed / events total)
- Inflight workers (gauge)
- DB latency heatmap
# Rate of events processed per second
rate(events_processed_total[1m])
# 95th percentile processing latency
histogram_quantile(0.95, rate(processing_duration_seconds_bucket[5m]))
# Consumer lag
consumer_lag
# Error rate
sum(rate(events_processed_total{result="fail"}[5m])) / sum(rate(events_processed_total[5m]))
- Why use `rate()` instead of just the counter value?
- What does `[1m]` mean in PromQL?
- How would you set up an alert for "lag > 10000 for 5 minutes"?
Goal: Break things on purpose and verify your system handles it.
# Produce faster than consumer can handle
RATE_PER_SEC=1000 go run ./cmd/producer
# Watch lag climb in Grafana
# Then reduce rate or increase MAX_INFLIGHT

Document in `runbooks/lag_is_climbing.md`:
- What metrics spiked?
- How did you diagnose the root cause?
- What was the mitigation?
# Add artificial latency to Postgres
docker exec streamsre-postgres psql -U streamsre -c "
CREATE OR REPLACE FUNCTION slow_trigger() RETURNS trigger AS \$\$
BEGIN
PERFORM pg_sleep(2);
RETURN NEW;
END;
\$\$ LANGUAGE plpgsql;
CREATE TRIGGER slow_insert BEFORE INSERT ON reviews
FOR EACH ROW EXECUTE FUNCTION slow_trigger();
"
# Run your pipeline and watch retries spike
# Clean up:
docker exec streamsre-postgres psql -U streamsre -c "DROP TRIGGER slow_insert ON reviews;"

# Produce invalid JSON
echo 'not valid json at all' | docker exec -i streamsre-redpanda rpk topic produce events.main
# Verify it goes to DLQ
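# Check dlq_events table or events.dlq topic

# Start consumer, then kill it mid-processing
# (build a binary first: kill -9 on "go run" kills only the wrapper, not the consumer it spawned)
go build -o ./consumer-bin ./cmd/consumer # output path is arbitrary
./consumer-bin &
PID=$!
sleep 5
kill -9 $PID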
# Restart consumer - verify no duplicates in reviews table
# (idempotency should prevent double-writes)

Goal: Document what you learned from the drills.
For each failure mode, document:
- Symptoms: What alerts fire? What does Grafana show?
- Immediate Checks: Commands to run, queries to execute
- Likely Causes: What usually causes this?
- Mitigation: How to fix it RIGHT NOW
- Follow-ups: What to do after the incident
Create postmortems/YYYY-MM-DD-title.md:
# Postmortem: [Title]
## Summary
One paragraph: what happened, impact, duration.
## Timeline
- HH:MM - First alert
- HH:MM - Investigation started
- HH:MM - Root cause identified
- HH:MM - Mitigation applied
- HH:MM - Resolved
## Root Cause
Why did this happen?
## Impact
What was affected? How many events delayed/lost?
## What Went Well
- Alerts fired correctly
- Runbook was helpful
## What Could Be Improved
- Need better monitoring for X
- Should add circuit breaker
## Action Items
- [ ] Add alert for Y
- [ ] Implement Z

You're finished when:
- `docker compose up` starts all infrastructure
- Producer generates events at configurable rate
- Consumer processes events and writes to Postgres
- Killing consumer doesn't cause duplicate reviews (idempotency works!)
- Invalid messages go to DLQ
- `/metrics` exposes lag, latency, throughput, errors
- Grafana dashboard shows key metrics
- 3 runbooks completed with real examples
- 2 postmortems from chaos drills
- Circuit Breaker: Stop calling Postgres if it's consistently failing
- Rate Limiting: Consumer limits its own throughput
- Schema Registry: Validate events against a schema
- Multi-Consumer: Run 2+ consumers and watch partition rebalancing
- Exactly-Once: Use Kafka transactions (advanced!)
- Kubernetes: Deploy to minikube with proper manifests
- Kafka: The Definitive Guide (free ebook)
- kafka-go documentation
- Google SRE Book (free online)
- The Site Reliability Workbook
Good luck! Start with Milestone 0 to get comfortable with the environment, then work through each milestone in order. Don't skip ahead - each one builds on the previous.