I’m running some processing in which I want one part to be multi-threaded. The threads do not have to be ‘real’ threads. I need the data muxed out to the separate ‘threads’, and no data can be lost.
I haven’t been able to find out how to do this with Google, so I’m asking here. I hope it is not too much trouble.
The data must reach each thread in the original order, and entries are routed to the threads by a tag (the `:id` key) in each entry. I’m using a stateful transducer on each of the receiving channels…
I also want to ignore entries whose id is nil.
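Each receiving channel carries its own stateful transducer. core.async instantiates the transducer separately for every `chan` it is passed to, so that state is not shared between channels. It also runs as values are put, so it sees them in put order. The sketch below assumes a per-channel counter is representative of that state. The `number-entries` transducer and the `:n` key are hypothetical:

```clojure
(require '[clojure.core.async :as async])

;; map-indexed used as a transducer is stateful: it keeps its own counter.
(def number-entries (map-indexed (fn [i x] (assoc x :n i))))

;; The same transducer value on two channels: each chan builds its own
;; instance of it, so each channel counts independently.
(def ch0 (async/chan 100 number-entries))
(def ch1 (async/chan 100 number-entries))

(async/>!! ch0 {:id :0 :d 0})
(async/>!! ch1 {:id :1 :d 1})
(async/>!! ch0 {:id :0 :d 3})

;; Close, then drain everything that was buffered.
(async/close! ch0)
(async/close! ch1)
(def out0 (async/<!! (async/into [] ch0)))
(def out1 (async/<!! (async/into [] ch1)))
;; out0 => [{:id :0, :d 0, :n 0} {:id :0, :d 3, :n 1}]
;; out1 => [{:id :1, :d 1, :n 0}]
```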
Example data. In the real world I will use this with up to 9 ids (channels) and billions or trillions of entries in the source channel.

```
source channel: [{:id :0 :d 0} {:id :1 :d 1} {:id nil} {:id :0 :d 3}]
split to
channel 0:      [{:id :0 :d 0} {:id :0 :d 3}]
channel 1:      [{:id :1 :d 1}]
```
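The intended split can also be written as a plain function on an in-memory sequence. This is only a reference for checking a channel-based version on small inputs. It holds everything in memory, so it would not scale to billions of entries. `source` and `expected` are illustrative names:

```clojure
;; Reference semantics on an ordinary (finite) seq; no channels involved.
(def source [{:id :0 :d 0} {:id :1 :d 1} {:id nil} {:id :0 :d 3}])

(def expected
  (->> source
       (remove (comp nil? :id))  ; ignore entries whose :id is nil
       (group-by :id)))          ; keeps the original order within each group
;; expected => {:0 [{:id :0, :d 0} {:id :0, :d 3}], :1 [{:id :1, :d 1}]}
```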
The problems I have run into while writing this multiplexer are:

- `async/put!` in a `go` loop: `java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel.`
- `>!` in a `go` loop: deadlock…
- `>!` in `(dotimes [i conc-calls] ...)` with a `go` loop: data out of order…
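All three attempts route values by hand. core.async also provides topic-based routing through `pub`/`sub`. A single loop reads the source and computes a topic with the given function (here `:id`). It then parks until each value has been handed on, so per-topic order is kept and backpressure applies. Values whose topic has no subscriber, such as `{:id nil}`, are dropped.

The sketch below is a swapped-in technique, not the code in this post. It runs on the example data, and the channel names are illustrative:

```clojure
(require '[clojure.core.async :as async])

;; Source channel and a pub keyed on :id (one loop reads `from`).
(def from (async/chan 5))
(def p (async/pub from :id))

;; One subscriber channel per id. close? defaults to true, so each is
;; closed once `from` has been closed and drained.
(def out0 (async/chan 100))
(def out1 (async/chan 100))
(async/sub p :0 out0)
(async/sub p :1 out1)

;; Start draining before feeding, so no subscriber stalls the pub.
(def pending0 (async/into [] out0))
(def pending1 (async/into [] out1))

;; {:id nil} has no subscriber, so the pub drops it.
;; onto-chan closes `from` when done (newer releases also offer onto-chan!).
(async/onto-chan from [{:id :0 :d 0} {:id :1 :d 1} {:id nil} {:id :0 :d 3}])

(def results {:0 (async/<!! pending0) :1 (async/<!! pending1)})
;; results => {:0 [{:id :0, :d 0} {:id :0, :d 3}], :1 [{:id :1, :d 1}]}
```

With the default unbuffered topic channels, a slow subscriber on one topic stalls delivery to all topics. That is the same trade-off as any single-loop router that keeps one global order.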
This is how I look at the contents of the channels:
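```clojure
(require '[clojure.core.async :as async
           :refer [chan close! go-loop onto-chan <! >! <!!]])  ; shared by the snippets below

(let [from (chan 5)                          ; fast producer
      cm   {:0 (chan 100) :1 (chan 100)}]    ; slower consumers
  (channel-muxer-X cm :id from)              ; set up channel muxer number X
  (onto-chan from [{:id :0 :d 0} {:id :1 :d 1} {:id :0 :d 3}]) ; closes `from` when done
  (Thread/sleep 100)
  ;; vec forces the lazy map-indexed, so the close!/drain really runs
  (vec (map-indexed (fn [i c] (close! c) [i (<!! (async/into [] c))]) (vals cm))))
```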
Code for the three cases:
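```clojure
(defn channel-muxer-1 [chmap discr-fn from]   ;; 1024 pending puts assertion
  (let [vwarned? (volatile! false)]
    (go-loop []
      (let [x (<! from)]
        (if (nil? x)
          (mapv async/close! (vals chmap))     ;; source closed: close all outputs
          (let [k   (discr-fn x)
                och (if-let [c (chmap k)]
                      c
                      (do
                        (when-not @vwarned?   ;; warn once about unknown ids
                          (println "undefined chan" k "from" x)
                          (vreset! vwarned? true))
                        (chan)))]             ;; throwaway chan for unknown ids
            ;; put! does not wait for buffer space, so there is no backpressure
            (when (async/put! och x)
              (recur)))))))
  (vals chmap))
```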
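A likely explanation for the first failure is that `async/put!` returns without waiting for buffer space, so the `go-loop` in `channel-muxer-1` never slows down. Once an output buffer is full and its consumer lags, every further `put!` is queued as a pending put, and a channel accepts at most 1024 of those.

The minimal reproduction below assumes a channel that nobody reads. `c` and `outcome` are illustrative names:

```clojure
(require '[clojure.core.async :as async])

;; Buffer of 1 and no reader, standing in for an output whose consumer lags.
(def c (async/chan 1))

(def outcome
  (try
    ;; put! never waits: the first value fills the buffer, later ones
    ;; queue as pending puts until core.async's 1024 limit is exceeded.
    (dotimes [_ 2000] (async/put! c :x))
    :all-queued
    (catch AssertionError e
      (.getMessage e))))
;; outcome is the assertion message, which mentions the 1024 pending-put limit
```

Swapping in a sliding or dropping buffer would silence the error only by discarding values, which the no-data-loss requirement rules out. Parking with `>!` is the usual way to get backpressure.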
```clojure
(defn channel-muxer-2 [chmap discr-fn from]   ;; deadlocks.
  (let [vwarned? (volatile! false)]
    (go-loop []
      (let [x (<! from)]
        (if (nil? x)
          (mapv async/close! (vals chmap))     ;; source closed: close all outputs
          (let [k   (discr-fn x)
                och (if-let [c (chmap k)]
                      c
                      (do
                        (when-not @vwarned?   ;; warn once about unknown ids
                          (println "undefined chan" k "from" x)
                          (vreset! vwarned? true))
                        (chan)))]             ;; unbuffered chan that nobody reads
            ;; >! parks until och accepts x (backpressure); false if och is closed
            (when (>! och x)
              (recur)))))))
  (vals chmap))
```
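One likely cause of the `channel-muxer-2` deadlock involves ids with no entry in `chmap`, including the nil ids in the example data. For those, the loop creates a fresh unbuffered `(chan)` that nobody reads from. `>!` on such a channel parks forever, which stops the whole loop.

The small reproduction below adds a timeout so that it returns. `orphan`, `delivery` and `winner` are illustrative names:

```clojure
(require '[clojure.core.async :as async])

;; Stand-in for the (chan) made for an unknown id: unbuffered, never read.
(def orphan (async/chan))

;; A go block that tries to hand one value to it, as the muxer loop does.
(def delivery (async/go (async/>! orphan {:id nil}) :delivered))

;; Wait at most 200 ms for the go block to finish.
(def winner
  (let [t (async/timeout 200)
        [_ port] (async/alts!! [delivery t])]
    (if (= port t) :timed-out :delivered)))
;; winner => :timed-out, because the >! never completes
```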
```clojure
;; http://danboykis.com/posts/things-i-wish-i-knew-about-core-async/#edge-two-too-many-puts
(defn channel-muxer-3 [chmap discr-fn from]   ;; out of order
  (let [vwarned? (atom false)                  ;; atom: shared by many go-loops
        conc-calls 512]
    (dotimes [i conc-calls]
      ;; conc-calls go-loops all take from `from` concurrently
      (go-loop []
        (let [x (<! from)]
          (if (nil? x)
            (mapv async/close! (vals chmap))   ;; each loop closes all outputs on nil
            (let [k   (discr-fn x)
                  och (if-let [c (chmap k)]
                        c
                        (do
                          (when-not @vwarned?
                            (println "undefined chan" k "from" x)
                            (reset! vwarned? true))
                          (chan)))]
              (when (>! och x)
                (recur))))))))
  (vals chmap))
```
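`channel-muxer-3` reorders data because its 512 go-loops take from `from` concurrently, so nothing forces item n to be delivered before item n+1. In addition, the first loop to see `nil` closes every output while other loops may still be putting, which can drop values.

One way to meet all the stated requirements, sketched below under that reading of the problem, is to keep a single loop (for order) and use `>!` (for backpressure, which avoids the pending-put limit). Values whose id has no output channel are simply skipped instead of parking on a throwaway `(chan)`. The name `channel-muxer` and the usage lines are illustrative, not the original code. Consumers are started before the source is fed. With backpressure the muxer waits whenever an output buffer is full, so for large inputs every output needs a consumer running concurrently.

```clojure
(require '[clojure.core.async :as async :refer [go-loop <! >!]])

(defn channel-muxer
  "Hypothetical fix: route each value from `from` to (chmap (discr-fn value)).
  One loop keeps the source order; >! parks while an output is full.
  Values with no matching channel (such as nil ids) are skipped."
  [chmap discr-fn from]
  (let [close-all! #(run! async/close! (vals chmap))]
    (go-loop []
      (if-some [x (<! from)]
        (if-let [och (get chmap (discr-fn x))]
          (if (>! och x)
            (recur)
            (close-all!))  ; an output was closed early: stop, close the rest
          (recur))         ; unknown or nil id: drop the value, keep going
        (close-all!)))     ; source closed: close every output
    (vals chmap)))

;; Usage on the example data (names are illustrative).
(def cm {:0 (async/chan 100) :1 (async/chan 100)})
(def from (async/chan 5))
(channel-muxer cm :id from)

;; Start one consumer per output before feeding the source.
(def pending (into {} (for [[k c] cm] [k (async/into [] c)])))

(async/onto-chan from [{:id :0 :d 0} {:id :1 :d 1} {:id nil} {:id :0 :d 3}])

(def results (into {} (for [[k p] pending] [k (async/<!! p)])))
;; results => {:0 [{:id :0, :d 0} {:id :0, :d 3}], :1 [{:id :1, :d 1}]}
```

With up to 9 ids, each output channel would get its own concurrently running consumer applying the stateful transducer. The muxer then runs at the pace of the slowest consumer.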
Addendum:
This code really had me stuck for a whole day. The problem was solved once I decided to ask on the forum.