What is the difference between using core.async channels in threads versus go-threads?

I just read through this:

https://clojure.org/guides/async_walkthrough

But I did not understand this sentence:

“Because these are blocking calls, if we try to put on an unbuffered channel, we will block the main thread.”

I think this makes the assumption that the channel was created on the main thread? If I create the channel in a thread would its use still block the main thread? If so, how?

Let me ask a different question now. This is more of a style question, or perhaps a question about architecture. The next part of the tutorial describes how we might use a core.async/thread, to do some work in a separate thread and then return a channel with the result. But when would we want to return a channel, instead of returning the result? The tutorial says:

"We can use thread (like future) to execute a body in a pool thread and return a channel with the result. Here we launch a background task to put “hello” on a channel, then read that value in the current thread."

(let [c (a/chan)]
  (a/thread (>!! c "hello"))
  (assert (= "hello" (<!! c)))
  (a/close! c))

I’m trying to imagine the situation where I would want to do this? Why couldn’t I just use a Future? Is the idea that I would combine this with (a/alts!) ?

I also read this:

“The go macro asynchronously executes its body in a special pool of threads. Channel operations that would block will pause execution instead, blocking no threads. This mechanism encapsulates the inversion of control that is external in event/callback systems.”

I’m not sure I fully understand the implication. Is this a simple win, in that I can use go blocks and have activity that never blocks? But if the advantage is that simple, then why would we ever use threads? Is there a tradeoff that I need to consider? Because, otherwise, we could treat go blocks as a complete replacement for most uses of threads. Is that a reasonable conclusion to make?

A final question about (a/alts!). In what situations would you use alts!! in a regular thread? Why not always use alts! in a go block? I can imagine using alts!! on the main thread, and I’m wondering if that is the most common use of it?

Finally, a completely different question, I once used Zach Tellman’s library, Manifold, on a small project:

core.async has a different style, but seems to enable the same kind of workflow, in that put! and take! on a stream work the same way as putting and taking on a channel. Does anyone know of a situation where I would definitely prefer Manifold over core.async?


I think this makes the assumption that the channel was created on the main thread? If I create the channel in a thread would its use still block the main thread? If so, how?

Channel creation shouldn’t matter; it’s the coordination (and the unbuffered-ness) that applies here. You can create channels anywhere, and threads can access them as needed. The problem occurs when any thread tries to perform a blocking put or take on the channel, particularly if the channel is unbuffered.

Per the docs for >!!, if you try to execute a blocking put on an open unbuffered channel, the operation will block until a consumer is ready: “By default channels are unbuffered (0-length) - they require producer and consumer to rendezvous for the transfer of a value through the channel.”

Say you are in the REPL and decide to test this out. The REPL thread you are interacting with could try to eval a blocking put >!! on an unbuffered channel. It would then block the thread to wait for a consumer to pull a value off the channel.

user=> (require '[clojure.core.async :as async])
user=> (async/>!! (async/chan) :a)
;;thread blocks forever and repl is frozen waiting for result....
;restarted repl
user=> (let [c @(future (async/chan))] (async/>!! c :hello))
;;thread blocks forever and repl is frozen waiting for result....

We can satisfy the producer/consumer rendezvous for unbuffered channels with another thread that is doing a blocking take waiting for the channel to provide a value:

user=> (let [c (async/chan)] (future (async/>!! c :hello)) (async/<!! c))
:hello
user=> (let [c (async/chan)] (async/thread (async/>!! c :hello)) (async/<!! c))
:hello

That is the context for the unbuffered channel comment w.r.t. blocking the main thread (whatever thread is initiating the blocking put).

Is this a simple win, in that I can use go blocks and have activity that never blocks? But if the advantage is that simple, then why would we ever use threads? Is there a tradeoff that I need to consider? Because, otherwise, we could treat go blocks as a complete replacement for most uses of threads. Is that a reasonable conclusion to make?

go blocks transform (via macro/compiler magic) your puts and takes on channels into “parked” lightweight threads implemented as state machines. The idea is that these are just pending tasks sitting in a queue that are awoken periodically and multiplexed over a shared thread pool (initiated/managed by the core.async runtime), making progress as data flows to and from channels. So you can have virtual threads that park instead of actual threads that block, the upside being that virtual threads are cheap and present a concurrency model that blurs the distinction between “real” and “virtual” threads.
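For contrast with the frozen-REPL examples above, here is a minimal sketch of that parking behavior: the same unbuffered put that froze the REPL with >!! returns control immediately when done with >! inside a go block, because the go machinery parks the lightweight thread instead of blocking a real one.

```clojure
(require '[clojure.core.async :as async])

;; The go block parks on the unbuffered put instead of blocking a thread;
;; control returns to the caller immediately.
(let [c (async/chan)]
  (async/go (async/>! c :hello))  ; parks until a consumer arrives
  (async/<!! c))                  ; the rendezvous happens here
;; => :hello
```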

The catch is, if you actually inject blocking calls anywhere in these lightweight threads (or initiate a long-running computational process that hogs a core.async executor thread), you can consume part of the core.async threadpool that is trying to make progress on all the parked lightweight threads. If you stall all the threads in the threadpool that are supposed to be processing parked tasks, then the system can’t make progress and core.async locks up (in the background). I did this a couple of times accidentally on my early forays into learning core.async…

So this is probably a no-no, and I am betting that when a thread from the threadpool picks up this state machine and tries to run it, the thread will be stuck on the same problem we had above. Except this time, that thread will be unable to continue processing other parked lightweight threads. We get a channel back, but that core.async system thread is probably blocked until somebody takes a value off c, which in this case is impossible since c only exists inside the let…

user=> (let [c (async/chan)] (async/go (async/>!! c :hello)))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x3f04fb07 "clojure.core.async.impl.channels.ManyToManyChannel@3f04fb07"]

The core.async analyzer will helpfully point out if you are using a parking put or take (>!, <!) outside of a go body, but, as we see, the blocking puts/takes (>!!, <!!) are allowed inside one.
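Concretely, the parking ops fail fast when used outside a go body (a small sketch; the exact error text may vary by core.async version):

```clojure
(require '[clojure.core.async :as async])

;; >! asserts when called outside a go block
(try
  (async/>! (async/chan) :a)
  (catch AssertionError e (.getMessage e)))
;; the message notes that >! was "used not in (go ...) block"
```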

So naturally there are times you want to fork off a dedicated thread to do work with blocking calls that can participate in the core.async ecosystem (e.g. blocking puts and takes on channels, which may communicate with other core.async processes), without tying up the threadpool the executor is using to process non-blocking tasks. thread provides that mechanism and yields a channel that interfaces with the rest of the core.async design.

But when would we want to return a channel, instead of returning the result?

Any time we want to use the CSP semantics of the channel in our processing. The channel represents a potential value we can access with either blocking or parking operations. We could then weave the result of that thread together with other blocking threads, or parked virtual threads could depend on that channel just as well. You could similarly create a channel, use future to do some work and push a value (or values) onto that channel with blocking puts, and then publish the channel to other parts of the system. thread wraps this all up and simplifies the API.
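One concrete way to weave such channels together is to race a thread-returned channel against a timeout with alts!!; a minimal sketch (the durations are arbitrary):

```clojure
(require '[clojure.core.async :as async])

;; Race a slow worker against a 100ms timeout on the calling thread.
(let [work   (async/thread (Thread/sleep 5000) :result)
      t      (async/timeout 100)
      [v ch] (async/alts!! [work t])]
  (if (= ch t) :timed-out v))
;; => :timed-out (the timeout channel closes long before the worker finishes)
```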

In what situations would you use alts!! in a regular thread?

I have used it in some rendering work when I wanted a timeout on a blocking input or process (I can imagine modal dialogs). IIRC I was primarily using it for interactive stuff from the REPL though. The one definite use case was a small process management system where you could have named core.async processes (either virtual or real threads) working in the background, along with an API for starting/stopping/resetting them. I used the poison-pill strategy: each process had a poison channel which would be picked up inside the process’s logic by alts! or alts!!. That way the caller could interact with otherwise invisible core.async processes.
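A minimal sketch of that poison-pill pattern (the name start-worker is illustrative, not from any library):

```clojure
(require '[clojure.core.async :as async])

(defn start-worker
  "Runs a loop on a real thread, accumulating values from `in`
   until the `poison` channel yields (or `in` is closed)."
  [in poison]
  (async/thread
    (loop [acc []]
      (let [[v ch] (async/alts!! [poison in] :priority true)]
        (if (or (= ch poison) (nil? v))
          acc                       ; stop: poison received or input closed
          (recur (conj acc v)))))))

;; usage sketch
(let [in     (async/chan)
      poison (async/chan)
      worker (start-worker in poison)]
  (async/>!! in 1)
  (async/>!! in 2)
  (async/close! poison)   ; the "pill": a closed channel yields nil immediately
  (async/<!! worker))
;; => [1 2]
```

The blocking puts on the unbuffered `in` only return once the worker has taken each value, so both values are consumed before the pill is dropped.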

That was interactive rendering using a custom piccolo2d wrapper: core.async threads handled the rendering / node updating and computational junk, with some virtual threads where it made sense, using channels to provide the comms. I have no idea how much alts!! exists in the wild though.


Just for clarity, when you say virtual threads you mean the new functionality which landed in JDK 21, right?

Great answer! Thanks for posting.

Ah no, I mean virtual threads as specific to core.async, which predates that work by many years (they mention virtual threads in the core.async docs so I stuck with it). The virtual threads implementation (maybe they are called fibers IIRC) under project Loom are quite a bit more general than what core.async provides inside of go blocks.

There are limitations to what go allows, although I find they can typically be worked around without much issue.

In contrast, I believe that Loom virtual threads are operating at the JVM level, and don’t face these constraints (the constraints are introduced by the compiler used to implement go).



You can think of core.async like an evolution of promise/future.

A future takes a thread from the future pool and executes some code on it, and the result is then automatically delivered to the future. When you deref the future, it blocks waiting for the result to be delivered, or simply returns the value if it was already delivered.

A promise is kind of similar, except you need to spin up the thread yourself, but you can deliver to the promise whenever you want, and things that deref the promise will block until a value is delivered to it.
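For reference, the future/promise behavior described above, as a small sketch:

```clojure
;; future: thread + automatic delivery; promise: delivery is manual
(let [f (future (+ 1 2))]
  @f)                       ; => 3, deref blocks until the body finishes

(let [p (promise)]
  (future (deliver p 42))   ; some other thread delivers the value
  @p)                       ; => 42, deref blocks until delivered
```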

Now a channel is more like a promise, except whereas a promise holds one element, a channel is a sequence of elements. So you can deliver many values to a channel: after delivering one value, you can deliver another, and so on. This can be infinite, so there’s a mechanism to indicate the “end”, called “closing” the channel, to say: I’m done delivering things to this.

When you take from a channel, it will block until the next delivered value is available (or if a value is already on it to take right away).

Now, like with promise, a channel is just a value-exchanging construct; there’s no thread, you have to spin that up yourself. You can use a/thread for that, similar to how you could use future. a/thread spawns a thread, executes the body, and delivers the result to the returned channel. Since it closes the channel afterwards as well, it ends up being a channel of one element. So it’s pretty much identical to a future, except the interfaces are those of core.async: you would not deref the a/thread channel, you’d take from it.
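Side by side, the future and a/thread versions of the same computation (a sketch assuming core.async is on the classpath):

```clojure
(require '[clojure.core.async :as async])

(let [f (future (+ 1 2))]
  @f)                        ; => 3 (deref blocks)

(let [c (async/thread (+ 1 2))]
  (async/<!! c))             ; => 3 (take blocks; the channel then closes)
```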

There’s even a promise-chan, that is a one element channel that closes after one element is delivered, which is similar to promise, but again, using the core.async interfaces instead.
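A quick promise-chan sketch: once a value is delivered, every take sees that same value:

```clojure
(require '[clojure.core.async :as async])

(let [p (async/promise-chan)]
  (async/put! p :done)
  [(async/<!! p) (async/<!! p)])  ; repeated takes all yield the delivered value
;; => [:done :done]
```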

So you can use core.async as a replacement for promise/future. And at that point it can work exactly the same.

Now core.async adds a ton more functionality beyond that. This is why I say you can think of it like an evolution of future/promise.

The first being that a channel is not limited to a single value. That means the thread can continue to send more and more values to the same channel, and another thread can keep taking more and more values from the same channel.
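For example, a buffered channel can hold several values, which async/into can drain into a collection once the channel is closed (a minimal sketch):

```clojure
(require '[clojure.core.async :as async])

(let [c (async/chan 3)]          ; buffer of 3, so these puts don't block
  (async/>!! c 1)
  (async/>!! c 2)
  (async/>!! c 3)
  (async/close! c)
  (async/<!! (async/into [] c))) ; into returns a channel carrying the collection
;; => [1 2 3]
```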

You also have a bunch other functions if you’re dealing with many channels, merging them, flattening, alting, etc.

You can add transducers that transforms the values as they pass through the channel.
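A one-liner sketch of a channel transducer:

```clojure
(require '[clojure.core.async :as async])

;; values are transformed by (map inc) as they pass through the channel
(let [c (async/chan 1 (map inc))]
  (async/>!! c 1)
  (async/<!! c))
;; => 2
```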

And so on.

This lets two threads interact with each other in really complex ways through the use of channels, where each thread can be a long-lived process, hence the name: “Communicating Sequential Processes” (CSP). The “sequential” part is because, through their channel interactions, the threads block on each other, creating an ordering between which is executing and which is waiting, sequencing their execution in some key places.

Finally, on top of all that, core.async adds a new kind of thread that is called by many names: Fiber, Green Thread, Lightweight Thread, Coroutine, etc.

These are like threads layered over the real native threads. Their advantage is they take very little memory, have minimal overhead, and context switch really fast. The downside is that the parking machinery is not visible to functions called from inside them (you can only park directly in the body), and if a native blocking call is made, it doesn’t park them - instead it blocks the native thread they are running over.

You can create those using a/go. So a/thread creates a native thread, and a/go creates a core.async thread (which comes with the limitations I mentioned above).

If you can deal with the limitations of core.async threads, they are better, so use a/go; when you can’t, you have to use a/thread and a native thread.

In JDK 21+, the JVM has also introduced a similar kind of thread running over the native threads. They are called Virtual Threads and are similar to core.async threads, but because the work was done at the JVM level, most of the limitations around them were eliminated (though there are still a few caveats). They are similarly much faster and require less memory than native threads. core.async has not yet adapted to leverage them though.

EDIT: Also, with channels, the thread that is delivering a value to the channel can also block, if the channel has no space in it currently. In that way, it will wait for some other thread to take something off the channel, thus creating space on it, and then it can proceed with delivering the value.
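That backpressure is easy to see with a buffer of one (a sketch):

```clojure
(require '[clojure.core.async :as async])

(let [c (async/chan 1)]
  (async/>!! c :a)            ; fits in the buffer, returns immediately
  (future (async/>!! c :b))   ; buffer full: this put blocks its thread...
  (async/<!! c))              ; ...until this take frees a slot
;; => :a
```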


Something else to look at is Promesa: promesa.exec.csp documentation (funcool.github.io) – that’s a drop-in subset of core.async built on top of JDK 21’s virtual threads.
