Async muxer

I’m running some processing, and I want part of it to be multi-threaded. The threads do not have to be ‘real’ threads. I need the data muxed out (demultiplexed) to the separate ‘threads’, and no data can be lost.

I haven’t been able to find out how to do this with Google, so I’m asking here. I hope it is not too much trouble.

The data must reach each thread in the original order, and items are routed to the threads by a tag in the data (the `:id` key in the example below). I’m using a stateful transducer on each of the receiving channels…

I also want to ignore items whose id is nil.

Example data. In the real world I will use this with up to 9 ids (channels) and billions or trillions of entries in the source channel:

source channel : [{:id :0 :d 0} {:id :1 :d 1} {:id nil} {:id :0 :d 3} ]

split to

channel 0 : [{:id :0 :d 0} {:id :0 :d 3}]
channel 1 : [{:id :1 :d 1}]
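For reference, the intended split can also be written as an ordinary sequence operation, which is handy as an oracle when testing a channel-based version on small inputs. `sample-data` and `expected-split` are illustrative names, not part of the original code:

```clojure
;; Pure reference for the intended split: route by id, drop nil ids,
;; and keep the original relative order within each id.
(def sample-data [{:id :0 :d 0} {:id :1 :d 1} {:id nil} {:id :0 :d 3}])

(defn expected-split [discr-fn items]
  (->> items
       (remove (comp nil? discr-fn)) ; ignore items with a nil id
       (group-by discr-fn)))         ; each group is a vector in encounter order

(expected-split :id sample-data)
;; => {:0 [{:id :0, :d 0} {:id :0, :d 3}], :1 [{:id :1, :d 1}]}
```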

The problems I ran into while writing this multiplexer:

  1. `async/put!` in a go loop: `java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel.`
  2. `>!` in a go loop: deadlock…
  3. `>!` in `dotimes [i conc-calls] ...` with a go loop: data out of order…
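A minimal reproduction of problem 1, for readers who hit the same assertion. `put!` without a callback never waits, so when nothing is taking from an unbuffered channel, every call simply queues one more pending put; core.async caps that queue at 1024 per channel. `pending-put-overflow?` is a hypothetical name used only for this illustration:

```clojure
(require '[clojure.core.async :as a])

;; put! without a callback never parks: with no buffer and no taker, each call
;; just queues another pending put on the channel. core.async allows at most
;; 1024 of those per channel and fails an assertion on the next one.
(defn pending-put-overflow? []
  (let [c (a/chan)]                   ; unbuffered, and nothing ever takes from it
    (try
      (dotimes [i 1025]
        (a/put! c i))                 ; call number 1025 trips the assertion
      false
      (catch AssertionError _ true))))

(pending-put-overflow?) ;; => true
```

This is why a muxer built on `put!` has no backpressure: the go-loop keeps reading `from` at full speed while the slow consumers fall further behind.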

This is how I inspect the contents of the channels:

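```clojure
(require '[clojure.core.async :as async
           :refer [chan go go-loop <! >! <!! close! onto-chan]])

(let [from (chan 5)                        ; fast process
      cm   {:0 (chan 100) :1 (chan 100)}]  ; slower process
  (channel-muxer-X cm :id from)            ; set up channel muxer number X
  (onto-chan from [{:id :0 :d 0} {:id :1 :d 1} {:id :0 :d 3}]) ; closes `from` when done
  (Thread/sleep 100)                       ; give the muxer time to route everything
  ;; close! each output (in case the muxer got stuck), then drain it
  (doall (map-indexed (fn [i c] (close! c) [i (<!! (async/into [] c))]) (vals cm))))
```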

Code for the three cases:

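```clojure
(defn channel-muxer-1 [chmap discr-fn from]
  (let [vwarned? (volatile! false)]
    (go-loop []
      (let [x (<! from)]
        (if (nil? x)
          (mapv async/close! (vals chmap)) ; source closed: close all outputs
          (let [k   (discr-fn x)
                och (if-let [c (chmap k)]
                      c
                      (do ; no channel for this id: warn once, use a throwaway channel
                        (when (not @vwarned?)
                          (println "undefined chan for id" k "from" x)
                          (vreset! vwarned? true))
                        (chan)))]
            ;; put! does not wait for the consumer, so this loop runs ahead of
            ;; slow readers and piles up pending puts (problem 1)
            (when (async/put! och x)
              (recur)))))))
  (vals chmap))
```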

```clojure
(defn channel-muxer-2 [chmap discr-fn from] ;; deadlocks.
  (let [vwarned? (volatile! false)]
    (go-loop []
      (let [x (<! from)]
        (if (nil? x)
          (mapv async/close! (vals chmap)) ; source closed: close all outputs
          (let [k   (discr-fn x)
                och (if-let [c (chmap k)]
                      c
                      (do
                        (when (not @vwarned?)
                          (println "undefined chan for id" k "from" x)
                          (vreset! vwarned? true))
                        (chan)))] ; unbuffered and never read: the >! below parks forever
            (when (>! och x) ; parks until the output accepts x (backpressure, order kept)
              (recur)))))))
  (vals chmap))
```
```clojure
;; http://danboykis.com/posts/things-i-wish-i-knew-about-core-async/#edge-two-too-many-puts
(defn channel-muxer-3 [chmap discr-fn from] ;; out of order
  (let [vwarned? (atom false)              ; shared by all the go-loops below
        conc-calls 512]
    ;; 512 go-loops take from `from` concurrently, so consecutive items can be
    ;; forwarded by different loops and arrive out of order (problem 3)
    (dotimes [i conc-calls]
      (go-loop []
        (let [x (<! from)]
          (if (nil? x)
            (mapv async/close! (vals chmap))
            (let [k   (discr-fn x)
                  och (if-let [c (chmap k)]
                        c
                        (do
                          (when (not @vwarned?)
                            (println "undefined chan for id" k "from" x)
                            (reset! vwarned? true))
                          (chan)))]
              (when (>! och x)
                (recur))))))))
  (vals chmap))
```


Addendum:
This code had me stuck for a whole day, and the problem was solved as soon as I decided to ask on the forum. :partying_face:

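It turns out I had a bug.

In the special case of an id with no channel in `chmap` (which includes the nil ids), the parking put `>!` went to a freshly created unbuffered channel, `(chan)`, that nobody ever reads, so the go-loop parked forever.
That channel should have a buffer of 1. With that change, the code seems to work.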
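A small sketch of why the buffer size matters. `parks-forever?` is a hypothetical helper, and the 200 ms timeout is an arbitrary choice for the illustration:

```clojure
(require '[clojure.core.async :as a])

;; Does a parking put onto `ch` (with no reader) ever complete?
(defn parks-forever? [ch]
  (let [put-done (a/go (a/>! ch :x))            ; yields true once the put completes
        [_ port] (a/alts!! [put-done (a/timeout 200)])]
    (not= port put-done)))                       ; the timeout won: the put is still parked

(parks-forever? (a/chan))                      ;; => true  (the muxer's go-loop gets stuck here)
(parks-forever? (a/chan 1))                    ;; => false (the buffer takes the value)
(parks-forever? (a/chan (a/dropping-buffer 1))) ;; => false (a dropping buffer never blocks)
```

Because the muxer creates a new channel for every item with an unknown id, a buffer of 1 is always enough for that single put to complete.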

```clojure
(defn channel-muxer-2 [chmap discr-fn from] ;; fixed: no longer deadlocks
  (let [vwarned? (volatile! false)]
    (go-loop []
      (let [x (<! from)]
        (if (nil? x)
          (mapv async/close! (vals chmap)) ; source closed: close all outputs
          (let [k   (discr-fn x)
                och (if-let [c (chmap k)] ; this is the code that ignores ids not in chmap
                      c
                      (do
                        (when (not @vwarned?)
                          (println "undefined chan for id" k "from" x)
                          (vreset! vwarned? true))
                        (chan 1)))] ; was (chan); (chan 1) or a dropping buffer lets >! complete
            (when (>! och x) ; parks until the output accepts x (backpressure, order kept)
              (recur)))))))
  (vals chmap))
```

I’m leaving this up here because:

  1. Maybe someone has a much better solution (the Clojure/ClojureScript community is always awesome).
  2. Maybe someone else will need a mux in the future.

Have you seen core.async’s pub/sub? https://github.com/clojure/core.async/wiki/Pub-Sub It might be a built-in version of what you’re trying to do. There are some differences from your code, though, and I might be missing something about your problem.
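A minimal sketch of pub/sub applied to the example data from the question. `pub-sub-split` is a hypothetical name, and the sketch assumes a recent core.async where `onto-chan!` has replaced the deprecated `onto-chan`. `pub` routes each item to the subscribers of `(:id item)` and silently drops items whose topic has no subscriber, which covers the nil ids; `sub` closes its channel when the source closes, because its `close?` argument defaults to true.

```clojure
(require '[clojure.core.async :as a])

(defn pub-sub-split [items]
  (let [from (a/chan 5)
        p    (a/pub from :id)   ; topic-fn is :id
        c0   (a/chan 100)
        c1   (a/chan 100)]
    ;; Subscribe BEFORE any data flows: pub drops items for topics
    ;; that have no subscriber yet.
    (a/sub p :0 c0)
    (a/sub p :1 c1)
    (a/onto-chan! from items)   ; closes `from` when done, which closes c0 and c1
    [(a/<!! (a/into [] c0)) (a/<!! (a/into [] c1))]))

(pub-sub-split [{:id :0 :d 0} {:id :1 :d 1} {:id nil} {:id :0 :d 3}])
;; => [[{:id :0, :d 0} {:id :0, :d 3}] [{:id :1, :d 1}]]
```

One thing to keep in mind: `pub` waits for a topic's subscribers to accept an item before taking the next one from the source, so a slow consumer on one id also slows the others, just like the parking `>!` in the muxer.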

As for your code, instead of defaulting `och` to a new `(chan)` when `chmap` has no channel for the id, you could restructure the code to simply `recur` in that case. Something like this (untested):

```clojure
(defn channel-muxer-2 [chmap discr-fn from]
  (let [vwarned? (volatile! false)]
    (go-loop []
      (let [x (<! from)]
        (if (nil? x)
          (mapv async/close! (vals chmap)) ; source closed: close all outputs
          (let [k (discr-fn x)]
            (if-let [och (chmap k)]
              (when (>! och x) ; false only if that output was closed
                (recur))
              (do ; no channel for this id (e.g. nil): drop x, warn once
                (when (not @vwarned?)
                  (println "undefined chan for id" k "from" x)
                  (vreset! vwarned? true))
                (recur))))))))
  (vals chmap))
```

Thanks for the tip!

I did look at the core.async pub/sub functions, but I was uncomfortable with them, mostly because I could not find any nontrivial examples using them. I also had some issues using pub/sub with `onto-chan` filling the publishing channel (take this with a grain of salt; it may just have been a bug in my code).
core.async channels are already a niche within Clojure, and I very often depend on good examples to learn how to use its concepts properly and idiomatically.

Besides, my data is more of a stream than ‘asynchronous events’, and I have transducers in the mix, so I wanted something more pipeline-esque. In the end, I used the source code of `async/pipe` as the inspiration for my multiplexer.
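A minimal sketch of what a pipe-inspired demultiplexer might look like. This is not the author's code: `demux-pipe` and `demo-demux-pipe` are hypothetical names, and the sketch assumes a recent core.async for `onto-chan!`. Like `async/pipe`, it takes an optional `close?` flag and stops if an output channel has been closed. Each output channel carries its own stateful transducer (`map-indexed` here, numbering the items per id), which only gives sensible results because per-id order is preserved.

```clojure
(require '[clojure.core.async :as a])

(defn demux-pipe
  "Hypothetical pipe-style demultiplexer. Takes items from `from` and puts each
  one on (chmap (discr-fn item)); items with no matching channel are dropped.
  Closes every output when `from` closes, unless close? is false."
  ([from chmap discr-fn] (demux-pipe from chmap discr-fn true))
  ([from chmap discr-fn close?]
   (a/go-loop []
     (let [x (a/<! from)]
       (if (nil? x)
         (when close? (run! a/close! (vals chmap)))
         (if-let [och (chmap (discr-fn x))]
           (when (a/>! och x) ; parks for backpressure; false means och was closed, so stop
             (recur))
           (recur)))))        ; unknown or nil id: drop the item and keep going
   chmap))

(defn demo-demux-pipe []
  (let [numbered (fn [] (map-indexed (fn [i x] (assoc x :n i)))) ; stateful transducer
        chmap    {:0 (a/chan 100 (numbered)) :1 (a/chan 100 (numbered))}
        from     (a/chan 5)]
    (demux-pipe from chmap :id)
    (a/onto-chan! from [{:id :0 :d 0} {:id :1 :d 1} {:id nil} {:id :0 :d 3}])
    (mapv #(a/<!! (a/into [] %)) (vals chmap))))

(demo-demux-pipe)
;; => [[{:id :0, :d 0, :n 0} {:id :0, :d 3, :n 1}] [{:id :1, :d 1, :n 0}]]
```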

I implemented something similar; you’re welcome to compare, contrast, and use it if you’d like. I provided two semantics: dropping and throwing.
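One possible reading of "dropping and throwing" semantics, applied to items whose id has no channel. This is a hypothetical sketch, not the implementation mentioned above: `demux`, its `on-unknown` argument and `run-demux` are made-up names, and it assumes a recent core.async for `onto-chan!`. Because an exception thrown inside a go block does not propagate to the caller, the `:throw` mode closes the outputs and returns an `ex-info` as the go block's result instead.

```clojure
(require '[clojure.core.async :as a])

(defn demux
  "Hypothetical demultiplexer with two policies for items whose id has no
  channel: :drop skips them, :throw stops the loop. The returned channel
  yields :done, :output-closed, or (in :throw mode) an ex-info."
  [from chmap discr-fn on-unknown]
  {:pre [(#{:drop :throw} on-unknown)]}
  (a/go-loop []
    (let [x (a/<! from)]
      (if (nil? x)
        (do (run! a/close! (vals chmap)) :done)
        (let [k (discr-fn x)]
          (if-let [och (chmap k)]
            (if (a/>! och x) (recur) :output-closed)
            (case on-unknown
              :drop  (recur)
              ;; close the outputs so consumers stop waiting, then hand the
              ;; error back as a value instead of throwing it
              :throw (do (run! a/close! (vals chmap))
                         (ex-info "No channel for id" {:id k :item x})))))))))

(defn run-demux [on-unknown]
  (let [from  (a/chan 5)
        chmap {:0 (a/chan 100) :1 (a/chan 100)}
        done  (demux from chmap :id on-unknown)]
    (a/onto-chan! from [{:id :0 :d 0} {:id nil} {:id :1 :d 1}])
    (a/<!! done)))

(run-demux :drop)            ;; => :done
(ex-data (run-demux :throw)) ;; => {:id nil, :item {:id nil}}
```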
What’s the purpose of `vwarned?` in your implementation?

`vwarned?` turns off the undefined-channel warning message after the first warning.
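`vwarned?` is checked and set in two separate steps. That is fine in `channel-muxer-1` and `channel-muxer-2`, where a single go-loop owns the flag, but in `channel-muxer-3` 512 go-loops share it, so two of them could both see `false` and print the warning twice. A hypothetical warn-once helper (`warn-once-fn` is a made-up name) avoids that with `compare-and-set!`:

```clojure
(defn warn-once-fn
  "Hypothetical helper: returns a fn that prints its arguments the first time
  it is called and does nothing afterwards."
  []
  (let [warned? (atom false)]
    (fn [& msg]
      ;; compare-and-set! makes check-and-set a single atomic step, so even
      ;; many concurrent go blocks print the warning at most once
      (when (compare-and-set! warned? false true)
        (apply println msg)))))

(let [warn (warn-once-fn)]
  (with-out-str
    (warn "undefined chan for id" nil "from" {:id nil})
    (warn "undefined chan for id" :x "from" {:id :x})))
;; => "undefined chan for id nil from {:id nil}\n"
```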