Help with software design: Cuncurrency, Parallelism, something else?

Hello everyone.

This is my first time building a fullstack clojure application and I’ve run into some issues and I’m not sure what’s the best/correct aproach to solve them.

I have four functions (f-1, f-2, f-3, f-4) , all of them have the same basic structure:

(defn f [foo bar]
  (let [db-info (get-db-info foo)]
    (loop [cond false]
      (if cond
        (do
          (update-db db-info)
          (transact-new-data-db m)) ; new data added to the db
        (Thread/sleep 3000))
      (recur (check-api-condition bar))))) 

Where the new data transacted in function f-n is looked up to in the function f-(n+1) in the “let” binding.

I’m using a loop because I constantly need to check the API of the service I’m using until certain conditions are meet, I’m guessing I could avoid these if there were weebhooks available but there aren’t any.

I currently have a do statement where the four functions are called in sequence. The thing is that ony one of them runs at any given time. So if f-3 takes 10 minutes to excute and exit the loop, then functions f-1 and f-2 are not doing anything even if they could.

So the question is: How can I make the program run all 4 functions simultaneously? Is this better suited for concurrency or parallelism?

houpefully this information is enough to give you guys a good enough picture of the issue, if not please let me know and I’ll add more detail.

Thanks everyone.

Yup, this is exactly the idea behind parallelism and concurrency.

In your case, since you only have four functions, an easy way is just to use future. Just wrap the calls to your functions in a future and they’ll now all be executing concurrently and possibly in parallel if you have a multi-core CPU.

The problem which you have though, and that has nothing to do with parallelization, is that you have an infinite loop. How are you supposed to terminate your program? If I were you, I’d pass an atom into them as well, and terminate the loop if the atom is equal to true or something:

(defn f [foo bar terminate-atom]
  ...
  (loop [cond false]
    ...
    (when-not @terminate-atom
      (recur ....))))

That way in your program you can set the terminate-atom to true anytime you want to stop your functions.

1 Like

When I see these kinds of things, I tend to jump toward core.async these days. You can accomplish similar stuff with futures and promises, but I think the core.async infrastructure works nicely to compose these concurrent operations together and allows for even higher order patterns (like parallel processing with pipeline).

Here’s my take…

(ns demo
  (:require [clojure.core.async :as a]))


;;pretend we have database.
(def db (atom {:foo 0
               :bar 0
               :baz 0
               :qux 0}))
(defn get-db-info [k]
  (get @db k))
(defn transact-new-data-db [k v]
  (swap! db assoc k v))

(def probs
  {:foo 0.3
   :bar 0.2
   :baz 0.8
   :qux 0.6})

(defn check-api-condition [x]
  (> (rand) (probs x 0.5)))

(defn polling-update
  [target condition f wait]
  (let [db-info (get-db-info target)
        res     (a/chan)]
    (a/go-loop []
      (if (check-api-condition condition)
        (do (->> (f db-info)
                 (transact-new-data-db target)
                 (a/>! res))
            (a/close! res))
        (do (a/<! (a/timeout wait))
            (recur))))
    res))

(defn collect [chs]
  (->> (a/merge chs (count chs))
       (a/into [])
       a/<!!))

(defn polling-updates [xs]
  (->> (for [{:keys [target condition f wait]} xs]
         (polling-update target condition f (or wait 3000)))
       collect))

(defn noisy-update [msg f]
  (let [out *out*]
    (fn [x]
      (binding [*out* out]
        (do (println msg)
            (f x))))))

(def services
  [{:target :foo :condition :foo :f (noisy-update :foo #(+ % 1)) :wait 100}
   {:target :bar :condition :bar :f (noisy-update :bar  #(+ % 2)) :wait 300}
   {:target :baz :condition :baz :f (noisy-update :baz  #(+ % 3)) :wait 500}
   {:target :qux :condition :qux :f (noisy-update :qux  #(+ % 4)) :wait 200}])

(defn demo [n]
  (dotimes [i n]
    (println (polling-updates services))))

With the demo yielding:

demo> (demo 3)
:bar
:foo
:qux
:baz
[{:foo 19, :bar 40, :baz 57, :qux 76}
 {:foo 20, :bar 40, :baz 57, :qux 76}
 {:foo 20, :bar 40, :baz 57, :qux 80}
 {:foo 20, :bar 40, :baz 60, :qux 80}]
:foo
:baz
:bar
:qux
[{:foo 21, :bar 40, :baz 60, :qux 80}
 {:foo 21, :bar 40, :baz 63, :qux 80}
 {:foo 21, :bar 42, :baz 63, :qux 80}
 {:foo 21, :bar 42, :baz 63, :qux 84}]
:bar
:qux
:foo
:baz
[{:foo 21, :bar 44, :baz 63, :qux 84}
 {:foo 21, :bar 44, :baz 63, :qux 88}
 {:foo 22, :bar 44, :baz 63, :qux 88}
 {:foo 22, :bar 44, :baz 66, :qux 88}]

I basically mimicked some of the functions you left undefined with my own in-memory database (an atom). Random wait times are simulated with calls to random. The system is managed by core.async channels and go-loops, where a “service” that waits for a condition and updates an entry in the database with a supplied function and optional wait time kicks off an individual go-loop to accomplish the task. The assumption (it wasn’t entirely clear from your question, but I inferred it) is that all the updates to the database are independent. There is no data coordinated between the results of “f1” and any other function (or service in this demo). If that holds, we are free to fire off n independent go-loops to enact the polling and updating.

We then define a higher-order function that polling-updates, that can operate an an arbitrary number of these service definitions supplied be a sequence of maps. It handles invoking the core.async plumbing, and collating the results (synchronizing).

At this level of granularity (somewhat coarse IMO), we are firing off N polling updates simultaneously, and we update atomically based on who comes in first. There may be some time savings (since requests can update computations can be distributed across cores via the worker threads core.async uses), but it’s still a bit limited compared to other schemes. If we have N truly independent tasks, then we would be better off doing something that can steal work and letting the system progress as fast as possible (the pipeline function family is good for this). That way, workers can switch over to shorter polling tasks while the longer ones are tied up.

If the assumption about independence is invalidated, there are still some elegant ways to compose these kind of services. The core.async way would be to use channels to propogate information (or atoms or similar references) and leverage the higher order building blocks to build systems of communicating processes. I think you can get there similarly with futures, promises, and threads, but for more complex stuff I find the core.async methods scale nicely and admit things like back pressure out of the box.

2 Likes

This most likely won’t work for OP, since the IO operations inside polling-update’s go-loop are likely blocking, unlike in your example.

So you need to switch to an async/thread loop instead, or turn the IO functions into non-blocking variants.

Fair enough.

(a/thread 
   (loop []
      (if (check-api-condition condition)
        (do (->> (f db-info)
                 (transact-new-data-db target)
                 (a/>!! res))
            (a/close! res))
        (do (a/<!! (a/timeout wait))
            (recur))))
1 Like