gregor.core

->producer-record

(->producer-record topic value)
(->producer-record topic key value)
(->producer-record topic partition key value)
(->producer-record topic partition timestamp key value)

assign!

(assign! consumer topic partition & tps)
Manually assign topics and partitions to this consumer.
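A minimal sketch; the broker address, group id, and topic/partition values are illustrative:

```clojure
;; Manually take partitions 0 and 1 of "events", bypassing group management.
;; Additional topic/partition pairs go in the trailing args.
(def c (consumer "localhost:9092" "my-group"))
(assign! c "events" 0 "events" 1)
```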

assignment

(assignment consumer)
Get the set of partitions currently assigned to this consumer.

Closeable

protocol

Provides two ways to close a resource: a default arity, (close this), and a
second arity that accepts a timeout.

members

close

(close this)
(close this timeout)

commit-offsets!

(commit-offsets! consumer)
(commit-offsets! consumer offsets)
Commit offsets returned by the last poll for all subscribed topics and partitions, or
manually specify offsets to commit.

offsets (optional) - a seq of offset maps, as below, giving the offsets to commit
to Kafka for particular topics and partitions:

e.g. {:topic "foo"
      :partition 1
      :offset 42}

optionally provide metadata:

e.g. {:topic "bar"
      :partition 0
      :offset 17
      :metadata "Dude, that's so meta."}
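For example, committing a specific offset with metadata (the topic, offset, and metadata values are illustrative, and `c` is a consumer created with consumer):

```clojure
;; Synchronously commit offset 17 for partition 0 of "bar".
(commit-offsets! c [{:topic "bar"
                     :partition 0
                     :offset 17
                     :metadata "batch-7 complete"}])
```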

commit-offsets-async!

(commit-offsets-async! consumer)
(commit-offsets-async! consumer callback)
(commit-offsets-async! consumer offsets callback)
Commit offsets returned by the last poll for all subscribed topics and partitions,
or manually specify offsets to commit.

This is an asynchronous call and will not block. Any errors encountered are either
passed to the callback (if provided) or discarded.

offsets (optional) - a seq of offset maps, as below, giving the offsets to commit
to Kafka for particular topics and partitions:

e.g. {:topic "foo"
      :partition 1
      :offset 42}

optionally provide metadata:

e.g. {:topic "bar"
      :partition 0
      :offset 17
      :metadata "Dude, that's so meta."}

The committed offset should be the next message your application will consume,
i.e. lastProcessedMessageOffset + 1.
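A sketch, assuming the callback, like Kafka's OffsetCommitCallback, receives the committed offsets and any exception (nil on success):

```clojure
;; Commit without blocking; log a failure if one is reported.
(commit-offsets-async! c
                       [{:topic "foo" :partition 1 :offset 42}]
                       (fn [offsets exception]
                         (when exception
                           (println "commit failed:" (.getMessage exception)))))
```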

committed

(committed consumer topic partition)
Return the OffsetAndMetadata of the last committed offset for the given partition.
This offset will be used as the position for the consumer in the event of a failure.

If no offsets have been committed, return nil.

consumer

(consumer servers group-id)
(consumer servers group-id topics)
(consumer servers group-id topics config)
Return a KafkaConsumer.

Args:
  servers: comma-separated host:port strs or list of strs as bootstrap servers
  group-id: str that identifies the consumer group this consumer belongs to
  topics: an optional list of topics to which the consumer will be dynamically
          subscribed.
  config: an optional map of str to str containing additional consumer
          configuration. More info on optional config is available here:
          http://kafka.apache.org/documentation.html#newconsumerconfigs

The StringDeserializer class is the default for both key.deserializer and
value.deserializer.
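For example (the broker address, group id, topic, and config values are illustrative):

```clojure
;; A consumer in group "my-group" subscribed to "accounts", reading from the
;; earliest offset and with auto-commit disabled.
(def c (consumer "localhost:9092"
                 "my-group"
                 ["accounts"]
                 {"auto.offset.reset" "earliest"
                  "enable.auto.commit" "false"}))
```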

consumer-record->map

(consumer-record->map record)
Convert a ConsumerRecord object into a clojure map with keys :value, :key,
:partition, :topic, and :offset.

create-topic

(create-topic zk-config topic {:keys [partitions replication-factor config rack-aware-mode], :or {partitions 1, replication-factor 1, config nil, rack-aware-mode :safe}})
Create a topic.

Args:
  zk-config: A map with Zookeeper connection details as expected by
             with-zookeeper.
  topic: The name of the topic to create.
  An unnamed configuration map. Valid keys are as follows:

    :partitions         (optional) The number of ways to partition the topic.
                                   Defaults to 1.
    :replication-factor (optional) The replication factor for the topic.
                                   Defaults to 1.
    :config             (optional) A map of configuration options for the
                                   topic.
    :rack-aware-mode    (optional) Control how rack aware replica assignment
                                   is done. Valid values are :disabled,
                                   :enforced, :safe. Default is :safe.
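A sketch; the :connection-string key in the zk-config map is an assumption, so check with-zookeeper and validate-zookeeper-config for the exact shape:

```clojure
;; Create "events" with 3 partitions and a one-day retention override.
(create-topic {:connection-string "localhost:2181"} ; assumed key shape
              "events"
              {:partitions 3
               :replication-factor 1
               :config {"retention.ms" "86400000"}})
```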

delete-topic

(delete-topic zk-config topic)
Delete a topic.

Args:
  zk-config: A map with Zookeeper connection details as expected by
             with-zookeeper.
  topic: The name of the topic to delete.

flush

(flush producer)
Invoking this method makes all buffered records immediately available to send (even if
linger.ms is greater than 0) and blocks on the completion of the requests associated
with these records.

offset-and-metadata

(offset-and-metadata offset)
(offset-and-metadata offset metadata)
Metadata for when an offset is committed.

pause

(pause consumer topic partition & tps)
Suspend fetching from the given topic-name/partition-number pairs.

poll

(poll consumer)
(poll consumer timeout)
Return a seq of consumer records currently available to the consumer (via a single poll).
Fetches sequentially from the last consumed offset.

A consumer record is represented as a clojure map with corresponding keys :value, :key,
:partition, :topic, :offset

timeout - the time, in milliseconds, spent waiting in poll if data is not
available. If 0, returns immediately with any records that are available now.
Must not be negative.
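For example, with a consumer `c` subscribed to one or more topics:

```clojure
;; Wait up to 100ms for records, then print each one's coordinates and value.
(doseq [{:keys [topic partition offset value]} (poll c 100)]
  (println topic partition offset value))
```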

position

(position consumer topic partition)
Return the offset of the next record that will be fetched (if a record with that
offset exists).

producer

(producer servers)
(producer servers config)
Return a KafkaProducer.

The producer is thread safe and sharing a single producer instance across
threads will generally be faster than having multiple instances.

Args:
 servers: comma-separated host:port strs or list of strs as bootstrap servers
 config: an optional map of str to str containing additional producer
         configuration. More info on optional config is available here:
         http://kafka.apache.org/documentation.html#producerconfigs

The StringSerializer class is the default for both key.serializer and value.serializer
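For example (the broker address and config values are illustrative):

```clojure
;; A producer that waits for full acknowledgement and compresses batches.
(def p (producer "localhost:9092"
                 {"acks" "all"
                  "compression.type" "gzip"}))
```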

rack-aware-modes

records

(records consumer)
(records consumer timeout)
Return a lazy sequence of sequences of consumer-records by polling the consumer.

Each element in the returned sequence is the seq of consumer records returned from a
poll by the consumer. The consumer fetches sequentially from the last consumed offset.

A consumer record is represented as a clojure map with corresponding keys :value, :key,
:partition, :topic, :offset

timeout - the time, in milliseconds, spent waiting in poll if data is not
available. If 0, returns immediately with any records that are available now.
Must not be negative.
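For example, consuming continuously; the loop runs until the consumer is woken up or closed:

```clojure
;; Each `batch` is the seq of records returned by one poll.
(doseq [batch (records c 100)
        {:keys [topic partition offset value]} batch]
  (println topic partition offset value))
```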

resume

(resume consumer topic partition & tps)
Resume fetching from the specified partitions, which were previously paused.

seek!

(seek! consumer topic partition offset)
Overrides the fetch offsets that the consumer will use on the next poll.

seek-to!

(seek-to! consumer offset topic partition & tps)
Seek to the :beginning or :end offset for each of the given partitions.
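For example, with a consumer `c` assigned the partitions in question:

```clojure
;; Rewind partitions 0 and 1 of "events" to their earliest offsets.
(seek-to! c :beginning "events" 0 "events" 1)
```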

send

(send producer topic value)
(send producer topic key value)
(send producer topic partition key value)
(send producer topic partition timestamp key value)
Asynchronously send a record to a topic, providing at least a topic and value.
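For example, a fire-and-forget send using the default string serializers (the topic, key, and value are illustrative):

```clojure
(send p "events" "user-42" "signed-up")
```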

send-then

(send-then producer topic value callback)
(send-then producer topic key value callback)
(send-then producer topic partition key value callback)
(send-then producer topic partition timestamp key value callback)
Asynchronously send a record to a topic, providing at least a topic and value, and
invoke the provided callback when the send has been acknowledged.

The callback function should take 2 args:
  - a metadata map: the metadata for the record that was sent. Keys
    are :topic, :partition, :offset.
  - a java.lang.Exception object: the exception thrown during processing of this
    record.
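A sketch using the callback signature described above (topic, key, and value are illustrative):

```clojure
;; Report success or failure once the broker acknowledges the send.
(send-then p "events" "user-42" "signed-up"
           (fn [metadata exception]
             (if exception
               (println "send failed:" (.getMessage exception))
               (println "acked at" (:topic metadata)
                        (:partition metadata)
                        (:offset metadata)))))
```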

subscribe

(subscribe consumer topics-or-regex & [partitions-assigned-fn partitions-revoked-fn])
Subscribe to the given list of topics to get dynamically assigned partitions. Topic
subscriptions are not incremental: this list replaces the current assignment, if
there is one. Topic subscription (which uses group management) cannot be combined
with manual partition assignment via assign!. If the given list of topics is empty,
the call is treated the same as unsubscribe.

topics-or-regex can be a list of topic names or a java.util.regex.Pattern object to
subscribe to all topics matching a specified pattern.

The optional functions are a callback interface that triggers custom actions when
the set of partitions assigned to the consumer changes.
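A sketch; the assumption that each callback receives the affected topic-partitions mirrors Kafka's ConsumerRebalanceListener:

```clojure
;; Subscribe to every topic matching "metrics.*" and log rebalances.
(subscribe c
           (java.util.regex.Pattern/compile "metrics\\..*")
           (fn [assigned] (println "assigned:" assigned))
           (fn [revoked] (println "revoked:" revoked)))
```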

subscription

(subscription consumer)
Get the current subscription for this consumer.

topic-exists?

(topic-exists? zk-config topic)
Query whether or not a topic exists.

Args:
  zk-config: A map with Zookeeper connection details as expected by
             with-zookeeper.
  topic: The name of the topic to check for.

topic-partition

(topic-partition topic partition)
Return a TopicPartition for the given topic name and partition number.

topics

(topics zk-config)
Query existing topics.

Args:
  zk-config: A map with Zookeeper connection details as expected by
             with-zookeeper.

unsubscribe

(unsubscribe consumer)
Unsubscribe from topics currently subscribed with subscribe. This also clears any
partitions directly assigned through assign.

wakeup

(wakeup consumer)
Wake up the consumer. This method is thread-safe and is useful in particular to abort
a long poll. The thread blocking in a consumer operation will throw a WakeupException.

with-zookeeper

macro

(with-zookeeper zk-config zookeeper & body)
A utility macro for interacting with Zookeeper.

Args:
  zk-config: A map with Zookeeper connection details. This will be validated
             using validate-zookeeper-config before use.
  zookeeper: This will be bound to an instance of ZkUtils while the body is
             executed. The connection to Zookeeper will be cleaned up when
             the body exits.
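A sketch; the :connection-string key is an assumption, so check validate-zookeeper-config for the exact map shape:

```clojure
;; zk is bound to a ZkUtils instance for the duration of the body.
(with-zookeeper {:connection-string "localhost:2181"} zk
  (println "connected:" zk))
```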