🥊 Parallel Transducing Context Fight Night: |>> (pipeline) vs =>> (fold)

:boxing_glove::boxing_glove::boxing_glove: Helloooooo Wonderful Clojure People and Welcome to the Parallel Transducing Context Fight Night! :boxing_glove::boxing_glove::boxing_glove:

Iiiiiin the left corner we have |>>, weighing in at 195 pounds - The parallel pipeline transducing context!

In the right cornerrrr is =>>, weighing in at only 120 pounds - The parallel fold transducing context!

crowd cheers :clap: :clap: :clap: :clap: :clap: :clap:

How did they fare? Check out the shootout.md doc for the juicy play-by-play.

But while we’re here, let’s jump into a quick recap of the results.

Round 1

In the average workload case, they are almost identical:

Note: In all the images below, lower is better

On 4 cores:

Screen Shot 3

On 16 cores:

Screen Shot 4

Okay, so the first round is a draw. Let’s look at another scenario.

Round 2

For the extremely high workloads, on high core count machines, |>> pulls ahead:

On 4 cores:

Screen Shot 9

On 16 cores:

Screen Shot 10

In the low core count ring, =>> is a slight winner, but in the high core count ring, |>> is the undisputed champion of heavyweight workloads.

Round 3

However, in heterogeneous, low workload scenarios, |>> falls on its face:

On 4 cores:

todo

On 16 cores:

Screen Shot 12

Third round technical knockout, with =>> as the winner.

Post-Fight Analysis

Wow, what a match-up, right? I feel like |>> had a really good showing tonight, but the footwork on =>> eventually put |>> on the ropes.

When you want to put |>> or =>> in the ring, here are some key things to think about:

If a given thread is only just starting to seem like it could benefit from parallelization, then there's a good chance that |>> will be a footgun for you, while =>> may pay dividends - so in general I recommend reaching for =>> first. However, once your threads' workloads become embarrassingly parallel, it makes sense to try out |>> to see if it can get you even further - especially with more available cores.
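For context, all of these contenders share ->>'s thread-last shape, so trying one is a one-symbol swap. A minimal sketch - heavy-fn is a hypothetical stand-in for real work, and the x>>/=>>/|>> lines assume injest is on the classpath, so they're left as comments:

```clojure
;; hypothetical workload: adds 10000 to x via repeated incs
(defn heavy-fn [x] (nth (iterate inc x) 10000))

(->> (range 1000) (map heavy-fn) (apply +))     ; lazy baseline
;; (x>> (range 1000) (map heavy-fn) (apply +))  ; single-threaded transducers
;; (=>> (range 1000) (map heavy-fn) (apply +))  ; parallel fold - reach for this first
;; (|>> (range 1000) (map heavy-fn) (apply +))  ; parallel pipeline - for embarrassingly parallel work
```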


You should have single threaded transducers as well in this fight, as a baseline.


Please label your axes :slight_smile: I have no idea what the independent and dependent variables are without added context. Interested to see which workloads actually get beyond the 3-4x speedup boundary that I keep encountering.


I think you’re right, @didibus, adding in the reigning champions x>> and ->> is bound to be a crowd pleaser!

Round 1

:bellhop_bell: :bellhop_bell: :bellhop_bell:

On 4 cores:

Screen Shot 13

On 16 cores:

Screen Shot 14

@joinr Y axis is seconds and X axis is number of steps. Usually 10, unless they don’t finish near the time the winners finish. In the above match, ->> only shows 4 steps. The 5th step takes over 3 minutes.

As we can see, directly comparing the parallelizing functions against the non parallel versions tends to wash out the differences between the parallelizing ones, making it more difficult to isolate the parallel versions’ pros and cons relative to each other.

Note that the ->> version is fully lazy, such that even the work functions it’s iterating over are lazy. In the real world, you’re likely to end up doing things that force more eagerness into your application, thus not incurring such a pure laziness cost.
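To illustrate that laziness cost, here's a toy comparison using core transduce in place of injest's x>> (a sketch only - real workloads would have much heavier work functions):

```clojure
;; ->> allocates a fresh lazy seq (and boxes every element) per step;
;; the transducer version fuses both maps into a single eager pass.
(->> (range 1000) (map inc) (map inc) (reduce +))      ; two intermediate lazy seqs
(transduce (comp (map inc) (map inc)) + (range 1000))  ; one fused pass
;; both => 501500
```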

Well, that was a bloodbath of a round, @didibus. We'll have to do some micro-benchmark rounds to see where the break-even point is - when work becomes heavy enough to justify transitioning a thread to parallelization.

In the meantime, stay tuned for replays of round 2 and 3 from above, with x>> and ->> thrown into the mix!
:boxing_glove: :boxing_glove: :boxing_glove:


:boxing_glove: Fight Night: Round 2! :boxing_glove:

:bellhop_bell: :bellhop_bell: :bellhop_bell:
On 4 cores:

Screen Shot 15

On 16 cores:

Screen Shot 16

Aha, we’ve made a discovery!

=>> is breaking down for very small sequences (here, 10 items) and reverting to single-threaded transducer performance - but only when the sequence is shorter than the number of cores. In this case, the 16 core count is greater than the number of items available in the sequence. This issue may be particular to our partitioning strategy - I'll see if I can tweak it to get better parallelism from fold on super short sequences.
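Core reducers exhibits the same fallback, for what it's worth (a sketch with plain r/fold, not injest's =>> internals):

```clojure
(require '[clojure.core.reducers :as r])

;; r/fold only forks when the input is larger than its chunk size
;; (512 by default), so a 10-element vector is reduced sequentially
;; on the calling thread - no parallelism at all.
(r/fold + (vec (range 10)))      ; 10 < 512 => plain reduce => 45
(r/fold 4 + + (vec (range 10)))  ; chunk size 4 => actually forks => 45
```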

But with |>> faring 13 times faster than the fully lazy ->> version, I think it's fair to say that those dividends will only keep growing as the workload rises.

I think =>> is asking for a round 2 rematch against |>> after a later release. In the meantime, stay tuned for round 3, where we see if the parallel contenders crumble against our single-threaded incumbents in a battle for the light-work, heavy-sequence title of the wooooorld! After these commercials…


:boxing_glove: Fight Night: Round 3! :boxing_glove:

Now let’s see the case of an extremely large sequence with heterogeneous data and work:
:bellhop_bell: :bellhop_bell: :bellhop_bell:
On 4 cores:

Screen Shot 17

On 16 cores:

Screen Shot 18

If you look closely enough, you can see that x>> beat out =>>. Here we can see that, with this kind of workload, the best we can do is try to keep up with the single-threaded transducer version.

And here we can also see that the bad behavior from |>> on thin, heterogeneous workloads is present on both the 4 core and 16 core platforms.

Well, that wraps up our 3-round fight between all 4 contenders. I think it is fair to say that =>> is the most robust for general workloads. Future work may help ameliorate some of these edge behaviors for |>>. The takeaway from this round: for small workloads and small sequences, it's best to keep threads as x>> or +>>. If stuff gets heavy, first reach for =>>, then |>>.

Next up, I'll be working on a set of rounds focused on where the cost-benefit analysis between single-threaded and parallel contexts puts our threshold of work. Is it around l = 100 and w = 100? We'll find out next time, in the next episode of Fight Night: Parallel Transducer Smackdown! :boxing_glove: :boxing_glove: :boxing_glove:

If you have any requests of stuff you’d like benchmarked, to test the limits of these threads, let me know and I’ll put a few rounds together!


Stray thought: do any of your bound cases involve keywords?

(old example, discussed on reddit and zulip.)

The math work test should be allocationless (or extremely limited).

I haven’t really found workloads (although people have claimed apocryphally to have seen near linear scaling in private applications…) that scale beyond 3x (even unordered stuff using work stealing). If you have a reproducible example that scales on commodity hardware beyond 3x (4x seems to be a weird barrier in my experience), I would be interested.

Yeah, I’m seeing similar diminishing returns too. Though with fold a little better.

With this test code:

(defn flip [n]  ; composes n alternating inc/dec ops: pure int work, no allocation
  (apply comp (take n (cycle [inc dec]))))

(dotimes [n 30]
  (let [w 10000]
    (println
     (time-val
      (x>> (range)
           (take (* 100 n))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (apply +))))))

On 2 cores (4 hyper threads):


(in the subtitle ^ that should say “10,000” int ops)

On 8 cores (16 hyper threads):


(in the subtitle ^ that should say “10,000” int ops)

On the 2 core machine, I can double my speed. On the 8 core I get 5 times the speed. So at around 8 cores, it takes roughly 1.6 cores to get each core's worth of speedup. I'd imagine that ratio keeps getting worse as you add more and more cores.

Would be interesting to see on actual 16 core, 32 core and 64 core machines.

I've done these kinds of workloads on an AWS 72 core machine, down to 2 cores, etc. Even on the higher core count setup, I top out at around 14x improvement (for workloads with allocation, e.g. typical FP stuff). We have batted around some hypotheses and things to look at, but my current working theory is that there is some implicit synchronization with the GC that limits throughput even on embarrassingly parallel workloads and allows Amdahl's law to take hold - or something elsewhere in the computational hierarchy. In practice, I have never seen 4x scaling on commodity hardware with 8-16 cores, but on the order of 3+. You get more performance as you throw more cores at it, but it seems like 4x is a weird asymptote. It kind of flies in the face of the "hopes" that multicore can outpace singlecore, and of the implicit justifications for some of the inherent inefficiencies we live with (e.g. allocation, boxing, copying), since we "should" be in a place to just scale the work much, much better. This has not panned out in practice for me, although we are definitely able to leverage existing resources without changing much.

I should add that horizontal scaling still helps, and the FP style + immutability is well positioned to take advantage of that.

So at 2 cores, you barely lose any to overhead. At 8 cores you lose 40%. At 72 cores you lose 80%. Yeah, I guess somewhere past 100 cores it becomes pointless. If Amdahl's law allows for better performance than that at 100 cores, and those kinds of CPUs were to become mainstream, I'm sure the JVM will optimize for those scenarios eventually. If it's just the speed limit of the universe, then yeah, kinda pointless to go past 100 cores.

Sure. Or if you do, you really want several different jvms running on different heaps on different physical cores. I am baffled as to why this happens for now though. There’s always the caveat that “your mileage may vary” and perhaps someone’s workload scales substantially better if not near-linearly. If someone has a case like that, which is reproducible, on the JVM, I would love to see it and learn from it.

My guess is that it has something to do with the CPU caches being the bottleneck. L3 and L4 (the shared caches) are too slow, so you want to keep your workloads local to a core and maximize cache hits in the L1 and L2 caches. And I think that's where the JVM eventually falls down: it can't fully keep workloads core-local, so at some point it starts relying on the L3 and L4 caches, or just misses all caches and has to go to main memory.

Looks like if I bring the sequence down to 1 through 100 and push the int ops up to 10 million, it lowers the thread communication and I can get a 6 times speedup. So only a 25% loss on 8 cores for threads with barely any communication and mostly work.

(def flip-10-mil
  ;; 10,000 compositions of (flip 1000) = 10 million int ops per call
  (apply comp (take 10000 (cycle [(flip 1000)]))))

(time (flip-10-mil 2)) ;=> 2
"Elapsed time: 168.611595 msecs"

(dotimes [n 100]
  (println
   (time-val
    (x>> (range)
         (take (* 10 n))
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (apply +)))))

But yeah, that’s not real world data. I’m working on a set of benchmarks that exercise more real-world, heterogeneous thread-last workloads.

Nice example. Maybe the scaling case I had wasn't big enough to keep stuff thread-local. We are investigating more cases like this with renewed vigor. Also, thanks for the axis labels. The graph is easy to interpret on its own.

That’s awesome to hear. Please do share any graphs / benchmarks of your findings.

Honestly, I'm pretty surprised at the performance of =>>. I was not expecting it to be so forgiving on small sequences and workloads. In my testing, it only takes a small sequence of a few dozen items, with a work plan of a few dozen int ops spread over 2 or 3 sequence transformations, and boom, you're either doing as well as single-threaded or better. In other words, =>> is extremely safe to use optimistically.

Also, where I thought =>> was getting beat out by |>> - those were almost all edge cases that we wouldn’t care about. In the vast majority of scenarios, =>> (fold) just beats the pants off of |>> (pipeline).

Another surprising takeaway: lazy seqs really ain't that bad in a lot of average scenarios. What are those?

  1. If your work plan is embarrassingly small/single-threaded - any number of sequence transformations, but where they're all a single int op - then x>> actually doesn't do any better than ->>/+>>

  2. If your work plan is large enough - any number of sequence transformations where =>> would start to show significant benefits - then the time spent waiting on laziness disappears when measured next to the amount of work being done between the boxing and unboxing steps

In other words, there is a goldilocks zone between extremely small workloads and fairly large workloads where x>> will be better than ->>/+>>. It turns out that a lot of the work we do in Clojure actually falls into that goldilocks zone - doing a small to medium sized set of transformations over a few dozen items. So x>> will probably be more performant than ->>/+>> for most thread-last situations you’ll see in the wild, but the lazy versions are no slouches when it comes to heavier work, which I thought was counterintuitive at first.

I've got pages and pages of charts at this point. Just gotta dial them in to a coherent story that can lay out these lessons learned, and clean them up with sensible labels and whatnot. Honestly, I'm no expert on this data visualization stuff and I could definitely use some help benchmarking these formalisms.

Now what you need is an auto-tune feature, maybe a ?>> macro that runs the thread with various amount of test data and then outputs a recommendation to use: x>>, |>> or =>>.

Hum… or I wonder if that could be done almost like a JIT behavior. So if I used ?>>, it would time the execution as the code runs, and possibly rewrite itself up/down as x>>, ->> or =>>, etc.
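A crude sketch of what that could look like (entirely hypothetical - ?>> is not part of injest; this version just runs the thread with core ->>, times it, and prints a naive recommendation):

```clojure
(defmacro ?>>
  "Hypothetical auto-tuning thread macro: times a single-threaded run
  and prints which macro a naive 1 ms threshold would suggest."
  [data & forms]
  `(let [start# (System/nanoTime)
         res#   (->> ~data ~@forms)
         ms#    (/ (- (System/nanoTime) start#) 1e6)]
     (println "recommendation:" (if (> ms# 1.0) "try =>>" "keep x>>/->>"))
     res#))

(?>> (range 100) (map inc) (apply +))
;; => 5050, printing a recommendation as a side effect
```

A real version would sample a few runs per step and compare against the core count, rather than timing the whole thread once.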

Also, as an aside, I'm wondering why |>> isn't showing as good results as =>>.

Pipeline takes the elements one by one and sends them to be executed against the transducer stack on threads from a cached pool, n at a time; as soon as one of them finishes, it sends another.

This is better than batching, because with a batch you'd take a collection of, say, 100 elements, partition it into, say, 8 groups (assuming 8 cores), have 8 threads each work through a batch, and when all are done, collect the results back. The issue with batch partitioning is that if one batch takes longer, you'll have idle threads that could technically be helping with that batch. But in the pipeline case, I thought this would not happen: elements sit in a channel, and each thread grabs the next available element, processes it through the transducer, and when done picks up the next one. So the threads should stay saturated.
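That saturation idea can be sketched with a shared queue and plain futures (a toy model only - injest's |>> actually rides on core.async's pipeline):

```clojure
(import '[java.util.concurrent LinkedBlockingQueue TimeUnit])

(defn pipeline-map
  "Toy pipeline: n workers each pull the next element off a shared
  queue the moment they finish the previous one, so no worker idles
  while elements remain - even if some elements take much longer."
  [n f coll]
  (let [q       (LinkedBlockingQueue. ^java.util.Collection (vec coll))
        work    (fn []
                  (loop [acc []]
                    (if-some [x (.poll q 10 TimeUnit/MILLISECONDS)]
                      (recur (conj acc (f x)))
                      acc)))
        workers (doall (repeatedly n #(future (work))))]
    (mapcat deref workers)))

(sort (pipeline-map 4 inc (range 10)))
;; => (1 2 3 4 5 6 7 8 9 10), though per-worker order is nondeterministic
```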

Fold uses the ForkJoinPool, and that basically addresses the batch problem too: things are batched, but if a thread runs out of stuff to do, it'll grab from other threads' batches. That should have the same behavior as core.async's pipeline, I thought, in that the threads also get saturated.

But also your data is weird - like, heterogeneous workloads do much better on ForkJoin? Weird. And large core counts and workloads do better in pipeline? OK, that I think must be because fold uses ForkJoin's common pool, and maybe that one has too small a number of threads?


So I should clarify what I mean by "heterogeneous": a series of sequence transformations that each have different kinds of workloads. Like:

(=>> (range 100000)
     (map inc)
     (filter odd?)
     ...
     (map super-heavy-function)
     ...
     (filter even?)
     ...
     (map another-super-heavy-function)
     ...
     (apply +))

Big ops, small ops, multiple single-threaded sequencings, multiple multi-threaded folds, etc.

And I’m concerned with four different types of complexity here:

  1. Sequence length - (range 100000)
  2. Sequence item size - is it a sequence of ints, nested arrays, massive maps?
  3. Single transformer difficulty - (map super-heavy-function)
  4. Number of transformers, of a few different types, in a thread

My testing has been trying to measure the cost of each of those independently of one another. When I say heterogeneous workloads, I mean any combination of 2, 3 and 4 is potentially non-uniform. And we can measure heterogeneity along each of those axes as well.

Good point about auto-tune, @didibus. I was thinking about hot-swapping fold out for pipeline, under the hood, for small sequences. Turns out a small fix to the fold =>> impl makes it catch up to |>> in most cases where it lagged behind.

Really, I kinda wonder if some kind of JIT/profiling is going on inside Fork/Join, where if a workload is embarrassingly single-threaded, it smartly keeps most of the process local to a particular thread.

And large core count and workloads do better in pipeline ?

That’s no longer obvious, after my fix with 0.1.0-alpha.14. I had originally found this scenario where |>> appeared to perform 2 to 3 times better than =>>:

We're only measuring sequence lengths of 0 through 3 here. But if we zoom out, we can see that the problem disappears for =>> at longer sequence lengths:

(dotimes [n 20]
  (let [m (mk-m (* 10000 n))]
    (println
     (qb-val ; <- using quickbench in some of these tests
      (=>> (cycle [{}])
           (take (* 5 n))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           last)))))

On an 8 core:

To dial in this behavior on the 2 core machine, I needed to tweak the code a bit:

(dotimes [n 20]
  (let [m (mk-m (* 10000 n))]
    (println
     (qb-val
      (|>> (cycle [{}])
           (take n)
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           last)))))

On 2 core:

As you can see, at sequences six items in length, |>> can be over 3 times faster than =>>. After the fix in 0.1.0-alpha.14, this anomaly disappears:


And on 2 core:

So now it’s pretty hard to find scenarios where |>> does significantly better than =>>.

So, in what situations does =>> do significantly better than |>>? Many, especially with long sequences. Let’s look at a very basic scenario:

(dotimes [n 100]
  (println
   (time-val
    (|>> (range)
         (take (* 1000 n))
         (map (flip 2))
         (map (flip 2))
         (apply +)))))


As we can see there, |>> is horrible at small work over long sequences. Amazingly, =>> is already beating out the lazy version. It’s almost as if =>> has no (or very little) thread overhead on these semi-light-weight examples.

If we increase the work over time, however, we can see |>> start to catch up:

(dotimes [n 100]
  (println
   (time-val
    (=>> (range)
         (take (* 1000 n))
         (map (flip n))
         (map (flip n))
         (map (flip n))
         (apply +)))))


Bump it up a little more and we can see the break-even point for |>> against the non-parallelized variants:

(dotimes [n 100]
  (println
   (time-val
    (->> (range)
         (take (* 1000 n))
         (map (flip n))
         (map (flip n))
         (map (flip n))
         (map (flip n))
         (map (flip n))
         (apply +)))))

At around 600,000 seq length, over 60 int ops, over 5 mapping transforms, |>> starts to pull ahead of the single-threaded versions.

Let’s crank the work way up and see what happens:

(dotimes [n 100]
  (let [w (* n 100)]
    (println
     (time-val
      (x>> (range)
           (take (* 100 n))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (apply +))))))

And right there we can see that once our transformers are sufficiently saturated with work, the differences between =>> and |>> disappear and, notably, the differences between x>> and ->>/+>> also disappear.

Does that help explain some of the differences between these different thread macros?

Still exploring, but if I can’t find workloads where |>> pays significant dividends over =>>, I may end up removing |>> altogether, just to remove the footgun.

In the above analysis, I would consider all of these homogeneous or uniform workloads. Next up, I’m looking into heterogeneous, non-uniform workloads. There’s lots more variation there, where parallel constructs can be feeding into transducing/sequencing constructs, feeding into lazy constructs, back into parallel constructs, etc.

@didibus your point still stands about auto-tuning - there could be some *>> thing that monitors the timing of the sequence calls in x>> and, if they rise above a given threshold (say, 1 ms) relative to the number of cores on a machine, automatically switches to =>> under the hood. That behavior would be a bit less predictable than either x>> or =>> alone, but it might be a better default for prototyping. PRs welcome!


Very interesting deep dive - it really piqued my curiosity about why fold over the fork/join pool is showing much better results than the pipeline implementation.

I know that Fork/Join has a work-stealing algorithm, so maybe it is smart in the way it steals work to minimize overhead, whereas pipeline grabs elements indiscriminately.

It might also have something to do with the implementation of r/fold.

Fork/Join normally says to implement parallelization like so:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

The first piece is that it checks how big the work is, and if it's not very big it won't fork; that minimizes overhead. The second bit is that it can actually fork operations, not just elements.

For example, it could fork a single mapping operation over a really big sequence into a few smaller mapping operations.

I don’t know how fold is implemented here though. I might try to have a look.
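For reference, here are the knobs fold exposes at the API level (plain core reducers - a sketch, not injest's wrapped version):

```clojure
(require '[clojure.core.reducers :as r])

;; r/fold takes a chunk size n, a combinef to merge chunk results,
;; and a reducef for the work inside a chunk. On vectors it recursively
;; halves the input into ForkJoinPool tasks until chunks are <= n.
(r/fold 512   ; below this size, just reduce sequentially
        +     ; combinef: merges two chunk results (also supplies the identity)
        +     ; reducef: per-chunk reduction
        (vec (range 100000)))
;; => 4999950000
```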

Pipeline, on the other hand, will simply parallelize on elements, and it will do so consistently, as it just queues up each element in a channel and has threads pull the next element and process it.


Ran the same numbers on a 32 core box on Google cloud:

14 times speed up, representing a 40% core efficiency, vs a 75% efficiency on 8 core (6 times speed up).

Then I ran them on a 72 core box on AWS:

28 times speedup, still representing a 40% core efficiency :thinking:

Have we reached a ceiling of 60% thread overhead that we can depend on? Or maybe one platform is doing HT and my core numbers are off?

Trying to get access to higher core count servers now.

If there really is a 60% thread overhead ceiling, then we should see a 51 times speed up on a 128 core machine.