How to use transducers to get distinct, sorted values on multiple keys/columns

How can I write a transducer that generates multiple streams of data? I’m looking to get distinct sorted values on both “name” and “supply”:

[{:name "pete" :supply 1}
 {:name "pete" :supply 2}
 {:name "john" :supply 2}
 {:name "sara" :supply 1}
 {:name "jane" :supply 2}
 {:name "sara" :supply 2}]

I want to turn that into:

[["jane" "john" "pete" "sara"] [1 2]]

Use the xforms library (required here as x):

(require '[net.cgrand.xforms :as x])

(def data [{:name "pete" :supply 1}
           {:name "pete" :supply 2}
           {:name "john" :supply 2}
           {:name "sara" :supply 1}
           {:name "jane" :supply 2}
           {:name "sara" :supply 2}])

(x/transjuxt [(comp (map :name) (distinct) (x/sort) (x/into []))
              (comp (map :supply) (distinct) (x/sort) (x/into []))]
             data)

;=> [["jane" "john" "pete" "sara"] [1 2]]
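For readers who want to see the idea without the library, here is a minimal sketch in plain clojure.core of feeding one pass over the data into several reducing functions at once. This is not how xforms implements transjuxt; juxt-rf and distinct-sorted-rf are hypothetical helper names invented for illustration, and the sketch ignores early termination via reduced.

```clojure
(def data [{:name "pete" :supply 1}
           {:name "pete" :supply 2}
           {:name "john" :supply 2}
           {:name "sara" :supply 1}
           {:name "jane" :supply 2}
           {:name "sara" :supply 2}])

;; Hypothetical helper: combine several reducing functions so that each
;; one sees every input item during a single pass over the collection.
(defn juxt-rf [& rfs]
  (fn
    ([] (mapv #(%) rfs))                        ; init: one accumulator per rf
    ([accs] (mapv #(%1 %2) rfs accs))           ; completion: finish each rf
    ([accs x] (mapv #(%1 %2 x) rfs accs))))     ; step: feed x to every rf

;; Hypothetical helper: collect the distinct values of key k in sorted
;; order, returning a vector on completion.
(defn distinct-sorted-rf [k]
  (fn
    ([] (sorted-set))
    ([acc] (vec acc))
    ([acc item] (conj acc (k item)))))

(def result
  (transduce identity
             (juxt-rf (distinct-sorted-rf :name)
                      (distinct-sorted-rf :supply))
             data))
;; result => [["jane" "john" "pete" "sara"] [1 2]]
```

The sorted-set accumulator keeps the sketch short; it is not necessarily the fastest choice for large inputs.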

Lovely question!

One thought:

user> (def data [{:name "pete" :supply 1}
                 {:name "pete" :supply 2}
                 {:name "john" :supply 2}
                 {:name "sara" :supply 1}
                 {:name "jane" :supply 2}
                 {:name "sara" :supply 2}])
#'user/data
user> (-> (reduce (fn [eax item] 
                    (reduce-kv #(update %1 %2 conj %3) eax item))
                  {} data)
          (update-vals (comp vec sort set)))
{:name ["jane" "john" "pete" "sara"], :supply [1 2]}
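To make the final (comp vec sort set) step less mysterious, here is a small sketch of what the nested reduce accumulates before that step. The names sample and collected are made up for illustration. Note that update-vals requires Clojure 1.11 or later.

```clojure
(def sample [{:name "pete" :supply 1}
             {:name "pete" :supply 2}])

;; conj onto a missing key starts from nil, so each value becomes a list
;; that still contains duplicates and is in reverse insertion order.
(def collected
  (reduce (fn [acc item]
            (reduce-kv #(update %1 %2 conj %3) acc item))
          {} sample))
;; collected => {:name ("pete" "pete"), :supply (2 1)}

;; set removes duplicates, sort orders them, vec gives the final shape.
(def cleaned (update-vals collected (comp vec sort set)))
;; cleaned => {:name ["pete"], :supply [1 2]}
```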

But the thing is, you’re really thinking in columns…

user> (require '[tech.v3.dataset :as ds])
nil
user> (def ds (ds/->dataset data))
#'user/ds
user> ds
_unnamed [6 2]:

| :name | :supply |
|-------|--------:|
|  pete |       1 |
|  pete |       2 |
|  john |       2 |
|  sara |       1 |
|  jane |       2 |
|  sara |       2 |
user> (-> ds :name set sort vec)
["jane" "john" "pete" "sara"]
user> (-> ds :supply set sort vec)
[1 2]

hth

Thanks for the alternative solutions. I like that you noticed I was thinking in “columns”.


Just for completeness’ sake, here’s a quick comparison between the approaches:

(ns tmp.bench
  (:require [taoensso.tufte :as tufte]
            [net.cgrand.xforms :as xforms]
            [tech.v3.dataset :as dataset]))

(tufte/add-basic-println-handler! {})

(def data (into [] (repeatedly 1000000 (fn[] {:name (str "some-name-" (rand-int 10000) :supply (rand-int 50000))}))))

(tufte/defnp via-xforms []
  (xforms/transjuxt [(comp (map :name) (distinct) (xforms/sort) (xforms/into []))
                     (comp (map :supply) (distinct) (xforms/sort) (xforms/into []))]
                    data))

(tufte/defnp via-reduce-kv []
  (-> (reduce (fn [eax item]
                (reduce-kv #(update %1 %2 conj %3) eax item))
              {} data)
      (update-vals (comp vec sort set))))

(tufte/defnp via-dataset []
  (let [ds  (dataset/->dataset data)]
    [(-> ds :name set sort vec)
     (-> ds :supply set sort vec)]))

(tufte/profile {}
               (dotimes [_ 5]
                 (via-xforms)))
;;pId                            nCalls        Min      50% ≤      90% ≤      95% ≤      99% ≤        Max       Mean   MAD      Clock  Total
;;
;;:tmp.bench/defn_via-xforms          5      2.12s      2.18s      2.31s      2.31s      2.31s      2.31s      2.20s   ±2%     10.98s   100%
;;
;;Accounted                                                                                                                    10.98s   100%
;;Clock                                                                                                                        10.98s   100%

(tufte/profile {}
               (dotimes [_ 5]
                 (via-reduce-kv)))

;;pId                               nCalls        Min      50% ≤      90% ≤      95% ≤      99% ≤        Max       Mean   MAD      Clock  Total
;;
;;:tmp.bench/defn_via-reduce-kv          5      2.27s      2.33s      2.58s      2.58s      2.58s      2.58s      2.37s   ±4%     11.89s   100%
;;
;;Accounted                                                                                                                       11.89s   100%
;;Clock                                                                                                                           11.89s   100%


(tufte/profile {}
               (dotimes [_ 5]
                 (via-dataset)))

;;pId                             nCalls        Min      50% ≤      90% ≤      95% ≤      99% ≤        Max       Mean   MAD      Clock  Total
;;
;;:tmp.bench/defn_via-dataset          5      2.73s      2.81s      2.98s      2.98s      2.98s      2.98s      2.83s   ±3%     14.21s   100%
;;
;;Accounted                                                                                                                     14.21s   100%
;;Clock                                                                                                                         14.21s   100%


That’s an interesting, if slightly synthetic, benchmark. :+1:

If you were in fact going to do something like that 5 times, you would build the dataset once up front, and then operate on it to your heart’s content.

It appears there’s some time to be saved there (~20%), though, maybe not as much as I might have guessed:

user> (def data (into [] (repeatedly 1000000 (fn[] {:name (str "some-name-" (rand-int 10000) :supply (rand-int 50000))}))))
#'user/data
user> (require '[tech.v3.dataset :as ds])
nil
user> (time (def ds (ds/->dataset data)))
"Elapsed time: 379.836637 msecs"
#'user/ds
user> (time (dotimes [_ 5]
              [(-> ds :name set sort vec)
               (-> ds :supply set sort vec)]))
"Elapsed time: 5172.057698 msecs"
nil
user> (time (dotimes [_ 5]
              (-> (reduce (fn [eax item]
                            (reduce-kv #(update %1 %2 conj %3) eax item))
                          {} data)
                  (update-vals (comp vec sort set)))))
"Elapsed time: 6505.626537 msecs"
nil

The 5 runs are there so that the profiler provides meaningful results; I agree that in practice you would not recreate the dataset. The point was to compare the performance of the different approaches.


There is a typo in the code you shared…

On the (def data ... line, the call to str includes the second keyword and so only makes one column (!) — oops!
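A tiny sketch of the effect, using fixed numbers in place of rand-int so the result is predictable (broken and fixed are names made up for illustration):

```clojure
;; Misplaced paren: str swallows the keyword and the second value,
;; so the map ends up with a single :name key.
(def broken {:name (str "some-name-" 42 :supply 7)})
;; broken => {:name "some-name-42:supply7"}

;; Closing str before :supply gives the intended two-key map.
(def fixed {:name (str "some-name-" 42) :supply 7})
;; fixed => {:name "some-name-42", :supply 7}
```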

With two columns, TMD gets happier (also note the idea I had this morning, which was to use sorted-set, but that looks to be algorithmically suboptimal) :smile:

user> (def data (into [] (repeatedly 1000000 (fn[] {:name (str "some-name-" (rand-int 10000))
                                                    :supply (rand-int 50000)}))))
#'user/data
user> (time (dotimes [_ 5]
              (let [f #(conj (or %1 (sorted-set)) %2)]
                (reduce (fn [eax item]
                          (reduce-kv #(update %1 %2 f %3) eax item))
                        {} data))))
"Elapsed time: 4220.972512 msecs"
nil
user> (time (dotimes [_ 5]
              (-> (reduce (fn [eax item]
                            (reduce-kv #(update %1 %2 conj %3) eax item))
                          {} data)
                  (update-vals (comp vec sort set)))))
"Elapsed time: 2421.715578 msecs"
nil
user> (require '[tech.v3.dataset :as ds])
nil
user> (time (def ds (ds/->dataset data)))
"Elapsed time: 290.992562 msecs"
#'user/ds
user> ds
_unnamed [1000000 2]:

|          :name | :supply |
|----------------|--------:|
| some-name-5996 |    8668 |
|  some-name-746 |   39337 |
| some-name-4138 |   48823 |
| some-name-1710 |   20430 |
| some-name-3404 |   12937 |
| some-name-8858 |    7315 |
|  some-name-951 |   44280 |
|  some-name-369 |   38511 |
| some-name-7493 |   39483 |
| some-name-3068 |   44927 |
|            ... |     ... |
| some-name-2288 |     576 |
| some-name-3082 |   39212 |
| some-name-5731 |   41733 |
| some-name-7685 |   35613 |
| some-name-3814 |   31071 |
| some-name-5442 |   10386 |
| some-name-3213 |   28816 |
| some-name-9913 |   11027 |
| some-name-1423 |   24207 |
| some-name-1585 |   30184 |
| some-name-1956 |   48937 |
user> (time (dotimes [_ 5]
              [(-> ds :name set sort vec)
               (-> ds :supply set sort vec)]))
"Elapsed time: 822.110927 msecs"
nil

And to get even further :evergreen_tree: lost in the woods, DuckDB’s default BRIN indexes are a sharp tool in this area:

user> (require '[tmducken.duckdb :as duckdb])
nil
user> (require '[tech.v3.dataset :as ds])
nil
user> (def data (into [] (repeatedly 1000000 (fn[] {:name (str "some-name-" (rand-int 10000))
                                                    :supply (rand-int 50000)}))))
#'user/data
user> (def ds (ds/->dataset data))
#'user/ds
user> ds
_unnamed [1000000 2]:

|          :name | :supply |
|----------------|--------:|
| some-name-9294 |   20046 |
|  some-name-233 |   11033 |
| some-name-6980 |   41089 |
| some-name-9668 |   40815 |
| some-name-4544 |   13680 |
| some-name-5885 |     376 |
| some-name-3102 |   18762 |
| some-name-5828 |    3955 |
| some-name-5432 |   29035 |
| some-name-1999 |   49322 |
|            ... |     ... |
| some-name-2677 |   44303 |
| some-name-9606 |   17934 |
| some-name-4066 |     708 |
| some-name-2696 |   48942 |
|  some-name-617 |   10899 |
| some-name-1706 |   42183 |
| some-name-4871 |   30347 |
| some-name-4758 |   42849 |
| some-name-9453 |   43553 |
| some-name-9466 |   46838 |
| some-name-7953 |   15217 |
user> (def db* (delay (duckdb/initialize!)
                      (duckdb/open-db)))
#'user/db*
user> (def conn* (delay (duckdb/connect @db*)))
#'user/conn*
user> (duckdb/create-table! @conn* (ds/->dataset data))
Aug 23, 2023 9:42:54 AM clojure.tools.logging$eval7479$fn__7482 invoke
INFO: Attempting to load duckdb from "./binaries/libduckdb.so"
"_unnamed"
user> (time (duckdb/insert-dataset! @conn* ds))
"Elapsed time: 3284.406667 msecs"
nil
user> (duckdb/sql->dataset @conn* "select * from _unnamed")
Aug 23, 2023 9:43:32 AM clojure.tools.logging$eval7479$fn__7482 invoke
INFO: Reference thread starting
_unnamed [1000000 2]:

|           name | supply |
|----------------|-------:|
| some-name-9294 |  20046 |
|  some-name-233 |  11033 |
| some-name-6980 |  41089 |
| some-name-9668 |  40815 |
| some-name-4544 |  13680 |
| some-name-5885 |    376 |
| some-name-3102 |  18762 |
| some-name-5828 |   3955 |
| some-name-5432 |  29035 |
| some-name-1999 |  49322 |
|            ... |    ... |
| some-name-2677 |  44303 |
| some-name-9606 |  17934 |
| some-name-4066 |    708 |
| some-name-2696 |  48942 |
|  some-name-617 |  10899 |
| some-name-1706 |  42183 |
| some-name-4871 |  30347 |
| some-name-4758 |  42849 |
| some-name-9453 |  43553 |
| some-name-9466 |  46838 |
| some-name-7953 |  15217 |
user> (duckdb/sql->dataset @conn* "select distinct name from _unnamed order by name")
_unnamed [10000 1]:

|           name |
|----------------|
|    some-name-0 |
|    some-name-1 |
|   some-name-10 |
|  some-name-100 |
| some-name-1000 |
| some-name-1001 |
| some-name-1002 |
| some-name-1003 |
| some-name-1004 |
| some-name-1005 |
|            ... |
|  some-name-999 |
| some-name-9990 |
| some-name-9991 |
| some-name-9992 |
| some-name-9993 |
| some-name-9994 |
| some-name-9995 |
| some-name-9996 |
| some-name-9997 |
| some-name-9998 |
| some-name-9999 |
user> (duckdb/sql->dataset @conn* "select distinct supply from _unnamed order by supply")
_unnamed [50000 1]:

| supply |
|-------:|
|      0 |
|      1 |
|      2 |
|      3 |
|      4 |
|      5 |
|      6 |
|      7 |
|      8 |
|      9 |
|    ... |
|  49989 |
|  49990 |
|  49991 |
|  49992 |
|  49993 |
|  49994 |
|  49995 |
|  49996 |
|  49997 |
|  49998 |
|  49999 |
user> (time (dotimes [_ 5]
              [(duckdb/sql->dataset @conn* "select distinct name from _unnamed order by name")
               (duckdb/sql->dataset @conn* "select distinct supply from _unnamed order by supply")]))
"Elapsed time: 128.604367 msecs"
nil

So, that’s another 10x if anyone needs it.

Plus, once we’re this deep we now have a complete SQL engine, so if the real work involves joins, or filters on out-of-core sets, or other things people like to pay good money for, then we’re cooking. And with TMD outputs, we still get to write our downstream analysis in Clojure, which we believe to be objectively better than other non-functional routes people take.

:grin:

Good catch!

With the typo fixed (and on another laptop…), the values are

pId                            nCalls        Min      50% ≤      90% ≤      95% ≤      99% ≤        Max       Mean   MAD      Clock  Total

:tmp.bench/defn_via-xforms          5      604ms      648ms      762ms      762ms      762ms      762ms      658ms   ±7%      3.29s    99%

Accounted                                                                                                                     3.29s    99%
Clock                                                                                                                         3.31s   100%


pId                               nCalls        Min      50% ≤      90% ≤      95% ≤      99% ≤        Max       Mean   MAD      Clock  Total

:tmp.bench/defn_via-reduce-kv          5      734ms      825ms      903ms      903ms      903ms      903ms      831ms   ±6%      4.15s   100%

Accounted                                                                                                                        4.15s   100%
Clock                                                                                                                            4.15s   100%


pId                             nCalls        Min      50% ≤      90% ≤      95% ≤      99% ≤        Max       Mean   MAD      Clock  Total

:tmp.bench/defn_via-dataset          5      866ms      905ms      1.01s      1.01s      1.01s      1.01s      931ms   ±6%      4.66s   100%

Accounted                                                                                                                      4.66s   100%
Clock                                                                                                                          4.66s   100%


The duckdb stuff is pretty cool!
I have never used it before, but in this context it almost feels like cheating :smiley:


“You ain’t cheatin’, you ain’t tryin’” ha! /s

Of course, don’t always need it, OP was trying to transduce a dataset with n=6, so we landed quite far from that. But it’s good to know it’s there when we do. DuckDB is powerful stuff, and every time I bump into these BRIN indexes it feels like alien future tech.

Appreciate the impetus to poke around with this stuff, it’s where we’re living right now and really enjoying it, always love the opportunity to share that joy w/ others.

Of course, don’t always need it, OP was trying to transduce a dataset with n=6, so we landed quite far from that.

Indeed… I often find myself “migrating” from simple threading macros (which I typically build step-wise, checking intermediate results in the REPL) to transducers, if performance or input size warrant it. I suspect the OP’s needs did go beyond the 6 elements, in fairness :smiley:


johnmn3/injest on GitHub (+>, +>>, x>>, =>>: auto-transducifying, auto-parallelizing path thread macros) blurs the line; it might make migration easier.


If we do not recreate the dataset every time, then on my machine Harold’s solution beats the xforms version by roughly a factor of two: 822ms for the xforms version versus 458ms for Harold’s initial attempt with datasets.

There are other options :slight_smile: - ham-fisted.

(ns distinct-sort
  (:require [taoensso.tufte :as tufte]
            [net.cgrand.xforms :as xforms]
            [tech.v3.dataset :as dataset]
            [ham-fisted.lazy-noncaching :as lznc]
            [ham-fisted.api :as hamf]))

(tufte/add-basic-println-handler! {})

(def data (into [] (repeatedly 1000000 (fn[] {:name (str "some-name-" (rand-int 10000)) :supply (rand-int 50000)}))))
(def ds (dataset/->dataset data))

(tufte/defnp via-xforms []
  (xforms/transjuxt [(comp (map :name) (distinct) (xforms/sort) (xforms/into []))
                     (comp (map :supply) (distinct) (xforms/sort) (xforms/into []))]
                    data))

(tufte/defnp via-reduce-kv []
  (-> (reduce (fn [eax item]
                (reduce-kv #(update %1 %2 conj %3) eax item))
              {} data)
      (update-vals (comp vec sort set))))


(tufte/defnp via-dataset []
  [(-> ds :name set sort vec)
   (-> ds :supply set sort vec)])


(tufte/defnp via-hamf []
  [(->> data (lznc/map :name) hamf/java-hashset hamf/sort)
   (->> data (lznc/map :supply) hamf/java-hashset hamf/sort)])


(tufte/defnp via-hamf-ds []
  [(->> ds :name hamf/java-hashset hamf/sort)
   (->> ds :supply hamf/java-hashset hamf/sort)])



(comment
  (tufte/profile {}
                 (dotimes [_ 5]
                   (via-xforms)))

  (tufte/profile {}
                 (dotimes [_ 5]
                   (via-reduce-kv)))

;;2x xforms
  (tufte/profile {}
                 (dotimes [_ 5]
                   (via-dataset)))

 ;;also 2x xforms
  (tufte/profile {}
                 (dotimes [_ 5]
                   (via-hamf)))

 ;;fastest so far, ~3.5x xforms by my calculation
  (tufte/profile {}
                 (dotimes [_ 5]
                   (via-hamf-ds)))
  )

My point being simply that hamf has a fast sort that returns something you don’t need to call ‘vec’ on afterwards.

And a more abstract point is that the dataset columns are faster to reduce over regardless of your reduction system than is a vector of maps – if you want a column and not a map.
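As a rough illustration of the row-versus-column distinction in plain Clojure, the sketch below pivots a vector of maps into a map of column vectors once, and then works on a single column directly. Plain vectors only stand in for dataset columns here; this is an assumption for illustration and says nothing about how tech.v3.dataset stores its data. rows->columns is a hypothetical helper name.

```clojure
(def data [{:name "pete" :supply 1}
           {:name "pete" :supply 2}
           {:name "john" :supply 2}
           {:name "sara" :supply 1}
           {:name "jane" :supply 2}
           {:name "sara" :supply 2}])

;; Hypothetical helper: pivot rows (maps) into one vector per key.
(defn rows->columns [rows]
  (reduce (fn [cols row]
            (reduce-kv (fn [m k v] (update m k (fnil conj []) v))
                       cols row))
          {} rows))

(def columns (rows->columns data))
;; columns => {:name ["pete" "pete" "john" "sara" "jane" "sara"],
;;             :supply [1 2 2 1 2 2]}

;; Once pivoted, each column can be processed without touching the maps.
(def distinct-names (-> columns :name set sort vec))
;; distinct-names => ["jane" "john" "pete" "sara"]
```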

ham-fisted comes with dataset so if you are using tech.v3.dataset v7+ you also get ham-fisted.


Isn’t criterium considered the go-to for Clojure benchmarking? I thought tufte was more for profiling. (Sorry to go a bit off course it’s just been distracting me from reading about the core discussion :sweat_smile:)
