🄊 Parallel Transducing Context Fight Night: |>> (pipeline) vs =>> (fold)

Stray thought: do any of your bound cases involve keywords?

Old example, discussed on Reddit and Zulip.

The math work test should be allocationless (or extremely limited).

I haven't really found workloads (although people have claimed apocryphally to have seen near linear scaling in private applications…) that scale beyond 3x (even unordered stuff using work stealing). If you have a reproducible example that scales on commodity hardware beyond 3x (4x seems to be a weird barrier in my experience), I would be interested.

Yeah, I'm seeing similar diminishing returns too, though fold does a little better.

With this test code:

(defn flip [n]
  (apply comp (take n (cycle [inc dec]))))

(dotimes [n 30]
  (let [w 10000]
    (println
     (time-val
      (x>> (range)
           (take (* 100 n))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (apply +))))))
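
(`time-val` isn't defined in the thread. Assume it's a small helper that returns the elapsed wall-clock time of evaluating its body; a minimal sketch might look like this:)

;; Hypothetical helper: evaluate body, discard the result, return elapsed milliseconds.
(defmacro time-val [& body]
  `(let [start# (System/nanoTime)]
     ~@body
     (/ (- (System/nanoTime) start#) 1e6)))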

On 2 cores (4 hyperthreads):

[chart omitted; the subtitle should read "10,000" int ops]

On 8 cores (16 hyperthreads):

[chart omitted; the subtitle should read "10,000" int ops]

On the 2 core machine, I can double my speed. On the 8 core I get 5 times the speed. So at around 8 cores, you're paying roughly 1.6 cores for each 1x of speedup. I'd imagine that efficiency keeps going down as you add more and more cores.

Would be interesting to see on actual 16 core, 32 core and 64 core machines.

I've done these kinds of workloads on an AWS 72 core machine, down to 2 cores, etc. Even on the higher core count setup, I top out at around 14x improvement (for workloads with allocation, e.g. typical FP stuff). We have batted around some hypotheses and things to look at, but my current working theory is that there is some implicit synchronization with the GC (or elsewhere in the computational hierarchy) that limits throughput even on embarrassingly parallel workloads and allows Amdahl's law to take hold. In practice, I have never seen 4x scaling on commodity hardware with 8-16 cores, but rather something on the order of 3x+. You get more performance as you throw more cores at it, but 4x seems to be a weird asymptote. It kind of flies in the face of the "hopes" that multicore can outpace singlecore, and of the implicit justifications for some of the inherent inefficiencies we live with (e.g. allocation, boxing, copying), since we "should" be in a place to just scale the work much, much better. This has not panned out in practice for me, although we are definitely able to leverage existing resources without changing much.

I should add that horizontal scaling still helps, and the FP style + immutability is well positioned to take advantage of that.

So at 2 cores, you barely lose any to overhead. At 8 cores you lose 40%. At 72 cores you lose 80%. Yeah, I guess somewhere past 100 cores it becomes pointless. If Amdahl's law allows for better performance than that at 100 cores, and those kinds of CPUs were to become mainstream, I'm sure the JVM will optimize for those scenarios eventually. If it's just the speed limit of the universe then yeah, kinda pointless to go past 100 cores.

Sure. Or if you do, you really want several different JVMs running on different heaps on different physical cores. For now though, I am baffled as to why this happens. There's always the caveat that "your mileage may vary" and perhaps someone's workload scales substantially better, if not near-linearly. If someone has a case like that, which is reproducible, on the JVM, I would love to see it and learn from it.

My guess is that it has something to do with the caches on the CPU cores being the bottleneck. L3 and L4 (the shared caches) are too slow, so you want to keep your workloads local to a core and maximize cache hits in the L1 and L2 caches. And I think this is where the JVM eventually falls down: it can't fully keep things core-local, so at some point it starts relying on the L3 and L4 caches, or just misses all the caches and has to go to main memory.

Looks like if I bring the sequence down to 1 through 100 and push the int ops up to 10 million, it lowers the thread communication and I can push to a 6x speedup. So that's only a 25% loss on 8 cores for threads doing barely any communication and mostly work.

(def flip-10-mil
  (apply comp (take 10000 (cycle [(flip 1000)]))))

(time (flip-10-mil 2)) ;=> 2
"Elapsed time: 168.611595 msecs"

(dotimes [n 100]
  (println
   (time-val
    (x>> (range)
         (take (* 10 n))
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (map flip-10-mil)
         (apply +)))))

But yeah, that's not real-world data. I'm working on a set of benchmarks that exercise more realistic, heterogeneous thread-last workloads.

Nice example. Maybe the scaling case I had wasn't big enough to keep stuff thread local. We are investigating more cases like this with renewed vigor. Also, thanks for the axis labels. The graph is easy to interpret on its own.

That's awesome to hear. Please do share any graphs / benchmarks of your findings.

Honestly, I'm pretty surprised at the performance of =>>. I was not expecting it to be so forgiving on small sequences and workloads. In my testing, it only takes a small sequence of a few dozen items, with a work plan of a few dozen int ops spread over 2 or 3 sequence transformations, and boom, you're either doing as well as single threaded or better. In other words, =>> is extremely safe to use optimistically.

Also, where I thought =>> was getting beat out by |>> - those were almost all edge cases that we wouldn't care about. In the vast majority of scenarios, =>> (fold) just beats the pants off of |>> (pipeline).

Another surprising take-away: Lazy seqs really ain't that bad in a lot of average scenarios. What are those?

  1. If your work plan is so embarrassingly small/single-threaded - any number of sequence transformations, but where they're all a single int op - then x>> actually doesn't do any better than ->>/+>>

  2. If your work plan is large enough - any number of sequence transformations where =>> would start to show significant benefits - then the time spent waiting on laziness disappears when measured next to the amount of work being done between the boxing and unboxing steps

In other words, there is a goldilocks zone between extremely small workloads and fairly large workloads where x>> will be better than ->>/+>>. It turns out that a lot of the work we do in Clojure actually falls into that goldilocks zone - doing a small to medium sized set of transformations over a few dozen items. So x>> will probably be more performant than ->>/+>> for most thread-last situations you'll see in the wild, but the lazy versions are no slouches when it comes to heavier work, which I thought was counterintuitive at first.

I've got pages and pages of charts at this point. Just gotta dial them in to a coherent story that lays out these lessons learned, and clean them up with sensible labels and whatnot. Honestly, I'm no expert on this data visualization stuff and I could definitely use some help benchmarking these formalisms.

Now what you need is an auto-tune feature, maybe a ?>> macro that runs the thread with various amounts of test data and then outputs a recommendation to use x>>, |>> or =>>.

Hum… or I wonder if that could be done almost like a JIT behavior. So if I used ?>> it would time the execution as the code runs, and possibly rewrite itself up/down as x>>, ->> or =>>, etc.

Also, as an aside, I'm wondering why |>> isn't showing as good results as =>>…

Pipeline will take each element one by one and send it to be executed against the transducer pipeline on a thread from a CachedPool, n at a time, and then as soon as one of them finishes it will send another one.

This is better than batching, because with a batch you'd take a collection of, say, 100 elements and partition it into, say, 8 groups of 100/8 elements each, have 8 threads (assuming 8 cores) each go through one batch, and when all are done, collect the results back. The issue with batch partitioning is that if one batch takes longer, you'll have other threads sitting idle that could technically start helping with that batch. In the pipeline case, I thought this would not happen: elements are in a channel, and the threads grab the next available element off it, process it through the transducer, and when done pick up the next one. So the threads should also stay saturated.
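
For what it's worth, here's a minimal sketch of that element-at-a-time behavior using core.async's pipeline directly. This is just an illustration of the idea, not the actual |>> implementation:

(require '[clojure.core.async :as a])

;; Run transducer xf over coll with n elements in flight on the pipeline's
;; worker threads; results come back in order and are collected into a vector.
(defn pipeline-map-sketch [n xf coll]
  (let [in  (a/to-chan! coll)
        out (a/chan n)]
    (a/pipeline n out xf in)
    (a/<!! (a/into [] out))))

;; e.g. (pipeline-map-sketch 8 (map inc) (range 100))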

Fold uses the ForkJoinPool, and that basically addresses the batch problem too: things are batched, but if a thread is out of stuff to do it'll grab from other threads' batches. I thought that should have the same behavior as core.async's pipeline, in that threads also get saturated.

But also your data is weird: heterogeneous workloads do much better on Fork/Join? Weird. And large core counts and workloads do better in pipeline? OK, I think that must be because fold uses the common ForkJoinPool, and maybe that one has too small a number of threads?


So I should clarify what I mean by "heterogeneous": a series of sequence transformations that each have different kinds of workloads. Like:

(=>> (range 100000)
     (map inc)
     (filter odd?)
     ...
     (map super-heavy-function)
     ...
     (filter even?)
     ...
     (map another-super-heavy-function)
     ...
     (apply +))

Big ops, small ops, multiple single-threaded sequencings, multiple multi-threaded folds, etc.

And I'm concerned with four different types of complexity here:

  1. Sequence length - (range 100000)
  2. Sequence item size - is it a sequence of ints, nested arrays, massive maps?
  3. Single transformer difficulty - (map super-heavy-function)
  4. Number of transformers, of a few different types, in a thread

My testing has been trying to measure the cost of each of those independently of one another. When I say heterogeneous workloads, I mean any combination of 2, 3 and 4 is potentially non-uniform. And we can measure heterogeneity along each of those axes as well.
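
For illustration (this harness is not from the thread's actual benchmark suite), axes 1 and 3 can be swept independently using the same flip/time-val helpers from above:

;; Vary sequence length and per-item work separately, timing the same =>> thread.
(doseq [len  [100 1000 10000]
        work [2 20 200]]
  (println {:len  len
            :work work
            :ms   (time-val
                   (=>> (range len)
                        (map (flip work))
                        (map (flip work))
                        (apply +)))}))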

Good point about auto-tune, @didibus. I was thinking about hot-swapping out fold for pipeline, under the hood, for small sequences. It turns out a small fix to the fold =>> impl makes it catch up to |>> in most cases where it lagged behind.

Really, I kinda wonder if some kind of jit/profiling is going on inside Fork/Join, where if a workload is embarrassingly single-threaded, it smartly keeps most of the process local to a particular thread.

And large core counts and workloads do better in pipeline?

That's no longer obvious after my fix in 0.1.0-alpha.14. I had originally found this scenario where |>> appeared to perform 2 to 3 times better than =>>:

We're only measuring sequences of length 0 through 3 here. But if we zoom out, we can see that the problem disappears for =>> at longer sequence lengths:

(dotimes [n 20]
  (let [m (mk-m (* 10000 n))]
    (println
     (qb-val ; <- using quickbench in some of these tests
      (=>> (cycle [{}])
           (take (* 5 n))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           last)))))
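
(`mk-m` isn't defined in the thread either; for reading the benchmark, assume it builds a map with roughly the given number of entries, e.g. something like:)

;; Hypothetical stand-in for mk-m: a map of n keyword->int entries.
(defn mk-m [n]
  (zipmap (map #(keyword (str "k" %)) (range n))
          (range n)))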

On an 8 core:

To dial in this behavior on the 2 core machine, I needed to tweak the code a bit:

(dotimes [n 20]
  (let [m (mk-m (* 10000 n))]
    (println
     (qb-val
      (|>> (cycle [{}])
           (take n)
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           (map #(merge % m))
           last)))))

On 2 core:

As you can see, at sequences six items in length, |>> can be over 3 times faster than =>>. After the fix in 0.1.0-alpha.14, this anomaly goes away:


And on 2 core:

So now it's pretty hard to find scenarios where |>> does significantly better than =>>.

So, in what situations does =>> do significantly better than |>>? Many, especially with long sequences. Let's look at a very basic scenario:

(dotimes [n 100]
  (println
   (time-val
    (|>> (range)
         (take (* 1000 n))
         (map (flip 2))
         (map (flip 2))
         (apply +)))))


As we can see there, |>> is horrible at small work over long sequences. Amazingly, =>> is already beating out the lazy version. It's almost as if =>> has no (or very little) thread overhead on these semi-lightweight examples.

If we increase the work over time, however, we can see |>> start to catch up:

(dotimes [n 100]
  (println
   (time-val
    (=>> (range)
         (take (* 1000 n))
         (map (flip n))
         (map (flip n))
         (map (flip n))
         (apply +)))))


Bump it up a little more and we can see the break-even point for |>> against the non-parallelized variants:

(dotimes [n 100]
  (println
   (time-val
    (->> (range)
         (take (* 1000 n))
         (map (flip n))
         (map (flip n))
         (map (flip n))
         (map (flip n))
         (map (flip n))
         (apply +)))))

At around 600,000 seq length, over 60 int ops, over 5 mapping transforms, |>> starts to pull ahead of the single-threaded versions.

Let's crank the work way up and see what happens:

(dotimes [n 100]
  (let [w (* n 100)]
    (println
     (time-val
      (x>> (range)
           (take (* 100 n))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (map (flip w))
           (apply +))))))

And right there we can see that once our transformers are sufficiently saturated with work, the differences between =>> and |>> disappear and, notably, the differences between x>> and ->>/+>> also disappear.

Does that help explain some of the differences between these different thread macros?

Still exploring, but if I can't find workloads where |>> pays significant dividends over =>>, I may end up removing |>> altogether, just to remove the footgun.

In the above analysis, I would consider all of these homogeneous or uniform workloads. Next up, I'm looking into heterogeneous, non-uniform workloads. There's lots more variation there, where parallel constructs can be feeding into transducing/sequencing constructs, feeding into lazy constructs, back into parallel constructs, etc.

@didibus your point still stands about auto-tuning - there could be some *>> thing that monitors the timing of the sequence calls in x>> and, if they rise above a given threshold (say, 1 ms) relative to the number of cores on a machine, automatically switches to =>> under the hood. That behavior would be a bit less predictable than either x>> or =>> alone, but it might be a better default for prototyping. PRs welcome!
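
A minimal sketch of that idea (not part of injest; the sample size and 1 ms threshold are made-up knobs, and x>>/=>> are assumed to be referred in): time a single-threaded x>> pass over a small sample of the input, and fall back to =>> for the full collection if it looks expensive.

;; Hypothetical auto-tuning macro: measure a small x>> sample, then pick
;; x>> or =>> for the real work based on a rough time threshold.
(defmacro *>> [coll & forms]
  `(let [coll#   ~coll
         sample# (doall (take 64 coll#))
         start#  (System/nanoTime)
         _#      (x>> sample# ~@forms)
         ms#     (/ (- (System/nanoTime) start#) 1e6)]
     (if (> ms# 1.0)
       (=>> coll# ~@forms)
       (x>> coll# ~@forms))))

A real version would want to sample without redoing work and to scale the threshold by core count, as described above.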


Very interesting deep dive. It really opened up my curiosity about why fold over the Fork/Join pool is showing much better results than the pipeline implementation.

I know that Fork/Join has a work-stealing algorithm, so maybe it is smart in the way it steals work to minimize overhead, whereas pipeline steals things at random.

It might also have something to do with the implementation of r/fold.

Fork/Join normally says to implement parallelization like so:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

The first piece is that it checks how big the work is, and if it's not very big it won't fork; that minimizes overhead. The second bit is that it can actually fork operations, not just elements.

For example, it could fork a single mapping operation over a really big sequence into a few smaller mapping operations.

I don't know how fold is implemented here though. I might try to have a look.
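
For reference, plain clojure.core.reducers/fold (which, as an assumption, is what a fold-based =>> would build on) follows exactly that recipe on vectors: it splits the vector in half until a chunk is at or below the partition size (512 by default), reduces each chunk as a Fork/Join task, and then combines the partial results.

(require '[clojure.core.reducers :as r])

;; Chunks of <= 1024 elements are reduced sequentially with the second +,
;; and the per-chunk results are combined pairwise with the first +.
(r/fold 1024 + + (r/map inc (vec (range 1000000))))
;; => 500000500000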

Pipeline, on the other hand, will simply parallelize on elements, and it will do so consistently, as it just queues up each element in a channel and has threads pull the next element and process it.


Ran the same numbers on a 32 core box on Google cloud:

14 times speedup, representing roughly 40% core efficiency, vs 75% efficiency on 8 cores (6 times speedup).

Then I ran them on a 72 core box on AWS:

28 times speedup, still representing a 40% core efficiency :thinking:

Have we reached a ceiling of 60% thread overhead that we can depend on? Or maybe one platform is doing HT and my core numbers are off?

Trying to get access to higher core count servers now.

If there really is a 60% thread overhead ceiling, then we should see a 51 times speed up on a 128 core machine.

Just tried it on a 64 core Linode box. Basically the same graph. 26 times speed up, at exactly a 40% core efficiency ratio.
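
(For reference, the "core efficiency" numbers in these posts are just speedup divided by core count:)

;; [cores reported-speedup efficiency]
(for [[cores speedup] [[8 6] [32 14] [64 26] [72 28]]]
  [cores speedup (double (/ speedup cores))])
;; => roughly 75%, 44%, 41%, and 39% efficiency respectively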

Interesting results. This is looking more familiar. Curious if the ~60% overhead holds in general or is peculiar to this workload. There is also a bevy of JVM options to possibly explore.

The HT/physical core count is also something to consider (they are somewhat opaque to the JVM, and have to be derived from the OS I think).


This is actually something I know about.
While HT fakes being two processors, it isn't really two processors. What it gives you is the option to parallelize some operations with no data dependency which don't use the same modules. So you can probably dispatch two floating point operations in parallel, or an integer operation while other things are going on.
The interesting part is here:

[block diagram of the CPU's execution engine omitted]

Look at the execution engine. Operations can be dispatched in parallel when they can be assigned to different ports.
After that, you should use a tool like JITWatch to understand the generated assembly; you're at a level where you want to understand what's going on.
The way the results look to me, there isn't a 60% overhead. This code is just only able to utilize one thread of the HT model (which some have called bunk in the past), and the overhead is more like 5%.


@John_Newman try testing on an AMD or ARM processor, should be interesting


I'd raise the maximum heap on the JVM as well. If you do more stuff in parallel, it would seem normal to me that more heap would be needed, so if the same default is used, maybe that bottlenecks things.
