Concurrency in Clojure is easy peasy, unless you have to deal with side effects. But atoms and delays can still help us. Let's look at an idiom for resource management.
In Clojure, if your application needs some stateful component that needs to be initialised just once during application startup and destroyed once during shutdown, you would usually rely on something like Component or Mount.
But what if you need to reinitialize that component during the life of the application, while multiple threads are trying to use that component?
A practical example
In a previous post, I explained how to use delay and how we use it in our Spark jobs to initialize an HBase connection pool. Basically, the code that we needed was:
(def better-global-hbase-connection (delay (create-new-hbase-connection)))
Note that I am simplifying the problem a little bit here.
This code would be fine if we just needed to create the HBase connection pool once for the whole duration of the Spark job.
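As a quick reminder of why a delay is enough for the "create it once" case, here is a minimal sketch. The names init-count and shared-resource are made up for illustration, and the sleep only stands in for a slow initialisation:

```clojure
;; Hypothetical example: counts how often the delay body actually runs.
(def init-count (atom 0))

(def shared-resource
  (delay
    (swap! init-count inc)   ; side effect we want to happen only once
    (Thread/sleep 100)       ; pretend that initialisation is slow
    :resource))

;; Eight threads dereference the same delay at roughly the same time.
(def results
  (mapv deref (doall (repeatedly 8 #(future @shared-resource)))))

(println results)       ; :resource, eight times
(println @init-count)   ; 1
```

All the threads block until the first dereference finishes, and then share the cached value. If you run this as a script instead of in a REPL, call (shutdown-agents) at the end so the JVM exits promptly.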
But in our long-running Spark Streaming jobs, the HBase connection pool sometimes just stops working, for reasons we have not identified. Our workaround was to recreate the connection pool on error.
As there are multiple threads reading and writing using that connection pool, several of them could detect at the same time that the pool had gone bad, and they would have to coordinate among themselves to ensure that just one thread creates the new pool.
A not so good solution
Recreating the pool means that we have some mutable state, so the most obvious choice is to use an atom to hold a reference to the delayed pool:
(def global-hbase-connection (atom (delay (create-new-hbase-connection))))
And then, when we detect an error, close the old connection and create a new one:
(defn refresh-connection! []
  (swap! global-hbase-connection
         (fn [old-connection]
           ;; should close old connection
           (log "Closing" @old-connection)
           (delay (create-new-hbase-connection)))))

(defn run-with-hbase [f]
  (try
    (f @@global-hbase-connection)
    (catch Exception _
      (log "Retrying")
      (refresh-connection!)
      ;; this retry logic is again a simplification
      (run-with-hbase f))))
This was the first version that we wrote, but it has a subtle issue. Can you see it?
Let’s give it a try to see what happens, simulating that the threads detect an error:
(dotimes [_ 4]
  (future
    (run-with-hbase
      (let [errored (atom false)]
        (fn [connection]
          (if @errored
            (log "Using" connection)
            (do
              (log "Not happy with" connection)
              (reset! errored true)
              (throw (RuntimeException.)))))))))
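If you want to run this experiment yourself, you will need log and create-new-hbase-connection, which are not shown in this post. The following stand-ins are purely hypothetical: a fake "connection" that is just an increasing number, and a logger that prints the thread name. The one second sleep is an assumption to mimic a slow pool creation, and the output format will not match the logs in this post exactly.

```clojure
;; Hypothetical stand-ins, not the real implementations.
(def connection-counter (atom 0))

(defn log [& args]
  ;; Locking on *out* keeps lines from different threads from interleaving.
  (locking *out*
    (apply println (.getName (Thread/currentThread)) "-" args)))

(defn create-new-hbase-connection []
  (Thread/sleep 1000)                        ; pretend that opening a pool is slow
  (let [id (swap! connection-counter inc)]   ; each fake connection is just a number
    (log "Connection created!" id)
    id))
```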
The results are a little bit surprising:
11:31:43 - clojure-agent-send-off-pool-202 - Not happy with 1
11:31:43 - clojure-agent-send-off-pool-200 - Not happy with 1
11:31:43 - clojure-agent-send-off-pool-199 - Not happy with 1
11:31:43 - clojure-agent-send-off-pool-201 - Not happy with 1
11:31:43 - clojure-agent-send-off-pool-201 - Closing 1
11:31:43 - clojure-agent-send-off-pool-199 - Closing 1
11:31:43 - clojure-agent-send-off-pool-200 - Closing 1
11:31:43 - clojure-agent-send-off-pool-202 - Closing 1
11:31:43 - clojure-agent-send-off-pool-201 - Retrying
11:31:44 - clojure-agent-send-off-pool-199 - Connection created! 2
11:31:44 - clojure-agent-send-off-pool-199 - Closing 2
11:31:44 - clojure-agent-send-off-pool-200 - Closing 2
11:31:44 - clojure-agent-send-off-pool-201 - Using 2
11:31:44 - clojure-agent-send-off-pool-202 - Closing 2
11:31:44 - clojure-agent-send-off-pool-199 - Retrying
11:31:45 - clojure-agent-send-off-pool-200 - Connection created! 3
11:31:45 - clojure-agent-send-off-pool-200 - Closing 3
11:31:45 - clojure-agent-send-off-pool-200 - Retrying
11:31:45 - clojure-agent-send-off-pool-202 - Closing 3
11:31:45 - clojure-agent-send-off-pool-199 - Using 3
11:31:46 - clojure-agent-send-off-pool-200 - Connection created! 4
11:31:46 - clojure-agent-send-off-pool-200 - Using 4
11:31:46 - clojure-agent-send-off-pool-202 - Closing 4
11:31:46 - clojure-agent-send-off-pool-202 - Retrying
11:31:47 - clojure-agent-send-off-pool-202 - Connection created! 5
11:31:47 - clojure-agent-send-off-pool-202 - Using 5
What is wrong here?
First, we are creating four new connection pools, instead of the expected one.
Second, several threads try to close the same connection.
Last, at 11:31:45, connection 3 is being closed by thread “clojure-agent-send-off-pool-200” and then thread “clojure-agent-send-off-pool-199” tries to use it. It will find the connection closed, which will cause any operation to fail. If thread 199 then decides to retry, it will close the existing connection, which could be in use by thread 200, which would decide to retry, which will close the existing connection, which could be in use by thread 199, which would decide to retry, which will close the existing connection, which could be in use by thread 200, which would decide to retry, which …
Note that even if we are pointlessly creating and closing the connection pool, there are no resource leaks, as all pools are properly closed.
What is going on?
If you remember how swap! works, what is happening is that all four threads run refresh-connection!, each of them creating a new delayed pool.
One of them will win, and the others will find out that the value in the atom has changed, so Clojure will rerun the function for the losing threads, closing the newly created connection and creating yet another bunch of delays. This is why the function passed to swap! should be free of side effects: it may be called more than once.
We naively used swap! because we had to close the previous pool, and needed a reference to the previous value.
But in this case, the next value of the atom does not depend on the current value.
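The retry behaviour of swap! described above is easy to observe in isolation. This is a minimal sketch with made-up names (attempts, counter, slow-inc); the sleep is only there to widen the race window so that retries are likely:

```clojure
(def attempts (atom 0))   ; how many times the update function was called
(def counter (atom 0))    ; the atom that the threads fight over

(defn slow-inc [v]
  (swap! attempts inc)    ; side effect: happens on every attempt, retries included
  (Thread/sleep 10)       ; widen the race window
  (inc v))

;; Four threads update the same atom at roughly the same time, and we wait for all of them.
(run! deref (doall (repeatedly 4 #(future (swap! counter slow-inc)))))

(println "final value:" @counter)   ; 4
(println "attempts:" @attempts)     ; at least 4, usually more
```

The final value is always correct, but the side effect inside the function runs once per attempt, not once per successful update. That is exactly what happened with our "Closing" log line.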
A better solution
What we really want to code is: “Recreate the connection pool as long as nobody else has done it yet”.
(defn refresh-connection! [broken-connection]
  (when (compare-and-set! global-hbase-connection
                          broken-connection
                          (delay (create-new-hbase-connection)))
    (log "closing connection" @broken-connection)))

(defn run-with-hbase [f]
  (let [connection @global-hbase-connection]
    (try
      (f @connection)
      (catch Exception _
        (refresh-connection! connection)
        (log "Retrying")
        (run-with-hbase f)))))
Note how the refresh-connection! function now requires the broken pool as a parameter, as we just want to recreate the pool if, and only if, the current pool is still the broken one.
If another thread has already recreated the pool, compare-and-set! will return false, which is like saying “Hey, somebody already changed the value”, and in that case our thread does nothing.
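To see the "only one winner" property on its own, here is a small sketch that is independent of HBase. The names pool and broken and the keyword values are made up for illustration. Note that compare-and-set! compares by identity, which is what we want for delays:

```clojure
;; An atom holding a delay, shaped like global-hbase-connection.
(def pool (atom (delay :pool-1)))

;; Every thread captured the same delay before trying to replace it.
(def broken @pool)

;; Four threads race to swap the broken delay for a fresh one.
(def results
  (mapv deref
        (doall (repeatedly 4 #(future (compare-and-set! pool broken (delay :pool-2)))))))

(println results)                          ; one true and three false, in some order
(println (count (filter true? results)))   ; 1
(println (identical? broken @pool))        ; false
```

Once the first compare-and-set! succeeds, the atom no longer holds the broken delay, so every later attempt with the same old value fails without rerunning anything.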
Running the test again, it outputs:
12:07:09 - clojure-agent-send-off-pool-203 - Not happy with 1
12:07:09 - clojure-agent-send-off-pool-204 - Not happy with 1
12:07:09 - clojure-agent-send-off-pool-205 - Not happy with 1
12:07:09 - clojure-agent-send-off-pool-205 - Retrying
12:07:09 - clojure-agent-send-off-pool-206 - Not happy with 1
12:07:09 - clojure-agent-send-off-pool-203 - closing connection 1
12:07:09 - clojure-agent-send-off-pool-203 - Retrying
12:07:09 - clojure-agent-send-off-pool-204 - Retrying
12:07:09 - clojure-agent-send-off-pool-206 - Retrying
12:07:10 - clojure-agent-send-off-pool-205 - Connection created! 2
12:07:10 - clojure-agent-send-off-pool-205 - Using 2
12:07:10 - clojure-agent-send-off-pool-203 - Using 2
12:07:10 - clojure-agent-send-off-pool-204 - Using 2
12:07:10 - clojure-agent-send-off-pool-206 - Using 2
Note that just one thread tried to close the broken pool, just one thread created the new pool, and that all of them used the newly created pool. All good!
Even if we try to keep our functions pure, for our programs to be useful, we still need side effects. In those cases, even with all the power of Clojure, concurrency, side effects, and mutable state are as hard as ever.
But the simple semantics of atoms help a lot to reason about them.