Skip to content

Instantly share code, notes, and snippets.

@thom-vend
Last active June 2, 2021 01:44
Show Gist options
  • Save thom-vend/0fe10e1b857aa0d95921fa52a1274feb to your computer and use it in GitHub Desktop.
Save thom-vend/0fe10e1b857aa0d95921fa52a1274feb to your computer and use it in GitHub Desktop.
ZooKeeper read-write test job in Python 3
#!/usr/bin/env python3
from datetime import datetime
from icecream import ic
from kazoo.client import KazooClient
from multiprocessing import Pool
from pprint import pprint
import argparse
import logging
import random
import signal
import string
import sys
import time
import traceback
# Parsed command-line options shared by every function in this module.
# Assigned in main() and, for pool worker processes, in mp_init().
args = None
def dataforzktest():
    """Build a fresh payload to store in a test znode.

    Returns:
        bytes: the current local time formatted as ``%Y-%m-%d-%H:%M:%S``,
        followed by ``__`` and 32 random uppercase ASCII letters, encoded
        with the default ``str.encode()`` codec.
    """
    suffix = "".join([random.choice(string.ascii_uppercase) for _ in range(32)])
    stamp = datetime.today().strftime("%Y-%m-%d-%H:%M:%S")
    return "__".join((stamp, suffix)).encode()
def printme(s):
    """Log *s* at INFO level unless the ``noprint`` option is set.

    Args:
        s: message to log.
    """
    if args.noprint:
        return
    logging.info(s)
def zk_write(zk, subpath, name):
    """Write a fresh payload to the znode ``<subpath>/<name>``.

    If the node does not exist yet it is created (parents included) and the
    operation is not timed. If it already exists, its data is overwritten and
    the duration of that ``set`` call is measured.

    Args:
        zk: connected client exposing ``exists``, ``ensure_path``, ``create``
            and ``set``.
        subpath: parent path of the node; trailing slashes are ignored.
        name: node name under *subpath*.

    Returns:
        The ``set`` latency in seconds, or ``-1`` when the node had to be
        created instead.
    """
    path = f"{subpath.rstrip('/')}/{name}"
    ic(path)
    if not zk.exists(path):
        ic()
        zk.ensure_path(subpath)
        zk.create(path, dataforzktest())
        # Creation is deliberately not counted as a write sample.
        return -1
    ic()
    started = time.perf_counter()
    zk.set(path, dataforzktest())
    return time.perf_counter() - started
def zk_read(zk, subpath, name):
    """Read the znode ``<subpath>/<name>`` and report what was found.

    Args:
        zk: connected client exposing ``exists`` and ``get``.
        subpath: parent path of the node; trailing slashes are ignored.
        name: node name under *subpath*.

    Returns:
        The ``get`` latency in seconds, or ``-1`` when the node does not
        exist.
    """
    path = f"{subpath.rstrip('/')}/{name}"
    if not zk.exists(path):
        ic()
        printme(f"Path {path} don't exist")
        return -1
    started = time.perf_counter()
    data, stat = zk.get(path)
    elapsed = time.perf_counter() - started
    ic(stat)
    ic(data)
    # The payload is decoded before the call, so it happens even when printing
    # is disabled.
    printme(f"Version: {stat.version}, data: {data.decode('utf-8')}")
    return elapsed
def zk_appjob(workernum):
    """Connect to ZooKeeper and run the requested read/write job once or in a loop.

    A new client is opened for every call. Depending on the parsed options the
    job writes and/or reads one znode, optionally repeating forever
    (``apploop``) with ``delay`` seconds between iterations. Unless stats are
    disabled, average latencies are logged roughly every five seconds.

    The client is always stopped and closed on the way out, including when an
    exception (e.g. ``GracefulExit`` raised by a signal) interrupts the loop.

    Args:
        workernum: index of this worker; appended to the job name when the
            ``autoname`` option is set so each worker uses its own znode.
    """
    ic(workernum)
    zk_node = f"{args.jobname}{workernum}" if args.autoname else args.jobname
    write_latencies = []
    read_latencies = []
    # -1 means "no sample seen yet"; the last computed average is kept when a
    # reporting window contains no samples.
    avg_write = -1
    avg_read = -1
    start_time = time.perf_counter()
    zk = KazooClient(hosts=args.host, read_only=(not args.write))
    try:
        zk.start()
        ic()
        while True:
            if args.write:
                latency = zk_write(zk, args.zkpath, zk_node)
                if latency != -1 and not args.nostats:
                    write_latencies.append(latency)
            if args.read:
                latency = zk_read(zk, args.zkpath, zk_node)
                if latency != -1 and not args.nostats:
                    read_latencies.append(latency)
            if not args.nostats and (time.perf_counter() - start_time) > 5:
                lwrite_latencies = len(write_latencies)
                lread_latencies = len(read_latencies)
                if lwrite_latencies > 0:
                    # Latencies are collected in seconds and reported in ms.
                    avg_write = round(
                        (sum(write_latencies) / lwrite_latencies) * 1000, 2
                    )
                    write_latencies = []
                if lread_latencies > 0:
                    avg_read = round(
                        (sum(read_latencies) / lread_latencies) * 1000, 2
                    )
                    read_latencies = []
                start_time = time.perf_counter()
                logging.info(
                    f"STATS: avg latencies: write={avg_write}ms, read={avg_read}ms"
                    f" -- writes:{lwrite_latencies}, reads:{lread_latencies}"
                )
            if not args.apploop:
                break
            if args.delay > 0.001:
                time.sleep(args.delay)
        ic()
    finally:
        # Previously `zk.stop` was referenced without being called, so the
        # connection was never shut down. Stop and release it on every exit.
        zk.stop()
        zk.close()
class GracefulExit(Exception):
    """Raised by the signal handler to request an orderly shutdown."""
def signal_handler(signum, frame):
    """Signal callback that turns a delivered signal into ``GracefulExit``.

    Args:
        signum: number of the received signal (unused).
        frame: interrupted stack frame (unused).

    Raises:
        GracefulExit: always.
    """
    raise GracefulExit
def job_fullloop(workernum):
    """Run ``zk_appjob`` for one worker, optionally reconnecting forever.

    Any ordinary exception from the job is logged with its traceback. With the
    ``fullloop`` option the job is then started again (after ``delay`` seconds
    when the delay is above 1 ms); without it the function returns after the
    first attempt.

    A ``GracefulExit`` raised at any point, including while sleeping between
    attempts, ends the process with exit status 0.

    Args:
        workernum: index of this worker, forwarded to ``zk_appjob``.
    """
    ic(workernum)
    try:
        while True:
            try:
                zk_appjob(workernum)
            except GracefulExit:
                # GracefulExit derives from Exception; let it through to the
                # outer handler instead of logging it as a job failure.
                raise
            except Exception:
                logging.error(traceback.format_exc())
            if not args.fullloop:
                break
            ic()
            if args.delay > 0.001:
                time.sleep(args.delay)
    except GracefulExit:
        # The outer handler also covers the sleep above, which used to sit
        # outside any try block and let a signal escape as a traceback.
        logging.info("Gracefull shutdown...")
        sys.exit(0)
def mp_init(_args):
    """Pool initializer: publish the parsed options in this worker process.

    Stores *_args* in the module-level ``args`` and disables ``ic`` output
    unless debug mode is on.

    Args:
        _args: parsed options namespace; must expose ``debug``.
    """
    global args
    args = _args
    if args.debug:
        return
    ic.disable()
def multiprocess(args):
    """Run ``job_fullloop`` in ``args.worker`` parallel processes.

    Each pool process is initialised with ``mp_init`` so it sees the same
    parsed options. The call blocks until every worker's job has returned.

    The installed signal handlers raise ``GracefulExit`` in this parent
    process as well; it is caught here so that a SIGINT/SIGTERM ends the run
    with a log line instead of an unhandled traceback. Leaving the ``with``
    block terminates the pool.

    Args:
        args: parsed options namespace; ``worker`` gives the pool size.
    """
    try:
        with Pool(args.worker, initializer=mp_init, initargs=(args,)) as p:
            ic()
            p.map(job_fullloop, range(0, args.worker))
    except GracefulExit:
        logging.info("Graceful shutdown: worker pool terminated")
def signal_handler(signum, frame):
    """Convert a delivered signal into a ``GracefulExit`` exception.

    This rebinds the name already defined earlier in the module with an
    identical implementation; this later definition is the one in effect
    when ``main`` registers the handler.

    Args:
        signum: number of the received signal (unused).
        frame: interrupted stack frame (unused).

    Raises:
        GracefulExit: always.
    """
    raise GracefulExit
def main():
    """Parse the command line, configure logging and signals, and run the job.

    The parsed options are stored in the module-level ``args``. The process
    exits with status 42 when neither ``--read`` nor ``--write`` is given.
    With more than one worker the job runs in a process pool; otherwise it
    runs in the current process as worker 0.
    """
    global args
    parser = argparse.ArgumentParser(
        description="zookeeper-check-kazoo dirty tool",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    def add_flag(option, dest, help_text):
        # All boolean switches share the same shape: off unless passed.
        parser.add_argument(
            option,
            action="store_true",
            dest=dest,
            default=False,
            help=help_text,
        )

    # Options are registered in the order they should appear in --help.
    parser.add_argument(
        "--host", dest="host", default="127.0.0.1:2181", help='Set "host:port"'
    )
    add_flag("--write", "write", "Write some data")
    add_flag(
        "--read",
        "read",
        "Read some data (can be enable with write mode to read just after",
    )
    parser.add_argument(
        "--zk-path", dest="zkpath", default="/sremigrationtest/", help="ZK base path"
    )
    parser.add_argument(
        "--job-name",
        dest="jobname",
        default="test0",
        help="Name of the job, entry in ZK",
    )
    add_flag("--auto-suffix", "autoname", "Auto add suffix on the job name")
    add_flag("--debug", "debug", "Enable debug mode")
    add_flag("--apploop", "apploop", "Infinite app style loop")
    add_flag("--fullloop", "fullloop", "Full reconnection loop")
    parser.add_argument(
        "--delay",
        dest="delay",
        type=float,
        default=1.0,
        help="Pause x seconds before doing it again (loops mode only), 0 for no delay",
    )
    parser.add_argument(
        "--worker",
        dest="worker",
        type=int,
        default=1,
        help="Amount of worker to do the requested job in //",
    )
    add_flag("--no-stats", "nostats", "Disable stats prints")
    add_flag("--no-prints", "noprint", "Disable priting read values")

    args = parser.parse_args()

    if not args.debug:
        ic.disable()
    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
    ic(args)

    if not (args.read or args.write):
        logging.error(
            "Nothing todo, --read and/or --write need to be passed as argument, -h for help"
        )
        sys.exit(42)

    # Both termination signals are turned into GracefulExit by the handler.
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal_handler)

    if args.worker > 1:
        multiprocess(args)
    else:
        job_fullloop(0)
# Run the command-line tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment