Gregor
Gregor is an open-source Clojure interface to Apache Kafka.
We use Kafka at Weft as the messaging system that ties our microservices together. Many
of those services are written in Clojure and target Kafka 0.9. The Kafka API is well
designed and easy to use – leveraging Clojure's Java interop in this case is fairly
straightforward, but we wanted something more idiomatic and consistent throughout our
pipeline. Gregor was born out of that need.
There are other Kafka libraries available for Clojure, including clj-kafka and
kafka-fast. At the time of this writing clj-kafka targets Kafka 0.8, which was a
non-starter for us. kafka-fast swaps out Zookeeper for Redis, which wasn't quite what we
wanted either. Gregor strives only to be an idiomatic, lightweight set of functions that
are easy to reason about and to use in combination with Kafka's Java classes when
necessary.
Consumer API
Here's a direct comparison of constructing a KafkaConsumer
using the Java API and
Gregor's Clojure API:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n",
                          record.offset(), record.key(), record.value());
}
(ns gregor-sample-app.core
  (:require [gregor.core :as gregor]))

(def props {"auto.offset.reset" "earliest"
            "auto.commit.interval.ms" "1000"
            "enable.auto.commit" "true"
            "session.timeout.ms" "30000"})

(def consumer (gregor/consumer "localhost:9092" "test" ["foo" "bar"] props))

(while true
  (let [records (gregor/poll consumer 100)]
    (doseq [{:keys [offset key value]} records]
      (println "offset = " offset ", key = " key ", value = " value))))
Excellent: Clojure data structures and functions. To be fair, plain Java interop would
look fairly similar to the Gregor code above. The consumer function defaults to
StringDeserializer for both the key and value deserializers, but you can specify
different ones in the (optional) props map.
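For example, to consume raw byte arrays instead of strings you can set the standard
Kafka deserializer keys in props. A minimal sketch, assuming the same namespace as
above and that the props map is passed straight through to the underlying
KafkaConsumer configuration:

;; Sketch: override the value deserializer via the standard Kafka config key.
;; Assumes gregor.core is required as gregor, as in the example above.
(def bytes-consumer
  (gregor/consumer "localhost:9092"
                   "test"
                   ["foo" "bar"]
                   {"value.deserializer" "org.apache.kafka.common.serialization.ByteArrayDeserializer"}))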
Producer API
Now let's take a look at the KafkaProducer
example.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i)));

producer.close();
(ns gregor-sample-app.core
  (:require [gregor.core :as gregor]))

(def props {"acks" "all"
            "retries" 0
            "batch.size" 16384
            "linger.ms" 1
            "buffer.memory" 33554432})

(def producer (gregor/producer "localhost:4242" props))

(dotimes [i 100]
  (gregor/send producer "my-topic" (str i)))

(gregor/close producer)
Just like the Consumer, the Producer defaults to StringSerializer for both keys and
values. Also noteworthy is that the send function has multiple arities that allow you
to specify the partition to which the record should be sent, the record key, and a
callback, in addition to the value.
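For example, a keyed record could be sent with something like the following. This is
only a sketch: the argument order shown is illustrative and the topic and key are made
up, so check the send docstring for the exact arities:

;; Sketch: send a record with an explicit key, and with an explicit partition and key.
;; Argument order here is illustrative; see the gregor.core/send docstring for the
;; exact signatures. Assumes the producer defined in the example above.
(gregor/send producer "my-topic" "some-key" "some-value")
(gregor/send producer "my-topic" 0 "some-key" "some-value")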
In a future post I'll cover at-least-once processing semantics with Gregor and using lazy sequences to consume records.