Core.async locks up when using timeout deep inside async processes

I’ve got a situation where I don’t understand core.async’s timeout behaviour, and I’m managing to completely lock up core.async. I’ve made a gist with an easily repeatable scenario.

As far as I can tell, using (<!! (timeout n)) in a function that is called somewhere inside an async process causes core.async to completely lock up.

For some background, I’m writing a REST API client that leverages core.async extensively. The API has various rate limits in place, and I’m using @ericnormand’s fantastic token bucket with core.async implementation (thanks Eric!) to respect the rate limits at call time.

The sample code in the gist is from another project where I spiked out a core.async pipeline with “fake work”, to get a feel for how it works. In the actual pipeline timeout is never used, so it works as advertised. But as a teaching/learning example it is kinda useless without timeout because it just runs through and you don’t get a feel for how the pipeline executes. Using println directly instead of printing through an agent clearly shows the crazy async processing happening.

Now I’ve got a real case for having a timeout deep inside the core.async process, and it locks up :frowning:

Any suggestions or feedback would be deeply appreciated

I’m not an expert on core.async, but I tried the gist adding some deps.edn like:

{:deps {org.clojure/clojure    {:mvn/version "1.9.0"}
        org.clojure/core.async {:mvn/version "0.4.474"}}}

When I run the file, it does not do anything; it just terminates immediately. But if I add:

(report "LastRow")

as a last row, then it runs. It works the same way whether I have the timeout on or off (so I always get to the “Done! [10 2 4 15 7 3 1 9 6 5 12 0 8 13 17]” and I never experienced a lockup in ~10 runs).

When it runs, the project does not terminate (maybe threads from core.async need a shutdown).

Running on a Mac with:

$ java -version
java version "1.8.0_66"
Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)

Update: I think I got something.

  1. If I don’t have anything past (async/onto-chan ...) then it just terminates - looks like the core-async thread pool does not get a chance to get started
  2. If I put in a fixed sleep, (Thread/sleep 2000), then it is killed after 2 seconds. Without the timeouts it finishes in time; with the timeouts it gets cut off, as you report, but only because it takes way more time to complete. With timeouts of 1 ms, it finishes in either case.
  3. If you use a (report "LastRow") as a last line then it does not terminate anymore.

Makes me think it is a thing in how the JVM agent thread pool is set up and how that is to be killed.
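
As far as I know, core.async runs its go blocks on daemon threads, so the JVM can exit as soon as the main thread reaches the end of the file, while the agent pool used by send/send-off is not daemon and keeps the JVM alive until (shutdown-agents) is called. A minimal sketch of waiting for the work from the main thread, where run-fake-work is a hypothetical stand-in for the gist's pipeline:

```clojure
(require '[clojure.core.async :refer [go <! <!! timeout]])

(defn run-fake-work
  "Hypothetical stand-in for the gist's pipeline: parks for ms, then yields :done."
  [ms]
  (go
    (<! (timeout ms))
    :done))

;; Blocking on the result channel keeps the main thread (and so the JVM)
;; alive until the async work has finished.
(def result (<!! (run-fake-work 100)))
(println "Result:" result)

;; If agents or futures were used along the way, the script may then need
;; this as its very last line so the JVM can exit:
;; (shutdown-agents)
```

Blocking with <!! is fine here because it happens on the main thread, not inside a go block.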

Hi @kennethkalmer,

Thanks for the question. I’ve had a look and I must admit I’ve never used pipeline before. I read the docs and it looks like for blocking operations (like you have with xform), you need to use pipeline-blocking. When I switch pipeline for pipeline-blocking, it works. I can’t say why, though!

https://clojuredocs.org/clojure.core.async/pipeline-blocking

Rock on!
Eric

Not an expert, but what is blocking in the xform?

I think timeout is blocking then. That would make sense why switching to pipeline-blocking makes it work.

I’ll have to rework the example then to be more closely aligned with my actual problem. I was stoked that this simple one showed it off.

<!! is a blocking take.

Yeah, I’m not sure I get what you’re trying to do. Do you think there’s still a problem, but that this one doesn’t pinpoint it?

<!! is a blocking call. Using pipeline with a blocking call will easily use up the go block thread pool (default size is 8). That is why there is a pipeline-blocking version for this scenario (pipeline-blocking runs the blocking calls on dedicated threads).
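
A minimal sketch of the pipeline-blocking variant, assuming a hypothetical slow-inc step that stands in for the gist's "fake work" (to-chan is the name in 0.4.474; newer releases prefer to-chan!):

```clojure
(require '[clojure.core.async :as async :refer [chan <!! timeout]])

(defn slow-inc
  "Hypothetical fake work: blocks the calling thread for 10 ms, then increments."
  [x]
  (<!! (timeout 10))
  (inc x))

(defn run-pipeline
  "Pushes coll through slow-inc with the given parallelism and
  returns the results as a vector."
  [parallelism coll]
  (let [in  (async/to-chan coll)
        out (chan (count coll))]
    ;; pipeline-blocking runs the transducer on dedicated threads, so the
    ;; blocking <!! above does not tie up the go block thread pool.
    (async/pipeline-blocking parallelism out (map slow-inc) in)
    (<!! (async/into [] out))))

(println (run-pipeline 4 (range 20)))
```

Swapping pipeline-blocking for pipeline here is the kind of change that can starve the go block pool once enough blocking calls pile up.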

rmcv is right: never use <!! inside the calling context of a go block. Go blocks run on a thread pool that is limited (by default) to 8 threads, so it’s quite easy to deadlock if you do a blocking operation inside a go block. Use <! instead. Of course, you can’t use <! outside the lexical context of a go block, so your code will take a bit of refactoring. My talk from Clojure/West might have a few ideas on how to do that ( https://www.youtube.com/watch?v=096pIlA3GDo )

Yeah, there is still a problem but this doesn’t pinpoint it anymore :man_facepalming:. The pipeline-blocking insight was huge, thanks.

I’m gonna try and articulate it here before creating a new, more verbose example. Let’s assume you’ve read Eric’s post and you’ve got limit-rate defined and it returns a channel. Now wrap up limit-rate into an anonymous function that can easily be passed around, and that the caller can use without any knowledge of the internals:

(defn build-limiter [interval burst]
  (let [in  (chan)
        out (limit-rate in interval burst)]
    (fn [weight]
      (<!!
       (go-loop [n weight]
         (trace "Checking limiter for" n "tokens")

         (if (zero? n)
           true

           (do
             (>! in n)
             (<! out)
             (recur (dec n)))))))))

So this is what we should have:

user> (def limited (build-limiter 5 1))
#'user/limited
user> (time (limited 1))
"Elapsed time: 9.280764 msecs"
true
user> (time (limited 10))
"Elapsed time: 1621.771328 msecs"
true    
user> (<!! (go-loop [] (time (limited 10))))
"Elapsed time: 1617.920017 msecs"
true
user> (<!! (go (time (limited 10))))
"Elapsed time: 1620.238012 msecs"
true

So far so good, it does what we expect it to. And this sends me back to the drawing board… :sob:

Slightly more context while I break to hammock this and work on a better example. My issue is when I’m passing around limited (as per the example above), when my HTTP client uses it just before making a call it blocks up completely and never returns. Now the HTTP calls in question result in limited being called inside a go-loop, but from the repl output above it clearly works as advertised…

Thanks @Timothy_Baldridge, I’ll check that out as well!

If you’re calling limited from inside a go block, you have a problem: it uses the blocking take <!! on its first line.
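
One possible way around that, as a sketch only: have the limiter return the go-loop's channel instead of blocking on it, and let each caller decide how to wait. The name build-async-limiter is hypothetical, and async/pipe is used purely as a stand-in for limit-rate so the snippet runs on its own.

```clojure
(require '[clojure.core.async :as async :refer [chan go go-loop <! >! <!!]])

(defn build-async-limiter
  "Hypothetical variant of build-limiter: returns a fn that yields a channel
  instead of blocking, so it is safe to call from inside a go block."
  [in out]
  (fn [weight]
    (go-loop [n weight]
      (if (zero? n)
        true
        (do
          (>! in n)
          (<! out)
          (recur (dec n)))))))

(def demo-limited
  (let [in  (chan)
        out (chan)]
    (async/pipe in out) ; stand-in for (limit-rate in interval burst)
    (build-async-limiter in out)))

;; Inside a go block, park with <! on the returned channel.
;; Only the outermost caller, outside any go block, uses the blocking <!!.
(def demo-result
  (<!! (go (<! (demo-limited 3)))))

(println demo-result)
```

The trade-off is that every caller now has to be channel-aware, which is the refactoring mentioned earlier in the thread.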

Thanks everyone for the help, and patience. I learned a lot from digesting your responses and trying to articulate my issue more in depth.

The “solution” here was to lean on core.async/thread to get around the blocking limitation. The reworked limit-rate function looks like this:

(defn limit-rate
  ([ch rate]
   (limit-rate ch rate 0))

  ([ch rate burst]
   (let [bucket (if (pos? burst) (chan burst) (chan))
         out    (chan)]

     (async/thread ;; <- switched to core.async/thread
       (while true
         (>!! bucket :token)
         (<!! (timeout (int (/ 1000 rate))))))

     (go-loop []
       (let [v (<! ch)]

         (if (nil? v)
           (close! out)

           (do
             (<! bucket)
             (>! out v)
             (recur)))))

     out)))

However, this is just a bandaid over a very poor design from my side. I had an epiphany and will refactor the code properly. Hopefully something nice falls out of this that I can share in a later post. This is part of an API client that will be open-sourced, however these layers will be hidden away from the consumers (just an implementation detail).

Thanks again!