How to avoid SQLite callbacks with Core.async?

Hi there!

I would really like to better understand Core Async with Clojure; specifically, I want to remove my self from callback hell to see if I can make my clojurescript backend for a new project to be a bit clearer.

In my current project trunk, I’m building a “full stack” clojurescript application with electron, using SQLite as the database. Sqlite on node is built with callbacks in mind, and so some of my code requiring multiple database calls is starting to look pretty hairy:

(defn article-get
  "Fetches an article, and computes the `:word-data` for it. Sets `last_opened` value before fetching."
  [id cb]
  (let [q1-sql    "UPDATE articles SET last_opened = ? WHERE article_id = ?"
        q1-params (array (js/Date.now) id)
        q2-sql    "SELECT * FROM articles WHERE article_id = ?"
        q2-params (array id)]
    (.run db q1-sql q1-params
          (fn [err]
            (.get db q2-sql q2-params (fn [err row]
                                    (words-get-for-article  (js->clj row :keywordize-keys true) cb)))))))

The above function is making two database calls: the first is to update the article Trunk is about to fetch, and update it’s “last opened” value to the current time. Then I can go ahead and select the value back in the second command. In fact, there is also a third db call (words-get-for-article) but I’ll leave that out for the sake of brevity.

So, we have callbacks (and errors that I’m not sure how to handle yet). I’d really love to see how core.async could (if it can) be used to clean this up?

So far, I’ve found the chapter on core.async in Clojure for the Brave and True to be helpful but I haven’t gotten it adapted to work yet for this solution.

The example it provides looks like this:

  (defn upper-caser
   [in]
   (let [out (chan)]
     (go (while true (>! out (clojure.string/upper-case (<! in)))))
     out))
 
 (defn reverser
   [in]
   (let [out (chan)]
     (go (while true (>! out (clojure.string/reverse (<! in)))))
     out))
 
 (defn printer
   [in]
   (go (while true (println (<! in)))))
 
 (def in-chan (chan))
 (def upper-caser-out (upper-caser in-chan))
 (def reverser-out (reverser upper-caser-out))
 (printer reverser-out)
 
 (>!! in-chan "redrum")
 ; => MURDER
 
 (>!! in-chan "repaid")
 ; => DIAPER

Here, there are several defined global variables - which I won’t want to do. So, I’ll end up perhaps making some functions that perform the single db calls, but instead of returning the value, will return the channel they are happening on, and then I’ll string them all up in a single function?

(defn <article-update-last-opened
  [in]
  (let [out (chan)
        sql "UPDATE articles SET last_opened = ? WHERE article_id = ?"]
    (println "about to update last opened")
    (go
      (let [id     (<! in) ;; this could be passed in?
            params (array (js/Date.now) id)]
        (.run db sql params (fn [err] (go (>! out id))))
        out))))

(defn <article-get-by-id
  [in]
  (let [out (chan)
        sql "SELECT * FROM articles WHERE article_id = ?"]
    (println "about to select article")
    (go
      (let [id     (<! in) ;; <1> there's nothing to take off the channel here.
            params (array )]
        (.get db sql params (fn [err row]
                              (prn "value is " err row)
                              (go (>! out row))))
        out))))

(defn article-get-new
  [id]
  (let [db-chan (chan)]
    (go (>! db-chan id))
    (-> db-chan
        <article-update-last-opened
        <article-get-by-id
        )))

I’m getting bit stuck here. The first function in article-get-new runs (<article-update-last-opened) but I think due to the callback in these functions being the place where the out channel is getting a value placed on it – by the time we move onto the next threaded function, it’s too late (or rather… too soon?) to get a value out of the channel as the callback hasn’t executed yet.

I might be missing something simple here. What I’d love is for every database function to exist on it’s own, and then to have them be able to be composed into larger functions when necessary. Any advice / input would be appreciated.

you might be interested in promesa which turns callback hell into promise chaining. It’s also quite nice. I need to reread your use case to extrapolate to core.async though.

it’s too late (or rather… too soon?) to get a value out of the channel as the callback hasn’t executed yet.

Since the functions take and return channels, this shouldn’t matter though (I think). So when the callback returns it should push values to the respective out channels and propagate along the chain.
Maybe something like this would work:

(defn article-get-new
  [id]
  (let [db-chan (chan)]
    (go (>! db-chan id)
        (-> db-chan
            <article-update-last-opened
            <article-get-by-id
            <!))))

Push the initial value onto the input chan, then compose the other functions (inside the go-block) to create the output chan, and then draw the value from it. This kind of setup seems ripe from some macrology or at least helper functions though to compose the operations and queries and query parameters (also error propagation).

I don’t know ClojureScript very well, but I think this would be the simplest setup:

(defn article-update-last-opened-async
  [db id]
  (let [out (promise-chan)
        params (array (js/Date.now) id)
        sql "UPDATE articles SET last_opened = ? WHERE article_id = ?"]
    (.run db sql params
          (fn [err]
            (this-as this
                     (if err
                       (put! out (ex-info "Failed to update article" {:error :sql-error :id id}))
                       (put! out {:last-id (.-lastID this)
                                  :changes (.-changes this)})))))
    out))

(defn article-get-by-id-async
  [db id]
  (let [out (promise-chan)
        params (array id)
        sql "SELECT * FROM articles WHERE article_id = ?"]
    (.get db sql params
          (fn [err row]
            (if err
              (put! out (ex-info "Failed to get article" {:error :sql-error :id id}))
              (put! out (js->clj row :keywordize-keys true)))))))

(defn article-get-async
  "Fetches an article, and computes the `:word-data` for it. Sets `last_opened` value before fetching."
  [db id cb]
  (let [out (promise-chan)]
   (go
    (let [article-update-last-opened (<! (article-update-last-opened-async db id))
          article (<! (article-get-by-id-async db id))]
      (put! out (words-get-for-article article cb))))
   out))

(go (<! (article-get-async db 1234 cb)))

You could probably create a sql-run-async and sql-get-async function and use those instead for all your queries, like:

(defn sql-run-async
  [db sql params]
  (let [out (promise-chan)
    (.run db sql params
          (fn [err]
            (this-as this
                     (if err
                       (put! out (ex-info "Failed to run sql query" {:error :sql-error :sql sql :params params}))
                       (put! out this)))))
    out))

And refactor the above with those so you don’t need to repeat the pattern for each of your queries.

Edit: Let me explain a bit…

The Brave and True book is showing you how to model an application using CSP, concurrent sequential processes. That’s a different style of programming. In your case you’ve got functions with callbacks returning a single result by calling the callback. And it seems you want to continue to use functions, but want them to be async.

A channel normally is more like a stream in that it is assumed it will have many values passing through it over time. But your functions will return a single value each time they are called. A promise-chan is a special channel just for that, it mimics a promise like behavior, which is that you expect a single value to be delivered at some point in the future, where as a regular channel is meant to deliver many values at various points in the future.

Now a function is something that takes parameters and return a single value. An async function is a function that takes parameters and returns a container of value immediately, but which will deliver the logical result to that returned container later in time.

This is what I did here. Our functions take parameters and immediately return a promise-chan which is a container that will have a single value delivered to it sometime later.

Because the returned promise-chan is a possibly empty container, if you want to do something that depends on the value to be delivered, or just after you know that whatever was happening async completed successfully, you need a way to coordinate the order. You can do that with <! inside a go block. Everything inside the go block after the call to <! will only happen after the channel has had a value delivered to it and extracted by <!. But a function that depends on an async value must itself be async, which is why everything becomes async in our example (in Clojure JVM you could block to return to a synchronous model, but not in ClojureScript).

Now there’s a very different way to model things, which is that instead of using async functions, you could use Processes.

A process is different from a function. A process doesn’t take parameters, instead it takes messages from channels, process them, and possibly puts new messages on the same or other channels.

Programming with processes is very different to programming with functions or even programming with async functions. You’d need to rethink how you modeled your application.

For example, in your case, you could have a querier process, it would be a process that listens to incoming queries on some query channel. For each incoming query, it could perform the query to the DB and then put a message on a query result channel when the result of it comes back from the DB.

This is no longer a function though, now you have a process instead. You don’t “call” a process the way you “call” a function, you’d need to send it a message by putting a query over the channel it listens for them.

So instead of calling run on the db with SQL and params, you would call put! or >! on the querier process’ input channel. And instead of getting the result from a return value, you’d take! or <! from its output channel.

It’s a very different way to model a concurrent application. Anyways, that’s what the Brave book I believe is explaining.

With core.async you can do both this CSP style of programming, or you can use it more traditionally to create async functions like I did.

2 Likes

Thank you for the thorough answer! I didn’t even know about promise-chan and this makes a lot more sense. I will hopefully find some free time to implement this today and report back with what happens.

To follow up, I’ve had some success. Now, my functions are much more isolated in what they do, the inputs are clearer, and there are no callbacks.

First, as per your advice, I abstracted out a generic sql function. I think it could be better still, but I’ve set it up so that depending on the type of call it will return different things:

(defn <sql
  "Creates a sql operation that returns a channel, allowsing for async/await like syntax.
  Has been abstracted to handle variety of return types depending on sql op(eration)"
  [{:keys [sql params op]}]
  (let [out    (promise-chan)
        params (apply array params) ;; TODO - if this is not a sequence, handle it?
        cb     (fn [err res]
                 (this-as this
                   (if err
                     (put! out (ex-info (str "Failed to run async query of type " (name op)) {:error :sql-error :res res}))
                     ;; TODO nil - nothing coming back.
                     (cond
                       (= :insert op) (put! out (.-lastID this))
                       res            (put! out (js->clj res :keywordize-keys true))
                       :else          (put! out (js->clj this :keywordize-keys true))))))]

    (case op
      :all    (.all db sql params cb)
      :get    (.get db sql params cb)
      :insert (.run db sql params cb)
      :run    (.run db sql params cb))
    out))

Now I can call functions like articles-get as <articles-get:

(defn <articles-get
  []
  (<sql {:op :all :sql "SELECT * FROM articles ORDER BY date_created DESC"}))

or

(defn <insert-article
  "Creates a new article. Requirements:
  -> words for article are already in words table.
  -> words from article have been re-queries
  -> requiriesed words' ids have been made into a delimited string with $."
  [{:keys [article title source word_ids]}]
  (<sql {:op     :insert
         :params [article word_ids title source (js/Date.now)]
         :sql "INSERT INTO articles(original, word_ids, name, source, date_created) VALUES (?, ?, ?, ?, ?)"}))

Which can be called like so, from the Ipc handlers:

(def ipcHandlers
  {(s-ev :article-create)
   (fn [event data]
     (go (let [x                (<! (db/<insert-words (data :article)))
               word-ids-str     (<! (db/<get-word-ids (data :article)))
               inserted-article (<! (db/<insert-article (merge data {:word_ids word-ids-str})))]
           (reply! event (s-ev :article-created) inserted-article))))
;...

Anyway - I have yet to discover if I’m creating future foot guns for myself, but this feels a lot cleaner to write.

Thanks for your help!

1 Like

I would REALLY go into the promesa route. Core.async is not that easy to use in ClojureScript, and it’ll not help you too much on multiple situations. It’s also a lot less cleaner and it does not integrate well with the JS ecosystem. I wrote a little bit about this here: ClojureScript vs clojure.core.async – Maurício Szabo

Your solution could be changed for:

(ns your.ns.here
  (:require [promesa.core :as p]))

(defn <sql
  "Creates a sql operation that returns a channel, allowsing for async/await like syntax.
  Has been abstracted to handle variety of return types depending on sql op(eration)"
  [{:keys [sql params op]}]
  (let [out    (p/deferred)
        params (apply array params) ;; TODO - if this is not a sequence, handle it?
        cb     (fn [err res]
                 (this-as this
                   (if err
                     (p/reject! out (ex-info (str "Failed to run async query of type " (name op)) {:error :sql-error :res res}))
                     ;; TODO nil - nothing coming back.
                     (cond
                       (= :insert op) (p/resolve! out (.-lastID this))
                       res            (p/resolve! out (js->clj res :keywordize-keys true))
                       :else          (p/resolve! out (js->clj this :keywordize-keys true))))))]

    (case op
      :all    (.all db sql params cb)
      :get    (.get db sql params cb)
      :insert (.run db sql params cb)
      :run    (.run db sql params cb))
    out))

At first glance, nothing changed - you just replaced >! with p/resolve! or p/reject! and promise-chan with p/deferred. But you can do the following now:

(p/let [res (<sql ...)
        something-from-js-world (run-this-result res)]
  (do-something-with something-from-js-world))

The code above will automatically “short-circuit” (if <sql return an error, the whole code will be a promise with a failure, with the error so it’s easier to capture exceptions) and it’ll also integrate with JS (because res is a promise, you can send it to JS libraries that expect a promise, and they will work correctly). You can also make queries in parallel and await for all of then with p/all, for example, if you want.

4 Likes

Hmmm… this looks very interesting. I will try this out. I enjoyed reading your blog post as well. Thanks!

It’s a good point about the integration with JS.

The other issues seem solvable though, but my core.async experience is with Clojure JVM.

For example, say with something like: GitHub - alexanderkiel/async-error: A Clojure(Script) library which provides core.async error handling utilities. you can relatively easily handle errors:

(go-try
  (let [res (<? (<sql ...))
        something-from-js-world (run-this-result res)]
    (do-something-with something-from-js-world)))

<? takes from the channel and if its an error it rethrows it. Throwing an error will cause things to short-circuit, and you can use normal try/catch/finally, or if you don’t want to handle it go-try will catch it and return the thrown error on the go return channel.

The nice thing is really that if you look at the implementation, those two macros are the most trivial thing to implement over core.async, and really are just convenience for always having to try/catch things.

Now if you want to await on multiple async promise-chan you can use the async/map function. And if you want to wait on the first one of many, you can use alts!.

Here’s examples for all of them:

(ns bar
  (:require [clojure.core.async :as async :refer [go <! >! alts! take! timeout poll! put! promise-chan]]))

;; Lets start by defining an asynchronous function
(defn inc-async
  "Increment num asynchronously, will pretend like it can take
   between 0 and 2 seconds to do so for demo purpose, as if this
   was a remote call."
  [num]
  (let [ret (promise-chan)]
    (go (>! ret (do (<! (timeout (rand-int 2000))) (inc num))))
    ret))

;; Example of await all where we wait for all 100 inc-async operation to complete
;; before printing their results as a vector. They are all fired concurrently at the
;; same time.
(go
  (println (<! (async/map vector (for [i (range 100)]
                                   (inc-async i))))))

;; Example of await all, but where we wait at most 1 second, which
;; won't be enough most of the time and thus will return nil as it
;; timed out
(go
  (println (first (alts! [(async/map vector (for [i (range 100)]
                                              (inc-async i)))
                          (timeout 1000)]))))

;; Example of await any where we wait for the first inc-async operation out of
;; many to complete
(go
  (println (first (alts! (for [i (range 100)]
                           (inc-async i))))))

;; Example of await any where we wait for the first inc-async operation out of
;; many to complete, but with a timeout of 10ms which will often not be enough
;; and in such case would return nil as it timed out
(go
  (println (first (alts! (conj (for [i (range 100)]
                                 (inc-async i))
                               (timeout 10))))))

;; Now lets see how we can handle errors

;; First we define a helper function that checks for errors and if so throws
(defn ??
  "Returns the given value or throws it if it represents an error"
  [value-or-error]
  ;; I'm using Clojure JVM here, so I check for Throwable, in CLJS you'd want to check for the equivalent
  ;; or you can check for whatever you'd prefer to model your exceptions as, you could use maps for example
  ;; instead of Exceptions
  (if (instance? Throwable value-or-error)
    (throw value-or-error)
    value-or-error))

;; Then for convenience we write a macro that takes from a channel,
;; but uses ?? to rethrow if the taken value is an error
(defmacro <?
  "Like <!, but throws if taken value is an error"
  [port]
  `(?? (<! ~port)))

;; And another convenience macro for the pattern of creating a promise-chan in a go block
;; that returns thrown errors on the channel
(defmacro go?
  "Like go, but returns a promise-chan and catches all exceptions inside body and
   returns them as the error on the promise-chan in such case."
  [& body]
  `(let [ret# (promise-chan)]
     (go (>! ret# (try ~@body
                       (catch Throwable t#
                         t#))))
     ret#))

;; Now we define an async function that can return an error to demonstrate
;; This has actually got simpler, since now we can just use `go?` as a
;; convenience for creating async functions that return promise-chan and
;; put caught errors unto it if an exception occurred
(defn errorable-inc-async
  "Increment num asynchronously, will pretend like it can take
   between 0 and 2 seconds to do so for demo purpose, as if this
   was a remote call. Can throw an error at random."
  [num]
  (go?
   (if (zero? (rand-int 5))
     ;; We can throw as normal
     (throw (ex-info (str "Failed to increment " num) {:num num}))
     (inc num))))

;; Example of short-circuiting using normal try/catch
;; which we can use as-is since `<?` rethrows in case the channel
;; returns an error
(go
  (try
    (let [a (<? (errorable-inc-async 0))
          b (<? (errorable-inc-async a))
          c (<? (errorable-inc-async b))
          d (<? (errorable-inc-async c))]
      (println d))
    (catch Throwable t
      (println t))))

For the sake of completion, here are the same examples on Promesa:

(ns bar
  (:require [promesa.core :as p]))

(defn inc-async
  "Increment num asynchronously, will pretend like it can take
   between 0 and 2 seconds to do so for demo purpose, as if this
   was a remote call."
  [num]
  (p/do!
   (p/delay (rand-int 2000))
   (inc num)))

;; Example of await all where we wait for all 100 inc-async operation to complete
;; before printing their results as a vector. They are all fired concurrently at the
;; same time.
(p/let [res (p/all (for [i (range 100)]
                     (inc-promise i)))]
  (println res))

;; Example of await all, but where we wait at most 1 second, which
;; won't be enough most of the time and thus will return nil as it
;; timed out
(p/let [res (p/race [(p/all (for [i (range 100)]
                              (inc-async i)))
                     (p/delay 1000)])]
  (println res))

;; Example of await any where we wait for the first inc-async operation out of
;; many to complete
(p/let [res (p/race (for [i (range 100)]
                      (inc-async i)))]
  (println res))

;; Example of short-circuiting with Promesa
(p/catch
  (p/let [a (errorable-inc-async 0)
          b (errorable-inc-async a)
          c (errorable-inc-async b)
          d (errorable-inc-async c)]
    (println d))
  (fn [t]
    (println t)))
1 Like

You could also use a different library which doesn’t need all those callbacks and claims it’s better…

Or use this one, which is already promise-based:

When using that, it should be very easy to use with promesa.

I updated nbb’s sqlite examples to use the promised-based sqlite npm library:

1 Like

Thanks @borkdude . I didn’t know about the synchronous version of sqlite. I am tempted to get that and to avoid avoid callbacks / promises / core.async.

Regardless, quite happy I posted and to have learned what I’ve learned here. Thanks everyone.

1 Like

It does seem for interop Promesa is quite nice, since I imagine you need to hand over promises to JS sometimes, or have to handle a lot of them and it can be annoying to always wrap them with <p!. Also probably debugging and all when using the default event loop executor might be better supported than debugging the core.async state machine.

But I think the rest of your article wasn’t really convincing to me, since all the other things are trivial to do with core.async as well like I showed.

I just wanted to show that core.async is pretty simple and just as ergonomic, but there’s a lack of good tutorials or examples in my opinion, people tend to dramatically over complicate things because they get confused between the CSP style and the async style. But if you stick to the async style, you get something that’s pretty much the same as async/await.

I think the reasoning should be: why pull in a (non-trivially complex) library to do what you can do with a few small macros (promesa.core/plet) and functions to tap into the already standardized world of JS promises. You should justify the added cost/benefit compared to what’s already in the box. If core.async is “just as easy” that’s not enough reason to use it in CLJS in my opinion.

@didibus Agree to disagree. This thread is already marked as solved, so maybe we can discuss it on another thread? Just ping me and we can talk if you want to know about the points I made on the article (that’s specific to ClojureScript, to be honest - on Clojure, core.async works way better than on the JS runtime) :slight_smile:

1 Like

Ya, I wasn’t really trying to say that I think you should use one over the other in ClojureScript, just that from a usage scenario, they offer very similar usability and ergonomics if used in an async style, at least in my opinion.

Just like promises can be made nice and ergonomic with a few macros/utils provided by Promesa, so can core.async be made nice and ergonomic with a few macros/utils as well.

I haven’t done extensive ClojureScript coding, so I’m not trying to convince that core.async is preferred. I might even lean towards Promesa, because like you said, maybe sticking with promises makes other things easier. Kitchen-async looks similar to Promesa as well in that way, I’d probably look at it too.

If you look back at my example, go? is very similar to async and <? is very similar to await.

Take this example from the Mozilla tutorial on async/await for example:

async function myFetch() {
  let response = await fetch('coffee.jpg');
  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }
  return await response.blob();

}

myFetch().then((blob) => {
  let objectURL = URL.createObjectURL(blob);
  let image = document.createElement('img');
  image.src = objectURL;
  document.body.appendChild(image);
}).catch(e => console.log(e));

You can rewrite it pretty verbatim with core.async:

(defn my-fetch []
  (go?
    (let [response (<p! (js/fetch "coffee.jpg"))]
      (if-not (.-ok response)
        (throw (ex-info (str "HTTP error! status: " (.-status response)) {}))
        (<p! (.blob response))))))

(go?
  (try
    (let [blob (<? (my-fetch))
          object-url (.createObjectURL js/URL blob)
          image (.createElement js/document "img")]
      (set! (.-src image) object-url)
      (-> (js/document) .-body (.appendChild image))
    (catch js/Error e
      (.log js/console e))))

Where you’d use async you use go?, where you’d use await you use <p! if awaiting a JS promise, or <? if awaiting a core.async promise-chan.

I’m not sure why in the Mozilla example they don’t use async again after to call it: Making asynchronous programming easier with async and await - Learn web development | MDN

I think it’s because in JavaScript you can’t use async/await in top level position, so they use .then instead. Anyway with core.async you can just wrap it in a go? again like I did.

Not showing this to convince anyone of using core.async over Promesa or kitchen-async or anything else. Just showing it for education purposes. With core.async you get async/await like syntax in ClojureScript.

The p/let and p/do! and the other Promesa macros/utils also make working with promises a lot nicer than standard JavaScript, so it’s a nice alternative to async/await (or core.async) as well. I guess the cool thing in Cljs is you get to choose what you prefer.

1 Like

This topic was automatically closed 182 days after the last reply. New replies are no longer allowed.