How do I use a topic created by Debezium in my code?

I'm trying to use the Kafka producer/consumer model. In my case the producer is the Debezium connector, which also creates the topics, so I only need a consumer to read the messages from those topics.
I configured my consumer via Integrant like this:

(defmethod ig/prep-key ::consumer
  [_ {:keys [kafka-brokers kafka-group enable-auto-commit
             topics max-poll-records]
      :or {kafka-brokers "localhost:9092" kafka-group "myapp"
           enable-auto-commit false max-poll-records "100"}}]
  (timbre/info "Preparing consumer")
  {"bootstrap.servers" kafka-brokers
   "group.id" kafka-group
   "enable.auto.commit" enable-auto-commit
   "auto.offset.reset" "earliest"
   ;; Environment variables are always strings
   "max.poll.records" (Integer/parseInt max-poll-records)
   "topics" (if topics
              (mapv #(hash-map :topic-name %) (str/split topics #","))
              (throw (IllegalArgumentException.
                      "Kafka topics are required. You need to specify at least one topic.")))
   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})
(defmethod ig/init-key ::consumer [_ config]
  (timbre/info "Configuring Kafka consumer" config)
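  ;; The config map uses string keys, so remove "topics" (not :topics)
  ;; before handing the rest to the Kafka consumer.
  (-> (jc/consumer (dissoc config "topics"))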
      (jc/subscribe (get config "topics"))))
(defmethod ig/halt-key! ::consumer [_ consumer]
  (timbre/info "Stopping Kafka consumer")
  (when consumer
    (.close consumer)))

Any idea how I can consume messages with this consumer? I'm stuck on how to get the topic name: if Debezium is the one creating the topic and producing to it, how do I point the consumer at it?
I can also watch all the updates in Kafka with this command:

docker run -it --network=docker-debezium_default --rm edenhill/kafkacat:1.6.0 kafkacat -C -b kafka:9092 -t myapp.public.chatrooms -o -10

Here myapp.public.chatrooms is the topic where all the updates are produced, but how do I use it in the code?
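
One possible way to wire this up, as a minimal sketch rather than a definitive answer: by default, Debezium's relational connectors name each change topic after the connector's topic prefix (logical server name), the schema, and the table, joined with dots, which matches myapp.public.chatrooms. That name can be passed as the topics setting of the ::consumer component, and the subscribed consumer can then be polled in a loop. The helpers debezium-topic, topic-configs, and consume! below are hypothetical names introduced for illustration. The loop takes its poll and commit functions as arguments so that it runs without a broker; the commented usage assumes that jc is jackdaw.client and that jc/poll returns record maps with :topic-name, :key, and :value.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper: builds a topic name following Debezium's default
;; <prefix>.<schema>.<table> convention for relational connectors.
(defn debezium-topic
  [prefix schema table]
  (str/join "." [prefix schema table]))

;; Hypothetical helper: turns a comma-separated string into the
;; [{:topic-name ...}] shape that the config above passes to jc/subscribe.
(defn topic-configs
  [topics-csv]
  (->> (str/split topics-csv #",")
       (map str/trim)
       (remove str/blank?)
       (mapv #(hash-map :topic-name %))))

;; Hypothetical polling loop. It keeps polling while the `running?` atom
;; is true, passes every record to `handle-fn`, and calls `commit-fn`
;; after each non-empty batch (the config above disables auto-commit).
(defn consume!
  [{:keys [poll-fn handle-fn commit-fn running?]}]
  (while @running?
    (let [records (poll-fn)]
      (when (seq records)
        (run! handle-fn records)
        (commit-fn)))))

(comment
  ;; Assumed usage with the Integrant component from the question.
  ;; `consumer` is the value returned by ig/init-key for ::consumer.
  (def running? (atom true))
  (future
    (consume! {:poll-fn   #(jc/poll consumer 1000) ; timeout in ms
               :handle-fn (fn [{:keys [topic-name key value]}]
                            (println topic-name key value))
               :commit-fn #(.commitSync consumer)
               :running?  running?}))
  ;; Later, to stop the loop:
  (reset! running? false))
```

With the StringDeserializer settings above, each :value arrives as a string, so any JSON parsing of the Debezium change event would still need to happen inside handle-fn. Because a Kafka consumer is not safe to use from several threads at once, it is probably best to stop the loop before the halt-key! method closes the consumer.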
