An example for redis publish subscribe feature with clojure...?

I’m trying to see how the Redis implementation happens with Clojure. Do we need to create channels or something to use the pub/sub feature? Let’s say there’s a function that updates some data and puts the updated data into an atom. But instead of this how to use the pub/sub from Redis so that we can publish the changes and subscribe to those.

I haven’t used Redis Pub/Sub, but Carmine has support for it: GitHub - ptaoussanis/carmine: Redis client and message queue for Clojure
(I have used Carmine for more “normal” Redis usage, and it’s awesome)

2 Likes

In addition to @mvarela there is this presentation from Bobby Calderwood.

3 Likes

I’ve read the docs there for pub/sub but I’m stuck on how to make the listener if we’re using integrant

How you would use the listener with Integrant depends a bit on what exactly you’ll be doing with it. There’s an example of a component-like system here: system/redis_pubsub.clj at master · danielsz/system · GitHub

Could you take a look at this:
For Listener this is the code example given in the docs:

    (def listener
      (car/with-new-pubsub-listener (:spec server1-conn)
        {"foobar" (fn f1 [msg] (println "Channel match: " msg))
         "foo*"   (fn f2 [msg] (println "Pattern match: " msg))}
       (car/subscribe  "foobar" "foobaz")
       (car/psubscribe "foo*")))

Now if the server1-conn is the connection, which we’ve declared via integrant in a separate file like this:

    (defmethod ig/init-key :app/redis [_ {:keys [redis-host
                                                     redis-port] :as opts}]
      (timbre/warn "Connecting to the Redis server" opts)
      (let [conn {:pool {}
                  :spec {:host redis-host
                         :port redis-port}}]
        (try
          (timbre/warn "Connected to the Redis - " (redis/wcar conn (redis/ping)))
          ;; Return the redis connection object
          conn
          ;; When redis is not available then retrun nil
          (catch Exception _
            (timbre/warn "Unbale to connect with the Redis server ")
            nil))))

Now how to manage the conn with the listener? Thank you.

I’ve done it like this:

(defmethod ig/init-key :app/redis [_ {:keys [redis-host
                                                 redis-port] :as opts}]
  (timbre/warn "Connecting to the Redis server" opts)
  (let [conn {:pool {}
              :spec {:host redis-host
                     :port redis-port}}
        listener
        (car/with-new-pubsub-listener (:spec conn)
        {"foobar" (fn f1 [msg] (println "Channel match: " msg))
         "foo*"   (fn f2 [msg] (println "Pattern match: " msg))}
       (car/subscribe  "foobar" "foobaz")]
   
    (try
      (timbre/warn "Connected to the Redis - " (redis/wcar conn (redis/ping)))
      ;; Return the listener
      listener
      ;; When redis is not available then retrun nil
      (catch Exception _
        (timbre/warn "Unbale to connect with the Redis server ")
        nil))))

but now how to use this listener outside this file?

I think your question is more related to Integrant than to Redis. In your ig/init-key :app/redis,,, you should return your listener. This will be substituted in the Integrant system map under the :app/redis key, so when you have some other component that uses the listener, by using an #ig/ref (and here is why I said it depends on how you’re planning to use it), it will be available in the opts map for that component’s constructor.

For example, for some stuff at work I need to thread a DB connection to my web request handlers, so I have an interceptor (I’m using reitit) that depends on the DB connector (and closes over it), and the app depends on that interceptor, so when I initialize the app, I add that to my interceptor chain, and then I pass that connection to my handlers.

If you can’t figure out where to “store” the listener, you can always stick it in an atom (though this is typically best avoided). In this case, I think that since the listener already has the handler functions passed to it when it’s constructed, you probably don’t need to keep a reference to it yourself, and you can let Integrant handle that for you.

Here this part is returning the listener

(try
      (timbre/warn "Connected to the Redis - " (redis/wcar conn (redis/ping)))
      ;; Return the listener
      listener
      ;; When redis is not available then retrun nil
      (catch Exception _
        (timbre/warn "Unbale to connect with the Redis server ")
        nil))))

So now in order to use this listener in other files like in mutation or a streamer (graphql) which are in other files then how? coz this code is in redis.clj and in the other file I’ve used it like require that redis.clj :as redis and redis/listener, but its no such variable…

You need to use the Integrant refs to pass the listener to the components that need it.

1 Like

Yeah got it! Thank you.

1 Like

Glad you could make it work!

Just a small issue! I’m getting this: Redis commands must be called within the context of a connection to Redis server.

This is the listener, which is in redis.clj:

(defmethod ig/init-key :com.app.graphql.redis/chatroom-listener [_ connection]
  ;(timbre/warn "Connecting to the Redis server" opts)
  (defmacro wcar* [& body] `(redis/wcar connection ~@body)) 
  (let [listener
        (redis/with-new-pubsub-listener (:spec connection)
          {"one" (fn f1 [msg] (println "Channel match: " msg))}
          (redis/subscribe  "chatroom"))]
    
    (try
      (timbre/warn "Connected to the Redis - " (wcar* connection (redis/ping)))
      ;; Return the redis connection object
      listener
      ;; When redis is not available then retrun nil
      (catch Exception _
        (timbre/warn "Unbale to connect with the Redis server ")
        nil))))

Now to publish anything to the channel “one” we use:

(wcar* (car/publish "one" "Hello world"))

But I’m using this publish statement in other file… that’s why I’m getting this error. How to use it in another file?

I think the problem there is in the macro. I can’t test this right now (at work), but the way the wcar* macro is defined would only work if connection is a var in your namespace, which it isn’t in this case.
Try something like:

 (defmacro wcar* [& body] `(redis/wcar ~connection ~@body))

And then I can do this (wcar* (car/publish “one” “Hello world”)) in another file?

The question how can the other file know what is “one”? like how can it knows to publish the data onto that channel called “one” which is in another file?

That really depends on how you structure your app… If you require the ns where you defined the wcar* macro, then the macro itself will be available, but I have no idea where you’re getting the topic names from. It’s hard to give you a more detailed answer without more context, I’m afraid

Hey, could you elaborate on “wcar* macro that closes over the connection”…

The macro I wrote above closes over (captures) the connection that you get from Integrant in the init-key method, so it’s implicit when you use it from some other namespace.

I think you should probably start with a minimal working example using redis, then integrant, and then once you have that down, continue with all the rest you’re trying to do.