"A high-throughput distributed messaging system." site
Notes taken from source
- Created at LinkedIn (open sourced in 2011)
- Implemented in Scala and some Java
Design Requirements
- High throughput to support high volume event feeds
- Support real-time processing of these feeds to create new, derived feeds
- Support large data backlogs to handle periodic ingestion from offline systems
- Support low-latency delivery to handle more traditional messaging use cases
- Guarantee fault-tolerance in the presence of machine failure
Writes
- writes go to the page cache of OS, i.e. RAM
Reads
- direct transfer from page cache to network socket with `sendfile`, avoids copying data into the kafka application for sending
- A healthy kafka cluster will show minimal read activity to disk, as data is served primarily from cache.
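A minimal, hedged sketch of what that zero-copy path looks like from the JVM side: `FileChannel.transferTo()` maps to `sendfile(2)` on Linux, so bytes move from the page cache straight to the socket without being copied into user space. The file name, host, and port are illustrative, not Kafka internals.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopySketch {
    public static void main(String[] args) throws IOException {
        try (FileChannel segment = new FileInputStream("segment.log").getChannel();
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9092))) {
            long position = 0;
            long remaining = segment.size();
            while (remaining > 0) {
                // transferTo() uses sendfile(2) where available: data goes from the
                // page cache to the socket without entering the application.
                long sent = segment.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}
```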
LinkedIn Usage Stats
- 300+ brokers
- 18K+ topics
- 140k+ partitions
- 220 bil messages per day
- 40 TB in
- 160 TB out
- peak 3.25 mil msg/sec
- 5.5 Gbit/sec in
- 18 Gbit/sec out
General Use Case
- kafka + (Storm, Samza, Spark Streaming, homegrown)
- LinkedIn: activity streams, ops metrics, data bus
- Netflix: real-time monitoring, event processing
- Twitter: w/ storm for real-time data pipelines
- Spotify: log delivery to hadoop
- loggly: log collection and processing
- Mozilla: telemetry data
- kafka brokers handle reads and writes
- Hierarchy (Topic -> Partition -> Replication)
- ZooKeeper manages and shares state for brokers and consumers (brokers only in kafka v0.9)
- Topics/Partitions are append-only, immutable sequences
- Length of topics/partitions is governed by `age`, `max size`, or `key`
- Offset determines location in the queue (monotonically increasing integer)
- Consumers use the offset to track position in the queue
- Replicas are solely for data loss prevention (clients never read from or write to them)
- Servers with large RAM are used to serve data from cache (LinkedIn: 64 GB = 60 GB for page cache, 4 GB for the broker)
- RAID10 with 14 spindles
- more spindles -> higher disk throughput
- Cache on RAID with battery backup
- 1 Gig ethernet
- zookeeper servers -> run only zookeeper, 1 zookeeper per host
- recommend SSDs
- kafka clusters don't span data centers (latency to zookeeper servers?)
- zookeeper ensemble of n nodes tolerates (n-1)/2 failures, i.e. a majority must stay up (5 nodes -> 2 failures, 13 -> 6)
- LinkedIn runs 5 node ensembles
- Twitter runs 13 node ensembles
cli
kafka-topics.sh --zookeeper 1.2.3.4:2181 --create --topic topic.name \
--partitions 3 --replication-factor 2 --config x=y
auto create
auto.create.topics.enable = true
kafka-topics.sh --zookeeper 1.2.3.4:2181 --describe --topic topic.name
- Synchronous or Asynchronous
- Sync blocks client on `send()`
- Async sends message in background
- Allows for batching of messages
- Pools of Sync producers
- Possible to drop messages if queue is full
- Write Message ack
- message considered committed when "any required" in-sync replicas for that partition have applied the message to their data log
- "any required" defined by producers in
request.required.acks
0
- fire and forget (no ack required)1
- wait for leader to ack-1
- wait for all ISR (in-sync replicas) ack
- only committed messages are given to consumers
request.timeout.ms
if too low can send error back to the client and then an ack from a broker- batch writes
- higher risk of data loss if the client dies before pending messages are sent
- default partitioner selects based on hash of key
- so by default, kafka guarantees that all data for the same key will go to the same partition
- if no key is provided, the client will push to a random partition and stick with it for a random period of time, then switch to another random partition
- key is retained in the message in the broker
- current list of topic/partition leaders is provided to producers by the brokers in `metadata.broker.list`
- Some brokers are designated for "bootstrap"
- At least one "bootstrap" broker must be up or producers will stop working
- Recommended to use VIP for "bootstrap" brokers
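A minimal sketch of what these producer notes look like with the 0.8-era Java producer API: sync mode blocks on `send()`, `request.required.acks=-1` waits for the full ISR, and a keyed message lets the default partitioner hash the key to a partition. Broker addresses, topic, key, and payload are placeholders.

```java
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SyncProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "bootstrap" brokers (ideally behind a VIP) used to fetch topic/partition leaders
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");        // blocks the client on send()
        props.put("request.required.acks", "-1");  // wait for all in-sync replicas to ack

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));

        // Same key -> same partition under the default key-hash partitioner.
        producer.send(new KeyedMessage<>("topic.name", "member-42", "profile-view-event"));
        producer.close();
    }
}
```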
- storm 0.9.2 uses simple consumer API to integrate well with storm's model of guaranteed message processing
- Consumers pull from kafka
- Consumers are responsible for tracking their offset position
- High level consumer: stores the offsets in zookeeper
- Simple consumer: responsible for tracking offset
- Consumers can rewind back in time, up to the max age or max size retained for the topic/partition
- Consumer can select subset of a topic's partitions to consume
- config:
  - `group.id`: assigns consumer to a "group"
  - `zookeeper.connect`: broker/topic/etc. discovery
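A minimal sketch of the high-level consumer described above, assuming the 0.8-era Java API: `zookeeper.connect` handles discovery and offset storage, and `group.id` places the consumer in a group. Topic and group names are placeholders.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class HighLevelConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "1.2.3.4:2181"); // discovery + offset storage
        props.put("group.id", "my-consumer-group");     // consumers in one group share the partitions

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream (thread) for the topic; the connector tracks offsets in zookeeper.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("topic.name", 1));

        for (MessageAndMetadata<byte[], byte[]> msg : streams.get("topic.name").get(0)) {
            System.out.printf("partition=%d offset=%d value=%s%n",
                    msg.partition(), msg.offset(), new String(msg.message()));
        }
    }
}
```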
- System Tools
- Replication tools
- stormkafkamon
- JMX
- dropwizard/metrics
- garbage collection time
- data size on disk should be balanced across brokers/disks
- data balance is more important than partition balance
- Leader partition count should be balanced (avoid hot node since all reads/writes go against the associated leader topic/partition)
- track network utilization
- common cause for under-replicated partition
- request latency (if on SSDs, should be essentially <1 ms)
- outstanding requests (increasing means items backing up)
- LinkedIn working on adding "Auditing" functionality
- not yet released
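The monitoring notes above mention JMX; here is a hedged sketch of browsing a broker's `kafka.*` MBeans remotely. It assumes the broker was started with a remote JMX port exposed (host and port are illustrative), and since MBean names vary by kafka release it just lists whatever is registered rather than naming specific metrics.

```java
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerJmxSketch {
    public static void main(String[] args) throws Exception {
        // Assumes remote JMX is enabled on the broker; host and port are illustrative.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // List whatever kafka.* MBeans this broker version exposes.
            Set<ObjectName> names = mbs.queryNames(new ObjectName("kafka.*:*"), null);
            for (ObjectName name : names) {
                System.out.println(name);
            }
        } finally {
            connector.close();
        }
    }
}
```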
vm.swappiness = 0
- Allow more dirty pages but less dirty cache
vm.dirty_*_ratio
- Lengthen flush interval (LinkedIn uses 120s!? They can tolerate 2 minutes' worth of data loss thanks to replicas)
- Additional spindles (RAID10 with 14 disks)
- `num.io.threads`: should be >= # of disks (initially set to # of disks)
- `num.network.threads`: adjust based on # of concurrent producers, consumers, and the replication factor
- Compression
- snappy and gzip are supported; snappy recommended: lower compression ratio, but not as cpu intensive
- Producer Example
- Multi-broker on a single node
- Synchronous Producers (blocks on send)
- Don't break up into separate topics unless data is truly independent
- Keep time related messages in the same partition
- Async producers can drop messages if the send queue is full (default `queue.buffering.max.messages` is 10,000); see the async config sketch after this list
- producer `request.timeout.ms`: if too low, the client can get an error back and then an ack from a broker
- Use VIPs for the bootstrap broker list (`metadata.broker.list`) to help with "bootstrap" brokers going up/down
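A hedged sketch of the async settings named above, using the same 0.8-era producer API as the earlier example; the VIP hostname, topic, key, payload, and anything not named in the notes are illustrative.

```java
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AsyncProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "kafka-vip:9092");   // VIP fronting the "bootstrap" brokers
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");                   // background thread batches and sends
        props.put("queue.buffering.max.messages", "10000");    // default; per the notes, sends can drop once this fills
        props.put("compression.codec", "snappy");              // recommended codec from the tuning notes

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        producer.send(new KeyedMessage<>("topic.name", "key-1", "event-payload"));
        // messages still queued when the client dies are lost; close before exit
        producer.close();
    }
}
```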
- Kafka was not designed originally with security in mind
- June 2014: adding security features (TLS, data encryption at rest)
- Garbage collection
- suggest the G1 ("garbage first") GC
- Educating and coaching on kafka use
- Expanding/reducing size of kafka cluster
- Monitoring consumer apps ("My stuff stopped working, what did kafka do?")
- Consumer lag
- consumers too slow
- too much GC
- Loss of connection to zookeeper servers or brokers
- bug or design flaw
- Rebalancing
- Multiple new nodes can trigger repeated rebalancing that hits kafka's rebalancing limit, `rebalance.max.retries` (default 4)
- New script in v0.8.1 to balance data/partitions across brokers
- Problems with partition healing (CAP)
- Measure replication lag?
- Does kafka fsync on each write? if not flush interval?
- Election algorithm
- How do you ensure unique broker ids in automated environment?
- Measure health of zookeeper server
- review puppet-kafka recipe
- What happens to a consumer that restarts and loses its offset value, does it start at the beginning or end of the queue?