What is the difference between using core.async channels in threads versus go-threads?

I just read through this:

https://clojure.org/guides/async_walkthrough

But I did not understand this sentence:

“Because these are blocking calls, if we try to put on an unbuffered channel, we will block the main thread.”

I think this makes the assumption that the channel was created on the main thread? If I create the channel in a thread would its use still block the main thread? If so, how?

Let me ask a different question now. This is more of a style question, or perhaps a question about architecture. The next part of the tutorial describes how we might use core.async/thread to do some work in a separate thread and then return a channel with the result. But when would we want to return a channel, instead of returning the result? The tutorial says:

"We can use thread (like future) to execute a body in a pool thread and return a channel with the result. Here we launch a background task to put “hello” on a channel, then read that value in the current thread.

(let [c (a/chan)]
  (a/thread (a/>!! c "hello"))
  (assert (= "hello" (a/<!! c)))
  (a/close! c))

I’m trying to imagine the situation where I would want to do this. Why couldn’t I just use a future? Is the idea that I would combine this with (a/alts!)?

I also read this:

“The go macro asynchronously executes its body in a special pool of threads. Channel operations that would block will pause execution instead, blocking no threads. This mechanism encapsulates the inversion of control that is external in event/callback systems.”

I’m not sure I fully understand the implication. Is this a simple win, in that I can use go blocks and have activity that never blocks? But if the advantage is that simple, then why would we ever use threads? Is there a tradeoff that I need to consider? Because, otherwise, we could treat go blocks as a complete replacement for most uses of threads. Is that a reasonable conclusion to make?

A final question about (a/alts!). In what situations would you use alts!! in a regular thread? Why not always use alts! in a go block? I can imagine using alts!! on the main thread, and I’m wondering if that is the most common use of it?

Finally, a completely different question: I once used Zach Tellman’s library, Manifold, on a small project.

core.async has a different style, but seems to enable the same kind of workflow, in that put! and take! on a stream have the same workflow as putting and taking on a channel. Does anyone know of a situation where I would definitely prefer Manifold over core.async?


I think this makes the assumption that the channel was created on the main thread? If I create the channel in a thread would its use still block the main thread? If so, how?

Channel creation shouldn’t matter; it’s the coordination part (and the unbuffered-ness) that applies here. You can create channels anywhere and threads can access them as needed. The problem occurs when any thread tries to perform a blocking put or take on the channel, particularly if the channel is unbuffered.

Per the docs, if you try to execute a blocking put (>!!) onto an open unbuffered channel and there is no consumer waiting, the operation will block: "By default channels are unbuffered (0-length) - they require producer and consumer to rendezvous for the transfer of a value through the channel."

Say you are in the REPL and decide to test this out. The REPL thread you are interacting with could try to eval a blocking put >!! on an unbuffered channel. It would then block that thread, waiting for a consumer to pull a value off the channel.

user=> (require '[clojure.core.async :as async])
user=> (async/>!! (async/chan) :a)
;;thread blocks forever and repl is frozen waiting for result....
;restarted repl
user=> (let [c @(future (async/chan))] (async/>!! c :hello))
;;thread blocks forever and repl is frozen waiting for result....

We can satisfy the producer/consumer rendezvous for unbuffered channels with another thread that is doing a blocking take waiting for the channel to provide a value:

user=> (let [c (async/chan)] (future (async/>!! c :hello)) (async/<!! c))
:hello
user=> (let [c (async/chan)] (async/thread (async/>!! c :hello)) (async/<!! c))
:hello

That is the context for the unbuffered channel comment w.r.t. blocking the main thread (whatever thread is initiating the blocking put).

Is this a simple win, in that I can use go blocks and have activity that never blocks? But if the advantage is that simple, then why would we ever use threads? Is there a tradeoff that I need to consider? Because, otherwise, we could treat go blocks as a complete replacement for most uses of threads. Is that a reasonable conclusion to make?

go blocks will transform (via macro / compiler magic) your puts and takes on channels into “parked” lightweight threads implemented as state machines. The idea is that these are just pending tasks sitting in a queue that are awoken periodically and multiplexed over a shared thread pool (initiated/managed by the core.async runtime) that makes progress based on changes (the flow of data to and from channels). So you can have virtual threads that park instead of actual threads that block, the upside being that virtual threads are cheap and present a concurrency model that blurs the distinction between “real” and “virtual” threads.
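
For example, a parking take inside a go block frees the pool thread while it waits, so a blocking put from the REPL thread can rendezvous with it (a small sketch, using the same async alias as above):

user=> (let [c (async/chan)]
         (async/go (println "got" (async/<! c))) ; <! parks; no pool thread is tied up while waiting
         (async/>!! c :hi))
;; prints "got :hi"; the put returns true because the parked go block consumed the value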

The catch is, if you actually inject blocking calls anywhere in these lightweight threads (or initiate a long-running computational process that hogs a core.async executor thread), you can consume part of the core.async threadpool that is trying to make progress on all the parked lightweight threads. If you stall all the threads in the threadpool that are supposed to be processing parked tasks, then the system can’t make progress and core.async locks up (in the background). I did this a couple of times accidentally on my early forays into learning core.async…

So this is probably a no-no, and I am betting that when a thread from the threadpool picks up this state machine and tries to run it, the thread will be stuck in the same problem we had above. Except this time, that thread will be unable to continue working to process other parked lightweight threads. We get a channel back, but that core.async system thread is probably blocked until somebody takes a value off c, which in this case is impossible since c only exists inside the let…

user=> (let [c (async/chan)] (async/go (async/>!! c :hello)))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x3f04fb07 "clojure.core.async.impl.channels.ManyToManyChannel@3f04fb07"]

The core.async analyzer will helpfully point out if you are using a parking put or take (>!, <!) outside of a go body, but as we see the blocking puts/takes are allowed.

So naturally there are times you want to fork off a dedicated thread to do work with blocking calls that can participate in the core.async ecosystem (e.g. blocking puts and takes on channels, which may communicate with other core.async processes), without tying up the threadpool the executor is using to process non-blocking tasks. thread provides that mechanism and yields a channel that interfaces with the rest of the core.async design.
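
For instance, here is a sketch of that division of labor, where the doseq stands in for real blocking I/O on a dedicated thread, and a go-loop consumes the results without tying up the go pool:

user=> (let [c (async/chan 10)]
         (async/thread                        ; dedicated thread for blocking work
           (doseq [line ["a" "b" "c"]]        ; imagine blocking reads here
             (async/>!! c line))
           (async/close! c))
         (async/<!! (async/go-loop [acc []]   ; lightweight consumer parks on <!
                      (if-some [v (async/<! c)]
                        (recur (conj acc v))
                        acc))))
["a" "b" "c"]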

But when would we want to return a channel, instead of returning the result?

Any time we want to use the CSP semantics of the channel in our processing. The channel represents a potential value we can access with blocking or non-blocking operations. We could then weave the result of the blocking thread together with other blocking threads, or parked virtual threads could depend on that channel just as well. You could similarly create a channel, use future to do some work and push a value (or values) with blocking puts onto that channel, and then publish the channel to other parts of the system. thread wraps this all up and simplifies the API.
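
The manual version mentioned above might look like this rough sketch, where do-some-work is a hypothetical stand-in for the actual blocking computation:

(let [c (async/chan)]
  (future
    (async/>!! c (do-some-work)) ; blocking put from the future's thread
    (async/close! c))
  c) ; hand the channel to other parts of the system; consumers take from it later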

In what situations would you use alts!! in a regular thread?

I have used it in some rendering when I wanted a timeout with a blocking input or process (I can imagine modal dialogues). IIRC I was primarily using it for interactive stuff from the REPL though. The one definite use case was a small process management system where you could have named core.async processes (either virtual or real threads) working in the background, along with an API for starting/stopping/resetting them. I used the poison-pill strategy: each process had a poison channel which would be picked up inside the process’s logic by alts! or alts!!. So the caller could interact with otherwise invisible core.async processes.
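
A minimal sketch of that poison-pill shape (the names are illustrative, not the actual system):

(defn start-worker [work-chan poison-chan]
  (async/thread
    (loop []
      (let [[v port] (async/alts!! [poison-chan work-chan])]
        (when (and (= port work-chan) (some? v))
          (println "processing" v) ; stand-in for the real work
          (recur))))))
;; closing poison-chan (or putting any value on it) stops the worker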

I was doing interactive rendering using a custom piccolo2d wrapper, with core.async threads to handle the rendering / node updating and computational junk, and some virtual threads where it made sense, using channels to provide the comms. I have no idea how much alts!! exists in the wild though.


Just for clarity, when you say virtual threads you mean the new functionality which landed in JDK 21, right?

Great answer! Thanks for posting.


Ah no, I mean virtual threads as specific to core.async, which predates that work by many years (they mention virtual threads in the core.async docs so I stuck with it). The virtual threads implementation (maybe they are called fibers, IIRC) under project Loom is quite a bit more general than what core.async provides inside of go blocks.

There are limitations to what go allows, although I find they can typically be worked around without much issue.
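
The best-known limitation: parking ops can’t cross a function boundary inside go, because the state-machine rewrite doesn’t reach into nested fns. A sketch (assuming async as the alias for clojure.core.async):

;; the <! inside the anonymous fn is invisible to the go macro's rewrite,
;; so this does not park the way you might hope:
;; (async/go (mapv (fn [ch] (async/<! ch)) chans))
;; the workaround is to keep the parking op lexically inside the go body:
(defn take-all [chans]
  (async/go-loop [[ch & more] chans, acc []]
    (if ch
      (recur more (conj acc (async/<! ch)))
      acc)))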

In contrast, I believe that Loom virtual threads are operating at the JVM level, and don’t face these constraints (the constraints are introduced by the compiler used to implement go).



You can think of core.async like an evolution of promise/future.

A future spins up a thread (well, takes one from the future pool), executes some code on it, and then the result gets automatically delivered to the future. When you deref the future, it blocks waiting for the result to be delivered, or just gets the value out of it if it was already delivered.

A promise is kind of similar, except you need to spin up the thread yourself, but you can deliver to the promise whenever you want, and things that deref the promise will block until a value is delivered to it.

Now a channel is more like a promise, except whereas a promise is one element, a channel is a sequence of elements. So you can deliver many values to a channel: after delivering one value, you can deliver another, and so on. This can be infinite, so there’s a mechanism to indicate the “end”, which is called “closing” the channel, to say: I’m done delivering things to this.

When you take from a channel, it will block until the next delivered value is available (or take right away if a value is already on it).
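
A small sketch of that lifecycle (with a as the alias for clojure.core.async):

(def c (a/chan 3)) ; buffered, so the puts below don't block
(a/>!! c 1)
(a/>!! c 2)
(a/close! c)
(a/<!! c) ;=> 1
(a/<!! c) ;=> 2
(a/<!! c) ;=> nil, the channel is closed and drained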

Now, like with promise, a channel is just a value-exchanging construct; there’s no thread, you have to spin that up yourself. You can use a/thread for that, similar to how you could use future: it spawns a thread, executes the body, and delivers the result to the returned channel. Since it closes the channel afterwards, it ends up being a channel of one element. So it’s pretty much identical to a future, except the interfaces are those of core.async, so you would not deref the a/thread channel, you’d take from it.
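
Side by side, the two shapes look like this:

@(future (+ 1 2))          ;=> 3, deref blocks until the result is delivered
(a/<!! (a/thread (+ 1 2))) ;=> 3, blocking take on the returned channel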

There’s even promise-chan, a one-element channel that closes after one element is delivered; it’s similar to promise, but again using the core.async interfaces instead.
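
For example:

(def p (a/promise-chan))
(a/put! p 42)
(a/<!! p) ;=> 42
(a/<!! p) ;=> 42, every subsequent take sees the same delivered value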

So you can use core.async as a replacement for promise/future. And at that point it can work exactly the same.

Now core.async adds a ton more functionality beyond that. This is why I say you can think of it like an evolution of future/promise.

The first being that a channel is not limited to a single value. That means the thread can continue to send more and more values to the same channel, and another thread can keep taking more and more values from the same channel.

You also have a bunch of other functions if you’re dealing with many channels: merging them, flattening, alting, etc.

You can add transducers that transform the values as they pass through the channel.
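
For example, a buffered channel with a transducer:

(def c (a/chan 10 (comp (map inc) (filter even?))))
(a/>!! c 1) ; (inc 1) => 2, which passes the even? filter
(a/<!! c)   ;=> 2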

And so on.

This lets two threads interact with each other in really complex ways through the use of channels, where each thread can be a long-lived process, hence the name: “Communicating Sequential Processes” (CSP). The “sequential” part is because, through their channel interaction, they block on each other, thus creating a sequenced ordering of which is executing and which is waiting on the other, sequencing their execution in some key places.

Finally, on top of all that, core.async adds a new kind of thread that is called by many names: Fiber, Green Thread, Lightweight Thread, Coroutine, etc.

These are like threads layered over the real native threads. Their advantage is they take very little memory, have minimal overhead, and context switch really fast. The downside is that their machinery is not visible to functions called from inside them, and if a native blocking call is made, it doesn’t park them; instead it blocks the native thread they are running over.

You can create those using a/go. So a/thread creates a native thread, and a/go creates a core.async thread (which comes with the limitations I mentioned above).

If you can deal with the limitations of core.async threads, they are better, so use a/go; when you can’t, you have to use a/thread and a native thread.

In JDK 21+, the JVM has also introduced a similar kind of thread running over the native threads. They are called Virtual Threads, and are similar to core.async threads, but because the work had access to the whole JVM, they managed to get rid of most of the limitations around them (though there are still a few caveats). They are similarly much faster and require less memory than native threads. core.async has not yet adapted to leverage them, though.

EDIT: Also, with channels, the thread that is delivering a value to the channel can also block, if the channel currently has no space in it. In that case, it will wait for some other thread to take something off the channel, thus creating space on it, and then it can proceed with delivering the value.
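
For example, with a buffer of one:

(def c (a/chan 1))
(a/>!! c :a) ; fits in the buffer, returns immediately
;; a second (a/>!! c :b) would block here until another thread takes from c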


Something else to look at is Promesa: promesa.exec.csp documentation (funcool.github.io) – that’s a drop-in subset of core.async built on top of JDK 21’s virtual threads.


This is a fantastic answer. I feel like they should add this to the official documentation. You add in the extra nuances that most tutorials are missing.

Sean Corfield, this is amazing. I was away from JVM programming for a few years so I missed the last few years of innovation. Can you tell me if you are using virtual threads? If so, what do you regard as the “best practice” regarding them, or what do you consider the strongest use-case for them?

We are just starting to experiment with vthreads at work but haven’t settled into any specific pattern of usage yet.

While the general documentation around vthreads says they are best suited for tasks with I/O (rather than CPU-bound tasks) – because I/O operations let them yield (park) and other vthreads continue on the host thread – there are still some limits you have to consider:

  • with I/O against a database, you’re typically limited by the size of your connection pool so if you overrun that by having too many vthreads requesting connections, they’re going to block and wait, and you’ll potentially get timeouts (even with a very large pool, at some point you hit the maximum number of concurrent connections the database can support across all clients);
  • with HTTP requests against specific services, you’re typically rate-limited to some degree (especially for cheaper services) so, again, it’s easy to overrun that by having too many vthreads making requests and you’ll get throttled.

It’s probably worth noting that sleep() causes yielding, so you can drop that into CPU-bound tasks to force some collaboration.

Virtual threads are cheaper to create and use than O/S host threads, so you can have “millions” of vthreads where you might only get hundreds or maybe thousands of host threads. With host threads, you typically use a thread pool to limit how many threads can run concurrently, so as not to overwhelm the host. With vthreads, they advise not using a pool because they are “cheap”, but multiple vthreads reuse a single host thread and have to “collaborate” by way of yielding (parking), which happens on specific operations, in order to share effectively. A synchronized operation will not yield and you end up with a single vthread pinned to a host thread (which is why the MySQL JDBC driver couldn’t be used with vthreads prior to the recent 9.0.0 release, for example). Clojure itself has recently made changes internally to avoid synchronized for this reason.
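
As a minimal sketch of how cheap they are (assuming JDK 21+ and plain interop, no pool):

(let [latch (java.util.concurrent.CountDownLatch. 10000)]
  (dotimes [_ 10000]
    (Thread/startVirtualThread
      (fn []
        (Thread/sleep 50) ; parks the vthread; its carrier (host) thread moves on
        (.countDown latch))))
  (.await latch))

Ten thousand host threads would strain most machines; ten thousand vthreads are unremarkable.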

So, the TL;DR is that we haven’t figured out how “best practice” applies to our applications at work… yet…


I doubt vthreads are going to give a crazy performance boost or anything like that, to be honest. Many benchmarks, even of say Go vs Java, don’t really show much improvement in throughput or latency, probably because of the realities Sean mentioned.

What I wonder more is whether, as libraries and code are all made to work well with vthreads, they will just simplify the programming model, because you don’t need to manage pools or worry about spawning too many.

For certain applications, it could allow increased throughput. I’m thinking of something like a chat server, where all the server does is mostly relay I/O: messages coming in from one user and going out to another, etc. These can benefit from being able to do a really large amount of concurrent I/O with very little compute work otherwise. This can maybe extend to any kind of shared interactive app/game.


I think it’s a bigger deal when you haven’t had lightweight threads already. core.async has been around for over a decade. Aside from the limitations mentioned around the current go implementation (which could probably be overcome by running on the newer virtual thread model), my response to project Loom has been “meh.” I’m curious to see where it dominates. Java people seem to (rightfully) be more excited since they get parking semantics built in now, but retain the thread API. So servers scale better without big changes.

In the last couple of years we killed our production system a few times because we overlooked a blocking call in a core.async go block. That can lead to a global deadlock when all threads in the global core.async thread pool are stuck at this blocking call. There was even a blocking call in an old version of datomic.client.api.async that we had been using.

Since JDK 19 we mostly use virtual threads. There you might also be hit by virtual thread pinning, but these cases have become rare as authors adapt their libraries to virtual threads. I recently implemented a reverse proxy server using virtual threads, which was almost boring, since virtual threads make this rather trivial.

We use the JVM option -Dclojure.core.async.go-checking=true to check for that during dev/test/CI – so we get exceptions instead of potential deadlocks.

Good to hear that you’ve found vthreads a good replacement for core.async – can you talk a bit about how you’ve handled coordination between threads, in the way that core.async provides synchronization points via channels?


Cool, thanks a lot, I wasn’t aware of -Dclojure.core.async.go-checking=true that’s definitely a great safeguard.

I wouldn’t say vthreads are a complete replacement for core.async in our case. Besides a few places on the server, we still use it heavily on the ClojureScript side.

With vthreads we just use the things from java.util.concurrent. One type of background job, for example, consumes other limited resources, so we use java.util.concurrent.Semaphore to enforce an upper limit (a sketch of the shape follows below). @andersmurphy wrote great blog posts about using virtual threads in combination with Clojure.
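
A minimal sketch of that shape (run-job! and the limit of 4 are illustrative, not the actual system):

(import '(java.util.concurrent Semaphore))

(def limit (Semaphore. 4)) ; at most 4 jobs use the limited resource at once

(defn run-job! [job]
  (Thread/startVirtualThread
    (fn []
      (.acquire limit)
      (try
        (job) ; the actual work
        (finally
          (.release limit))))))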

Another coordinating mechanism I try to use a lot is Clojure’s epochal time model. Sure, it’s not directly related to virtual threads, but it’s still a great mechanism for separating different system parts and getting good observability.


Actually, just to clarify this: I understand that a blocking put on an unbuffered channel would block the thread where I am trying to make the blocking put, but why would that block the main thread? Again, I’m curious if I’m supposed to simply assume that the blocking put is being made from the main thread. If I was in some new thread and I tried to make a blocking put, then I would expect the blocking put to block that new thread, but not the main thread. And I’m assuming that “main thread” here refers to the initial thread that the software begins with, the thread that runs the (-main) function.

I’m assuming that “main thread” here refers to the initial thread that the software begins with, the thread that runs the (-main) function.

Sure.

why would that block the main thread?

My example was from the repl, where the repl is running on the main thread, so that example would stall out the repl since the put is happening on the main thread of execution.

I never said it would arbitrarily block other threads though; the only thread that can be affected under those blocking semantics (e.g. blocking put on unbuffered channel), would be the thread from which the blocking put is invoked. Sure, you can spawn off futures or core.async thread calls and have them do blocking puts on an unbuffered channel and have them get locked up without impacting the parent (typically the main) thread. That’s all up to the caller.


This was a great answer. Thank you.

You did not say this, but the official documentation specifically mentions the main thread, which is why I thought it was so odd. I have to assume they (whoever wrote the documentation) were assuming the initial call was being made from the main thread.