@DanLebrero.


Kafka, GDPR and Event Sourcing - Implementation details

Implementation details for a proof of concept for compliance with GDPR on an Event Sourcing architecture built with Apache Kafka.

This post contains the implementation details of this other post about a possible architecture for Kafka, GDPR and Event Sourcing. Please read that other post for context and conclusions.

The proof of concept (PoC) uses Kafka Streams API and all code, including a docker-compose environment to test it, is here.

Event producer

Nothing interesting.

Encryptor

The encryptor's job is pretty easy: the first time it sees a message key, it generates a new encryption key, stores it, and pushes it to the encryption-keys topic for other consumers. In code:

(defn get-or-create-encryption-key [^KeyValueStore store ^ProcessorContext ctx k]
 (if-let [encryption-key (.get store k)]
   encryption-key
   (let [new-encryption-key (generate-encryption-key)]
     (.put store k new-encryption-key)
     (.forward ctx k new-encryption-key "encryption-keys")
     new-encryption-key)))

This means that we store the encryption key twice: once in the store changelog and once in the encryption-keys topic. This is OK, as we don't want to couple the implementation details of the encryptor (what we store in the changelog) with what the consumers need. For example, if we decided to use an asymmetric algorithm, the store would hold the private key while the encryption-keys topic would hold the public one.

Once we have the encryption key, the PoC just encrypts the whole value of the message and forwards it to the encrypted Kafka topic. During the implementation, though, the question came up of whether it would be more appropriate to encrypt just the PII data within the message. Some possible options:

  1. Give the value two fields: PII and non-PII. If the encryption key is deleted, the PII field is deserialized to null.
  2. Have PII events and non-PII events, in the same or different topics, depending on whether the ordering between them is important.
  3. When the user exercises the right to erasure, push an event with the non-PII data that we want to retain before deleting the encryption key.

The second option seems the simplest one.
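To make the first option more concrete, here is a minimal sketch of what "a value with two fields" could look like. It is not part of the PoC: the names generate-key, encrypt-pii and decrypt-pii, the :pii/:non-pii map shape, and the choice of AES-GCM with EDN serialization are all assumptions made for illustration.

```clojure
(require '[clojure.edn :as edn])
(import '(javax.crypto Cipher KeyGenerator)
        '(javax.crypto.spec GCMParameterSpec)
        '(java.security SecureRandom))

(defn generate-key
  "Creates a random 128-bit AES key (hypothetical stand-in for the PoC's key generation)."
  []
  (.generateKey (doto (KeyGenerator/getInstance "AES") (.init (int 128)))))

(defn- cipher-for
  "Builds an AES-GCM cipher in the given mode for the given key and IV."
  [mode secret-key ^bytes iv]
  (doto (Cipher/getInstance "AES/GCM/NoPadding")
    (.init (int mode) secret-key (GCMParameterSpec. 128 iv))))

(defn encrypt-pii
  "Encrypts only the :pii field of the event; :non-pii stays in clear text."
  [secret-key event]
  (let [iv (byte-array 12)]
    (.nextBytes (SecureRandom.) iv)
    (update event :pii
            (fn [pii]
              {:iv iv
               :ciphertext (.doFinal (cipher-for Cipher/ENCRYPT_MODE secret-key iv)
                                     (.getBytes (pr-str pii) "UTF-8"))}))))

(defn decrypt-pii
  "Decrypts the :pii field. If the key is gone (nil), :pii becomes nil
   while :non-pii is still readable."
  [secret-key event]
  (update event :pii
          (fn [{:keys [iv ciphertext]}]
            (when secret-key
              (let [plain (.doFinal (cipher-for Cipher/DECRYPT_MODE secret-key iv)
                                    ciphertext)]
                (edn/read-string (String. ^bytes plain "UTF-8")))))))
```

With the key available, decrypt-pii returns the original event; once the key has been erased, the consumer still gets the non-PII part and a nil PII field.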

The Encryptor also listens to the GDPR messages, and when the user exercises the right to erasure, we store a tombstone both in the local store and in the encryption-keys topic:

(defn handle-gdpr [^KeyValueStore store ^ProcessorContext ctx k _]
 (.put store k common/tombstone)
 (.forward ctx k common/tombstone))

We will explain in the Event Consumer section why we use a tombstone instead of just deleting the record.

Another decision that we need to make is what to do if the encryptor gets a message after the user has exercised the right to erasure:

  1. Create a new key for all the new messages, so new messages will be readable. In this case, we don’t need to store a tombstone in the local store.
  2. Ignore the message, basically deleting it.
  3. Create a new key just for the event, and never store the key, effectively creating an unreadable event.

The choice is a business decision, but the first and the last options seem the most sensible ones.
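The three options can be sketched as a small policy switch. This is only an illustration with an in-memory atom standing in for the KeyValueStore; the policy names (:new-key, :drop, :throwaway-key), the :publish? flag and the new-key helper are hypothetical and not part of the PoC.

```clojure
(def tombstone ::tombstone)

(defn new-key
  "Stand-in for real key generation."
  []
  (str (java.util.UUID/randomUUID)))

(defn- create-and-store! [store user]
  (let [k (new-key)]
    (swap! store assoc user k)
    ;; :publish? true means the key should also go to the encryption-keys topic
    {:key k :publish? true}))

(defn key-for-message
  "Returns {:key .. :publish? ..} for a message of `user`, or nil if the
   message should be dropped. `store` is an atom holding user -> key."
  [store policy user]
  (let [existing (get @store user)]
    (cond
      ;; first time we see this user
      (nil? existing) (create-and-store! store user)
      ;; known user, not erased
      (not= tombstone existing) {:key existing :publish? false}
      ;; user has exercised the right to erasure
      :else (case policy
              ;; option 1: start over with a new, readable key
              :new-key (create-and-store! store user)
              ;; option 2: ignore the message
              :drop nil
              ;; option 3: encrypt with a key nobody will ever see again
              :throwaway-key {:key (new-key) :publish? false}))))
```

Note that with :new-key the tombstone in the local store is simply overwritten, which matches the remark that the first option does not need to keep one.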

Reusable Encryptor

Ideally, we don’t want to change the Encryptor every time there is a new topic that needs to be encrypted.

While KafkaStreams allows subscribing to a set of topics based on a regular expression, and will also subscribe to newly created topics that match that expression, there is no obvious way to publish a message to an arbitrary topic.

Maybe the correct way is to provide a KafkaClientSupplier to the KafkaStreams, so that both our code and the KafkaStreams code use the same Kafka producer, but we decided to do what a SinkNode does:

(defn send [^ProcessorContext ctx k v]
 ;; Internal API: this is the same call that a SinkNode makes.
 (let [rc ^RecordCollector (.recordCollector ^RecordCollector$Supplier ctx)]
   (.send rc
          ;; the output topic name is derived from the input topic name
          (encrypted-topic-name (.topic ctx))
          k v (.timestamp ctx)
          (-> ctx .keySerde .serializer)
          (-> ctx .valueSerde .serializer)
          nil)))

Using an internal API is never a good idea, but it is good enough for the PoC.

But even if this solves the problem of sending messages to an arbitrary topic, there is a bigger problem: to be able to reuse the Encryptor, all input topics must be co-partitioned (that is, they must have the same number of partitions).

If the topics do not have the same number of partitions, then two messages for the same user in different topics can end up in different instances of the Encryptor service, which means that each instance will generate an encryption key and push it to the encryption-keys topic, one overwriting the other, making some messages unreadable.

If the topics have the same number of partitions, then the default partition assignor will distribute the partitions between the Encryptor instances so that partition 1 of all topics is consumed by one consumer, partition 2 of all topics is consumed by another consumer, and so on. This way, all messages for a given user end up in the same instance, even when they come from different topics.
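A tiny sketch shows why the number of partitions matters. It assumes that a message lands on partition hash(key) mod number-of-partitions. Kafka's default partitioner uses a murmur2 hash of the serialized key; here String.hashCode is used as a stand-in, and the topic names and the partition-for, partitions-by-topic and co-located? helpers are made up for the example.

```clojure
(defn partition-for
  "Partition of a key in a topic with `num-partitions` partitions.
   String.hashCode is a stand-in for the real partitioner hash."
  [^String k num-partitions]
  (mod (.hashCode k) num-partitions))

(defn partitions-by-topic
  "Given a map of topic -> number of partitions, returns topic -> partition for key k."
  [topic->partitions k]
  (into {} (for [[topic n] topic->partitions]
             [topic (partition-for k n)])))

(defn co-located?
  "True if key k lands on the same partition number in every topic, which is
   what lets a single Encryptor instance see all the messages for that key."
  [topic->partitions k]
  (apply = (vals (partitions-by-topic topic->partitions k))))
```

With {"orders" 4 "payments" 4} every key is co-located by construction, while with {"orders" 4 "payments" 6} the same user can map to different partition numbers and therefore to different Encryptor instances.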

As copartitioning all topics is unlikely to be desirable, we probably need to split the Encryptor in two: one app to generate the encryption keys and another one to do the actual encryption. Something like:

[Figure: Kafka GDPR encryption architecture reuse]

This way, the encryption app can repartition the encryption keys if required.

Event Consumer

The event consumer has to join the events from the encrypted-data topic (a KStream) with the encryption-keys (a KTable).

Unfortunately, even though Kafka Streams tries to give you some control over the order in which messages are processed, some testing showed that if there are a lot of events in the encrypted-data topic, the first batch of events all come from this topic and none from the encryption-keys topic. As a result, none of those events were decrypted, because none of the encryption keys were known at that time.

So we ended up building our own join-like topology, which remembers which events are missing the encryption key and processes those events once the encryption key shows up.

For this topology, processing the encrypted data is pretty simple:

(defn encrypted-data-msg [^KeyValueStore missing-store
                         ^KeyValueStore encryption-keys-store
                         ^ProcessorContext ctx
                         k
                         encrypted-item]
 (if-let [encryption-key (.get encryption-keys-store k)]
   ;; key already known: pass the pair on to the decryption step
   (.forward ctx k [encryption-key encrypted-item])
   ;; key not known yet: park the event, keyed so that arrival order is kept
   (.put missing-store (lexicographic-ordered-key k (.partition ctx) (.offset ctx)) encrypted-item)))

So if the encryption key is known, we forward the encryption key and the encrypted value pair to the next step, the actual decryption.

If the encryption key is not known yet, we store the event. But when the encryption key arrives, we will want the order of events to remain intact, hence we need to somehow remember the arrival order. There are two possible options:

  1. Store as key the user, and as value a list of events.
  2. Store as key the user + event offset, and as value the event.

In the PoC we chose the second option, as there could be an unbounded number of events for a user, so with the first option we would need an unbounded amount of memory to deserialize them for processing or updating.
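The post does not show lexicographic-ordered-key, so here is one possible way to build such a composite key, using a sorted map as a stand-in for the store. The ordered-key and missing-events-for names, the "|" separator and the zero-padded string format are assumptions for this sketch, and it assumes that user ids never contain the separator.

```clojure
(defn ordered-key
  "Composite key whose string ordering matches (user, partition, offset)
   ordering. Zero-padding is what makes 9 sort before 10."
  [user partition offset]
  (format "%s|%010d|%019d" user partition offset))

(defn missing-events-for
  "All buffered events for `user`, in arrival order. `store` is a sorted map
   standing in for a key-value store with an ordered range scan."
  [store user]
  (map val (subseq store
                   >= (ordered-key user 0 0)
                   <= (ordered-key user Integer/MAX_VALUE Long/MAX_VALUE))))
```

Because each event is its own entry, adding an event is a single put, and releasing the events for a user is a range scan that never has to hold all of them in memory at once.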

Let's see how we use it when the encryption key arrives:

(defn encryption-key-msg [^KeyValueStore missing-store
                         ^KeyValueStore encryption-keys-store
                         ^ProcessorContext ctx
                         k
                         encryption-key]
 (.put encryption-keys-store k encryption-key)
 (with-open [encrypted-items (.range missing-store
                                 (lexicographic-ordered-key k 0 0)
                                 (lexicographic-ordered-key k Integer/MAX_VALUE Integer/MAX_VALUE))]
   (doseq [^KeyValue encrypted-item (iterator-seq encrypted-items)]
     (.delete missing-store (.key encrypted-item))
     (when-not (common/tombstone? encryption-key)
       (.forward ctx k [encryption-key (.value encrypted-item)])))
   (when (common/tombstone? encryption-key)
     (.forward ctx k [encryption-key nil]))))

Notice that we are using the range method. Even though its contract doesn’t guarantee an order, the actual RocksDB implementation does return the keys in order. This is another implementation detail that we should not depend on.

Even though this implementation guarantees the order of events for a given user, notice that it doesn’t guarantee any order between users, so some messages may be reordered during decryption. To guarantee the order of all messages, we would need to store every message after the first one that is missing its encryption key, even those for which we already know the encryption key.

Last, we prefer a tombstone in the encryption-keys topic over just deleting the encryption key, so that the consumer can clean up any events buffered for that user. If we did not store a tombstone, the consumers could keep events in their local store forever.
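To see the whole join-like behaviour in one place, including the tombstone clean-up, here is a pure in-memory simulation. It is a sketch and not the PoC code: the state map, the on-event and on-key functions and the [user offset] vector keys are hypothetical, and :out plays the role of the messages forwarded to the decryption step.

```clojure
(def tombstone ::tombstone)

(def empty-state
  {:keys    {}            ; user -> encryption key (or tombstone)
   :missing (sorted-map)  ; [user offset] -> event waiting for its key
   :out     []})          ; what would be forwarded downstream

(defn on-event
  "An encrypted event arrives: forward it if the key is known, buffer it otherwise."
  [state user offset event]
  (if-let [k (get-in state [:keys user])]
    (update state :out conj [user k event])
    (update state :missing assoc [user offset] event)))

(defn on-key
  "An encryption key (or tombstone) arrives: remember it, remove the buffered
   events for that user, and forward them only if the key is a real key."
  [state user k]
  (let [buffered (subseq (:missing state) >= [user 0] <= [user Long/MAX_VALUE])
        state    (-> state
                     (assoc-in [:keys user] k)
                     (update :missing #(apply dissoc % (map key buffered))))]
    (if (= tombstone k)
      (update state :out conj [user tombstone nil])
      (update state :out into (for [[_ event] buffered] [user k event])))))
```

In both cases the buffered entries for the user are removed. Without the tombstone message there would be no trigger for that removal, and events buffered for an erased user would stay in :missing forever.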

Back to Kafka, GDPR and Event Sourcing.



Tagged in : Architecture Clojure Kafka