I have a database table with event data. Each event has a time and duration. For each discrete timepoint in the table, I want the number of events that were occurring.
For example (each map represents a row in the table):
[{:time 0 :duration 1}
 {:time 0 :duration 2}  ; time 0 => 2 events
 {:time 1 :duration 2}  ; time 1 => 2 events (second event at time 0 still occurring)
 {:time 3 :duration 1}
 {:time 3 :duration 2}  ; time 3 => 2 events
 {:time 4 :duration 2}] ; time 4 => 2 events (second event at time 3 still occurring)
I want to take this sequence of rows and turn it into a vector of key (time) / value (number of events) pairs, like this:
[[0 2] [1 2] [3 2] [4 2]]
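To pin down what "occurring" means here: the expected output above is consistent with counting an event at time t when time <= t < time + duration. A brute-force sketch of that definition follows. The names occurring-at and brute-force-counts are made up for illustration, it assumes positive durations, and it is only meant as a reference for small inputs, not for the full table.

```clojure
(defn occurring-at
  "Number of events in `events` that are running at time `t`,
  i.e. start <= t < start + duration."
  [events t]
  (count (filter (fn [{start :time dur :duration}]
                   (and (<= start t) (< t (+ start dur))))
                 events)))

(defn brute-force-counts
  "Vector of [time n-events] pairs, one per distinct :time in `events`.
  Quadratic in the number of rows, so only useful as a check on small data."
  [events]
  (mapv (fn [t] [t (occurring-at events t)])
        (sort (distinct (map :time events)))))

(def example-events
  [{:time 0 :duration 1}
   {:time 0 :duration 2}
   {:time 1 :duration 2}
   {:time 3 :duration 1}
   {:time 3 :duration 2}
   {:time 4 :duration 2}])

(brute-force-counts example-events)
;; => [[0 2] [1 2] [3 2] [4 2]]
```

Note that time 2 does not appear in the result even though one event is still running then, because only timepoints present in the table are reported.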
I want to use jdbc/reducible-query to process the table row by row, so I made a reducer:
(defn duration-reducer
  [acc row]
  (let [time (:time row)
        acc (if (empty? acc)
              {:acc []
               :time (:time row)
               :carry []}
              acc)]
    (if (= time (:time acc))
      (assoc acc :carry (conj (:carry acc) (:duration row)))
      {:acc (conj (:acc acc) [(:time acc) (count (:carry acc))])
       :carry (conj (->> (:carry acc)
                         (map #(- % (- time (:time acc))))
                         (filter pos?))
                    (:duration row))
       :time time})))
This reducer needs a final cleanup for the last :time, so invoking it looks like this:
(let [x (reduce duration-reducer {} [{:time 0 :duration 1}
                                     {:time 0 :duration 2}    ; time 0 => 2 events
                                     {:time 1 :duration 2}    ; time 1 => 2 events
                                     {:time 3 :duration 1}
                                     {:time 3 :duration 2}    ; time 3 => 2 events
                                     {:time 4 :duration 2}])] ; time 4 => 2 events
  (conj (:acc x)
        [(:time x) (count (:carry x))]))
=> [[0 2] [1 2] [3 2] [4 2]]
However:
- I have 15 million rows in the table, so I don’t want to build up a large accumulation of pairs. Rather, I want to produce the key-value pairs as soon as they are available and save them to a file.
- The last pair is not handled by the reduce but needs to be handled afterwards as seen in the last code snippet.
I think that a transducer is the answer. If I understand correctly, a transducer reducer can pass on results (i.e., a key-value pair) as they become available and also have a completion stage that can handle the last case. However, I’ve looked at the transducer code of dedupe and partition-by and I don’t understand how to apply them to my reducer code above.
So, can I rewrite my duration-reducer to be a transducer that accepts input from jdbc/reducible-query and passes key-value pairs to a function that appends them to a file as they are produced?
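For reference, here is a minimal sketch of what such a transducer might look like, following the general shape of stateful transducers such as partition-by: a volatile holds the current :time and the remaining durations, the step arity emits a pair whenever :time changes, and the completion arity flushes the last pair. The names event-counts-xf and write-pairs! are hypothetical, the sketch assumes rows arrive sorted by :time, and it has only been reasoned through against the example vector above, not run against a real jdbc/reducible-query result.

```clojure
(require '[clojure.java.io :as io])

(defn event-counts-xf
  "Hypothetical stateful transducer: rows sorted by :time go in,
  [time n-events] pairs come out as soon as :time changes."
  []
  (fn [rf]
    ;; nil until the first row, then {:time t :carry [durations still running at t]}
    (let [state (volatile! nil)]
      (fn
        ([] (rf))
        ([result]
         ;; Completion: flush the pair for the last :time, then complete downstream.
         (let [{prev-time :time carry :carry :as s} @state
               result (if s
                        (unreduced (rf result [prev-time (count carry)]))
                        result)]
           (vreset! state nil)
           (rf result)))
        ([result {t :time d :duration}]
         (let [{prev-time :time carry :carry :as s} @state]
           (cond
             ;; First row: just start tracking.
             (nil? s)
             (do (vreset! state {:time t :carry [d]}) result)

             ;; Same timepoint: one more event running.
             (= t prev-time)
             (do (vswap! state update :carry conj d) result)

             ;; New timepoint: age the carried durations, emit the previous pair.
             :else
             (let [elapsed (- t prev-time)
                   still-running (into []
                                       (comp (map #(- % elapsed)) (filter pos?))
                                       carry)]
               (vreset! state {:time t :carry (conj still-running d)})
               (rf result [prev-time (count carry)])))))))))

(defn write-pairs!
  "Hypothetical helper: runs `reducible` through `xf` and appends each output
  to the file at `path`, one per line. Returns the number of lines written."
  [path xf reducible]
  (with-open [^java.io.Writer w (io/writer path :append true)]
    (transduce xf
               (fn
                 ([] 0)
                 ([n] n)
                 ([n pair]
                  (.write w (str (pr-str pair) "\n"))
                  (inc n)))
               reducible)))

(into [] (event-counts-xf) [{:time 0 :duration 1}
                            {:time 0 :duration 2}
                            {:time 1 :duration 2}
                            {:time 3 :duration 1}
                            {:time 3 :duration 2}
                            {:time 4 :duration 2}])
;; => [[0 2] [1 2] [3 2] [4 2]]
```

Since transduce only needs something reducible, passing the result of jdbc/reducible-query as the last argument of write-pairs! should in principle stream the rows without holding the pairs in memory, but that part is an assumption rather than something verified here.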