Is this a good way to stream data to caller of a function?

I am trying to send data in chunks to caller of my function because data is available to me in chunks. The body of my function will make it more clear. I need to fetch data from the same url periodically until I have seen all the data. As I fetch the data, I want to pass it on to the caller.

(defn foo []
  (let [marker        (make-array Boolean/TYPE num-files 0)
        results       (transient {:pages []})
        failure-count (atom 0)]
    (->> (range)
         (take-while (fn [_]
                       (and (not (seen-all? marker))
                            (= 0 @failure-count))))
         (map (fn [_]
                (f/if-let-ok? [pages (fetch url)]
                  (do
                    (mark-as-seen! pages marker results)
                    (Thread/sleep 500)
                    pages)
                  (do
                    (swap! failure-count inc)
                    pages))))
         (filter (comp not empty?)))))

Is this the best of doing it? If not, why and what would be a better way of doing it?

Note The caller can use it like this

(->> (foo)
     (run! (comp println process))))

Hum, no, this seems weird to me.

Starting with your use case actually. I’m not really sure what you’re trying to do. I can see two possibilities:

  1. Fetch data only as needed, lazily, maybe with a form of rate limiting in place
  2. Fetch data as early as possible, before it might even be needed, eagerly, maybe with some form of rate limiting in place.

The way you implemented it is closer to #1, but your description makes it sound like you want #2.

Can you clarify which one you’re trying to have?

So I went down a bit of a rabbit whole today regarding your question :stuck_out_tongue:

If you wanted #1, I think you could more cleanly implement it in terms of lazy-seq as such:

(defn fetch
  "A stub fetch that pretends like its getting random data from a url, but instead return a rand char from url."
  [url]
  (println "fetching " url)
  (rand-nth url))

(defn lazy-seq-fetch
  "Returns a lazy-seq which on every getting of the next element, will fetch the next element by calling
   page-fetching-fn. Will stop once it sees an element is returned by page-fetching-fn that has already been fetched before."
  ([page-fetching-fn] (lazy-seq-fetch page-fetching-fn #{}))
  ([page-fetching-fn pages-fetched]
    (lazy-seq
      (let [page (page-fetching-fn)]
        (if (pages-fetched page)
          nil
          (cons page (lazy-seq-fetch page-fetching-fn (conj pages-fetched page))))))))

(def data (lazy-seq-fetch #(fetch "http://www.google.com")))
(mapv upper-case data)
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;;=> ["G" "W" "." "H" "O" "T"]

Adding rate limiting inside the lazy-seq in my opinion is not a good idea, you’re better off having the caller do that. Like say:

(run! #(do (println %) (Thread/sleep 1000)) (lazy-seq-fetch #(fetch "http://www.google.com")))
;> fetching  http://www.google.com
;> t
;> fetching  http://www.google.com
;> .
;> fetching  http://www.google.com
;> g
;> fetching  http://www.google.com
;> /
;> fetching  http://www.google.com
;;=> nil

Though I think you could do something fancy, where you keep track of the time when you last fetched, and the next time you fetch, if it has not been X millisecond since last time, you could Thread/sleep for the remaining amount. Leave that up to you to add to the above code if you care.

The rabbit whole I was talking about, it happened on the Clojure slack today. At first, I thought of using iterate instead of lazy-seq. I’m just a bit more used to it, so I was going to reach for it first, then I saw its doc says it shouldn’t be used for side-effects. So I went in a rabbit whole as to why? Turns out iterate returns a special kind of seq, which is non-caching when reduced over, but caching when used as a seq otherwise. That means that if you reduce over it twice back to back, you make the calls to fetch the data all over again, and if your server is not idempotent, it would return possibly different results.

Now apparently in a future Clojure version, due to patch CLJ-2555, we might see a new function called iteration. It is more specifically designed to handle paging APIs, except your question is more challenging, because you want to fetch until you see duplicate data returned. Where as iteration is more designed for APIs that take a key and return a key to the next page you can use to call it back next time to get the rest. Now that function is intended for side-effects, unlike iterate. Though similar to iterate in some ways, iteration will call your API over and over if you keep accessing the same elements over and over. That said, if you call seq on it, it will return a stable caching sequence, and you can then safely consume from that seq over and over, even reduce over it or over its rest or next, and it will not call your API more than once. This is unlike iterate, where calling seq on does not return a stable sequence like that, as if you reduce over the rest of it, it will call your API again. Here’s an example of how you’d use it for your problem:

(defn lazy-iteration-fetch [fetching-fn]
  (->> (iteration
         (fn [pages-fetched]
           (let [page (fetching-fn)]
             (if (pages-fetched page)
               nil
               [page (conj pages-fetched page)])))
         :vf first
         :kf peek
         :initk #{})
       lazy-seq))

(def data (lazy-iteration-fetch #(fetch "http://www.google.com")))
(mapv upper-case data)
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;> fetching  http://www.google.com
;;=> ["G" "." "O" "W" ":" "H"]

You don’t need to call lazy-seq on its return like I did, but if you don’t, then if you access data over and over, it will call your API over and over, and if your API is not idempotent, possibly return different results. So it depends what behavior you want. The patch for it is here: https://clojure.atlassian.net/browse/CLJ-2555

Might be preferable to use an Atom to keep track of the pages-fetched instead of abusing k as I did. This will be true especially if you do have a key to call your API with actually, and if it returns any kind of actual next page identifier for you to make the next call with:

(defn lazy-iteration-fetch [fetching-fn url]
  (->> (let [pages-fetched (atom #{})]
         (iteration
           (fn [k]
             (let [[page next-k] (fetching-fn k)]
               (swap! pages-fetched conj page)
               (when-not (@pages-fetched page)
                 [page next-k])
         :vf first
         :kf peek
         :initk url)
       lazy-seq))

You can trick iterate to return a stable sequence as well, by encapsulating it under a call to map, but it is a bit of a hack, and to be fair, there might be some edge cases that I’m not thinking of:

(defn lazy-fetch-until-data-not-seen [fetching-fn]
  (->> (iterate
         (fn [[_ page-fetching-fn pages-fetched]]
           (let [page (page-fetching-fn)]
             (if (pages-fetched page)
               [:done nil nil]
               [page page-fetching-fn (conj pages-fetched page)])))
         [nil fetching-fn #{}])
       (map first)
       (drop 1)
       (take-while #(not= :done %))))

It also turns out to be the less readable implementation, so I now advise against it for both being more hacky (the doc-string even warns not to do this :stuck_out_tongue: ), and more complicated. That said, you can see how I call map on the return of iterate, so this would mean that even though iterate is unstable in its sequence, map will be stable, and map will never call the return of iterate for elements more than once, meaning we won’t call the API over and over either.

Having said all that, in your case, you might want to explore #2, if so, it would appear to be a good case for either a callback style function, like in a loop you fetch the data from URL until its done, and every iteration you call the passed in callback with the result to be handled. Or this would be a good use case for core.async as well, where you return a channel with the fetched data as they come in.

4 Likes

I’d use a channel to return the data to the caller, decoupling the two processes.
You’ll need to add some logic which knows when you got the last bit of information and close the channel after you. Tat way, when you take nil from the output channel you know you’re done.

I added in the following example a stop channel to allow early termination if you need to. Also adhered to the convention of the producer closing the queue.

(require '[clojure.core.async :as a])

(defn poll!
  []
  (rand 10))

(defn start-poll!
  ([stop]
   (start-poll! stop nil))
  ([stop buf-or-n]
   (let [out (a/chan buf-or-n)]
     (a/thread
       (loop []
         (if-not (a/poll! stop)
           (let [v (poll!)]
             (if (and v (a/>!! out v))
               (recur)
               (a/close! out))))))
     out)))

(def stop (a/chan))
(def out (start-poll! stop))
(a/poll! out)
(a/put! stop :stop)
(a/poll! out)

Yes, I am trying out the first approach that you mentioned:)

Adding rate limiting inside the lazy-seq in my opinion is not a good idea, you’re better off having the caller do that

The sleep of 500 ms is being added so that the servers are not bombarded. This function resides in the client library. The caller uses this function to fetch the data. What I am doing right now is a rewrite of the function. Before this all the data would be returned in one shot. Now, I am trying to make the data available to the caller as soon as it is available to me:)

I like the lazy-seq approach using cons. I think I will use that

You would also try this:

(defn foo []
  (->> (repeatedly fetch)
       ;; throttle the call
       (map #(do (Thread/sleep 200) %))
       ;; keep track of seen pages
       (reductions (fn [{:keys [seen] :as s} page]
                     (if page
                       ;; update seen page and return fetched page to reductions
                       (-> s
                           (assoc :page page)
                           (update :seen conj page))
                       ;; no more pages
                       (reduced s)))
                   {:seen #{}})
       ;; skip the seed
       rest
       ;;take pages until I've seen at least 5 pages (or whatever criteria here)
       (take-while (fn [{:keys [seen]}] (< (count seen) 6)))
       ;; returns only fetched page. i.e. removed seen from the output
       (map :page)))

(doseq [p (foo)]
  (println (java.util.Date.) p))

;=> #inst "2020-03-31T15:08:09.792-00:00" 19
;=> #inst "2020-03-31T15:08:09.995-00:00" 20
;=> #inst "2020-03-31T15:08:10.201-00:00" 21
;=> #inst "2020-03-31T15:08:10.402-00:00" 22
;=> #inst "2020-03-31T15:08:10.605-00:00" 23

;; for illustration, fetch just return a sequence
(let [p (atom 0)]
  (defn fetch []
    (swap! p inc)))

1 Like

So if you want to throttle inside the lazy-seq, I was thinking something like this:

(defn lazy-seq-fetch
  ([page-fetching-fn rate-ms] (lazy-seq-fetch page-fetching-fn rate-ms #{} (+ rate-ms (System/currentTimeMillis))))
  ([page-fetching-fn rate-ms pages-fetched next-time-to-fetch]
   (lazy-seq
    (let [current-time (System/currentTimeMillis)
          time-diff (- next-time-to-fetch current-time)
          ;; Sleep if it hasn't been rate-ms number of seconds since last fetch
          _ (when (pos? time-diff) (Thread/sleep time-diff))
          page (page-fetching-fn)]
      (if (pages-fetched page)
        nil
        (cons page
              (lazy-seq-fetch
               page-fetching-fn
               rate-ms
               (conj pages-fetched page)
               (+ rate-ms (System/currentTimeMillis)))))))))

That way, if the expected time between the caller reading the prior element and the next has gone by, they don’t pay the price to still have to wait.

For example, these two things will still only take 2 second:

(def data (lazy-seq-fetch #(fetch "abcdefghijklmnop") 1000))


(time
 (do
   (doall (take 1 data))
   (doall (take 2 data))
   (doall (take 3 data))))

(time
 (do
   (doall (take 1 data))
   (Thread/sleep 1000)
   (doall (take 2 data))
   (Thread/sleep 1000)
   (doall (take 3 data))))

Because if the caller calls for the second and third element when it hasn’t yet been 1000ms, the lazy-seq will sleep for whatever time is left for it to be 1000ms, but if the caller has waited 1000ms before it gets the next element, than the lazy-seq won’t sleep any longer.