Graph/branching/Routing transducers

Hi,

I’m currently writing some ETL code. Typically, I have consecutive daily files that are parsed; the data is then extracted and processed with a chain of transducers. But sometimes a file is missing. That fact needs to be preserved and processed differently. My question is: what is the best way to do that?

In more abstract terms: I could use some input on error handling in transducers. Since transducers are inherently sequential (they are just composed functions), how do you distinguish between valid values that should be processed further and errors?
Is there a better way than either aborting the whole computation or making each downstream transducer aware of success/failure values?
Maybe a “routing” transducer would be useful: something that can dispatch values (like a multimethod) but with different downstream transducers per value type. Has anyone gone down this path and can share their opinion?

Thanks!


Sometimes I feel people overburden themselves trying to do everything with one abstraction.

You don’t have to do everything in one giant transducer pipeline. Transducers are good mostly because they let you:

  1. Abstract away the concrete input and output stream. That means you can use the same data processing function over different types of inputs/outputs, such as sequences, channels, observables, streams, etc.
  2. Avoid creating an initial sequence and a bunch of intermediate sequences, which saves time and memory.
  3. Perform loop fusion, applying the full transformation in a single eager pass.
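
To illustrate point 1, here is a minimal sketch. The names `xform` and `lines` are made up for this example, and the parsing step is just a stand-in for whatever your ETL does. The same composed transducer is reused with three different consumers:

```clojure
;; Hypothetical transformation: parse numeric strings, keep even numbers, scale them.
(def xform
  (comp (map #(Long/parseLong %))
        (filter even?)
        (map #(* % 10))))

;; Hypothetical input, standing in for lines read from a file.
(def lines ["1" "2" "3" "4"])

;; Same xform, three different output contexts:
(into [] xform lines)        ; eagerly build a vector
(sequence xform lines)       ; produce a sequence instead
(transduce xform + 0 lines)  ; reduce straight to a single value
```

None of the three calls builds an intermediate collection between the `map` and `filter` steps.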

I’d recommend reading: https://clojure.org/guides/faq#transducers_vs_seqs as well, for more details.

So, my point is, don’t overcomplicate things by trying to write your entire logic within the confines of transducers. Just have functions before, after, or in between transducer chains that do whatever you need. Check for missing files, for example, and handle that however you want.
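
Something along these lines is what I have in mind. This is only a sketch: the shape of the input maps (`:date`, `:rows`, `:missing?`) and the names `process-rows` and `run-etl` are assumptions I invented for illustration, not anything from your code. The missing files are split off with a plain function before the transducer chain ever sees them:

```clojure
;; Hypothetical daily inputs; a missing file is represented by a marker map.
(def daily-inputs
  [{:date "2024-01-01" :rows [1 2 3]}
   {:date "2024-01-02" :missing? true}
   {:date "2024-01-03" :rows [4 5]}])

;; The transducer chain only knows about valid data.
(def process-rows
  (comp (mapcat :rows)
        (filter odd?)
        (map inc)))

(defn run-etl
  "Splits inputs into missing and present files, runs the transducer
  chain over the present ones, and reports the missing dates separately."
  [inputs]
  (let [{missing true present false} (group-by #(boolean (:missing? %)) inputs)]
    {:processed     (into [] process-rows present)
     :missing-dates (mapv :date missing)}))

(run-etl daily-inputs)
;; => {:processed [2 4 6], :missing-dates ["2024-01-02"]}
```

The downstream transducers never have to know that failures exist, and the missing dates can be logged, retried, or stored by whatever code consumes the result.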


Not sure how relevant this is, but https://github.com/Datomic/mbrainz-importer could be an interesting read. It uses transducers and core.async; with that approach you could have an extra channel to send errors to.