Skip to content

Instantly share code, notes, and snippets.

@quan-nguyen2
Last active November 9, 2022 10:28
Show Gist options
  • Save quan-nguyen2/2c0964859b97197a8bc741a23a0e3f36 to your computer and use it in GitHub Desktop.
Save quan-nguyen2/2c0964859b97197a8bc741a23a0e3f36 to your computer and use it in GitHub Desktop.
This script dumps Kafka topics to files, one file per day.
#!/usr/bin/env bash
# Dump Kafka topics to gzip files, one day at a time.
#
# Requires the Kafka CLI tools on PATH. Two install flavors are supported:
#   - Homebrew installs expose `kafka-run-class`          -> sh=0
#   - A plain tarball download exposes `kafka-run-class.sh` -> sh=1
# The `sh` flag is read by the functions below to pick the right tool names.
set -euo pipefail

if ! command -v kafka-run-class &>/dev/null; then
  if ! command -v kafka-run-class.sh &>/dev/null; then
    # Diagnostics go to stderr so they don't pollute piped output.
    echo "kafka-run-class or kafka-run-class.sh could not be found in PATH" >&2
    exit 1
  else
    sh=1 # tarball install: tools carry a .sh suffix
  fi
else
  sh=0 # brew install: tools have no suffix
fi
# Print per-partition offset ranges for a topic on a given day.
#
# Arguments:
#   $1 - broker list (host:port[,host:port...])
#   $2 - topic name
#   $3 - day in YYYY-MM-DD form, e.g. 2022-11-07
# Globals:
#   sh (read) - 1 when the *.sh tool flavor is installed
# Outputs:
#   one line per partition: "partition start_offset end_offset count"
# NOTE(review): relies on GNU date flags (-I, -d "...+ 1 day"); will not work
# with BSD/macOS date — confirm target platform.
function extract_offsets() {
  local -r brokers=$1
  local -r topic=$2
  local -r day=$3 # something like 2022-07-01
  # Declaration split from assignment so a failing command substitution is
  # not masked by `local`'s own exit status (SC2155) and trips `set -e`.
  local end_date ts1 ts2 tmp_dir
  end_date=$(date -I -d "$day + 1 day")
  ts1=$(date -d "$day" +"%s000")      # day start as epoch milliseconds
  ts2=$(date -d "$end_date" +"%s000") # next-day start as epoch milliseconds
  tmp_dir=$(mktemp -d)

  # GetOffsetShell prints one "topic:partition:offset" line per partition.
  if [ "$sh" -eq 1 ]; then
    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$brokers" --topic "$topic" --time "$ts1" >"$tmp_dir/start"
    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$brokers" --topic "$topic" --time "$ts2" >"$tmp_dir/end"
  else
    kafka-run-class kafka.tools.GetOffsetShell --broker-list "$brokers" --topic "$topic" --time "$ts1" >"$tmp_dir/start"
    kafka-run-class kafka.tools.GetOffsetShell --broker-list "$brokers" --topic "$topic" --time "$ts2" >"$tmp_dir/end"
  fi

  # awk keeps the last two ':'-separated columns (partition, offset); join
  # pairs start/end offsets by partition id. join(1) requires sorted input,
  # otherwise lines are silently dropped once there are 10+ partitions.
  # Final columns: partition start_offset end_offset end-start, e.g.
  #   0 23014 23038 24
  #   1 20864 20884 20
  join <(awk -F':' '{print $(NF-1), $NF}' "$tmp_dir/start" | sort) \
    <(awk -F':' '{print $(NF-1), $NF}' "$tmp_dir/end" | sort) |
    awk '{print $1, $2, $3, $3 - $2}'

  rm -rf -- "$tmp_dir" # clean up temp files on the success path
}
export -f extract_offsets
# Consume one partition's messages for an offset range, writing to stdout.
#
# Arguments:
#   $1 - broker list
#   $2 - topic name
#   $3 - partition id
#   $4 - start offset (inclusive)
#   $5 - end offset (exclusive; end - start messages are consumed)
# Globals:
#   sh (read) - 1 when the *.sh tool flavor is installed
function dump() {
  local -r brokers=$1
  local -r topic=$2
  local -r partition=$3
  local -r start_offset=$4
  local -r end_offset=$5
  # The two install flavors differ only in the tool name; pick it once
  # instead of duplicating the whole invocation.
  local tool=kafka-console-consumer
  if [ "$sh" -eq 1 ]; then
    tool=kafka-console-consumer.sh
  fi
  "$tool" --bootstrap-server "$brokers" \
    --topic "$topic" --partition "$partition" --offset "$start_offset" \
    --max-messages $((end_offset - start_offset))
}
export -f dump
# Dump every partition of a topic for one day into a single gzip file.
#
# Arguments:
#   $1 - broker list
#   $2 - topic name
#   $3 - day in YYYY-MM-DD form
#   $4 - output file path (receives gzip-compressed messages)
function run() {
  local -r brokers=$1
  local -r topic=$2
  local -r date=$3
  local -r output=$4
  local offsets_file temp_out
  offsets_file=$(mktemp)
  temp_out=$(mktemp)

  # tee shows the offset table to the user while saving it for the loop.
  extract_offsets "$brokers" "$topic" "$date" | tee "$offsets_file"

  # Each line: partition start_offset end_offset count. Let `read` split the
  # fields directly instead of spawning three awk processes per line.
  local partition start_offset end_offset _rest
  while read -r partition start_offset end_offset _rest; do
    echo "dumping partition $partition for offset range [$start_offset, $end_offset]"
    dump "$brokers" "$topic" "$partition" "$start_offset" "$end_offset" | gzip >>"$temp_out"
  done <"$offsets_file"

  mv "$temp_out" "$output"
  rm -f -- "$offsets_file" # don't leak the temp offsets table
}
export -f run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment