Some keywords: KTable, KStream, broker, Kafka, enrichment, stateful, join

I had the chance to start using Kafka Streams to improve the performance of an existing application in my team. One of the main issues to solve was how Kafka Streams performs a stateful transformation.

To put the scenario simply: the service needs to join the data of a specific user with several other tables. Let’s consider this scenario using data for a game store.

A databus sends us all the data about users, available games and purchased games. Our goal here is to create a topology that considers the following:

• Users and games will be modeled as tables, and purchased games will be modeled as a stream that receives an event every time someone buys a game.
val purchaseStream = builder.stream("ourDatabusTopicForPurchases")

val usersTable = builder.table("ourDatabusTopicForUsers")

val gamesTable = builder.table("ourDatabusTopicForGames")

purchaseStream
  .map(replaceKeyWithUserForeignKey)          // re-key each purchase by its user id
  .leftJoin(usersTable)(ourJoinUserFunction)  // enrich with user data
  .map(replaceKeyWithGameForeignKey)          // re-key by the game id
  .leftJoin(gamesTable)(ourJoinGameFunction)  // enrich with game data
  .to("ouroutPutEnrichedData")

This should be enough, right? Unfortunately, it does not work as expected…

The reason lies in how Kafka Streams understands our topology and how our databus is going to send this data.

Each transformation requires its own state, and each of those states gets ready for a new batch of data independently. What happens is that when the purchase stream arrives, the users table and the games table have not yet caught up to the offsets of the batch being processed (at the moment the enrichment service starts processing).

The first impression was a bit disappointing, since I was expecting Kafka Streams to somehow deal with this situation on its own. Some properties can buy extra time for the KTables to be ready, such as MAX_TASK_IDLE_MS_CONFIG, which makes a task wait a little longer for records on all of its input topics before deciding what to process next, giving the table sides a chance to catch up before the transformations start.
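As a rough sketch of what that looks like (the application id and broker address below are placeholders, not values from the original project), the property is set alongside the usual StreamsConfig entries:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "game-store-enrichment") // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")     // hypothetical broker
// Let each task idle up to 30 seconds waiting for data on all of its input
// partitions before processing, so the table topics have a chance to catch up.
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "30000")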

The second approach was to use the Processor API to schedule a periodic task called a "punctuation", which takes care of processing the data at fixed time intervals during the whole life of the application.
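A minimal sketch of that idea, assuming purchases are buffered and enriched on a timer rather than on arrival (the class name and the buffering details are hypothetical, not the actual service code):

import java.time.Duration
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType, Punctuator}

// Hypothetical transformer: instead of enriching purchases as they arrive,
// it defers the work to a punctuation that fires on a fixed wall-clock schedule.
class ScheduledEnrichment extends Transformer[String, String, KeyValue[String, String]] {
  private var context: ProcessorContext = _

  override def init(ctx: ProcessorContext): Unit = {
    context = ctx
    context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      override def punctuate(timestamp: Long): Unit = {
        // periodic work goes here, e.g. re-process buffered purchases once the
        // table topics have caught up, and forward the results downstream
      }
    })
  }

  override def transform(key: String, value: String): KeyValue[String, String] = {
    // buffer or pass the record through here; in this sketch nothing is
    // emitted until the punctuation runs
    null
  }

  override def close(): Unit = ()
}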

Both approaches were among the first Stack Overflow search results, and they are the main reason for writing this article:

Can Kafka Streams be configured to wait for KTable to load?
I’m using materialized KTable to use for left join with my KStream(while the stream is the left side). However, it seem to process immediately, without waiting for the current version of the KTab...

The solution might be simpler than either of those approaches, but it requires some attention to detail.

val purchaseStream = builder.stream("ourDatabusTopicForPurchases")

val usersTable = builder.stream("ourDatabusTopicForUsers")
  .groupBy(userId)                                 // re-key every user record by its user id
  .reduce(makesureisUnique)(materializeUserStore)  // keep only the latest value per key in a named store

val gamesTable = builder.stream("ourDatabusTopicForGames")
  .groupBy(gameId)                                 // re-key every game record by its game id
  .reduce(makesureisUnique)(materializeGameStore)

Here users are still a KTable by definition, but the approach is different: we consume the data as a stream, group it by user id into a store, and take care not to keep repetitions, so only the last state of each key survives (exactly what KTables were intended for). Better still, this now sits at the top of our topology. The same goes for games.
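To make this concrete, here is a minimal sketch of the grouping with the Scala DSL. The topic name, the "user-store" name, and the use of plain string values are assumptions for the example, not the original project's types:

import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._ // org.apache.kafka.streams.scala.Serdes._ on older versions
import org.apache.kafka.streams.scala.kstream.Materialized
import org.apache.kafka.streams.state.KeyValueStore

val builder = new StreamsBuilder()

// The user topic is assumed to carry the user id as the record key and the
// user payload as a plain string (a real payload would need its own serde).
val usersTable =
  builder
    .stream[String, String]("ourDatabusTopicForUsers")
    .groupByKey                        // or .groupBy(...) if the key has to be re-derived from the value
    .reduce((_, latest) => latest)(    // keep only the newest record per user, as a KTable would
      Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("user-store")
    )

The (_, latest) => latest reducer plays the same role as makesureisUnique above: whatever arrives last for a key wins, and the result is materialized in a named, queryable store.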

Great! Now the KTables will be processed first. The next step would be just to execute the join operations as intended and finish our task. But what about the stores?...

val purchaseStream = builder.stream("ourDatabusTopicForPurchases")

val usersTable = builder.stream("ourDatabusTopicForUsers")
  .groupBy(userId)
  .reduce(makesureisUnique)(materializeUserStore)

val gamesTable = builder.stream("ourDatabusTopicForGames")
  .groupBy(gameId)
  .reduce(makesureisUnique)(materializeGameStore)

purchaseStream
  .map(customJoinFunction(userStore))   // look the user up in its materialized store
  .map(customJoinFunction(gameStore))   // look the game up in its materialized store
  .to("ouroutPutEnrichedData")

Kafka Streams has a trick up its sleeve called "Interactive Queries". This approach lets us query a KTable's store directly: the join operation reads from our own persisted store (RocksDB on disk, or in memory, depending on our properties) and performs the enrichment there.
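As an illustration of what querying such a store looks like (reusing the hypothetical props, builder and "user-store" from the sketches above; the exact customJoinFunction wiring is left for the follow-up article), the interactive-query API exposes a read-only view of the materialized store from the running application:

import org.apache.kafka.streams.{KafkaStreams, StoreQueryParameters}
import org.apache.kafka.streams.state.QueryableStoreTypes

val streams = new KafkaStreams(builder.build(), props)
streams.start()

// Once the application is RUNNING, the store materialized by the reduce above
// can be queried directly by name.
val userStore = streams.store(
  StoreQueryParameters.fromNameAndType(
    "user-store",
    QueryableStoreTypes.keyValueStore[String, String]()
  )
)

val maybeUser = Option(userStore.get("some-user-id")) // None if that user has not arrived yet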

The first advantage is that debugging becomes much easier: we can check whether data is arriving in order, in the format we require, complete and without anything getting lost. There are also different kinds of stores for different needs (in memory, on disk, with default timestamps); future articles will cover this.
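As a small teaser of that flexibility (using the same hypothetical store name), the built-in Stores factory already offers persistent and in-memory suppliers that can be handed to Materialized.as instead of a plain store name:

import org.apache.kafka.streams.state.Stores

// A RocksDB-backed supplier versus an in-memory one; either could back the
// materialized stores from the sketches above.
val persistentSupplier = Stores.persistentKeyValueStore("user-store")
val inMemorySupplier   = Stores.inMemoryKeyValueStore("user-store")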

And finally, the biggest advantage lies in how the KTables are updated. In the first approach, the KTables on their own do not pick up the newly arrived offsets until a purchase is made, which can lead to unexpected behavior when, for example, a new user arrives and makes a purchase:

User arrives -> purchases a game -> purchase stream joins against the previously available KTable, which has no state for the new user yet -> processing fails

Same user purchases a game again -> the purchase stream now has their data -> the purchase succeeds... but too late...

The user's KTable entry was not ready in time, so the join operation failed. With the second approach things go differently:

User arrives -> stream is processed and stored in the user table -> user purchases a game -> purchase stream joins the KTables -> purchase succeeds

Summary

Grouping the data from streams into KTables backed by persisted stores made debugging the processing easier and more accurate. Grouping data from streams also makes sure our KTables are updated every time new data arrives from the databus.

So much for the concepts! Next time I will go into the implementation in detail: how to build the custom join functions and how to create the custom stores behind .reduce(makesureisUnique)(materializeUserStore).