When I see these kinds of things, I tend to jump toward core.async these days. You can accomplish similar stuff with futures and promises, but I think the core.async infrastructure composes these concurrent operations together nicely and allows for even higher-order patterns (like parallel processing with pipeline).
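For contrast, one of those polling loops written with a plain future might look something like this (a minimal sketch; it leans on the helper functions from the full core.async example below, and each future ties up a real thread while it sleeps):

;;plain blocking loop, reusing the helpers defined below
(defn blocking-poll [target condition f wait]
  (loop []
    (if (check-api-condition condition)
      (transact-new-data-db target (f (get-db-info target)))
      (do (Thread/sleep wait)
          (recur)))))

(defn polling-update-future [target condition f wait]
  (future (blocking-poll target condition f wait)))

You’d fire one future per service and deref each one to synchronize.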
Here’s my take…
(ns demo
  (:require [clojure.core.async :as a]))

;;pretend we have a database.
(def db (atom {:foo 0
               :bar 0
               :baz 0
               :qux 0}))

(defn get-db-info [k]
  (get @db k))

(defn transact-new-data-db [k v]
  (swap! db assoc k v))

;;per-key probability that the simulated API condition fails
(def probs
  {:foo 0.3
   :bar 0.2
   :baz 0.8
   :qux 0.6})

(defn check-api-condition [x]
  (> (rand) (probs x 0.5)))

;;kicks off a go-loop that polls until the condition passes, then applies
;;f to the stored value, writes the result back, puts the new db state on
;;the returned channel, and closes the channel.
(defn polling-update
  [target condition f wait]
  (let [db-info (get-db-info target)
        res     (a/chan)]
    (a/go-loop []
      (if (check-api-condition condition)
        (do (->> (f db-info)
                 (transact-new-data-db target)
                 (a/>! res))
            (a/close! res))
        (do (a/<! (a/timeout wait))
            (recur))))
    res))

;;merge the result channels and block until every service has reported
(defn collect [chs]
  (->> (a/merge chs (count chs))
       (a/into [])
       a/<!!))

(defn polling-updates [xs]
  (->> (for [{:keys [target condition f wait]} xs]
         (polling-update target condition f (or wait 3000)))
       collect))

;;wrap f so each invocation announces itself; capture *out* so prints
;;from go-pool threads land at the REPL
(defn noisy-update [msg f]
  (let [out *out*]
    (fn [x]
      (binding [*out* out]
        (println msg)
        (f x)))))

(def services
  [{:target :foo :condition :foo :f (noisy-update :foo #(+ % 1)) :wait 100}
   {:target :bar :condition :bar :f (noisy-update :bar #(+ % 2)) :wait 300}
   {:target :baz :condition :baz :f (noisy-update :baz #(+ % 3)) :wait 500}
   {:target :qux :condition :qux :f (noisy-update :qux #(+ % 4)) :wait 200}])

(defn demo [n]
  (dotimes [_ n]
    (println (polling-updates services))))
With the demo yielding:
demo> (demo 3)
:bar
:foo
:qux
:baz
[{:foo 19, :bar 40, :baz 57, :qux 76}
{:foo 20, :bar 40, :baz 57, :qux 76}
{:foo 20, :bar 40, :baz 57, :qux 80}
{:foo 20, :bar 40, :baz 60, :qux 80}]
:foo
:baz
:bar
:qux
[{:foo 21, :bar 40, :baz 60, :qux 80}
{:foo 21, :bar 40, :baz 63, :qux 80}
{:foo 21, :bar 42, :baz 63, :qux 80}
{:foo 21, :bar 42, :baz 63, :qux 84}]
:bar
:qux
:foo
:baz
[{:foo 21, :bar 44, :baz 63, :qux 84}
{:foo 21, :bar 44, :baz 63, :qux 88}
{:foo 22, :bar 44, :baz 63, :qux 88}
{:foo 22, :bar 44, :baz 66, :qux 88}]
I basically mimicked the functions you left undefined with my own in-memory database (an atom). Random wait times are simulated with calls to rand: each service keeps polling until check-api-condition randomly passes. The system is managed by core.async channels and go-loops, where each “service” (which waits for a condition and updates an entry in the database with a supplied function, with an optional wait time between polls) kicks off an individual go-loop to accomplish the task. The assumption (it wasn’t entirely clear from your question, but I inferred it) is that all the updates to the database are independent; no data is coordinated between the result of “f1” and any other function (or service in this demo). If that holds, we are free to fire off n independent go-loops to enact the polling and updating.
We then define a higher-order function, polling-updates, that can operate on an arbitrary number of these service definitions supplied as a sequence of maps. It handles invoking the core.async plumbing and collating (synchronizing) the results.
At this level of granularity (somewhat coarse, IMO), we fire off N polling updates simultaneously and update atomically based on whichever comes in first. There may be some time savings (since requests and update computations can be distributed across cores via the worker threads core.async uses), but it’s still a bit limited compared to other schemes. If we have N truly independent tasks, we would be better off doing something that can steal work, letting the system progress as fast as possible (the pipeline function family is good for this). That way, workers can switch over to shorter polling tasks while the longer ones are tied up.
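As a rough illustration, here is how a fixed worker pool could look with pipeline-blocking (a sketch under my assumptions: it reuses the hypothetical blocking-poll from the future sketch at the top, and to-chan! needs core.async 1.3+):

(defn pooled-polling-updates [xs n-workers]
  (let [in  (a/to-chan! xs) ;;queue of service maps
        out (a/chan)]
    ;;n-workers threads; each pulls the next service as soon as it finishes one
    (a/pipeline-blocking n-workers out
                         (map (fn [{:keys [target condition f wait]}]
                                (blocking-poll target condition f (or wait 3000))))
                         in)
    (a/<!! (a/into [] out))))

With more services than workers, a worker that finishes a short poll immediately picks up the next queued task instead of idling.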
If the assumption about independence is invalidated, there are still some elegant ways to compose these kinds of services. The core.async way would be to use channels (or atoms or similar references) to propagate information, and to leverage the higher-order building blocks to build systems of communicating processes. I think you can get there similarly with futures, promises, and threads, but for more complex stuff I find the core.async methods scale nicely and admit things like back pressure out of the box.
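For example, if :bar’s update depended on :foo’s result, you could chain the result channels directly (a sketch building on polling-update above, which puts the whole post-swap! database map on its channel):

(defn chained-demo []
  (a/go
    ;;wait for :foo's update to land before starting :bar's
    (let [foo-res (a/<! (polling-update :foo :foo inc 100))]
      (a/<! (polling-update :bar :bar #(+ % (:foo foo-res)) 100)))))

;;at the REPL, block for the final result with (a/<!! (chained-demo))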