How to architect code using core.async

I am struggling to understand how to architect code with core.async, particularly when it comes to extracting intermediate functions.

Let’s consider the following basic code, which listens on an input channel and answers on an output chan.

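(require '[clojure.core.async :refer [chan go-loop <! >!]])

(def in (chan))
(def out (chan))
(go-loop []
  (when-some [msg (<! in)] ; nil means `in` was closed, so stop
    (when (some-condition msg)
      (>! out (answer msg)))
    (recur)))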

Now, say I want to extract the part (when ...) into a dedicated function (answer-if-matching msg).
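From experience and reading the docs, I know that I cannot use >! there, because it only works inside go blocks: go is a macro that rewrites its body into a state machine, and that rewriting does not reach into the functions it calls.
The obvious alternative seems to be >!!, but its docstring explicitly says it is not intended for use in direct or transitive calls from go blocks (I assume because it blocks a real thread instead of parking, which “breaks” the nice state machine).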

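The alternative, offer!, never blocks: it only puts the value if that can happen immediately, so its contract differs from >!. Similarly, put! does not block: it returns right away and only reports completion through an optional callback.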
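To make that contract difference concrete, here is a minimal sketch of offer! and put! called from ordinary code, outside any go block. It assumes core.async is on the classpath; the namespace and the names c, offered, put-done and taken are made up for illustration.

```clojure
(ns put-offer-sketch
  (:require [clojure.core.async :refer [chan offer! put! <!!]]))

(def c (chan)) ; unbuffered, and nobody is taking yet

;; offer! never blocks: with no buffer space and no waiting taker it
;; gives up immediately and returns a falsey value.
(def offered (offer! c :dropped))

;; put! also returns immediately, but the put stays pending until a
;; taker arrives; the optional callback then receives true.
(def put-done (promise))
(put! c :queued (fn [ok?] (deliver put-done ok?)))

;; A blocking take (fine outside go blocks) completes the pending put.
(def taken (<!! c))
```

offer! fits when dropping a value is acceptable; put! keeps the value but leaves you handling completion in callbacks, which is roughly the bookkeeping the go macro otherwise does for you.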

And looking at the macro-expansion of my snippet, I see uses of put!, but inside a state machine. So unless I rebuild a similar state machine by hand, I won’t get the best of core.async.

Finally, all the examples I found in blogs and courses (even paid ones) dodge this, showing only simple code. Unfortunately, I don’t know of any library I could dive into to find my answer.

So many thanks for any input on this. Cheers


Note that I am also open to the idea that such a refactoring is not idiomatic core.async. That is the reason for this topic: what is the recommended way of architecting the code?
Composing small go blocks? Digging into the raw async functions?


I think the general wisdom is to keep core.async at the periphery of your application and compose your business logic from pure functions. So the way I would try to refactor your code might be something like:

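(require '[clojure.core.async :refer [chan go-loop <! >!]])

(def in (chan))
(def out (chan))

;; Pure business logic: no channels involved
(defn answer-if-matching
  [msg]
  (when (some-condition msg)
    (answer msg)))

;; core.async only at the edge: read, decide, write
(go-loop []
  (when-some [msg (<! in)] ; stop once `in` is closed
    (when-let [reply (answer-if-matching msg)]
      (>! out reply))
    (recur)))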

This way all of your IO logic (if I have an answer, send it to someone) stays separate from your business logic (if some condition is met, compute the answer).
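A runnable sketch of that split, assuming hypothetical definitions for some-condition and answer (the thread never defines them): the pure function can be exercised with plain calls, and only start-answerer!, an illustrative name, touches channels.

```clojure
(ns pure-core-sketch
  (:require [clojure.core.async :refer [chan go-loop <! >! <!! >!!]]
            [clojure.string :as str]))

;; Hypothetical stand-ins for the thread's `some-condition` and `answer`:
;; answer any message whose :text ends with a question mark.
(defn some-condition [msg] (str/ends-with? (:text msg) "?"))
(defn answer [msg] {:reply (str "You asked: " (:text msg))})

;; Pure business logic: plain data in, plain data (or nil) out.
(defn answer-if-matching [msg]
  (when (some-condition msg)
    (answer msg)))

;; Thin IO shell: the only code that touches channels.
(defn start-answerer! [in out]
  (go-loop []
    (when-some [msg (<! in)]
      (when-let [reply (answer-if-matching msg)]
        (>! out reply))
      (recur))))
```

At the REPL, (answer-if-matching {:text "ok?"}) returns {:reply "You asked: ok?"} with no channel involved, so only the small wiring function needs an integration test.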

Another way you could refactor this (eschewing the above advice) is to use a go block inside answer-if-matching, pass the out channel to it, and then <! on the channel that answer-if-matching returns (a go block returns a channel that yields its result when it completes). IMO this needlessly complicates the interaction between your business logic and what you want to do with the answer, though.

(go-loop []
  (when-some [msg (<! in)]
    ;; `answer-if-matching` uses a go-block and may `>!`
    ;; a value on the `out` chan
    (<! (answer-if-matching out msg))
    (recur)))

This looks like fewer lines of code inside the go-loop, but now you’ve spread the core.async IO logic across multiple places, which will make this harder to test, debug, etc.
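For comparison, a sketch of what the go-block variant of answer-if-matching could look like. some-condition and answer are hypothetical stand-ins and start! is an illustrative name. The helper now needs the out channel, and the caller must remember to park on the returned channel, which is the coupling described above.

```clojure
(ns go-block-variant-sketch
  (:require [clojure.core.async :refer [chan go go-loop <! >! <!! >!!]]))

;; Hypothetical stand-ins, only so the sketch runs.
(defn some-condition [msg] (even? msg))
(defn answer [msg] (* 10 msg))

;; The helper owns a go block and does the IO itself.
;; `go` returns a channel that delivers the body's result once the block
;; finishes (or simply closes if the result is nil), so callers can park on it.
(defn answer-if-matching [out msg]
  (go
    (when (some-condition msg)
      (>! out (answer msg)))))

(defn start! [in out]
  (go-loop []
    (when-some [msg (<! in)]
      ;; wait for the helper's go block before reading the next message
      (<! (answer-if-matching out msg))
      (recur))))
```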


Check out the “pipeline” functions (pipeline, pipeline-blocking and pipeline-async), which cover some more exotic cases. But the answer is usually to step back and ponder how you would solve the problem with sequences. Most everyday transformations can be modeled as a combination of filter, map, mapcat, etc. Those same functions produce a transducer if you don’t specify a collection. You can chain (compose) transducers with comp, create a channel with the composed transducer attached, and read the transformed values from the channel.
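A minimal sketch of that idea, assuming core.async 1.2 or later for onto-chan! (older versions have onto-chan): the same composed transducer is applied once to a sequence with into and once attached to a channel.

```clojure
(ns xform-sketch
  (:require [clojure.core.async :as a :refer [chan <!!]]))

;; One transducer, composed from the same filter/map used on sequences.
(def xform
  (comp
    (filter even?)
    (map #(* 2 %))))

;; Applied eagerly to a plain sequence...
(def from-seq (into [] xform (range 10)))

;; ...and attached to a channel; a channel with a transducer needs a buffer.
(def c (chan 10 xform))
(a/onto-chan! c (range 10)) ; puts 0..9, then closes c
(def from-chan (<!! (a/into [] c)))
```

Both from-seq and from-chan end up as [0 4 8 12 16].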


I guess this is the point where I don’t have enough experience. I do see the real value of keeping functions as pure as possible, and architectural designs must strive to serve that goal. I will work on that point.

I didn’t mention it, but I don’t see how pipeline fits cases where you must dispatch to multiple chans. I also saw the pub-sub design. But eventually, it all boils down to keeping core.async at the boundaries, as @lilactown suggested.

Ya I think you can use pipeline for that:

(pipeline
  1
  out
  (comp
    (filter some-condition)
    (map answer))
  in)

Now, whenever you put something on the in channel, the pipeline automatically puts its answer on the out channel, but only for the messages for which some-condition was true.

For example:

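(require '[clojure.core.async :as a :refer [chan pipeline onto-chan! <!!]])

(def in (chan))
(def out (chan))

(pipeline
  1
  out
  (comp
    (filter even?)
    (map #(* 2 %)))
  in)

;; onto-chan! puts the values in order, then closes `in`;
;; pipeline closes `out` once `in` is closed and drained
(onto-chan! in (range 10))

;; a/into (not clojure.core/into) collects until `out` closes
(<!! (a/into [] out))
;;> [0 4 8 12 16]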

If you aren’t using the parallelism of pipeline, you can accomplish the same thing by passing the transducer (the stuff in comp) as the xform argument of a channel constructor. Generalizing transducers originally came from this use case (channel transformations looked a lot like seq functions, which meant a lot of duplicated functions).


TIL that you can simply attach transducers to channels, so values you put onto the channel come out transformed when you read them.

@joinr do you know if the transducer is run when you put or when you take?

(require '[clojure.core.async :as a :refer [chan onto-chan! <!!]])

(def answerer
  (chan 1
    (comp
      (filter even?)
      (map #(* 2 %)))))

(onto-chan! answerer (range 10))

(<!! (a/into [] answerer))
;;> [0 4 8 12 16]

According to this Stack Overflow post, this is undefined:


And what if I make my initial example a little more complex, with multiple output chans to dispatch to?
We have a series of conditions to choose the appropriate channel.

I guess that pipeline does not apply because the input chan can only provide the message once. But we can use the pub-sub system [1] to dispatch it more than once.

Applying @lilactown’s suggestion, I can keep my go-block thin: a pure function takes the output chans, the received message and some additional state if needed, and returns the selected target chan and the message to send. Something like:

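(go-loop []
  (when-some [msg (<! in)]
    ;; process-msg is pure; the map would hold more topics: {:topic1 out1 :topic2 out2 ...}
    (let [[out answer] (process-msg {:topic1 out1 :topic2 out2} msg)]
      (>! out answer))
    (recur)))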
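One way to flesh this out, as a sketch only: process-msg is a hypothetical pure function that picks the target channel from a :topic key (an assumed message shape) and computes the answer, and start-router! is an illustrative name for the thin go-loop around it. Messages with no registered topic are skipped rather than put on a nil channel.

```clojure
(ns router-sketch
  (:require [clojure.core.async :refer [chan go-loop <! >! <!! >!!]]))

;; Hypothetical pure router: returns [target-chan answer], or nil when no
;; output channel is registered for the message's :topic.
(defn process-msg
  [outs {:keys [topic input] :as msg}]
  (when-let [target (get outs topic)]
    [target (assoc msg :result (* 2 input))]))

;; Thin IO shell: reads, asks the pure function where to go, writes.
(defn start-router!
  [in outs]
  (go-loop []
    (when-some [msg (<! in)]
      (when-let [[target answer] (process-msg outs msg)]
        (>! target answer))
      (recur))))
```

Because process-msg never touches a channel, a test can pass keywords where channels would go: (process-msg {:topic1 :fake-out} {:topic :topic1 :input 3}) returns [:fake-out {:topic :topic1, :input 3, :result 6}].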

[1] Pub Sub · clojure/core.async Wiki · GitHub

I had never tested the semantics, and came up with the same Stack Overflow result you reported. I’ve never really thought about this or whether it mattered. I imagine that if you’re mapping a really expensive function it may matter (e.g. you might prefer to realize the xform when pulling instead of putting). It looks like it happens on both ends, under some performance heuristics that aren’t well documented…

Pub/Sub is when you want the ability to have many subscribers per published topic, which doesn’t seem to be your case. I think you just want to route input to different processes.

So in your case I would just dispatch using a go-loop:

(require '[clojure.core.async :as a :refer [chan go-loop <! >! close! onto-chan!]])

;; Here I'm assuming each of your processes takes a
;; msg and returns it with the result added to the
;; message under a :result key, and that the msg contains
;; an :input key which is the input to operate on.
;; {:type :process-a :input 12}
(defn process-a
  [{:keys [input] :as msg}]
  (assoc msg :result (* input input)))

(defn process-b
  [{:keys [input] :as msg}]
  (assoc msg :result (+ 10 input)))

(def in (chan))
(def processor-a (chan 1 (map process-a)))
(def processor-b (chan 1 (map process-b)))
;; a/merge (not clojure.core/merge); it closes once both inputs close
(def out (a/merge [processor-a processor-b]))

(go-loop []
  (if-some [msg (<! in)]
    (do
      ;; route the whole msg: the processors read its :input key
      (case (:type msg)
        :process-a (>! processor-a msg)
        :process-b (>! processor-b msg))
      (recur))
    ;; `in` was closed: close the processors so that `out` closes too
    (do (close! processor-a)
        (close! processor-b))))

(go-loop []
  (when-some [result-msg (<! out)]
    (println result-msg)
    (recur)))

(onto-chan! in [{:type :process-a :input 1} {:type :process-b :input 1}])

So you have a routing go-loop that uses case to route each msg to the appropriate processing channel. Each processing channel is bound to a msg->msg processing function through its transducer. Finally, each processed message, with its result, comes out on the out channel, since merge combines them all back into one channel.

You could use Pub/Sub for this as well; I just find it might be more complicated for what you’re trying to do than a go-loop with a case in it.
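For comparison, a sketch of the same routing with pub and sub instead of a hand-written case. It redefines process-a and process-b so the block runs on its own, and assumes core.async 1.2+ for onto-chan!. The publication dispatches on :type; one subtlety is that a message published to a topic with no subscriber is silently dropped.

```clojure
(ns pubsub-sketch
  (:require [clojure.core.async :as a :refer [chan pub sub <!!]]))

(defn process-a [{:keys [input] :as msg}] (assoc msg :result (* input input)))
(defn process-b [{:keys [input] :as msg}] (assoc msg :result (+ 10 input)))

(def in (chan))
;; The publication routes each message by (:type msg).
(def router (pub in :type))

(def processor-a (chan 1 (map process-a)))
(def processor-b (chan 1 (map process-b)))
;; Subscribe before publishing: messages whose topic has no subscriber are dropped.
(sub router :process-a processor-a)
(sub router :process-b processor-b)

;; Subscribed channels are closed when `in` closes, so the merged channel closes too.
(def out (a/merge [processor-a processor-b]))

(a/onto-chan! in [{:type :process-a :input 1} {:type :process-b :input 1}])
;; Collected into a set: the two processors may finish in either order.
(def results (<!! (a/into #{} out)))
```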

Also as a remark, a much simpler way is to dispatch without concurrency like so:

(require '[clojure.core.async :refer [chan go-loop <! onto-chan!]])

(defmulti process :type)
(defmethod process :process-a
  [{:keys [input] :as msg}]
  (assoc msg :result (* input input)))

(defmethod process :process-b
  [{:keys [input] :as msg}]
  (assoc msg :result (+ 10 input)))

(def processor
  (chan 1
    (map process)))

(go-loop []
  (when-some [result-msg (<! processor)]
    (println result-msg)
    (recur)))

;; onto-chan! closes `processor` when done, which ends the loop above
(onto-chan! processor [{:type :process-a :input 1} {:type :process-b :input 1}])

Honestly, I’m having a bit of a hard time wrapping my head around how these two ways would differ.

I think the first one is possibly more concurrent… I’m not too sure. But still, I’m tempted to say for good concurrency you’re better off with the second approach and pipeline. Like so:

(require '[clojure.core.async :refer [chan pipeline go-loop <! onto-chan!]])

(defmulti process :type)
(defmethod process :process-a
  [{:keys [input] :as msg}]
  (assoc msg :result (* input input)))

(defmethod process :process-b
  [{:keys [input] :as msg}]
  (assoc msg :result (+ 10 input)))

(def in (chan))
(def out (chan))

;; 4 = parallelism; `out` is closed once `in` is closed and drained
(pipeline 4 out (map process) in)

(go-loop []
  (when-some [result-msg (<! out)]
    (println result-msg)
    (recur)))

(onto-chan! in [{:type :process-a :input 1} {:type :process-b :input 1}])

And with pipeline you can configure the parallelization level you want.
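A sketch of what that parallelism buys, with a hypothetical slow-square function standing in for an expensive step: per the pipeline docstrings, outputs come out in the same order as the inputs even though up to n elements are processed at once. Since slow-square blocks (it sleeps), this uses pipeline-blocking, the variant meant for blocking operations; it assumes core.async 1.2+ for to-chan!.

```clojure
(ns pipeline-order-sketch
  (:require [clojure.core.async :as a :refer [chan pipeline-blocking <!!]]))

;; Hypothetical stand-in for a slow, blocking step (e.g. an HTTP call).
(defn slow-square [x]
  (Thread/sleep (long (rand-int 20)))
  (* x x))

(def in (a/to-chan! (range 8))) ; emits 0..7, then closes
(def out (chan))

;; Up to 4 calls to slow-square run at once, yet results come out in
;; input order; `out` is closed once `in` is exhausted.
(pipeline-blocking 4 out (map slow-square) in)

(def results (<!! (a/into [] out)))
```

For pure CPU work such as the process multimethod above, plain pipeline is the usual fit.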

Does anyone else have thoughts here?


Thanks for the detailed message. That’s a lot of food for thought.
I like the different approaches you suggest. Now it is time to put all of this into practice.
