Looking to improve this code

I’ve got this function in a project that ended up being useful a couple times, but I don’t like its name and I wonder whether there’s something out there that already does this. Any suggestions for a better name, a better implementation, or something in core or a lib to replace it?

(defn streamline
  "Returns `f`, wrapped such that calling multiple times with the same arguments before any single call completes (e.g. from different threads) will result in only one actual call to `f` being made.
  Useful for functions that run for a very long time.
  For instance, if you have a function that takes 30 seconds to run, you could wrap that fn with `streamline` and multiple calls to that wrapped fn will return at the same time with the same value."
  [f]
  (let [calls* (atom {})   ; args -> future for the call currently in flight
        o (Object.)]       ; lock guarding the check-then-register step
    (fn [& args]
      @(locking o
         (if-let [fut (get @calls* args)]
           ;; A call with these args is already running: share its future.
           fut
           (let [fut (future (let [[val error] (try [(apply f args)]
                                                    (catch Throwable e [e :error]))]
                               (locking o
                                 (swap! calls* dissoc args)
                                 (if error (throw val) val))))]
             (swap! calls* assoc args fut)
             fut))))))

What about memoize?

Thanks for the suggestion, but memoize doesn’t update its cache until after the wrapped function returns, so it doesn’t quite work for my use case. My use case, by the way, is to wrap really long running database queries; if two users request the same query at roughly the same time I don’t want to double-process that query unnecessarily. And if they run that same query again 20 minutes later I want them to get the latest results, so something like core.memoize would require me to manually clear the cache. streamline forgets function results as soon as it returns for that reason.
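
The point about memoize can be seen in a small sketch. The names slow-query, call-count and both-started are made up for illustration, and the latch only exists to force the two calls to overlap; it stands in for a query that is still running when the second request arrives.

```clojure
(import '(java.util.concurrent CountDownLatch TimeUnit))

(def call-count (atom 0))
(def ^CountDownLatch both-started (CountDownLatch. 2))

;; Hypothetical stand-in for a long-running database query.
(defn slow-query [x]
  (swap! call-count inc)
  (.countDown both-started)
  ;; Wait (up to 2 s) until a second caller is also inside the function.
  (.await both-started 2 TimeUnit/SECONDS)
  (* x 2))

(def memoized-query (memoize slow-query))

;; Two overlapping calls with the same argument. Neither finds a cached
;; value, because memoize only stores the result after slow-query returns.
(def results
  (let [a (future (memoized-query 21))
        b (future (memoized-query 21))]
    [@a @b]))

;; results     => [42 42]
;; @call-count => 2
```

Both callers get the right answer, but the underlying function ran twice, which is exactly the duplicated work the wrapper is meant to avoid.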

Take a look at:

(clojure.core.memoize/ttl f :ttl/threshold ttl-in-milliseconds)

Similar to memoize, but it uses delays to provide the behavior you need.
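
A minimal usage sketch, assuming org.clojure/core.memoize is on the classpath. The names run-query, query-count and cached-query, and the 5000 ms threshold, are illustrative and not from the thread.

```clojure
(require '[clojure.core.memoize :as memo])

(def query-count (atom 0))

;; Hypothetical stand-in for the slow query.
(defn run-query [visitor-id]
  (swap! query-count inc)
  {:visitor visitor-id :events []})

;; Keep each result for 5 seconds (the threshold is in milliseconds).
(def cached-query (memo/ttl run-query :ttl/threshold 5000))

(cached-query 42)
(cached-query 42)   ; served from the cache while the entry is fresh
@query-count        ; => 1

;; Dropping the cached entries by hand forces a fresh query on the next call.
(memo/memo-clear! cached-query)
```

The trade-off is that results stay cached for the whole TTL window, so a short threshold is the closest this gets to "forget as soon as the call returns".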

I can offer something like this (without error handling):

(defn streamline
  [f]
  (let [state (atom nil)]
    (fn [& args]
      (deref (or (deref state)
                 (reset! state (future (let [result (apply f args)]
                                         (reset! state nil)
                                         result))))))))

Also, it looks like the function returned by this implementation is 1000× faster than the original.

Thanks for the suggestion, Serioga. I’m not understanding how this implementation prevents race conditions. It seems like two threads could hit the inner deref at roughly the same time, both get nil, and then both reset! the atom. That type of issue is why I used locking, but perhaps I’m overcomplicating things?
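
One way to close that check-then-set window without an explicit lock is to do the lookup and the registration inside a single swap!. The following is only a sketch under that assumption, not code from anyone in this thread; streamline-swap and its internals are hypothetical names.

```clojure
(defn streamline-swap
  [f]
  (let [calls (atom {})]   ; args -> delay for the call currently in flight
    (fn [& args]
      (let [;; Nothing runs until the delay is dereferenced.
            d (delay (try
                       (apply f args)
                       ;; Forget the call once it finishes, success or failure.
                       (finally (swap! calls dissoc args))))
            ;; Atomically keep an existing entry or install ours; swap!
            ;; retries on contention, so only one delay per args key wins.
            winner (get (swap! calls #(if (contains? % args) % (assoc % args d)))
                        args)]
        ;; Every overlapping caller derefs the same delay, so f runs once.
        @winner))))
```

Unlike the future-based versions, the work runs on the thread of whichever caller derefs the winning delay first, and the other callers block on that same delay until it completes.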

As the maintainer of the clojure.core.memoize Contrib library, I’m curious about your use case here (since the TTL-cached memoization was suggested).

You say that if two identical requests are made for what will be a long-running query, you don’t want them both to run – if I’m understanding you – you want the first to run, and the second should block until the first completes, and then both requests would return that result, and subsequent requests would continue to return that value for a while, and then an actual query would run again. Rinse and repeat.

Is that correct?

You can add locking around the (or ...) form to be sure.
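
A sketch of what that could look like. The name streamline-locked and the lock object are assumptions for illustration. The lock around the reset inside the future goes beyond the literal suggestion: without it, a very fast f could clear the atom before the outer reset! stores the future, leaving a finished future in place.

```clojure
(defn streamline-locked
  [f]
  (let [state (atom nil)     ; the future currently in flight, or nil
        lock  (Object.)]
    (fn [& args]
      ;; The deref happens outside the lock, so waiting callers never
      ;; block the thread that needs the lock to clear the state.
      (deref
       (locking lock
         ;; Check and register as one step: no two threads can both see nil.
         (or @state
             (reset! state
                     (future
                       (try
                         (apply f args)
                         (finally
                           ;; Cannot run before the outer reset! has finished,
                           ;; because that reset! happens while the lock is held.
                           (locking lock
                             (reset! state nil))))))))))))
```

Like the version it is based on, this tracks a single in-flight call and does not compare args, so it only suits a wrapped function that is always called with the same arguments.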

@seancorfield My original intention with this code wasn’t to memoize at all, but just to prevent redundant queries from happening at the same time. An internal UI at work allows admins to view a timeline of webpages loaded and other events for some visitor, but those queries are a bit slow - 30 seconds up to a couple minutes. The admin can leave the page and come back and ask for that data again, and if the request is ongoing there’s no reason to start over. I don’t currently have any additional memoization, so if the admin has reason to believe there’s fresher data after they look at that timeline they can try again and maybe pick up a recent event.

@Serioga That makes sense and remains cleaner than my implementation, thank you.

