The problem
I hit a performance snag when I naively piped single values through async channels.
It turns out this costs too much. I had already built some transducers on the channels that did the actual work, so what I needed was to chunk the transfers between the channels. So I made this transducer. In a simple test case it gave me a 7~30x speedup, just from not going through the async state machine transitions for every value.
So I went from working on single values to working on single collections, while keeping the same transducers that had already been written.
The fun fn:
(defn nested-xfrm
  ([inside-f]
   (fn ([xf]
        (fn
          ([] (xf))
          ([result] (xf result))
          ([result input]
           (xf result (eduction inside-f input))))))))
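Since nested-xfrm is an ordinary transducer, it can be tried without any channels. The following is a minimal sketch, not part of the original test code: the names chunks and evens-per-chunk are made up for illustration, and the definition is repeated (in a shorter single-arity form) only so the snippet runs on its own.

```clojure
;; Same behavior as nested-xfrm above, repeated so this snippet is self-contained.
(defn nested-xfrm [inside-f]
  (fn [xf]
    (fn
      ([] (xf))
      ([result] (xf result))
      ([result input]
       ;; wrap the whole chunk in an eduction that applies inside-f to it
       (xf result (eduction inside-f input))))))

;; Hypothetical input: three chunks, the last one containing no even numbers.
(def chunks [[0 1 2 3] [4 5 6 7] [1 3 5]])

;; Apply the inner filter per chunk, then realize each eduction into a vector.
(def evens-per-chunk
  (into []
        (comp (nested-xfrm (filter even?))
              (map #(into [] %)))
        chunks))

(println evens-per-chunk)
;; prints [[0 2] [4 6] []]
```

Note that a chunk in which nothing passes the filter still comes out, as an empty collection, so the number of chunks is unchanged.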
Let's rewrite this so that each channel does its work on a sequence of values instead of a single value.
Code example:
(require '[clojure.core.async :as async :refer [chan >! <! go-loop]])

(let [a (chan 10)
      b (chan 10 (filter even?))
      _ (async/pipe a b)]
  (async/onto-chan! a (range 1000000))
  (loop []
    (when-let [x (async/<!! b)]
      (recur))))
becomes:
(let [a (chan 10)
      b (chan 10 (nested-xfrm (filter even?))) ; <=== simple change
      _ (async/pipe a b)
      data (partition 8 (range (* 8 10)))]
  (go-loop [din data] ; put: the exact shape of this loop is not important
    (if (> (count (first din)) 0)
      (do
        (>! a (first din))
        (recur (rest din)))
      (async/close! a)))
  (go-loop [] ; get
    (when-let [x (<! b)]
      (println (into [] x)) ; x is an eduction; realize it to print it
      (recur)))
  nil)
This prints:
[0 2 4 6]
[8 10 12 14]
[16 18 20 22]
[24 26 28 30]
[32 34 36 38]
[40 42 44 46]
[48 50 52 54]
[56 58 60 62]
[64 66 68 70]
[72 74 76 78]
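What comes out of channel b is an eduction object, not a realized collection: the inner transducer only runs when the consumer reduces it (here via into), and it runs again on every reduction. The sketch below shows this without channels. The counter calls and the predicate counting-even? are hypothetical helpers introduced only to make the deferred work visible.

```clojure
;; Hypothetical helper: counts how often the predicate is actually called.
(def calls (atom 0))

(defn counting-even? [x]
  (swap! calls inc)
  (even? x))

;; Creating the eduction does no work yet.
(def ed (eduction (filter counting-even?) [0 1 2 3 4 5 6 7]))
(def calls-before @calls)

;; Reducing it runs the filter over all 8 inputs.
(def first-pass (into [] ed))
(def calls-after-first @calls)

;; Reducing it a second time runs the filter again.
(def second-pass (into [] ed))
(def calls-after-second @calls)

(println calls-before calls-after-first calls-after-second)
;; prints 0 8 16
```

So a consumer that takes chunks but never reduces them never pays for the inner transducer, and one that reads a chunk more than once should realize it first (for example with into []).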
Speed comparison
It turns out that buffer size has a significant impact on the speed test, so your mileage may vary.
On my computer, with 10M values, time reports "Elapsed time: 2286.3058 msecs" for the "chunked" version and "Elapsed time: 69798.1046 msecs" for the single-value version. If you change the single-value version to use buffers of size 1000, it takes 13 seconds.
Speed test code:
(def bigM 10000000)

;; chunked version: each channel item is a sequence of 8192 values
;; note: partition drops a trailing incomplete chunk; partition-all would keep it
(time (let [a (chan 10)
            b (chan 10 (nested-xfrm (filter even?)))
            _ (async/pipe a b)
            data (partition 8192 (range bigM))]
        (go-loop [din data]
          (if (> (count (first din)) 0)
            (do
              (>! a (first din))
              (recur (rest din)))
            (async/close! a)))
        (loop []
          (when-let [x (async/<!! b)]
            (recur)))
        nil))
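
;; single-value version: plain (filter even?), as in the single-value example
;; above, because nested-xfrm expects a collection per channel item
(time (let [a (chan 10)
            b (chan 10 (filter even?))
            _ (async/pipe a b)]
        (async/onto-chan! a (range bigM))
        (loop []
          (when-let [x (async/<!! b)]
            (recur)))
        nil))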
Takeaway: experiment with buffer sizes before trying anything more complicated.
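One way to run that experiment is to wrap the single-value pipeline in a function that takes the buffer size as a parameter and time it for a few sizes. This is only a sketch: drain-count is a hypothetical helper, and the buffer sizes and the value count of 100000 are arbitrary choices, not the settings used for the numbers above.

```clojure
(require '[clojure.core.async :as async])

(defn drain-count
  "Pipes (range n) through two channels with the given buffer size,
  filtering for even numbers, and returns how many values came out."
  [buf-size n]
  (let [a (async/chan buf-size)
        b (async/chan buf-size (filter even?))]
    (async/pipe a b)                 ; closes b once a is closed and drained
    (async/onto-chan! a (range n))   ; closes a after the last value
    (loop [cnt 0]
      (if (some? (async/<!! b))
        (recur (inc cnt))
        cnt))))

;; Time the same workload with a few buffer sizes.
(doseq [buf-size [1 10 100 1000]]
  (println "buffer size:" buf-size)
  (time (drain-count buf-size 100000)))
```

Returning the count makes it easy to check that every variant still delivers the same number of values before comparing the timings.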
Hope this is useful to someone.