Saturday, August 25, 2012

Down the Rabbit Hole with Kafka


Kafka is a "distributed publish-subscribe messaging system". In this post, I will discuss the nitty-gritty details of how the producer and consumer mechanisms work in Kafka 0.7.1.

In a typical setup, you have a single ZooKeeper instance and a cluster of Kafka servers (e.g. 3) on a high-numbered RAID array. Here, we try to understand how such a setup behaves at runtime.

Starting Up Kafka
  • For each server, a unique brokerid is specified in the server's properties file. This serves to uniquely identify a broker while allowing the broker to change host or port.
  • By default, when a Kafka broker starts up, it registers itself with ZooKeeper (unless enable.zookeeper is set to false in the server config). The ephemeral ZK znode under /brokers/ids/ contains the (host, port) 2-tuple.
  • Kafka will then attempt to read the topics and load the logs from log.dir (which defaults to /tmp/kafka-logs if you use the provided config). If the log.dir path does not exist, a new log directory is created.
  • Note that at this point, no topics or partitions are created. Topics and partitions are only created when a producer registers with the broker. The number of partitions per topic on that particular server is specified via num.partitions. If you wish to specify partitions on a per-topic basis, you can override the default number of partitions in the server config. Once again, this is server-specific.
  • The Kafka broker is registered with ZK once the LogManager is instantiated. All existing topics and partitions are also registered with ZK.
  • A folder is created in log.dir for each combination of topic+partition. For instance, if you have a topic "test-topic" with 3 partitions, then you will have the following folders: "test-topic-0", "test-topic-1", "test-topic-2".
  • What is stored in ZooKeeper regarding your Kafka broker?
    Basically the mapping from broker id to (host, port) and the mapping from (topic, broker id) to number of partitions.
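To make the ZooKeeper side a bit more concrete, here's a small in-memory model of the two mappings above, plus the topic-partition log folder naming. It is a sketch, not Kafka code: BrokerRegistryModel and its methods are made-up names, the /brokers/topics/ path for the (topic, broker id) mapping is my assumption, and the model ignores the fact that the real znodes are ephemeral.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of what a 0.7 broker records in ZooKeeper.
// Not a Kafka API; the /brokers/topics/ layout is an assumption.
class BrokerRegistryModel {
    // znode path -> value, e.g. "/brokers/ids/0" -> "host1:9092"
    private final Map<String, String> znodes = new TreeMap<>();

    // Mapping 1: broker id -> (host, port)
    void registerBroker(int brokerId, String host, int port) {
        znodes.put("/brokers/ids/" + brokerId, host + ":" + port);
    }

    // Mapping 2: (topic, broker id) -> number of partitions on that broker
    void registerTopic(String topic, int brokerId, int numPartitions) {
        znodes.put("/brokers/topics/" + topic + "/" + brokerId, Integer.toString(numPartitions));
    }

    String get(String path) {
        return znodes.get(path);
    }

    // One log folder per topic+partition: test-topic-0, test-topic-1, ...
    static List<String> logFolders(String topic, int numPartitions) {
        List<String> folders = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            folders.add(topic + "-" + p);
        }
        return folders;
    }
}
```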

Kafka Producer

When a new Producer is instantiated, it looks at either zk.connect (for automatic broker discovery) or broker.list (for a static list of Kafka brokers, each defined by the 3-tuple (brokerId, host, port)).

Internally, the producer client keeps a local copy of the list of brokers and their number of partitions. If you are using zookeeper, then this copy changes over time when brokers are added or dropped.

Assume that you have the following code:

ProducerData data = new ProducerData("test-topic", "test-message");

Which partition/broker does the message go to? It depends. The request gets funneled into a send method in kafka.producer, which routes the request to a different function depending on whether you have zookeeper enabled.
  • If you go with the zookeeper option...
  • The Producer retrieves broker and partition information from ZooKeeper.
  • A producer pool maintains a pool of connections to the brokers, one per broker.
  • In zkSend(), the topicPartitionsList for the specified topic "test-topic" is fetched via a call to getPartitionListForTopic(). This returns a Scala sequence of (brokerId, partitionId) pairs. For instance, if we have two brokers with 3 and 4 partitions respectively, then getPartitionListForTopic may return Seq( (0,0),(0,1),(0,2),(1,0),(1,1),(1,2),(1,3) ). This result is sorted in ascending order by brokerId (primary) and partitionId (secondary).
  • The length of that sequence is assigned to totalNumPartitions.
  • Now, we want to pick a partitionId in the range [0, N-1], where N is totalNumPartitions.
    • If the semantic key is specified in the ProducerData, i.e.:
      new ProducerData("test-topic", "test-key", "test-message"),
      • If partitioner.class was specified: then the partition(key, numPartitions) method is called which returns the required partitionId.
      • Otherwise, if partitioner.class was not specified, the default partitioner class (kafka.producer.DefaultPartitioner) is used. This returns the equivalent of
        math.abs(key.hashCode) % numPartitions
    • Otherwise, partitionId = random.nextInt(totalNumPartitions)
  • Using this partitionId, we can find which broker that partition id belongs to by looking up the sequence using the partitionId as the index.
  • Now that we have the broker information, the client proceeds to send the message over the wire.
  • The process of finding a broker+partition to send the message to repeats up to zkReadRetries times, and each attempt other than the first re-reads information from ZK.
  • If you go with the static broker list option...
  • getPartitionListForTopic() is called which returns a sequence as described earlier.
  • Now we have: partitionId = random.nextInt(totalNumPartitions)
  • Using this partitionId, we can retrieve the broker information by looking up the sequence using partitionId as index.
  • Now that we have the broker information, the client proceeds to send the message over the wire.
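Here's a rough Java sketch of the partition choice in zkSend() described above: flatten the per-broker partition counts into a sorted (brokerId, partitionId) sequence, pick an index either with the DefaultPartitioner formula or at random, and use that index to find the broker. The names (PartitionChoiceSketch, BrokerPartition, partitionList, choose) are hypothetical; this mirrors the description, not Kafka's actual Scala source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the zkSend()-style partition choice; not Kafka code.
class PartitionChoiceSketch {
    static final class BrokerPartition {
        final int brokerId;
        final int partitionId;

        BrokerPartition(int brokerId, int partitionId) {
            this.brokerId = brokerId;
            this.partitionId = partitionId;
        }

        @Override
        public String toString() {
            return "(" + brokerId + "," + partitionId + ")";
        }
    }

    // Flatten per-broker partition counts into a sequence sorted by brokerId,
    // then partitionId: {3, 4} -> (0,0),(0,1),(0,2),(1,0),(1,1),(1,2),(1,3)
    static List<BrokerPartition> partitionList(int[] partitionsPerBroker) {
        List<BrokerPartition> seq = new ArrayList<>();
        for (int b = 0; b < partitionsPerBroker.length; b++) {
            for (int p = 0; p < partitionsPerBroker[b]; p++) {
                seq.add(new BrokerPartition(b, p));
            }
        }
        return seq;
    }

    // Equivalent of the DefaultPartitioner formula: abs(key.hashCode) % numPartitions
    static int defaultPartition(Object key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    // With a key: hash it. Without a key: pick an index uniformly at random.
    static BrokerPartition choose(List<BrokerPartition> seq, Object key, Random random) {
        int totalNumPartitions = seq.size();
        int partitionId = (key != null)
                ? defaultPartition(key, totalNumPartitions)
                : random.nextInt(totalNumPartitions);
        // The index into the sorted sequence identifies both broker and partition.
        return seq.get(partitionId);
    }
}
```

One wrinkle worth knowing: Math.abs(Integer.MIN_VALUE) is still negative in Java (and Scala), so the bare abs-then-modulo formula can in principle produce a negative index for that one hash value.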
Note that for async sends, messages are simply batched before sending; batch.size and queue.time bound how many messages go into a batch and how long a message may wait in the queue before it is sent.
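A minimal sketch of the batching idea, assuming batch.size is a message count and queue.time a maximum wait in milliseconds: flush when the batch is full, or when the oldest queued message has waited queue.time. Timing from the oldest message is my own simplification, and AsyncBatcherSketch, send and tick are hypothetical names, not the 0.7 producer's internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical async batcher: flush on batchSize messages or after queueTimeMs.
class AsyncBatcherSketch {
    private final int batchSize;
    private final long queueTimeMs;
    private final List<String> queue = new ArrayList<>();
    private long oldestEnqueueMs = -1;
    private final List<List<String>> sentBatches = new ArrayList<>();

    AsyncBatcherSketch(int batchSize, long queueTimeMs) {
        this.batchSize = batchSize;
        this.queueTimeMs = queueTimeMs;
    }

    // nowMs is passed in explicitly so the behaviour is easy to test.
    void send(String message, long nowMs) {
        if (queue.isEmpty()) {
            oldestEnqueueMs = nowMs;
        }
        queue.add(message);
        if (queue.size() >= batchSize) {
            flush();
        }
    }

    // A real producer would call this periodically from a background thread.
    void tick(long nowMs) {
        if (!queue.isEmpty() && nowMs - oldestEnqueueMs >= queueTimeMs) {
            flush();
        }
    }

    private void flush() {
        sentBatches.add(new ArrayList<>(queue)); // stand-in for the network send
        queue.clear();
        oldestEnqueueMs = -1;
    }

    List<List<String>> sentBatches() {
        return sentBatches;
    }
}
```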

Kafka Consumer

There are two consumer APIs you should be aware of: the high level api (aka ConsumerConnector) and the low level api (SimpleConsumer). The big difference between them is that the high level api does broker discovery, consumer rebalancing and keeps track of state (i.e. offsets) in ZooKeeper, while the low level api does not.

If you have a consumer that needs to do fancy stuff such as replaying using specific offsets (e.g. storm spout or a hadoop job which may fail), then that consumer needs to keep track of state manually, and so you should use the low level api.

Kafka Consumer High Level API

The high level api stores state using ZooKeeper and groups consumers together for load balancing using a unique group_id which is provided by the client. To simplify our understanding of state management in ZK, we can think of the znodes as a hash table storing the following information:

key :: value
owners(group_id, topic, broker_id, partition_id) :: consumer_node_id
offsets(group_id, topic, broker_id, partition_id) :: offset counter value
consumer(group_id, consumer_id) :: map({topic, num of streams})

The consumer_id is a 2-tuple of the form (hostname, uuid). This allows for multiple (e.g. threaded) consumers on a single host. The owners(...) key acts as a lock and simplifies offset management by ensuring that no more than one consumer is reading from the same combination of (group_id, topic, broker_id, partition_id).
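A toy model of that lock, assuming ZooKeeper's "create fails if the node already exists" behaviour for the ephemeral owners znode can be stood in for by ConcurrentMap.putIfAbsent. The OwnershipSketch class and its key format are illustrative only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of owners(group_id, topic, broker_id, partition_id) as a lock.
class OwnershipSketch {
    private final ConcurrentMap<String, String> owners = new ConcurrentHashMap<>();

    static String ownerKey(String groupId, String topic, int brokerId, int partitionId) {
        return groupId + "/" + topic + "/" + brokerId + "-" + partitionId;
    }

    // Returns true if consumerId now owns the partition (or already did).
    boolean claim(String key, String consumerId) {
        String existing = owners.putIfAbsent(key, consumerId);
        return existing == null || existing.equals(consumerId);
    }

    // Releasing ownership (e.g. during a rebalance) lets another consumer claim it.
    void release(String key, String consumerId) {
        owners.remove(key, consumerId);
    }
}
```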

Here, a consumer refers to an instance of ConsumerConnector(). A ConsumerConnector instance can have multiple KafkaStreams to allow for multi-threaded consumption.

Because each broker partition can be matched only to one consumer at any given time, you will have non-active consumers if you have more consumers than broker partitions. The benefit of using the high level api is that a consumer will not be starved if a broker fails in a given cluster of kafka brokers. When the failed broker is restored, messages will then be consumed from that broker.

The consumer rebalancing algorithm is triggered via ZK watchers on any of the following conditions:
- addition/removal of broker
- addition/removal of consumer
- new allowed topic

This rebalancing algorithm is triggered for every ConsumerConnector instance in the consumer group (hopefully around the same time, but this isn't a guarantee). So how does the rebalancing work? Effectively speaking for each ConsumerConnector:

First, syncedRebalance() is called. syncedRebalance() effectively loops around rebalance() for a maximum of rebalance.retries.max (defaults to 4) times. For each rebalance attempt, it is possible for a ZK exception to be thrown due to changing ZK state. If there is an exception, it is safely caught and the consumer backs off for a configurable number of milliseconds before retrying. In rebalance(), a number of actions happen:
  1. The consumer closes all fetch requests (to avoid data duplication) and offsets are flushed out to ZK
  2. Release all partition ownership from ZK by deleting the znodes for owners(group_id, topic, broker_id, partition_id)
  3. Get the partitions per topic mapping
  4. For each topic that the ConsumerConnector is subscribed to:
    1. Using the partitions per topic mapping, get the partitions for that topic, which are of the form (broker-partition). This list is sorted.
    2. Get the total number of consumers for that topic. This is the total number of KafkaStreams subscribing to that topic in the consumer group, which might be more than the number of ConsumerConnector instances.
    3. For each KafkaStream in the ConsumerConnector:
      1. Range-partition the sorted partitions across the consumers as equally as possible, with the first few consumers getting an extra partition if there are leftovers (note: the consumers are sorted as well).
        Example 1: If you have 5 partitions with 2 ConsumerConnector instances of 1 stream each, then consumer 0 gets [p0, p1, p2] and consumer 1 gets [p3, p4].
        Example 2: If you have 5 partitions with 2 ConsumerConnector instances of 4 streams each, then consumer 0 gets [p0, p1, p2, p3], and consumer 1 gets [p4].
      2. Note that range partitioning favors locality: a consumer is more likely to fetch several partitions from the same broker rather than from all the brokers.
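The retry loop and the range assignment can be sketched in Java as follows. This is a simplified model under stated assumptions: consumer streams are identified by strings that are already sorted, a rebalance attempt is a BooleanSupplier that may throw to simulate a ZK exception, and RebalanceSketch, syncedRebalance and rangeAssign are hypothetical names rather than Kafka's own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of syncedRebalance()'s retry loop and range assignment.
class RebalanceSketch {

    // Try up to maxRetries times, backing off between failed attempts.
    // Returns the attempt number that succeeded, or -1 if all failed.
    static int syncedRebalance(BooleanSupplier rebalanceAttempt, int maxRetries, long backoffMs) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                if (rebalanceAttempt.getAsBoolean()) {
                    return attempt;
                }
            } catch (RuntimeException e) {
                // e.g. a ZK exception because consumers or brokers changed mid-rebalance
            }
            if (attempt < maxRetries) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    // Range-partition sorted partitions over sorted streams; the first
    // (partitions % streams) streams each get one extra partition.
    static Map<String, List<String>> rangeAssign(List<String> partitions, List<String> streams) {
        Map<String, List<String>> assignment = new TreeMap<>();
        int perStream = partitions.size() / streams.size();
        int withExtra = partitions.size() % streams.size();
        for (int i = 0; i < streams.size(); i++) {
            int start = perStream * i + Math.min(i, withExtra);
            int count = perStream + (i < withExtra ? 1 : 0);
            // count == 0 means the stream stays idle (more streams than partitions)
            assignment.put(streams.get(i), new ArrayList<>(partitions.subList(start, start + count)));
        }
        return assignment;
    }
}
```

Running rangeAssign with 5 partitions and the 8 streams of Example 2 (c0-0 through c1-3) gives one partition to each of the first five streams and nothing to the last three, which is the "non-active consumers" case mentioned earlier.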

Kafka Consumer Low Level API

In the low level api, you provide everything -- broker host+port, partition id, and offset.

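import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
long offset = 0;
// args: broker host (left blank here), port, socket timeout (ms), buffer size (bytes)
SimpleConsumer consumer = new SimpleConsumer("", 9092, 10000, 1024000);
FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000); // topic, partition, offset, max bytes
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);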

The low level api does not talk to ZooKeeper, and you are responsible for figuring out which broker and partition to connect to, and for keeping track of your offsets.
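Since the low level api leaves offset bookkeeping to you, here is a hypothetical helper showing the minimum state a replaying consumer (say, a Storm spout or a restartable Hadoop job) has to keep: the next offset to fetch per broker/topic/partition, plus a checkpoint to rewind to. OffsetTrackerSketch is not part of Kafka, and where you persist the checkpoint is up to you.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical offset bookkeeping for a low level (SimpleConsumer) client.
class OffsetTrackerSketch {
    private final Map<String, Long> next = new HashMap<>();
    private final Map<String, Long> checkpoint = new HashMap<>();

    static String key(String host, int port, String topic, int partition) {
        return host + ":" + port + "/" + topic + "/" + partition;
    }

    // Offset to use in the next FetchRequest; starts at 0.
    long nextOffset(String key) {
        return next.getOrDefault(key, 0L);
    }

    // After processing a message set, record where the next fetch should start.
    void advance(String key, long nextOffset) {
        next.put(key, nextOffset);
    }

    // In a real system this would be written somewhere durable.
    void commit(String key) {
        checkpoint.put(key, nextOffset(key));
    }

    // Replay: e.g. a failed job restarts from its last committed checkpoint.
    void rewindToCheckpoint(String key) {
        next.put(key, checkpoint.getOrDefault(key, 0L));
    }
}
```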

Achieving High-Availability
  • Local Redundancy
    Using RAID mirroring will provide local data redundancy, while striping provides the benefit of increased performance. According to the docs, LinkedIn uses RAID 10 on ext4.
  • Non Local Redundancy
    As of this writing, Kafka does not support non-local data redundancy. Work is in progress to support intra-cluster replication (KAFKA-50) in v0.8 with automatic recovery.
    While inter-cluster mirroring via MirrorMaker has been supported since v0.7, MirrorMaker does not do failover.
Packaged Tools
  • Additional tools ship with Kafka. Such tools include:
    • ConsumerOffsetChecker: Shows how much the consumer is lagging behind
    • MirrorMaker: discussed earlier
    • ProducerShell/ConsumerShell: explained in the quickstart guide
    • ExportZkOffsets/ImportZkOffsets: Manual configuration of ZK state.
    • etc.
  • If you wish to benchmark Kafka's performance, a few entry points are provided in bin/.
  • There is a bug in each of those scripts: the class name prefix is wrong (it should be kafka.perf.*).
  • These helper scripts might be useful.
  • However, you probably want to take a look at the sources for usage.
  • All that being said, the defaults provided are actually pretty reasonable.

Information for the above write up comes from a few sources:

Sunday, August 12, 2012

Making Eclipse Usable

As a Vim user, I have found it really hard to get back to using Eclipse. For big Java projects, using Eclipse can improve productivity tremendously. Here's what I have done to make Eclipse usable:

Tuesday, August 07, 2012

Will Wright on Games

Wired recently ran a pretty good article about Will Wright (the guy who created SimCity) and his insights. Some cool pointers:
  • There is no win or lose
  • Give users room and creativity to express themselves (freedom, avenue of self-expression)
  • A system that tries to design the ultimate game for every player?
  • Games can be potentially more addictive than drugs
  • Social games evolved based on user feedback + metrics

Monday, August 06, 2012

Real-time Pipeline

Here are a bunch of tools that might be useful for building a real-time pipeline:

  • Kafka: Persistent Message Broker with O(1) access time
  • Storm: Real time computation system
    • Trident: High-level abstraction on top of Storm
  • Esper: Complex Event Processing (CEP)

I may talk more about these tools in the future.

Friday, August 03, 2012

Re-balancing MySQL Cluster

Recently, I had to investigate whether MySQL Cluster's (aka mysql-ndb: network database) partitioning uses fixed modulo hashing.

Using fixed modulo hashing for partitioning would be pretty bad when a new data node is added, as every row would need to be rehashed and a large fraction of rows would be redistributed among the available nodes (assuming the modulo is the number of nodes). This would be both computationally costly and time-consuming. A common-sense alternative would be consistent hashing.
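To put a number on how bad fixed modulo hashing is, the sketch below counts how many keys change node when the node count changes. It assumes integer keys hashed with the identity function, which keeps the arithmetic easy to follow; real systems use md5 or a similar hash, and ModuloRehashSketch is a made-up name.

```java
// Count keys whose node changes under node = hash % numNodes when the
// node count goes from oldNodes to newNodes. Identity hash is an assumption.
class ModuloRehashSketch {
    static int movedKeys(int numKeys, int oldNodes, int newNodes) {
        int moved = 0;
        for (int key = 0; key < numKeys; key++) {
            int hash = key; // identity hash on non-negative int keys
            if (hash % oldNodes != hash % newNodes) {
                moved++;
            }
        }
        return moved;
    }
}
```

With 100 keys going from 4 to 6 nodes, movedKeys(100, 4, 6) returns 64, whereas the minimum needed to rebalance onto the 2 new nodes is about 2/6 of 100, i.e. roughly 33.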

Asking around on #mysql-ndb (freenode), MongtoDB pointed me to two great posts by Frazer (who works at MySQL).

1) MySQL Cluster Online Scaling
This is the blog post that mentions the HashMap distribution scheme and how data migration works. In this scheme, a lookup table is added as another layer of indirection on top of standard md5 hashing with a constant modulo. When k new nodes are added, only the lookup table needs to be modified, and only the minimal number of rows is moved: a fraction k/n of each original node's rows (where n is the new total number of nodes) moves to the new nodes. The missing piece of information in this post is what the lookup table actually looks like.
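Since the post doesn't show the lookup table, here's a toy version of the idea, not MySQL Cluster's actual HashMap implementation: rows hash into a fixed number of buckets (the constant modulo), and a bucketToNode table maps buckets to nodes. The bucket count, taking a precomputed int hash instead of md5, the LookupTableSketch name, and the reassignment policy in addNodes are all my assumptions.

```java
// Hypothetical lookup-table distribution: hash -> fixed bucket -> node.
class LookupTableSketch {
    final int[] bucketToNode;

    LookupTableSketch(int numBuckets, int numNodes) {
        bucketToNode = new int[numBuckets];
        for (int b = 0; b < numBuckets; b++) {
            bucketToNode[b] = b % numNodes;
        }
    }

    int nodeFor(int hash) {
        // The modulo is over the bucket count, which never changes.
        return bucketToNode[Math.floorMod(hash, bucketToNode.length)];
    }

    // Give each new node its share of buckets, taken from over-full old
    // nodes; every other bucket keeps its current node.
    void addNodes(int oldNodes, int addedNodes) {
        int total = oldNodes + addedNodes;
        int target = bucketToNode.length / total;
        int[] owned = new int[total];
        for (int node : bucketToNode) {
            owned[node]++;
        }
        int receiver = oldNodes;
        for (int b = 0; b < bucketToNode.length && receiver < total; b++) {
            int owner = bucketToNode[b];
            if (owned[owner] > target) {
                owned[owner]--;
                bucketToNode[b] = receiver;
                owned[receiver]++;
                if (owned[receiver] >= target) {
                    receiver++;
                }
            }
        }
    }
}
```

With 12 buckets on 4 nodes, adding 2 nodes reassigns one bucket from each original node (1/3 of each node's buckets, i.e. k/n = 2/6) and leaves every other bucket where it was; for row ids 0 to 99 (using the id as the hash) this moves 36 rows, all of them to the new nodes.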

2) Data Distribution in MySQL Cluster
This is the preceding blog post, which explains sharding in mysql-cluster terminology.

Sadly, the official documentation does not mention the partitioning schemes in great detail. However, a great detailed example on the procedure to add new data nodes is provided.

Unconvinced that the HashMap distribution scheme works, I decided to verify by trying:

For verification, I started with the following setup using 2 replicas for each node group.

(Note: I reserved node 50 for running ndb tools).

and the following schema in the database `awesome`.

I had 100 records with unique user_id from 0 to 99.

ndb_select_all -c user_prefs -d awesome --rowid -D "|"

gives the fragment/shard location of each row in the table. E.g.:

ndb_desc -c user_prefs -d awesome -p -n

gives the node id (primary and secondary) for each partition.

For instance, partition 0 exists on node 11 as the primary and node 12 as the secondary. Nodes 11 and 12 form a node group.

A rolling restart of all services (i.e. mgm, ndb and mysqld) is needed to bring the two new nodes online. This can be avoided if the setup was pre-configured to take future nodes into account and those nodes are not brought online until they are needed. After the restart, I ended up with the following cluster configuration (the new nodes 31 and 32 form node group 2). This brings the number of data nodes from 4 to 6.

I executed the partition re-organization, which took 8.69 sec.
Re-running ndb_desc gives the following:
which shows that data re-distribution occurred.

In order to verify that rows are not unnecessarily redistributed, I ran the ndb_select_all program and wrote a Python script to compare the location of each row before and after the partition reorganization. The results of this comparison can be found here; they show that rows only move to the newly added fragments and never between existing ones.
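The original comparison was a Python script; for consistency with the other snippets here, this is a hypothetical Java re-creation of the same check. It assumes the ndb_select_all output has already been parsed into maps from row id to fragment id (the parsing is not shown), that every row appears in both maps, and that fragments numbered oldFragments and above are the newly added ones.

```java
import java.util.Map;

// Hypothetical before/after check: count rows leaving each original fragment
// and fail if any row moved between two pre-existing fragments.
class FragmentMoveCheck {
    static int[] movedPerFragment(Map<Integer, Integer> before, Map<Integer, Integer> after, int oldFragments) {
        int[] moved = new int[oldFragments];
        for (Map.Entry<Integer, Integer> e : before.entrySet()) {
            int from = e.getValue();
            int to = after.get(e.getKey()); // assumes the row exists in both maps
            if (from != to) {
                if (to < oldFragments) {
                    throw new IllegalStateException("row " + e.getKey() + " moved between existing fragments");
                }
                moved[from]++;
            }
        }
        return moved;
    }
}
```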

Statistically speaking, I started with the following number of rows in fragments (or shards) 0 to 3:
[25, 26, 25, 24]

The reorganization moved the following number of rows from each fragment to a new fragment (either fragment 4 or 5)
[8, 8, 10, 10]

This number is indeed in the ballpark of 2/6 of the original number of rows in each fragment.
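As a quick check of that ballpark, with k = 2 new nodes out of n = 6, the expected number of rows leaving each fragment and the totals work out as:

```latex
\text{expected}_i = n_i \cdot \frac{k}{n} = n_i \cdot \frac{2}{6},
\qquad (n_0, n_1, n_2, n_3) = (25, 26, 25, 24)
\;\Rightarrow\; (8.3,\ 8.7,\ 8.3,\ 8.0)

\sum_i \text{expected}_i = 100 \cdot \tfrac{2}{6} \approx 33.3,
\qquad \sum_i \text{observed}_i = 8 + 8 + 10 + 10 = 36
```

So the observed total of 36 is slightly above the ideal of about 33, which seems reasonable for a table of only 100 rows.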

Unlike MongoDB, where online migration can happen any time and the locations of the records are non-deterministic, partition reorganization is manually triggered in mysql-ndb via the "alter online table [tbl] reorganize partition" DDL. A SQL "optimize table" statement needs to be issued to reclaim free space after partition reorganization. 

Lastly, it should be noted that MySQL-ndb currently supports a maximum of 48 data nodes, although this limit might be raised in the future.