Is this an efficient way of handling concurrency?

I have an IO operation to be done. The client library I use provides 900 connections to the target (database) over which to send the IO commands. I have 1 million IO operations to be performed in groups of 50,000 each. So basically there are 20 chunks of IO operations to be performed, where each chunk contains 50,000 operations.

I need to process these 20 chunks concurrently.

Solution 1

(def monitor (atom 0))

;; assumes (:import (java.util.function BiFunction)) in the ns form
(defn- bi-function ^BiFunction
  []
  (reify BiFunction
    (apply [_ _ _]
      (swap! monitor dec))))

(defn update-line [line]
  (while (>= @monitor 800)) ;; busy-wait until fewer than 800 ops are in flight
  (swap! monitor inc)
  (-> <call a function that returns a completable future> ;; This is the part where the IO is done. It is not in our control
      (.handle (bi-function))))

(defn update
  ([csv-file]
   (trace/span
     "update"
     (with-open [reader (clojure.java.io/reader csv-file)]
       (doseq [line (line-seq reader)]
         (update-line line))))))

The function update is called concurrently with 20 CSV files.


Solution 2 (Using clojure.core.async)

(def monitor (atom 0))

(defn- bi-function ^BiFunction
  []
  (reify BiFunction
    (apply [_ _ _]
      (swap! monitor dec))))

(defn update-line [line]
  (while (>= @monitor 800)) ;; busy-wait until fewer than 800 ops are in flight
  (swap! monitor inc)
  (-> <call a function that returns a completable future> ;; This is the part where the IO is done. It is not in our control
      (.handle (bi-function))))

(defn update
  ([csv-file channel]
   (trace/span
     "update"
     (with-open [reader (clojure.java.io/reader csv-file)]
       (doseq [line (line-seq reader)]
         (async/>!! channel line))))))

;; The difference from solution 1 is that the caller of update will spawn a go block 
(let [files [file1 file2 ... file20]
      channel (async/chan 800)]
  (async/go
    (while true
      (let [[line _] (async/alts! [channel])] ;; alts! on a single channel is equivalent to <! here
        (update-line line))))
  (doseq [file files]
    (future (update file channel))))

With Solution 1 each file takes ~85 seconds to be processed. Solution 2 does a bit better, clocking in at ~55 seconds. Is there a way to improve Solution 2? Is this an efficient solution to begin with?

A few questions:

Is the while … call just a way to block so that you don’t overrun the 800/900 connections limit?

Why do you need to process the 20 files concurrently? Would it make a difference if it was one big file or 800 smaller ones?

Have you tried to just spawn 20 threads (one per file) or 800 threads (one per connection)?

Does the order of the processing matter or could lines be processed randomly?

Also note that having 900 database connections is probably the wrong thing to do. Using just e.g. 10 might improve your performance. That might also be the reason why the core.async version is faster: since it uses just a few threads for the go blocks, those threads might have blocked on the IO operations, limiting the number of DB connections actually in use.

Is the while … call just a way to block so that you don’t overrun the 800/900 connections limit?

Yes

Why do you need to process the 20 files concurrently? Would it make a difference if it was one big file or 800 smaller ones?

It’s because I would be getting 20 concurrent requests. These 20 requests are independent.

Does the order of the processing matter or could lines be processed randomly?

No order does not matter.

Have you tried to just spawn 20 threads (one per file) or 800 threads (one per connection)?

I am already spawning 20 threads (via future; check the last line).

Here’s my take using pipeline, and leveraging the additional criteria you specified:

(defn process-lines
  [lines f connections]
  (let [in      (async/chan  connections) ;;respects our connection limit...
        out     (async/chan  connections)
        ;;push work onto in
        _       (async/onto-chan in lines)
        done?   (promise)]
    ;;kick off workers to go process the pool within
    ;;constraints of connection-limit
    (async/pipeline-blocking 20 out (map f) in)
    ;;pull stuff until the out channel is closed,
    ;;delivering our promise.
    (async/go-loop []
      (if-let [res (async/<! out)]
        (recur)
        (deliver done? true)))
    done?))

Collect all files into one lazy workload, but also make it closeable…

(defn multi-reader [paths]
  (let [readers (mapv clojure.java.io/reader paths)]
    (reify java.io.Closeable
      (close [this] (doseq [^java.io.Closeable r readers]
                      (.close r)))
      clojure.lang.Seqable
      (seq [this] (seq readers)))))

(defn process-all [files f & {:keys [connections] :or {connections 800}}]
  (with-open [readers (multi-reader files)]
    (-> (mapcat line-seq readers)
        (process-lines f connections)
        deref)))
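
Usage would look something like this (the file names are hypothetical):

(process-all ["file1.csv" "file2.csv"] update-line :connections 800)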

In this case, f would be update-line from your previous example. This approach abstracts the work (all the lines) away from the source, since order doesn’t matter (though we actually preserve order in pipeline, and implicitly in the lazy sequence of input lines). We let the pipeline dictate the actual thread pool size, and use buffered channels to provide backpressure for the DB connection limit.

The downside (as written) is that we’re doing a little extra work in delivering those values onto the out channel; we’re only using the channels as a concurrency mechanism and we don’t really care about the contents of out. So there could be a tad of inefficiency there over just raw futures that don’t deliver any results. Also, this means that the function update-line, or whatever is passed in as f, needs to return a non-nil result, or else core.async will complain that you can’t put nil values onto a channel. We could work around this in the implementation of process-lines via:

(async/pipeline-blocking 20 out (map (fn [x] (or (f x) :none))) in)

Really nice write-up. The core.async pipeline stuff is a bit dense to understand from the official docs, so it’s nice to see a concrete example.

Thanks. I felt the same way originally, but since having forced myself to use it a bit, I find it a nice mechanism for general work-stealing workloads and parallel processing tasks. You don’t pay the semi-serial processing penalty of pmap, you can still get an ordered result back, and you get all the backpressure / custom thread-pool sizing kind of baked in. I also end up using pipeline-blocking almost all the time in these cases.


Having just read through the core.async docs, I’d like to raise a few points/questions:

  • onto-chan should probably be onto-chan!
  • could the go-loop be replaced with an async/reduce call that just ignores the values and then waits for the channel to return a final value or close?
  • does the out channel really need an 800-sized buffer? I think theoretically even an unbuffered channel would be ok since we pull values out of it as fast as possible.
  • or is it the other way round? That the in channel doesn’t need the buffer since we push values in as fast as possible?

Or, on third read: shouldn’t the n in pipeline be 800? It would spawn 800 threads, which would be the upper bound on in-flight requests; there’s no other way to have 800 requests in flight with only 20 threads.

I think, though, that since the IO call seems to be async (it returns a future), pipeline-async seems to be the way to do this.

You say concurrent, but I think you mean parallel?

For performance, you don’t want your concurrency to exceed your parallelism, or it might actually slow things down.

For example, it’s not because your DB allows up to 800 connections that it’ll be faster for you to make 800 concurrent requests to it. It’s very likely the DB cannot parallelize to such levels, and the requests will just contend for the resources. This is also possibly true of your machine and its ability to maintain 800 connections, of your disk’s ability to read files in parallel, of your CPU’s ability to parse them, etc.

So basically you could try playing with the amount of concurrency; it’s possible that using fewer connections and going one file at a time could actually be faster.


onto-chan!

That’s in the library at this point. There are plenty of effectful ops in core.async that aren’t listed as such though.

could the go-loop be replaced with an async/reduce call that just ignores the values and then waits for the channe to return a final value or close?

Sure; the channel returned by async/reduce is in effect a promise that you would then <!! from to get the same effect as a deref on the promise.
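
For instance, a minimal sketch of that variant, replacing the go-loop and promise in process-lines:

;; async/reduce returns a channel that yields its accumulated value
;; once `out` closes; <!! blocks the caller until then.
(async/<!! (async/reduce (fn [acc _] acc) :done out))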

does the out channel really need an 800 sized buffer? I think theoretically even an unbuffered channel would be ok since we pull values out of it as fast as possible.
or is it the other way round? That the in channel doesn’t need the buffer since we push values in as fast as possible?

I don’t know; I tend to err on the side of consistency, and 800 is small here. I would rather ensure properly sized flow throughout the system, just in case my assumptions are somehow flawed. You could certainly try each one and find out… I have tended to see the default unbuffered channels (effectively a buffer of 1) lead to surprising phenomena like locked systems, due to my own incompetence. I tend to buffer explicitly out of habit these days…

shouldn’t the n in pipeline be 800?

I don’t know; OP said a maximum of 20 concurrent requests, and the code with futures indicated 20 threads, so I worked within those bounds for the worker/thread pool. How fast can 20 threads fill up 800 concurrent connections? (Reasoning about 1 line being processed at a time and delivered over a connection by 1 worker would imply a 1:1 relation between thread and connection, so your analysis sounds right.) I think 800 true threads is excessive; there are 800 allowed connections, which seems like a different constraint. So we can have 20 workers processing in parallel, limited to 800 connections, depending on how fast they push work. It’s unclear which would be the bottleneck…

I think though that since the IO call seems to be async (returns a future) pipeline-async seems to be the way to do this.

If you use pipeline-async, it will run on the core.async thread pool (defaulting to cpu*2 threads) by spawning coroutines. The blocking variant spawns its own thread pool, exactly matched to n.
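
For reference, a minimal sketch of what the pipeline-async variant could look like, where in and out are the pipeline channels from process-lines and io-call is a hypothetical stand-in for the client call that returns a CompletableFuture:

;; The async function takes an input and a result channel, and must
;; close! the result channel once the result has been placed on it.
(defn async-update [line result-ch]
  (.handle ^java.util.concurrent.CompletableFuture (io-call line)
           (reify java.util.function.BiFunction
             (apply [_ res _]
               (async/put! result-ch (or res :none))
               (async/close! result-ch)))))

(async/pipeline-async 800 out async-update in)

Here n bounds the number of in-flight async calls rather than the number of threads.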

This is probably one of the best talks on the difference between concurrency and parallelism, IMO. OP would probably find it useful to clarify the semantics, which I also found a bit unclear in the original post.


Good point. Hard to tell without knowing what the update function is actually doing (e.g. is it doing async IO?) and how CPU-intensive it is.

This is the kind of problem where I’ve always wished I could just provide the upper limits (e.g. max 800 connections in flight) and some goal (e.g. optimize for latency or throughput) and let the system auto-adjust everything else. Many times this kind of job repeats daily, so it’s possible to use historical data to do an optimization.

I will try this out today and post the result

I tried the one file at a time approach. Each file takes about 5 seconds. So processing of the 20th file starts only after 95 seconds, which is not what I am looking for.

You say concurrent, but I think you mean parallel?

No, I mean concurrent. Think of these 20 requests as 20 HTTP requests, each uploading a file containing 50,000 rows. They are concurrent (overlapping). These 20 requests cannot be processed simultaneously, as I don’t have 20 cores on my machine. My machine has probably 4 cores, and these 20 requests (each with 50,000 updates) will contend for the resources.

found a bit unclear in the original post.

Can you point out the part that was unclear in the original post? I will edit it and try to make it clearer.

I’m sorry this might come off as too affirmative, but I really believe you mean parallel.

Concurrency will always make things slower without parallelization. Since you are measuring the overall time it takes to batch process all files and want to minimize the time it takes for the whole thing to complete, what you need is to add parallelization.

Your machine’s CPU is not the only resource that can be parallelized. Your network card, your memory accesses, your disk accesses can all potentially parallelize to a certain level. And you have parallelism between your machine and the machine running the DB, as well as the DB most likely also making use of parallelism on its side.

This is why going fully serial might appear slower than your extreme concurrency example, because the extreme concurrency one was leveraging parallelism at multiple levels of the stack. That said, it is still possible that with less concurrency, you will see better performance.

I’m also curious: how do you manage connections? Creating connections can be pretty slow, but in the concurrent case you would actually allow your machine to create them in parallel, up to the number of cores.

My suggestion would be to have a connection pool, of around maybe 10, and reuse those for making requests to your DB. And then play around with the parallelization levels of your requests to your DB just so whenever one of those 10 connections returns to the pool, you’ve got the next line ready to make use of it.

Unless your DB is running over a large cluster, it will probably just become slower if you exceed 10 concurrent queries. This can depend on the machine it’s running on or the DB itself, so you could play around with that number as well, but I’ve never seen a single-instance DB that could handle 100+ queries in parallel without the extra load adding overhead.

The connection pool would effectively take the place of the monitor in your code. You’d use a blocking queue (or a go channel) where, when it’s empty, consumers block and wait for a connection to show up in the queue. The advantage over your monitor is that the queue also acts as a cache for the connections, meaning you can reuse them and don’t pay the price of creating one every time.
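
A minimal sketch of that idea, assuming a hypothetical open-conn function that creates a client connection:

(import 'java.util.concurrent.ArrayBlockingQueue)

(def pool
  (let [q (ArrayBlockingQueue. 10)]
    (dotimes [_ 10] (.put q (open-conn)))  ;; open-conn is hypothetical
    q))

(defn with-conn [f]
  (let [conn (.take pool)]                 ;; blocks while the pool is empty
    (try (f conn)
         (finally (.put pool conn)))))     ;; return the connection for reuse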

And like I said, I feel 800 is too many, and you’ll probably see better results with a smaller count.

Okay, so, the DB I am talking about here is Aerospike. Aerospike comes with a default of 300 connections per node. Maybe I should have clarified this. The important point here was not the number of connections: you can have 900 connections provided you have more event loops.

And nowhere in the Aerospike documentation do they talk about reducing the connections from 300 to 10. All the examples that I see work with this default, and it is recommended to use more event loops if you increase the connections. There is no suggestion to reduce the connections anywhere.

The client library I use does not have any throttling mechanism. It just converts your command to a future (using the async methods provided by the Java client). Having 50,000 futures in flight blows up the program with the error Max connections of 300 exceeded. So I need an application-layer solution that throttles the incoming requests, sending only X update commands to Aerospike and making sure that at any point in time at most X update commands are being processed.
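
(For illustration: one way to express this kind of application-layer throttle without the busy-wait monitor is a counting semaphore; a minimal sketch, with io-call as a hypothetical stand-in for the client call:)

(import 'java.util.concurrent.Semaphore
        'java.util.function.BiFunction)

(def permits (Semaphore. 300))

(defn throttled-update [line]
  (.acquire permits)             ;; blocks once 300 commands are in flight
  (-> (io-call line)             ;; hypothetical call returning a CompletableFuture
      (.handle (reify BiFunction
                 (apply [_ _ _]
                   (.release permits))))))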

But I see what you are trying to say. I think you are talking about the number of parallel connections that the DB can actually service, right? Are we on the same page now?

I’m not familiar with Aerospike, so I can’t say for sure. That said, many DBs allow for more concurrency than they can parallelize, which will reduce throughput even though they allow it. So lots of DBs let you create more connections than they can handle in parallel, and make more queries to them than they can parallelize. And doing so can degrade throughput.

But it’s possible that Aerospike’s max-connections setting is already optimal. I also think it’s possible that your client library internally pools connections; the Aerospike docs do seem to imply it does.

In that case I’d suggest maybe this approach:

First, make the call to Aerospike a no-op: stub it so that it actually does nothing, doesn’t use the library at all, and just returns, so that you remove that part of the equation.
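
For instance, a stub that skips the client library entirely and returns an already-completed future (io-call being a hypothetical name for the call site):

(defn io-call [_line]
  (java.util.concurrent.CompletableFuture/completedFuture :ok))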

Now, play with various ways to read lines from your files and prepare the requests to be sent to Aerospike. Measure each, until you get to a place where it is as fast as it gets. That would mean you’ve maxed out your machine’s parallelism for reading lines from files and preparing your queries for Aerospike. Do this with a resource throttle of 800, like you had.

This will tell you roughly how much you can saturate your 800 connections: can your machine even consistently produce 800 in-flight requests, or not?

Once you have that, reintroduce Aerospike and measure the time it takes. Then try with 400 instead of 800, and do a kind of binary search for the right number. Maybe it’ll be 800, but maybe not.

After that you’ll be pretty sure that you’ve tuned for the maximum parallelism of your machine producing requests to Aerospike, and of Aerospike processing them.

Edit: And I guess even now, you could add something in your code that counts how much time you spend waiting for a connection. If it’s a small amount, it means the bottleneck is already the DB, and now you might need to play with the DB settings and the connection count. Otherwise the bottleneck is still your code, and you can try optimizing it further.
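
A minimal sketch of that measurement, building on the hypothetical semaphore throttle sketched earlier:

(def wait-nanos (atom 0))

(defn acquire-conn! []
  (let [t0 (System/nanoTime)]
    (.acquire permits)           ;; the Semaphore from the earlier sketch
    (swap! wait-nanos + (- (System/nanoTime) t0))))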

Deviating from the topic a bit, how is reading lines from files a parallel activity? If I assign 20 threads to read lines from 20 files respectively, what are we parallelising here (given my machine has 4 cores)?