I have an IO operation to be done. The client library I use provides 900 connections to the target (database) over which the IO commands are sent. I have 1 million IO operations to perform in groups of 50,000 each, so there are 20 chunks of IO operations where each chunk contains 50,000 operations. I need to process these 20 chunks concurrently.
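Just to make the arithmetic concrete, the chunking looks roughly like this (ops is a hypothetical stand-in for the real IO commands; in practice the chunks arrive as 20 csv files, see below):

;; Illustration only: 1,000,000 operations split into 20 chunks of 50,000 each.
(def ops (range 1000000))                ;; hypothetical stand-in for the real commands
(count (partition-all 50000 ops))        ;; => 20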
Solution 1
(import 'java.util.function.BiFunction)

;; Tracks the number of in-flight IO operations.
(def monitor (atom 0))

(defn- bi-function ^BiFunction
  []
  (reify BiFunction
    (apply [_ _ _]
      ;; Runs when the CompletableFuture completes; one fewer operation in flight.
      (swap! monitor dec))))

(defn update-line [line]
  ;; Busy-wait until fewer than 800 operations are in flight.
  (while (>= @monitor 800))
  (swap! monitor inc)
  (-> <call a function that returns a completable future> ;; This is the part where the IO is done. It is not in our control
      (.handle (bi-function))))

(defn update
  ([csv-file]
   (trace/span
     "update"
     (with-open [reader (clojure.java.io/reader csv-file)]
       (doseq [line (line-seq reader)]
         (update-line line))))))
The function update is called with the 20 csv files concurrently.
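Roughly, the Solution 1 caller looks like this (simplified sketch; error handling and the actual file list are omitted):

;; Simplified sketch: one future per csv file, then block until all 20 finish.
(let [files [file1 file2 ... file20]]
  (doseq [f (doall (map #(future (update %)) files))]
    @f))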
Solution 2 (Using clojure.core.async)
;; monitor, bi-function and update-line are identical to Solution 1 and are not repeated here.
(require '[clojure.core.async :as async])

(defn update
  ([csv-file channel]
   (trace/span
     "update"
     (with-open [reader (clojure.java.io/reader csv-file)]
       (doseq [line (line-seq reader)]
         ;; >!! blocks when the channel's buffer (800) is full, giving backpressure.
         (async/>!! channel line))))))
;; The difference from Solution 1 is that the caller of update spawns a go block
;; which takes lines off the channel and issues the IO calls.
(let [files   [file1 file2 ... file20]
      channel (async/chan 800)]
  ;; Single consumer loop.
  (async/go
    (while true
      (let [[line _] (async/alts! [channel])]
        (update-line line))))
  ;; One producer per file, each pushing its lines onto the shared channel.
  (doseq [file files]
    (future (update file channel))))
With Solution 1, each file takes ~85 seconds to process. Solution 2 does a bit better, clocking in at ~55 seconds. Is there a way to improve Solution 2? Is this an efficient solution to begin with?