Missionary - new release with streaming support, design notes

I’ve just pushed a major release of missionary. The previous version was a simple functional effect system, and this new version adds support for streaming. It’s still experimental and there are a few minor breaking changes, but I plan to switch to a production maintenance process in the near future.

Processes as values

Functional programming gives structure to concurrent processes for free. Functional composition enforces a hierarchical topology, which means exceptions can naturally bubble up and cancellation can naturally bubble down. This has been the core idea of missionary since day 1. It turned out to work very well for single-value producers, so multiple-value producers follow the same idea.

This is not a new idea. Rx is largely guided by this principle (cold observables) and the scala ecosystem is heading in this direction as well, for good reasons.

This is not a silver bullet either. This model binds you to a hierarchical topology, and shows its limits when you need to model graph topologies. I still haven’t found any practical way to model graph dataflow structures in a functional style. For instance, Rx falls back to an imperative style (processors), and akka-streams has a functional graph API but it’s arguably complicated and limited to static topologies. I’m still experimenting on this topic.

The flow protocol

flow is the unified protocol modelling multiple-value producers in missionary.

The original FRP formulations (Elliott, Hudak) introduced the idea of continuous time, and that idea sounds right to me. Unfortunately it has been largely misunderstood, which led to a lot of confusion between library authors claiming to implement FRP ideas and the original authors defending their model. As far as I know, none of the currently popular streaming engines properly implements lazy sampling of continuous values.

The duality between discrete events and continuous values is very clear nonetheless. A continuous signal has a notion of current value. It is a stateful identity, typically something you would represent with a reference type in clojure (git branches, immutable databases). A discrete stream has a value only when it happens, and this value is ephemeral: its purpose is to be aggregated into something more persistent.

So here’s my take on it : discrete events and continuous values can be unified under the same protocol. It really boils down to different transfer strategies : discrete events are backpressured, and continuous values are lazily sampled.

  • A discrete producer can’t discard events, so it must propagate backpressure when consumer is too slow.
  • A continuous producer can discard old values if the consumer is too slow, because the new value invalidates the old.
  • A discrete consumer pulls events eagerly, because they all matter anyways.
  • A continuous consumer pulls values lazily, because only the latest matters.

The flow protocol I came up with is actually quite simple and allows each of these strategies. Conceptually, a flow is like a java.lang.Iterable, except availability and termination are notified asynchronously. It’s a factory function taking two zero-argument callbacks, a notifier and a terminator, and returning an iterator that is callable (for cancellation) and derefable (for iteration). The producer calls the notifier when it’s ready to emit, then the consumer derefs the iterator when it’s ready to consume, transferring the value.
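To make the shape of the protocol concrete, here is a minimal sketch in plain Clojure, assuming only what is described above. The names seq-flow and drain are hypothetical helpers written for this illustration and are not part of missionary. The cancellation behavior (fail on the next deref, then terminate) is a simplifying assumption.

```clojure
;; Hypothetical helper, not part of missionary: a discrete flow over a collection.
(defn seq-flow [coll]
  (fn [notify! terminate!]
    (let [remaining (atom (seq coll))
          cancelled (atom false)]
      ;; Announce the first value, or terminate right away if there is none.
      (if @remaining (notify!) (terminate!))
      (reify
        clojure.lang.IFn
        (invoke [_] (reset! cancelled true))   ; calling the iterator cancels it
        clojure.lang.IDeref
        (deref [_]                             ; derefing transfers one value
          (if @cancelled
            (do (terminate!) (throw (ex-info "flow cancelled" {})))
            (let [[x & more] @remaining]
              (reset! remaining more)
              ;; Announce the next value, or signal termination.
              (if more (notify!) (terminate!))
              x)))))))

;; Hypothetical consumer for synchronous flows only: the callbacks just set
;; flags, and the loop derefs the iterator each time a value was announced.
(defn drain [flow]
  (let [ready (atom false)
        done  (atom false)
        it    (flow #(reset! ready true) #(reset! done true))]
    (loop [acc []]
      (if @ready
        (do (reset! ready false)
            (recur (conj acc @it)))
        {:values acc :terminated @done}))))

(drain (seq-flow [1 2 3]))
;; => {:values [1 2 3], :terminated true}
```

Note that the notifier never receives the value. It only says that a value is available, and the consumer decides when to deref.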

The major difference with protocols usually found in popular streaming systems is that a producer can inform the consumer of the availability of a value without transferring it. This property allows lazy sampling, without compromising backpressure propagation for discrete events.
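The following sketch illustrates lazy sampling under the same protocol, using an atom as the continuous source. watch-flow is a hypothetical name chosen for this example, not missionary's API, and cancellation is deliberately simplified (it detaches the watch and terminates immediately).

```clojure
;; Hypothetical helper: a continuous flow over a reference (here an atom).
(defn watch-flow [reference]
  (fn [notify! terminate!]
    (let [pending (atom true)        ; the initial value is available at once
          k       (Object.)]
      (add-watch reference k
                 (fn [_ _ _ _]
                   ;; Notify only if the previous notification was consumed.
                   ;; Otherwise the update is simply coalesced.
                   (when (compare-and-set! pending false true)
                     (notify!))))
      (notify!)
      (reify
        clojure.lang.IFn
        (invoke [_]                  ; simplified cancellation
          (remove-watch reference k)
          (terminate!))
        clojure.lang.IDeref
        (deref [_]                   ; sample the latest value, whatever it is
          (reset! pending false)
          @reference)))))

(def state (atom 0))
(def notifications (atom 0))
(def it ((watch-flow state) #(swap! notifications inc) (fn [] nil)))

(dotimes [_ 3] (swap! state inc))
@notifications ;; => 1, the three updates were not announced separately
@it            ;; => 3, only the latest value is transferred
(swap! state inc)
@notifications ;; => 2
@it            ;; => 4
(it)           ;; cancel: stop watching
```

Because the value is read at deref time and not at notification time, a slow consumer never sees stale values and the producer never has to buffer.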

Reactive Streams

Reactive Streams is basically a standardization of Rx’s internal protocol. It’s rather complicated, it doesn’t support lazy sampling, and it doesn’t support graceful termination (you can cancel a subscription, but the spec doesn’t allow post-cancel communication so you don’t have any way to know when allocated resources will actually be released). However, it exists, it’s now part of the JDK, and more and more libraries support it so it’s good to support it as well for interoperability.

Ambiguous evaluation

The previous version had sp, a coroutine-based macro for task definition. It turns out ap, the flow equivalent, relies on a near-forgotten idea I found in SICP, section 4.3. The design is slightly different, but there’s still this powerful idea of operators able to fork the current evaluation, and run it an arbitrary number of times with different return values.

It turns out to be highly expressive, and I think it deserves more momentum.
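As a rough illustration of what forking an evaluation means, here is a synchronous sketch with explicit continuations. The amb function below is a hypothetical helper written for this example. It is not how ap is implemented, and it ignores the asynchronous side entirely.

```clojure
;; Hypothetical helper: run the rest of the computation (the continuation k)
;; once per choice and concatenate every result.
(defn amb [choices k]
  (mapcat k choices))

;; The body after each amb is evaluated once per possible value.
(amb [1 2 3]
     (fn [x]
       (amb [10 20]
            (fn [y]
              [(+ x y)]))))
;; => (11 21 12 22 13 23)
```

With a coroutine-based macro the continuation does not have to be written by hand, which is what makes the style read like ordinary sequential code.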


Looks great, and I really appreciate the new tutorials - thanks!


This looks very interesting - but when would you use this? which problem was this designed to solve?

I model IO this way as much as I can, because the imperative way is ridiculously hard to get right.

Let’s take a very simple, practical and common example. You’re making a web service endpoint returning business data about an entity, and this information must be aggregated from different sources (it may come from multiple third-party services, or you may have multiple sources of truth in your system, whatever). You need to perform multiple requests in parallel, wait for each and then merge the results.

Somewhere in your app there are some DAO functions, each performing a request to one datasource and returning data.

(defn fetch-from-datasource-a! [entity-id])
(defn fetch-from-datasource-b! [entity-id])

Then you can run those in parallel and aggregate the results using e.g. future:

(defn fetch-and-aggregate! [entity-id]
  (let [request-a (future (fetch-from-datasource-a! entity-id))
        request-b (future (fetch-from-datasource-b! entity-id))]
    (merge @request-a @request-b)))

It works fine at first glance and it’s likely to pass the entire QA process all the way to production, however its failure behavior is highly suboptimal. One day, datasource b will be down, making request-b fail. You have to propagate this error, maybe wrap it in something producing an HTTP 503 or something. You don’t need request-a anymore, so it must be cancelled. In the code above, the failure of request-b is not observed until request-a has completed, so you can waste resources waiting for something you won’t use.

An optimized version could be done like this:

(defn fetch-and-aggregate! [entity-id]
  (let [responses (java.util.concurrent.LinkedBlockingQueue.)
        request-a (future
                    (.put responses (try (fetch-from-datasource-a! entity-id)
                                         (catch Throwable e {:error e}))))
        request-b (future
                    (.put responses (try (fetch-from-datasource-b! entity-id)
                                         (catch Throwable e {:error e}))))]
    (try (loop [result {}
                count 0]
           (let [response (.take responses)]
             (if-some [e (:error response)]
               (throw e)
               (let [result (merge result response)
                     count  (inc count)]
                 (if (= count 2) result (recur result count))))))
         (finally
           ;; Future.cancel requires a boolean argument; future-cancel passes true
           (future-cancel request-a)
           (future-cancel request-b)))))

This new version does the right thing, but the code is unreadable: essential complexity has been swallowed by technical concerns with zero business value. You don’t want to do this each time. What you really want is to abstract this error and cancellation handling into something you can apply to arbitrary actions. This requires representing actions as values, for example with thunks:

(defn fetch-from-datasource-a [entity-id]
  #(fetch-from-datasource-a! entity-id))

(defn fetch-from-datasource-b [entity-id]
  #(fetch-from-datasource-b! entity-id))

(defn parallel [thunk-a thunk-b]
  #(let [responses (java.util.concurrent.LinkedBlockingQueue.)
         request-a (future
                     (.put responses (try (thunk-a)
                                          (catch Throwable e {:error e}))))
         request-b (future
                     (.put responses (try (thunk-b)
                                          (catch Throwable e {:error e}))))]
     ;; same try / loop / finally as before
     ))

(defn fetch-and-aggregate [entity-id]
  (parallel (fetch-from-datasource-a entity-id)
            (fetch-from-datasource-b entity-id)))
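For completeness, here is one way the elided body of parallel could be written out. This is a sketch assuming the same queue-based approach as above. The local helper spawn and the {:value ...} wrapper are choices made for this illustration, and the function is named parallel-sketch to keep it apart from the original.

```clojure
(defn parallel-sketch [thunk-a thunk-b]
  (fn []
    (let [responses (java.util.concurrent.LinkedBlockingQueue.)
          ;; Run a thunk on another thread and report its outcome on the queue.
          spawn     (fn [thunk]
                      (future
                        (.put responses
                              (try {:value (thunk)}
                                   (catch Throwable e {:error e})))))
          requests  [(spawn thunk-a) (spawn thunk-b)]]
      (try
        (loop [result {} pending (count requests)]
          (if (zero? pending)
            result
            (let [{:keys [value error]} (.take responses)]
              (if error
                (throw error)          ; first failure wins
                (recur (merge result value) (dec pending))))))
        (finally
          ;; Cancelling a completed future is a harmless no-op.
          (run! future-cancel requests))))))

((parallel-sketch (fn [] {:a 1}) (fn [] {:b 2})))
;; => {:a 1, :b 2}
```

If one thunk throws, the exception propagates to the caller immediately and the other request is interrupted instead of being awaited.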

And now we’re doing functional programming. You could have invented it yourself.

The problem with thunks is that they rely on thread blocking, which won’t work in clojurescript. We need an asynchronous representation of actions, and that’s the purpose of the task spec. When you have a standardized protocol, you can wrap all IO this way and leverage all the operators that have been written for it.
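As a sketch of what an asynchronous representation can look like, the example below treats a task as a function taking a success callback and a failure callback and returning a zero-argument canceller. thunk->task is a hypothetical adapter written for this illustration. It still uses a JVM future internally, and in clojurescript the body would call a callback-based API instead.

```clojure
;; Hypothetical adapter: turn a blocking thunk into a callback-based task.
(defn thunk->task [thunk]
  (fn [success! failure!]
    (let [fut (future
                ;; Capture the outcome first so that an exception thrown by
                ;; success! itself is not reported through failure!.
                (let [[ok? x] (try [true (thunk)]
                                   (catch Throwable e [false e]))]
                  (if ok? (success! x) (failure! x))))]
      ;; The returned function cancels the task. An interrupted thunk
      ;; reports its InterruptedException through failure!.
      (fn [] (future-cancel fut)))))

;; Running a task: the caller is not blocked, the outcome arrives by callback.
(def outcome (promise))
(def cancel!
  ((thunk->task (fn [] (+ 1 2)))
   (fn [x] (deliver outcome [:ok x]))
   (fn [e] (deliver outcome [:error e]))))

(deref outcome 1000 :timeout)
;; => [:ok 3]
```

Since the task is an ordinary value, an operator like parallel can be written once against this shape and reused for any action.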

And flow is really the same idea applied to multiple-value producers.


That’s a great explanation and probably worth putting on missionary’s documentation site somewhere.


@leonoel Thanks a lot for your work! I haven’t used cloroutine or missionary yet, but spent much time reading your instructive tutorials and interesting implementation notes. It has a very SICPish feeling to it ))
Having finally understood the ap/fork concept in missionary and the task design, I’m now ready to try it as a replacement for core.async in a future project.

  • For one, the propagation of exceptions through tasks/flows seems a huge advantage. If I understand correctly, missionary has a deterministic exception flow.
  • No wiring up channels if all I want is unbuffered and sometimes backpressure (I understand buffering is now optional)
  • Forking instead of writing channel dispatch loops

ap/fork looks truly fascinating, I’m very curious to see how it works out in larger projects, over channels.

Yes. As the name suggests, missionary makes it safe to miss IO.

As with core.async, backpressure propagation is the default behavior. If you want to disable it, you have to be explicit about it and specify what to do on overflow (see relieve operator).

Buffering is explicit as well, but keep in mind many operators have an internal buffer to store items while waiting for downstream to become ready, so in practice you’re almost never fully unbuffered. core.async has the same behavior, more details here.
