A couple of thoughts:
- You want two or more things to run simultaneously, so you need to use threads.
- If you want backpressure, you want a queue.
- If you call (b (a x)) in separate threads, each call still takes 20 seconds, but several calls can run at the same time.
- To have one thread running an a while another is running a b, you need dedicated threads for the a’s and dedicated threads for the b’s.
core.async would work, but it’s a big dependency to use just for a couple of queues.
Instead, I would use an ExecutorService. I would construct one like this:
(defonce b-executor (Executors/newFixedThreadPool 1))
I put 1, but the number of threads is up to you.
Now imagine you had a list of numbers to run a and b on:
(def numbers (range 100))
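(def answers
  (map (fn [n]
         (let [result (a n)] ;; a runs on the calling thread
           ;; the type hints select the Callable overload of .submit, so .get returns b's value
           (.submit ^java.util.concurrent.ExecutorService b-executor
                    ^java.util.concurrent.Callable (fn [] (b result)))))
       numbers))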
answers will contain Futures that will hold the results. You can call .get on each one, e.g. (.get (first answers)), to get its value, which blocks until it’s ready. Also note that map is lazy, so no work actually starts until you access the first element; wrap the map in doall if you want everything submitted right away.
This runs the a calculation in the main thread and the b calculation in a separate thread. That way, as soon as the a calculation is done processing 0, the main thread moves on to the next a calculation (1) while the b thread is processing 0. I think this does what you were asking for.
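To see the overlap concretely, here is a small self-contained sketch. The bodies of a and b are stand-ins I made up (they sleep for 100 ms instead of 10 seconds), and run-pipeline, demo-executor and demo-results are hypothetical names used only for illustration:

```clojure
(import '(java.util.concurrent Callable Executors ExecutorService Future))

;; Stand-ins for the real functions: each sleeps briefly, then transforms its input.
(defn a [n] (Thread/sleep 100) (inc n))
(defn b [n] (Thread/sleep 100) (* 2 n))

(defn run-pipeline
  "Runs a on the calling thread and hands each result to b on the executor.
  Returns a vector of Futures, one per input."
  [^ExecutorService executor inputs]
  ;; mapv is eager, so the work starts as soon as run-pipeline is called.
  (mapv (fn [n]
          (let [result (a n)
                ;; Hinting the task as Callable makes the Future carry b's return value.
                ^Callable task (fn [] (b result))]
            (.submit executor task)))
        inputs))

(def demo-executor (Executors/newFixedThreadPool 1))

;; Block on each Future in turn and collect the values.
(def demo-results
  (let [futures (run-pipeline demo-executor (range 5))]
    (mapv (fn [^Future f] (.get f)) futures)))

;; Let the worker thread exit once the queued work is done.
(.shutdown ^ExecutorService demo-executor)
```

With these stand-ins, demo-results holds (b (a n)) for each n in order, and the whole run should take roughly six sleeps rather than ten, because each b overlaps with the next a.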
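As for backpressure, there is none here. Executors/newFixedThreadPool uses an unbounded queue, so with only one extra thread the b’s will pile up if they take longer to run than the a’s. If you want real backpressure, you have to construct the ThreadPoolExecutor yourself with a fixed-size queue and a rejection policy. This is slightly more complicated, but you get more control:
(def b-executor
  (ThreadPoolExecutor. 0 1 1 TimeUnit/MINUTES ;; at most 1 thread, spun down after 1 minute of inactivity
                       (ArrayBlockingQueue. 3 true) ;; queue size: 3; fair: true (blocked threads are served in arrival order)
                       (java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.))) ;; when the queue is full, the submitting thread runs the task itself
A ThreadPoolExecutor does not block when its queue is full; by default, .submit throws a RejectedExecutionException. With CallerRunsPolicy, the rejected b runs on the submitting thread instead, so the main thread is busy and cannot start more a’s until that b finishes. That’s backpressure!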
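To see how a full queue throttles the submitter, here is a small sketch that uses ThreadPoolExecutor’s CallerRunsPolicy as the rejection handler (without a handler, a full queue makes .submit throw a RejectedExecutionException). The names demo-pool, submit-task, started and release are hypothetical, and the queue size of 1 is chosen only to keep the demo short:

```clojure
(import '(java.util.concurrent ArrayBlockingQueue Callable CountDownLatch
                               ThreadPoolExecutor
                               ThreadPoolExecutor$CallerRunsPolicy TimeUnit))

;; One worker thread, room for one waiting task, CallerRunsPolicy for overflow.
(def demo-pool
  (ThreadPoolExecutor. 0 1 1 TimeUnit/MINUTES
                       (ArrayBlockingQueue. 1 true)
                       (ThreadPoolExecutor$CallerRunsPolicy.)))

(defn submit-task
  "Submits f as a Callable so the returned Future carries f's return value."
  [^ThreadPoolExecutor pool f]
  (let [^Callable task f]
    (.submit pool task)))

(def started (CountDownLatch. 1))
(def release (CountDownLatch. 1))

;; Task 1 occupies the only worker thread until we release it.
(def f1 (submit-task demo-pool
                     (fn []
                       (.countDown started)
                       (.await release)
                       (Thread/currentThread))))
(.await started)

;; Task 2 fills the single queue slot.
(def f2 (submit-task demo-pool (fn [] (Thread/currentThread))))

;; Task 3 finds the queue full, so it runs right here on the submitting thread.
(def f3 (submit-task demo-pool (fn [] (Thread/currentThread))))
(def ran-on-caller? (identical? (.get f3) (Thread/currentThread)))

;; Unblock the worker, then check that task 1 ran on a different thread.
(.countDown release)
(def ran-on-worker? (not (identical? (.get f1) (Thread/currentThread))))

(.shutdown demo-pool)
```

The third submission does not return until its task has run on the caller, which is exactly the slowdown you want: while the submitter is busy with a b, it cannot produce more work.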
Things you may need to import:
java.util.concurrent.Executors
java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.TimeUnit
java.util.concurrent.ArrayBlockingQueue
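At a REPL or in a script, those four classes can be pulled in with a single import form; inside a namespace, the same list would go in the :import clause of the ns declaration. A minimal sketch:

```clojure
;; Import the classes used above from java.util.concurrent in one form.
(import '(java.util.concurrent ArrayBlockingQueue Executors
                               ThreadPoolExecutor TimeUnit))
```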
PLEASE NOTE: I did not run this code, so there may be bugs.