Faster csv reading/processing, how I got there with core.async

I spent some time writing code to parse large csv files quickly.
The rules of the game are:

  • Arbitrary file sizes, meaning putting the entire csv in memory is cheating.
  • Should be at least as fast as Python with libraries; I used pandas, which took 7 lines of code and 17-19 seconds for the test file.
  • The test is to sum all the numbers in column number 15.
  • Support normal csv syntax.

If a csv file can fit in memory, I guess it is always fastest to just read the entire thing and then do processing on it. Today, that is not what I'm trying to do.

Now, I should mention that the Python library is probably using C code under some layers and has a very large base of developers and users, so you can call this cheating if you want, but I wanted my code to stand up to realistic use of other tools.

First I tried the clojure.data.csv namespace, but it was quite slow. My tests are run with an extremely large file, so speed is the name of the game. The clojure.data.csv documentation clearly states that it only focuses on correctly parsing csv, so I cannot really complain about the speed :slight_smile:

Then I went down a huge rabbit hole with a different reader. Forget it: buggy, and not especially faster.

At some point I realized that reading and parsing did not have to happen in the same thread. So I used that as an excuse to learn some more about the mysterious core.async namespace :ghost:.

What follows is my cleaned-up namespace.

  • (read-0) takes 54 seconds: ~30 MBps :-1:
  • python takes 18 seconds: ~88 MBps
  • (read-3) takes 5-6 seconds: ~290 MBps :boom:

When I look back, a factor of 10x is a pretty nice improvement, but I wish it were more. In an ideal world I would like it to be 50% of the theoretical maximum. Now it is just above 10% of the theoretical maximum.

My computer has an M.2 SSD, so reading is most of the time CPU bound; let's say a maximum read speed of 2 GBps.
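As a rough sanity check on those numbers (assuming the ~1.7 GB test file mentioned later in the thread; the exact size shifts the figures slightly):

(/ 1700 54.0)  ;; => ~31 MBps  (read-0)
(/ 1700 18.0)  ;; => ~94 MBps  (python/pandas)
(/ 1700 5.8)   ;; => ~293 MBps (read-3)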

Now, the code is not well structured; I think it could do with some refactoring. But I thought I would put it out here to see if anybody had some input on how to speed it up.

I've included all the steps, so you can see the story in code.

(ns bigcsv.parse-async
  (:require
   [clojure.pprint :refer [pprint]]
   [clojure.core.async
    :as async
    :refer [>! <!! chan close! go]]
   [clojure.string :as str]
   [clojure.java.io :as io]))

;; Big csv file found in the wild:
;; (~2GB) https://s3.amazonaws.com/carto-1000x/data/yellow_tripdata_2016-01.csv
;; task: take sum of all numbers in column 15

;; ~ 2-3 seconds, kinda slow, but not too bad. this kind of time is what I really
;; wanted for the entire processing
(defn count-lines []
  (with-open [rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
    (let [lines (line-seq rdr)]
      (count lines))))
(comment
  (time (count-lines)))

(comment
  (require '[clojure.data.csv :as csv])
  ;; simplest, easiest
  ;; 54 seconds, slow
  (defn read-0 []
    (with-open [rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
      (reduce + (->> (csv/read-csv rdr)
                     (drop 1) ; ignore headers
                     (map #(nth % 15))
                     (map #(Double/parseDouble %))))))
  (comment
    (time (read-0))))

;; ~ 16-20 secs same as python with pandas
(defn read-1 []
  (with-open [rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
    (let [lines (line-seq rdr)
          headers (str/split (first lines) #",")
          ix 15
          txer (comp
                (map #(str/split % #","))
                (map #(Double/parseDouble (nth % ix)))
                (filter (complement zero?)))]
      (println headers)
      (println ix)
      (transduce txer +' (rest lines)))))
(comment
  (time (read-1)))

;; I'm not happy with same-as-python speed.
;; Moving on: something is definitely blocking and capping our performance.
;; So we need async!

;; proof that making the processing async works, without much penalty in speed
;; ~ 17 secs but using much memory if channel capacities are huge...
(defn read-2 []
  (with-open [rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
    (let [linechan> (chan 1000)
          pump-data (fn [r]
                      (let [lines (line-seq rdr)
                            headers (first lines)]
                          ;; closes linechan> when coll is exhausted
                        (async/onto-chan!! linechan> (partition-all 1000 (rest lines))) 
                        headers))
          headers (pump-data rdr)
          ix 15
          txer (comp
                cat
                (map #(str/split % #","))
                (map #(Double/parseDouble (nth % ix)))
                (filter (complement zero?)))
          numberchan> (chan 1000 txer)]

      (async/pipe linechan> numberchan>)
      (println headers)
      (println ix)
      (<!! (async/reduce +' 0 numberchan>)))))
(comment
  (time (read-2)))

;; now that basic asynchronicity works, parallelize the action
(defn parselines-af [ix val result]
  (let [mparse (comp
                #(Double/parseDouble (nth % ix))
                #(str/split % #","))]
    (go
      (let [m (reduce + (map mparse val))]
        (>! result m)
        (close! result)))))
;; ~5-6 seconds. possibility for huge memory footprint
(defn read-3 []
  (with-open [rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
    (let [linechan> (chan 1000)
          pump-data (fn [r]
                      (let [lines (line-seq rdr)
                            headers (first lines)]
                        (async/onto-chan!! linechan> (partition-all 10000 (rest lines)))
                        headers))
          headers (pump-data rdr)
          ix 15
          numberchan> (chan 5000)]
      ;; this line parallelizes the processing of the data. Amdahl's law took over above 4-6 threads, so I set it to 8.
      ;; I just monitored the buffer sizes with the monitor function below to determine this
      (async/pipeline-async 8 numberchan> (partial parselines-af 15) linechan>) 
      (<!! (async/reduce +' 0 numberchan>)))))

(comment
  ;; nice monitor function to see which channel buffers had items in them.
  ;; used for tuning the pipeline parallelism
  (defn monitor [linechan> numberchan>]
    (future (doseq [x (range 100)]
              (Thread/sleep 50)
              (pprint {:lines
                       (.count (.buf linechan>))
                       :numbers
                       (.count (.buf numberchan>))}))))
  (time (read-3)))
5 Likes

Interesting. Since you used pandas as a baseline, I'm curious how tech.ml.dataset performs. There's a dplyr-like wrapper around it called tablecloth as well. It's using the univocity csv parser and really efficient columnar storage under the hood (with widening of types, string compression, and some other tricks). Pandas (and data.table and dplyr) were comparative performance targets during its design.
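For reference, a sketch of what the tablecloth variant might look like (assuming tablecloth.api/dataset passes its options through to tech.v3.dataset/->dataset; the column/sum calls are the same tech.v3 ones used later in this thread):

(require '[tablecloth.api :as tc]
         '[tech.v3.dataset :as ds]
         '[tech.v3.datatype.functional :as dfn])

(-> (tc/dataset "./data/yellow_tripdata_2016-01.csv" {:column-whitelist ["tip_amount"]})
    (ds/column "tip_amount")
    (dfn/sum))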

3 Likes

You should also try those two approaches to compare:

This is a good case to reach down deeper into java.util.concurrent, since core.async really solves a different problem. What you want is some kind of batch processing, which you can very easily do with a small threadpool and just some basic code.

You could do it like this, for example:

(import '[java.util.concurrent Executors TimeUnit])
(import '[java.io BufferedReader])
(require '[clojure.java.io :as io])
(require '[clojure.string :as str]) ; needed for str/split below

(defn read-pool []
  ;; we have one thread doing the file reading
  ;; + thread-count processing batches of batch-size length
  (let [thread-count 8
        batch-size 5000

        pool (Executors/newFixedThreadPool thread-count)
        result-ref (atom 0)

        process-batch
        (fn [batch-idx batch]
          (let [nums-xf
                (comp
                  (map #(str/split % #","))
                  (map #(nth % 15))
                  (map #(Double/parseDouble %)))

                batch-result
                (transduce nums-xf + 0 batch)]

            ;; batch-idx would be useful if you need ordering of some kind
            ;; in this case we don't so we just add ourselves to final result

            (swap! result-ref + batch-result)
            ))]

    (with-open [^BufferedReader rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
      ;; drop headers line
      (.readLine rdr)

      (loop [batch-idx 0
             batch (transient [])]

        ;; when batch is full submit it to pool
        (if (== batch-size (count batch))
          (do (.submit pool ^Runnable #(process-batch batch-idx (persistent! batch)))
              (recur (inc batch-idx) (transient [])))
          ;; otherwise read next line and loop
          ;; terminates when readLine return null, ie. EOF
          (if-let [next (.readLine rdr)]
            (recur batch-idx (conj! batch next))
            ;; EOF, this thread is now otherwise idle, make it process the last batch directly
            (process-batch batch-idx (persistent! batch))
            ))))

    (.shutdown pool)
    ;; wait for all work to finish, how long could be user customizable
    (.awaitTermination pool 10 TimeUnit/MINUTES)

    ;; result-ref now contains final result

    @result-ref
    ))

(comment
  (time
    (read-pool)))

You could optimize this further, but I have used this basic construct in the past and it worked well. It lets you easily control how many threads are running and so on. With batch-size you can also control how much memory this ultimately consumes; the assumption should be that you'll have (* batch-size thread-count) items in memory at any given time.
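As a rough sanity check on that footprint (assuming the ~156 bytes/line that the 1.7 GB, ~10.9M-line test file works out to):

(* 5000 8)      ;; => 40000 lines in flight at most
(* 40000 156)   ;; => ~6.2 MB of raw line data, plus String and vector overhead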

Also worth noting here is that tuning memory settings will have an effect on such a benchmark, as well as GC settings. All this should factor in when choosing batch-size + thread-count for maximum performance.
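For example, a sketch of the kind of JVM settings meant here, written as a Leiningen profile like the one shown further down the thread (the heap size and GC choice are placeholders to illustrate, not recommendations):

:profiles {:bench {:jvm-opts ["-Xms4g" "-Xmx4g"        ; fixed heap, avoids resizing mid-run
                              "-XX:+UseParallelGC"]}}   ; throughput-oriented collector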

Further tuning could also be done by tweaking the str/split+nth logic and so on.
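As a sketch of that kind of tweak (a hypothetical helper, not from the original post; it walks the line with String.indexOf instead of splitting everything, and like the rest of the code in this thread it ignores quoted commas):

(defn nth-field
  "Return field ix (0-based) of a comma-separated line without splitting
   the whole line. Naive: ignores quoted commas."
  [^String line ix]
  (loop [start 0, n ix]
    (if (zero? n)
      (let [end (.indexOf line "," (int start))]
        (subs line start (if (neg? end) (count line) end)))
      (recur (inc (.indexOf line "," (int start))) (dec n)))))

;; e.g. (Double/parseDouble (nth-field "a,b,1.5" 2)) => 1.5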

3 Likes

The results are in.
I copied/coded up some naive solutions with the suggested methods.

Rated by speed:

  1. Threadpool solution: ~4 seconds, varies with the thread count :partying_face: First place shared with the new iota solution from @joinr.
  2. tech.ml.dataset: 5.6-5.8 seconds with the new v5.13, with actual csv parsing, in very few lines. (Edit)
  3. Cookbook links: meh speed, 8 seconds after fiddling around with pmap.
  4. iota single-core action: super slow :-1: (approx. time: minutes)
  5. tech.ml.dataset: no go, tries to read the entire file, and the process uses only one core. Didn't get tech.v3.datatype.mmap to work on OpenJDK 11, and it is incompatible with Java 16 :-1: There is no documentation for how to use tech.v3.datatype.mmap with the rest of the library, so I had to guess. tech.v3.dataset looks great for in-memory stuff, though. (approx. time: infinity) (Edit: just my wrong code and lacking knowledge)

@theller Your solution is both fast and uses little memory. I like it. I started learning Clojure without knowing Java, so I'm not very familiar with the Java utilities yet; thanks for showing me this.
Minor issue: the final batch won't be computed unless it is exactly the batch size, so I changed the termination clause of the loop to include the final non-empty batch. (I also found out that empty? does not work on transients.)

(if-let [next (.readLine rdr)]
  (recur batch-idx (conj! batch next))
  (if (not= 0 (count batch))
    (.submit pool ^Runnable #(process-batch batch-idx (persistent! batch)))
    nil))
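A quick check of that transient detail (sketch):

(count (transient []))            ;; => 0, count works on transients
;; (empty? (transient []))        ;; throws: empty? calls seq, which transients don't support
(zero? (count (transient [])))    ;; => true, use this instead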

Edit: struck out wrong statements about tech.ml.dataset, updated the time entry.
Edit: new fast solution using iota from joinr.

2 Likes

Oh yes indeed. Good catch. Edit: updated example accordingly.

I didn't test without batching, since I assumed batching would be better based on previous experience, but my previous problem was different, so it would probably be worth testing.

tech.ml.dataset : no go, tries to read entire file, and the process uses only one core. didn't get tech.v3.datatype.mmap to work in openjdk11 and it is incompatible with java 16 :-1: No documentation for how to use tech.v3.datatype.mmap with the rest of the library, so I had to guess. tech.v3.dataset looks great for stuff in-memory though. (approx time: infinity)

Actually, on my end, I initially read the dataset in about 6x pandas time (it reads the whole thing and requires a decent heap, I think 4 GB, but more memory makes the JVM happier).

After consulting with @Chris_Nuernberger on Zulip, he identified some additional paths in the library that get it down to ~2x of pandas (2.68x on my machine, about 65s), again just naively reading the whole thing into memory. There are some additional optimizations to be had, but at the moment the bulk of the time is spent parsing doubles, which I suspect can be further mitigated by effectively using the same parser pandas is using, since we're using the general (and accurate, but likely slower) java.lang.Double in t.m.d. In the process I learned that pandas apparently defaults to less precise double parsing but provides options for higher precision. Curious to see how using xstrtod instead of java.lang.Double/parseDouble compares in performance.

Chris has additional tests answering the actual problem of just summing a column. I think the results will surprise :wink:

1 Like
;;prefix - (require '[tech.v3.datatype.functional :as dfn])
;;prefix - (require '[tech.v3.dataset :as ds])
user> (def ignored (time (-> (ds/->dataset "/home/chrisn/Downloads/yellow_tripdata_2016-01.csv" {:column-whitelist ["tip_amount"]})
                             (ds/column "tip_amount") 
                             (dfn/sum))))
"Elapsed time: 7904.559816 msecs"
#'user/ignored

Let's try saving the file to parquet:

user> (require '[tech.v3.libs.parquet :as parquet])
nil
user>     (require '[tech.v3.dataset.utils :as ds-utils])
nil
user>     (ds-utils/set-slf4j-log-level :info)
:info

user> (parquet/ds->parquet full-ds "/home/chrisn/Downloads/yellow_tripdata_2016-01.parquet")
10:28:31.198 [nREPL-session-9a3d5486-77de-4893-b45c-5fb752dfb839] INFO org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.snappy]
:ok
user>  ;;file is 241MB
user> (def ignored (time (-> (ds/->dataset "/home/chrisn/Downloads/yellow_tripdata_2016-01.parquet" {:column-whitelist ["tip_amount"]})
                             (ds/column "tip_amount") 
                             (dfn/sum))))
10:29:49.257 [nREPL-session-9a3d5486-77de-4893-b45c-5fb752dfb839] INFO org.apache.hadoop.io.compress.CodecPool - Got brand-new decompressor [.snappy]
"Elapsed time: 1761.358758 msecs"
#'user/ignored

And finally, let's try Arrow:

;;prefix - (require '[tech.v3.libs.arrow :as arrow]) ; assumed; the require for the arrow alias isn't shown in the original transcript

user> (arrow/write-dataset-to-stream! full-ds "/home/chrisn/Downloads/yellow_tripdata_2016-01.arrow" {:strings-as-text? true})
10:31:02.376 [nREPL-session-9a3d5486-77de-4893-b45c-5fb752dfb839] INFO org.apache.arrow.memory.BaseAllocator - Debug mode disabled.
10:31:02.378 [nREPL-session-9a3d5486-77de-4893-b45c-5fb752dfb839] INFO org.apache.arrow.memory.DefaultAllocationManagerOption - allocation manager type not specified, using netty as the default type
10:31:02.380 [nREPL-session-9a3d5486-77de-4893-b45c-5fb752dfb839] INFO org.apache.arrow.memory.CheckAllocator - Using DefaultAllocationManager at memory-unsafe/2.0.0/arrow-memory-unsafe-2.0.0.jar!/org/apache/arrow/memory/DefaultAllocationManagerFactory.class
nil
user> ;;strings-as-text? avoids the use of string-tables which have to be loaded with the in-place pathway
user> (def ignored (time (-> (arrow/read-stream-dataset-inplace "/home/chrisn/Downloads/yellow_tripdata_2016-01.arrow")
                             (ds/column "tip_amount") 
                             (dfn/sum))))
10:33:04.135 [tech.resource.gc ref thread] INFO tech.v3.resource.gc - Reference thread starting
"Elapsed time: 123.862865 msecs"
#'user/ignored

The original code could be quietly incorrect in at least two ways. First, commas can be escaped, so the relationship between columns and commas isn't 1:1. Second, the summation can grow to the point where new data fails to add to the double; dataset uses Kahan's compensated algorithm to avoid this.
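For illustration, a minimal sketch of Kahan (compensated) summation over doubles (not tmd's actual implementation, just the idea):

(defn kahan-sum ^double [xs]
  (loop [xs (seq xs), sum 0.0, c 0.0]
    (if xs
      (let [y (- (double (first xs)) c)   ; subtract the running error
            t (+ sum y)]
        ;; (- t sum) recovers the part of y that actually made it into the sum;
        ;; subtracting y leaves the low-order bits that were lost.
        (recur (next xs) t (- (- t sum) y)))
      sum)))

;; plain + silently drops the small addends here:
;; (reduce + (cons 1.0e17 (repeat 10000 1.0)))  ;; => 1.0E17
;; (kahan-sum (cons 1.0e17 (repeat 10000 1.0))) ;; => 1.0000000000001E17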

2 Likes

You can specify column whitelists/blacklists in the parse pathway to cut down on the columns you read, and this makes the largest difference in the CSV pathway.

I guess there is an easy third way the original code could be incorrect; the columns can change order.

Regardless, this inspired some solid optimizations in tmd, so the latest (5.13) has a decent perf gain due just to this post; I have to thank everyone for that.

Finally, the tech.v3.datatype.mmap pathway does work on JDK 16, but you have to enable the foreign memory module. I usually enable the foreign module because I am always using the FFI pathway:

            :jdk-16 {:jvm-opts ["--add-modules" "jdk.incubator.foreign" "-Dforeign.restricted=permit"]}
2 Likes

I'm aware of the loss of precision. I was thinking of using the accurate Clojure numeric type to get perfect accuracy for the numbers.

I did not think of people having quoted commas; I was really surprised, and horrified, by the idea that a comma in a csv file can be valid csv data. If I got that kind of csv data from someone, I would throw a slipper at them :wink:

I believe it is prudent to show the pandas code I used:

import pandas as pd
import numpy as np

def read_sum():
    chunksize = 100 ** 6
    filename = r"yellow_tripdata_2016-01.csv"
    sum = 0
    with pd.read_csv(filename, chunksize=chunksize) as reader: # optional parameter , engine='c' , no difference
        for chunk in reader:
            sum += np.sum(chunk['tip_amount'])
    return sum

I also guess I have to eat my words. I used the wrong functions for parsing the data with the tmd library. I'm editing the post above!

I'm really impressed by the speed of the arrow datasets. How would one convert without having the full dataset in memory? (I'm assuming from your use of the full-ds var that the full dataset is in memory somewhere.)

(ns bigcsv.parse-tech-ml
  (:require
   [tech.v3.dataset :as ds]
   [tech.v3.datatype.functional :as dfn]))

(comment
;; ~8.6 seconds version 5.12 / 5.6-5.8 seconds version 5.13, low memory footprint
  (def result (time (-> (ds/->dataset "./data/yellow_tripdata_2016-01.csv" {:column-whitelist ["tip_amount"]})
                         (ds/column "tip_amount")
                         (dfn/sum))))
  )

Thanks a lot for reading what I wrote with such an open mind :-).

I was very surprised to see that pandas doesn't default to high-precision reading of doubles. Ouch. There are some sad scientists out there way over their heads at this moment due to issues like this.

Personally, I was impressed by the parquet file size. The original CSV was 1.7GB and the parquet file was 241MB, which means that if your job is to download data from s3 and manipulate it, parquet gets you a decent win out of the gate.

One arrow file can save multiple in-memory-sized datasets in what are called records. Or you can save multiple arrow files and in-place load all of them; the OS will allow you to mmap far more than process memory, and it's the OS's problem to make it all work. In that case the data is loaded on demand from disk, which is ideal, as you don't have to specify a whitelist or anything; it just works.

We have tools for splitting up CSV rows before parsing them, so you can easily have one or more extremely large csvs and save them all into one or more arrow files.

Then we have a namespace specially designed for these sequence-of-datasets-style large datasets, with Apache DataSketches bindings.

One great point you raised was whether Clojure's default numeric tower will correctly handle streams of doubles by upcasting to a bignum if you are trying to add a really large number to a really small number. I honestly assumed it would fail, but I don't actually know. I do know that the implementation of Kahan's compensation will be much faster than upcasting to a bignum and then working in that space. tech.ml.dataset will use Clojure's numeric tower for most operations (not sum) if your column datatype is :object, so you can have a column of bignums, but the rest of the system, such as io, isn't built for bignums.

Hum, kind of surprised by this; it is supposed to be parallel. Are you sure you used it correctly?

I've never tried it, but the link mentions it being parallel. My understanding is that it would read the file in parallel, which may actually slow down IO to be fair :man_shrugging:, and then use Clojure reducers to do a parallel map/reduce over it, which uses Java's fork/join pool under the hood.

Maybe I did something wrong; I can't deny that.

The documentation for iota says that it's optimized for use with the reducers namespace, so my attempt with iota looks basically the same as the example flow on the iota GitHub page:

(require '[clojure.core.reducers :as r])
(require '[iota])
(require '[clojure.string :as str])
;; slow, single core
(defn process-iota3 [filename]
  (->> (iota/seq filename)
       (drop 1)
       (r/map #(str/split % #","))
       (r/map #(nth % 15))
       (r/map #(Double/parseDouble %))
       (r/fold +'))) ;; I expected this to enable parallel processing....

I think (drop 1) returns a sequence again; try using r/drop instead, maybe?

with r/drop: 10 minutes

Wowza!

That's really strange; the doc really implies it should batch 512 at a time and parallelize the reduction of the batches. Very strange :man_shrugging:

@didibus

Just counting lines of the file with iota and fold:

(time (->> (iota/seq path) (r/map (fn [_] 1)) (r/fold +)))
"Elapsed time: 7435.786999 msecs"
10906859

When I add in r/drop, it bogs way down (I stopped timing after a minute). I'm pretty sure the result from r/drop, and its interaction with the iota seq, is being turned into something that's not partitioned, so fold is falling back to reduce. The answer - which is fairly poorly documented but can be derived from the source - is that r/drop does not return a reified object that implements CollFold; per the source it uses the helper function reducer, as opposed to the helper folder used by other functions (like r/map), which provides both a coll-reduce and a coll-fold implementation. So using drop in the pipeline eliminates parallelization and falls back to reduce.
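That diagnosis is easy to confirm at the REPL: r/map returns something that satisfies the CollFold protocol, while r/drop does not (a quick sketch):

(require '[clojure.core.reducers :as r])

(satisfies? r/CollFold (r/map inc [1 2 3]))  ;; => true,  fold can run in parallel
(satisfies? r/CollFold (r/drop 1 [1 2 3]))   ;; => false, fold falls back to plain reduce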

We can retain a foldable thing that accomplishes drop at a cost in performance by using iota/vec and subvec:

dftest.core> (time (->>  (-> (iota/vec path)  (iota/subvec 1)) (r/map (fn [_] 1)) (r/fold +)))
"Elapsed time: 8233.9381 msecs"
10906858
(defn process-iota4 [filename]
  (->> (-> (iota/vec filename)
           (iota/subvec 1))
       (r/map #(clojure.string/split % #","))
       (r/map #(nth % 15))
       (r/map #(Double/parseDouble %))
       (r/fold +')))

(time (process-iota4 path))
"Elapsed time: 14113.735599 msecs"

If we mess with the parsing we can get the equivalent of drop for "just" this task. We can also use much faster split operations than clojure.string/split (we still end up splitting the whole line though), and try to leverage primitive math to help.

(require '[spork.util.string :as s])
(set! *unchecked-math* :warn-on-boxed)
(defn process-iota6 [filename & {:keys [n] :or {n 512}}]
  (let [splitter (s/->array-splitter ",")
        maybe-double (fn ^double [^String x]
                       (try (Double/parseDouble x)
                            (catch Exception e 0.0)))
        dsum (fn (^double [] 0.0)
                 (^double [^double x ^double y] (+ x y)))]
    (->> (iota/seq filename)
         (r/map #(maybe-double (aget ^objects (splitter %) 15)))
         (r/fold n dsum dsum ))))

(time (process-iota6 path))
"Elapsed time: 6036.917 msecs"
1.9094234009999998E7

For reference, it takes about 4.9 seconds just to traverse the line-seq on my machine.
read-pool is about 6.8s; read-3 is about 8.3s. I had a core.async variant that came in around 7.9s, using an unordered producer/consumer queue.

I modified the code a little, just to have fewer differences from the previous benchmarks, possibly reducing performance a bit though...
For comparison: I got 4.6-4.8 seconds on this:

(defn process-iota7 [filename & {:keys [n] :or {n 512}}]
  (let [splitter #(str/split % #",")
        maybe-double (fn ^double [^String x]
                       (try (Double/parseDouble x)
                            (catch Exception e 0.0)))]
    (->> (iota/seq filename)
         (r/map #(maybe-double (get (splitter %) 15)))
         (r/fold n +' +'))))

I don't know if this is obvious, but I wanted to mention it, especially if anyone is late to the party.

Using custom code turns out to be slightly faster than using an established library (tech.ml.dataset). But with the relatively small performance gain come significant risks of doing the parsing wrongly (what if a new dataset had differently ordered columns?), and the manipulation of the data can be more inaccurate than it needs to be (just +' vs the compensated algorithm mentioned by @Chris_Nuernberger).

What I was looking for already existed, I just did not know about it.

In number of lines, the library version wins; in simplicity, the library version wins; and in readability and maintainability, the library version wins.

The custom code we've made is like an F1 car: speedy, but fragile. As it turns out, the library is capable of doing the 24h of Le Mans, going just a tiny bit slower than the custom (fragile) creations we've made. In the 'drag race' I set up, the F1 was always going to win.

So I encourage everyone to use the tech.ml.dataset library. And I'm looking forward to seeing it (and tablecloth) pop up more often on ClojureVerse.

2 Likes

What I was looking for already existed, I just did not know about it.

That's part of the reason your question interested me. This exercise has come up before (hence the various answers with core.async, iota, thread executors, etc.). What hasn't really existed until recently - within about the last 2 years, and really in a mature form in the last 1.5 - is a solid library that works out of the box on this kind of problem (and others), like other ecosystems have.

I think getting the word out w.r.t. the stuff that SciCloj is pushing out (with a lot of involvement from Chris's tech stack, of which tech.ml.dataset is central) is important, so the perception emerges that "there is" an efficient off-the-shelf solution (at least comparable to other ecosystems like pandas) for much of this stuff (where for many years there wasn't, or the solutions offered didn't scale on stock desktop/laptop hardware).

I hope that these threads around processing tasks establish a foothold in the public perception about some of the newer libraries and techniques, and also provide impetus for additional refinement in the libraries themselves (be it documentation, PR, performance, or more users). Very useful discussion.

1 Like