Skip to content

Instantly share code, notes, and snippets.

@ThePsyjo
Last active July 17, 2024 21:49
Show Gist options
  • Save ThePsyjo/b717d2eaca2deb09b8130b3e917758f6 to your computer and use it in GitHub Desktop.
Save ThePsyjo/b717d2eaca2deb09b8130b3e917758f6 to your computer and use it in GitHub Desktop.
Script to reset the start position (low watermark) of partitions in Kafka topics to a point close to the consumer group's current offset
"""Reset the low watermark of partitions to a point close to the current offset of the given consumer group(s)."""
import argparse
import sys
from collections import OrderedDict
from json import dumps, loads
from pprint import pformat
from subprocess import PIPE, Popen
from tempfile import NamedTemporaryFile
from typing import Generator, Iterable, Union
class MultipleConsumerGroupsException(Exception):
    """Signal that a topic is consumed by more consumer-groups than the caller is prepared to handle."""
class GetCgDataError(Exception):
    """Signal that fetching the raw consumer-group data failed."""
class Hasher(OrderedDict):
    """
    Auto-expanding ordered dict.

    Reading a missing key inserts and returns a new, empty instance of the same class, so nested levels can be
    filled without creating them first (``h['a']['b'] = 1``). Existing keys can also be read and rebound as
    attributes (``h.a`` / ``h.a = ...``); attribute access never creates keys and raises `AttributeError` for
    names that are not keys.
    https://stackoverflow.com/a/3405143/190597
    """

    def __missing__(self, key):
        value = self[key] = type(self)()
        return value

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails: expose existing items as attributes.
        if item not in self:
            raise AttributeError(item)
        return self[item]

    def __setattr__(self, key, value):
        # Mirror __getattr__: rebinding an existing key updates the item. The previous
        # super().__setattr__(key=..., value=...) always raised TypeError, because the
        # object.__setattr__ slot wrapper does not accept keyword arguments.
        if key not in self:
            raise AttributeError(key)
        self[key] = value
def get_cg_data(bootstrap_server: str) -> str:
    """
    Run `kafka-consumer-groups` to fetch raw consumer group data.

    The tool is invoked with ``--describe --offsets --all-groups``. Its stdout is captured; its stderr is not
    redirected, so any error output goes to the caller's stderr.
    If `kafka-consumer-groups` exits with a non-zero returncode, `GetCgDataError` will be raised.

    :param str bootstrap_server: bootstrap server(s)
    :return: raw output from `kafka-consumer-groups`
    :raises GetCgDataError: with the process' return code as its argument
    """
    # Pass an explicit argv list rather than splitting a formatted string, so the server value
    # always stays exactly one argument even if it contains whitespace.
    argv = [
        'kafka-consumer-groups',
        '--bootstrap-server',
        bootstrap_server,
        '--describe',
        '--offsets',
        '--all-groups',
    ]
    with Popen(argv, stdout=PIPE) as proc:
        out, _ = proc.communicate()
    if proc.returncode:
        raise GetCgDataError(proc.returncode)
    return out.decode()
def parse(data: str) -> OrderedDict:
    """
    Parse raw data into an `OrderedDict` instance.

    Only blank-line separated blocks whose first line starts with ``GROUP`` (the table header) are read. Each
    following line is split on whitespace, and its first five columns are taken as consumer-group, topic,
    partition, current offset and log-end offset. The result is nested as
    ``topic -> consumer-group -> partition -> {'current': ..., 'end': ...}``.

    `kafka-consumer-groups` may print ``-`` instead of a number in the offset columns (e.g. for a partition the
    group has no committed offset for). An unknown current offset is recorded as ``0``, the most conservative
    value: a start offset derived from it (current minus a margin) can never lie past records the group may not
    have consumed. An unknown log-end offset is recorded as ``None``.

    :param str data: raw data from :func:`get_cg_data`
    :return OrderedDict: parsed data
    """
    result = Hasher()
    for block in data.split('\n\n'):
        block = block.strip()
        if not block.startswith('GROUP'):
            continue
        for line in block.splitlines()[1:]:
            cg, topic, partition, current, end, *_ = line.split()
            result[topic][cg][int(partition)] = dict(
                current=0 if current == '-' else int(current),
                end=None if end == '-' else int(end),
            )
    return result
def _group_cg_map(cg_map: Union[dict, Iterable[tuple]]) -> OrderedDict:
    """
    Normalise `cg_map` into an ordered ``topic -> [consumer-group, ...]`` mapping.

    Accepts a ``topic -> consumer-group`` (or ``topic -> iterable of consumer-groups``) mapping, or an iterable of
    ``(topic, consumer-group)`` pairs. Topics keep their first-seen order; duplicate groups per topic are dropped.
    """
    pairs = cg_map.items() if hasattr(cg_map, 'items') else cg_map
    grouped = OrderedDict()
    for topic, cgs in pairs:
        bucket = grouped.setdefault(topic, [])
        for cg in ([cgs] if isinstance(cgs, str) else cgs):
            if cg not in bucket:
                bucket.append(cg)
    return grouped


def generate_offset_json(
    data: str,
    cg_map: Union[dict, Iterable[tuple]],
    margin: int = 1_000_000,
    raise_on_multiple_cg: bool = True,
) -> dict:
    """
    Generate specification json to be used with `kafka-delete-records`.

    For every topic in `cg_map`, emit one config entry per partition for which one of its requested
    consumer-groups has data. When several consumer-groups are requested for a topic, the smallest resulting
    offset per partition is used.

    :param str data: raw data from :func:`get_cg_data`
    :param cg_map: consumer-groups to include. Either a ``topic -> consumer-group`` (or
        ``topic -> iterable of consumer-groups``) mapping, or an iterable of ``(topic, consumer-group)`` pairs.
        A plain ``topic -> consumer-group`` dict can only name one group per topic.
    :param int margin: margin to subtract from current offset, 0 -> set to current offset, 100 -> set 100 before
        current offset; results are clamped at 0
    :param bool raise_on_multiple_cg: raise `MultipleConsumerGroupsException` if a consumer-group other than the
        requested one(s) is present on the given topic
    :return dict: config to be used with `kafka-delete-records`
    :raises ValueError: if `margin` is negative (that would move the start past the consumer-group's offset)
    :raises MultipleConsumerGroupsException: see `raise_on_multiple_cg`; carries every group present on the topic
    """
    if margin < 0:
        raise ValueError(f'margin must not be negative, got {margin}')
    state = parse(data=data)
    # (topic, partition) -> lowest offset across the requested consumer-groups, in first-seen order
    offsets = OrderedDict()
    for topic, cgs in _group_cg_map(cg_map).items():
        # .get() instead of [] so lookups do not insert empty entries into the auto-expanding parse result
        present = state.get(topic, {})
        if raise_on_multiple_cg and any(cg not in cgs for cg in present):
            raise MultipleConsumerGroupsException(list(present.keys()))
        for cg in cgs:
            for partition, values in present.get(cg, {}).items():
                offset = max(0, values['current'] - margin)
                key = (topic, partition)
                offsets[key] = min(offsets.get(key, offset), offset)
    return dict(
        version=1,
        partitions=[
            dict(topic=topic, partition=partition, offset=offset)
            for (topic, partition), offset in offsets.items()
        ],
    )


def parse_cg_map(cg_map: Iterable[str]) -> Generator[tuple, None, None]:
    """
    Parse `topic:consumer-group` items into ``(topic, consumer-group)`` pairs.

    Only the first ``:`` separates topic from consumer-group, so group ids that contain ``:`` stay intact.

    :param Iterable[str] cg_map: cg maps to extract
    :raises ValueError: if an item lacks the ``:`` separator or has an empty topic or consumer-group
    """
    for item in cg_map:
        topic, sep, cg = item.partition(':')
        if not (sep and topic and cg):
            raise ValueError(f'invalid map {item!r}, expected `topic:consumer-group`')
        yield topic, cg


def main(
    bootstrap_server: str,
    cg_map: Iterable[str],
    margin: int,
    ignore_extra_cg: bool = False,
    pretend: bool = False,
    json: bool = False,
) -> Union[None, int]:
    """
    Main function tying all the steps together.

    * parse the topic->consumer-group maps (every pair is kept, so one topic may list several groups)
    * fetch raw data (`kafka-consumer-groups`)
    * parse raw data and generate delete config
    * write delete config to temporary file
    * pretend or execute (`kafka-delete-records`) with gathered data

    :param str bootstrap_server: bootstrap server(s) for `kafka-consumer-groups` and `kafka-delete-records`
    :param Iterable[str] cg_map: topic->consumer-group maps in the form ``topic:consumer-group``
    :param int margin: offset margin
    :param bool ignore_extra_cg: ignore extra consumer-groups on topic
    :param bool pretend: only show what would be done
    :param bool json: print valid json when in pretend mode
    :return: `None` or return code of `kafka-delete-records`
    """
    import shlex  # only needed to render the command line in pretend mode

    # Keep every (topic, consumer-group) pair: collapsing them into a dict would silently keep
    # only the last consumer-group listed for a topic.
    pairs = list(parse_cg_map(cg_map))
    with NamedTemporaryFile('w') as tmp:
        config = generate_offset_json(
            data=get_cg_data(bootstrap_server=bootstrap_server),
            cg_map=pairs,
            margin=margin,
            raise_on_multiple_cg=not ignore_extra_cg,
        )
        tmp.write(dumps(config))
        tmp.flush()
        # argv list instead of a split string, so paths/values with whitespace stay single arguments
        argv = ['kafka-delete-records', '--bootstrap-server', bootstrap_server, '--offset-json-file', tmp.name]
        if pretend:
            command = ' '.join(shlex.quote(arg) for arg in argv)
            rendered = dumps(config) if json else pformat(config)
            print(f'{command}\n---\n{rendered}')
            return None
        with Popen(argv) as proc:
            proc.wait()
        return proc.returncode
def _main() -> Union[None, int]:
    """
    Command-line entry point: parse ``sys.argv`` and forward every option to :func:`main` as keyword arguments.

    :return: whatever :func:`main` returns (``None`` or the return code of `kafka-delete-records`)
    """
    parser = argparse.ArgumentParser()
    option = parser.add_argument
    option(
        '-B', '--bootstrap-server',
        action='store', dest='bootstrap_server', default='localhost:9092',
        help='kafka bootstrap server (default=localhost:9092)',
    )
    option(
        '-m', '--map',
        action='store', dest='cg_map', nargs='+', required=True,
        help='consumer group map in the form of `topic:consumer-group`',
    )
    option(
        '-i', '--ignore-extra-cg',
        action='store_true', dest='ignore_extra_cg',
        help='ignore if other than specified consumer-group(s) exist on the given topic',
    )
    option('-p', '--pretend', action='store_true', dest='pretend', help='only show what would be done')
    option(
        '-j', '--json',
        action='store_true', dest='json',
        help='print literal json instead of pprint of the generated config',
    )
    option(
        '-M', '--margin',
        action='store', dest='margin', default=1_000_000, type=int,
        help='subtract this margin from current offset (default=1_000_000)',
    )
    options = vars(parser.parse_args())
    return main(**options)
if __name__ == '__main__':
    # Script entry: the value returned by _main() becomes the process exit status.
    raise SystemExit(_main())
@ThePsyjo
Copy link
Author

ThePsyjo commented Aug 4, 2023

Updated to allow specifying multiple consumer groups per topic in the map. When multiple consumer groups are specified, the smallest offset among them is used.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment