October 21, 2011 / marrowboy

Worker Queues in Clojure

In Rich Hickey’s talk “Simple Made Easy” he gives a great deal of good advice on how to manage (reduce, avoid) complexity in your code. He points out that complexity is sometimes easy to create but that simplicity is more desirable even if it is sometimes difficult. So simple != easy, and over-concentrating on developers’ ease at the expense of simplicity is killing our software.

Please watch this talk – it’s hardly about Clojure at all, more about a rational way of thinking about coding. I’d like to concentrate on one specific piece of advice and see how we can implement it.

Near the end, he talks about decoupling, so that if “A” wants to call a function in “B” there should be a queue of some kind in between so that A & B don’t have to be aware of each other’s existence. The coupling of A to B is unnecessary and “complects” (his word) the code.

The send

So, how do we implement such a worker-queue in Clojure? There is nothing much to help us in the core Clojure API, but we can easily use one of Java’s BlockingQueue implementations, wrapping it up in Clojure like so:

(defn new-q [] (java.util.concurrent.LinkedBlockingDeque.))

(defn offer! 
  "adds x to the back of queue q"
  [q x] (.offer q x) q)

(defn take! 
  "takes from the front of queue q.  blocks if q is empty"
  [q] (.take q))

That’s a pretty thin wrapper! We can use it on the REPL like this:

(def example-queue (new-q))

;; background thread, blocks waiting for something to print
(future (println ">>" (take! example-queue)))

;; the future-d thread will be unblocked and ">> HELLO WORLD" is printed.
(offer! example-queue "HELLO WORLD") 

This works as it stands, and we can imagine all kinds of fire-and-forget scenarios where it would be useful: logging, writing to a database, whatever. But what if we need to have our message processed and a value returned? Can we still do this in an un-complected way?

The return

The answer is yes, and the key is in Clojure’s (promise). A promise is simply a reference-type which is initially empty and can only be written to once, using (deliver). Any attempt to deref a promise before it is delivered will block.
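Here is a quick REPL sketch of that behaviour. It assumes Clojure 1.3 or later, where realized? exists, a second deliver is silently ignored, and deref accepts a timeout; the name p and the values are just for illustration.

```clojure
;; A promise starts out empty; deliver fills it exactly once.
(def p (promise))

(realized? p)    ; false, nothing has been delivered yet
(deliver p 42)   ; the first delivery sets the value
(deliver p 99)   ; later deliveries are ignored (assumes Clojure 1.3+)
@p               ; 42, and deref no longer blocks

;; deref with a timeout in milliseconds and a fallback value
;; gives up instead of blocking forever on an undelivered promise
(deref (promise) 100 :timed-out) ; :timed-out
```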

So, we just need to put a promise onto the queue, and whatever reads the message can deliver the result. Voilà:

;; THREAD 1
(def answer (promise))
(offer! example-queue {:q "how much?" :a answer})
(println @answer) ; blocks, until...

;; THREAD 2
(let [msg (take! example-queue)]
  (deliver (msg :a) "£3.50"))

;; THREAD 1
; now unblocked, prints "£3.50"

So Thread 1 fired off a message to the queue, something read the message and supplied an answer back. Thread 1 doesn’t know or care what actually did that, which simplifies it, reduces coupling and makes it more likely to be reusable. Splendid.

Concerns for real life

I’m going to try this approach in my next decent-sized project. I imagine that the entry-point to the app will def a bunch of queues and start a lot of worker threads (using future) to listen to them. Possibly there will be more than one listener per queue, and I’ll have to implement a peek of some kind to examine waiting messages. The Java class used claims to be thread-safe, so I won’t worry about multiple deliveries etc. I may limit and monitor the size of each queue. When waiting on the promises I send out, I will use the version of deref which takes a time-limit, just in case.
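A rough sketch of how those plans might look, under some assumptions: it swaps in a bounded LinkedBlockingQueue (capacity 100 is arbitrary), and the names work-q, start-worker! and ask are made up for this example. Messages keep the {:q … :a …} shape used above.

```clojure
(import '(java.util.concurrent BlockingQueue LinkedBlockingQueue))

;; Bounded queue: offer returns false when it is full rather than growing forever.
(def work-q (LinkedBlockingQueue. 100))

(defn start-worker!
  "Starts a background thread that takes messages from queue in a loop,
  applies handler to the :q value and delivers the result to the :a promise.
  Note: if handler throws, this simple loop dies with it."
  [^BlockingQueue queue handler]
  (future
    (loop []
      (let [msg (.take queue)]
        (deliver (:a msg) (handler (:q msg))))
      (recur))))

(defn ask
  "Puts a question on the queue and waits up to timeout-ms for an answer.
  Returns :queue-full if the message was rejected, :timeout if nobody answered."
  [^BlockingQueue queue question timeout-ms]
  (let [answer (promise)]
    (if (.offer queue {:q question :a answer})
      (deref answer timeout-ms :timeout)
      :queue-full)))

;; One listener; calling start-worker! again adds another on the same queue,
;; and each message is still taken by exactly one of them.
(def worker (start-worker! work-q (fn [question] (str "echo: " question))))

(ask work-q "how much?" 1000) ; "echo: how much?" if the worker answers in time

;; Monitoring: the Java queue already has non-blocking inspection methods.
(.size work-q) ; number of waiting messages
(.peek work-q) ; next waiting message, or nil if the queue is empty

;; Cancelling the future interrupts the blocked .take and stops the worker.
(future-cancel worker)
```

The timeout in ask means a caller is never stuck if no worker is listening, at the cost of having to handle :timeout explicitly.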

Coda

Although Mr Hickey probably isn’t interested, this approach also makes the code super-testable :)
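To make that concrete, here is a small sketch using clojure.test, in which the test itself plays the part of the worker. The function request-price is hypothetical, and the :refer form assumes Clojure 1.4 or later.

```clojure
(require '[clojure.test :refer [deftest is run-tests]])
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical producer: it only knows about the queue it is handed.
(defn request-price
  "Puts a price question on queue and returns the promise for the answer."
  [queue item]
  (let [answer (promise)]
    (.offer queue {:q item :a answer})
    answer))

(deftest request-price-enqueues-a-question
  (let [queue  (LinkedBlockingQueue.)
        answer (request-price queue "tea")
        ;; the test stands in for the worker, so no real consumer is needed
        msg    (.poll queue 1 TimeUnit/SECONDS)]
    (is (= "tea" (:q msg)))
    (deliver (:a msg) "£3.50")
    (is (= "£3.50" (deref answer 1000 :timeout)))))
```

Calling (run-tests) at the REPL runs it. Because the producer never names its consumer, nothing needs to be mocked.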

3 Comments

  1. Gary Verhaegen / Jun 17 2012 3:53 pm

    There actually is a queue in clojure, which can be created with the “literal” value clojure.lang.PersistentQueue/EMPTY. Note that as the name suggests, this is a persistent (i.e. immutable) queue.

    To use it, you can use conj, which “adds” an element to the queue (actually returns a new queue with the element added), peek which returns the first element of the queue, leaving the queue unchanged, and pop, which “removes” the top element of the queue (the one seen by peek).

    See also : http://stackoverflow.com/questions/2760017/producer-consumer-with-qualifications

    • Jo / Dec 7 2012 8:21 am

      It is not recommended to use PersistentQueue as a queue for workflow.

      “If you find yourself wanting to repeatedly check a work queue to see if there’s an item of work to be popped off, or if you want to use a queue to send a task to another thread, you do not want the PersistentQueue discussed in this section.” (The Joy of Clojure)

      • marrowboy / Dec 7 2012 8:35 am

        Thanks for your comments guys. I was really pleased to see Rich Hickey recommending using the java.util.concurrent.*BlockingQueue classes for a message queue at ClojureX in London yesterday. He said he hadn’t created Clojure versions because the Java ones are fine – he had nothing to add.

        I’m not so sure any more about the wisdom of using promises to return values though. Erlang’s model doesn’t allow any returns, and their systems work. Also, by allowing returns I think you’re restricting the implementations of these queues. For example, you couldn’t have your queue backed by an asynchronous REST service.
