Saturday, August 25, 2012

Down the Rabbit Hole with Kafka

Introduction

Kafka is a "distributed publish-subscribe messaging system". In this post, I will discuss the nitty-gritty details of how the producer and consumer mechanisms work in Kafka 0.7.1.

In a typical setup, you have a single ZooKeeper instance and a cluster of Kafka brokers (e.g. three), each storing its logs on a RAID array. Here, we strive to understand the dynamic behavior of such a setup.

Starting Up Kafka
  • For each server, a unique brokerid is specified in server.properties. This serves to uniquely identify a broker while allowing the broker to change host or port.
  • By default, when a Kafka broker starts up, it registers itself with ZooKeeper (unless enable.zookeeper is false in server.properties). The ephemeral ZK znode /brokers/ids/<brokerid> contains the (host, port) 2-tuple.
  • Kafka will then attempt to read the topics and load the logs from log.dir (which defaults to /tmp/kafka-logs if you use the provided server.properties). If the log.dir path does not exist, a new log directory is created.
  • Note that at this point, no topics or partitions are created. Topics and partitions are only created when a producer registers with the broker. The default number of partitions per topic on that particular server is specified via num.partitions. If you wish to specify partitions on a per-topic basis, you can override the default using topic.partition.count.map. Once again, this setting is server-specific.
  • The Kafka broker is registered with ZK once the LogManager is instantiated. All existing topics and partitions are also registered with ZK.
  • A folder is created in log.dir for each combination of topic+partition. For instance, if you have a topic "test-topic" with 3 partitions, then you will have the following folders: "test-topic-0", "test-topic-1", "test-topic-2".
  • What is stored in ZooKeeper regarding your Kafka broker?
    Basically, the mapping from broker id to (host, port) and the mapping from (topic, broker id) to the number of partitions. You can peek at these mappings with a plain ZK client, as sketched below.
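Here is a minimal sketch of reading those mappings with the plain ZooKeeper Java client; the class name, the local ZK address, and the topic "test-topic" are placeholders for illustration only.

import org.apache.zookeeper.ZooKeeper;

public class PeekBrokerRegistry {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 10000, null);

        // one ephemeral child per brokerid; its value holds the broker's host and port
        for (String brokerId : zk.getChildren("/brokers/ids", false)) {
            byte[] data = zk.getData("/brokers/ids/" + brokerId, false, null);
            System.out.println("broker " + brokerId + " -> " + new String(data));
        }

        // one child per brokerid under each topic; its value is that broker's partition count
        for (String brokerId : zk.getChildren("/brokers/topics/test-topic", false)) {
            byte[] data = zk.getData("/brokers/topics/test-topic/" + brokerId, false, null);
            System.out.println("test-topic on broker " + brokerId + " -> " + new String(data));
        }

        zk.close();
    }
}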

Kafka Producer

When a new Producer is instantiated, it looks at either zk.connect (for automatic broker discovery) or broker.list (for a static list of Kafka brokers, each defined by the 3-tuple (brokerId, host, port)).

Internally, the producer client keeps a local copy of the list of brokers and their number of partitions. If you are using zookeeper, then this copy changes over time when brokers are added or dropped.
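For concreteness, a producer against the 0.7 Java API is typically constructed along the following lines; this is a minimal sketch, with the ZK address and the StringEncoder serializer borrowed from the quickstart as placeholders.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
// automatic broker discovery via ZooKeeper; use "broker.list" instead for a static broker list
props.put("zk.connect", "127.0.0.1:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");

Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));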

Assume that you have the following code:

ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");
producer.send(data);

Which partition/broker does the message go to? It depends. The request gets funneled into a send method in kafka.producer, which routes the request to a different function depending on whether you have zookeeper enabled.
  • If you go with the zookeeper option...
  • The Producer retrieves the list of brokers and their partition counts from ZooKeeper and maintains a pool of connections to the brokers, one per broker.
  • In zkSend(), the topicPartitionsList for the specified topic "test-topic" is fetched via a call to getPartitionListForTopic(). This returns a Scala sequence of (brokerId, partitionId) pairs. For instance, if we have two brokers with 3 and 4 partitions respectively, then getPartitionListForTopic() may return Seq( (0,0),(0,1),(0,2),   (1,0),(1,1),(1,2),(1,3) ). This result is sorted in ascending order, with brokerId as the primary key and partitionId as the secondary key.
  • The length of that sequence is assigned to totalNumPartitions
  • Now, we want to pick a partitionId in the range [0, N-1], where N is totalNumPartitions
    • If the semantic key is specified in the ProducerData, i.e.:
      new ProducerData("test-topic", "test-key", "test-message"),
      and...
      • If partitioner.class was specified: the partition(key, numPartitions) method of that class is called, which returns the required partitionId (a sketch of a custom partitioner appears after this list).
      • Else the partitioner.class was not specified and the default partitioner class (kafka.producer.DefaultPartitioner) is used. This returns the equivalent of
        math.abs(key.hashCode) % numPartitions
    • Otherwise, partitionId = random.nextInt(totalNumPartitions)
  • Using this partitionId, we can find which broker that partition belongs to by looking up the sequence, using the partitionId as the index.
  • Now that we have the broker information, the client proceeds to send the message over the wire.
  • The process to find a broker+partition to send the message repeats up to zkReadRetries (aka zk.read.num.retries) times, and each trial other than the first re-reads information from ZK.
  • If you go with the static broker list option...
  • getPartitionListForTopic() is called which returns a sequence as described earlier.
  • Now we have: partitionId = random.nextInt(totalNumPartitions)
  • Using this partitionId, we can retrieve the broker information by looking up the sequence using partitionId as index.
  • Now that we have the broker information, the client proceeds to send the message over the wire.
Note that for async mode, messages are simply batched before sending; batch.size and queue.time bound how many messages may accumulate and how long they may wait before being flushed.
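If you do set partitioner.class, the class simply needs to implement kafka.producer.Partitioner. Below is a minimal sketch assuming String keys; the class name is hypothetical, and it merely mirrors what the default partitioner already does.

import kafka.producer.Partitioner;

public class UserIdPartitioner implements Partitioner<String> {
    // wire this up with props.put("partitioner.class", "com.example.UserIdPartitioner")
    public int partition(String key, int numPartitions) {
        // map the key into [0, numPartitions - 1], like kafka.producer.DefaultPartitioner
        return Math.abs(key.hashCode()) % numPartitions;
    }
}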

Kafka Consumer

There are two consumer APIs you should be aware of: the high-level API (aka ConsumerConnector) and the low-level API (SimpleConsumer). The big difference between them is that the high-level API does broker discovery and consumer rebalancing and keeps track of state (i.e. offsets) in ZooKeeper, while the low-level API does not.

If you have a consumer that needs to do fancy stuff such as replaying messages from specific offsets (e.g. a Storm spout or a Hadoop job which may fail), then that consumer needs to keep track of state manually, and so you should use the low-level API.

Kafka Consumer High Level API

The high-level API stores state in ZooKeeper and groups consumers together for load balancing using a unique group_id which is provided by the client. To simplify our understanding of state management in ZK, we can think of the znodes as a hash table storing the following information:

key :: value
owners(group_id, topic, broker_id, partition_id) :: consumer_node_id
offsets(group_id, topic, broker_id, partition_id) :: offset counter value
consumer(group_id, consumer_id) :: map of {topic: number of streams}

The consumer_id is a 2-tuple of the form (hostname, uuid). This allows for multiple threaded consumers on a single host. The owners(...) key acts as a lock and simplifies offset management by ensuring that no more than one consumer is reading from the same combination of (group_id, topic, broker_id, partition_id).

A consumer here refers to an instance of ConsumerConnector. A ConsumerConnector instance can have multiple KafkaStreams to allow for multi-threaded consumption.

Because each broker partition can be matched to only one consumer at any given time, you will have idle consumers if you have more consumers than broker partitions. The benefit of using the high-level API is that a consumer will not be starved if a broker fails in a given cluster of Kafka brokers: when the failed broker is restored, messages will then be consumed from that broker.
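To make this concrete, here is a rough sketch of a high-level consumer following the 0.7 quickstart; the group and topic names are placeholders, and the exact class and package names may differ slightly across 0.7.x releases.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");   // ZK is mandatory for the high-level API
props.put("groupid", "test-group");          // consumers sharing a group_id share the work

ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

// ask for 2 KafkaStreams for "test-topic"; each stream would normally get its own thread
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test-topic", 2);
Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(topicCountMap);

for (KafkaStream<Message> stream : streams.get("test-topic")) {
    for (MessageAndMetadata<Message> msgAndMetadata : stream) {
        // process msgAndMetadata.message(); offsets are committed to ZK behind the scenes
    }
}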

The consumer rebalancing algorithm is triggered via ZK watchers on either of the following conditions:
  • addition/removal of a broker
  • addition/removal of a consumer
  • a new allowed topic

This rebalancing algorithm is triggered for every ConsumerConnector instance in the consumer group (hopefully around the same time, but this isn't guaranteed). So how does the rebalancing work? Effectively, for each ConsumerConnector:

First, syncedRebalance() is called. syncedRebalance() effectively loops around rebalance() for a maximum of rebalance.retries.max (defaults to 4) attempts. For each rebalance attempt, it is possible for a ZK exception to be thrown due to changing ZK state. If there is an exception, it is safely caught and the consumer backs off for rebalance.backoff.ms milliseconds (which defaults to zk.synctime.ms). In rebalance(), a number of actions happen:
  1. The consumer closes all fetch requests (to avoid data duplication) and offsets are flushed out to ZK
  2. Release all partition ownership from ZK by deleting the znodes for owners(group_id, topic, broker_id, partition id)
  3. Get the partitions per topic mapping
  4. For each topic that the ConsumerConnector is subscribed to:
    1. Using the partitions per topic mapping, get the partitions for that topic, which are of the form (broker-partition). This list is sorted.
    2. Get the total number of consumers for that topic. This is the total number of KafkaStreams subscribing to that topic in the consumer group, which might be more than the number of ConsumerConnector instances.
    3. For each KafkaStream in the ConsumerConnector:
      1. Range-partition the sorted partitions across the consumers as equally as possible, with the first few consumers getting an extra partition if there are leftovers (note: the consumers were sorted too). The arithmetic is sketched in the code after this list.
        Example 1: If you have 5 partitions with 2 ConsumerConnector instances of 1 stream each, then consumer 0 gets [p0, p1, p2] and consumer 1 gets [p3, p4].
        Example 2: If you have 5 partitions with 2 ConsumerConnector instances of 4 streams each, then consumer 0 gets [p0, p1, p2, p3], and consumer 1 gets [p4].
      2. Note that range partitioning allows for locality: a consumer is more likely to fetch data from multiple partitions on a single broker than from all the brokers.
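Below is a simplified, hypothetical sketch of that range-assignment arithmetic (not the actual Kafka code); running it reproduces the two examples above.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RangeAssignSketch {
    // given the sorted broker-partition list, return the slice each (sorted) stream owns
    public static List<List<String>> assign(List<String> sortedPartitions, int totalStreams) {
        int totalParts = sortedPartitions.size();
        int perStream = totalParts / totalStreams;   // minimum partitions per stream
        int extra = totalParts % totalStreams;       // the first "extra" streams get one more
        List<List<String>> result = new ArrayList<List<String>>();
        for (int i = 0; i < totalStreams; i++) {
            int start = i * perStream + Math.min(i, extra);
            int count = perStream + (i < extra ? 1 : 0);
            result.add(new ArrayList<String>(sortedPartitions.subList(start, start + count)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> parts = Arrays.asList("p0", "p1", "p2", "p3", "p4");
        System.out.println(assign(parts, 2));  // Example 1: [[p0, p1, p2], [p3, p4]]
        System.out.println(assign(parts, 8));  // Example 2: only the first 5 streams get a partition
    }
}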

Kafka Consumer Low Level API

In the low level api, you provide everything -- broker host+port, partition id, and offset.

// connect to the broker at 127.0.0.1:9092 with a 10s socket timeout and a ~1MB receive buffer
SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000);

// fetch up to 1MB from topic "test", partition 0, starting at the given offset
long offset = 0;
FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000);
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);

The low-level API does not talk to ZooKeeper; you are responsible for figuring out which broker and partition to connect to, and for keeping track of your offsets.
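Continuing the snippet above, the fetched ByteBufferMessageSet is iterated and the offset is advanced manually, roughly as follows (a sketch based on the 0.7 quickstart; in 0.7, offsets are byte positions in the log).

import kafka.message.MessageAndOffset;

for (MessageAndOffset msgAndOffset : messages) {
    // process msgAndOffset.message() ...
    // offset() points past this message, so it is the value to use in the next FetchRequest
    offset = msgAndOffset.offset();
}
// persist the offset somewhere durable (a file, a database, your own ZK node) before the next fetch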

Achieving High-Availability
  • Local Redundancy
    Using RAID mirroring provides local data redundancy, while striping provides the benefit of increased performance. According to the docs, LinkedIn uses RAID 10 on ext4.
  • Non Local Redundancy
    As of this writing, Kafka does not support non-local data redundancy. Work is in progress to support intra-cluster replication (KAFKA-50) in v0.8 with automatic recovery.
    While inter-cluster mirroring via MirrorMaker (run kafka.tools.MirrorMaker) has been supported since v0.7, MirrorMaker does not do failover.
Packaged Tools
  • Additional tools are available in kafka.tools. Such tools include:
    • ConsumerOffsetChecker: Shows how much the consumer is lagging behind
    • MirrorMaker: discussed earlier
    • ProducerShell/ConsumerShell: explained in the quickstart guide
    • ExportZkOffsets/ImportZkOffsets: Manual export/import of consumer offset state in ZK.
    • etc
Performance
  • If you wish to benchmark Kafka's performance, the following entry points are provided in bin/
    • kafka-consumer-perf-test.sh
    • kafka-producer-perf-test.sh
    • kafka-simple-consumer-perf-test.sh
  • There is a bug in each of those scripts: kafka.tools.* should be kafka.perf.*
  • These helper scripts might be useful, but you probably want to take a look at the sources for usage.
  • All that said, the provided defaults are actually pretty reasonable.
Credit

Information for the above write up comes from a few sources:

Sunday, August 12, 2012

Making Eclipse Usable

As a Vim user, I have found it really hard to get back to using Eclipse. For big Java projects, using Eclipse can improve productivity tremendously. Here's what I have done to make Eclipse usable:

Tuesday, August 07, 2012

Will Wright on Games

Wired recently ran a pretty good article about Will Wright (the guy who created SimCity) and his insights. Some cool pointers:
  • There is no win or lose
  • Give users room and creativity to express themselves (freedom, avenue of self-expression)
  • A system that tries to design the ultimate game for every player?
  • Games can be potentially more addictive than drugs
  • Social games evolved based on user feedback + metrics

Monday, August 06, 2012

Real-time Pipeline

Here are a bunch of tools that might be useful for building a real-time pipeline:

  • Kafka: Persistent Message Broker with O(1) access time
  • Storm: Real time computation system
    • Trident: High-level abstraction on top of Storm
  • Esper: Complex Event Processing (CEP)

I may talk more about these tools in the future.

Friday, August 03, 2012

Re-balancing MySQL Cluster

Recently, I had to investigate whether MySQL Cluster's (aka mysql-ndb: network database) partitioning uses fixed modulo hashing.

Using fixed modulo hashing for partitioning would be pretty bad when a new data node is added, as all rows would need to be rehashed and re-distributed among the available nodes (assuming the modulo is the number of nodes). This would be both computationally costly and time consuming. A common-sense alternative would be consistent hashing.

Asking around on #mysql-ndb (freenode), MongtoDB pointed me to two great posts by Frazer (who works at MySQL).

1) MySQL Cluster Online Scaling
This is the blog post that mentions the HashMap distribution scheme and how data migration works. In this scheme, a lookup table is added as another layer of indirection on top of standard md5 hashing with a constant modulo. When k new nodes are added to an n-node cluster, only the lookup table needs to be modified, and only a minimal fraction of rows (roughly k/(n+k) from each original node) is moved to the new nodes. The missing piece of information in this post is what the lookup table actually looks like (a toy sketch of the general idea appears a few paragraphs below).

2) Data Distribution in MySQL Cluster
This is the preceding blog post, which explains sharding in mysql-cluster terminology.

Sadly, the official documentation does not mention the partitioning schemes in great detail. However, a great detailed example on the procedure to add new data nodes is provided.
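To make the idea from the first post concrete, here is a toy sketch of a lookup-table distribution scheme. This is not MySQL Cluster's actual implementation; the bucket count and method names are made up for illustration.

public class LookupTableDistribution {
    private static final int NUM_BUCKETS = 240;        // fixed for the lifetime of the table
    private final int[] bucketToNode = new int[NUM_BUCKETS];

    // stand-in for md5(key) % NUM_BUCKETS in the real scheme
    public int nodeFor(Object key) {
        int bucket = Math.abs(key.hashCode()) % NUM_BUCKETS;
        return bucketToNode[bucket];
    }

    // growing the cluster only rewrites table entries for the buckets being handed over;
    // rows in buckets that keep their node never move
    public void reassignBucket(int bucket, int newNode) {
        bucketToNode[bucket] = newNode;
    }
}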

Unconvinced that the HashMap distribution scheme works as advertised, I decided to verify it myself:

For verification, I started with the following setup using 2 replicas for each node group.


(Note: I reserved node 50 for running ndb tools).

and the following schema in the database `awesome`.


I had 100 records with unique user_id from 0 to 99.

Running
ndb_select_all -c 192.168.0.1 user_prefs -d awesome --rowid -D "|"

gives the fragment/shard location of each row in the table. E.g.:

Running
ndb_desc -c 192.168.0.1 user_prefs -d awesome -p -n

gives the node id (primary and secondary) for each partition.

For instance, partition 0 exists on node 11 as the primary and node 12 as the secondary. Nodes 11 and 12 form a node group.

A rolling restart of all services (i.e. mgm, ndb and mysqld) is needed to bring the two new nodes online. This can be avoided if the setup was pre-configured to take future nodes into account, with those nodes not brought online until they are needed. After the restart, I ended up with the following cluster configuration (the new nodes 31 and 32 form nodegroup 2). This brings the number of data nodes from 4 to 6.


I executed the partition re-organization, which took 8.69 sec.
Re-running ndb_desc showed that data re-distribution had occurred.

In order to verify that not all rows are unnecessarily re-distributed, I ran the ndb_select_all program and wrote a Python script to compare the locations of the rows before and after the partition reorganization. The results of this comparison can be found here, and they show that rows were only moved to the newly added fragments and not to existing ones.

Statistically speaking, I started with the following number of rows in fragments (or shards) 0 to 3:
[25, 26, 25, 24]

The reorganization moved the following number of rows from each fragment to a new fragment (either fragment 4 or 5):
[8, 8, 10, 10]

These numbers are indeed in the ballpark of 2/6 (≈33%) of the original number of rows in each fragment, which is what you would expect when growing from 4 to 6 fragments.

Unlike MongoDB, where online migration can happen at any time and the locations of records are non-deterministic, partition reorganization in mysql-ndb is manually triggered via the "alter online table [tbl] reorganize partition" DDL. An "optimize table" statement then needs to be issued to reclaim free space after the reorganization.

Lastly, it should be noted that MySQL-ndb currently supports a maximum of 48 data nodes, although this limit might be raised in the future.

Tuesday, June 26, 2012

FB Folly's Atomic Hashmap

Recently, I watched Spencer Ahrens's talk on Folly's AtomicHashMap at the Facebook C++ conference. I thought it was pretty cool and was definitely impressed by the tricks used to achieve lock-free and wait-free hash map operations. The AtomicHashMap is 1.3x - 5x faster than boost.

Here are the slides and video.

Some cool takeaways:
  • Open addressing style
  • Growing hashmap: create another submap instead of rehash
  • Compare and swap for insert
  • ThreadCachedInt
  • Thread Local Storage
  • Microsharding when atomic inc is a bottleneck
  • Modulus is too slow when mapping a hash to a slot: round the table size up to the next power of two and use bit ops instead (a quick sketch follows this list)
    • What's the optimal size? Branch misprediction becomes a factor too.
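For the modulus point above, here is a quick sketch of the power-of-two trick in plain Java; the sizes and key are made up for illustration.

int requestedSize = 1000;
int capacity = 1;
while (capacity < requestedSize) {
    capacity <<= 1;                 // round up to the next power of two (1024 here)
}
int hash = "some-key".hashCode();
int index = hash & (capacity - 1);  // maps the hash into [0, capacity - 1] without a slow %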

Saturday, June 09, 2012

Sun Java on Ubuntu 12

Sun Java is no longer available in Ubuntu's packages due to licensing issues with Oracle; OpenJDK is provided instead. One solution is to use the sun-update-jre package via Dunisoft. However, this did not meet my requirements, as the package does not include javac.

My solution was to extract the bin file provided by Sun/Oracle by following the steps here. Remember to fix your symlinks for javadoc, javah and javap.

sudo update-alternatives --install /usr/bin/javac javac /usr/lib/jvm/jdk1.6.0_32/bin/javac 1
sudo update-alternatives --install /usr/bin/java java /usr/lib/jvm/jdk1.6.0_32/bin/java 1
sudo update-alternatives --install /usr/bin/javaws javaws /usr/lib/jvm/jdk1.6.0_32/bin/javaws 1
sudo update-alternatives --install /usr/bin/javadoc javadoc /usr/lib/jvm/jdk1.6.0_32/bin/javadoc 1
sudo update-alternatives --install /usr/bin/javah javah /usr/lib/jvm/jdk1.6.0_32/bin/javah 1
sudo update-alternatives --install /usr/bin/javap javap /usr/lib/jvm/jdk1.6.0_32/bin/javap 1

sudo update-alternatives --config javac
sudo update-alternatives --config java
sudo update-alternatives --config javaws
sudo update-alternatives --config javadoc
sudo update-alternatives --config javah
sudo update-alternatives --config javap

Monday, May 14, 2012

Ubuntu Precise Pangolin

I installed Ubuntu Precise Pangolin (amd64) on a separate partition today. The installation went well, save for the usual black screen problem.

The amd64 build felt speedier for most applications compared to the i386 build.

I was unimpressed with Unity and so I proceeded to get back the Gnome 2 look and feel by following the instructions here.

Make sure that sources are up to date:
$ sudo apt-get update

Install the gnome fallback:
$ sudo apt-get install gnome-session-fallback
Logout, select Gnome Classic using the gear icon, and then login.

I replaced lightdm with the more awesome gdm:
$ sudo apt-get install gdm
$ sudo apt-get purge lightdm

Install aptitude, gdebi (to facilitate future .deb installations) and synaptic
$ sudo apt-get install aptitude gdebi synaptic

Remove the annoying global app-menu
$ sudo apt-get purge appmenu-gtk appmenu-gtk3 appmenu-qt indicator-appmenu

Remove Unity (Woots!)
$ sudo apt-get remove unity-lens-music unity-lens-applications unity-greeter unity-common unity-asset-pool unity-2d-launcher unity-2d libunity-misc4 libunity-2d-private0 gir1.2-unity-4.0

Remove the semi-invisible overlay scrollbar
$ sudo apt-get purge liboverlay-scrollbar-0.2-0 liboverlay-scrollbar3-0.2-0 overlay-scrollbar appmenu-gtk appmenu-gtk3 appmenu-qt indicator-appmenu

Remove compiz (I dislike special effects)
$ sudo apt-get remove compiz compiz-plugins-main-default libcompizconfig0

Install gconf so we can change the min,max,close buttons to the right:
$ sudo apt-get install gconf-editor
Run gconf-editor in terminal and look for the path  /apps/metacity/general in the tree.
Change the value of button_layout to ":minimize,maximize,close"

Following advice from here, I proceeded to install the cool weather indicator and multiload indicator.
sudo apt-get install indicator-weather indicator-multiload

Now, restart your computer.
Note: I did not install gnome-shell.
I proceeded to make more tweaks:

I installed my favorite Gnome-Do.
$ sudo apt-get install gnome-do
After install, you can start it from Applications -> Accessories -> Gnome Do
It should load on start up and it can be activated via super+space.

I brought back the Alt+F2 shortcut for the run dialog:
Bring up Keyboard window -> Shortcuts (tab).
Look for System -> "Show the run command prompt".
Change the value to Alt+F2.

I installed mscorefonts.
Bring up Synaptic Package Manager. Look for ttf-mscorefonts-installer.
Also installed Mac Fonts (e.g. Lucida Grande)

I also installed the awesome zsh and Google Chrome, and nautilus-open-terminal.

Finally, as my screen is 2 pixels off, I brought up Display and changed the resolution from 1368x768 to 1366x768.

Current problems:
1. xrandr gives me "xrandr: Failed to get size of gamma for output default"
2. Can't connect to external monitor.
Solved: [source]
The trick is to use "acpi_osi=Linux acpi_backlight=vendor" during the boot phase.
This works:
$ sudo sed "s/\(GRUB_CMDLINE_LINUX=\)\"\"/\1\"acpi_osi=Linux acpi_backlight=vendor\"/" /etc/default/grub -i 
$ sudo update-grub
# reboot now

War Story Update:
On a second attempt at configuring Ubuntu 12.04 (Precise) using the above steps, I ran into some problems due to failing to remove compiz completely. The menu bar for windows disappeared and some key bindings appeared to be misconfigured. I then removed compiz via the Ubuntu Software Center and all was OK after that.

Ubuntu: failed to get i915 symbols, graphics turbo disabled

As it happened, I have been getting the "failed to get i915 symbols, graphics turbo disabled" error on my Ubuntu laptop for the past 2 years. I have finally got around to browsing the related thread on the Ubuntu forums and eliminating the error message.

Apparently, the first suggested action prevented the computer from booting into Linux:

$ sudo su -
$ echo "softdep intel_ips pre: i915" > /etc/modprobe.d/intel-ips-dep-i915.conf

So, I did the second suggested action instead (which worked):

$ gksudo gedit /etc/modprobe.d/blacklist.conf
Add "blacklist intel_ips"

I am not sure what the side effects of blacklisting are, but I doubt it behaves any differently from the original "fail + disabled" scenario.