Skip to content

Instantly share code, notes, and snippets.

@rainbowbird
Created February 22, 2020 12:52
Show Gist options
  • Save rainbowbird/c68fad83fe00c8db3b43ab79aa963388 to your computer and use it in GitHub Desktop.
Save rainbowbird/c68fad83fe00c8db3b43ab79aa963388 to your computer and use it in GitHub Desktop.
Pipelining Chaining Commands - Coroutines with generators
"""Use coroutines to sum log file data with different log levels.
log file content:
ERROR: 78
DEBUG: 72
WARN: 99
CRITICAL: 97
FATAL: 40
FATAL: 33
CRITICAL: 34
ERROR: 10
ERROR: 89
ERROR: 46
"""
import functools
import sys
import time
def init_coroutine(func):
@functools.wraps(func)
def init(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return init
def read_forever(fobj, target):
"""Read from a file as long as there are lines.
Wait for the other process to write more lines.
Send the lines to 'target'.
"""
counter = 0
while True:
line = fobj.readline()
if not line:
time.sleep(0.1)
continue
target.send(line)
@init_coroutine
def filter_comments(target):
"""Filter out all lines starting with #.
"""
while True:
line = yield
if not line.strip().startswith('#'):
target.send(line)
@init_coroutine
def get_number(targets):
"""Read the number in the line and convert it to an integer.
Use the level read from the line to choose the to target.
"""
while True:
line = yield
level, number = line.split(':')
number = int(number)
targets[level].send(number)
@init_coroutine
def fatal():
"""Handle fatal errors."""
sum_ = 0
while True:
value = yield
sum_ += value
sys.stdout.write('FATAL sum: %7d\n' % sum_)
sys.stdout.flush()
@init_coroutine
def critical():
"""Handle critical errors."""
sum_ = 0
while True:
value = yield
sum_ += value
sys.stdout.write('CRITICAL sum: %7d\n' % sum_)
sys.stdout.flush()
@init_coroutine
def error():
"""Handle normal errors."""
sum_ = 0
while True:
value = yield
sum_ += value
sys.stdout.write('ERROR sum: %7d\n' % sum_)
sys.stdout.flush()
@init_coroutine
def warn():
"""Handle warnings."""
sum_ = 0
while True:
value = yield
sum_ += value
sys.stdout.write('WARN sum: %7d\n' % sum_)
sys.stdout.flush()
@init_coroutine
def debug():
"""Handle debug messages."""
sum_ = 0
while True:
value = yield
sum_ += value
sys.stdout.write('DEBUG sum: %7d\n' % sum_)
sys.stdout.flush()
TARGETS = {'CRITICAL': critical(),
'DEBUG': debug(),
'ERROR': error(),
'FATAL': fatal(),
'WARN': warn()}
def show_sum(file_name='out.txt'):
"""Start start the pipline.
"""
read_forever(open(file_name), filter_comments(get_number(TARGETS)))
if __name__ == '__main__':
show_sum(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment