Core Async + Ring response: wait until some async processes finish to respond

Hi guys. My motivation for this topic comes from another app, where I have to fetch stock data for 13 companies, which takes a long time. My goal is to shrink the response time by fetching the information concurrently, and then return the response based on the data I’ve collected.

I made a base project on GitHub, which can be found here. The project uses Compojure, so lein ring server is enough to get the server running.

This file has some working (but synchronous) code that I would like to make asynchronous, so it can fetch multiple posts concurrently.

Thank you.

My first thought would be to wrap the call to get-post in a future, and then (write-str (map deref posts)) when you output it. This is okay as long as you don’t have too many posts to fetch. For any kind of production code I would recommend some kind of coordination, such that you have an upper bound on the number of concurrent fetches.
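In code, that idea might look like this minimal sketch (get-post here stands in for your existing blocking fetch, and posts-to-fetch for your collection of URLs):

(require '[clojure.data.json :refer [write-str]])

;; mapv is eager, so all the futures (and thus all the fetches) are
;; started before the first deref blocks.
(let [posts (mapv #(future (get-post %)) posts-to-fetch)]
  (write-str (mapv deref posts)))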

If I remember correctly, core.async is not intended for IO, as it uses a limited and quite small thread pool which easily gets exhausted when its threads block.

Using clojure.core.async in conjunction with blocking I/O is ok, just don’t use go. The go block is intended for activity that may “park” (for <! or >! and such) but will not “block” for any pause that isn’t managed by clojure.core.async.
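For example, here is a minimal sketch using async/thread instead of go for the blocking calls (client/get assumes clj-http, as used later in this thread):

(require '[clojure.core.async :as async :refer [<!!]]
         '[clj-http.client :as client])

(defn fetch-ch [url]
  ;; async/thread runs the blocking call on its own (cached) thread
  ;; pool and returns a channel that yields the result, so the small
  ;; go-block pool is never starved.
  (async/thread (:body (client/get url))))

(defn fetch-all [urls]
  ;; <!! blocks the calling thread, which is fine outside a go block.
  (mapv <!! (mapv fetch-ch urls)))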

I agree with @mdiin; in your case, futures would probably do. If you needed some bound (which I doubt you do here, since you’re automatically bounded by the number of “posts-to-fetch”), you could use thread pools and executors directly, or the claypoole (https://github.com/TheClimateCorporation/claypoole) library, which wraps them in a nicer Clojure interface but lets you specify the backing thread pool.
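For instance, a minimal claypoole sketch capping the fetches at 4 threads (get-body is a hypothetical helper wrapping the blocking call, and urls a placeholder for your URL collection):

(require '[com.climate.claypoole :as cp]
         '[clj-http.client :as client])

(defn get-body [url]
  (:body (client/get url)))

(defn fetch-bounded [urls]
  ;; with-shutdown! disposes of the pool when the body finishes; doall
  ;; forces all results before that happens.
  (cp/with-shutdown! [pool (cp/threadpool 4)]
    (doall (cp/pmap pool get-body urls))))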

You can also use core.async, but for simple use cases like this, I find it is almost trickier to use.

Thanks for the answers guys.

Here’s my finished code:

(ns async-ring-fetch.handler
  (:require [compojure.core :refer :all]
            [compojure.route :as route]
            [ring.util.response :refer [response content-type]]
            [clojure.data.json :refer [read-str write-str]]
            [clj-http.client :as client]
            [ring.middleware.defaults :refer [wrap-defaults site-defaults]]))

(def posts-to-fetch ["https://jsonplaceholder.typicode.com/posts/1"
                     "https://jsonplaceholder.typicode.com/posts/2"
                     "https://jsonplaceholder.typicode.com/posts/3"
                     "https://jsonplaceholder.typicode.com/posts/4"
                     "https://jsonplaceholder.typicode.com/posts/5"])

(defn get-post [url]
  ;; Returns immediately; the HTTP fetch runs on a separate thread.
  (future
    (read-str (:body (client/get url)))))

(defn get-posts
  "Gets posts based on a collection of URLs"
  [urls]
  (reduce (fn [acc cur] (conj acc (get-post cur))) [] urls))

(defn get-all-posts
  "Gets all the posts"
  []
  ;; All the futures already exist before deref is called, so the
  ;; fetches run concurrently.
  (mapv deref (get-posts posts-to-fetch)))

(defn posts [_ respond _]
  ;; Here should be an async response
  (respond
   (content-type
    (response
     (write-str
      (get-all-posts))) "application/json")))

(defroutes app-routes
  (GET "/" [] posts))

(def app
  (wrap-defaults app-routes site-defaults))

The line (mapv deref (get-posts posts-to-fetch)) might seem strange, but I’m doing it that way because when I tried to deref directly when calling get-post, the main thread was blocked: derefing each future as soon as it is created waits for that request to finish before the next one starts, which serializes the fetches. Creating all the futures first and derefing them afterwards lets the requests run in parallel.

Also, the function names I’ve created are very similar/misleading, but I’m focusing only on the technical problem.

Below is the difference between the two approaches:

Sync average to get the five posts: 1722.8 ms
Async average to get the five posts: 394.2 ms

Looks good to me. Your use of mapv deref is what I would have done as well; when you want to wait for all futures to complete before proceeding, this is a nice pattern.

EDIT:

Actually, I do feel your use of reduce is a bit strange; you could also just use mapv instead, no?

(mapv get-post urls)

And then easily combine both:

(defn get-posts [urls]
  (->> urls
       (mapv get-post)
       (mapv deref)))

Truly much cleaner this way, thank you @didibus.

I’ll try to make a little enhancement limiting the number of concurrent fetches, since this was pointed out above, and I will post what I did here in the future.

Cool!

Just want to say, though: you would only care to limit this if you believe that the host running the code won’t be able to vertically scale to some load and will thus brown out.

One example would be if you think it could run out of memory from using too many threads, or from having too many IO buffers and connections ongoing.

So the question is: can user behavior drive the load on your host until it runs itself out of memory? If it can, that’s a problem, and you’ll want to set some bounds to prevent it from happening. There are many places where you can bound things, though.

So in your case, if the posts to fetch are user-provided, that could be a problem. You can imagine a user making a single request asking to fetch, say, 10 billion posts. That would cause your code to create 10 billion threads, each opening an IO buffer and connection, which could make your host run out of memory.

Now, you could choose to bound the threads using a thread pool, which would similarly bound the IO buffers and connections, since you have one of those per thread. But then you still need to consider that keeping track of the 10-billion-item payload itself consumes memory: you have a vector of 10 billion strings. And since that’s unbounded, if 10 billion isn’t enough to OOM, the user can request 100 billion, or whatever it takes to tip your host over.

So now you realize that what should probably be bounded in this case is how many posts a user can ask to be fetched in a single request. If you had a bound on that, you wouldn’t need to bound your futures, since they’re one-to-one with the number of user-requested posts.

The second thing to look at is the total number of concurrent requests on your server. Even if you limit each request to fetching at most 1 post, a user could make 100 billion requests and you’d still OOM. So you probably also want a bound on the number of concurrent requests you handle.
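One way to sketch such a bound is a hypothetical Ring middleware backed by a JDK semaphore (shown in the simple synchronous 1-arity form; an async handler would need the 3-arity variant):

(import 'java.util.concurrent.Semaphore)

(defn wrap-concurrency-limit
  "Rejects requests beyond max-concurrent with a 503 instead of queuing."
  [handler max-concurrent]
  (let [sem (Semaphore. max-concurrent)]
    (fn [request]
      (if (.tryAcquire sem)
        (try
          (handler request)
          (finally (.release sem)))
        {:status 503 :body "Server busy, try again later"}))))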

So now, just by having a bound on max concurrent requests and on max posts to fetch per request, you’ve also properly bounded your number of futures at MaxConcurrentRequests × MaxPostsToFetch. For example, allowing at most 100 concurrent requests of at most 13 posts each caps you at 1300 futures.

And you can perform load tests to see how those values should be configured for a particular type of host.

So my point is, protecting yourself from tip-overs like this is something where you need to consider the entirety of the application. And just as there is premature optimization, there are premature artificial limits. I’m in the camp that limits should be put in place, but at the end, when you can properly load test and reason about the whole application. And generally, limits at the entry points are good enough and much simpler to implement, maintain, and reason about.
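To make the entry-point idea concrete, here is a minimal sketch reusing write-str and get-post from the code above; the limit value and handler shape are just assumptions:

(def max-posts-per-request 50) ; hypothetical limit; tune via load tests

(defn fetch-posts-bounded [urls]
  (if (> (count urls) max-posts-per-request)
    ;; Reject oversized requests before any futures are created.
    {:status 400
     :body   (write-str {:error "too many posts requested"})}
    {:status 200
     :body   (write-str (mapv deref (mapv get-post urls)))}))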

In the real application where I want to use this, the number of threads is not even close to being a problem. I’ll do basically the same thing as in this example, but make literally 13 requests to another endpoint. Nobody but me would be able to change that.

The idea of limiting the number of parallel requests here is merely for practice, because, as you probably noticed, I’m new to Clojure.

Anyway, I’ll take what you said into consideration. Thanks again @didibus!

Another option here is to use aleph and manifold. Both were pretty much made for exactly this sort of use case. I’m using aleph’s client to do pretty much exactly what you are doing, and it’s a lot faster, since it uses persistent connections with a connection pool, if the other server supports it. I had previously been forking off futures to download each post in parallel using clj-http, and it was a lot slower (faster than doing it serially, but slower than using aleph). All the ideas are very similar, but consult the aleph and manifold docs for more info.
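A minimal sketch of that approach (fetch-post is a hypothetical helper; posts-to-fetch is the vector from the code above):

(require '[aleph.http :as http]
         '[manifold.deferred :as d]
         '[byte-streams :as bs])

(defn fetch-post [url]
  ;; http/get returns a deferred immediately; the request runs over
  ;; aleph's connection pool.
  (d/chain (http/get url)
           :body
           bs/to-string)) ; response bodies arrive as InputStreams

;; d/zip waits for all the deferreds, so the requests overlap.
@(apply d/zip (map fetch-post posts-to-fetch))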

That’s a great tip. I’ll take a look.

The Apache HTTP client library also offers a connection pool and persistent connections (and maybe so does Java 11’s HTTP client?). But Aleph and Manifold are worth a look anyway; they have a completely different design orientation vs core.async. Manifold is a true artwork of ergonomics. You scroll down the manual page (which is pretty good), going uh-huh, uh-huh, wow!, omg!!, and then you reach the part about let-flow and you’re in raptures. But it does not port to ClojureScript.
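For a taste of let-flow, a tiny sketch with hypothetical URLs: bindings that evaluate to deferreds are resolved automatically, so the body reads like synchronous code while both requests run concurrently:

(require '[manifold.deferred :as d]
         '[aleph.http :as http])

(d/let-flow [a (http/get "https://example.com/a")
             b (http/get "https://example.com/b")]
  ;; a and b are the realized response maps here.
  [(:status a) (:status b)])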

Yes, clj-http can also do connection pooling, but the whole Aleph/Manifold model is generally nicer and uses persistent connections by default.
