Kafka
The spotify/kafka docker image already includes Zookeeper so we don't need to link it with Kafka externally.
The port mappings are as follows:
- "2181:2181" for Zookeeper
- "9092:9092" for Kafka
- In /etc/hosts, add a line that maps the hostname kafka to localhost:
127.0.0.1 kafka
- Use the docker run command to start a Kafka container:
docker run -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=kafka --env ADVERTISED_PORT=9092 --hostname kafka --name kafka spotify/kafka
- Create a topic on the Kafka server:
docker exec kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
- Test Producer and Consumer interaction with the created topic
Setting ADVERTISED_HOST=kafka in the docker run command above (a hostname that /etc/hosts maps to localhost) means Kafka is only reachable by consumers and producers started inside its own container. For this test to work from other containers, use the IP address output by the command below in the --broker-list option (3.2) and the --bootstrap-server option (3.3).
3.1 In the same terminal:
docker inspect kafka | grep IPAddress
--------------------------------
Output:
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.2",
"IPAddress": "172.17.0.2",
3.2 In a new terminal, create a Kafka producer (use the IP address obtained above in the command):
docker run -it --rm --link kafka spotify/kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092 --topic testTopic
3.3 In another new terminal, create a Kafka consumer:
docker run -it --rm --link kafka spotify/kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092 --topic testTopic --from-beginning
3.4 In the Kafka producer terminal, enter a word; it should appear in the Kafka consumer terminal.
- List all topics
docker ps
docker exec -it kafka bash
root@kafka:/opt/kafka_2.11-0.10.1.0/bin# ./kafka-topics.sh --list --zookeeper localhost:2181
- Delete topic
docker ps
docker exec -it kafka bash
root@kafka:/opt/kafka_2.11-0.10.1.0/bin# ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic testTopic
The Java producer is implemented in the class EventProducer.java and performs two tasks:
- configure the Kafka properties
- send string data to a specific topic in the Kafka server, using producer records
public class EventProducer
{
    /**
     * The singleton instance of this class.
     * The application will only have one producer.
     */
    private static EventProducer single_instance = null;

    /**
     * A persistent set of configuration properties.
     */
    private Properties properties;

    /**
     * The Kafka producer.
     */
    private KafkaProducer<String, String> kafkaProducer;

    /**
     * Private constructor, so instances can only be obtained through getInstance().
     */
    private EventProducer()
    {
        this.properties = new Properties();
        this.properties.put("bootstrap.servers", "172.17.0.2:9092");
        this.properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.kafkaProducer = new KafkaProducer<>(this.properties);
    }

    /**
     * Send an event to the Kafka cluster.
     * Note: the three-argument ProducerRecord constructor takes (topic, key, value),
     * so the partition number passed here becomes the record key, and the actual
     * partition is chosen by Kafka's default partitioner from a hash of that key.
     * @param recordTopic
     * @param recordPartition
     * @param recordValue
     */
    public void storeEvent(String recordTopic, int recordPartition, String recordValue)
    {
        try {
            // recordTopic = "merchUpdateTopic"
            this.kafkaProducer.send(new ProducerRecord<>(recordTopic, Integer.toString(recordPartition), recordValue));
        } catch (Exception e) {
            e.printStackTrace();
            this.kafkaProducer.close();
        }
    }

    /**
     * Static method to create and return the single instance of this class.
     * @return EventProducer
     */
    public static EventProducer getInstance()
    {
        if (single_instance == null)
            single_instance = new EventProducer();
        return single_instance;
    }
}

The TimerSessionBean.java uses this class to send update messages to the Kafka server with the line:
producer.storeEvent("merchUpdateTopic", 1, "The search results for \"" + searchName + "\" were updated!");

The Java consumer is implemented in a separate Java project and follows the same structure as the producer:
- configure Kafka properties
- subscribe to the same topic as the Kafka producer
- receive the producer records published by the Kafka producer and extract the information they contain
public class Consumer
{
    /**
     * The Kafka consumer instance.
     */
    private KafkaConsumer<String, String> kafkaConsumer;

    /**
     * Configuration properties.
     */
    private Properties properties;

    /**
     * Constructor.
     * @param topic
     */
    public Consumer(String topic)
    {
        properties = new Properties();
        properties.put("bootstrap.servers", "172.17.0.2:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "test-group");
        this.kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList(topic));
    }

    /**
     * Obtain the events stored in the Kafka cluster.
     * @return ArrayList of the events retrieved
     */
    public ArrayList<String> consumeRecordsFromKafka()
    {
        ArrayList<String> recordsList = new ArrayList<>();
        // Poll with a 10 ms timeout; returns whatever records arrived in that window.
        ConsumerRecords<String, String> records = kafkaConsumer.poll(10);
        for (ConsumerRecord<String, String> record : records)
        {
            recordsList.add(record.value());
        }
        return recordsList;
    }
}
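A caller typically invokes consumeRecordsFromKafka() in a loop until a poll comes back empty. The sketch below illustrates that draining pattern; RecordSource and InMemorySource are hypothetical stand-ins invented for this example so it runs without a broker, with fetchBatch() playing the role of consumeRecordsFromKafka().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PollingLoopSketch {
    // Hypothetical abstraction over "fetch the next batch of records".
    interface RecordSource {
        List<String> fetchBatch();
    }

    // Stand-in source that serves two pre-canned batches, then empty batches,
    // mimicking a consumer whose topic briefly has data and then goes quiet.
    static class InMemorySource implements RecordSource {
        private final List<List<String>> batches = new ArrayList<>(Arrays.asList(
                Arrays.asList("update-1", "update-2"),
                Arrays.asList("update-3")));

        @Override
        public List<String> fetchBatch() {
            return batches.isEmpty() ? new ArrayList<>() : batches.remove(0);
        }
    }

    // Poll repeatedly, accumulating records, and stop on the first empty batch.
    static List<String> drain(RecordSource source) {
        List<String> all = new ArrayList<>();
        while (true) {
            List<String> batch = source.fetchBatch();
            if (batch.isEmpty()) {
                break;
            }
            all.addAll(batch);
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(drain(new InMemorySource()));  // [update-1, update-2, update-3]
    }
}
```

In the real application the loop would usually run on a timer or a dedicated thread rather than stopping at the first empty poll, since new records can arrive at any time.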
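One detail of the producer worth spelling out: storeEvent passes Integer.toString(recordPartition) as the record key, and with the (topic, key, value) form of ProducerRecord the partition is not set directly; Kafka's default partitioner derives it from a hash of the key (murmur2) modulo the partition count. The sketch below illustrates that hash-modulo idea only; it deliberately uses String.hashCode as a simplified stand-in, so it does not reproduce Kafka's exact mapping.

```java
// Simplified illustration of key-based partition assignment.
// NOTE: Kafka's default partitioner hashes the *serialized* key with murmur2;
// String.hashCode is used here only to keep the sketch self-contained.
class PartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the hash is non-negative, then take the modulo.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With a single partition, as in the testTopic created above,
        // every key maps to partition 0.
        System.out.println(partitionFor("1", 1));   // 0
        System.out.println(partitionFor("42", 1));  // 0
        // With more partitions, different keys can land on different partitions.
        System.out.println(partitionFor("1", 4));
    }
}
```

To pin a record to an exact partition instead, kafka-clients also offers the four-argument constructor ProducerRecord(topic, partition, key, value).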