Out of step sequence processing

So I have function a that takes roughly 10 seconds and a function b that takes roughly 10 seconds.

b takes the result of function b so (-> a b). In total they take roughly 20 seconds to run.

These functions are called repeatedly to process a queue/sequence of inputs. Is there a simple way to get them to run out of step?


(a 1)      (a 2)      (a 3)      (a 4)
       (b (a 1))  (b (a 2))  (b (a 3))

I’ve done something crude with futures.

(a 1)               (a 2)               (a 3)      
       (future (b (a 1)))  (future (b (a 2)))

But that doesn’t have back pressure. So if a runs faster than b, things can get messy.

Thoughts? Thanks.

Maybe use a core.async channel with a fixed buffer.
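For example, a minimal (untested) sketch, assuming a, b, and an input sequence coll as in the original post:

```clojure
(require '[clojure.core.async :as async])

;; A channel with a fixed buffer of 3 gives backpressure:
;; >!! blocks when the buffer is full, so the producer running a
;; can't get more than 3 results ahead of the consumer running b.
(def results (async/chan 3))

;; Consumer thread: run b on each result of a as it arrives.
(async/thread
  (loop []
    (when-some [v (async/<!! results)]
      (b v)
      (recur))))

;; Producer: run a on each input, blocking whenever the buffer is full.
(doseq [x coll]
  (async/>!! results (a x)))
(async/close! results)
```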


b takes the result of function b so (-> a b).

Assuming you meant a in the second position there.

Also assuming coll is the sequence of inputs, how about:

(pmap (comp b a) coll)


A couple of thoughts:

  1. You want two or more things to run simultaneously, so you need to use threads.
  2. If you want backpressure, you want a queue.
  3. If you call (b (a x)) in separate threads, it still takes 20 seconds, but you can do multiple ones at the same time.
  4. To make it so that one thread is running an a and another is running a b simultaneously, you need dedicated threads for as and dedicated threads for bs.

core.async would work, but it’s a big dependency to use just for a couple of queues.

Instead, I would use an ExecutorService. I would construct one like this:

(defonce b-executor (Executors/newFixedThreadPool 1))

I put 1, but the number of threads is up to you.

Now imagine you had a list of numbers to run a and b on:

(def numbers (range 100))

(def answers (map (fn [n]
                    (let [result (a n)]
                      (.submit b-executor (fn [] (b result)))))
                  numbers))

answers will contain Futures that will have the answer. You can call .get on them to get the answer, which will block until it’s ready. Also note that map is lazy (and chunked, so it may realize up to 32 elements at a time), so no work will actually start until you access the first element.
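For example, to force the whole sequence and collect the results in order (an untested sketch, assuming the answers var defined above):

```clojure
;; Realize the lazy seq and block on each Future in turn.
;; mapv is eager, so this kicks off and drains all the work.
(def final-results
  (mapv (fn [^java.util.concurrent.Future fut] (.get fut)) answers))
```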

This will run the a calculation in the main thread, and the b calculation in a separate thread. In this way, as soon as the a calculation is done processing 0, the main thread will move on to process the next a calculation (1), while the b thread is processing 0. I think this does what you were asking for.

As for back pressure, there is none here. But you are only creating one extra thread, so the b’s may get backed up if they take longer to run than the a’s. If you want to prevent that with real backpressure, you will have to configure the ExecutorService’s queue to be a fixed size. This is slightly more complicated, but you get more control:

(def b-executor
  (ThreadPoolExecutor. 0 1                ;; 1 max thread that will get spun down
                       1 TimeUnit/MINUTES ;; after 1 minute of inactivity
                       (ArrayBlockingQueue. 3 true)              ;; queue size: 3, FIFO: true
                       (ThreadPoolExecutor$CallerRunsPolicy.)))  ;; when full, run on the caller

One caveat: when its queue is full, a ThreadPoolExecutor by default rejects new tasks with a RejectedExecutionException rather than blocking. The CallerRunsPolicy above makes the submitting (main) thread run the b itself instead, so it cannot run more a’s until the backlog drains. That’s backpressure!

Things you may need to import:

(import '[java.util.concurrent Executors ThreadPoolExecutor
          ThreadPoolExecutor$CallerRunsPolicy TimeUnit ArrayBlockingQueue])

PLEASE NOTE: I did not run this code, so there may be bugs.


Thanks Eric, this is exactly what I was looking for.

I was dreading having to pull in core.async just to use queues/channels. I did explore pmap, but I ideally wanted fine-grained control over the number of threads.