Wednesday, July 5, 2023

Apache Kafka - Java Producer & Consumer Example (Kafka v3.4 on Windows 10)

Introduction

This is Part 2 of a two-part article series on Running the Apache Kafka Server, Configuring a Kafka Topic, and Creating a Core Java Based Kafka Consumer as well as a Core Java Based Kafka Producer, all demonstrated through a step-by-step example. Everything here uses Java 8 and Apache Kafka v3.4 on Windows 10. Part 1 focused on the Kafka Consumer and Kafka Producer from the Command Line; this article focuses on their Core Java counterparts. It is important that you read Part 1 and complete its example first, so that you have a basic foundation in Kafka. This article also provides the Maven Dependencies required to create, build and run Apache Kafka Consumers and Producers. Finally, it shows the code to run the Consumer and Producer Threads. The code is straightforward for an Intermediate+ Java Developer, hence line-by-line explanations are omitted.


Pre-Requisites

1. Install Java/JRE (v8.0 is used in this Example)
2. Install Apache Kafka 3.4.0 from the Given Link
3. Set Java Classpath and JAVA_HOME Correctly
4. UnZIP/UnTAR Apache Kafka Downloaded in (2)
5. Use a Text Editor like [Notepad++] for Editing
6. Eclipse IDE (Or Others) to Create, Run & Test


Before You Begin, Read my Article #1 in this Series at : https://rebrand.ly/skp-ts-kafka-v3-win

Maven Project (Eclipse) and Dependencies

Create a Simple JAR archetype Maven Project in Eclipse (or the IDE of your choice). Add the following dependencies to your pom.xml. Make sure that your compiler version is set to Java 8.
 <dependencies>  
         <!-- This is the Core Library Containing the Classes We will Use -->  
         <dependency>  
             <groupId>org.apache.kafka</groupId>  
             <artifactId>kafka-clients</artifactId>  
             <version>3.4.0</version>  
         </dependency>  
         <!-- The Kafka Client Libraries use the slf4j Logger, So we Need to Add   
             This as a Dependency so that the Required Classes are Present in Our   
             Classpath for the Kafka Client Libraries to Use -->  
         <dependency>  
             <groupId>org.slf4j</groupId>  
             <artifactId>slf4j-api</artifactId>  
             <version>1.7.36</version>  
         </dependency>  
 </dependencies>  


Developing the Java Producer

1. Make Sure you know the Topic Name
2. Find out Your Bootstrap - Server Port
3. Refer Javadoc for KafkaProducer Obj. 
4. Also, for Producer & ProducerRecord


Code for Java Producer (Tested on Kafka v3.4 on Windows 10)
 /**   
  *    Author @sumith.puri (Addl. Ref: https://www.sohamkamani.com/java/kafka/)  
  */   
 package com.kafka.poc.producer;  
   
 import java.util.Properties;  
   
 import org.apache.kafka.clients.producer.KafkaProducer;  
 import org.apache.kafka.clients.producer.Producer;  
 import org.apache.kafka.clients.producer.ProducerRecord;  
   
 public class KafkaPoCProducer implements Runnable {  
   
     private static final String TOPIC = "test";  
     private static final String BOOTSTRAP_SERVERS = "localhost:9092";  
   
     @Override  
     public void run() {  
   
         produce();  
     }  
   
     private void produce() {  
   
         // Create Configuration Options for our Producer and Initialize a New Producer  
         Properties props = new Properties();  
         props.put("bootstrap.servers", BOOTSTRAP_SERVERS);  
   
         // We Configure the Serializer to Describe the Format in which we Want To  
         // Produce Data into our Kafka Cluster  
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
   
         // Since we Need to Close our Producer, We can use the try-with-resources  
         // Statement to create a New Producer  
         try (Producer<String, String> producer = new KafkaProducer<>(props)) {  
   
             // Here, We Run an Infinite Loop to Send a Message to the Cluster Every Second  
             for (int i = 0;; i++) {  
                 String key = Integer.toString(i);  
                 String message = "Watson, Please Come Over Here " + Integer.toString(i);  
   
                 producer.send(new ProducerRecord<String, String>(TOPIC, key, message));  
   
                 // Log a Confirmation Once The Message is Written  
                 System.out.println("Sent Message " + key);  
                 try {  
                     // Sleep for a Second  
                     Thread.sleep(1000);  
                 } catch (Exception e) {  
                     break;  
                 }  
             }  
         } catch (Exception e) {  
             System.out.println("Could not Start Producer Due To: " + e);  
         }  
     }  
 }  
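Note that the producer above is fire-and-forget: it prints "Sent Message" before the broker has acknowledged anything. The send() method also accepts a Callback that is invoked once the record is acknowledged (or fails). Below is a minimal sketch of that variant; the class name is mine, while the topic, bootstrap address and message format are the same as above.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaPoCCallbackProducer {

    private static final String TOPIC = "test";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    // Builds the Same Message Payload as the Producer Above
    public static String messageFor(int i) {
        return "Watson, Please Come Over Here " + i;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // The Second Argument to send() is a Callback: On Success, 'metadata'
            // Holds the Partition and Offset; On Failure, 'exception' is Non-Null
            producer.send(new ProducerRecord<>(TOPIC, "0", messageFor(0)), (metadata, exception) -> {
                if (exception != null) {
                    System.out.println("Send Failed Due To: " + exception);
                } else {
                    System.out.println("Acknowledged at Partition " + metadata.partition()
                            + ", Offset " + metadata.offset());
                }
            });
        }
    }
}
```

The try-with-resources close() flushes any buffered records, so the callback fires before the program exits.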
   

Developing the Java Consumer

1. Make Sure you know the Topic Name
2. Find out Your Bootstrap - Server Port
3. Refer Javadoc > KafkaConsumer Obj.
4. Also, for Consumer, ConsumerRecord
5. Refer AutoCommit/Acknowledgement


Code for Java Consumer (Tested on Kafka v3.4 on Windows 10)
 /**   
  *    Author @sumith.puri (Addl. Ref: https://www.sohamkamani.com/java/kafka/)  
  */   
 package com.kafka.poc.consumer;  
   
 import java.time.Duration;  
 import java.util.Arrays;  
 import java.util.Properties;  
   
 import org.apache.kafka.clients.consumer.ConsumerRecord;  
 import org.apache.kafka.clients.consumer.ConsumerRecords;  
 import org.apache.kafka.clients.consumer.KafkaConsumer;  
   
 public class KafkaPoCConsumer implements Runnable {  
   
     private static final String TOPIC = "test";  
     private static final String BOOTSTRAP_SERVERS = "localhost:9092";  
   
     @Override  
     public void run() {  
   
         consume();  
     }  
   
     private void consume() {  
   
         // Create Configuration Options for our Consumer  
         Properties props = new Properties();  
   
         props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);  
         // The Group ID is a Unique Identifier for Each Consumer Group  
         props.setProperty("group.id", "my-group-id");  
   
         // Since our Producer uses a String Serializer, We need to use the Corresponding  
         // Deserializer  
         props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
         props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
   
         // Every Time We Consume a Message from Kafka, We Need to "Commit", That Is,  
         // Acknowledge Receipt of the Message. We Can Set Up an Auto-Commit at  
         // Regular Intervals, So That This is Taken Care of in the Background  
         props.setProperty("enable.auto.commit", "true");  
         props.setProperty("auto.commit.interval.ms", "1000");  
   
         // Since We Need to Close our Consumer, We can Use the try-with-resources  
         // Statement to Create It  
         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {  
   
             // Subscribe this Consumer to the Same Topic that we Wrote Messages to Earlier  
             consumer.subscribe(Arrays.asList(TOPIC));  
   
             // Run an Infinite Loop where we Consume and Print New Messages to the Topic  
             while (true) {  
                   
                 // The consumer.poll Method Checks and Waits for Any New Messages to Arrive  
                 // for the Subscribed Topic; If There are No Messages for the Duration  
                 // Specified in the Argument (1000 ms in this Case), It Returns Empty Records  
                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));  
                 for (ConsumerRecord<String, String> record : records) {  
                     System.out.printf("Received Message: %s\n", record.value());  
                 }  
             }  
         }  
     }  
   
 }  
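Point (5) in the checklist above mentions acknowledgement. Instead of relying on auto-commit, a consumer can commit offsets explicitly, so a batch only counts as processed once the code says so. Below is a sketch of the same consumer with manual commits; the class name and group id are my own example names.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaPoCManualCommitConsumer {

    private static final String TOPIC = "test";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    // Same Configuration as the Consumer Above, But With Auto-Commit Disabled
    public static Properties manualCommitProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("group.id", "my-manual-group-id");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(manualCommitProps())) {
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received Message: %s%n", record.value());
                }
                // Synchronously Commit the Offsets of the Batch Just Processed; If
                // the Process Crashes Before This Line, the Batch is Redelivered
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }
}
```

Compared to auto-commit, this trades a little throughput for at-least-once processing with a clearly defined commit point.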
   


Create the Java Application to Demo Kafka Producer & Consumer
 /**   
  *    Author @sumith.puri (Addl. Ref: https://www.sohamkamani.com/java/kafka/)  
  */   
 package com.kafka.poc.app;  
   
 import com.kafka.poc.consumer.KafkaPoCConsumer;  
 import com.kafka.poc.producer.KafkaPoCProducer;  
   
 public class KafkaPoCApp {  
   
     public static void main(String[] args) {  
   
         Thread cThread = new Thread(new KafkaPoCConsumer());  
         cThread.start();  
   
         Thread pThread = new Thread(new KafkaPoCProducer());  
         pThread.start();  
   
     }  
 }  
   

Run the Above Application in your IDE or Command-Line.


Typical Output from Running the Kafka Producer Consumer PoC



Tuesday, July 4, 2023

Starting Apache Kafka on Windows 10 (Kafka v3.4)

Introduction

This is Part 1 of a two-part article series on Running the Apache Kafka Server, Configuring a Kafka Topic, and Creating a Kafka Consumer as well as a Kafka Producer, all demonstrated through a step-by-step example that works from the Command Line. Everything here uses Apache Kafka v3.4 on Windows 10.


Pre-Requisites
1. Install Java/JRE (v8.0 is used in this Example)
2. Install Apache Kafka 3.4.0 from the Given Link
3. Set Java Classpath and JAVA_HOME Correctly
4. UnZIP/UnTAR Apache Kafka Downloaded in (2)
5. Use a Text Editor like [Notepad++] for Editing



Version 3.4.0
Apache Kafka Version 3.4.0 was Released on Feb 7, 2023. This article is specifically for the Kafka build 2.13-3.4.0 (Scala 2.13, Kafka 3.4.0).

For the purposes of this article, I use {KAFKA_HOME} to refer to the Windows folder where Kafka was extracted.


Step-By-Step Guide

0. Configure Zookeeper (Data Directory)
Create a folder to hold the Zookeeper data, then point Zookeeper at it by modifying the file zookeeper.properties (located under {KAFKA_HOME}/config/). Create a folder named zk-data (or any name you prefer); in my case, I created it under {KAFKA_HOME}. Then modify dataDir in the properties file to point to the newly created folder, as shown in the image below.
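For reference, the edited lines in zookeeper.properties might look like the following; the zk-data path below is an example, so use the folder you actually created:

```properties
# Folder Created Above to Hold Zookeeper Snapshot Data (example path)
dataDir=D:/kafka_2.13-3.4.0/zk-data
# Port on Which Clients (the Kafka Broker) Connect; 2181 is the Default
clientPort=2181
```

Kafka accepts forward slashes in Windows paths here; backslashes would need to be escaped (\\).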



0. Configure Kafka (Kafka Logs)
For the Kafka logs, create a folder named kafka-logs. In my case, I created it under {KAFKA_HOME}. The property to be modified is log.dirs in server.properties, which should now point to the newly created folder, as shown in the image below.
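Similarly, the edited line in server.properties might read as follows (example path; use the folder you created):

```properties
# Folder Created Above for the Broker's Log Segments (example path)
log.dirs=D:/kafka_2.13-3.4.0/kafka-logs
```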



1. Starting Zookeeper
First, Zookeeper has to be started using the following command.
D:\kafka_2.13-3.4.0\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties



2. Starting Kafka Server
Next, we will start the Kafka Server using the following command.

D:\kafka_2.13-3.4.0\bin\windows>kafka-server-start.bat ..\..\config\server.properties



3. Creating a Test Topic
Create a Kafka Topic to test out the Kafka Installation using the following command.

D:\kafka_2.13-3.4.0\bin\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test



The above is the updated way to create topics in Kafka. In earlier versions (Kafka v2), the suggested way was to create topics directly via Zookeeper; from v3, topics are created via the brokers instead.

(Cited from StackOverflow)

For version 2.*, you have to create the topic using Zookeeper, with its default port 2181 as a parameter.

For version 3.*, Zookeeper is no longer a parameter; you should use --bootstrap-server with localhost or the IP address of the server and the default port 9092.

Documentation
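Since Part 2 of this series uses the Java client, it is worth noting that topics can also be created programmatically through the AdminClient API in kafka-clients. Below is a minimal sketch; the class name and the assumption that the broker from this article runs on localhost:9092 are mine.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class KafkaPoCTopicCreator {

    // Same Settings as the Command Line: Topic "test", 1 Partition, Replication Factor 1
    public static NewTopic newTestTopic() {
        return new NewTopic("test", 1, (short) 1);
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        // AdminClient Talks to the Brokers Directly; No Zookeeper Parameter Needed
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singletonList(newTestTopic())).all().get();
            System.out.println("Topic Created: " + newTestTopic().name());
        }
    }
}
```

This mirrors the v3 change described above: topic creation goes through the brokers (--bootstrap-server), not Zookeeper.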

 

4. Create Kafka Producer
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

(From Kafka v3, --bootstrap-server replaces the older --broker-list flag, which is deprecated but still accepted.)



5. Create Kafka Consumer

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning


------

Next in this series will be a demonstration of a Core Java Kafka Producer and Consumer, followed by an article on Spring Boot based Kafka Integration.