Core async `alts` performance scaling quadratically with number of channels?

I went through the core.async walkthrough before doing some async stuff, and when I hit Line 101 I became curious to see what the limits of the go-threads were.

I wrote some quick test code (adapted from the walkthrough) in which n go blocks are created, each one sends “hi” to its own channel, and alts!! is used to collect the results:

(require '[clojure.core.async :refer [>! chan go alts!!]])

(def timings
  (into
   (sorted-map)
   (for [n [1 10 20 30 40 50 75 100 150 200 250 500 750 1000 1500 2000 3000]]
     (let [cs (repeatedly n chan)          ; n fresh channels
           begin (System/currentTimeMillis)]
       (doseq [c cs] (go (>! c "hi")))     ; one go block puts to each channel
       (dotimes [_ n]                      ; drain all n values via alts!!
         (let [[v _] (alts!! cs)]
           (assert (= "hi" v))))
       (let [dur (- (System/currentTimeMillis) begin)]
         (println "Read" n "msgs in" dur "ms")
         [n dur])))))

And lo and behold, 1000 channels is 100 times slower than 100 channels. I don’t know what I was expecting really, but I feel like it shouldn’t take seconds for thousands of go threads. 10k threads took 30 seconds on my machine. I’ve clearly hit some limit, and I have a use-case where I might be waiting on 1k-10k asynchronous replies from the cloud.

At first I wondered what made the test code slow. After thinking a bit in my ‘hammock’ (chair), I realized that the limiting factor had to be alts!! over large collections.

So the ‘fix’, after some more time at the REPL, was to run alts!! in parallel:

(into
 (sorted-map)
 (for [n [1 10 20 30 40 50 75 100 150 200 250 500 750 1000 1500 2000 3000]]
   (let [cs (repeatedly n chan)
         begin (System/currentTimeMillis)]
     (doseq [c cs] (go (>! c "hi")))
     ;; run alts!! over chunks of 100 channels, one chunk per pmap worker
     (doall (pmap
             (fn [chunk]
               (dotimes [_ (count chunk)]
                 (let [[v _] (alts!! chunk)]
                   (assert (= "hi" v)))))
             (partition-all 100 cs)))
     (let [dur (- (System/currentTimeMillis) begin)]
       (println "Read" n "msgs in" dur "ms")
       [n dur]))))

And suddenly the time it takes scales linearly. Huh. Also: 16–20 ms for 3k channels, instead of 2400 ms.

So, lesson learned: alts!! is very slow for large (thousand-plus) numbers of channels. I don’t understand why, because the source code for alts is quite difficult for me to follow, but it looks like it expands into some giant cond statement.

  • Have you hit slow cases with async code before? And how did you solve them?
  • Is my test code garbage? If so, how?

Yes, alts!! does indeed seem to scale quadratically. When you generate n channels to which “hi” will sooner or later be put, each alts!! call has to go through all the clauses in a linear search, and when one result is found, the next iteration of dotimes restarts the search from scratch. Each call is O(n) and you make n of them, so the total work is O(n²); this scales quadratically and is not an idiomatic way to aggregate results from several parallel go blocks.
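
To check that reasoning, you could time a single alts!! call in isolation; a rough sketch (mine, not from the original post), which should show per-call cost growing roughly linearly with n, since as far as I understand alts!! registers a handler on every channel:

(require '[clojure.core.async :refer [>! chan go alts!!]])

;; Rough sketch: time one alts!! call for growing n. The numbers will
;; vary by machine; the point is the roughly linear growth per call.
(doseq [n [100 1000 10000]]
  (let [cs (vec (repeatedly n chan))]
    (go (>! (rand-nth cs) "hi"))           ; one value somewhere among the n
    (let [begin (System/nanoTime)
          [v _] (alts!! cs)]
      (println n "channels:" v "in"
               (/ (- (System/nanoTime) begin) 1e6) "ms"))))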

If you split this up across several threads using pmap, it is faster partly because of the extra threads, but mainly because each thread only looks at a subset of the channels, which gives a shorter list of clauses per call. If alts! used some other strategy (for instance, not starting from the top each time a channel was found), it would probably get higher throughput in this test case.

A more idiomatic way of aggregating results from several go blocks is to let them put their responses on some known response channel, like:

(require '[clojure.core.async :refer [<!! >! chan go]])

(time
 (let [n 10000
       res-chan (chan n)]                 ; buffer big enough for all results
   (dotimes [i n]
     (go
       ;; real work would go here...
       (let [res i]
         (>! res-chan res))))

   (<!! ;; take the collected vector off the into-channel
    (<!! ;; take the into-channel off the channel the go block returns
     (go (let [res (clojure.core.async/take n res-chan)]
           (clojure.core.async/into [] res)))))))

which takes approx 40 ms for n=10000, and 200 ms for n=100000 on my machine.
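
As a side note, clojure.core.async/take and clojure.core.async/into are ordinary functions returning channels, not macros that need a go context, so as far as I can tell the wrapping go block and one of the <!! calls can be dropped; a sketch, reusing n and res-chan from the example above:

;; take n results from res-chan, collect them into a vector on a new
;; channel, and block once for the final value
(<!! (clojure.core.async/into [] (clojure.core.async/take n res-chan)))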

Regarding limitations in core.async: there is a limit of 1024 concurrent pending puts and takes per channel. In the example, each go block attempts one put and the alts! attempts one take at a time, so there is no risk of approaching 1024 concurrent put and take operations here. The result channel is the exception: if it were blocking (say, there was not enough room in the buffer and many thousands of go blocks were parked attempting to put their results), you could approach the limit, but as long as the buffer has room, the 1024-concurrent-operations limit should not be a problem.
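
A minimal sketch of hitting that limit (my own illustration, using put!, which enqueues a put asynchronously and throws on the caller’s thread when the pending-put queue is full):

(require '[clojure.core.async :refer [chan put!]])

(let [c (chan)]            ; unbuffered, and nobody is taking
  (dotimes [_ 1024]
    (put! c :x))           ; 1024 pending puts are accepted
  (put! c :x))             ; the 1025th throws an AssertionError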

Your original use case seems to be initiating a lot of parallel web requests and waiting for the responses. If you are just doing batch work where the requests do not depend on each other’s results, you can use a construct like the one in the example. The aggregator should then correlate the responses on the result channel with the requests (you will also have to handle exceptions etc.; as you might have experienced already, core.async behaves somewhat mysteriously when its go blocks are cancelled by thrown exceptions). The aggregation can be done either “online”, as responses arrive, or “offline”, once all the issued requests have returned or timed out.
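
A sketch of that batch pattern (fake-fetch here is a hypothetical stand-in for a real blocking HTTP call; tagging each result with its request id is what lets the aggregator correlate responses):

(require '[clojure.core.async :refer [<!! >!! chan thread]])

;; hypothetical stand-in for a real blocking HTTP call
(defn fake-fetch [id]
  (Thread/sleep (rand-int 20))
  {:id id :body (str "response " id)})

(let [ids    (range 100)
      res-ch (chan (count ids))]
  ;; `thread` rather than `go`, since the request blocks a real thread
  (doseq [id ids]
    (thread
      (>!! res-ch (try [id (fake-fetch id)]
                       (catch Throwable t [id {:error t}])))))
  ;; "offline" aggregation: block until every reply is in, keyed by id
  (into {} (repeatedly (count ids) #(<!! res-ch))))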

You don’t mention whether the requests depend on each other, but if they do: try to make those dependencies more fine-grained, and create a go block for each request or chain of requests; each block can in turn deliver its sub-results to other, dependent go blocks via dedicated channels.
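
A tiny sketch of such a chain (my own, with trivial arithmetic standing in for real work): step B parks on a dedicated channel until step A delivers its sub-result:

(require '[clojure.core.async :refer [<! <!! >! chan go]])

(let [a-ch (chan 1)
      b-ch (chan 1)]
  (go (>! a-ch (+ 1 2)))          ; step A produces its sub-result
  (go (>! b-ch (* 10 (<! a-ch)))) ; step B parks until A delivers
  (<!! b-ch))                     ; => 30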

In summary: you should not use a repeated alts!! to get results from many processes; instead, make the processes push their results to some common channel and let an aggregating process listen on that channel. This can be expanded into a network of intercommunicating go-loops that wait for and trigger each other, as in the sketch below.
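
A minimal sketch of that aggregator shape (entirely my own illustration): workers push onto a common channel, and a single go-loop accumulates until the channel is closed:

(require '[clojure.core.async :refer [<! <!! >!! chan close! go-loop]])

(let [results (chan 16)
      agg     (go-loop [acc []]
                (if-some [v (<! results)]  ; nil means `results` was closed
                  (recur (conj acc v))
                  acc))]                   ; the go-loop channel yields acc
  (dotimes [i 5] (>!! results i))
  (close! results)
  (<!! agg))
;; => [0 1 2 3 4]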


Your reply is excellent. I like that you focused on the design issues themselves.

On the design side, I see now that your explanation leads to a system where the entire flow of data must be described by the async constructs. This can be nice; however, I have always believed in a ‘side effects at the edges’ approach to things.

Because I also like the ‘async constructs as a network of processing’ idea, the next step for me is to learn how to harmonize these two perspectives (side effects at the edges vs. async all the way through) for myself.

Thank you for your appreciation!

I think the most useful thing about core.async is that you can let the machinery take care of a lot of coordination for you, especially for things like “this part should only run when both of these results are available”, which is very tedious to arrange in other ways. And very often you want to take action even when not all the results are available yet, which is quite hard to do manually in an efficient and correct way. Timeouts and other things can also be described using alts!, which is really powerful.
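
For instance, a small sketch (my own, with made-up values) of “run only when both results are in, or give up after a shared 100 ms deadline”, using alts! with a timeout channel:

(require '[clojure.core.async :refer [<!! alts! chan go put! timeout]])

(let [r1 (chan 1)
      r2 (chan 1)]
  (put! r1 :first)     ; pretend both results arrive in time
  (put! r2 :second)
  (<!! (go (let [deadline (timeout 100)
                 [v1 c1]  (alts! [r1 deadline])]
             (if (= c1 deadline)
               :timed-out
               (let [[v2 c2] (alts! [r2 deadline])]
                 (if (= c2 deadline)
                   :timed-out
                   [v1 v2])))))))
;; => [:first :second]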

The only thing I miss when working with core.async is that the process state is quite opaque: there is no apparent mechanism to read out the current state of the process network, i.e. where a go process is parked, the state and contents of the channel buffers, etc. It would be useful to introspect such a state, and also to store it to, say, a database, reload it later, and continue processing. But that is also quite an extreme requirement for most use cases and could probably not be realized without a significant performance penalty.

And yes, core.async means adding some kind of mutable state into your process. It’s quite well behaved but really mutable.
