Last active
July 17, 2024 21:49
-
-
Save ThePsyjo/b717d2eaca2deb09b8130b3e917758f6 to your computer and use it in GitHub Desktop.
Script to reset the start position (low watermark) of partitions in Kafka topics to a point close to the consumer-group's current offset
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Reset the low watermark of partitions close to the point where given consumer-group has been.""" | |
import argparse | |
import sys | |
from collections import OrderedDict | |
from json import dumps, loads | |
from pprint import pformat | |
from subprocess import PIPE, Popen | |
from tempfile import NamedTemporaryFile | |
from typing import Generator, Iterable, Union | |
class MultipleConsumerGroupsException(Exception):
    """Exception indicating that more than one consumer-group exists on the given topic."""
class GetCgDataError(Exception):
    """Exception indicating an error when fetching raw consumer-group data."""
class Hasher(OrderedDict):
    """
    Auto-expanding ordered dict with attribute-style read access.

    Missing keys are created on item access as nested ``Hasher`` instances,
    so ``h['a']['b'] = 1`` works without pre-creating ``h['a']``.
    https://stackoverflow.com/a/3405143/190597
    """

    def __missing__(self, key):
        # Auto-vivify: a missing key becomes a fresh nested Hasher.
        value = self[key] = type(self)()
        return value

    def __getattr__(self, item):
        # Only invoked when normal attribute lookup fails; expose dict
        # entries as attributes and map unknown keys to AttributeError.
        if item not in self:
            raise AttributeError(item)
        return self[item]

    def __setattr__(self, key, value):
        # Attribute assignment is only permitted for names that already
        # exist as dict keys.
        if key not in self:
            raise AttributeError(key)
        # BUG FIX: object.__setattr__ is a slot wrapper and rejects keyword
        # arguments; the previous `super().__setattr__(key=key, value=value)`
        # raised TypeError on every permitted assignment.
        super().__setattr__(key, value)
def get_cg_data(bootstrap_server: str) -> str:
    """
    Fetch raw consumer-group data via the `kafka-consumer-groups` CLI.

    If `kafka-consumer-groups` exits with a non-zero returncode,
    `GetCgDataError` will be raised.
    :param str bootstrap_server: bootstrap server(s)
    :return: decoded stdout of `kafka-consumer-groups`
    """
    command_line = (
        f'kafka-consumer-groups --bootstrap-server {bootstrap_server}'
        ' --describe --offsets --all-groups'
    )
    proc = Popen(
        command_line.split(),
        stdout=PIPE,
        # stderr=PIPE,
    )
    stdout, _ = proc.communicate()
    if proc.returncode:
        raise GetCgDataError(proc.returncode)
    return stdout.decode()
def parse(data: str) -> OrderedDict:
    """
    Parse raw `kafka-consumer-groups` output into a nested mapping.

    :param str data: raw data from :func:`get_cg_data`
    :return OrderedDict: topic -> consumer-group -> partition -> offset info
    """
    parsed = Hasher()
    for section in (s.strip() for s in data.split('\n\n')):
        # Only blocks headed by the GROUP column line carry offset rows.
        if not section.startswith('GROUP'):
            continue
        # Skip the header row; each remaining row describes one partition.
        for row in section.splitlines()[1:]:
            group, topic, partition, current, end, *_ = row.split()
            parsed[topic][group][int(partition)] = dict(
                current=int(current),
                end=int(end),
            )
    return parsed
def generate_offset_json(data: str, cg_map: dict, margin: int = 1_000_000, raise_on_multiple_cg: bool = True) -> dict:
    """
    Generate specification json to be used with `kafka-delete-records`.

    Iterate over `cg_map` items and emit one config entry per existing and
    matching partition; when several consumer-groups map to the same topic,
    the smallest resulting offset per partition wins.
    :param str data: raw data from :func:`get_cg_data`
    :param dict cg_map: topic->consumer-group map to include
    :param int margin: margin to subtract from current offset, 0 -> set to
        current offset, 100 -> set 100 before current offset
    :param bool raise_on_multiple_cg: raise `MultipleConsumerGroupsException`
        if more than one consumer-group is present on the given topic
    :return dict: config to be used with `kafka-delete-records`
    """
    state = parse(data=data)
    offsets = Hasher()
    for topic, cg in cg_map.items():
        groups = state[topic]
        if raise_on_multiple_cg and len(groups) > 1:
            raise MultipleConsumerGroupsException(list(groups.keys()))
        for partition, values in groups[cg].items():
            # Never go below offset 0, whatever the margin.
            candidate = max(0, values['current'] - margin)
            previous = offsets[topic].get(partition)
            # Keep the smallest offset seen for this partition so far.
            offsets[topic][partition] = candidate if previous is None else min(previous, candidate)
    partition_entries = [
        dict(topic=topic, partition=partition, offset=offset)
        for topic, per_topic in offsets.items()
        for partition, offset in per_topic.items()
    ]
    return dict(version=1, partitions=partition_entries)
def parse_cg_map(cg_map: Iterable[str]) -> Generator[tuple, None, None]:
    """
    Parse list of `topic:cg` items into (topic, cg) tuples.

    Splits on the first ':' only, so consumer-group names that themselves
    contain ':' stay intact (the previous unbounded split produced 3+
    elements for such items, breaking the dict() conversion downstream).
    :param Iterable[str] cg_map: cg maps to extract
    :raises ValueError: if an item contains no ':' separator
    """
    for item in cg_map:
        # maxsplit=1: topic names cannot contain ':', group names may.
        topic, cg = item.split(':', 1)
        yield topic, cg
def main(
    bootstrap_server: str,
    cg_map: Iterable[str],
    margin: int,
    ignore_extra_cg: bool = False,
    pretend: bool = False,
    json: bool = False,
) -> Union[None, int]:
    """
    Main function connecting all the pieces.

    * fetch raw data (`kafka-consumer-groups`)
    * parse raw data and generate the delete config
    * write the delete config to a temporary file
    * pretend or execute (`kafka-delete-records`) with gathered data
    :param str bootstrap_server: bootstrap server(s) for
        `kafka-consumer-groups` and `kafka-delete-records`
    :param Iterable[str] cg_map: topic->consumer-group maps
    :param int margin: offset margin
    :param bool ignore_extra_cg: ignore extra consumer-groups on topic
    :param bool pretend: only show what would be done
    :param bool json: print valid json when in pretend mode
    :return: `None` in pretend mode, otherwise return code of
        `kafka-delete-records`
    """
    topic_to_cg = dict(parse_cg_map(cg_map))
    with NamedTemporaryFile('w') as config_file:
        config = generate_offset_json(
            data=get_cg_data(bootstrap_server=bootstrap_server),
            cg_map=topic_to_cg,
            margin=margin,
            raise_on_multiple_cg=not ignore_extra_cg,
        )
        config_file.write(dumps(config))
        # Ensure the config is on disk before another process reads it.
        config_file.flush()
        cmd = f'kafka-delete-records --bootstrap-server {bootstrap_server} --offset-json-file {config_file.name}'
        if not pretend:
            proc = Popen(cmd.split())
            proc.wait()
            return proc.returncode
        with open(config_file.name, 'r') as handle:
            raw = handle.read()
        shown = raw if json else pformat(loads(raw))
        print(f'{cmd}\n---\n{shown}')
def _main() -> Union[None, int]:
    """Parse command-line arguments and delegate to :func:`main`."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-B', '--bootstrap-server',
        default='localhost:9092',
        help='kafka bootstrap server (default=localhost:9092)',
    )
    parser.add_argument(
        '-m', '--map',
        # dest differs from the auto-derived 'map' to match main()'s kwarg.
        dest='cg_map',
        nargs='+',
        required=True,
        help='consumer group map in the form of `topic:consumer-group`',
    )
    parser.add_argument(
        '-i', '--ignore-extra-cg',
        action='store_true',
        help='ignore if other than specified consumer-group(s) exist on the given topic',
    )
    parser.add_argument(
        '-p', '--pretend',
        action='store_true',
        help='only show what would be done',
    )
    parser.add_argument(
        '-j', '--json',
        action='store_true',
        help='print literal json instead of pprint of the generated config',
    )
    parser.add_argument(
        '-M', '--margin',
        default=1_000_000,
        type=int,
        help='subtract this margin from current offset (default=1_000_000)',
    )
    # Argument dests line up 1:1 with main()'s parameter names.
    return main(**vars(parser.parse_args()))
if __name__ == '__main__':
    # Propagate the return code of `kafka-delete-records` (or None in
    # pretend mode) as the process exit status.
    sys.exit(_main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Updated to allow specifying multiple consumer-groups in the map. When multiple consumer-groups are specified, the smallest offset among them is emitted per partition.