@DanLebrero.

Delay: Clojure's forgotten concurrency primitive

When talking about Clojure's concurrency primitives, we often forget "delay". Let's see how it can help us.

Image attribution: Scott Wolfe Jr

Eric Normand recently released a very nice Clojure Concurrency: The Ultimate Guide, where he explains all the usual primitives and libraries that most of us think about when talking about concurrency in Clojure.

But one is missing: a primitive that is often forgotten, perhaps because we do not even realize that it is a concurrency primitive at all.

That is, the humble delay:

Takes a body of expressions and yields a Delay object that will invoke the body only the first time it is forced (with force or deref/@), and will cache the result and return it on all subsequent force calls.

That doesn’t sound like an obviously concurrency-related construct, so let’s look at a practical example of what it is and when to use it.

Creating global connection pools, a naive approach

For some Spark jobs, we use HBase as the database. As usual, when accessing a database, we need some connection pool to make queries efficient.

But, because of the way the Spark framework works, we are forced to:

  1. AOT compile our code, so we need to avoid side effects during compilation.
  2. Store the HBase connection pool as global state, as Spark does not currently provide any “setup hook” in the workers.

A naive implementation that tries to initialize the connection pool the first time that it is used could look like:

(defn create-new-hbase-connection []
  ;; some side-effecting code that creates a new socket
  (Thread/sleep 1000)
  (println "Connection created!")
  ::done)

;; Good old demonized global state
(def global-hbase-connection (atom nil))

(defn get-or-create-connection []
  (if-let [connection @global-hbase-connection]
    connection
    (swap! global-hbase-connection
           (fn [current-connection]
             (or current-connection (create-new-hbase-connection))))))

Any side effect in a function passed to swap! should immediately raise alarm bells: swap! retries the function whenever another thread changes the atom first, so the function can be called multiple times by competing threads.

Let’s see it in action:

(dotimes [_ 5]
  (future (get-or-create-connection)))

Connection created!
Connection created!
Connection created!
Connection created!
Connection created!

So each thread created a new connection pool, but only one was stored in the atom, while the others were lost in the JVM’s ether.

This is a typical resource leak.
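
The retry behaviour behind this leak can be observed directly. The following is a minimal sketch, not the code from the Spark job: calls, state and counting-inc are hypothetical names, and the side effect is just a counter bump instead of a new socket. Under contention the number of function calls can end up higher than the number of updates applied, and the exact figure varies from run to run.

```clojure
;; Hypothetical names: `calls` counts every invocation of the update
;; function, `state` is the atom being updated.
(def calls (atom 0))
(def state (atom 0))

(defn counting-inc
  "Increments v and, as a side effect, records that it was invoked."
  [v]
  (swap! calls inc)   ; side effect: repeated whenever the outer swap! retries
  (inc v))

;; 8 threads, 1000 updates each, all contending on the same atom.
(let [workers (doall (repeatedly 8 #(future (dotimes [_ 1000]
                                              (swap! state counting-inc)))))]
  (run! deref workers))   ; wait for every thread to finish

(println "updates applied:" @state)   ; always 8000
(println "function calls: " @calls)   ; 8000 or more, depending on retries
```

Every call beyond the 8000 applied updates is a retry whose side effect still happened, which is exactly how the extra connections above were created.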

Creating global connection pools, a delay approach

Delay is a perfect match for our requirements.

First, the code inside delay will not run until the delay is dereferenced, so nothing happens during AOT compilation. That covers the first requirement.

Second, it will make sure that if several threads try to access the delay at the same time, only one will run the code, while the others will block and reuse the result. This is why delay is a concurrency primitive.

With delay, the code will look like:

(defn create-new-hbase-connection []
  ;; same as before
  )

(def better-global-hbase-connection
  (delay (create-new-hbase-connection)))

If we test it:

(dotimes [_ 100]
  (future @better-global-hbase-connection))
  
Connection created!

That is simpler and correct. Win win!
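
Both properties can also be checked without relying on printed output. This is a small sketch under assumed names (body-runs, shared-value and the :connected placeholder are not part of the original code): it records whether the delay was realized before anybody used it, and counts how many times the body ran while 20 threads dereferenced it at once.

```clojure
;; Hypothetical counter that records how often the delay body executes.
(def body-runs (atom 0))

(def shared-value
  (delay
    (swap! body-runs inc)   ; side effect we want to happen exactly once
    (Thread/sleep 100)      ; stand-in for slow connection setup
    :connected))

;; Defining the delay does not run its body.
(def realized-before? (realized? shared-value))

;; 20 threads deref the same delay concurrently; mapv deref waits for all.
(def results
  (let [readers (doall (repeatedly 20 #(future @shared-value)))]
    (mapv deref readers)))

(println "realized before first deref?" realized-before?)
(println "body runs:" @body-runs)
(println "distinct results:" (distinct results))
```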

A note on exceptions

If an exception is thrown in the delay body, the exception will be cached:

(def exceptional-delay
  (delay
    (println "I am exceptional")
    (throw (RuntimeException.))))

If you deref exceptional-delay twice, it will print “I am exceptional” only once and it will throw the same exception twice:

@exceptional-delay
I am exceptional
CompilerException java.lang.RuntimeException, 
@exceptional-delay
CompilerException java.lang.RuntimeException, 

This may or may not be the behaviour you need. If you need a delay that does not cache exceptions, core.memoize has a RetryingDelay.
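
The caching of exceptions can be confirmed programmatically as well. The sketch below assumes Clojure 1.7 or later; attempts, failing-delay and try-deref are hypothetical names introduced for illustration. It captures the exception from two consecutive derefs and compares them.

```clojure
;; Hypothetical counter: how many times the failing body actually ran.
(def attempts (atom 0))

(def failing-delay
  (delay
    (swap! attempts inc)
    (throw (ex-info "boom" {}))))

(defn try-deref
  "Derefs d and returns the exception it threw, or nil if it did not throw."
  [d]
  (try
    @d
    nil
    (catch Exception e
      e)))

(def first-error  (try-deref failing-delay))
(def second-error (try-deref failing-delay))

(println "body attempts:" @attempts)
(println "same exception instance?" (identical? first-error second-error))
```

If the body ran only once and both derefs surface the same exception object, a transient failure (a network blip while opening the connection, say) would be permanent for that delay, which is when a retrying variant becomes interesting.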

Other use cases

As hinted above, delay (or its equivalent in other languages) is used to build read-through caches, where multiple threads can be requesting the same missing value for a given key, and you want to make sure that just one of the threads does the work.
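
A read-through cache along those lines could be sketched as follows. This is one possible shape rather than how core.memoize does it, and cache, loads, slow-load and lookup are hypothetical names. The important detail is that the function given to sw! only stores a delay, which is free of side effects, so retries are harmless; the expensive load happens when the winning delay is dereferenced.

```clojure
(def cache (atom {}))   ; key -> delay wrapping the loaded value
(def loads (atom 0))    ; hypothetical counter of real loads

(defn slow-load
  "Stand-in for an expensive lookup, e.g. a database query."
  [k]
  (swap! loads inc)
  (Thread/sleep 100)
  (str "value-for-" k))

(defn lookup
  "Returns the cached value for k, loading it at most once across threads."
  [k]
  (let [candidate (delay (slow-load k))   ; cheap: nothing runs yet
        cache-now (swap! cache            ; pure update, safe to retry
                         (fn [m]
                           (if (contains? m k)
                             m
                             (assoc m k candidate))))]
    ;; Deref whichever delay made it into the cache, ours or another thread's.
    @(get cache-now k)))

;; 10 threads ask for the same missing key at the same time.
(def answers
  (let [readers (doall (repeatedly 10 #(future (lookup "user-42"))))]
    (mapv deref readers)))

(println "loads performed:" @loads)
(println "distinct answers:" (distinct answers))
```

Note that, because of the exception caching described earlier, a failed load would stay in this cache until the key is removed.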

Another use case is parallelising a heterogeneous set of tasks. Some of the tasks may need to access some expensive-to-fetch data, but it is also possible that none of the tasks needs it. Wrapping the access to the data in a delay is a perfect fit: no cost if no task uses the data, and the data is fetched only once if several tasks need it.
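
As a rough illustration of that second use case, here is a sketch in which each task receives the same delay and decides for itself whether to dereference it. All names (fetches, run-tasks, needs-data, ignores-data) and the shape of the data are assumptions made for the example.

```clojure
(def fetches (atom 0))   ; hypothetical counter of expensive fetches

(defn run-tasks
  "Runs every task in its own future, handing each one the same delay.
  Returns the task results and whether the data was ever fetched."
  [tasks]
  (let [expensive-data (delay
                         (swap! fetches inc)
                         (Thread/sleep 100)      ; stand-in for a slow fetch
                         {:rates [1 2 3]})
        running        (doall (map (fn [task] (future (task expensive-data)))
                                   tasks))]
    {:results  (mapv deref running)
     :fetched? (realized? expensive-data)}))

(defn needs-data [data] (reduce + (:rates @data)))   ; derefs the delay
(defn ignores-data [_data] :no-data-needed)          ; never touches it

(def none-used (run-tasks [ignores-data ignores-data]))
(def fetches-after-first-run @fetches)
(def some-used (run-tasks [needs-data ignores-data needs-data]))

(println none-used)
(println some-used)
(println "total fetches:" @fetches)
```

In the first run the fetch never happens; in the second, two tasks need the data but it is fetched a single time.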

So keep delay in mind whenever you need to guarantee that some code runs only once across multiple threads.


You may also be interested in Atoms, delays and side effects: a resource management idiom for Clojure.



Tagged in : Clojure