Proof of concept using KafkaStreams and KTables - implementation notes, gotchas and Docker Compose example

Implementation details from a proof of concept using Kafka Streams and KTables using Clojure and Docker

This blog post contains the implementation notes from the proof of concept explained in the previous post.

As a reminder, we want to build a new microservice whose job is to:

Send a weekly email to clients holding trading positions in US stocks.

We will use Kafka Streams and more specifically KTables to build it.

All the code can be found here, including a Docker Compose file that will run Kafka, Zookeeper plus three instances of this service, so you can play around with it. The details of how to build and run it are in the repository.

The plan

The order booking system keeps a Kafka compacted topic updated with any changes in the open positions for each client. The Kafka key used is the position's id.
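For illustration, this is roughly what the messages on that topic look like (the shape matches the log lines shown further down; the ids and amounts are made up):

;; Upsert: the order booking system publishes the latest state of a position,
;; keyed by the position id.
{:topic "share-holders"
 :key   "client1:::AAPL"
 :value {:client "client1", :id "client1:::AAPL",
         :ticker "AAPL", :exchange "NASDAQ", :amount 100}}

;; Delete: a nil value (a tombstone) means the position has been closed.
{:topic "share-holders"
 :key   "client1:::AAPL"
 :value nil}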

This is a perfect example of a KTable, which conceptually looks like:

PositionId (Key) Client Ticker Amount Exchange
1 client1 AAPL 100 NASDAQ
2 client1 VOD 5 LON
3 client1 FB 33 NASDAQ
4 client2 AAPL 25 NASDAQ
5 client3 VOD 33 LON

We want to transform this KTable into another one that just contains the clients that we want to email, without any duplicates. Something like:

Client (Key) value
client1 ?????
client2 ?????

But what are the values in that table? Well, it has to be the set of US positions for that client, so that we can keep track of how many the client has and remove the client from the KTable when none are left open.

So the KTable that we are looking for is:

Client (Key) positions
client1 #{1, 3}
client2 #{4}

The code

To do this transformation we just need this code:

(->
  ;; KTable over the compacted topic, backed by a local store.
  (.table builder "share-holders" "share-holder-store")
  ;; Keep only the positions in US exchanges.
  (.filter (k/pred [key position]
             (= "NASDAQ" (:exchange position))))
  ;; Re-key by client, mapping each position to a one-element set of ids.
  (.groupBy (k/kv-mapper [key position]
              (KeyValue/pair (:client position)
                             #{(:id position)})))
  ;; Adder: merge the new position id into the client's set.
  (.reduce (k/reducer [value1 value2]
             (set/union value1 value2))
           ;; Subtractor: remove the closed position; nil removes the client.
           (k/reducer [value1 value2]
             (let [result (set/difference value1 value2)]
               (when-not (empty? result)
                 result)))
           "us-share-holders"))

Notice that to create a KTable we specify an associated store. For me it was not obvious why we would need it, but we will see later why it is there.

GroupBy will make sure that all the positions for a client end up in the same partition. This is a shuffle operation that will need an additional Kafka topic. Kafka Streams will take care of creating this internal topic without us having to do any work or configuration.

Note that as part of the GroupBy we are also mapping the position to a set with one element, the position id. This is so we can next use a Reduce instead of the more complex AggregateByKey function.

As we are working with an infinite stream, Reduce needs two functions: one for when a new position is added and another for when a position is deleted. Note that in the delete function we return nil if it was the last position for the client, so that the client is removed from the store.
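To make the two reduce functions concrete, this is how the adder and the subtractor behave on plain Clojure sets (a hypothetical REPL session, with made-up position ids):

(require '[clojure.set :as set])

;; Adder: merge the id of a newly opened position into the client's set.
(set/union #{"pos-1"} #{"pos-3"})
;;=> #{"pos-1" "pos-3"}

;; Subtractor: remove the id of a closed position from the client's set.
(set/difference #{"pos-1" "pos-3"} #{"pos-3"})
;;=> #{"pos-1"}

;; When the last position is removed we return nil instead of an empty set,
;; which tells Kafka Streams to delete the client from the KTable.
(let [result (set/difference #{"pos-1"} #{"pos-1"})]
  (when-not (empty? result)
    result))
;;=> nil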

Lastly, the results of Reduce are stored in the us-share-holders store. This will require another internal topic for fault tolerance.
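By the way, the k/pred, k/kv-mapper and k/reducer helpers come from the accompanying repository; here is a minimal sketch of how such helpers could be written, reifying the Kafka Streams single-method interfaces (the real ones in the repo may differ):

(ns example.kafka-streams-helpers
  (:import [org.apache.kafka.streams.kstream KeyValueMapper Predicate Reducer]))

;; Each macro takes a binding vector and a body and reifies the corresponding
;; Kafka Streams interface.
(defmacro pred [[k v] & body]
  `(reify Predicate
     (~'test [~'_ ~k ~v] (boolean (do ~@body)))))

(defmacro kv-mapper [[k v] & body]
  `(reify KeyValueMapper
     (~'apply [~'_ ~k ~v] (do ~@body))))

(defmacro reducer [[v1 v2] & body]
  `(reify Reducer
     (~'apply [~'_ ~v1 ~v2] (do ~@body))))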

Now that we have the store, we just need to query it to get our list of clients:

;; Open the local read-only view of the us-share-holders store and
;; collect the key (the client) of every entry in it.
(with-open [all (.all
                  (.store kafka-streams
                          "us-share-holders"
                          (QueryableStoreTypes/keyValueStore)))]
  (mapv
    (fn [x] (.key x))
    (iterator-seq all)))

We just iterate over all the entries in the store, returning just the keys.

Note that we are just querying the local view of the store, so each instance of the microservice will see a different set of clients.

Gotchas

Here is a list of bumps in the road that we hit while building this proof of concept.

Internal topics not being created

We were getting a “Topic not found during partition assignment” exception when starting the KafkaStreams instance:

org.apache.kafka.streams.errors.StreamsException: Topic not found during partition assignment: example-consumer17-us-share-holders-repartition
    at org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
    at org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)

At first we thought that Kafka Streams required the “auto.create.topics.enable” flag to be set to true, but the documentation strongly discourages it, so it couldn't be that.

So we had to dig a little bit more and we found this really helpful WARN in the logs:

WARN ... Topic [...] do not exists but couldn't created as the config 'zookeeper.connect' isn't supplied

So we should have paid more attention while RTFM-ing!
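For reference, a minimal sketch of the configuration we ended up with (the host names are placeholders; zookeeper.connect is the property the WARN complains about, and it is only needed by the older 0.10.0.x clients, which create the internal topics through Zookeeper):

;; Hypothetical values, adjust to your environment.
(def streams-props
  {"application.id"    "example-consumer"
   "bootstrap.servers" "kafka1:9092"
   "zookeeper.connect" "zookeeper1:2181"})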

30-second delay on processing

Initially we saw that messages took 30 seconds to be processed, which was surprising as we expected real time processing.

So after a quick search and some RTFM, we learned that Kafka Streams has an internal cache and that no processing happens until that cache is full or the commit.interval.ms time has elapsed, whichever happens first. The size of the cache can be controlled with the cache.max.bytes.buffering setting.

This cache allows for better throughput as it enables compaction before processing, plus batched writes to Kafka and RocksDB.

Both parameters can be tuned to match the latency or throughput requirements of your application. Setting cache.max.bytes.buffering to 0 will make KafkaStreams real time again, processing one message at a time.
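For example, the two latency-related settings look like this in the configuration map (a sketch; the values are illustrative, not recommendations):

;; Disable the record cache so every update is forwarded immediately,
;; and commit (and flush) more frequently.
(def latency-tuned-props
  {"cache.max.bytes.buffering" "0"
   "commit.interval.ms"        "100"})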

All is nicely explained in the documentation.

KTable associated store

We also did not understand why we had to specify a store when creating a KTable; at the end of the day, we were not interested in the original KTable but in the transformed one.

It all became clearer when we got to implement the reduce functions.

Let's see what happens when we add a new position:

INFO - Producing {:topic share-holders, :key daniel:::AAPL, :value {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 99}}
INFO - Filtering daniel:::AAPL {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 99}
INFO - Grouping daniel:::AAPL {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 99}
INFO - [KTABLE-REDUCE-0000000006]: daniel , (#{"daniel:::AAPL"}<-null)

Nothing weird so far, but notice that reduce is not called at all; that makes sense, as it is the very first position for this client.

Let’s now remove that position:

INFO - Producing {:topic share-holders, :key daniel:::AAPL, :value nil}
INFO - Filtering daniel:::AAPL {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 99}
INFO - Grouping daniel:::AAPL {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 99}
INFO - Removing from #{daniel:::AAPL} the position #{daniel:::AAPL}
INFO - [KTABLE-REDUCE-0000000006]: daniel , (null<-null)

That is interesting: the filter and grouping functions are being called with the position that has just been closed, even though the message we produced had a nil value.

So that is why the KTable requires a store: to remember the last value for each key. If Kafka Streams did not remember the last value, we would have a really hard time implementing the removal of the position, as the Kafka message that triggers the reduce has just a null payload. That is pretty neat!

But what happens when we update a position?

INFO - Producing {:topic share-holders, :key daniel:::AAPL, :value {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 99}}
INFO - Filtering daniel:::AAPL {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 99}
INFO - Grouping daniel:::AAPL {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 99}
INFO - [KTABLE-REDUCE-0000000006]: daniel , (#{"daniel:::AAPL"}<-null)
INFO - Producing {:topic share-holders, :key daniel:::AAPL, :value {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 1}}
INFO - Filtering daniel:::AAPL {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 1}
INFO - Filtering daniel:::AAPL {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 99}
INFO - Grouping daniel:::AAPL {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 1}
INFO - Grouping daniel:::AAPL {:client daniel, :id daniel:::AAPL, :ticker AAPL, :exchange NASDAQ, :amount 99}
INFO - removing #{daniel:::AAPL} #{daniel:::AAPL}
INFO - [KTABLE-REDUCE-0000000006]: daniel , (null<-null)
INFO - [KTABLE-REDUCE-0000000006]: daniel , (#{"daniel:::AAPL"}<-null)

So an update generates two events: one stating that the client has no positions, followed immediately by another one adding the position back. That is not so neat!

But this is because we have set cache.max.bytes.buffering to 0. With some caching, only the last value would be produced.

Something to keep in mind!



Tagged in : Architecture Clojure Kafka