Clojure: managing throughput with virtual threads

Wrote this blog post over the weekend. Virtual threads are actually quite fun to work with. I'm using a CompletionService to implement unordered parallelism and a Semaphore to implement token-bucket rate limiting. So far it's been working quite well on my current project.
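For anyone who hasn't read the post, here is one way a Semaphore can act as a token bucket. This is a hypothetical sketch, not the implementation from the blog post. `token-bucket` and `rate-limited` are invented names. A single scheduled task tops the semaphore back up toward `capacity`, and every call takes one permit before running.

```clojure
(import '(java.util.concurrent Executors ScheduledExecutorService Semaphore TimeUnit))

(defn token-bucket
  "Hypothetical helper: a Semaphore holding at most `capacity` tokens,
  topped up by `refill` tokens every `period-ms` milliseconds.
  Returns {:sem Semaphore, :scheduler ScheduledExecutorService};
  shut the scheduler down when finished."
  [capacity refill period-ms]
  (let [sem (Semaphore. (int capacity))
        ^ScheduledExecutorService sched (Executors/newSingleThreadScheduledExecutor)]
    (.scheduleAtFixedRate
     sched
     (fn []
       ;; Only this task adds permits and callers only remove them,
       ;; so a stale read can only under-fill, never exceed `capacity`.
       (let [n (min refill (- capacity (.availablePermits sem)))]
         (when (pos? n)
           (.release sem (int n)))))
     (long period-ms) (long period-ms) TimeUnit/MILLISECONDS)
    {:sem sem :scheduler sched}))

(defn rate-limited
  "Hypothetical wrapper: every call to the returned fn first takes one
  token from the bucket, blocking until one is available."
  [bucket f]
  (let [^Semaphore sem (:sem bucket)]
    (fn [& args]
      ;; Tokens are never given back by callers; only the refill task adds them.
      (.acquire sem)
      (apply f args))))
```

With a bucket of 2 tokens refilled at 1 per 50 ms, the first two calls run immediately and each later call waits for the next refill. Because callers never release permits, the semaphore limits the call rate rather than the number of tasks in flight.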

However, I wonder if there's a way to make it less eager. Right now my implementation of upmap needs to know the total number of tasks it's running (so that it knows how many tasks to take from the CompletionService). This means it needs all its inputs before it can start submitting tasks. This isn't a major problem, since the important part (consuming results unordered, as soon as each task completes) is working. But I'm still curious whether anyone has any ideas. I guess some form of chunking, maybe?
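One possible direction, sketched under assumptions: instead of counting the input, keep a fixed window of at most `n` tasks in flight and submit the next element each time a result is taken. `upmap-window` and its `n` parameter are hypothetical names, not from the post, and the sketch uses classic interop so it doesn't depend on Clojure 1.12's qualified-method syntax.

```clojure
(import '(java.util.concurrent ExecutorCompletionService ExecutorService Executors))

(defn upmap-window
  "Hypothetical lazier upmap: keeps at most `n` tasks in flight and
  submits the next element of coll each time a result is taken.
  Never calls count, so coll may be lazy or infinite. Returns a lazy
  seq of results in completion order."
  [^ExecutorService executor n f coll]
  (let [cs (ExecutorCompletionService. executor)
        submit! (fn [x] (.submit cs (fn [] (f x))))
        step (fn step [remaining in-flight]
               (lazy-seq
                (when (pos? in-flight)
                  ;; Block until any in-flight task completes.
                  (let [result (deref (.take cs))]
                    (if-let [s (seq remaining)]
                      ;; A slot just freed up: refill it with the next input.
                      (do (submit! (first s))
                          (cons result (step (rest s) in-flight)))
                      ;; Input exhausted: drain the remaining tasks.
                      (cons result (step nil (dec in-flight))))))))
        head (doall (take n coll))]
    ;; Prime the window with up to n tasks.
    (run! submit! head)
    (step (drop n coll) (count head))))
```

This behaves more like a sliding window than fixed chunks: a new task starts as soon as any slot frees up instead of waiting for a whole chunk to finish. While input remains, the number of submitted tasks is always the number of consumed results plus `n`, so `n` also caps concurrency.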

Here’s the code snippet (version without rate limiting).

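(import '(java.util.concurrent Executors ExecutorCompletionService))

;; Assumed definition (not shown in the post): one virtual thread per task.
(def executor (Executors/newVirtualThreadPerTaskExecutor))

(defn upmap
  "Applies f to each element of coll on executor; returns a lazy seq of results in completion order."
  ([f coll]
   (let [cs (ExecutorCompletionService/new executor)]
     ;; Eagerly submit one task per element of coll.
     (run! (fn [x] (ExecutorCompletionService/.submit cs #(f x))) coll)
     ;; Take exactly (count coll) completed futures, as they finish.
     (->> (repeatedly #(deref (ExecutorCompletionService/.take cs)))
          (take (count coll))))))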

More context in the blog post. Thanks.
