Last active
March 31, 2020 16:02
-
-
Save samklr/7927e877a822eaa83246562c3277bb8c to your computer and use it in GitHub Desktop.
Offset Management on HBase
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Save offsets for each batch into HBase | |
*/ | |
/**
  * Writes the end offset of every partition in a batch to HBase as one row.
  *
  * Row layout, as produced by the code below:
  *  - row key: `TOPIC_NAME:GROUP_ID:<batchTime in milliseconds>`
  *  - column family: `offsets`
  *  - one column per partition: qualifier = partition number, value = `untilOffset`
  *    (both stored as strings)
  *
  * @param TOPIC_NAME     Kafka topic name; first component of the row key
  * @param GROUP_ID       consumer group id; second component of the row key
  * @param offsetRanges   offset ranges of the batch, one entry per partition
  * @param hbaseTableName name of the HBase table that stores the offsets
  * @param batchTime      batch time; its millisecond value is the last row-key component
  */
def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
                hbaseTableName: String, batchTime: org.apache.spark.streaming.Time): Unit = {
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.addResource("src/main/resources/hbase-site.xml")
  val conn = ConnectionFactory.createConnection(hbaseConf)
  // try/finally so the connection and the table are released even when the write fails.
  try {
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    try {
      val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
      val put = new Put(rowKey.getBytes)
      for (offset <- offsetRanges) {
        put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
          Bytes.toBytes(offset.untilOffset.toString))
      }
      table.put(put)
    } finally {
      table.close()
    }
  } finally {
    conn.close()
  }
}
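/*
 Returns the last committed offsets for all the partitions of a given topic from HBase,
 covering the following cases:
 1. No offsets are stored in HBase yet: every partition starts at offset 0.
 2. Partitions were added to the topic since the last run: known partitions resume from
    their stored offsets and the new partitions start at offset 0.
 3. Otherwise: every partition resumes from its stored offset.
*/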
/**
  * Builds the starting offsets for every partition of a topic from the most recent
  * offsets row stored in HBase.
  *
  * The partition count is read from ZooKeeper; the latest row for
  * `TOPIC_NAME:GROUP_ID:*` is fetched with a reversed HBase scan. Partitions
  * `0 until <cells in that row>` take their stored offset; any further partitions
  * known to ZooKeeper start at offset 0 (this also covers the "no row yet" case).
  *
  * @param TOPIC_NAME        Kafka topic name
  * @param GROUP_ID          consumer group id
  * @param hbaseTableName    name of the HBase table that stores the offsets
  * @param zkQuorum          ZooKeeper quorum; joined with `zkRootDir` by "/"
  * @param zkRootDir         ZooKeeper root directory appended to the quorum
  * @param sessionTimeout    ZooKeeper session timeout passed to `ZkUtils`
  * @param connectionTimeOut ZooKeeper connection timeout passed to `ZkUtils`
  * @return starting offset per topic partition
  * @throws NoSuchElementException if ZooKeeper returns no partition entry for the topic
  */
def getLastCommittedOffsets(TOPIC_NAME: String, GROUP_ID: String, hbaseTableName: String,
                            zkQuorum: String, zkRootDir: String, sessionTimeout: Int,
                            connectionTimeOut: Int): Map[TopicPartition, Long] = {
  val hbaseConf = HBaseConfiguration.create()

  // Ask ZooKeeper how many partitions the topic currently has.
  val zkUrl = zkQuorum + "/" + zkRootDir
  val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl,
    sessionTimeout, connectionTimeOut)
  val zKNumberOfPartitionsForTopic =
    try {
      val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
      zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME) match {
        case Some(partitions) => partitions.size
        case None =>
          throw new NoSuchElementException(
            "No partition metadata found in ZooKeeper for topic " + TOPIC_NAME)
      }
    } finally {
      // Close both handles even if the lookup (or the first close) fails.
      try zkClientAndConnection._1.close()
      finally zkClientAndConnection._2.close()
    }

  // Connect to HBase to retrieve last committed offsets
  val conn = ConnectionFactory.createConnection(hbaseConf)
  try {
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    try {
      // Reversed scan from "now" down to timestamp 0: the first row is the latest batch.
      val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" +
        String.valueOf(System.currentTimeMillis())
      val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
      val scan = new Scan()
      val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(
        stopRow.getBytes).setReversed(true))
      try {
        val result = scanner.next()
        // Number of partitions recorded in HBase = number of cells in the latest row;
        // 0 when no row exists yet.
        val hbaseNumberOfPartitionsForTopic =
          Option(result).map(_.listCells().size()).getOrElse(0)

        // Partitions already tracked in HBase resume from their stored offset.
        val committedOffsets = (0 until hbaseNumberOfPartitionsForTopic).map { partition =>
          val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
            Bytes.toBytes(partition.toString)))
          new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong
        }
        // Partitions ZooKeeper knows about but HBase does not (first run, or partitions
        // added to the topic) start from the beginning. Empty when HBase is up to date.
        val newPartitionOffsets =
          (hbaseNumberOfPartitionsForTopic until zKNumberOfPartitionsForTopic).map { partition =>
            new TopicPartition(TOPIC_NAME, partition) -> 0L
          }

        (committedOffsets ++ newPartitionOffsets).toMap
      } finally {
        scanner.close()
      }
    } finally {
      table.close()
    }
  } finally {
    conn.close()
  }
}
// Resume from the offsets stored in HBase and pin the stream to exactly those partitions.
val fromOffsets = getLastCommittedOffsets(topic, consumerGroupID, hbaseTableName, zkQuorum,
  zkKafkaRootDir, zkSessionTimeOut, zkConnectionTimeOut)
val inputDStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
  Assign[String, String](fromOffsets.keys, kafkaParams, fromOffsets))

/*
 For each RDD in a DStream apply a map transformation that processes the message.
*/
inputDStream.foreachRDD((rdd, batchTime) => {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Explicit tuple: same printed output as before, without relying on argument auto-tupling.
  offsetRanges.foreach(offset => println((offset.topic, offset.partition, offset.fromOffset,
    offset.untilOffset)))
  val newRDD = rdd.map(message => processMessage(message))
  // count() is the action that runs the map above.
  newRDD.count()
  // Offsets are saved only after the batch has been processed.
  saveOffsets(topic, consumerGroupID, offsetRanges, hbaseTableName, batchTime)
})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ZooKeeper client/connection pair, and the ZkUtils wrapper built from it
// (third constructor argument is passed as false).
val zkClientAndConnection =
  ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
val zkUtils = {
  val (zkClient, zkConnection) = zkClientAndConnection
  new ZkUtils(zkClient, zkConnection, false)
}
/**
  * Reads the stored offset of every partition of the given topics from ZooKeeper.
  *
  * Offsets are read from `<consumerOffsetDir of the group/topic>/<partition>`.
  * When reading or parsing a node fails, the partition is mapped to offset 0.
  *
  * @param topics  topics whose partitions are looked up through `zkUtils`
  * @param groupId consumer group whose offset nodes are read
  * @return stored (or defaulted) offset per topic partition
  */
def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
  // /consumers/<groupId>/offsets/<topic>/
  val topicPartOffsets = zkUtils.getPartitionsForTopics(topics).toSeq.flatMap {
    case (topic, partitions) =>
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topic)
      partitions.flatMap { partition =>
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
        val topicPartition = new TopicPartition(topic, partition)
        val entry: Option[(TopicPartition, Long)] =
          try {
            val offsetStatTuple = zkUtils.readData(offsetPath)
            if (offsetStatTuple != null) {
              LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](topic, partition.toString, offsetStatTuple._1, offsetPath): _*)
              Some(topicPartition -> offsetStatTuple._1.toLong)
            } else {
              None
            }
          } catch {
            // NonFatal lets InterruptedException and VM errors propagate.
            // NOTE(review): every other failure (not only a missing node) resets the
            // partition to offset 0 -- confirm that is intended for connection errors.
            case scala.util.control.NonFatal(e) =>
              LOGGER.warn("retrieving offset details - no previous node exists:" + " {}, topic: {}, partition: {}, node path: {}", Seq[AnyRef](e.getMessage, topic, partition.toString, offsetPath): _*)
              Some(topicPartition -> 0L)
          }
        entry
      }
  }
  topicPartOffsets.toMap
}
// Direct stream subscribed to `topics`; `fromOffsets` is handed to the Subscribe strategy.
val inputDStream = KafkaUtils.createDirectStream(
  ssc,
  PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
)
/**
  * Writes one offset per range to ZooKeeper under
  * `<consumerOffsetDir of the group/topic>/<partition>`.
  *
  * @param offsets        offset ranges to persist
  * @param groupId        consumer group the offsets belong to
  * @param storeEndOffset if true store each range's `untilOffset`, otherwise its `fromOffset`
  */
def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {
  // The ACL list is the same for every node, so build it once, directly as a Java list
  // (avoids the deprecated implicit JavaConversions).
  // NOTE(review): ANYONE_ID_UNSAFE with PERMISSIONS_ALL presumably leaves the offset nodes
  // open to every ZooKeeper client -- confirm this is acceptable for the deployment.
  val acl = new ACL
  acl.setId(ANYONE_ID_UNSAFE)
  acl.setPerms(PERMISSIONS_ALL)
  val acls = new java.util.ArrayList[ACL]()
  acls.add(acl)

  offsets.foreach { or =>
    val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
    val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
    val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
    zkUtils.updatePersistentPath(offsetPath, offsetVal.toString, acls)
    LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
stream.foreachRDD(batch => {
  // Offset ranges consumed by this batch.
  val consumedRanges = batch.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  val committer = stream.asInstanceOf[CanCommitOffsets]
  committer.commitAsync(consumedRanges)
})
//http://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Template for submitting a Spark Streaming job to YARN in cluster mode.
#
# Minimum TODOs on a per job basis:
# 1. define name, application jar path, main class, queue and log4j-yarn.properties path
#    (replace every "<...>" placeholder below; they are quoted so that bash does not
#    treat the angle brackets as redirections)
# 2. remove properties not applicable to your Spark version (Spark 1.x vs. Spark 2.x)
# 3. tweak num_executors, executor_memory (+ overhead), and backpressure settings

# the two most important settings:
num_executors=6
executor_memory=3g

# 3-5 cores per executor is a good default balancing HDFS client throughput vs. JVM overhead
# see http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
executor_cores=3

# backpressure
receiver_max_rate=100
receiver_initial_rate=30

# Inline comments use the `# ...` trick: a command substitution containing only a
# comment expands to nothing, so the line continuation stays intact.
spark-submit --master yarn --deploy-mode cluster \
  --name "<my-job-name>" \
  --class "<main-class>" \
  --driver-memory 2g \
  --num-executors "${num_executors}" --executor-cores "${executor_cores}" --executor-memory "${executor_memory}" \
  --queue "<realtime_queue>" \
  --files "<hdfs:///path/to/log4j-yarn.properties>" \
  --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \
  --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer `# Kryo Serializer is much faster than the default Java Serializer` \
  --conf spark.locality.wait=10 `# Increase job parallelity by reducing Spark Delay Scheduling (potentially big performance impact (!)) (Default: 3s)` \
  --conf spark.task.maxFailures=8 `# Increase max task failures before failing job (Default: 4)` \
  --conf spark.ui.killEnabled=false `# Prevent killing of stages and corresponding jobs from the Spark UI` \
  --conf spark.logConf=true `# Log Spark Configuration in driver log for troubleshooting` \
`# SPARK STREAMING CONFIGURATION` \
  --conf spark.streaming.blockInterval=200 `# [Optional] Tweak to balance data processing parallelism vs. task scheduling overhead (Default: 200ms)` \
  --conf spark.streaming.receiver.writeAheadLog.enable=true `# Prevent data loss on driver recovery` \
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.streaming.backpressure.pid.minRate=10 `# [Optional] Reduce min rate of PID-based backpressure implementation (Default: 100)` \
  --conf spark.streaming.receiver.maxRate="${receiver_max_rate}" `# [Spark 1.x]: Workaround for missing initial rate (Default: not set)` \
  --conf spark.streaming.kafka.maxRatePerPartition="${receiver_max_rate}" `# [Spark 1.x]: Corresponding max rate setting for Direct Kafka Streaming (Default: not set)` \
  --conf spark.streaming.backpressure.initialRate="${receiver_initial_rate}" `# [Spark 2.x]: Initial rate before backpressure kicks in (Default: not set)` \
`# YARN CONFIGURATION` \
  --conf spark.yarn.driver.memoryOverhead=512 `# [Optional] Set if --driver-memory < 5GB` \
  --conf spark.yarn.executor.memoryOverhead=1024 `# [Optional] Set if --executor-memory < 10GB` \
  --conf spark.yarn.maxAppAttempts=4 `# Increase max application master attempts (needs to be <= yarn.resourcemanager.am.max-attempts in YARN, which defaults to 2) (Default: yarn.resourcemanager.am.max-attempts)` \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h `# Attempt counter considers only the last hour (Default: (none))` \
  --conf spark.yarn.max.executor.failures="$((8 * num_executors))" `# Increase max executor failures (Default: max(numExecutors * 2, 3))` \
  --conf spark.yarn.executor.failuresValidityInterval=1h `# Executor failure counter considers only the last hour` \
  "</path/to/spark-application.jar>"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
Could you please help me with the Java code for the example above? I am implementing Kafka Spark Streaming in Java.