I’m relatively new to Clojure, and I’m currently working on a project that involves processing a large dataset in real time. The dataset consists of around 10 million records, and each record requires a series of transformations and calculations before it can be stored in a database. While I’ve managed to get my code working, I’m facing significant performance bottlenecks, and I’m looking for some advice on optimizing it to handle this volume more efficiently.
Here’s a quick overview of what my current approach looks like:
Reading Data: I’m using core.async to read chunks of data from a file asynchronously.
Processing: Each chunk is then processed using a series of functions that perform transformations like filtering, mapping, and reducing. I’m currently using transducers to chain these transformations, but I’m not sure if I’m doing it in the most efficient way.
Database Writing: After processing, the data is written to a PostgreSQL database using clojure.java.jdbc.
The main issue I’m encountering is that the processing stage is much slower than I anticipated, even with transducers. I also checked this: https://clojureverse.org/t/london-clojurians-talk-basilisp-clojure-on-the-python-vm-by-chris-rinkruby but I haven’t found a solution there. Could anyone offer some guidance? I suspect it might have something to do with how I’m handling lazy sequences, or perhaps the way I’ve set up my core.async channels. Also, the database writes are causing occasional timeouts, which slows things down further.
Here are a few specific questions I have:
Are there any best practices or common pitfalls to avoid when using core.async and transducers together in Clojure?
Would it be more efficient to use a different library or approach for handling such a large volume of data? For example, would using tools like Kafka or a different database connector make a difference?
How can I optimize the database writes to reduce the chances of timeouts? Should I be batching the writes in a different way?
I would say that the first step is to identify where the bottleneck is. If you think you have found that writing the data to the database is the bottleneck, then that’s already a good step forward.
If you are unsure, please have a look at the comments on this post:
Many of the same suggestions will apply here.
If you are sure that the bottleneck is writing the data to the database then there are a couple of things you could double check:
Are you using a connection pool for your database connections?
Opening a new connection per query can be quite expensive.
If you are not using a connection pool, add one; on the JVM, HikariCP (commonly used from Clojure via the hikari-cp wrapper) and c3p0 are the usual choices.
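For instance, here is a minimal sketch using the hikari-cp wrapper; the connection details are placeholders, not your real config:

(require '[hikari-cp.core :as hikari]
         '[clojure.java.jdbc :as jdbc])

;; placeholder connection details - substitute your own
(def datasource
  (hikari/make-datasource
    {:adapter       "postgresql"
     :server-name   "localhost"
     :port-number   5432
     :database-name "mydb"
     :username      "me"
     :password      "secret"}))

;; clojure.java.jdbc accepts a {:datasource ...} db-spec, so every query
;; borrows a pooled connection instead of opening a fresh one
(def db-spec {:datasource datasource})

(jdbc/query db-spec ["SELECT 1"])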
Do you batch write your data into Postgres?
Writing can be done in large batches of thousands of records with a single statement, which is far more efficient than issuing an individual INSERT INTO statement for each record; see PostgreSQL: Documentation: 9.4: INSERT.
Here is an example of how to bulk insert records:
INSERT INTO films (code, title, did, date_prod, kind) VALUES
('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
('UA502', 'Bananas', 105, '1971-07-13', 'Comedy'),
('HG120', 'The Dinner Game', 140, '1961-06-16', 'Comedy');
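From Clojure, since you are already using clojure.java.jdbc, the column-names-plus-rows arity of insert-multi! sends the rows as a single batched insert. A sketch reusing the films example above (db-spec stands in for your own connection spec):

(require '[clojure.java.jdbc :as jdbc])

;; db-spec is assumed to be your (ideally pooled) connection spec
(jdbc/insert-multi! db-spec :films
                    [:code :title :did :date_prod :kind]
                    [["B6717" "Tampopo"         110 (java.sql.Date/valueOf "1985-02-10") "Comedy"]
                     ["UA502" "Bananas"         105 (java.sql.Date/valueOf "1971-07-13") "Comedy"]
                     ["HG120" "The Dinner Game" 140 (java.sql.Date/valueOf "1961-06-16") "Comedy"]])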
Large transformations can be very slow even with transducers. Have you explored tech.ml.dataset? It can be orders of magnitude faster for data transformations, at the upfront cost of creating the dataset. The tech.ml.dataset Walkthrough will get you started quickly.
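To give a feel for the column-oriented style, here is a rough sketch with tech.v3.dataset; the file name, column names, and the filter/derived column are invented for illustration:

(require '[tech.v3.dataset :as ds]
         '[tech.v3.datatype.functional :as dfn])

;; invented file and columns - the point is the column-wise style
(def raw (ds/->dataset "records.csv"))

(def processed
  (let [active (ds/filter-column raw :status "active")]
    ;; dfn/* works element-wise across whole columns at once
    (assoc active :total (dfn/* (active :price) (active :quantity)))))

(ds/head processed)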
It’s very hard to optimize code in the abstract, but I’ll try based on the clues you’ve given.
You’re reading chunks using core.async? This sounds odd since I would normally read in the chunks sequentially. For that, I would not need core.async or any asynchrony. Could this be the bottleneck?
How many rows are you reading in a chunk?
Using transducers should be pretty efficient. I’m wondering how complicated the transformation steps are. Are they creating a large amount of garbage? Doing some computationally expensive calculations?
Are the transducers acting on a core.async channel, or on one of the Clojure sequences? (See the sketch after these questions.)
Are you using batch writes?
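On the channel-versus-sequence question: a transducer can be attached to the channel itself, so values are transformed as they pass through without any intermediate lazy sequences. A small self-contained sketch with placeholder transformation steps:

(require '[clojure.core.async :as async])

;; placeholder transformation steps - substitute your real ones
(defn valid? [record] (some? (:id record)))
(defn transform [record] (update record :value inc))

(def xf (comp (filter valid?) (map transform)))

;; transducer on the channel: applied as each value is put onto it
(def processed-ch (async/chan 1024 xf))

;; the same xf applied eagerly to an in-memory chunk
(into [] xf [{:id 1 :value 41} {:id nil :value 0}])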
When you’re using asynchrony (core.async) and laziness (lazy sequences), it can obfuscate where the bottleneck is. For instance, it could appear that writing to the database is slow, but only because nothing happens until you start constructing the SQL query and the backpressure finally causes the next chunk to be read from the file.
My suggestion is to remove asynchrony and concurrency and do everything in a non-lazy way first. Read in the chunk completely. Then transform the entire thing in memory (using vectors instead of lazy sequences to ensure it’s done eagerly). Then write it to postgres with a single batch query. Then move on to the next batch.
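To make the shape concrete, here is a rough sketch of that eager, synchronous loop; the parsing/transformation steps and the films table are stand-ins for your real schema, not your actual code:

(require '[clojure.java.io :as io]
         '[clojure.java.jdbc :as jdbc]
         '[clojure.string :as str])

;; stand-ins for your real per-record work
(defn parse-record [line] (str/split line #","))
(defn transform [[code title kind]] [code (str/upper-case title) kind])

(def xf (comp (map parse-record) (map transform)))

(defn process-file! [db-spec path]
  (with-open [rdr (io/reader path)]
    (doseq [chunk (partition-all 5000 (line-seq rdr))]
      ;; `into` runs the transducer eagerly over the whole chunk (a vector, not a lazy seq)
      (let [rows (into [] xf chunk)]
        ;; one batched insert per chunk
        (jdbc/insert-multi! db-spec :films [:code :title :kind] rows)))))

Once this straight-line version works, you can measure each stage separately and reintroduce concurrency only where it actually helps.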
Also, I forgot to ask: How much of a speedup are you looking for from what you already have? 2x? 10x? 1000x?
Good luck!
Eric
PS Please ask follow-up questions if you have them. And keep us posted on your progress.
Have you had any luck with this? I can also recommend checking out Clojure’s data science stack. tech.ml.dataset and tablecloth in particular may be useful for you.
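If you try tablecloth, a minimal sketch of the same kind of pipeline (the file and column names are invented for illustration):

(require '[tablecloth.api :as tc])

;; invented file/columns - just to show the shape of the API
(-> (tc/dataset "records.csv")
    (tc/select-rows #(= "active" (:status %)))
    (tc/map-columns :total [:price :quantity] *)
    (tc/head))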