software, simply

Kafka, distributed coordination and the actor model

On how to design KafkaStream applications in terms of the actor model.

It seems that this blog post did not properly explain what it meant to replace Zookeeper with Kafka as the coordination mechanism for a cluster of services.

In the example, we had several instances of a service and they had to agree about which one would send an email to which clients, so they had to somehow coordinate amongst themselves to agree on who would do the work and for what subset of clients.

The blog post explains that KTables are conceptually equivalent to a database table, and some people understood that KTables were the mechanism used to do the coordination, by querying/updating the KTable state to signal or lock a record.

But this is far from what I meant.

Kafka as coordinator

KTables are built on top of Kafka and they do not provide any additional coordination mechanism other than what Kafka already provides: a leader election mechanism.

Let’s go back to basics to understand what we mean:

  1. A Kafka topic is divided into one or more partitions.
  2. By default, messages with the same key go to the same partition.
  3. Within a Consumer Group, Kafka will make sure that only one consumer receives all the messages for a particular partition.

In a picture:

kafka topic partitioning

In our case, if all trading positions for a given client are keyed by the client id, then all messages will end up in the same partition, which means that they all will end up consumed by one and only one of the service’s instances, which is the same as saying that the instance owns or is the “leader” for that particular client.

Actors, agents and distributed data grids

Furthermore, the Kafka consumer within an instance is single-threaded, which means that we have one and just one thread that owns a client, so that thread can make local reasonings about the state of the client without having to coordinate with other threads or service instances.

This is exactly the same concurrency write model as Erlang actors or Clojure agents:

kafka actor model

Basically, some state updated by a single thread/process, which reads actions from a queue (well, in Erlang it is not a simple queue).

The beauty behind this model is that it is simpler to reason about state changes, as there is always one thread manipulating the state, so your code doesn’t need to worry about synchronization or concurrent access to the data.

Additionally, as Kafka distributes the partitions between consumers, it is distributing the state of all these actors amongst the instances of your service cluster, so that each instance has to deal with just a subset of the data, much similar to what a data grid provides.

To update some data in a data grid you can follow the read-update-save pattern, but most data grids provide a performance optimization were you can send the operation to the data, as it is less chatty and the grid can optimize any locks required access the data. This optimized pattern is what Kafka Stream applications must follow.

Note that KafkaStream applications provide a different read model than actors, agent or data grids: you either push the state to another Kafka topic for any party interested or you have to build your own API to expose that state.

So you can think of Kafka Stream as a distributed agent model with built-in durability, and Kafka’s scalability, fault tolerance, partitioning and transactional model.

Example: deduplication of requests

Lets see a practical example from this comment:

I want multiple clients to be able to put messages in a “run request” topic. If I’m already running when I receive the request, I don’t want to do anything. If I’m not running, I want to initiate (send a message) to a “run command” topic. I believe that a KTable that holds the running state is the answer. I can’t figure out what the kafka streams processor would look like for this as it reads and writes to the “run command” KTable.

As we said before, because KTables are build on asynchronous message passing, we cannot expect several clients to agree on the state of a run command and change it transactionally. Instead, we need Kafka to designate a leader for a particular run command, so that leader can make the decision about which one of the client commands to run.

The first thing that we need to do is create a key that is the same for all the commands that we want to deduplicate, but different from other run commands. This key will identify the leader/actor.

Then all clients send their version of the run command with that same key, so that all commands end up in the same Kafka partition.

As there will only be one consumer for that particular partition, that consumer is effectively the leader of all the keys (or run commands) that end up in that partition. That consumer will just act on the first run command, update its state and ignore the rest of commands.

In code:

(defn run-first-command [^KeyValueStore store ctx k v]
 (if (.putIfAbsent store k true)
   (println "Ignoring command" v "for key" k)
   (println "Running command" v "for key" k)))

(defn create-kafka-stream-topology []
 (let [^KStreamBuilder builder (KStreamBuilder.)
       store (-> (Stores/create "run-command-state")
                 (.withValues (EdnSerde.))
       builder (-> builder
                   (.addSource "raw-data" (into-array ["raw-data"]))
                   (.addProcessor "run-command-processor"
                                  (processor run-first-command "run-command-state")
                                  (into-array ["raw-data"]))
                   (.addStateStore store (into-array ["run-command-processor"])))]

You can find the complete code here.

Some notes:

  1. Kafka Streams is just a library, so the run command app can run in the same process as the clients if we consider it more appropriate.
  2. If you are more used to the actor model, this is equivalent to several processes sending each a run command message to an actor. The run command key would be the identifier of the actor.
  3. Kafka Streams will store the state of the run command actor/leader in Kafka.
  4. If you require exactly-once semantics, Kafka Streams will take care of it if the side effect of your run command is sending a message to another Kafka topic. For other kinds of side effects, your application needs to be carefully designed.
  5. Just because you can do it, it doesn’t mean that this kind of solution is the best one for your particular context. Maybe Redis put-if-absent is more appropriate for this case.


I hope this clarifies how to use Kafka Streams to design your applications.

Even if Kafka just provides a leader election mechanism, it is a powerful one.

First, it makes reasoning about state a lot simpler because there is always just one thread manipulating the state, hence you don’t need to worry about synchronization or distributed locks. Kafka Streams makes persisting that state a breeze.

Second, it provides a sharding mechanism. As each partition will be owned by just one instance, Kafka is effectively distributing the data across your service instances, so each just has to handle a subset of it.

And all on top of the other Kafka’s niceties.

Did you enjoy it? or share!

Tagged in : Architecture Clojure Kafka