Introduction
This is Part-2 of a 2-Part articles series on Running Apache Kafka Server, Configuring Kafka Topic, and Creating a Core Java Based Kafka Consumer, as also a Core Java Based Kafka Producer. All this is demonstrated step-by-step example. All of this is for Java v8.0, Apache Kafka v3.4 on Windows 10. Part-1 focussed on Kafka Consumer and Kafka Producer from the Command Line. This article focuses on the Core Java counterparts. It is important that the reader reads the Part-1 of the article, completes the example so that the reader has a basic foundation in Kafka. This article also provides the Maven Dependencies required to create, build and run Apache Kafka Consumers and Producers. Finally, It shows the code sample to run the Consumer and Producer Thread. The code sample is straightforward for an Intermediate+ Java Developer, hence the explanations is omitted for the code.
Pre-Requisites
1. Install Java//JRE (v8.0 is used in this Example)4. UnZIP/UnTAR Apache Kafka Downloaded in (2)
Maven Project (Eclipse) and Dependencies
<dependencies>
<!-- This is the Core Library Containing the Classes We will Use -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
<!-- The Kafka Client Libraries use the slf4j Logger, So we Need to Add
This as a Dependency so that the Required Classes are Present in Our
Classpath for the Kafka Client Libraries to Use -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
</dependencies>
Developing the Java Producer
/**
* Author @sumith.puri (Addl. Ref: https://www.sohamkamani.com/java/kafka/)
*/
package com.kafka.poc.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaPoCProducer implements Runnable {
private static final String TOPIC = "test";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
@Override
public void run() {
produce();
}
private void produce() {
// Create Configuration Options for our Producer and Initialize a New Producer
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
// We Configure the Serializer to Describe the Format in which we Want To
// Produce Data into our Kafka Cluster
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Since we Need to Close our Producer, We can use the try-with-resources
// Statement to create a New Producer
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
// Here, We Run an Infinite Loop to Send a Message to the Cluster Every Second
for (int i = 0;; i++) {
String key = Integer.toString(i);
String message = "Watson, Please Come Over Here " + Integer.toString(i);
producer.send(new ProducerRecord<String, String>(TOPIC, key, message));
// Log a Confirmation Once The Message is Written
System.out.println("Sent Message " + key);
try {
// Sleep for a Second
Thread.sleep(1000);
} catch (Exception e) {
break;
}
}
} catch (Exception e) {
System.out.println("Could not Start Producer Due To: " + e);
}
}
}
Developing the Java Consumer
/**
* Author @sumith.puri (Addl. Ref: https://www.sohamkamani.com/java/kafka/)
*/
package com.kafka.poc.consumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaPoCConsumer implements Runnable {
private static final String TOPIC = "test";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
@Override
public void run() {
consume();
}
private void consume() {
// Create Configuration Options for our Consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
// The Group ID is a Unique Identified for Each Consumer Group
props.setProperty("group.id", "my-group-id");
// Since our Producer uses a String Serializer, We need to use the Corresponding
// Deserializer
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Every Time We Consume a Message from kafka, We Need to "commit", That Is,
// Acknowledge Receipts of the Messages.... We Can Set up an Auto-Commit at
// Regular intervals, so that this is Taken Care of in the Background
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
// Since We Need to Close our Consumer, We can Use the try-with-resources
// Statement to Create It
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
// Subscribe this Consumer to the Same Topic that we Wrote Messages to Earlier
consumer.subscribe(Arrays.asList(TOPIC));
// Run an Infinite Loop where we Consume and Print New Messages to the Topic
while (true) {
// The consumer.poll Method Checks and Waits..For Any New Messages To Arrive For
// The Subscribed Topic in case there are No Messages for the Duration Specified
// In the Argument (1000 ms In this Case), It returns an Empty List
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received Message: %s\n", record.value());
}
}
}
}
}
/**
* Author @sumith.puri (Addl. Ref: https://www.sohamkamani.com/java/kafka/)
*/
package com.kafka.poc.app;
import com.kafka.poc.consumer.KafkaPoCConsumer;
import com.kafka.poc.producer.KafkaPoCProducer;
public class KafkaPoCApp {
public static void main(String[] args) {
Thread cThread = new Thread(new KafkaPoCConsumer());
cThread.start();
Thread pThread = new Thread(new KafkaPoCProducer());
pThread.start();
}
}