Saturday, August 25, 2012

Down the Rabbit Hole with Kafka

Introduction

Kafka is a "distributed publish-subscribe messaging system". In this post, I will discuss how the nitty-gritty details of how the producer and consumer mechanisms work in Kafka 0.7.1.

In a typical setup, you have a single ZooKeeper instance and a cluster of Kafka servers (e.g. 3) on a high-numbered RAID array. Here, we strive to understand the dynamic perspective of a Kafka setup.

Starting Up Kafka
  • For each server, a unique brokerid is specified in server.properties. This serves to uniquely identify a broker while allowing the broker to change host or port.
  • By default, when a Kafka broker starts up, it registers itself with zookeeper (unless enable.zookeeper is false in server.properties). The ephemeral ZK znode /brokers/ids/ contains the (host, port) 2-tuple.
  • Kafka will then attempt to read the topics and the load the logs from log.dir (which defaults to /tmp/kafka-logs if you use the provided server.properties). If the log.dir path does not exist, a new log directory is created.
  • Note that at this point, no topics or partitions are created. Topics and partitions are only created when a producer registers with the broker. The number of partitions is specified via num.partitions and this is for each topic in that particular server. If you wish to specify partitions on a per-topic basis, you can override the default number of partitions using topic.partition.count.map. Once again, this is server-specific.
  • The Kafka broker is registered with ZK once the LogManager is instantiated. All existing topics and partitions are also registered with ZK.
  • A folder is created in log.dir for each combination of topic+partition. For instance, if you have a topic "test-topic" with 3 partitions, then you will have the following folders: "test-topic-0", "test-topic-1", "test-topic-2".
  • What is stored in ZooKeeper regarding your Kafka broker?
    Basically the mapping from broker id to (host, port) and the mapping from (topic, broker id) to number of partitions.

Kafka Producer

When a new Producer is instantiated, it looks at either zk.connect (for automatic broker discovery) or broker.list (for static list of kafka brokers defined by the 3-tuple (brokerId, host, port)).

Internally, the producer client keeps a local copy of the list of brokers and their number of partitions. If you are using zookeeper, then this copy changes over time when brokers are added or dropped.

Assume that you have the following code:

ProducerData data = new ProducerData("test-topic", "test-topic");
producer.send(data);

Which partition/broker does the message go to? It depends. The request gets funneled into a send method in kafka.producer, which routes the request to a different function depending on whether you have zookeeper enabled.
  • If you go with the zookeeper option...
  • the Producer retrieves from
    maintains a pool of connections to the brokers, one per broker.
  • In zkSend(),  the topicPartitionsList is fetch from for the specified topic "test-topic" via a call to getPartitionListForTopic(). This returns a scala sequence of (brokerId, partitionId). For instance, If we have two brokers of 3 and 4 partitions respectively, then getPartitionListForTopic may return Seq( (0,0),(0,1),(0,2),   (1,0,),(1,1),(1,2),(1,3) ). This result is sorted in asc ordering by brokerId as primary and partitionId as secondary.
  • The length of that sequence is assigned to totalNumPartitions
  • Now, we want to pick a partitionId in the range [0, N-1], where N is totalNumPartitions
    • If the semantic key is specific in the ProducerData, i.e.:
      new ProducerData("test-topic", "test-key", "test-message"),
      and...
      • If partitioner.class was specified: then the partition(key, numPartitions) method is called which returns the required partitionId.
      • Else the partitioner.class was not specified and the default partitioner class (kafka.producer.DefaultPartitioner) is used. This returns the equivalent of
        math.abs(key.hashCode) % numPartitions
    • Otherwise, partitionId = random.nextInt(totalNumPartitions)
  • Using this partitionId, we can find the which broker that partition id belongs to by looking up the sequence using the partitionId as the index.
  • Now that we have the broker information, the client proceeds to send the message over the wire.
  • The process to find a broker+partition to send the message repeats up to zkReadRetries (aka zk.read.num.retries) times, and each trial other than the first re-reads information from ZK.
  • If you go with the static broker list option...
  • getPartitionListForTopic() is called which returns a sequence as described earlier.
  • Now we have: partitionId = random.nextInt(totalNumPartitions)
  • Using this partitionId, we can retrieve the broker information by looking up the sequence using partitionId as index.
  • Now that we have the broker information, the client proceeds to send the message over the wire.
Note that for async, messages are simply batched before sending, and batch.size and queue.time provide SLA guarantees for the message.

Kafka Consumer

There are two consumer APIs you should be aware of: the high level api (aka ConsumerConnector) and low level api (SimpleConsumer). The big difference between here is that the high level api does broker discovery, consumer rebalancing and keep track of state (i.e. offsets) in zookeeper, while the low level api does not.

If you have a consumer that needs to do fancy stuff such as replaying using specific offsets (e.g. storm spout or a hadoop job which may fail), then that consumer needs to keep track of state manually, and so you should use the low level api.

Kafka Consumer High Level API

The high level api stores state using zookeeper and groups consumers together for load balancing using a unique group_id which is provided by the client. To simply our understanding of state management in ZK, we can think of the znodes as a hash table storing the following information:

key :: value
 owners(group_id, topic, broker_id, partition id) :: consumer_node_id
offsets(group_id, topic, broker_id, partition_id) :: offset counter value
consumer(group_id, consumer_id) :: map({topic, num of streams})

The consumer_id is a 2-tuple of the form (hostname, uuid). This allows for threaded consumers on a single host. The owners(...) key acts as a lock and simplify offset management by ensuring that no more than one consumers are reading from the same combination of (group_id, topic, broker_id, partition_id).

We refer to a consumer here refers to an instance of ConsumerConnector(). A ConsumerConnector instance can have multiple KafkaStreams to allow for multi-threaded consumption.

Because each broker partition can be matched only to one consumer at any given time, you will have non-active consumers if you have more consumers than broker partitions. The benefit of using the high level api is that a consumer will not be starved if a broker fails in a given cluster of kafka brokers. When the failed broker is restored, messages will then be consumed from that broker.

The consumer rebalancing algorithm is triggered via ZK watchers on either of the following conditions:
- addition/removal of broker
- addition/removal of consumer
- new allowed topic

This rebalancing algorithm is triggered for every ConsumerConnector instance in the consumer group (hopefully around the same time, but this isn't a guarantee). So how does the rebalancing work? Effectively speaking for each ConsumerConnector:

First, syncedRebalance() is called. syncedRebalance() effectively loops around rebalance() for a maximum of rebalance.retries.max (defaults to 4) times. For each rebalance attempt, it is possible for a ZK exception to be thrown due to changing ZK states. If there is an exception, the exception is safely caught and the consumer backs off for rebalance.backoff.ms milliseconds (this defaults to zk.synctime.ms). In rebalance() a number of actions happen:
  1. The consumer closes all fetch requests (to avoid data duplication) and offsets are flushed out to ZK
  2. Release all partition ownership from ZK by deleting the znodes for owners(group_id, topic, broker_id, partition id)
  3. Get the partitions per topic mapping
  4. For each topic that the ConsumerConnector is subscribed to:
    1. Using the partitions per topic mapping, get the partitions for that topic, which are of the form (broker-partition). This list is sorted.
    2. Get the total number of consumers for that topic. This is the total number of KafkaStreams subscribing to that topic in the consumer group, which might be more than the number of ConsumerConnector instances.
    3. For each KafkaStreams in the ConsumerConnector: 
      1. Range partition the the sorted partitions to consumer as equally as possible, with the first few consumers getting an extra partition if there are left overs (Note: the consumers were sorted).
        Example 1: If you have 5 partitions with 2 ConsumerConnector instances of 1 stream each, then consumer 0 gets [p0, p1, p2] and consumer 1 gets [p3, p4].
        Example 2: If you have 5 partitions with 2 ConsumerConnector instances of 4 streams each, then consumer 0 gets [p0, p1, p2, 3], and consumer 1 gets [p4].
      2. Note that range partitioning allows for locality, where there is a higher chance for a consumer to fetch data from multiple partitions from a broker rather than all the brokers.

Kafka Consumer Low Level API

In the low level api, you provide everything -- broker host+port, partition id, and offset.

long offset = 0;
SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000);
FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000);
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);

The low level api does not talk to zookeeper, and you are responsible for figuring out which broker and partition to connect to and keep track of your offsets.

Achieving High-Availability
  • Local Redundancy
    Using RAID mirroring will provide local data redundancy, while striping provides the benefit of increased performance. According to the docs, LinkedIn uses RAID 10 on Ext 4.
  • Non Local Redundancy
    As of this writing, Kafka does not support non-local data redundancy. Work is in progress to support inter-cluster replication (KAFKA-50) in v0.8 with automatic recovery.
    While intra-cluster mirroring via MirrorMaker (run kafka.tools.MirrorMaker) has been supported since v0.7, MirrorMaker does not do failover.
Packaged Tools
  • Additional tools are available in kafka.tools. Such tools include:
    • ConsumerOffsetChecker: Shows how much the consumer is lagging behind
    • MirrorMaker: discussed earlier
    • ProducerShell/ConsumerShell: explained in the quickstart guide
    • ExportZKOffsets/ImportZkOffsets: Manual configuration of ZK state.
    • etc
Performance
  • If you wish to benchmark Kafka's performance, the following entry points are provided in bin/
    • kafka-consumer-perf-test.sh
    • kafka-producer-perf-test.sh
    • kafka-simple-consumer-perf-test.sh
  • There is a bug in each of those scripts: (kafka.tools.* should be kafka.perf.*)
  • These helper scripts might be useful.
  • However, you probably want to take a look at the sources for usage.
  • All these being said, the defaults provided are actually pretty reasonable.
Credit

Information for the above write up comes from a few sources:

4 comments:

Anonymous said...

Congratulations for your nice post.

I'm a beginner in Kafka, so I have a basics question regarding high availability.

You said in this post that Kafka has not a Non Local Redundancy. It's mean that the cluster will fail if some node goes down?

Regards,
Mauricio G.




Anonymous said...

This is an excellent post on kafka ,this gives so much more info than the design doc.Thanks for this effort

Anonymous said...

Great write-up, really helped to fill in the holes left by the official Kafka documentation!

Ravindranath Akila Gunaratne said...

Is there a way to specify the partition Id in the high level API?
http://stackoverflow.com/questions/19068439/can-a-kafka-consumer-be-assigned-to-a-specific-partition