Missionary - new release with streaming support, design notes

I model IO this way as much as I can, because the imperative way is ridiculously hard to get right.

Let’s take a very simple, practical and common example. You’re making a web service endpoint returning business data about an entity, and this information must be aggregated from different sources (it may come from multiple third-party services, or you may have multiple sources of truth in your system, whatever). You need to perform multiple requests in parallel, wait for each, and then merge the results.

Somewhere in your app there are some DAO functions performing a request to each datasource and returning data.

(defn fetch-from-datasource-a! [entity-id])
(defn fetch-from-datasource-b! [entity-id])

Then you can run those in parallel and aggregate the results using e.g. future:

(defn fetch-and-aggregate! [entity-id]
  (let [request-a (future (fetch-from-datasource-a! entity-id))
        request-b (future (fetch-from-datasource-b! entity-id))]
    (merge @request-a @request-b)))

It works fine at first glance and it’s likely to pass the entire QA process all the way to production, but its failure behavior is highly suboptimal. One day, datasource b will be down, making request-b fail. You have to propagate this error, maybe wrapping it in something that produces an HTTP 503. You don’t need request-a anymore, so it must be cancelled. In the previous code, you don’t check for request-b’s failure until request-a has completed, so you can waste resources waiting for a result you won’t use.

An optimized version could look like this:

(defn fetch-and-aggregate! [entity-id]
  (let [responses (java.util.concurrent.LinkedBlockingQueue.)
        request-a (future
                    (.put responses (try (fetch-from-datasource-a! entity-id)
                                         (catch Throwable e {:error e}))))
        request-b (future
                    (.put responses (try (fetch-from-datasource-b! entity-id)
                                         (catch Throwable e {:error e}))))]
    (try (loop [result {}
                count 0]
           ;; take responses as they complete, failing fast on the first error
           (let [response (.take responses)]
             (if-some [e (:error response)]
               (throw e)
               (let [result (merge result response)
                     count  (inc count)]
                 (if (= count 2) result (recur result count))))))
         (finally
           ;; whatever happened, make sure no request is left running
           (future-cancel request-a)
           (future-cancel request-b)))))

This new version does the right thing, but the code is unreadable: the essential complexity has been swallowed by technical concerns with zero business value. You don’t want to do this every time; what you really want is to abstract away this error and cancellation handling into something you can apply to arbitrary actions. That requires representing actions as values, for example as thunks:

(defn fetch-from-datasource-a [entity-id]
  #(fetch-from-datasource-a! entity-id))

(defn fetch-from-datasource-b [entity-id]
  #(fetch-from-datasource-b! entity-id))

(defn parallel [thunk-a thunk-b]
  #(let [responses (java.util.concurrent.LinkedBlockingQueue.)
         request-a (future
                     (.put responses (try (thunk-a)
                                          (catch Throwable e {:error e}))))
         request-b (future
                     (.put responses (try (thunk-b)
                                          (catch Throwable e {:error e}))))]
     ;; same wait/merge/cancel logic as before
     ))

(defn fetch-and-aggregate [entity-id]
  (parallel (fetch-from-datasource-a entity-id)
            (fetch-from-datasource-b entity-id)))
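
Note that fetch-and-aggregate no longer performs any IO by itself; it just returns a value describing the action. To actually run it you invoke the thunk, e.g. (with an arbitrary entity id, and assuming parallel’s elided body is filled in as above):

;; building the action performs no IO, it just composes thunks
(def fetch-entity (fetch-and-aggregate 42))

;; invoking the thunk runs both requests in parallel and blocks for the result
(fetch-entity)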

And now we’re doing functional programming. You could have invented it yourself.

The problem with thunks is that they rely on thread blocking, so they won’t work in ClojureScript. We need an asynchronous representation of actions, and that’s the purpose of the task spec. Once you have a standardized protocol, you can wrap all IO this way and leverage all the operators that have been written for it.
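
Roughly, under the task spec an action is represented as a function taking a success callback and a failure callback; calling it starts the work and it returns a zero-argument function you can call to cancel it. As a rough sketch of how the example above could map onto missionary’s operators (the name fetch-and-aggregate-task is just for illustration): m/via runs a blocking call on an executor and returns a task, and m/join runs tasks concurrently, combines their results, and cancels the others if one of them fails.

(require '[missionary.core :as m])

;; wrap each blocking DAO call in a task running on the blocking executor,
;; then join them: concurrent execution, merged results, error propagation
;; and cancellation come for free
(defn fetch-and-aggregate-task [entity-id]
  (m/join merge
          (m/via m/blk (fetch-from-datasource-a! entity-id))
          (m/via m/blk (fetch-from-datasource-b! entity-id))))

;; m/? runs a task and waits for its result (blocking the thread on the JVM)
;; (m/? (fetch-and-aggregate-task 42))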

And flow is really the same idea applied to multiple-value producers.
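
For example, a minimal sketch with missionary’s flow operators: m/seed turns a collection into a flow of several values, and m/reduce consumes a flow and gives back a task.

;; a flow producing three values
(def numbers (m/seed (range 3)))

;; consuming the flow is again a task; running it yields [0 1 2]
;; (m/? (m/reduce conj [] numbers))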
