Some stuff on Apache Kafka

Some Highlights:

  • Replicating 1000 partitions from one broker to another can add up to 20 ms of latency. This can be too high for some real-time applications.

  • File descriptor limits: Kafka needs open file descriptors for files and network connections. We recommend allowing at least 128000 file descriptors (see the sketch after this list for a quick way to check the current limit).

  • Max socket buffer size can be increased to enable high-performance data transfer. More details are here: http://www.psc.edu/index.php/networking/641-tcp-tune

  • "In general, one can produce at 10s of MB/sec on just a single partition as shown in this benchmark."

  • When a broker is shut down uncleanly (e.g., kill -9), the observed unavailability could be proportional to the number of partitions. Suppose that a broker has a total of 2000 partitions, each with 2 replicas. Roughly, this broker will be the leader for about 1000 partitions. When this broker fails uncleanly, all those 1000 partitions become unavailable at exactly the same time. Suppose that it takes 5 ms to elect a new leader for a single partition. It will take up to 5 seconds to elect the new leader for all 1000 partitions. So, for some partitions, their observed unavailability can be 5 seconds plus the time taken to detect the failure.

    • If one is unlucky, the failed broker may be the controller. In this case, the process of electing the new leaders won’t start until the controller fails over to a new broker. The controller failover happens automatically, but requires the new controller to read some metadata for every partition from ZooKeeper during initialization. For example, if there are 10,000 partitions in the Kafka cluster and initializing the metadata from ZooKeeper takes 2 ms per partition, this can add 20 more seconds to the unavailability window.
  • Our experiments show that replicating 1000 partitions from one broker to another can add about 20 ms latency.

    • As a rule of thumb, if you care about latency, it’s probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in a Kafka cluster and r is the replication factor.
  • You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute your memory need as write_throughput * 30 (see the arithmetic sketch after this list).

  • If you need to choose between faster CPUs or more cores, choose more cores. The extra concurrency that multiple cores offer will far outweigh a slightly faster clock speed.

  • RAID can potentially do better at balancing load between disks (although it doesn't always seem to) because it balances load at a lower level. The primary downside of RAID is that it reduces the available disk space. Another potential benefit of RAID is the ability to tolerate disk failures.

    • You should not use RAID 5 or RAID 6 because of the significant hit on write throughput and, to a lesser extent, the I/O cost of rebuilding the array when a disk fails (the rebuild cost applies to RAID, in general, but it is worst for RAID 6 and then RAID 5).
    • You should use RAID 10 if the additional cost is acceptable. Otherwise, configure your Kafka server with multiple log directories, each directory mounted on a separate drive.
  • You should run Kafka on XFS or ext4. XFS typically performs well with little tuning when compared to ext4 and it has become the default filesystem for many Linux distributions.

  • Avoid large machines because they often lead to imbalanced resource usage. For example, all the memory is being used, but none of the CPU. They can also add logistical complexity if you have to run multiple nodes per machine.

  • Confluent (on JVM)
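
The file descriptor and socket buffer recommendations above are OS-level settings. As a rough illustration (Python, assuming a Linux host), the current limits can be inspected like this; raising them is normally done through OS configuration (e.g. the service unit or sysctl), not from application code:

```python
import resource

# Soft/hard limit on open file descriptors for the current process.
# The bullet above recommends allowing at least 128000 for a Kafka broker.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open file descriptors: soft={soft}, hard={hard}")
if soft < 128000:
    print("soft limit is below the recommended 128000")

# Kernel-wide maximum socket buffer sizes (Linux-specific paths).
for name in ("rmem_max", "wmem_max"):
    with open(f"/proc/sys/net/core/{name}") as f:
        print(f"net.core.{name} = {f.read().strip()}")
```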
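
The numbers in the availability and sizing bullets above can be reproduced with simple back-of-the-envelope arithmetic. The sketch below (Python) just restates that arithmetic; the inputs are hypothetical examples, not measurements from any cluster:

```python
# Hypothetical inputs -- substitute numbers from your own cluster.
partitions_on_broker = 2000   # partition replicas hosted on the failed broker
replication_factor = 2
leader_election_ms = 5        # time to elect a new leader for one partition
zk_read_ms = 2                # ZooKeeper metadata read per partition (controller failover)
cluster_partitions = 10_000
brokers = 3
write_throughput_mb_s = 50    # aggregate write throughput

# Unclean shutdown of a non-controller broker: roughly half of its replicas are
# leaders, and each needs a new leader elected before it is available again.
leaders = partitions_on_broker // replication_factor
print(f"unavailability: ~{leaders * leader_election_ms / 1000:.0f} s")                # ~5 s

# If the failed broker was also the controller, add the controller failover time
# spent reading per-partition metadata from ZooKeeper.
print(f"controller failover adds: ~{cluster_partitions * zk_read_ms / 1000:.0f} s")   # ~20 s

# Latency rule of thumb: at most 100 x b x r partitions per broker.
print(f"partitions per broker cap: {100 * brokers * replication_factor}")             # 600

# Memory to buffer ~30 seconds of writes.
print(f"buffer memory: ~{write_throughput_mb_s * 30} MB")                             # ~1500 MB
```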

Choosing the Number of Topics/Partitions:

Based on throughput requirements, one can pick a rough number of partitions:

  • Let the throughput from a producer to a single partition be P
  • Let the throughput from a single partition to a consumer be C
  • Let the target throughput be T
  • Required partitions = max(T/P, T/C), as computed in the sketch below
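
A minimal sketch of that formula in Python; the throughput figures in the example are placeholders, not benchmark results:

```python
import math

def required_partitions(t: float, p: float, c: float) -> int:
    """Required partitions = max(T/P, T/C), rounded up to a whole partition.

    t: target throughput, p: per-partition producer throughput,
    c: per-partition consumer throughput (all in the same unit, e.g. MB/s).
    """
    return math.ceil(max(t / p, t / c))

# Hypothetical example: target 200 MB/s with producers doing 20 MB/s and
# consumers 50 MB/s per partition -> max(10, 4) = 10 partitions.
print(required_partitions(200, 20, 50))  # 10
```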