Gregor is an open-source Clojure interface to Apache Kafka.

We use Kafka at Weft as the messaging system that ties our microservices together. Many of those services are written in Clojure and target Kafka 0.9. The Kafka API is well designed and easy to use – leveraging Clojure's Java interop in this case is fairly straightforward, but we wanted something more idiomatic and consistent throughout our pipeline. Gregor was born out of that need.

There are other Kafka libraries available for Clojure including clj-kafka and kafka-fast. At this time of this writing clj-kafka targets Kafka 0.8 which was a non-starter for us. kafka-fast swaps out Zookeeper for Redis which wasn't quite what we wanted, either. Gregor strives only to be an idiomatic, lightweight set of functions that are easy to reason about and use in combination with Kafka's Java classes when necessary.

Consumer API

Here's a direct comparison of constructing a KafkaConsumer using the Java API and Gregor's Clojure API:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("", "test");
props.put("", "true");
props.put("", "1000");
props.put("", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
(ns gregor-sample-app.core
  (:require [gregor.core :as gregor]))

(def props {"auto.offset.reset"       "earliest"
            "" "1000"
            ""      "true"
            ""      "30000"})

(def consumer (gregor/consumer "localhost:9092" "test" ["foo" "bar"] props))

(while true
    (let [consumer-records (gregor/poll consumer 100)]
      (doseq [{:keys [offset key value]} recs]
        (println "offset = " offset ", key = " key ", value = " value))))

Excellent, clojure data structures and functions. Though in fact, using Java interop would look pretty similar to the Gregor code above. The consumer function defaults to using StringDeserializer for both key and value deserializers, but you can specify a different one in the (optional) props map.

Producer API

Now let's take a look at the KafkaProducer example.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i)));

(ns gregor-sample-app.core
  (:require [gregor.core :as gregor]))

(def props {"acks"              "all"
            "retries"           0
            "batch.size"        16384
            ""         1
            "buffer.memory"     33554432
            "auto.offset.reset" "earliest"})

(def producer (gregor/producer "localhost:4242" props))

(dotimes [i 100] (gregor/send producer "my-topic" (str i)))

(gregor/close producer)

Just like the Consumer, the Producer defaults to StringSerializer for both keys and values. Also noteworthy is that the send function has multiple arities that allow you to specify the partition to which the record should be sent, the record key, and a callback in addition to the value.

In a future post I'll cover at-least-once processing semantics with Gregor and using lazy sequences to consume records.