I’m trying to use the Kafka producer/consumer pattern. In my case, the producer is the Debezium connector, which also creates the topics, so I only need a consumer to read the messages from those topics.
So, I configured my consumer via Integrant like this:
(defmethod ig/prep-key ::consumer
  [_ {:keys [kafka-brokers kafka-group enable-auto-commit
             topics max-poll-records]
      :or {kafka-brokers "localhost:9092" kafka-group "myapp"
           enable-auto-commit false max-poll-records "100"}}]
  (timbre/info "Preparing consumer")
  {"bootstrap.servers"  kafka-brokers
   "group.id"           kafka-group
   "enable.auto.commit" enable-auto-commit
   "auto.offset.reset"  "earliest"
   ;; Environment variables are always strings
   "max.poll.records"   (Integer/parseInt max-poll-records)
   "topics"             (if topics
                          (mapv #(hash-map :topic-name %) (str/split topics #","))
                          (throw (IllegalArgumentException.
                                  "Kafka topics are required. You need to specify at least one topic.")))
   "key.deserializer"   "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})
(defmethod ig/init-key ::consumer [_ config]
  (timbre/info "Configuring Kafka consumer" config)
  ;; The config map uses string keys, so remove the "topics" entry (a string key,
  ;; not a keyword) before passing the rest to the Kafka consumer.
  (-> (jc/consumer (dissoc config "topics"))
      (jc/subscribe (get config "topics"))))
(defmethod ig/halt-key! ::consumer [_ consumer]
  (timbre/info "Stopping Kafka consumer")
  (when consumer
    (.close consumer)))
Now, any idea how I consume the messages using this consumer? Basically, I'm stuck on how to get the topic name: if Debezium is the one creating the topic and producing into it, how do I point the consumer at it?
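For what it's worth, this is roughly the poll loop I imagine sitting on top of that consumer. It's only a sketch: ::consumer-loop and handler are names I made up, and I'm assuming jackdaw.client/poll takes a millisecond timeout and returns records as maps with a :value key.

;; Rough sketch, not working code -- ::consumer-loop and handler are my own names.
(defmethod ig/init-key ::consumer-loop [_ {:keys [consumer handler]}]
  (let [running? (atom true)]
    (future
      (while @running?
        ;; poll with a 1 s timeout and hand each message value to a handler fn
        (doseq [record (jc/poll consumer 1000)]
          (handler (:value record)))))
    running?))

(defmethod ig/halt-key! ::consumer-loop [_ running?]
  (reset! running? false))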
I can also keep an eye on the topic and see all the updates via this command:
docker run -it --network=docker-debezium_default --rm edenhill/kafkacat:1.6.0 kafkacat -C -b kafka:9092 -t myapp.public.chatrooms -o -10
where myapp.public.chatrooms is the topic where all the updates are being produced. But how do I use that topic name in the code?
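In other words, is it just a matter of passing the Debezium-generated topic name (serverName.schema.table) into the config above? Something like the sketch below is what I have in mind; the system map keys are just my guess at how the ::consumer key would be configured.

;; Sketch of the Integrant system config -- :topics is the Debezium topic name
;; as a string; multiple topics would be comma-separated to match str/split above.
{::consumer {:kafka-brokers "localhost:9092"
             :kafka-group   "myapp"
             :topics        "myapp.public.chatrooms"}}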