Apache Kafka for Event-Driven Spring Boot Microservices
Run the following commands:
- bash kafka.sh format
- bash kafka.sh start1
- bash kafka.sh start2
- bash kafka.sh start3
Start Spring Boot
- cd ProductsMicroservice && bash run
Describe the specific topic 'product-created-events-topic'
- bash bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic product-created-events-topic
- https://kafka.apache.org/community/downloads/ - version: 4.2.0
- Udemy: https://www.udemy.com/course/apache-kafka-for-spring-boot-microservices
bash bin/kafka-storage.sh format \
-t jDe9QDCvSxChxBxHN7tRmg \
-c config/server.properties \
--standalone
bash bin/kafka-server-start.sh config/server.properties
CLUSTER_ID=GZr1B__JQ7Ozviiig_UBeg
DIR_ID_1=$(bash bin/kafka-storage.sh random-uuid)
DIR_ID_2=$(bash bin/kafka-storage.sh random-uuid)
DIR_ID_3=$(bash bin/kafka-storage.sh random-uuid)
INITIAL_CONTROLLERS="1@localhost:9093:${DIR_ID_1},2@localhost:9095:${DIR_ID_2},3@localhost:9097:${DIR_ID_3}"
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bash bin/kafka-storage.sh format \
-t "rO_gM3IJRVy6q4u-ESGU3Q" \
-c config/server-1.properties
bash bin/kafka-storage.sh format \
-t "rO_gM3IJRVy6q4u-ESGU3Q" \
-c config/server-2.properties
bash bin/kafka-storage.sh format \
-t "rO_gM3IJRVy6q4u-ESGU3Q" \
-c config/server-3.properties
bash bin/kafka-server-start.sh config/server-1.properties
bash bin/kafka-server-start.sh config/server-2.properties
bash bin/kafka-server-start.sh config/server-3.properties
CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
bin/kafka-storage.sh format -t $CLUSTER_ID -c config/server-1.properties
bin/kafka-storage.sh format -t $CLUSTER_ID -c config/server-2.properties
bin/kafka-storage.sh format -t $CLUSTER_ID -c config/server-3.properties
bash bin/kafka-server-stop.sh
bash bin/kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092,localhost:9094
bash bin/kafka-topics.sh --create --topic topic2 --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092,localhost:9094
bash bin/kafka-topics.sh --create --topic insync-topic --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092 --config min.insync.replicas=2
rm -rf /tmp/server-{1,2,3}/kraft-combined-logs
bash bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bash bin/kafka-topics.sh --describe --bootstrap-server localhost:9092
In the describe output, the values in "Replicas: 2,3,1" are the IDs of the brokers that hold a replica of that partition.
bash bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic {topicName}
bash bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic product-created-events-topic
bash bin/kafka-topics.sh --delete --topic topic1 --bootstrap-server localhost:9092
bash bin/kafka-console-producer.sh --bootstrap-server localhost:9092,localhost:9094 --topic new-topic
bash bin/kafka-console-producer.sh --bootstrap-server localhost:9092,localhost:9094 --topic new-topic --property "parse.key=true" --property "key.separator=:"
bash bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic topic1 \
--reader-property "parse.key=true" \
--reader-property "key.separator=:"
bin/kafka-console-consumer.sh \
--topic topic1 \
--from-beginning \
--bootstrap-server localhost:9092
bin/kafka-console-consumer.sh \
--topic topic1 \
--bootstrap-server localhost:9092
bash bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092,localhost:9094 \
--topic topic1 \
--formatter-property print.key=true \
--formatter-property key.separator=" : "
bash bin/kafka-console-consumer.sh \
--topic product-created-events-topic \
--bootstrap-server localhost:9092,localhost:9094 \
--formatter-property print.key=true \
--formatter-property key.separator=" : "
bash bin/kafka-console-consumer.sh \
--topic topic2 \
--bootstrap-server localhost:9092,localhost:9094 \
--formatter-property print.key=true \
--formatter-property key.separator=" : "
- Publish/Produce Messages
- Serialize to binary format
- Specify topic name
- Specify topic partition
- No response - if the producer is configured with acks=0
- Acknowledgement (ACK) of Successful Storage
- Non-Retryable Error - A permanent problem that is unlikely to be resolved by retrying the send operation
- Synchronous
- Asynchronous
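The difference between the synchronous and asynchronous send styles listed above can be sketched without a broker. This is a toy model that uses only the JDK: the `send` method, the `ack:` strings, and `NonRetryableException` are hypothetical stand-ins, not Kafka or Spring classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Toy model of producer send styles; nothing here talks to a real broker.
class SendStylesSketch {

    // Hypothetical stand-in for a permanent failure that retrying would not fix.
    static class NonRetryableException extends RuntimeException {
        NonRetryableException(String message) { super(message); }
    }

    // Pretends to send a record; the future completes with an "ack" or fails.
    static CompletableFuture<String> send(String topic, String value) {
        if (value.isEmpty()) {
            return CompletableFuture.failedFuture(new NonRetryableException("empty payload"));
        }
        return CompletableFuture.supplyAsync(() -> "ack:" + topic);
    }

    // Synchronous style: block the caller until the ack or the error arrives.
    static String sendSync(String topic, String value) {
        try {
            return send(topic, value).join();
        } catch (CompletionException e) {
            return "failed:" + e.getCause().getMessage();
        }
    }

    // Asynchronous style: attach a callback and return to the caller immediately.
    static CompletableFuture<String> sendAsync(String topic, String value) {
        return send(topic, value).handle((ack, error) -> {
            if (error == null) {
                return ack;
            }
            Throwable cause = error instanceof CompletionException ? error.getCause() : error;
            return "failed:" + cause.getMessage();
        });
    }
}
```

In the synchronous variant the calling thread waits for the result, so errors surface at the call site. In the asynchronous variant the result is only available inside the callback or from the returned future.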
This project initializes local Kafka transactions in TransferService by combining:
- a producer transaction id prefix
- a transaction-capable DefaultKafkaProducerFactory
- KafkaTemplate
- KafkaTransactionManager
- KafkaTemplate.executeInTransaction(...)
Configuration used by this project:
spring.kafka.producer.transaction-id-prefix=transfer-service-${random.value}-

Producer and transaction manager setup:
@Bean
ProducerFactory<String, Object> producerFactory() {
    DefaultKafkaProducerFactory<String, Object> producerFactory =
            new DefaultKafkaProducerFactory<>(producerConfigs());
    producerFactory.setTransactionIdPrefix(transactionIdPrefix);
    return producerFactory;
}

@Bean
KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

@Bean
KafkaTransactionManager<String, Object> kafkaTransactionManager(
        ProducerFactory<String, Object> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}

Service usage:
public boolean transfer(TransferRestModel transferRestModel) {
    kafkaTemplate.executeInTransaction(operations -> {
        operations.send("withdraw-money-topic", withdrawalEvent);
        callRemoteService();
        operations.send("deposit-money-topic", depositEvent);
        return true;
    });
    return true;
}

Important behavior:
- Kafka creates a transactional producer when the transaction starts
- records sent with KafkaTemplate stay invisible to READ_COMMITTED consumers until commit
- if the method throws an exception, the transaction is rolled back and consumers do not receive those records
In this project, that is why a failed POST /transfers no longer produces a visible withdrawal event in WithdrawalService.
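The commit-or-roll-back behavior described above can be illustrated with a small in-memory model. This is only a sketch: `TransactionVisibilitySketch` and its methods are hypothetical, and the class does not reflect how Kafka stores or marks transactional records internally. It only mirrors what a READ_COMMITTED consumer is allowed to observe.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of transactional visibility; not a Kafka or Spring class.
class TransactionVisibilitySketch {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    // Records sent inside a transaction are held back until commit.
    void send(String record) {
        pending.add(record);
    }

    // What a READ_COMMITTED consumer could see in this model.
    List<String> readCommitted() {
        return List.copyOf(committed);
    }

    // Commits the pending records on success, discards them on any exception.
    boolean executeInTransaction(Runnable work) {
        try {
            work.run();
            committed.addAll(pending);
            return true;
        } catch (RuntimeException e) {
            return false;
        } finally {
            pending.clear();
        }
    }
}
```

If the work throws after the first send, which is what a failed remote call would do in the transfer flow, `readCommitted()` stays empty. That matches the observation that no withdrawal event becomes visible.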
- spring.kafka.producer.acks=all
- replication-factor 5
- config min.insync.replicas=2
- spring.kafka.producer.acks=1
- spring.kafka.producer.acks=0
- spring.kafka.producer.retries=10
- spring.kafka.producer.properties.retry.backoff.ms=1000
- spring.kafka.producer.properties.delivery.timeout.ms=120000
delivery.timeout.ms >= linger.ms + request.timeout.ms
spring.kafka.producer.properties.linger.ms=0
spring.kafka.producer.properties.request.timeout.ms=30000
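The timeout constraint above is easy to check in code before a producer is created. The following is a minimal sketch using only `java.util.Properties`. The class name is hypothetical, and the fallback values are the ones used in these notes, which are not necessarily the defaults of every Kafka version.

```java
import java.util.Properties;

// Checks the rule delivery.timeout.ms >= linger.ms + request.timeout.ms.
class DeliveryTimeoutCheck {

    static boolean isValid(Properties props) {
        // Fallbacks mirror the values in these notes (assumption, not Kafka defaults).
        long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms", "120000"));
        long linger = Long.parseLong(props.getProperty("linger.ms", "0"));
        long request = Long.parseLong(props.getProperty("request.timeout.ms", "30000"));
        return delivery >= linger + request;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("delivery.timeout.ms", "120000");
        props.setProperty("linger.ms", "0");
        props.setProperty("request.timeout.ms", "30000");
        System.out.println("valid: " + isValid(props));
    }
}
```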
bash bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--alter \
--entity-type topics \
--entity-name topic2 \
--add-config min.insync.replicas=2
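How min.insync.replicas interacts with the producer acks setting can be summarized as a small decision function. This is a simplified sketch with hypothetical names. It assumes the rule that the broker enforces min.insync.replicas only for producers using acks=all, and it ignores details such as leader election.

```java
// Simplified model of when a write is accepted; not broker code.
class MinInSyncSketch {

    static boolean writeAccepted(String acks, int inSyncReplicas, int minInSyncReplicas) {
        if (inSyncReplicas < 1) {
            return false; // no in-sync replica means no leader to write to
        }
        boolean acksAll = "all".equals(acks) || "-1".equals(acks);
        if (!acksAll) {
            return true; // acks=0 or acks=1: min.insync.replicas is not enforced
        }
        return inSyncReplicas >= minInSyncReplicas;
    }

    public static void main(String[] args) {
        // Replication factor 3, one broker left, min.insync.replicas=2.
        System.out.println(writeAccepted("all", 1, 2));
        System.out.println(writeAccepted("1", 1, 2));
    }
}
```

With replication factor 3 and min.insync.replicas=2, the model accepts acks=all writes while at least two replicas are in sync and rejects them when only one remains.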
bash bin/kafka-topics.sh --delete --topic __consumer_offsets --bootstrap-server localhost:9092
- To try out how Kafka producer retries work, run: bash kafka.sh start1
- A Kafka consumer reads records from a topic by polling Kafka brokers.
- Kafka tracks consumer progress with offsets, stored per consumer group.
- If multiple consumers use the same group id, Kafka distributes partitions across them.
- If different consumer groups read the same topic, each group receives its own copy of the stream.
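To make the partition distribution within one consumer group concrete, here is a small simulation. It uses a plain round-robin rule as an assumption. Kafka's real assignors are configurable and more involved, and the class and consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy round-robin assignment of partitions to the consumers of one group.
class GroupAssignmentSketch {

    static Map<String, List<Integer>> assign(int partitions, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int partition = 0; partition < partitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, List.of("c1", "c2")));
        System.out.println(assign(3, List.of("c1", "c2", "c3", "c4")));
    }
}
```

Each partition has exactly one owner inside the group, so a fourth consumer on a three-partition topic receives nothing. A second group would get its own independent assignment over the same partitions.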
When POST /transfers is handled by TransferService, both Kafka sends run inside a Kafka transaction.
Current flow in TransferService:
- Send WithdrawalRequestedEvent to withdraw-money-topic
- Call remote service
- Send DepositRequestedEvent to deposit-money-topic
If the request fails before the method completes successfully, the Kafka transaction is rolled back.
That means:
- WithdrawalService will not receive the withdrawal event anymore
- DepositService will not receive the deposit event
- Kafka may show producer transaction logs, but consumers using READ_COMMITTED will not see rolled-back records
This is expected with the current configuration because:
- TransferService uses a transactional Kafka producer
- WithdrawalService uses spring.kafka.consumer.isolation-level=READ_COMMITTED
So if POST /transfers fails, Received a new withdrawal event should not appear in WithdrawalService logs.
Only when the transfer flow completes and the Kafka transaction commits will WithdrawalService receive the event.
Current project behavior:
- KafkaConfig#createTopic() creates product-created-events-topic
- ProductServiceImpl#createProduct() currently publishes to topic2
Consume from the topic currently used by the Spring Boot service:
bash bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092,localhost:9094 \
--topic topic2 \
--from-beginning

Consume and show message keys:
bash bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092,localhost:9094 \
--topic topic2 \
--from-beginning \
--property print.key=true \
--property key.separator=" : "

Consume from product-created-events-topic if you switch the producer back to that topic:
bash bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092,localhost:9094 \
--topic product-created-events-topic \
--from-beginning \
--property print.key=true \
--property key.separator=" : "
# Read from the dead letter topic
bash bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092,localhost:9094 \
--topic product-created-events-topic.DLT \
--from-beginning \
--formatter-property print.key=true \
--formatter-property print.value=true
bash bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092,localhost:9094 \
--topic product-created-events-topic-dlt \
--from-beginning \
--formatter-property print.key=true \
--formatter-property print.value=true

# Console producer
bash bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092,localhost:9094 \
--topic product-created-events-topic \
--reader-property "parse.key=true" \
--reader-property "key.separator=:"

Read with a named consumer group:
bash bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092,localhost:9094 \
--topic topic2 \
--group product-created-events-consumer-group \
--from-beginning

Describe the consumer group and inspect committed offsets:
bash bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092,localhost:9094 \
--describe \
--group product-created-events-consumer-group

- The --from-beginning flag replays existing records only when the consumer group has no committed offset yet.
- Once offsets are committed for a group, the next run resumes from the last committed position.
- Offsets are stored in Kafka, typically in the __consumer_offsets internal topic.
- With 3 partitions, Kafka can actively assign work to up to 3 consumers in the same group for that topic.
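The rule for where a consumer group starts reading can be written as a short function. This is a sketch with hypothetical names. It assumes that a committed offset always wins and that, without one, the start position is the earliest offset when reading from the beginning was requested and the latest offset otherwise.

```java
import java.util.OptionalLong;

// Toy resolution of the start position for one partition of one consumer group.
class StartOffsetSketch {

    static long startOffset(OptionalLong committed, boolean fromBeginning,
                            long earliest, long latest) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // resume from the last committed position
        }
        return fromBeginning ? earliest : latest;
    }

    public static void main(String[] args) {
        // New group with the from-beginning flag: replay everything.
        System.out.println(startOffset(OptionalLong.empty(), true, 0, 42));
        // Existing group with a committed offset: the flag has no effect.
        System.out.println(startOffset(OptionalLong.of(17), true, 0, 42));
    }
}
```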
- Access h2-console-db: http://localhost:{Port_Number}/h2-console
- ./kafka-console-consumer.sh --topic orders-events --bootstrap-server localhost:9092
- ./kafka-console-consumer.sh --topic products-commands --bootstrap-server localhost:9092
- ./kafka-console-consumer.sh --topic products-events --bootstrap-server localhost:9092
- Use Apache Kafka as the event broker
- No central controller (no orchestrator)
- Each service:
  - 📥 Subscribes to events
  - ⚙️ Processes them
  - 📤 Publishes the next event
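The subscribe, process, publish cycle of a choreography can be sketched with an in-memory bus in place of Kafka. Everything below is illustrative: the bus class and the event names OrderCreated, ProductReserved, and OrderApproved are assumptions. Only the topic names orders-events and products-events are taken from the consumer commands in these notes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory event bus; each "service" is just a subscribed handler.
class ChoreographySketch {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();
    final List<String> log = new ArrayList<>();

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    void publish(String topic, String event) {
        log.add(topic + ":" + event);
        for (Consumer<String> handler : subscribers.getOrDefault(topic, List.of())) {
            handler.accept(event);
        }
    }

    // Wires two hypothetical services that react to each other's events.
    static ChoreographySketch wire() {
        ChoreographySketch bus = new ChoreographySketch();
        // Product service: reacts to a new order and publishes the next event.
        bus.subscribe("orders-events", event -> {
            if (event.equals("OrderCreated")) {
                bus.publish("products-events", "ProductReserved");
            }
        });
        // Order service: reacts to the reservation and approves the order.
        bus.subscribe("products-events", event -> {
            if (event.equals("ProductReserved")) {
                bus.publish("orders-events", "OrderApproved");
            }
        });
        return bus;
    }
}
```

Publishing OrderCreated to orders-events drives the whole chain. No component knows the full flow, because each handler only knows which event it consumes and which one it emits.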