Recursive async pipeline

I’m trying to solve a problem with nested API that has nodes and children. Each node can have zero or more children, which is a vector of IDs. If you ever had to deal with Hackernews Firebase API, that’s exactly like I’m trying to describe.

Now, what would be the great way of fetching data for every node recursively and in parallel?

I decided to try that with pipeline-async. Pipeline takes two channels in and out and the async function af. My idea is to whenever the node has children, I put them all back into the in channel, essentially forcing it to fetch the data for each sub-node and if the sub-node has children, they’d also get pushed into the pipeline, and the cycle continues until we fetch the entire tree.

Let me show that using Hackernews API example:

(defn hn-retrieve-discussion-content
  [story-id]
  (let [in-ch (a/chan)
        out-ch (a/chan)
        api-base "https://hacker-news.firebaseio.com/v0/item/%s.json"
        fetch
        (fn [{:keys [story-id
                     parent-id
                     parents]}
             res-ch]
          (when story-id
            (let [{:keys [kids
                          id
                          text
                          type
                          by] :as _res}
                  (some->
                   (format api-base story-id)
                   (client/get {:as :auto})
                   :body)]

              (doseq [kid kids]
                (a/put! in-ch {:story-id kid
                               :parents (conj parents id)
                               :parent-id id}))
              (when text
                (a/put! res-ch {:parent-id parent-id
                                :story-id id
                                :text text
                                :type type
                                :by by
                                :parents parents}))))
          (a/go (a/close! res-ch)))]
    (a/put! in-ch {:story-id story-id :parents []})
    (a/pipeline-async 5 out-ch fetch in-ch)
    out-ch))

Now, I have to note that even though I’m using here clj-http.client/get (which is non-async), I think, I still have to use async-pipeline, because I don’t think it’s possible to push into the in channel with pipeline or pipeline-blocking

But the async pipeline works and if I want to print all the nodes that were fetched, I can do:

(let [ch (hn-retrieve-discussion-content "37452051")]
  (a/go-loop []
    (when-let [val (a/<! ch)]
      (println val)
      (recur))))

This is nice and all, but here’s the fundamental problem I’ve been scratching my head for. There no really good way to determine when “there’s no more data to fetch”. So I don’t know how to make a non-async handler for my own API call that returns entire discussion in one call.

So my question is: what is the idiomatic way of draining a channel coming out of async pipeline?

Or perhaps I am doing entirely wrong thing here?

It looks like the pattern people often use for the cases similar to this is to utilize
alts! with a timeout channel, e.g.,

(let [closer-ch (a/timeout 3000)
      data-ch (my-async-pipeline)]
  (a/go-loop [results []]
     (let [[val ch] (a/alts! [data-ch closer-ch])]
       (if (or (nil? val) (= ch closer-ch))
          results
          (recur (conj results val))))))

The pipeline must finish gathering all the data before the timeout. That may not be ideal, but it works for me. However, I still feel my entire approach to this problem has been wrong from the start.

I guess the base case “there is no more work to be done” e.g. “there are no more children pending”. If you keep track of the work in flight (say with an atom), then you can close the input channel when the base case is reached.

I tried to derive a little simulated example based on some of your idioms:

(ns recurse
  (:require [clojure.core.async :as a]))

(def db
  {:a {:id :a :kids [:b :c :d] :data "a"}
   :b {:id :b :kids [:h]   :data "b"}
   :c {:id :c :data "c"}
   :d {:id :d :kids [:e :f] :data "d"}
   :e {:id :e :kids [:g] :data "e"}
   :f {:id :f :data "f"}
   :g {:id :g :data "g"}
   :h {:id :h :data "h"}})

(defn fake-get [parent]
  (a/go
    (a/<! (a/timeout 16))
    (db parent)))

(defn retrieve-content>
  [init-id]
  (let [in-ch   (a/chan)
        out-ch  (a/chan)
        pending (atom #{})
        fetch
        (fn [{:keys [id parents parent-id]}
             res-ch]
          (a/go
            (when id
              (let [{:keys [kids data]} (a/<! (fake-get id))]
                (swap! pending #(-> % (disj id) (into kids)))
                (doseq [kid kids]
                      (a/put! in-ch {:id        kid
                                     :parents   (conj parents id)
                                     :parent-id id}))
                (when data
                  (a/put! res-ch {:id id
                                  :data data
                                  :kids kids
                                  :parent-id parent-id
                                  :parents parents}))))
            (a/close! res-ch)
            (when-not (seq @pending) (a/close! in-ch))))]
    (swap! pending conj init-id)
    (a/put! in-ch {:id init-id :parents []})
    (a/pipeline-async 5 out-ch fetch in-ch)
    out-ch))

(defn pull> [id]
  (let [ch (retrieve-content> id)]
    (a/go-loop []
      (if-let [val (a/<! ch)]
        (do (println val)
            (recur))
        :completed))))

(defn pull!! [id]
  (a/<!! (pull> id)))

yielding an expected walk of the parent/children:

recurse=> (pull!! :a)
{:id :a, :data a, :kids [:b :c :d], :parent-id nil, :parents []}
{:id :b, :data b, :kids [:h], :parent-id :a, :parents [:a]}
{:id :c, :data c, :kids nil, :parent-id :a, :parents [:a]}
{:id :d, :data d, :kids [:e :f], :parent-id :a, :parents [:a]}
{:id :e, :data e, :kids [:g], :parent-id :d, :parents [:a :d]}
{:id :f, :data f, :kids nil, :parent-id :d, :parents [:a :d]}
{:id :h, :data h, :kids nil, :parent-id :b, :parents [:a :b]}
{:id :g, :data g, :kids nil, :parent-id :e, :parents [:a :d :e]}

You can probably extend this scheme to include timeouts, so if the get in your original fails, could yield e.g. an empty map or something. The work node would still have been disjoined at this point, and close logic would kick in (so basically truncating chasing down child comments). Or you could implement retries and the like however you see fit…

[It took me a couple of tries to notice a subtle race condition in the initial hack on this too. The (apparent) fix was ensuring the atomic update to the atom contained both removal of current node and conjoining of the child nodes. Apparently some of the workers were getting ahead and closing the channel since those two actions were initially decoupled (I conj’d the child during the doseq, which was potentially too slow).]

This topic was automatically closed 182 days after the last reply. New replies are no longer allowed.