In this post, we will take a look at Kafka Streams. We will give a short introduction, but the main part of this post is about writing some simple Java applications. This way, we get more familiar with Kafka Streams from a practical point of view.

1. Introduction

In this short introduction, we will only describe what you need to know for this blog post. It is advised, however, to read our post about Kafka Messaging first and to read the official Kafka Streams documentation. Reading these will give you a good overview and basic knowledge.

Besides all of this useful information, we need to know what a stream is. A stream is an unbounded, continuously updating data set. In Kafka terminology, this means that we read data from a topic and send data to another topic. When we transform the data between reading and sending it, we speak of a stream processor.
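
As a preview of what is to come, a stream processor could look like the following minimal sketch (the topic names and the transformation are made up for illustration purposes):

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input-topic")    // read from a topic
    .mapValues(value -> value.toUpperCase())     // transform the data
    .to("output-topic");                         // send the result to another topic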

2. In Practice

2.1 Prerequisites

Before getting started, there are some things we need to take care of. First of all, we need a running Kafka cluster. We will make use of Docker Compose in order to accomplish this. See our previous post about Kafka Messaging on how to do this.

The official binary download contains scripts which make it possible to create topics, send and receive messages, and so on. We will make use of these scripts. Therefore, download the Kafka binary here. We have been using version kafka_2.12-2.3.0.

We have also been using Ubuntu 18.04, Java 11 and Spring Boot.

The examples below are inspired by the examples provided in the official Kafka Streams documentation.

The sources used in this post are, of course, available at GitHub.

2.2 Create the Producer

We will need to read an unbounded stream of data from a topic, so we need to create a small application which sends an unbounded stream of data. We simulate a stream of Tweets, each ending with exactly one hashtag. Every second, a message is published to the topic. The Tweets always contain the same message (Lorem ipsum…); the hashtag is randomly chosen from a fixed list of five hashtags. The producer is similar to the one we wrote in our previous post. We can start the stream of data by invoking the URL http://localhost:8081/sendMessages/. Let’s take a look at the source of the KafkaProducerController.

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaProducerController {

  private static final String loremIpsum = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor " +
      "incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco " +
      "laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit " +
      "esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa " +
      "qui officia deserunt mollit anim id est laborum.";

  private static final String[] hashTags = {"latin", "italy", "roman", "caesar", "cicero"};

  private Random randomNumber = new Random();

  private String randomMessage;

  @RequestMapping("/sendMessages/")
  public void sendMessages() {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);

    try {
      while (true) {
        // Every second send a message
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {}

        randomMessage = loremIpsum + " #" + hashTags[randomNumber.nextInt(hashTags.length)];
        producer.send(new ProducerRecord<String, String>("my-kafka-streams-topic", null, randomMessage));

      }
    } finally {
      producer.close();
    }

  }

}

The variable loremIpsum contains the fixed message and the variable hashTags contains the fixed list of hashtags. In the sendMessages method, we first define some Kafka properties. Inside the try-block, an endless loop sends a message with a random hashtag (variable randomMessage) to the topic my-kafka-streams-topic every second. We only provide a value and no key, because the key is not relevant for us in this topic.

The last thing to do is to create the topic in Kafka. We do so by means of the kafka-topics.sh script in the bin directory of the Kafka binary download.

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-streams-topic

2.3 Transform One Stream into Another

Now that we have made all the preparations, it is time to start with the first streaming example. We will read the Tweets from the my-kafka-streams-topic, filter the Tweets with hashtag #latin and publish them to topic my-kafka-streams-out1.

We create a Spring Boot application (module mykafkaprocessingplanet in the multi-module Maven project) and add the following dependencies to the pom:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
  </dependency>
</dependencies>

As can be seen, besides the kafka-streams dependency, we also had to add the kafka-clients dependency. If we don’t add this last dependency, we encounter the following error when creating a KStream.

java.lang.NoSuchMethodError: org.apache.kafka.clients.admin.AdminClientConfig

Just like with the Producer, we make sure that we can start the processing by invoking a URL. The relevant part of the KafkaProcessingController:

private KafkaStreams streams1;

@RequestMapping("/startProcessing1/")
public void startProcessing1() {

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor1");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

  final StreamsBuilder builder = new StreamsBuilder();

  builder.stream("my-kafka-streams-topic")
      .filter((key, value) -> ((String) value).endsWith("#latin"))
      .to("my-kafka-streams-out1");

  final Topology topology = builder.build();
  streams1 = new KafkaStreams(topology, props);
  streams1.start();

}

As always, we first need to define the properties to connect to the topic. Notice that we did not override the default key Serializer/Deserializer because we did not provide a key with the messages in the topic. The magic happens in the builder.stream(...) line: we read from the my-kafka-streams-topic, filter the messages which end with #latin and send the filtered messages to topic my-kafka-streams-out1. The other lines are needed to actually start the stream processing.

The last thing to do is to create the output topic:

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-streams-out1

Time to put everything together. Start the Producer by invoking the following command from the mykafkaproducerplanet directory:

$ mvn spring-boot:run

Start the Processor by invoking the same command from the mykafkaprocessingplanet directory.

Start the Consumer by executing the following command from the bin directory of the Kafka binary:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-streams-out1 --from-beginning

Send messages from the Producer to the my-kafka-streams-topic by invoking URL http://localhost:8081/sendMessages/.

Start the stream processing by invoking URL http://localhost:8082/startProcessing1/.

The Consumer prints only the messages with hashtag #latin to the console:

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. #latin 
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. #latin 
...

The stream processing can be stopped by invoking URL http://localhost:8082/stop.
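
The /stop endpoint itself is not shown in this post. A minimal sketch (assuming it simply closes the KafkaStreams instance we started earlier) could look like this:

@RequestMapping("/stop")
public void stop() {
  // Sketch: close the running streams instance, if any
  if (streams1 != null) {
    streams1.close();
  }
}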

2.4 Count Number of Hashtags

In the second example, we will read the Tweets from the my-kafka-streams-topic, create a new intermediate stream with the hashtags as value, transform the stream into a KTable which holds the counts per hashtag and publish it to topic my-kafka-streams-hashtagcount-output. A KTable is a special kind of stream where only the last value for a certain key is applicable. In our case, we are only interested in the last value of the count per hashtag: the hashtag will be the key and the count will be the value. For example, when the records ("#latin", 41) and later ("#latin", 42) arrive, only the value 42 is relevant for key "#latin".

We add a new REST endpoint to the mykafkaprocessingplanet module. We are going to read from the same topic my-kafka-streams-topic and will transform it in order to publish messages with the hashtag as key and the count as value to topic my-kafka-streams-hashtagcount-output. We assume that there is always exactly one hashtag in the message and that it is present at the end of the message, just to keep things simple.

The Kafka properties remain identical to the first example, except that we now also provide a configuration for the serialization/deserialization of the key, because we will publish to a topic with a String as key. Otherwise, we would encounter a ClassCastException.

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
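
The snippets below operate on a variable source: the stream we read from the input topic. As a minimal sketch (with the variable name and types chosen to match the snippets below), it could be created like this:

final StreamsBuilder builder = new StreamsBuilder();
// Read the Tweets from the input topic; the values are Strings
KStream<Object, String> source = builder.stream("my-kafka-streams-topic");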

First, we will create a new stream hashTags which will contain only the hashtags as value. This can be achieved as follows:

KStream<Object, String> hashTags = source.flatMapValues(new ValueMapper<String, Iterable>() {
    @Override
    public Iterable apply(String value) {
      return Arrays.asList(value.substring(value.indexOf("#")));
    }
});

We convert this to a lambda for brevity:

KStream<Object, String> hashTags = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#"))));

We now have a stream without keys and with hashtags as values. In order to know how many times a hashtag occurs in the stream, we need to group the records per hashtag. We therefore create a KGroupedStream with the hashtag as both key and value. The KeyValueMapper is used to create the new key: its apply method receives the original key and value as input parameters and returns the new key as a String.

KGroupedStream<String, String> hashTags = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#"))))
  .groupBy(new KeyValueMapper<Object, String, String>() {
      @Override
      public String apply(Object key, String value) {
        return value;
      }
});

Again, we convert this to a lambda:

KGroupedStream<String, String> hashTags = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#"))))
  .groupBy((key, value) -> value);

Next, we invoke the count method on the KGroupedStream, which creates a KTable counts for us with the hashtags as keys and the counts as values.

KTable<String, Long> counts = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#"))))
  .groupBy((key, value) -> value)
  .count();

The only thing we need to do now is to convert the KTable back into a stream and publish it to the topic my-kafka-streams-hashtagcount-output.

source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#"))))
  .groupBy((key, value) -> value)
  .count()
  .toStream()
  .to("my-kafka-streams-hashtagcount-output", Produced.with(Serdes.String(), Serdes.Long()));

At the end, we need to add the following to start the stream:

final Topology topology = builder.build();
streams2 = new KafkaStreams(topology, props);
streams2.start();

The output topic must be configured with log compaction enabled. See the official documentation or a shorter explanation in order to learn about log compaction. Log compaction ensures that when several records with the same key are published to a topic, only the last version is retained. In our case, only the last count per hashtag is relevant; if log compaction is not enabled, older versions will consume unnecessary resources. With this information, we can create the output topic:

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-streams-hashtagcount-output --config "cleanup.policy=compact"

Time to test everything. Start the Producer and the Processor Spring Boot applications just as we did before.

Send messages from the Producer to the my-kafka-streams-topic by invoking URL http://localhost:8081/sendMessages/.

Start the stream processing by invoking URL http://localhost:8082/startProcessing2/.

Start the Consumer for topic my-kafka-streams-hashtagcount-output with some extra properties so that the data is printed correctly to the console. Notice that new results are printed to the console every 30 seconds.

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-streams-hashtagcount-output --property print.key=true --value-deserializer=org.apache.kafka.common.serialization.LongDeserializer
#italy 85
#latin 84
#cicero 100
#caesar 87
#roman 80
#cicero 104
#italy 94
#latin 87
#caesar 96
#roman 85
#cicero 108
#caesar 103
#italy 101
#roman 92
#latin 92
...

Instead of just using the count method, we could also have used the following:

count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))

In this case, the results are stored in a state store which can be queried. How to query such a state store can be found here.
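
As an impression (this is a sketch and not part of the example code), such a state store could be queried via the Kafka Streams interactive queries API once the streams instance is running:

// Sketch: look up the current count for a hashtag in the "counts-store" state store
ReadOnlyKeyValueStore<String, Long> store =
    streams2.store("counts-store", QueryableStoreTypes.keyValueStore());
Long latinCount = store.get("#latin");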

3. Conclusion

We created some simple Kafka Streams examples in order to get acquainted with Kafka Streams. The official Kafka documentation is of good quality and provides several good examples to get you started easily.
