A few blog posts ago, we experimented with Kafka Messaging and Kafka Streams. Although we used Spring Boot applications to demonstrate the examples, we deliberately did not use Spring Kafka. The reason for this was to get acquainted with Apache Kafka first, without any abstraction layers in between. Now that we have done so, it is of course time to take a look at Spring Kafka!

1. Introduction

It is advisable, but not required, to read our previous blog posts about Kafka Messaging, Kafka Streams and Kafka Joins for some basic knowledge about these topics. We are going to rewrite the examples from the Kafka Messaging and Kafka Streams posts to make use of Spring Kafka. The Spring Kafka project makes it possible to apply core Spring concepts to Kafka-based applications. We will use the official Spring documentation to convert our previously created applications.

2. Prerequisites

Before getting started, there are some things we need to take care of. First of all, we need a running Kafka cluster. We will use Docker Compose to accomplish this; see our previous post about Kafka Messaging for the details.
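For reference, a minimal single-broker Compose file could look something like the sketch below; we assume the Confluent images here, the previous post contains the exact setup we used.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:5.3.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1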

The official binary download contains scripts which make it possible to create topics, send and receive messages, and so on. We will make use of these scripts. Therefore, download the Kafka binary here. We have been using version kafka_2.12-2.3.0.
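To give an idea, the following scripts from the bin directory list topics and produce and consume messages from the console (some-topic is just a placeholder):

$ ./kafka-topics.sh --list --bootstrap-server localhost:9092
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic some-topic
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic some-topic --from-beginning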

We have also been using Ubuntu 18.04, Java 11 and Spring Boot 2.2.1.

The sources being used in the post are of course available at GitHub.

3. Kafka Messaging

In this section, we will send and receive messages by means of a Java application.

We are going to create a Maven multi-module project with Java 11 containing the following modules:

  • myspringkafkamessageproducerplanet: a Spring Boot application which will send messages to the topic;
  • myspringkafkamessageconsumerplanet: a Spring Boot application which will receive messages from the topic.

3.1 The Spring Kafka Message Producer

The Kafka Producer will send a hundred messages to the topic when a URL is invoked. We will make use of Spring Web MVC in order to do so. Therefore, we add the dependency spring-boot-starter-web to the pom, and also the dependency spring-kafka in order to make use of the Spring Kafka features for sending messages to the topic.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.3.3.RELEASE</version>
</dependency>

The REST endpoint will be provided by a KafkaProducerController. First, we will create the necessary methods for creating and configuring the Kafka topic. We will add a NewTopic Bean to the application context to automatically add our topic to the Apache Kafka broker if it doesn’t already exist. We create the topic with the same properties as we would via the CLI.

private static final String TOPIC_NAME = "my-spring-kafka-message-topic";

@Bean
public NewTopic mySpringKafkaMessageTopic() {
  return TopicBuilder.name(TOPIC_NAME)
    .partitions(1)
    .replicas(1)
    .compact()
    .build();
}
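For comparison, creating this topic by means of the CLI would look something like this (run from the bin directory of the Kafka binary download):

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic my-spring-kafka-message-topic --partitions 1 --replication-factor 1 --config cleanup.policy=compact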

We will use KafkaTemplate in order to send messages to the topic. The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics. We therefore need to create a ProducerFactory and the corresponding producer configuration, which contains the necessary Kafka properties.

@Bean
public ProducerFactory<String, String> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  // See https://kafka.apache.org/documentation/#producerconfigs for more properties
  return props;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
  return new KafkaTemplate<String, String>(producerFactory());
}
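In this example, the controller defines the beans itself and simply calls kafkaTemplate() in its request method. In a separate class you would typically inject the template instead; a minimal sketch using constructor injection:

private final KafkaTemplate<String, String> kafkaTemplate;

public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate) {
  // Spring injects the KafkaTemplate bean defined above.
  this.kafkaTemplate = kafkaTemplate;
}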

Now that the necessary configuration is in place, we can create the controller method which will be executed when the URL http://localhost:8081/sendMessages/ is invoked. A hundred messages will be sent to the my-spring-kafka-message-topic. We also added a callback function which prints the offset of each message to the console. This way, we have some kind of feedback when a message has been sent to the topic.

private int counter;
...
@RequestMapping("/sendMessages/")
public String sendMessages() {

  for (int i = 0; i < 100; i++) {
    ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate().send(new ProducerRecord<>(TOPIC_NAME, Integer.toString(counter), Integer.toString(counter)));
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

      @Override
      public void onSuccess(SendResult<String, String> result) {
        System.out.println("The offset of the record we just sent is: " + result.getRecordMetadata().offset());
      }

      @Override
      public void onFailure(Throwable ex) {
        ex.printStackTrace();
      }

    });
    counter++;
  }

  return "Messages sent";

}

3.2 The Spring Kafka Message Consumer

The Kafka Consumer will poll the topic and consume messages as they become available. In order to use the Spring Kafka consumer classes, we need to add the spring-kafka dependency to this module’s pom as well.
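It is the same dependency as in the producer module:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.3.3.RELEASE</version>
</dependency>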

We will use the @KafkaListener approach in order to consume the messages. First, we need a configuration class which instantiates a ConcurrentMessageListenerContainer, providing multi-threaded consumption. We set the concurrency to 1 in order to keep things simple. We also set the Kafka broker properties just like we did for the Producer. Note that we now use ConsumerConfig, and deserializers instead of serializers.

@Configuration
@EnableKafka
public class KafkaConfig {

  @Bean
  KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
  kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // See https://kafka.apache.org/documentation/#consumerconfigs for more properties
    return props;
  }
}

In order to consume the messages, we add a listen method to our MySpringKafkaMessageConsumerPlanetApplication class. We annotate it with @KafkaListener, set some properties for retrieving the messages, and print some (meta)data of each message. Note that the consumer group is not part of the consumer configuration above: the groupId attribute of @KafkaListener provides it. We could also use a listener method which takes the data as a String and only processes the payload, but then we would not have access to the metadata of the ConsumerRecord which we would like to print here.

private static final String TOPIC_NAME = "my-spring-kafka-message-topic";
...
@KafkaListener(groupId = "mykafkagroup", topics = TOPIC_NAME, properties = { "enable.auto.commit=true", "auto.commit.interval.ms=1000", "poll-interval=100"})
public void listen(ConsumerRecord<?,?> record) {
  System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
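For completeness, a payload-only listener could look like this (a sketch; the method name is ours):

@KafkaListener(groupId = "mykafkagroup", topics = TOPIC_NAME)
public void listenPayload(String message) {
  // Only the value is available here; offset and key are not accessible.
  System.out.println("value = " + message);
}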

3.3 Test the Message Producer and Consumer

First, ensure that the Kafka broker is running. After that, we start the myspringkafkamessageproducerplanet application:

$ mvn spring-boot:run

During startup, we expect the topic to be created. Let’s check this before we continue (run the command from the bin directory of the Kafka binary download):

$ ./kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
..
my-spring-kafka-message-topic

The list of topics contains the topic we defined in our Spring Boot application. Now we start the myspringkafkamessageconsumerplanet application the same way.

At this point, we send messages to the topic by invoking the URL http://localhost:8081/sendMessages/. The response ‘Messages sent’ is returned, and in the console output we can verify that a hundred messages have been sent (the offsets may differ from what you see in your console). We only list the last log statement here; the statements are printed in between other log output:

...
The offset of the record we just sent is: 99

The console output of our Consumer shows the following output:

offset = 0, key = 0, value = 0
offset = 1, key = 1, value = 1
offset = 2, key = 2, value = 2
offset = 3, key = 3, value = 3
...

The output we see here is exactly the same as in our previous post without Spring Kafka. We have successfully converted our application to Spring Kafka!

4. Kafka Streams

In this section, we will send and receive an unbounded stream of data by means of a Java application.

We are going to create a Maven multi-module project with Java 11 containing the following modules:

  • myspringkafkastreamsproducerplanet: a Spring Boot application which will send an unbounded stream of data to the topic;
  • myspringkafkaprocessingplanet: a Spring Boot application which will process the unbounded stream of data, transform it and send it to another unbounded stream.

4.1 The Spring Kafka Streams Producer

We will create a small application which sends an unbounded stream of data. We simulate a stream of Tweets, each ending with exactly one hashtag. Every second, a message is published to the topic. The Tweets always contain the same message (Lorem ipsum…); the hashtag is randomly chosen from a fixed list of 5 hashtags. The producer is similar to the one we wrote above. We can start the stream of data by invoking the URL http://localhost:8081/sendMessages/. Let’s take a look at the source of the KafkaProducerController. We only list what is different compared to the Producer we created above.

private static final String TOPIC_NAME = "my-spring-kafka-streams-topic";

private static final String loremIpsum = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor " +
    "incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco " +
    "laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit " +
    "esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa " +
    "qui officia deserunt mollit anim id est laborum.";

private static final String[] hashTags = {"latin", "italy", "roman", "caesar", "cicero"};

private Random randomNumber = new Random();

private String randomMessage;

...

@RequestMapping("/sendMessages/")
public void sendMessages() {

  while (true) {
    // Every second send a message
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // Ignored: we keep producing messages until the application is stopped.
    }

    randomMessage = loremIpsum + " #" + hashTags[randomNumber.nextInt(hashTags.length)];
    kafkaTemplate().send(new ProducerRecord<String, String>(TOPIC_NAME, "", randomMessage));

  }

}

The variable loremIpsum is the fixed message and the variable hashTags is the fixed list of hashtags. We create an endless loop which sends a message with a random hashtag (variable randomMessage) to the topic my-spring-kafka-streams-topic every second. We only provide an (empty) key besides the value, because the key is not relevant for us in this topic. Note that this endpoint never returns: the request keeps producing messages until the application is stopped.
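The topic itself is again created by means of a NewTopic bean, analogous to section 3.1 (a sketch with an assumed bean name; the actual code is in the GitHub repository):

@Bean
public NewTopic mySpringKafkaStreamsTopic() {
  return TopicBuilder.name(TOPIC_NAME)
    .partitions(1)
    .replicas(1)
    .build();
}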

4.2 Transform One Stream into Another

In our transformation application, we will read the Tweets from the my-spring-kafka-streams-topic, filter the Tweets with the hashtag #latin and publish them to the topic my-spring-kafka-streams-output-topic.

In our pom, we need to add the kafka-streams jar besides the spring-kafka jar, because kafka-streams is an optional dependency of spring-kafka.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.3.3.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>2.3.0</version>
</dependency>

Next, we will create a KafkaStreamsConfig class which will contain the necessary annotations and configuration for transforming one stream into another one.

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

  private static final String INPUT_TOPIC = "my-spring-kafka-streams-topic";
  private static final String OUTPUT_TOPIC = "my-spring-kafka-streams-output-topic";

  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myKafkaStreams");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    return new KafkaStreamsConfiguration(props);
  }

  @Bean
  public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {

    KStream<String, String> stream = kStreamBuilder.stream(INPUT_TOPIC);
    stream.print(Printed.toSysOut());
    stream.filter((key, value) -> value.endsWith("#latin")).to(OUTPUT_TOPIC);

    return stream;
  }

}

Let’s take a closer look at what is happening here. Just like in the Message Consumer, we add a configuration class with the @Configuration and @EnableKafka annotations. Besides that, we also annotate this class with @EnableKafkaStreams. The only thing we need to do is add a KafkaStreamsConfiguration bean named defaultKafkaStreamsConfig; in our case, this is the kStreamsConfigs method, which contains the necessary Kafka properties. This will create a StreamsBuilderFactoryBean which we eventually can use in our kStream method. In the kStream method, we simply read the stream from the topic, filter it and publish the filtered stream to another topic. As you will notice, we do not need to start the stream ourselves; Spring takes care of that.
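For comparison, without Spring Kafka we would have to build and start the topology ourselves, roughly like this (a sketch; props is a java.util.Properties containing the same settings):

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.filter((key, value) -> value.endsWith("#latin")).to(OUTPUT_TOPIC);

// Build the topology and start the streams application manually.
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();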

4.3 Test the Stream Producer and Processor

First, ensure that the Kafka broker is running. After that, we start the myspringkafkastreamsproducerplanet application:

$ mvn spring-boot:run

During startup, we expect the topic to be created. Let’s check this before we continue:

$ ./kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
..
my-spring-kafka-streams-topic

The list of topics contains the topic we defined in our Spring Boot application. Now we start the myspringkafkaprocessingplanet application the same way.

Send messages from the Producer to the my-spring-kafka-streams-topic by invoking the URL http://localhost:8081/sendMessages/.

Start the console consumer by executing the following command from the bin directory of the Kafka binary. We notice that only the messages ending with the hashtag #latin are printed.

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-spring-kafka-streams-output-topic
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. #latin
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. #latin
...

5. Conclusion

In this post, we converted the previously created Kafka Messaging and Kafka Streams examples to the Spring way by using Spring Kafka. We no longer need to create the topics manually, and quite some boilerplate code has been removed.