In this post, we will take a closer look at Apache Kafka Messaging. We will show how you can easily start a Kafka cluster and how messages can be sent and received by means of the CLI and from a Java application. At the end, we will explore how partitions work from a practical point of view.

1. Introduction

It is advised to read some documentation first if you are not yet acquainted with Apache Kafka Messaging. There is quite a lot of documentation available, and much of it overlaps. A good starting point is the official Apache Kafka documentation which can be found here. Another good reference is from Kevin Sookocheff; it overlaps with the official documentation, but it has a nice section about partitions, which we will cover at the end of this post.

As mentioned before, this post will not be a theoretical exercise; instead, we are going to get acquainted with Apache Kafka Messaging from a practical point of view. The sources used in this post can be found on GitHub.

2. Run Kafka

An easy way to run a Kafka cluster on your local machine is to use the Wurstmeister Kafka Docker Compose file. Download the Git repository or clone it to your local machine. We will be using Ubuntu 18.04. Also make sure that you have Docker Compose installed.

We will run a Kafka cluster with a single broker; therefore, we first need to edit the file docker-compose-single-broker.yml and set the environment variable KAFKA_ADVERTISED_HOST_NAME to localhost.

Change the line:

KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100

into:

KAFKA_ADVERTISED_HOST_NAME: localhost

We can start the Kafka cluster now. Make sure that you execute the following command from the directory where the docker-compose-single-broker.yml file resides:

$ docker-compose -f docker-compose-single-broker.yml up -d
Starting kafka-docker-master_kafka_1 ... done
Starting kafka-docker-master_zookeeper_1 ... done

We now have a running Kafka cluster.
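
If you want to quickly verify that both containers are indeed up, you can list them with Docker Compose (the exact container names may differ slightly on your machine):

$ docker-compose -f docker-compose-single-broker.yml ps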

In order to stop the cluster, we can issue the following command (again, make sure that this command is executed from the directory where the docker-compose-single-broker.yml file resides):

$ docker-compose stop
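
If you also want to remove the containers and the network again (this is optional and not needed for the rest of this post), you can use the down command instead:

$ docker-compose -f docker-compose-single-broker.yml down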

3. Send and Receive Messages by Means of CLI

Now that we have a running Kafka cluster, we are already able to send and receive messages. When starting the Kafka cluster with Docker Compose, a topic named test was automatically created.

The official binary download contains scripts which, among other things, make it possible to send and receive messages. First, download the Kafka binary here. We have been using version kafka_2.12-2.3.0.
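
Extract the archive and change to its bin directory. Assuming the default file name of the download, this boils down to:

$ tar -xzf kafka_2.12-2.3.0.tgz
$ cd kafka_2.12-2.3.0/bin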

In order to send messages to a topic, we need to create a Producer. We do so by means of the kafka-console-producer.sh script in the bin directory of the Kafka binary download.

$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

The broker-list parameter indicates the Kafka cluster we are connecting to, and the topic parameter indicates the topic we want to send messages to.

In order to receive messages from a topic, we need to create a Consumer. We do so by means of the kafka-console-consumer.sh script in the bin directory of the Kafka binary download. Start this command in another terminal window.

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

The bootstrap-server parameter indicates the Kafka cluster we are connecting to, the topic parameter indicates the topic we want to receive messages from, and the from-beginning parameter indicates that we want to receive all messages present in the topic, including those that were sent before we connected.

At this moment, it is possible to enter messages in the Producer terminal window and to receive them in the Consumer terminal window. When you terminate the Consumer and then connect again, you will see that all messages are received again because of the from-beginning parameter.
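
For example (purely illustrative), entering the following two messages, one per line, in the Producer terminal window:

Hello Kafka
This is a test message

will make them appear in the Consumer terminal window:

Hello Kafka
This is a test message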

4. Send and Receive Messages by Means of Java

In this section, we will send and receive messages by means of a Java application. We will do so based on the JavaDoc for the KafkaProducer and the JavaDoc for the KafkaConsumer.

We are going to create a Maven multi-module project with Java 11 containing the following modules:

  • mykafkaproducerplanet: a Spring Boot application which will send messages to the topic;
  • mykafkaconsumerplanet: a Spring Boot application which will receive messages from the topic.

4.1 The Kafka Producer

The Kafka Producer will send 100 messages to the topic when a URL is invoked. We will make use of Spring Web MVC in order to do so. Therefore, we add the dependency spring-boot-starter-web to the pom and also the dependency kafka-clients in order to access the Java classes for sending messages to the topic.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.3.0</version>
</dependency>

The REST endpoint will be provided by a KafkaProducerController. The explanation of the properties can be found in the KafkaProducer JavaDoc. The controller is executed when the URL http://localhost:8081/sendMessages/ is invoked and sends a hundred messages to the topic my-kafka-topic. We also added a callback function which prints the offset of each message to the console. This way, we get some feedback once a message has been stored in the topic.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaProducerController {

  private int counter;

  @RequestMapping("/sendMessages/")
  public String sendMessages() {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
      // Send the record asynchronously; the callback is invoked once the broker has acknowledged it.
      producer.send(new ProducerRecord<>("my-kafka-topic", Integer.toString(counter), Integer.toString(counter)),
        (metadata, e) -> {
          if (e != null) {
            e.printStackTrace();
          } else {
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
          }
        }
      );
      counter++;
    }

    producer.close();

    return "Messages sent";
  }

}
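
Note that the send method is asynchronous: it returns a Future, and the callback above is invoked once the broker has acknowledged the record. As a small variation (a minimal sketch based on our own assumptions, not part of the GitHub sources; the class name SyncSendExample is hypothetical), you could also block until a record has been acknowledged by calling get() on the returned Future:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendExample {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
      // send() returns a Future<RecordMetadata>; get() blocks until the broker has acknowledged the record.
      RecordMetadata metadata =
          producer.send(new ProducerRecord<>("my-kafka-topic", "key", "value")).get();
      System.out.println("Stored in partition " + metadata.partition() + " at offset " + metadata.offset());
    }
  }
}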

4.2 The Kafka Consumer

The Kafka Consumer will poll the topic and consume the messages as they become available. In order to use the Kafka Consumer classes, we also need to add the kafka-clients dependency to the pom.

The MyKafkaConsumerApplication subscribes to the topic my-kafka-topic; see the JavaDoc of the KafkaConsumer for the explanation of the properties. Once subscribed, the topic is polled every 100 ms and a line is printed to the console for every message that is consumed.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MyKafkaConsumerApplication {

  public static void main(String[] args) {
    SpringApplication.run(MyKafkaConsumerApplication.class, args);

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "mykafkagroup");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-kafka-topic"));

    // Poll the topic every 100 ms and print every record that has been consumed.
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
      }
    }
  }
}

4.3 Test the Java Application

In order to test the Java application, we first need to create the topic. Ensure that the Kafka cluster is running and execute the following command from the bin directory of the Kafka binary download. This command will create the topic my-kafka-topic with 1 partition for us.

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-topic

Check whether the topic has been created successfully:

$ ./kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
my-kafka-topic
test

Start the Kafka Producer by executing the following command from the mykafkaproducerplanet directory:

mvn spring-boot:run

At this point, we first check whether we can send messages to the topic by invoking the URL http://localhost:8081/sendMessages/. The response ‘Messages sent’ is received, and in the console output we can verify that 100 messages have been sent (the offsets may differ from what you see in your console):

The offset of the record we just sent is: 601
The offset of the record we just sent is: 602
The offset of the record we just sent is: 603
The offset of the record we just sent is: 604
...

Start the Kafka Consumer by executing the following command from the mykafkaconsumerplanet directory:

mvn spring-boot:run

After successful startup of the application, the 100 messages present in the topic are printed to the console:

offset = 601, key = 0, value = 0
offset = 602, key = 1, value = 1
offset = 603, key = 2, value = 2
offset = 604, key = 3, value = 3
...

5. Something about Partitions

Up till now, we made use of a topic with one partition, one producer and one consumer. The producer sends data to the single partition of the topic, and the consumer consumes all data from that partition. But what happens when we add another consumer? And what happens when we have two partitions?

5.1 One Partition, Two Consumers

In order to see what happens when we have two consumers belonging to the same consumer group (i.e. we have one logical consumer), we just start a second consumer application on another port.

mvn spring-boot:run -Dspring-boot.run.arguments="--server.port=8083"

Now invoke the URL again in order to send another 100 messages. What we see now is that all messages are consumed by the second consumer. Stop the second consumer (the one that consumed the messages) and send another 100 messages: now the first consumer consumes them. When we start the second consumer again and send another 100 messages, the messages are consumed by the second consumer again. This confirms what is stated in the documentation: within a consumer group, each partition is consumed by exactly one consumer. Since our topic has only one partition and both consumers share the same group.id (mykafkagroup), only one of them receives messages at a time, and the idle consumer takes over when the consuming one stops.

5.2 Two Partitions, Two Consumers

In order to see what happens when we have two consumers belonging to the same consumer group and a topic with two partitions, we will first create another topic my-kafka-topic-2-partitions.

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic my-kafka-topic-2-partitions
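
You can double-check that the new topic really has two partitions by means of the describe option; the output should report a PartitionCount of 2:

$ ./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-kafka-topic-2-partitions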

In the Kafka Producer and Consumer Java applications, we replace the topic name with this new name. See branch feature/multiple-partitions.

Start the producer and the two consumers just like we did before. Invoke the URL in order to send 100 messages to the topic. We can now see that approximately half of the messages are sent to each partition. Each consumer is assigned one of the partitions and consumes the messages from that partition. How the producer decides which partition a record goes to is illustrated below.
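
Why a message with a given key ends up in a particular partition can be illustrated with a small standalone sketch (an illustration based on our own assumptions, not part of the GitHub sources; the class name PartitionForKey is hypothetical). For records with a non-null key, the default partitioner of the kafka-clients library hashes the serialized key with murmur2 and takes the result modulo the number of partitions. The Utils class used below is an internal helper class of kafka-clients:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionForKey {

  public static void main(String[] args) {
    int numPartitions = 2;
    for (int key = 0; key < 10; key++) {
      // Serialize the key the same way the StringSerializer does (UTF-8).
      byte[] keyBytes = Integer.toString(key).getBytes(StandardCharsets.UTF_8);
      // The formula the default partitioner applies to records with a non-null key.
      int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
      System.out.println("key " + key + " -> partition " + partition);
    }
  }
}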

Snippet of log of first consumer:

offset = 96, key = 1, value = 1
offset = 97, key = 3, value = 3
offset = 98, key = 4, value = 4
offset = 99, key = 7, value = 7
...

Snippet of log of second consumer:

offset = 104, key = 0, value = 0
offset = 105, key = 2, value = 2
offset = 106, key = 5, value = 5
offset = 107, key = 6, value = 6
...

6. Conclusion

We showed how you can easily start a Kafka cluster on a local machine and how messages can be sent and received by means of the CLI. We also created a producer and a consumer Java application for sending and receiving messages. At the end, we took a closer look at how partitions are used in combination with more than one consumer.