Turning a reducer into a transducer for pipelining

I have a database table with event data. Each event has a time and duration. For each discrete timepoint in the table, I want the number of events that were occurring.

For example (each map represents a row in the table)

[{:time 0 :duration 1}
 {:time 0 :duration 2}  ; time 0 => 2 events
 {:time 1 :duration 2}  ; time 1 => 2 events (second event at time 0 still occurring)
 {:time 3 :duration 1}
 {:time 3 :duration 2} ;  time 3 => 2 events
 {:time 4 :duration 2}] ; time 4 => 2 events (second event at time 3 still occurring)

I want to take this sequence of rows and turn it into a vector of key (time) value (#events) pairs, like this

[[0 2] [1 2] [3 2] [4 2]]

I want to use jdbc/reducible-query to process the table row by row, so I made a reducer:

(defn duration-reducer
  [acc row]
  (let [time (:time row)
        acc  (if (empty? acc)
               {:acc   []
                :time  (:time row)
                :carry []}
               acc)]
    (if (= time (:time acc))
      (assoc acc :carry (conj (:carry acc) (:duration row)))
      {:acc   (conj (:acc acc) [(:time acc) (count (:carry acc))])
       :carry (conj (->> (:carry acc)
                         (map #(- % (- time (:time acc))))
                         (filter pos?))
                    (:duration row))
       :time  time})))

This reducer needs a final cleanup for the last :time, so invoking it looks like this

(let [x (reduce duration-reducer {} [{:time 0 :duration 1}
                                     {:time 0 :duration 2} ; time 0 => 2 events
                                     {:time 1 :duration 2} ; time 1 => 2 events
                                     {:time 3 :duration 1}
                                     {:time 3 :duration 2} ; time 3 => 2 events
                                     {:time 4 :duration 2}])] ; time 4 => 2 events
  (conj (:acc x)
        [(:time x) (count (:carry x))]))

=> [[0 2] [1 2] [3 2] [4 2]]

However:

  • I have 15 million rows in the table, so I don’t want to build up a large accumulation of pairs. Rather, I want to produce each [time count] pair as soon as it is available and save it to a file.
  • The last pair is not handled by the reduce itself; it has to be handled afterwards, as seen in the last code snippet.

I think that a transducer is the answer. If I understand correctly, a transducer can pass results (i.e., a [time count] pair) on as they become available, and it also has a completion step that can handle the last case. However, I’ve looked at the transducer code of dedupe and partition-by, and I don’t understand how to apply it to my reducer code above.

So, can I rewrite my duration-reducer as a transducer that accepts input from jdbc/reducible-query and passes the pairs to a function that appends them to a file as they are produced?

(defn xf'duration-reducer
  [rf]
  (fn
    ([] {})
    ([acc] (when-some [{:keys [acc time carry]} (not-empty acc)]
             (rf acc [time (count carry)])))
    ([acc {:keys [time duration]}]
     (let [acc (or (not-empty acc)
                   {:acc [] :time time :carry []})]
       (if (= time (:time acc))
         (update acc :carry rf duration)
         {:acc (rf (:acc acc) [(:time acc) (count (:carry acc))])
          :carry (rf (->> (:carry acc)
                          (map #(- % (- time (:time acc))))
                          (filter pos?))
                     duration)
          :time time})))))

(transduce xf'duration-reducer conj [{:time 0 :duration 1}
                                     {:time 0 :duration 2} ; time 0 => 2 events
                                     {:time 1 :duration 2} ; time 1 => 2 events
                                     {:time 3 :duration 1}
                                     {:time 3 :duration 2} ; time 3 => 2 events
                                     {:time 4 :duration 2}])

Thanks!

I changed the last call to rf into conj since it is not part of what the reducer produces but rather just keeps track of previous event durations.

(defn xf'duration-reducer
  [rf]
  (fn
    ([] {})
    ([acc] (when-some [{:keys [acc time carry]} (not-empty acc)]
             (rf acc [time (count carry)])))
    ([acc {:keys [time duration]}]
     (println (count (:acc acc)))  ; Shows build-up of vector.
     (let [acc (or (not-empty acc)
                   {:acc [] :time time :carry []})]
       (if (= time (:time acc))
         (update acc :carry rf duration)
         {:acc   (rf (:acc acc) [(:time acc) (count (:carry acc))])
          :carry (conj (->> (:carry acc)
                            (map #(- % (- time (:time acc))))
                            (filter pos?))
                       duration)
          :time  time})))))

(transduce xf'duration-reducer conj [{:time 0 :duration 1}
                                    {:time 0 :duration 2} ; time 0 => 2 events
                                    {:time 1 :duration 2} ; time 1 => 2 events
                                    {:time 3 :duration 1}
                                    {:time 3 :duration 2} ; time 3 => 2 events
                                    {:time 4 :duration 2}])

However, this seems to do exactly what my reducer did. It builds up an accumulated vector of key value pairs and returns it. The println shows the size of the growing vector.

What I want to achieve is a function that passes on the key value pair as soon as it is ready (i.e., the time value has changed to a new one) so that it can be saved to a file. Basically, the values from the SQL query should be piped through the reducer, which collects all events for a specific time, and passes it on to the file-saving function before handling the next row from the SQL query. So that there is no buildup of a huge accumulator (remember, I have 15 million rows of events).

I think that dedupe and partition-all both send along values as soon as they’re ready? According to https://clojure.org/reference/transducers, these functions need to keep an inner state to achieve this, so maybe that is needed?

Does this make any sense?

To me this sounds like a core.async use case: you put each pair on a channel as soon as it is ready, and it is processed from there. A channel can even take a transducer.

Are you going to run a single query and then fetch 15 million rows through a single connection? Is that realistic?

Yes, they do, and so does your transducer above: every time rf is called, the result is “sent” on.
But maybe you can rewrite your code so that it does not build up a huge accumulator at all?

You can also keep your accumulator as inner state of the transducer.

Something like this:

(defn xf'duration-reducer
  [rf]
  (let [a-time (volatile! nil)
        a-carry (volatile! [])]
    (fn
      ([] [])
      ([result] (rf result [@a-time (count @a-carry)]))
      ([result {:keys [time duration]}]
       (let [acc-time (or @a-time (vreset! a-time time))]
         (if (= time acc-time)
           (do
             (vswap! a-carry conj duration)
             result)
           (let [x [acc-time (count @a-carry)]]
             (vreset! a-carry (conj (->> @a-carry
                                         (map #(- % (- time acc-time)))
                                         (filter pos?))
                                    duration))
             (vreset! a-time time)
             (rf result x))))))))
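A possible refinement of the sketch above, offered as a variant under stated assumptions rather than as the code that was actually run in this thread. It assumes rows arrive sorted by :time, keeps the carried durations in a vector so nothing lazy is left pending between rows, skips the final pair when there was no input, and calls the downstream completion arity after flushing (the same convention partition-by follows). The names xf-event-counts and sample-rows are made up for the example.

```clojure
(defn xf-event-counts
  "Hypothetical name. Stateful transducer: rows sorted by :time go in,
  [time n-events] pairs come out as soon as a timepoint is complete."
  [rf]
  (let [a-time  (volatile! ::none)   ; ::none marks 'no row seen yet'
        a-carry (volatile! [])]      ; remaining durations of running events
    (fn
      ([] (rf))
      ([result]
       ;; Flush the last timepoint (if any), then let downstream complete.
       (let [result (if (= ::none @a-time)
                      result
                      (unreduced (rf result [@a-time (count @a-carry)])))]
         (rf result)))
      ([result {:keys [time duration]}]
       (let [prev-time @a-time]
         (cond
           ;; First row: only initialize the state.
           (= ::none prev-time)
           (do (vreset! a-time time)
               (vreset! a-carry [duration])
               result)

           ;; Same timepoint: remember one more running event.
           (= time prev-time)
           (do (vswap! a-carry conj duration)
               result)

           ;; New timepoint: emit the finished pair, age the carried events.
           :else
           (let [pair    [prev-time (count @a-carry)]
                 elapsed (- time prev-time)]
             (vreset! a-carry (conj (into []
                                          (comp (map #(- % elapsed))
                                                (filter pos?))
                                          @a-carry)
                                    duration))
             (vreset! a-time time)
             (rf result pair))))))))

(def sample-rows
  [{:time 0 :duration 1}
   {:time 0 :duration 2}
   {:time 1 :duration 2}
   {:time 3 :duration 1}
   {:time 3 :duration 2}
   {:time 4 :duration 2}])

(into [] xf-event-counts sample-rows)
;; => [[0 2] [1 2] [3 2] [4 2]]
```

Calling (rf result) in the completion arity matters once something stateful follows in a comp, for example partition-all, which only flushes its last partial batch when its own completion arity is called.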

This was very helpful, thank you!

I’m beginning to wonder whether a transducer is what I want at all, because the end function is just a side effect that writes its input to a file and returns nothing. Still, I have managed to accomplish this using your function.

(defn xf'duration-reducer
 [rf]
 (let [a-time  (volatile! nil)
       a-carry (volatile! [])]
   (fn
     ([] [])
     ([result] (rf result [@a-time (count @a-carry)]))
     ([result {:keys [time duration]}]
      (println "Reducer called")
      (if (= time @a-time)
        (do
          (vswap! a-carry conj duration)
          result)
        (let [prev-time @a-time ; read once: the lazy map below may be realized after a-time is reset
              x         (when prev-time
                          [prev-time (count @a-carry)])]
          (vreset! a-carry (conj (->> @a-carry
                                      (map #(- % (- time prev-time)))
                                      (filter pos?))
                                 duration))
          (vreset! a-time time)
          (if x
            (rf result x)
            result)))))))

(transduce xf'duration-reducer (fn [& [_ output]]
                                 (when output
                                   (println "WRITING TO FILE: " output)))
           [{:time 0 :duration 1}
            {:time 0 :duration 2}
            {:time 1 :duration 2}
            {:time 3 :duration 1}
            {:time 3 :duration 2}
            {:time 4 :duration 2}])

=>
Reducer called
Reducer called
Reducer called
WRITING TO FILE:  [0 2]
Reducer called
WRITING TO FILE:  [1 2]
Reducer called
Reducer called
WRITING TO FILE:  [3 2]
WRITING TO FILE:  [4 2]

It works! The file writing takes place in between the calls to the reducer. However, the writing function seems quirky: it needs to ignore its first argument, and transduce also calls it with no arguments to get the init value (I don’t know what that is used for).

However, if I had a few more intermediate steps of data processing, a transducer seems suitable, even though it all ends with a nil-returning, side-effecting function.

For example

(transduce (comp xf'duration-reducer (map #(str "Time: " (first %) ". Events: " (second %))))
           (fn [& [_ output]]
             (when output
               (println "WRITING TO FILE: " output)))
           [{:time 0 :duration 1}
            {:time 0 :duration 2}
            {:time 1 :duration 2}
            {:time 3 :duration 1}
            {:time 3 :duration 2}
            {:time 4 :duration 2}])

To sum up my confusion: A transducer gets the work done by passing on values as they are ready and it’s easy to add additional transformations. But the last writing function seems quirky.

Transducers aren’t lazy; they are eager, but they perform what’s called loop fusion. For example:

If you have [1 2 3]

And say you want to increment each element and then filter out the odd results.

You could first loop over all three elements and increment them, giving you [2 3 4], and then loop over all of those and filter, giving you [2 4].

This would be eager, but it isn’t loop fusion: you go over all elements and apply the first transform, then go over all results of that and apply the second, and so on.

With loop fusion, instead this happens:

You loop only once over the initial collection, and for each element you apply all the transforms one by one until you have the final result, and then move on to the next element. So with my example it would be:

You first take 1 and increment it to 2 and then filter that, which doesn’t filter it, so now you’d collect it as your first result.

Now you’d take 2 and increment it to 3, and then filter that, which is odd, so you would skip collecting it.

Finally you’d take 3 and increment it to 4, and then filter that, which is even, so you’d collect it as well.

Now you’d return the final collected result: [2 4].

This is loop fusion, because the transformations handled by the separate loops are fused into one loop. It’s more performant because it avoids creating intermediate collections to hold intermediate results, and it takes less memory for the same reason, since you don’t need to keep intermediate results around for as long.

So yes, the result of each transducer is pushed to the next, element by element, as soon as it is done processing. But the result of the whole transducer chain isn’t: transduce waits for all elements to have been processed and collected, and then returns that final collection.
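To make the ordering concrete, here is a small sketch that records the order in which the two steps run, first as two separate eager passes and then fused through a transducer. The names trace, traced-inc and traced-even? are made up for the illustration.

```clojure
(def trace (atom []))

(defn traced-inc [x]
  (swap! trace conj [:inc x])
  (inc x))

(defn traced-even? [x]
  (swap! trace conj [:even? x])
  (even? x))

;; Two eager passes: every element is incremented before any is filtered.
(reset! trace [])
(def two-pass (filterv traced-even? (mapv traced-inc [1 2 3])))
(def two-pass-trace @trace)
;; two-pass       => [2 4]
;; two-pass-trace => [[:inc 1] [:inc 2] [:inc 3] [:even? 2] [:even? 3] [:even? 4]]

;; Fused: each element goes through both steps before the next one starts.
(reset! trace [])
(def fused (into [] (comp (map traced-inc) (filter traced-even?)) [1 2 3]))
(def fused-trace @trace)
;; fused       => [2 4]
;; fused-trace => [[:inc 1] [:even? 2] [:inc 2] [:even? 3] [:inc 3] [:even? 4]]
```

Both versions return the same vector; only the interleaving differs, and the fused version never builds the intermediate [2 3 4].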

So in your case, if you want to write to the file element by element, your logic for writing to a file has to be inside a transducer as well, so that it gets fused in as the last step.

I’d say that breaks the idiom a bit, doing side effects like that in a transducer, but for your own use it’s probably fine.

But honestly, for what you’re doing, I’d just use a loop/recur.
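For comparison, a loop/recur version could look roughly like the sketch below. It assumes the rows are available as something seqable and already sorted by :time, so it does not apply directly to a source that is only reducible. emit-counts! and emit! are hypothetical names; emit! stands in for whatever writes a pair to the file.

```clojure
(defn emit-counts!
  "Hypothetical helper. Walks rows (sorted by :time) once and calls
  (emit! [time n-events]) each time a timepoint is complete."
  [emit! rows]
  (loop [rows      (seq rows)
         prev-time nil
         carry     []]   ; remaining durations of events still running
    (if-let [{:keys [time duration]} (first rows)]
      (if (= time prev-time)
        ;; Same timepoint: just remember the new event's duration.
        (recur (next rows) prev-time (conj carry duration))
        ;; New timepoint: emit the finished one, then age the carried events.
        (let [still-running (if prev-time
                              (into []
                                    (comp (map #(- % (- time prev-time)))
                                          (filter pos?))
                                    carry)
                              [])]
          (when prev-time
            (emit! [prev-time (count carry)]))
          (recur (next rows) time (conj still-running duration))))
      ;; Input exhausted: flush the last timepoint, if there was one.
      (when prev-time
        (emit! [prev-time (count carry)])))))

;; Usage: collect the emitted pairs in an atom instead of writing a file.
(def emitted (atom []))

(emit-counts! #(swap! emitted conj %)
              [{:time 0 :duration 1}
               {:time 0 :duration 2}
               {:time 1 :duration 2}
               {:time 3 :duration 1}
               {:time 3 :duration 2}
               {:time 4 :duration 2}])
;; @emitted => [[0 2] [1 2] [3 2] [4 2]]
```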

This is how it ended up looking:

(with-open [file (clojure.java.io/writer "filename.txt")]
  (transduce xf'duration-reducer
             (fn [& [_ output]]
               (when output
                 (.write file (pr-str output))))
             (jdbc/reducible-query db ["SELECT time, duration FROM table ORDER BY time"] {:raw true})))

It took 120 seconds to process 15 million rows like that. When I used conj as the rf, thus still using reducible-query but realizing the entire sequence before writing to a file, it took 343 seconds, more than double.

Yes, it seems that I have over-complicated things. But a loop-recur wouldn’t help if I want to use reducible-query, right?

I feel like there is something that I am misunderstanding since the output function got so quirky. The process of io-in -> partially-reducing-transform -> io-out on a row-by-row basis does not seem like an unusual use-case? Do people use other techniques than transducers for this?

You aren’t. It’s just that transduce is intended for reducing things into something else, like a value or a collection. In your case, you are only doing side effects with it, so its interface seems quirky for that use case.

So normally, you can think of transduce as letting you pick the final reducing function to apply, which is of the form:

(transduce
 (partition-by identity)
 (fn
   ;; init - returns initial value for accumulator, called when no init is given to transduce
   ([] [])
   ;; completion - returns the final result, take the final accumulated value, called once there are no more elements to process
   ([acc] acc)
   ;; step - do whatever you want on each element, returns accumulated state and takes accumulated state from before and new element
   ([acc e] (conj acc e)))
 '()
 [1 1 1 2 2 3 3 4 4 5 6 7 7])
;; => ([7 7] [6] [5] [4 4] [3 3] [2 2] [1 1 1])

A few functions naturally behave like this in Clojure, like +, *, str, and conj. Basically, the 0-ary is the initialization function, the 1-ary is the completion function (often just identity), and the 2-ary is the accumulating function.
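As a quick illustration, these are the three arities of a few core functions that already fit this shape. The var names inits, completions and steps are only there for the example.

```clojure
;; 0-ary: the value transduce starts from when no init is given
(def inits [(+) (*) (str) (conj)])
;; => [0 1 "" []]

;; 1-ary: called once on the final accumulated value
(def completions [(+ 5) (* 5) (str "a") (conj [1])])
;; => [5 5 "a" [1]]

;; 2-ary: the accumulating step
(def steps [(+ 1 2) (* 2 3) (str "a" "b") (conj [1] 2)])
;; => [3 6 "ab" [1 2]]
```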

So someone might want to use it like this:

(transduce
 (comp
  (filter odd?)
  (map inc))
 +
 0
 [1 1 1 2 2 3 3 4 4 5 6 7 7])
;; => 36

And there’s a utility function called completing that adds the 1-ary for you, and if you provide an init to transduce you can skip the 0-ary as well. For example:

(transduce
 (partition-by identity)
 (completing cons)
 0
 [1 1 1 2 2 3 3 4 4 5 6 7 7])
;; => (((((((0 1 1 1) 2 2) 3 3) 4 4) 5) 6) 7 7)

Well, I’d say maybe it’s not common enough for Clojure to offer a function specifically tailored to it. There is a JIRA ticket from 2014 that suggests adding one; you can up-vote it here: https://ask.clojure.org/index.php/1537/add-doseq-like-macro-for-transducers

If you look at the current patch though, it just hides the quirkiness of what you are doing under a macro, and not everyone seems to be sold on the syntax for it. You could similarly write your own convenience function to make the interface less quirky if you use it often. Personally, I like the comment that suggested adding an arity to run! so that it takes an xf as well.
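Such a convenience function could be as small as the sketch below. run-xf! is a hypothetical name, not something in clojure.core; it wraps the side-effecting function with completing and passes nil as the init, so the caller only supplies a one-argument function.

```clojure
(defn run-xf!
  "Hypothetical helper: pushes coll through xf and calls (f x) on every
  output, purely for side effects. Returns nil."
  [xf f coll]
  (transduce xf
             ;; completing supplies the 1-ary; the accumulator is ignored.
             (completing (fn [_ x] (f x) nil))
             nil
             coll))

;; Usage: record the outputs in an atom instead of writing to a file.
(def written (atom []))

(run-xf! (comp (filter odd?) (map inc))
         #(swap! written conj %)
         [1 2 3 4 5])
;; @written => [2 4 6]
```

With that in place, the file-writing call from earlier in the thread would presumably shrink to passing xf'duration-reducer, a function that writes one pair, and the reducible query.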

That’s also why I said it might be cleaner for now to just use a loop/recur, especially since you are writing your own transducer; if you were reusing existing ones, it might make more sense to do what you do. Now, I’m not sure whether the object returned by java.jdbc supports any kind of iteration, though. You wouldn’t want to use it as a sequence, since that adds intermediate sequence overhead. So if it only exposes itself as a reducible or a sequence, what you’re doing is the best you can do.

One last thing, you could try this:

(run!
  #(.write file (pr-str %))
  (sequence
    xf'duration-reducer
    [{:time 0 :duration 1}
     {:time 0 :duration 2}
     {:time 1 :duration 2}
     {:time 3 :duration 1}
     {:time 3 :duration 2}
     {:time 4 :duration 2}]))

There’s a chance it might be slower, but there’s also a chance it might be faster. Code looks cleaner, so I’d give it a try. In my quick experiment, it was actually a bit faster.

This is a bit confusing, but instead of writing after every element returned by the transducer, it runs the transducer on 32 elements at a time, then writes all 32 results, then moves on to the next chunk of 32, and so on. This batching can make it faster, but since it also needs to cache 32 elements between the processing and the writing, there is a bit of overhead related to the cache, which is why it could be slower as well; you need to time it. It should almost always be faster than lazy-seq, though, since lazy-seq adds a cache between every operation, while this only adds one at the end.

Time flies by fast. I just want to thank you @didibus for your detailed explanation of how transducers work! Before I asked these questions, transducers seemed very mystical to me. Now they don’t anymore.

I agree that it would make sense to add an arity to run! so that it can be used to execute a transducer. When I read your post, I realized that this was exactly the function I was searching for when I was reading about transducers.

(BTW, how do you markup your code to get syntax highlighting?)
