I’m trying to solve a problem with a nested API that has nodes and children. Each node can have zero or more children, given as a vector of IDs. If you’ve ever had to deal with the Hacker News Firebase API, that’s exactly the shape I’m describing.
Now, what would be a good way to fetch the data for every node recursively and in parallel?
I decided to try that with pipeline-async. It takes two channels, in and out, and an async function af. My idea is that whenever a node has children, I put them all back into the in channel, essentially forcing the pipeline to fetch the data for each sub-node. If a sub-node has children, they also get pushed into the pipeline, and the cycle continues until the entire tree has been fetched.
Let me show that using the Hacker News API as an example:
(require '[clojure.core.async :as a]
         '[clj-http.client :as client])

(defn hn-retrieve-discussion-content
  [story-id]
  (let [in-ch    (a/chan)
        out-ch   (a/chan)
        api-base "https://hacker-news.firebaseio.com/v0/item/%s.json"
        fetch
        (fn [{:keys [story-id parent-id parents]} res-ch]
          (when story-id
            (let [{:keys [kids id text type by] :as _res}
                  (some-> (format api-base story-id)
                          (client/get {:as :auto})
                          :body)]
              ;; feed every child ID back into the pipeline's input channel
              (doseq [kid kids]
                (a/put! in-ch {:story-id  kid
                               :parents   (conj parents id)
                               :parent-id id}))
              ;; only items that carry text are emitted as results
              (when text
                (a/put! res-ch {:parent-id parent-id
                                :story-id  id
                                :text      text
                                :type      type
                                :by        by
                                :parents   parents}))))
          ;; pipeline-async expects af to close res-ch when it is done
          (a/go (a/close! res-ch)))]
    ;; seed the pipeline with the root item
    (a/put! in-ch {:story-id story-id :parents []})
    (a/pipeline-async 5 out-ch fetch in-ch)
    out-ch))
Now, I should note that even though I’m using clj-http.client/get here (which is non-async), I think I still have to use pipeline-async, because I don’t think it’s possible to push into the in channel with pipeline or pipeline-blocking.
But the async pipeline works, and if I want to print all the nodes that were fetched, I can do:
(let [ch (hn-retrieve-discussion-content "37452051")]
  (a/go-loop []
    (when-let [val (a/<! ch)]
      (println val)
      (recur))))
This is nice and all, but here’s the fundamental problem I’ve been scratching my head over: there’s no really good way to determine when “there’s no more data to fetch”. So I don’t know how to write a non-async handler for my own API that returns the entire discussion in one call.
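To make the problem concrete, here is a minimal sketch of the kind of blocking handler I have in mind. The name drain!! is hypothetical, not part of core.async; it collects everything from a channel with a/into, which only delivers its result once the source channel is closed. The toy channel below is closed by hand, which is exactly the step I can’t figure out how to trigger for the pipeline.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: block until `ch` closes, then return
;; everything that was taken from it as a vector.
(defn drain!! [ch]
  (a/<!! (a/into [] ch)))

;; Toy example with a channel that is closed explicitly.
(let [ch (a/chan 3)]
  (doseq [x [1 2 3]]
    (a/>!! ch x))   ; buffer of 3, so these puts do not block
  (a/close! ch)     ; without this, drain!! would never return
  (drain!! ch))
;; returns [1 2 3]
```

As far as I can tell, calling drain!! on the channel returned by hn-retrieve-discussion-content would block forever: nothing ever closes in-ch, so pipeline-async never closes out-ch.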
So my question is: what is the idiomatic way of draining a channel coming out of an async pipeline?
Or perhaps I’m going about this entirely the wrong way?