A comprehensive example demonstrating explicit topic naming conventions for Kafka Streams applications, including changelog, repartition, and windowed state store topics.
This project implements a time-windowed aggregation Kafka Streams application that showcases:
- Explicit changelog topic naming using `Materialized.as()`
- Explicit repartition topic naming using `Grouped.as()`
- Predictable topic names following organizational standards
- Manual topic creation with auto-creation disabled
- Time-windowed aggregation with 5-minute tumbling windows
- Comprehensive testing with TopologyTestDriver and Testcontainers
This implementation fully complies with the PRD requirements:
- ✅ All topics use explicit, predictable naming conventions
- ✅ Auto topic creation is disabled (`auto.create.topics.enable=false`)
- ✅ Topic names follow organizational pattern: `{{domain_id}}-{{environment}}-{{accessibility}}-{{service}}-{{function}}`
- ✅ Changelog and repartition topics are explicitly named
- ✅ All topics can be pre-created and managed via IaC
- ✅ CI/CD ready with Docker Compose and comprehensive tests
```
Input Topic                        Kafka Streams Application          Output Topic
┌──────────────────────────────┐   ┌──────────────────────────────┐   ┌──────────────────────────────┐
│ cus-s-pub-windowed-agg-input │──▶│ WindowedAggregation          │──▶│ cus-s-pub-windowed-agg-output│
│                              │   │                              │   │                              │
│ Events (JSON)                │   │ • Group by key               │   │ Aggregated (JSON)            │
│ - user-1, click, 10          │   │ • 5-min tumbling window      │   │ - count, sum, avg            │
│ - user-1, view, 20           │   │ • Aggregate statistics       │   │ - window timestamps          │
└──────────────────────────────┘   └──────────────────────────────┘   └──────────────────────────────┘
                                           │            │
                                           ▼            ▼
                              ┌──────────────────┐   ┌────────────────────────┐
                              │ Repartition      │   │ Changelog              │
                              │ Topic            │   │ Topic                  │
                              └──────────────────┘   └────────────────────────┘
                              cus-s-pub-windowed-    cus-s-pub-windowed-agg-
                              agg-events-by-key-     event-count-store-
                              repartition            changelog
```
- Java 17+
- Maven 3.6+
- Docker and Docker Compose
```bash
# Clone the repository
git clone <repo-url>
cd kafka-streams-using-topic-naming

# Build the project
mvn clean package

# Start Kafka, Schema Registry, and Kafka UI
docker-compose up -d

# Verify topics were created
docker exec windowed-agg-broker kafka-topics --list --bootstrap-server localhost:9092
```

Expected topics:

- `cus-s-pub-windowed-agg-input`
- `cus-s-pub-windowed-agg-output`
- `cus-s-pub-windowed-agg-event-count-store-changelog`
- `cus-s-pub-windowed-agg-events-by-key-repartition`
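Because every name is derived from the same organizational prefix, the expected set can be generated mechanically. A small illustrative sketch (this loop is not part of the project's scripts — topics are pre-created via docker-compose):

```shell
# Derive the four expected topic names from the shared prefix.
# Illustrative only; the project pre-creates topics via docker-compose.
PREFIX="cus-s-pub-windowed-agg"
for fn in input output event-count-store-changelog events-by-key-repartition; do
  echo "${PREFIX}-${fn}"
done
```

Comparing this output against `kafka-topics --list` is a quick sanity check that the broker holds exactly the pre-created set.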
```bash
# Run via Maven
mvn exec:java -Dexec.mainClass="com.github.osodevops.kafka.StreamsApplication"

# Or run the JAR
java -jar target/kafka-streams-using-topic-naming-1.0.0-SNAPSHOT-jar-with-dependencies.jar
```

```bash
# Make script executable
chmod +x scripts/produce-sample-data.sh

# Generate and produce 50 sample events
./scripts/produce-sample-data.sh localhost:9092 50
```

Option 1: Kafka Console Consumer
```bash
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic cus-s-pub-windowed-agg-output \
  --from-beginning \
  --property print.key=true \
  --property key.separator=': '
```

Option 2: Kafka UI

Open http://localhost:8080 and navigate to Topics → cus-s-pub-windowed-agg-output
```bash
# Stop application (Ctrl+C)

# Stop Docker services
docker-compose down

# Remove all data
docker-compose down -v
```

```
kafka-streams-using-topic-naming/
├── pom.xml                         # Maven configuration
├── docker-compose.yml              # Local Kafka environment
├── README.md                       # This file
│
├── src/main/java/com/github/osodevops/kafka/
│   ├── StreamsApplication.java     # Main application
│   ├── config/
│   │   └── TopicConfig.java        # Centralized topic naming
│   ├── model/
│   │   ├── Event.java              # Input event model
│   │   └── AggregatedEvent.java    # Output aggregation model
│   ├── serde/
│   │   └── JsonSerdes.java         # JSON serializers/deserializers
│   └── topology/
│       └── WindowedAggregationTopology.java  # Streams topology
│
├── src/main/resources/
│   ├── application.properties      # Application configuration
│   └── log4j2.xml                  # Logging configuration
│
├── src/test/java/com/github/osodevops/kafka/
│   ├── topology/
│   │   └── WindowedAggregationTopologyTest.java  # Unit tests
│   └── integration/
│       └── StreamsIntegrationTest.java           # Integration tests
│
├── scripts/
│   ├── create-topics.sh            # Manual topic creation
│   └── produce-sample-data.sh      # Sample data generator
│
└── doc/
    ├── kafka-streams-topic-naming-prd.md  # Product requirements
    ├── topic-naming-guide.md              # Topic naming conventions
    ├── topic-retention-and-deletion.md    # Retention and deletion policies
    └── deployment-guide.md                # Deployment instructions
```
All topics follow the organizational pattern: `{{domain_id}}-{{environment}}-{{accessibility}}-{{service}}-{{function}}`

Example Configuration (in TopicConfig.java):

- Domain: `cus` (Customer)
- Environment: `s` (Staging)
- Accessibility: `pub` (Public)
- Service: `windowed-agg`
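Assembling a name from these components is a single join. The sketch below mirrors what such a `TopicConfig` class might do; the `topicName` helper and class name are illustrative, not the project's actual code:

```java
// Illustrative sketch of a centralized topic-naming helper.
// Constants mirror the example configuration above; the helper
// itself is hypothetical, not the project's actual TopicConfig.
public class TopicConfigSketch {
    static final String DOMAIN = "cus";          // Customer
    static final String ENVIRONMENT = "s";       // Staging
    static final String ACCESSIBILITY = "pub";   // Public
    static final String SERVICE = "windowed-agg";

    /** Assemble {{domain_id}}-{{environment}}-{{accessibility}}-{{service}}-{{function}}. */
    static String topicName(String function) {
        return String.join("-", DOMAIN, ENVIRONMENT, ACCESSIBILITY, SERVICE, function);
    }

    public static void main(String[] args) {
        System.out.println(topicName("input"));   // cus-s-pub-windowed-agg-input
        System.out.println(topicName("output"));  // cus-s-pub-windowed-agg-output
    }
}
```

Centralizing the join in one place keeps every topic reference in the codebase consistent with the organizational pattern.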
| Topic Name | Purpose | Cleanup Policy | Retention |
|---|---|---|---|
| `cus-s-pub-windowed-agg-input` | Consumes raw events | delete | 7 days |
| `cus-s-pub-windowed-agg-output` | Publishes aggregated results | delete | 7 days |

| Topic Name | Purpose | Cleanup Policy | Retention |
|---|---|---|---|
| `cus-s-pub-windowed-agg-event-count-store-changelog` | State store changelog | compact,delete | 7 days |
| `cus-s-pub-windowed-agg-events-by-key-repartition` | Data repartitioning | delete | 1 hour |
See Topic Retention and Deletion Guide for detailed configuration.
Changelog Topic:

```java
// TopicConfig.java sets APPLICATION_ID = "cus-s-pub-windowed-agg"
Materialized.<String, AggregationState, WindowStore<Bytes, byte[]>>as("event-count-store")
// Results in: cus-s-pub-windowed-agg-event-count-store-changelog
```

Repartition Topic:

```java
// TopicConfig.java sets APPLICATION_ID = "cus-s-pub-windowed-agg"
Grouped.<String, Event>as("events-by-key")
// Results in: cus-s-pub-windowed-agg-events-by-key-repartition
```

See Topic Naming Guide for complete details.
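The derivation Kafka Streams performs here is mechanical: the application ID is prepended and a fixed suffix appended. A pure-Java sketch of that rule (the helper class and method names are illustrative; Kafka Streams does this internally):

```java
// Sketch of how Kafka Streams derives internal topic names:
//   <application.id>-<state store name>-changelog
//   <application.id>-<grouped name>-repartition
// Helper names are illustrative; this logic lives inside Kafka Streams.
public class InternalTopicNames {
    static String changelog(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    static String repartition(String applicationId, String groupedName) {
        return applicationId + "-" + groupedName + "-repartition";
    }

    public static void main(String[] args) {
        System.out.println(changelog("cus-s-pub-windowed-agg", "event-count-store"));
        System.out.println(repartition("cus-s-pub-windowed-agg", "events-by-key"));
    }
}
```

This is why naming the store and the grouping explicitly yields fully predictable internal topics: the only variable inputs are the application ID and the names you choose.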
Application Configuration (application.properties):

```properties
# Application identifier (used as topic prefix)
application.id=windowed-agg

# Kafka brokers
bootstrap.servers=localhost:9092

# CRITICAL: Disable auto topic creation
auto.create.topics.enable=false

# Processing guarantee
processing.guarantee=exactly_once_v2

# State directory
state.dir=/tmp/kafka-streams
```

Override via system properties (note that `-D` flags must come before `-jar`; placed after it, they are passed to the application as arguments rather than to the JVM):

```bash
java \
  -Dkafka.bootstrap.servers=prod-kafka:9092 \
  -Dkafka.replication.factor=3 \
  -jar app.jar
```

Tests use TopologyTestDriver for fast, isolated testing:
```bash
# Run unit tests
mvn test -Dtest=WindowedAggregationTopologyTest
```

Test Coverage:
- Single event aggregation
- Multiple events in same window
- Events split across windows
- Different keys produce independent aggregations
- Window timestamp correctness
- Topic naming verification
Tests use Testcontainers with real Kafka:
```bash
# Run integration tests
mvn test -Dtest=StreamsIntegrationTest
```

Test Coverage:
- End-to-end event processing
- Internal topic creation verification
- Topic naming convention compliance
- Multi-instance behavior
```bash
# All tests
mvn test

# With coverage report
mvn test jacoco:report
```

The application performs time-based aggregation:
- Input: Events with key, type, value, and timestamp
- Grouping: Group events by key (creates repartition topic)
- Windowing: 5-minute tumbling windows
- Aggregation: Count, sum, and average per window
- Output: Aggregated statistics with window boundaries
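The windowing and aggregation steps above can be sketched in plain Java. `AggregationState` and `windowStartMs` are illustrative stand-ins for the project's internal state class, not its actual code:

```java
// Plain-Java sketch of the aggregation logic: assign each event to a
// 5-minute tumbling window and fold count/sum/average per window.
// AggregationState and windowStartMs are illustrative names.
public class WindowedAggregationSketch {
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000;

    /** Tumbling windows are aligned to the epoch: start = ts - (ts % size). */
    static long windowStartMs(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    static class AggregationState {
        long count;
        double sum;

        AggregationState add(double value) {
            count++;
            sum += value;
            return this;
        }

        double avg() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    public static void main(String[] args) {
        // Two events for the same key, close in time: same window, one state.
        AggregationState state = new AggregationState();
        state.add(10).add(20); // e.g. user-1 click=10, user-1 view=20
        System.out.println(state.count + " events, sum=" + state.sum + ", avg=" + state.avg());
        System.out.println(windowStartMs(310_000)); // 300000, i.e. the [5min, 10min) window
    }
}
```

In the real topology this fold runs per key per window inside the windowed state store, and each update is also written to the changelog topic.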
- Store Name: `event-count-store`
- Store Type: Windowed key-value store
- Changelog Topic: `cus-s-pub-windowed-agg-event-count-store-changelog`
- Retention: Based on window size and grace period
- Compaction: Enabled for changelog topic
Custom JSON serialization using Jackson:
- Input: Event ↔ JSON
- Output: AggregatedEvent ↔ JSON
- State: Internal aggregation state ↔ JSON
- Timestamp: ISO 8601 format
Access at http://localhost:8080 to view:
- Topic messages and metadata
- Consumer group lag
- Broker metrics
- Schema registry (if using Avro)
Key metrics to monitor:
```
# Stream state
kafka.streams:type=stream-state-metrics,state-id=*

# Thread performance
kafka.streams:type=stream-thread-metrics,thread-id=*

# Task metrics
kafka.streams:type=stream-task-metrics,task-id=*
```

Application logs location:

- Console: Standard output
- File: `logs/kafka-streams-app.log` (configurable in log4j2.xml)
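The MBean patterns above can be queried with the standard `javax.management` API. This sketch only demonstrates the query shape; run inside (or attached to) the Streams JVM it lists the matching MBeans, while with no Streams app in-process the result set is simply empty:

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Sketch of querying the Kafka Streams JMX metrics listed above.
// With no Streams application registered in this JVM, the pattern
// matches nothing and the query returns an empty set.
public class StreamsMetricsProbe {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName pattern = new ObjectName("kafka.streams:type=stream-thread-metrics,thread-id=*");
        Set<ObjectName> names = server.queryNames(pattern, null);
        System.out.println("Matching stream-thread MBeans: " + names.size());
    }
}
```

For production, the same patterns are typically scraped by a JMX exporter rather than polled by hand.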
See Deployment Guide for:
- Production deployment steps
- Topic creation scripts
- Configuration examples
- Scaling strategies
- Troubleshooting guide
- All topics manually created with appropriate replication
- Auto topic creation disabled on brokers
- State directory on persistent storage
- Monitoring and alerting configured
- ACLs configured (if security enabled)
- Backup and recovery procedures in place
Application won't start: "Topic does not exist"
Ensure all topics are created via docker-compose or manually:
```bash
docker-compose up -d
docker-compose logs kafka-setup
```

Wrong topic names created
Verify application.id matches TopicConfig.APPLICATION_ID:
```java
// In TopicConfig.java
public static final String APPLICATION_ID = "windowed-agg";
```

No output produced
Check:
- Application is running and consuming
- Events are being produced to input topic
- Window time has advanced (wait 5+ minutes)
Contributions welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
- PRD - Product requirements
- Topic Naming Guide - Naming conventions
- Deployment Guide - Deployment instructions
- Java: 17
- Kafka: 4.1.0
- Kafka Streams: 4.1.0
- Jackson: 2.18.2
- JUnit: 5.11.4
- Testcontainers: 1.20.4
- Maven: 3.x
- Docker Compose: 3.8
This project is provided as an example implementation for educational purposes.
For issues and questions:
- Check the Troubleshooting section
- Review the documentation
- Check Kafka Streams logs for error details
- Open an issue with reproduction steps
Built with explicit topic naming for operational excellence