Instrumenting async buffers, is this a good approach?

I’m in need of a queue, which can communicate between threads, and has non-blocking puts and blocking takes, with limited capacity, and it should keep most recent items. I also want some instrumentation, especially a warning if the dropping rate is too high. This ties in to downstream processing of the data sent through the queue.

For this I’m currently using (chan (sliding-buffer bufsize)) (is this a good idea? it feels easy to use).
I want to instrument it. I looked at the source and came up with the code below.

It is simply a copy-paste of the SlidingBuffer, but with the added (info-fn ...) calls

As far as I understand, the thread safety is provided by the async library, so I don’t have to worry about the fact that we are using a LinkedList.

For the instrumentation data (state) I’m using an agent, What I imagine is that you can then add a watcher to the ref and this watcher can for example push info to a time-series database, or a warning system.

I’m wondering if this is the idiomatic way to do this kind of instrumentation?

It feels a bit weird to do two sends on the :adding-item case… and this will cause two calls to the watcher… I think this is a code smell, also when adding an element, you can get up to three calls to the info-fn, again something that is not so good.

(ns custombuffer.buffers
  (:require [clojure.core.async :as a]
            [clojure.core.async.impl.protocols :as impl])
  (:import [java.util LinkedList]))

(deftype SlidingInstrumentedBuffer [^LinkedList buf ^long n ^Runnable info-fn]
  impl/UnblockingBuffer
  impl/Buffer
  (full? [this]
    false)
  (remove! [this]
    (info-fn :removing-item)
    (.removeLast buf))
  (add!* [this itm]
    (info-fn :adding-item)
    (when (= (.size buf) n)
      (info-fn :dropping-item)
      (impl/remove! this))
    (.addFirst buf itm)
    this)
  (close-buf! [this])
  clojure.lang.Counted
  (count [this]
    (.size buf)))

(defn sliding-instrumented-buffer [n info-fn]
  (SlidingInstrumentedBuffer. (LinkedList.) n info-fn))

(def demo-instr-agent (agent {:writes 0
                          :elements 0
                          :drops 0}))

(defn instr-buf-fn [a k]
  (case k
    :dropping-item (send a update :drops inc)
    :removing-item (send a update :elements dec)
    :adding-item (do
                   (send a update :writes inc)
                   (send a update :elements inc))))

(comment
  (def c> (a/chan (sliding-instrumented-buffer 2 (partial instr-buf-fn demo-instr-agent))))
  (a/>!! c> 1)
  (a/<!! c>)
  @demo-instr-agent
  ;; => {:writes 1, :elements 0, :drops 0}
  (doseq [i (range 10)]
    (a/>!! c> i))
  (a/<!! c>)
  @demo-instr-agent
  ;; => {:writes 11, :elements 1, :drops 8}
  )

Why not combine the state change in one send?

(send a update #(-> % (update :writes inc) (update :elements inc)))

You could probably collapse the amount of implicit sends invoked by info-fn on add if you allow a sequence of information to be passed as an arg, like [:adding-item :dropping-item], and have info fn batch the state changes in its send function.

(add!* [this itm]
   (let [full? (= (.size buf) n)
          effects (if full? [:adding-item :dropping-item] [:adding-item])]
    (info-fn effects)
    (when full? (impl/remove! this))
    (.addFirst buf itm)

I was going to suggest instrumenting with a transducer on the channel as well, but it looks like you want/need the lower-level information about channel visibility. Seems reasonable on the surface.

This topic was automatically closed 182 days after the last reply. New replies are no longer allowed.