I’m currently writing some ETL code. Typically, I have consecutive daily files that are parsed; data is extracted and processed with a chain of transducers. But sometimes a file is missing. This information needs to be preserved and processed differently. My question is: what is the best way to do that?
In more abstract terms: I could use some input on error handling in transducers. Since transducers are inherently sequential (they are just composed functions), how do you distinguish between valid values that should be processed further and errors?
Is there a better way than either aborting the whole computation or making each downstream transducer aware of success/failure values?
Maybe a “routing” transducer would be useful: something that can dispatch values (like a multimethod), but with a different downstream transducer per value type. Has anyone gone down this path and can share their experience?
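To make the question concrete, here is a minimal sketch of one way to distinguish values from errors without full routing: tag everything as `{:ok v}` or `{:error e}`, and lift ordinary transducers so they transform only the `:ok` values while errors pass through untouched. All names here (`ok`, `fail`, `lift-ok`) are made up for illustration.

```clojure
(defn ok   [v] {:ok v})
(defn fail [e] {:error e})

(defn lift-ok
  "Wrap transducer xf so it transforms only {:ok v} values;
  {:error e} values bypass it unchanged."
  [xf]
  (fn [rf]
    ;; The inner reducing fn re-wraps each transformed value as {:ok v}.
    (let [inner (xf (fn
                      ([acc] acc)
                      ([acc v] (rf acc (ok v)))))]
      (fn
        ([] (rf))
        ([acc] (rf (inner acc)))          ;; flush stateful xforms, then complete
        ([acc x]
         (if (contains? x :ok)
           (inner acc (:ok x))            ;; unwrap, transform, re-wrap
           (rf acc x)))))))               ;; errors flow through unchanged

(into [] (lift-ok (map inc)) [(ok 1) (fail :missing-file) (ok 2)])
;; => [{:ok 2} {:error :missing-file} {:ok 3}]
```

A full routing transducer would generalize this by dispatching to a different lifted transducer per tag, but even this two-way split avoids making every downstream step aware of failures.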
Sometimes I feel people overburden themselves trying to do everything with one abstraction.
You don’t have to do everything in one giant transducer pipeline. Transducers are good mostly because they let you:
Abstract away the concrete input and output stream. That means you can use the same data processing function over different types of inputs/outputs, such as sequences, channels, observables, streams, etc.
Avoid the need to create an initial sequence and a bunch of intermediate sequences, saving time and memory.
Perform loop fusion, thus applying the full transformation in a single eager pass.
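The first point is easy to see in a small example: a single composed transducer can be reused unchanged across different contexts.

```clojure
;; One transducer, reusable over different input/output contexts.
(def xform (comp (filter even?) (map #(* % %))))

(into [] xform (range 10))    ;; eager, fused into one pass => [0 4 16 36 64]
(sequence xform (range 10))   ;; incremental sequence       => (0 4 16 36 64)
;; and with core.async it becomes a channel transform: (async/chan 1 xform)
```

No intermediate collections are built in the `into` call: filtering and mapping happen in one fused reduction.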
So, my point is, don’t overcomplicate things by trying to write your entire logic within the confines of transducers. Just have functions before, after, or in between transducer chains that do whatever you need. Check for missing files, for example, and handle that however you want.
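For instance, the missing-file check can be a plain function that runs before any transducer pipeline. This is only a sketch with a hypothetical helper name (`split-present`); the existence predicate is passed in so it can be anything, e.g. `#(.exists (clojure.java.io/file %))`.

```clojure
(defn split-present
  "Partition dates into those whose file exists and those that are
  missing, *before* the transducer pipeline runs. exists? is injected
  so callers decide how presence is checked."
  [exists? dates]
  {:present (filterv exists? dates)
   :missing (vec (remove exists? dates))})

;; Sketch of use: run the parsing pipeline only over present files,
;; and handle :missing however the ETL job requires (log, backfill, ...).
(comment
  (let [{:keys [present missing]} (split-present file-exists? all-dates)]
    {:missing missing
     :rows    (into [] parse-xf present)}))
```

The pipeline itself stays simple because the “error” case never enters it.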
Not sure how relevant it is, but https://github.com/Datomic/mbrainz-importer could be an interesting read. It uses transducers and core.async — there you could have an extra channel to send errors to.
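This is not the importer’s actual code, just a sketch of the extra-error-channel idea: core.async channels accept a transducer plus an `ex-handler`, so anything that throws inside the transform can be diverted to a side channel instead of killing the pipeline. Channel names and the `:missing` sentinel are made up for illustration.

```clojure
(require '[clojure.core.async :as async])

(def err-ch (async/chan 16))             ;; errors accumulate here

(def data-ch
  (async/chan 16
              (map (fn [item]
                     (if (= item :missing)
                       (throw (ex-info "missing file" {:item item}))
                       (str "parsed " item))))
              ;; ex-handler: called with the thrown exception; returning
              ;; nil means nothing is placed on data-ch for that value.
              (fn [ex]
                (async/put! err-ch ex)
                nil)))
```

Good values keep flowing on `data-ch`, while a separate consumer reads `err-ch` and decides how to report or recover.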