The nested transducer for async channel performance

The problem

I hit a performance snag when I naively tried to pipe single values through async channels.
Turns out, this costs too much: every value pays for a trip through the go-block state machine. I had already written transducers on channels that did the real work; what I needed was to chunk the transfers between the channels. So I made this transducer. In a simple test case it gave me a 7–30x speedup, just from not sitting in the async state machine transitions all the time.

So I went from working on single values to single collections, while keeping the same transducers I had already written.
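To make that concrete: eduction is what lets an existing transducer be reapplied per chunk instead of per value. A quick REPL check (my own illustration, not from the original post):

(def xf (filter even?))

;; the same transducer works per value,
;; or per chunk when wrapped in an eduction:
(into [] xf [1 2 3 4])            ;; => [2 4]
(into [] (eduction xf [1 2 3 4])) ;; => [2 4]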

The fun fn:

(defn nested-xfrm
  [inside-f]
  (fn [xf]
    (fn
      ([] (xf))
      ([result] (xf result))
      ([result input]
       ;; wrap each incoming chunk in an eduction that applies
       ;; the inner transducer lazily, then pass it downstream
       (xf result (eduction inside-f input))))))
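As a quick sanity check (my own example, not from the post), applying nested-xfrm outside of channels shows each input chunk becoming a filtered eduction:

(->> (partition-all 4 (range 12))
     (sequence (nested-xfrm (filter even?)))
     (map #(into [] %)))
;; => ([0 2] [4 6] [8 10])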

Let's rewrite so each channel does work on a sequence of values instead of a single value.
Code example:

(require '[clojure.core.async :as async :refer [chan >! <! go-loop]])

(let [a (chan 10)
      b (chan 10 (filter even?))
      _ (async/pipe a b)]
  (async/onto-chan! a (range 1000000))
  (loop []
    (when-let [x (async/<!! b)]
      (recur))))

becomes:

(let [a (chan 10)
      b (chan 10 (nested-xfrm (filter even?))) ; <=== simple change
      _ (async/pipe a b)
      data (partition 8 (range (* 8 10)))]
  (go-loop [din data] ; put. not important exactly how this looks.
    (if (> (count (first din)) 0)
      (do
        (>! a (first din))
        (recur (rest din)))
      (async/close! a)))
  (go-loop [] ; get
    (when-let [x (<! b)]
      (println (into [] x)) ; x is an eduction object
      (recur)))
  nil)

Output:

[0 2 4 6]
[8 10 12 14]
[16 18 20 22]
[24 26 28 30]
[32 34 36 38]
[40 42 44 46]
[48 50 52 54]
[56 58 60 62]
[64 66 68 70]
[72 74 76 78]

Speed comparison

Turns out buffer size has a significant impact on the speed test, so your mileage may vary.
On my computer, with 10M values, time reports "Elapsed time: 2286.3058 msecs" for the chunked version
and "Elapsed time: 69798.1046 msecs" for the single-value version. If you give the single-value version 1000-slot buffers, it drops to about 13 seconds.

speed test code:

(def bigM 10000000)

;; chunked version
(time (let [a (chan 10)
            b (chan 10 (nested-xfrm (filter even?)))
            _ (async/pipe a b)
            data (partition 8192 (range bigM))]
        (go-loop [din data]
          (if (> (count (first din)) 0)
            (do
              (>! a (first din))
              (recur (rest din)))
            (async/close! a)))
        (loop []
          (when-let [x (async/<!! b)]
            (recur)))
        nil))

;; single-value version
(time (let [a (chan 10)
            b (chan 10 (filter even?))
            _ (async/pipe a b)]
        (async/onto-chan! a (range bigM))
        (loop []
          (when-let [x (async/<!! b)]
            (recur)))
        nil))

Takeaway: experiment with buffer sizes before trying more difficult stuff.
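One way to act on that takeaway is a small sweep over buffer sizes. This is my own sketch (the helper name bench-with-buffer is hypothetical); it assumes the require, nested-xfrm, and bigM definitions above are in scope, and uses onto-chan! to feed the pre-chunked data:

(defn bench-with-buffer
  "Hypothetical helper: times the chunked pipeline with buffer size n."
  [n]
  (time
   (let [a (chan n)
         b (chan n (nested-xfrm (filter even?)))
         _ (async/pipe a b)]
     (async/onto-chan! a (partition 8192 (range bigM)))
     (loop []
       (when (async/<!! b) ; eductions are truthy; nil means a closed
         (recur))))))

;; sweep a few buffer sizes and compare the printed timings:
(doseq [n [1 10 100 1000]]
  (bench-with-buffer n))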

Hope this is useful to anyone.
