Some architecture thoughts with async. Async is maybe a meta-library?

I was recently inspired by the conj talk from Christoph Neumann where he has a system where everything is live all the time, and has interconnections between them.

As I understand it he can start and stop individual components, and the rest of the system is fault tolerant and keeps running.

One of the requirements was “keeps running, no matter what”.

For me this made the concept of ‘architecture’ “click” a bit better. Because it’s not only a structure, but it really drives home that architecture is “the structure that fits the problem”. For me there has been a bit of a disconnect there because I’ve only ever seen architectures in the real world where Conway’s law is in full effect (basically shipping the Org. chart) or been a one man band.

I’ve been going to and from core.async multiple times over a few years. And I think this talk finally convinced me that it’s good. And the bad code I made before was a skill issue.

So I made a new rule based on my new understanding:

Don’t use async functions in your central logic. Ever.

I think this is the biggest antipattern I did, because it hinders good organization of code and async code spreads uncontrollably. I think I’ve read this other places as well.

So I tried to clone the basic idea in a new repo and got some basis(?) functions. Basically wrap some core.async functions into a dumber interface with some decisions taken away from you. Why not directly use the core.async library? because the API is too generic with too many options. I think I finally realized that it is a library that you build libraries on top of. A meta-library so to speak.

The current API of my library consists of:
mapper, aggregator, distributor, input-side-effect, output-side-effect, and, poll!

Which takes care of the following:

channels in and out:

  • 1:1 (mapper); basically a call to a/pipeline
  • N:1 (aggregator); wraps a/merge + a/pipeline for a transducer
  • 1:N (distributor); wraps a/mult + a/tap to set up N new channels

edges of the system (apis/functions/IO):

  • input-side-effect: function return values sent on a channel (using Thread/startVirtualThread for true async)
  • output-side-effect: channel values sent to a function using either direct function call to enable backpressure or a virtualThread for asynchronous output.
  • poll!: do something every n milliseconds. Required for using most APIs. go block with timeout and function call.

All go-loops check for closed channels, so there will be no abandoned threads/workers.

I first considered using threadpools but I realized it would be cool to dip a toe into project loom as well. I tested it by reading some data from a server.
Error handling I’ve implemented by just log and ‘catch everywhere’ and send it on the output channel as a value. This makes it possible to keep processing after encountering an error.

Demo time:

(require '[core.async :as a])
(defonce system (atom {}))

(defn close-system []
  (mapv a/close! (vals @system)))
;; read-data!, printer, counter, filewriter are all normal (non async) clojure functions
;; given the functions described in the post, we can have the entire system definition here:
(defn start-system []
  (close-system)

  (let [raw (a/chan) ;; channel for raw inputs
        [filewriter-c counter-c printer-c] (distributor raw 3) ;; distribute the raw inputs to three channels
       ]
  ;; put everthing on the system atom so we can reload
    (swap! system assoc :raw raw
           :fw-chan filewriter-c 
           :c-chan counter-c 
           :p-chan printer-c)
;; every 2000 ms, call read-data! and put it on the raw channel
    (poll! input-side-effect read-data! raw 2000) 
;; call these side effects with the data from these channels.
    (output-side-effect printer printer-c)
    (output-side-effect counter counter-c)
    (output-side-effect filewriter filewriter-c)))

I think it looks nice. Feels a bit like puzzle pieces.
I have not figured out how to live-insert a new component yet. I think you could do it with some careful partial restarts. But the general case is definetly not hot-swappable. I think it would be cool if it was.

Maybe not useful for the general public. But I think it helps me think about async processing of data, and actually helps me use the core.async library in a cleaner way.

Anybody did stuff like this before? probably, as it feels a bit like some libraries I’ve used before.
Thanks for reading.

5 Likes

Great post!

Your setup looks solid. I had a similar experience with core.async where my skill issues hindered my usage and results from it.

Nothing that can’t be resolved I think

3 Likes

I think the biggest issue I have seen is that most people don’t have a use-case for it. Without a use-case that needs core.async, it’ll be used in weird ways where it’s not obvious how you should use it, because you didn’t start with a problem that called for it in the first place.

I have a feeling it’s true of your library as well?

There’s not a whole lot of reason you might need core.async, but it also is a bit confusing because there’s three categories of problems it can solve:

  1. Compute/IO parallelization - like by using pipeline to parallelize some data transforms or some IO calls

You have some computation you want to use all cores over for speedup. Or you need to handle hundred of incoming or outgoing IO requests in parallel.

  1. Complex concurrent behavior - Model concurrent processes that run in the same process

Say you need to build a real-time chat application back-end and want to maximize how many clients can be handled per-process.

  1. Simpler async handling - Deal with async APIs in a cleaner way than with callbacks

You are using some non-blocking IO APIs, or other non-blocking APIs and want to deal with ordering what to do after/before they complete without drowning in callback hell.

If you don’t have any of those use-case, you don’t need core.async. But also if you do, you have to recognize not all of core.async will be needed, often you only need a little bit of it.

The purpose of the API I designed was to make a model for how a system that is represented in the talk could be. As I understood the talk it was doing parallel IO as well as some vaugely defined communication between workers. I chose to model this communication as core.async channels.

The reasons for needing core.async are that it should have the properties of doing parallel IO ops, as well as doing simpler async handling. Two of the points you mention.

I feel like that is sufficient reason to reach out to core.async.

The example I posted is maybe overly simple. But the thought was that maybe it would be nice pieces to have for a growing system. Where the parallel IO and async handling would become more and more important.
At this stage it is a bit of bikeshedding, I admit. But I appreciate your feedback.

Maybe there is an argument to be made for having dedicated threads plus queues between them? But in my head this easily becomes a low abstraction version of core.async again.

2 Likes