In this post, we will take a look at joins in Kafka Streams. The main goal is to get a better understanding of joins by means of some examples. The examples are taken from the Kafka Streams documentation, but we will write some Java Spring Boot applications in order to verify practically what is written in the documentation.

1. Introduction

As an introduction, we refer to the official Kafka documentation and more specifically the section about stateful transformations. Take a look at the global overview diagram and read the part in the documentation about joins. This will provide you with a good basic knowledge of the topic.

In short, joins allow us to combine data from different streams into a new stream; in other words, to enrich the data of one stream with the data of another. A typical use case would be to enhance a stream of data with information from a table. In order to allow fast lookups of the table, its data can be made available to Kafka as a KTable. This gives you fast access to the data when the actual database is in a remote location, removing the network latency of a remote lookup, for example.

One other thing to consider is that we will make use of windowing. More information about windowing can be found here. We are going to make use of sliding time windows. In practice, this means that two records are considered to be in the same time window when the difference between their timestamps is at most the window size.
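
As a minimal sketch of that condition (plain Java, just to make the semantics concrete; the WindowCheck class is not part of the project, and Kafka Streams performs this check internally based on the JoinWindows we configure later on):

import java.time.Duration;

public class WindowCheck {

  // The window size we will use in the join examples below: 5 minutes.
  private static final long WINDOW_SIZE_MS = Duration.ofMinutes(5).toMillis();

  // Two records can be joined when the difference between their
  // timestamps is at most the window size.
  static boolean inSameJoinWindow(long leftTimestampMs, long rightTimestampMs) {
    return Math.abs(leftTimestampMs - rightTimestampMs) <= WINDOW_SIZE_MS;
  }
}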

2. In Practice

2.1 Prerequisites

Before getting started, there are some things we need to take care of. First of all, we need a running Kafka cluster. We will make use of Docker Compose in order to accomplish this. See our previous post about Kafka Messaging on how to do this.

The official binary download contains scripts which, for example, make it possible to create topics, send and receive messages, etc. We will make use of these scripts. Therefore, download the Kafka binary here. We have been using version kafka_2.12-2.3.0.

We also have been using Ubuntu 18.04, Java 11 and Spring Boot.

The examples we are using are based on the official Kafka documentation. We will implement KStream-KStream joins and KStream-KTable joins.

The sources used in this post are, of course, available on GitHub.

2.2 Create the Producer

Before getting started with joins, we first need to create two streams which contain the data we need. The assumption is that all records have the same key, that all records belong to a single join window, and that the records will be consumed in timestamp order. We want to achieve the following:

Timestamp   Left (KStream)   Right (KStream)
1           null
2                            null
3           A
4                            a
5           B
6                            b
7           null
8                            null
9           C
10                           c
11                           null
12          null
13                           null
14                           d
15          D

The producer is similar to the one we wrote in our previous post. We can start the stream of data by invoking the URL http://localhost:8081/sendMessages/. Let’s take a look at the source of the KafkaProducerController.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaProducerController {

  private static final String KEY = "FIXED-KEY";

  // Values for the left stream, keyed by timestamp (in 10-second intervals).
  private static final Map<Integer, String> LEFT;
  static {
    LEFT = new HashMap<>();
    LEFT.put(1, null);
    LEFT.put(3, "A");
    LEFT.put(5, "B");
    LEFT.put(7, null);
    LEFT.put(9, "C");
    LEFT.put(12, null);
    LEFT.put(15, "D");
  }

  // Values for the right stream, keyed by timestamp (in 10-second intervals).
  private static final Map<Integer, String> RIGHT;
  static {
    RIGHT = new HashMap<>();
    RIGHT.put(2, null);
    RIGHT.put(4, "a");
    RIGHT.put(6, "b");
    RIGHT.put(8, null);
    RIGHT.put(10, "c");
    RIGHT.put(11, null);
    RIGHT.put(13, null);
    RIGHT.put(14, "d");
  }

  @RequestMapping("/sendMessages/")
  public void sendMessages() {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);

    try {
      for (int i = 1; i <= 15; i++) {
        // Send the messages for the current timestamp every 10 seconds.
        try {
          Thread.sleep(10000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }

        if (LEFT.containsKey(i)) {
          producer.send(new ProducerRecord<>("my-kafka-left-stream-topic", KEY, LEFT.get(i)));
        }
        if (RIGHT.containsKey(i)) {
          producer.send(new ProducerRecord<>("my-kafka-right-stream-topic", KEY, RIGHT.get(i)));
        }
      }
    } finally {
      producer.close();
    }
  }

}

We created two HashMaps, one for the LEFT stream and one for the RIGHT stream containing the data at a certain timestamp. In the sendMessages method we first set some Kafka properties. After this, we create a for-loop where we send a message to one of the topics every 10 seconds. The HashMaps are used to determine whether data has to be published to the topic. The left KStream topic is my-kafka-left-stream-topic and the right KStream topic is my-kafka-right-stream-topic.

The last thing to do is to create the topics in Kafka. We do so by means of the kafka-topics.sh script in the bin directory of the Kafka binary download.

The left topic:

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-left-stream-topic

The right topic:

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-right-stream-topic
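
As a side note, the topics can also be created programmatically with the AdminClient from the kafka-clients dependency. A sketch (the TopicCreator class is hypothetical and not part of the project; the broker address matches our Docker Compose setup):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreator {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      // One partition and replication factor 1, just like the shell commands above.
      admin.createTopics(Arrays.asList(
          new NewTopic("my-kafka-left-stream-topic", 1, (short) 1),
          new NewTopic("my-kafka-right-stream-topic", 1, (short) 1)))
          .all().get(); // Wait until the topics have been created.
    }
  }
}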

2.3 KStream-KStream Inner Join

Now that we have made all the preparations, it is time to start with the first join example. We will read the left KStream and the right KStream from the respective topics, perform an inner join on both streams and publish the result to topic my-kafka-stream-stream-inner-join-out. An inner join means that each input record on one side produces an output record for every record on the other side with a matching key inside the configured window.

We create a Spring Boot application (module mykafkaprocessingplanet in the multi-module Maven project) and add the following dependencies to the pom:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.3.0</version>
</dependency>

Just like the Producer, we make sure that we can start the processing after invoking a URL. The KafkaProcessingController contains the method startStreamStreamInnerJoin:

private KafkaStreams streamsInnerJoin;

@RequestMapping("/startStreamStreamInnerJoin/")
public void startStreamStreamInnerJoin() {

  stop();

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-stream-inner-join");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

  final StreamsBuilder builder = new StreamsBuilder();

  KStream<String, String> leftSource = builder.stream("my-kafka-left-stream-topic");
  KStream<String, String> rightSource = builder.stream("my-kafka-right-stream-topic");

  KStream<String, String> joined = leftSource.join(rightSource,
      (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
      JoinWindows.of(Duration.ofMinutes(5)),
      Joined.with(
          Serdes.String(), /* key */
          Serdes.String(), /* left value */
          Serdes.String()) /* right value */
  );

  joined.to("my-kafka-stream-stream-inner-join-out");

  final Topology topology = builder.build();
  streamsInnerJoin = new KafkaStreams(topology, props);
  streamsInnerJoin.start();

}

As always, we first need to define the properties to connect to Kafka. Next, we create a KStream for each input topic. The join itself takes place in the leftSource.join invocation. The first parameter is the stream we want to join with, which is rightSource in our case. The second parameter is where we create the joined value: we simply combine both values. Then, because the join is window based, we supply the window to be used, which is 5 minutes in our case. With the last parameter we indicate which serdes need to be used for serializing/deserializing the key and the left and right values. At the end, we publish the joined stream to the output topic and start the stream.
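
As a side note, the lambda we pass is a ValueJoiner (org.apache.kafka.streams.kstream.ValueJoiner); written out as an anonymous class, it would look like this sketch:

ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
  @Override
  public String apply(String leftValue, String rightValue) {
    // Combine the values of two matching records into the joined output value.
    return "left=" + leftValue + ", right=" + rightValue;
  }
};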

We also invoke a stop method at the beginning of the method, because we will use the same input topics for the different examples and do not want the examples to interfere with each other. The stop method closes all the KafkaStreams instances:

private void stop() {
  if (streamsInnerJoin != null) {
    streamsInnerJoin.close();
  }
  if (streamsLeftJoin != null) {
    streamsLeftJoin.close();
  }
  if (streamsOuterJoin != null) {
    streamsOuterJoin.close();
  }
  if (streamTableInnerJoin != null) {
    streamTableInnerJoin.close();
  }
  if (streamTableLeftJoin != null) {
    streamTableLeftJoin.close();
  }
}

We create the output topic:

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-stream-stream-inner-join-out

We listen to the output topic and make sure that the key and the timestamp are printed as well, in order to make it easier to verify the output:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-stream-stream-inner-join-out --property print.key=true --property print.timestamp=true

Time to put everything together. Start the Producer by invoking the following command from the mykafkaproducerplanet directory:

$ mvn spring-boot:run

Start the Processor by invoking the same command from the mykafkaprocessingplanet directory.

Send messages from the Producer to the left and right input topics by invoking URL http://localhost:8081/sendMessages/.

Start the processor by invoking URL http://localhost:8082/startStreamStreamInnerJoin/.

The Consumer outputs the following:

CreateTime:1570367637661 FIXED-KEY left=A, right=a
CreateTime:1570367647662 FIXED-KEY left=B, right=a
CreateTime:1570367657663 FIXED-KEY left=A, right=b
CreateTime:1570367657663 FIXED-KEY left=B, right=b
CreateTime:1570367687664 FIXED-KEY left=C, right=a
CreateTime:1570367687664 FIXED-KEY left=C, right=b
CreateTime:1570367697665 FIXED-KEY left=A, right=c
CreateTime:1570367697665 FIXED-KEY left=B, right=c
CreateTime:1570367697665 FIXED-KEY left=C, right=c
CreateTime:1570367737668 FIXED-KEY left=A, right=d
CreateTime:1570367737668 FIXED-KEY left=B, right=d
CreateTime:1570367737668 FIXED-KEY left=C, right=d
CreateTime:1570367747668 FIXED-KEY left=D, right=a
CreateTime:1570367747668 FIXED-KEY left=D, right=b
CreateTime:1570367747668 FIXED-KEY left=D, right=c
CreateTime:1570367747668 FIXED-KEY left=D, right=d

We have outputs for timestamps 4, 5, 6, 9, 10, 14 and 15. The timestamps in the console output are in milliseconds, so we focus on the fifth digit from the right, which changes once per 10-second interval we defined. With this information and the output, we notice that the result is identical to what is documented in the official Kafka documentation.
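
If you prefer to verify the interval programmatically instead of counting digits, a quick sketch using two consecutive CreateTime values from the console output above (the TimestampCheck class is just an illustration):

import java.time.Instant;

public class TimestampCheck {

  public static void main(String[] args) {
    long first = 1570367637661L;  // CreateTime of "left=A, right=a"
    long second = 1570367647662L; // CreateTime of "left=B, right=a"

    // Prints 10001: roughly one 10-second producer interval apart.
    System.out.println(second - first);
    // Prints the wall-clock time of the first record.
    System.out.println(Instant.ofEpochMilli(first));
  }
}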

2.4 KStream-KStream Left Join

We will read the left KStream and the right KStream from the respective topics, perform a left join on both streams and publish the result to topic my-kafka-stream-stream-left-join-out. A left join means that, just like the inner join, each record is joined with all matching records on the other side inside the configured window. In addition, if an input record on the left side has no matching record on the right side, an output record with a null right value is produced. This explains the output left=A, right=null at timestamp 3.

The KafkaProcessingController now contains the method startStreamStreamLeftJoin. Only the part which differs from the inner join is listed here:

@RequestMapping("/startStreamStreamLeftJoin/")
public void startStreamStreamLeftJoin() {
  ...
  KStream<String, String> joined = leftSource.leftJoin(rightSource,
      (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
      JoinWindows.of(Duration.ofMinutes(5)),
      Joined.with(
          Serdes.String(), /* key */
          Serdes.String(), /* left value */
          Serdes.String()) /* right value */
  );

  joined.to("my-kafka-stream-stream-left-join-out");
  ...
}

We create the output topic my-kafka-stream-stream-left-join-out just as we did with the inner join and we start the Consumer in order to listen to the topic.

Again, we invoke the URL for sending messages from the Producer and we invoke the URL for the processor: http://localhost:8082/startStreamStreamLeftJoin/

The Consumer outputs the following:

CreateTime:1570369221842 FIXED-KEY left=A, right=null
CreateTime:1570369231843 FIXED-KEY left=A, right=a
CreateTime:1570369241844 FIXED-KEY left=B, right=a
CreateTime:1570369251844 FIXED-KEY left=A, right=b
CreateTime:1570369251844 FIXED-KEY left=B, right=b
CreateTime:1570369281846 FIXED-KEY left=C, right=a
CreateTime:1570369281846 FIXED-KEY left=C, right=b
CreateTime:1570369291847 FIXED-KEY left=A, right=c
CreateTime:1570369291847 FIXED-KEY left=B, right=c
CreateTime:1570369291847 FIXED-KEY left=C, right=c
CreateTime:1570369331850 FIXED-KEY left=A, right=d
CreateTime:1570369331850 FIXED-KEY left=B, right=d
CreateTime:1570369331850 FIXED-KEY left=C, right=d
CreateTime:1570369341851 FIXED-KEY left=D, right=a
CreateTime:1570369341851 FIXED-KEY left=D, right=b
CreateTime:1570369341851 FIXED-KEY left=D, right=c
CreateTime:1570369341851 FIXED-KEY left=D, right=d

We have outputs for timestamps 3, 4, 5, 6, 9, 10, 14 and 15. Again, we notice that it is identical to what is documented in the official Kafka documentation.

2.5 KStream-KStream Outer Join

We will read the left KStream and the right KStream from the respective topics, perform an outer join on both streams and publish the result to topic my-kafka-stream-stream-outer-join-out. An outer join means that each record on either side is joined with all matching records on the other side inside the configured window. If an input record on the left or right side has no matching record on the other side, an output record with null for the missing side is produced. This explains the output left=A, right=null at timestamp 3.

The KafkaProcessingController now contains the method startStreamStreamOuterJoin. Only the part which differs from the inner join is listed here:

@RequestMapping("/startStreamStreamOuterJoin/")
public void startStreamStreamOuterJoin() {
  ...
  KStream<String, String> joined = leftSource.outerJoin(rightSource,
      (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
      JoinWindows.of(Duration.ofMinutes(5)),
      Joined.with(
          Serdes.String(), /* key */
          Serdes.String(), /* left value */
          Serdes.String()) /* right value */
  );

  joined.to("my-kafka-stream-stream-outer-join-out");
  ...
}

We create the output topic my-kafka-stream-stream-outer-join-out just as we did with the inner join and we start the Consumer in order to listen to the topic.

Again, we invoke the URL for sending messages from the Producer and we invoke the URL for the processor: http://localhost:8082/startStreamStreamOuterJoin/

The Consumer outputs the following:

CreateTime:1570371657621 FIXED-KEY left=A, right=null
CreateTime:1570371667621 FIXED-KEY left=A, right=a
CreateTime:1570371677621 FIXED-KEY left=B, right=a
CreateTime:1570371687622 FIXED-KEY left=A, right=b
CreateTime:1570371687622 FIXED-KEY left=B, right=b
CreateTime:1570371717624 FIXED-KEY left=C, right=a
CreateTime:1570371717624 FIXED-KEY left=C, right=b
CreateTime:1570371727625 FIXED-KEY left=A, right=c
CreateTime:1570371727625 FIXED-KEY left=B, right=c
CreateTime:1570371727625 FIXED-KEY left=C, right=c
CreateTime:1570371767627 FIXED-KEY left=A, right=d
CreateTime:1570371767627 FIXED-KEY left=B, right=d
CreateTime:1570371767627 FIXED-KEY left=C, right=d
CreateTime:1570371777628 FIXED-KEY left=D, right=a
CreateTime:1570371777628 FIXED-KEY left=D, right=b
CreateTime:1570371777628 FIXED-KEY left=D, right=c
CreateTime:1570371777628 FIXED-KEY left=D, right=d

We have outputs for timestamps 3, 4, 5, 6, 9, 10, 14 and 15. Again, we notice that it is identical to what is documented in the official Kafka documentation.

2.6 KStream-KTable Inner Join

We will read the left KStream and the right stream as a KTable from the respective topics, perform an inner join and publish the result to topic my-kafka-stream-table-inner-join-out. KStream-KTable joins are always non-windowed joins. The KStream is enriched with the information in the KTable; we can look at it as a table lookup: for each stream record, the latest table value for the record's key is looked up. Since we are using an inner join here, both sides must have a non-null value before an output record is produced, and output is only triggered by the KStream side; an update of the KTable alone does not produce output.
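
To make the lookup semantics concrete, here is a plain-Java sketch (deliberately not the Kafka API; the TableSketch class is just an illustration) of how the table value for our single key evolves while the right topic is consumed. Note that in a real KTable a null value acts as a tombstone which removes the key, but the lookup result for the join is null either way:

import java.util.HashMap;
import java.util.Map;

public class TableSketch {

  public static void main(String[] args) {
    // A KTable keeps only the latest value per key.
    Map<String, String> table = new HashMap<>();

    table.put("FIXED-KEY", null); // ts 2
    // ts 3: stream record A looks up null -> no inner join output
    table.put("FIXED-KEY", "a");  // ts 4
    // ts 5: stream record B looks up "a" -> output left=B, right=a
    table.put("FIXED-KEY", "b");  // ts 6
    table.put("FIXED-KEY", null); // ts 8
    // ts 9: stream record C looks up null -> no inner join output
    table.put("FIXED-KEY", "c");  // ts 10
    table.put("FIXED-KEY", null); // ts 11
    table.put("FIXED-KEY", null); // ts 13
    table.put("FIXED-KEY", "d");  // ts 14
    // ts 15: stream record D looks up "d" -> output left=D, right=d
  }
}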

The KafkaProcessingController now contains the method startStreamTableInnerJoin. The syntax is similar to the one for the KStream-KStream joins:

private KafkaStreams streamTableInnerJoin;

@RequestMapping("/startStreamTableInnerJoin/")
public void startStreamTableInnerJoin() {

  stop();

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-inner-join");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  
  final StreamsBuilder builder = new StreamsBuilder();

  KStream<String, String> leftSource = builder.stream("my-kafka-left-stream-topic");
  KTable<String, String> rightSource = builder.table("my-kafka-right-stream-topic");

  KStream<String, String> joined = leftSource.join(rightSource,
      (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

  joined.to("my-kafka-stream-table-inner-join-out");

  final Topology topology = builder.build();
  streamTableInnerJoin = new KafkaStreams(topology, props);
  streamTableInnerJoin.start();

}

We create the output topic my-kafka-stream-table-inner-join-out just as we did before and we start the Consumer in order to listen to the topic.

Again, we invoke the URL for sending messages from the Producer and we invoke the URL for the processor: http://localhost:8082/startStreamTableInnerJoin/

The Consumer outputs the following:

CreateTime:1570372799769 FIXED-KEY left=B, right=a
CreateTime:1570372899774 FIXED-KEY left=D, right=d

We only have outputs for timestamps 5 and 15: at timestamp 5 the latest table value is a (published at timestamp 4), and at timestamp 15 it is d (published at timestamp 14). At timestamps 3 and 9 the latest table value is null, so the inner join produces no output. Again, we notice that the result is identical to what is documented in the official Kafka documentation.

2.7 KStream-KTable Left Join

We will read the left KStream and the right stream as a KTable from the respective topics, perform a left join and publish the result to topic my-kafka-stream-table-left-join-out. Since we are using a left join here, an output record is produced for every record published to the left KStream with a non-null value, even when the table lookup returns null.

The KafkaProcessingController now contains the method startStreamTableLeftJoin:

@RequestMapping("/startStreamTableLeftJoin/")
public void startStreamTableLeftJoin() {
  ...
  KStream<String, String> joined = leftSource.leftJoin(rightSource,
      (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

  joined.to("my-kafka-stream-table-left-join-out");
  ...
}

We create the output topic my-kafka-stream-table-left-join-out just as we did before and we start the Consumer in order to listen to the topic.

Again, we invoke the URL for sending messages from the Producer and we invoke the URL for the processor: http://localhost:8082/startStreamTableLeftJoin/

The Consumer outputs the following:

CreateTime:1570373267768 FIXED-KEY left=A, right=null
CreateTime:1570373287769 FIXED-KEY left=B, right=a
CreateTime:1570373327771 FIXED-KEY left=C, right=null
CreateTime:1570373387774 FIXED-KEY left=D, right=d

We have outputs for timestamps 3, 5, 9 and 15. Again, we notice that the result is identical to what is documented in the official Kafka documentation.

3. Conclusion

In this post, we created examples based on the ones provided in the official Kafka documentation concerning joins, in order to get a better understanding of how joins work from a practical point of view.