I’m in need of a queue, which can communicate between threads, and has non-blocking puts and blocking takes, with limited capacity, and it should keep most recent items. I also want some instrumentation, especially a warning if the dropping rate is too high. This ties in to downstream processing of the data sent through the queue.
For this I’m currently using (chan (sliding-buffer bufsize))
(is this a good idea? it feels easy to use).
I want to instrument it. I looked at the source and came up with the code below.
It is simply a copy-paste of the SlidingBuffer
, but with the added (info-fn ...)
calls
As far as I understand, the thread safety is provided by the async library, so I don’t have to worry about the fact that we are using a LinkedList
.
For the instrumentation data (state) I’m using an agent
, What I imagine is that you can then add a watcher to the ref and this watcher can for example push info to a time-series database, or a warning system.
I’m wondering if this is the idiomatic way to do this kind of instrumentation?
It feels a bit weird to do two sends on the :adding-item
case… and this will cause two calls to the watcher… I think this is a code smell, also when adding an element, you can get up to three calls to the info-fn, again something that is not so good.
(ns custombuffer.buffers
(:require [clojure.core.async :as a]
[clojure.core.async.impl.protocols :as impl])
(:import [java.util LinkedList]))
(deftype SlidingInstrumentedBuffer [^LinkedList buf ^long n ^Runnable info-fn]
impl/UnblockingBuffer
impl/Buffer
(full? [this]
false)
(remove! [this]
(info-fn :removing-item)
(.removeLast buf))
(add!* [this itm]
(info-fn :adding-item)
(when (= (.size buf) n)
(info-fn :dropping-item)
(impl/remove! this))
(.addFirst buf itm)
this)
(close-buf! [this])
clojure.lang.Counted
(count [this]
(.size buf)))
(defn sliding-instrumented-buffer [n info-fn]
(SlidingInstrumentedBuffer. (LinkedList.) n info-fn))
(def demo-instr-agent (agent {:writes 0
:elements 0
:drops 0}))
(defn instr-buf-fn [a k]
(case k
:dropping-item (send a update :drops inc)
:removing-item (send a update :elements dec)
:adding-item (do
(send a update :writes inc)
(send a update :elements inc))))
(comment
(def c> (a/chan (sliding-instrumented-buffer 2 (partial instr-buf-fn demo-instr-agent))))
(a/>!! c> 1)
(a/<!! c>)
@demo-instr-agent
;; => {:writes 1, :elements 0, :drops 0}
(doseq [i (range 10)]
(a/>!! c> i))
(a/<!! c>)
@demo-instr-agent
;; => {:writes 11, :elements 1, :drops 8}
)