I spent some time writing code to parse large CSV files quickly.
The rules of the game are:
- Arbitrary file sizes, meaning loading the entire CSV into memory is cheating.
- It should be at least as fast as Python with libraries. I used pandas and got 7 lines of code, timed at 17-19 seconds on the test file.
- The test is to sum all the numbers in column 15.
- Support normal CSV syntax.
If a CSV file fits in memory, I guess it is always fastest to just read the entire thing and then process it. Today, that is not what I'm trying to do.
Now, I should mention that the Python library probably uses C code under some layers, and has a very large base of developers and users, so you can call this cheating if you want. But I wanted my code to stand up against realistic use of other tools.
First I tried the clojure.data.csv namespace, but it was quite slow. My tests run against an extremely large file, so speed is the name of the game. The clojure.data.csv documentation clearly states that it focuses only on parsing CSV correctly, so I cannot really complain about the speed.
Then I went down a huge rabbit hole with a different reader. Forget it: buggy, and not especially fast either.
At some point I realized that reading and parsing did not have to happen in the same thread, so I used that as an excuse to learn some more about the mysterious core.async namespace.
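Before the full namespace, here is a minimal sketch of the idea, not the final code: one dedicated thread pumps lines onto a channel with blocking puts, while the calling thread takes and parses them. The inline sample lines are just stand-ins for a real line-seq.

(require '[clojure.core.async :as async :refer [chan close! <!!]]
         '[clojure.string :as str])

(defn sketch []
  (let [lines> (chan 100)]
    ;; producer: blocking puts from a dedicated thread
    (async/thread
      (doseq [line ["a,1.0" "b,2.0" "c,3.0"]] ; stand-in for a line-seq
        (async/>!! lines> line))
      (close! lines>))
    ;; consumer: blocking takes on the calling thread;
    ;; <!! returns nil once the channel is closed and drained
    (loop [sum 0.0]
      (if-let [line (<!! lines>)]
        (recur (+ sum (Double/parseDouble (second (str/split line #",")))))
        sum))))

(comment
  (sketch)) ; => 6.0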
What follows is my cleaned-up namespace. The headline numbers:
- (read-0) takes 54 seconds: ~30 MB/s
- Python takes 18 seconds: ~88 MB/s
- (read-3) takes 5-6 seconds: ~290 MB/s
Looking back, a 10x speedup is a pretty nice improvement, but I wish it were more. My computer has an M.2 SSD, so reading is CPU bound most of the time; let's say a maximum read speed of 2 GB/s. In an ideal world I would like to reach 50% of that theoretical maximum, but right now I am at roughly 15% of it (290 MB/s out of 2 GB/s).
Now, the code is not well structured, and I think it could do with some refactoring. But I thought I would put it out here to see if anybody has input on how to speed it up.
I've included all the steps, so you can see the story, in code.
(ns bigcsv.parse-async
(:require
[clojure.pprint :refer [pprint]]
[clojure.core.async
:as async
:refer [>! <!! chan close! go]]
[clojure.string :as str]
[clojure.java.io :as io]))
;; Big csv file found in the wild:
;; (~2GB) https://s3.amazonaws.com/carto-1000x/data/yellow_tripdata_2016-01.csv
;; task: take sum of all numbers in column 15
;; ~2-3 seconds just to read the file and split it into lines. Kinda slow,
;; but not too bad; this is the kind of time I really wanted for the entire
;; processing.
(defn count-lines []
(with-open [rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
(let [lines (line-seq rdr)]
(count lines))))
(comment
(time (count-lines)))
(comment
(require '[clojure.data.csv :as csv])
;; simplest, easiest
;; 54 seconds, slow
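  ;; Presumably most of the cost: read-csv yields a lazy seq of string vectors,
  ;; so every field of every row is parsed and allocated even though only
  ;; column 15 is needed.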
(defn read-0 []
(with-open [rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
(reduce + (->> (csv/read-csv rdr)
(drop 1) ; ignore headers
(map #(nth % 15))
(map #(Double/parseDouble %))))))
(comment
(time (read-0))))
;; ~16-20 secs, same as Python with pandas
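;; Same job, but splitting each line by hand and running the whole
;; split/parse/filter chain as a single transducer, so no intermediate
;; lazy seqs are realized between the steps.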
(defn read-1 []
(with-open [rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
(let [lines (line-seq rdr)
headers (str/split (first lines) #",")
ix 15
txer (comp
(map #(str/split % #","))
(map #(Double/parseDouble (nth % ix)))
(filter (complement zero?)))]
(println headers)
(println ix)
(transduce txer +' (rest lines)))))
(comment
(time (read-1)))
;; I'm not happy with merely matching Python's speed.
;; Moving on: reading and parsing still happen on the same thread, so each is
;; blocking the other and capping our performance. So we need async!
;; Proof that making the processing async works, without much speed penalty:
;; ~17 secs, but uses a lot of memory if the channel capacities are huge...
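;; Structure: onto-chan!! realizes the lazy line-seq on its own thread and
;; pumps batches of lines onto linechan>; the transducer on numberchan> does
;; the splitting and parsing; async/reduce sums whatever comes out.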
(defn read-2 []
(with-open [rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
(let [linechan> (chan 1000)
          pump-data (fn [r]
                      (let [lines (line-seq r)
                            headers (first lines)]
                        ;; onto-chan!! closes linechan> when the coll is exhausted
                        (async/onto-chan!! linechan> (partition-all 1000 (rest lines)))
                        headers))
headers (pump-data rdr)
ix 15
txer (comp
cat
(map #(str/split % #","))
(map #(Double/parseDouble (nth % ix)))
(filter (complement zero?)))
numberchan> (chan 1000 txer)]
(async/pipe linechan> numberchan>)
(println headers)
(println ix)
(<!! (async/reduce +' 0 numberchan>)))))
(comment
(time (read-2)))
;; Now that basic asynchrony works, parallelize the parsing.
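;; An async function in the shape pipeline-async expects: it gets a batch of
;; lines and a result channel, sums the parsed values of the batch in a go
;; block, puts the partial sum, and closes the result channel (pipeline-async
;; requires the async function to close it).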
(defn parselines-af [ix val result]
(let [mparse (comp
#(Double/parseDouble (nth % ix))
#(str/split % #","))]
(go
(let [m (reduce + (map mparse val))]
(>! result m)
(close! result)))))
;; ~5-6 seconds. Possibility of a huge memory footprint.
(defn read-3 []
(with-open [rdr (io/reader "./data/yellow_tripdata_2016-01.csv")]
(let [linechan> (chan 1000)
          pump-data (fn [r]
                      (let [lines (line-seq r)
                            headers (first lines)]
                        (async/onto-chan!! linechan> (partition-all 10000 (rest lines)))
                        headers))
headers (pump-data rdr)
ix 15
numberchan> (chan 5000)]
      ;; This parallelizes the processing of the data. Amdahl's law took over
      ;; above 4-6 workers, so I set it to 8; I determined that by watching the
      ;; channel buffer sizes with the monitor function below.
      (async/pipeline-async 8 numberchan> (partial parselines-af ix) linechan>)
(<!! (async/reduce +' 0 numberchan>)))))
(comment
  ;; A quick-and-dirty monitor that peeks at the channel buffers via interop;
  ;; used for tuning the pipeline parallelism.
(defn monitor [linechan> numberchan>]
(future (doseq [x (range 100)]
(Thread/sleep 50)
(pprint {:lines
(.count (.buf linechan>))
:numbers
(.count (.buf numberchan>))}))))
(time (read-3))
)