x> & x>>: auto-transducifying thread macros (now with parallelizing => and =>>)

Parallel => and =>>

Got a new update out last night. Try it out with criterium and net.cgrand/xforms:

clj -Sdeps \
    '{:deps 
      {net.clojars.john/injest {:mvn/version "0.1.0-alpha.12"}
       criterium/criterium {:mvn/version "0.4.6"}
       net.cgrand/xforms {:mvn/version "0.19.2"}}}'

This release comes with parallel versions of x> and x>>, which use the equals sign's two horizontal bars to denote parallelism: => and =>>.

The improvements are interesting: instead of calling sequence on the thread, => and =>> leverage core.async's parallel pipeline to execute single or consecutive stateless transducers over a pool of threads equal to (+ 2 your-number-of-cores). Remaining contiguous stateful transducers are handled in the same manner as in x> and x>>. It doesn't pay off for small data payloads, though, so for demonstration purposes let's augment our previous example threads:
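To make the mechanism concrete, here's a minimal sketch (mine, not injest's actual internals) of running a stateless transducer stage across a (+ 2 cores) thread pool with core.async's pipeline:

```clojure
(require '[clojure.core.async :as a])

;; run a stateless transducer across (+ 2 cores) threads;
;; pipeline preserves the order of results
(let [n   (+ 2 (.availableProcessors (Runtime/getRuntime)))
      in  (a/to-chan! (range 1000))
      out (a/chan n)]
  (a/pipeline n out (comp (map inc) (filter odd?)) in)
  (count (a/<!! (a/into [] out))))
;=> 500
```

Note that pipeline's docstring itself warns that the transducer must be stateless, which is exactly why => and =>> only parallelize the stateless stretches of a thread.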

(require '[clojure.edn :as edn])

(defn work-1000 [work-fn]
  (range (last (repeatedly 1000 work-fn))))
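In other words, work-1000 calls work-fn a thousand times, keeps only the final result, and turns it into a fresh range to seed the next stage. A quick check, using (constantly 5) as a stand-in for the real work:

```clojure
;; repeats the "work" 1000 times, then builds a range
;; from the last result for the next pipeline stage
(defn work-1000 [work-fn]
  (range (last (repeatedly 1000 work-fn))))

(work-1000 (constantly 5))
;=> (0 1 2 3 4)
```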

(defn ->>work [input]
  (work-1000
   (fn []
     (->> input
          (map inc)
          (filter odd?)
          (mapcat #(do [% (dec %)]))
          (partition-by #(= 0 (mod % 5)))
          (map (partial apply +))
          (map (partial + 10))
          (map #(do {:temp-value %}))
          (map :temp-value)
          (filter even?)
          (apply +)
          str
          (take 3)
          (apply str)
          edn/read-string))))  

(defn x>>work [input]
  (work-1000
   (fn []
     (x>> input
          (map inc)
          (filter odd?)
          (mapcat #(do [% (dec %)]))
          (partition-by #(= 0 (mod % 5)))
          (map (partial apply +))
          (map (partial + 10))
          (map #(do {:temp-value %}))
          (map :temp-value)
          (filter even?)
          (apply +)
          str
          (take 3)
          (apply str)
          edn/read-string))))

Same deal as before, but now we're doing a little extra work in each thread: repeating it a thousand times and then preparing the results for handoff to the next stage of execution.

Now let’s run the classical ->> macro:

(->> (range 100)
     (repeat 10)
     (map ->>work)
     (map ->>work)
     (map ->>work)
     (map ->>work)
     (map ->>work)
     (map ->>work)
     last
     count
     time)
; "Elapsed time: 18309.397391 msecs"
;=> 234

Just over 18 seconds. Now let’s try the x>> version:

(x>> (range 100)
     (repeat 10)
     (map x>>work)
     (map x>>work)
     (map x>>work)
     (map x>>work)
     (map x>>work)
     (map x>>work)
     last
     count
     time)
; "Elapsed time: 6252.224178 msecs"
;=> 234

Just over 6 seconds. Much better. Now let’s try the parallel =>> version:

(=>> (range 100)
     (repeat 10)
     (map x>>work)
     (map x>>work)
     (map x>>work)
     (map x>>work)
     (map x>>work)
     (map x>>work)
     last
     count
     time)
; "Elapsed time: 2862.172838 msecs"
;=> 234

Under 3 seconds. Much, much better!

All of those times come from GitHub's browser-based VS Code. When running in a local VS Code instance (or in a bare REPL), the times look more like 11812.604504, 5096.267348, and 933.940569 msecs: roughly a 2-fold improvement for the x>> version and over a 12-fold improvement for the =>> version, compared to ->>.

In the future I'd like to explore using a parallel fold instead of core.async, but this works pretty well for now.

After a few days or weeks, once folks have had a bit of time to kick the tires, I'll release a beta version on Clojars and put out a more formal release announcement in a separate set of posts. In the meantime, please give it a whirl and let me know if you find any issues. By the way, there was a bug in the last release that made it impossible to define a thread within a function with bindings. That's been fixed, but sorry if anyone got bit by it; it would have been pretty confusing. Anyway, enjoy!
