#!/usr/bin/env python
# (run with python -u for unbuffered output)
# Python 2.4+ required
# by contact@jeffquast.com
# This utility creates a multiplexing i/o pipe between a master process
# and multiple child processes. The master process reads one input file
# and delegates equal chunks of that file to each child process.
#
# The child process script may also be a binary program, so long as it
# reads from standard input!
#
# For instance, we might have a stat-script.sh:
#
# #!/bin/sh
# while read path; do
# stat $path | awk '/^Modify/ {print $2}'
# done
#
# and an input list,
# $ find /var > files-var.lst
#
# we could pipe that input file to the script:
#
# $ time ./stat-script.sh < files-var.lst
# real 0m48.168s
# user 0m12.963s
# sys 0m58.729s
#
# however, with multiproc.py, we can pipe this input file to 8 sub-processes,
#
# $ time ./multiproc.py -i files-var.lst -s ./stat-script.sh -p 8
#
# real 0m25.403s
# user 0m20.319s
# sys 2m1.663s
#
# and finish in a much shorter time on multi-cpu environments!
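#
# the job folder created on the first run may itself be re-used as input to
# resume unfinished work; a sketch (the .job-{rows}.{chunksize} suffix shown
# is illustrative, it depends on your input file):
#
# $ ./multiproc.py -i files-var.lst.job-186530.187 -s ./stat-script.sh -z '*2'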
import os, sys, subprocess, select
import getopt, re, shutil, shlex, time, fcntl
# improvements over original
# * job folder and chunk parts automatically initialized from input file, or re-used on launch (-i)
# * job folder chunk parts moved after execution are recognized during runtime
# * number of subprocesses can be expressed in relation to number of cpu's (-z)
# * one less subprocess in the process tree: the target (-s)cript is a directly
# managed child process. this also means signal replication is not necessary
# * job chunks sorted & striped by their integer value regardless of 0-padding (x1, x20, x100)
# * highly accurate ETA and lines/second with --eta
# * no '--shell' is necessary, binary executables may be directly used
# it is also fully compatible with the original multiproc's job folders:
# * default prefix, and chunklength of split-command.sh is emulated
# * job state sub-folders and file conventions also emulated
# * process chunk striping (with 3 processes, #1 gets 1, 4, 7, #2 gets 2, 5, 8) is emulated
# * multiple multiprocs on same input folder allowed (with warning)
def main():
cpus, origin = n_cpus(2)
processes = (cpus -1)
chunk_pattern = '([a-zA-Z]+)([0-9]+)'
eta=False
p_args=[]
etaInterval = 1.0
script=None
EXIT_POLL=1.0
n_chunks=999
prefix='x'
input=None
aggregate_output=False
splitonly=False
bufsize=1
SHORT_OPTS='i:s:a:b:p:z:jeh'
LONG_OPTS='input= script= args= processes= bufsize= join eta interval= procsbycpu= nchunks= prefix= chunk_pattern= splitonly help'.split()
def usage():
sys.stdout.write (
'\n%s -i <path> -s ./prog.exe [-p <n>]\n' % (sys.argv[0],) + \
'\nif input is a file, that file is split into --nchunks(%i) equal parts\n' \
'in an auto-created folder named {input}.job-{rows}.{chunksize}, which\n' \
'can be specified as input for re-use.\n' % (n_chunks,) + \
'\nsub-folders are created, each chunk is moved to notify its current state:\n' \
' processing/ a subprocess is actively using this chunk\n' \
' processed/ a subprocess has successfully completed this chunk\n' \
' failed/ a subprocess returned non-zero or did not fully read input pipe\n' \
' output/ contains a log of each subprocess chunk\'s combined stdout & stderr\n' \
'\nall arguments:\n' \
' -i or --input=<filepath> input filepath or job folder (required)\n' \
' -s or --script=<filepath> program to subprocess & feed input (required)\n' \
' -a or --args=<string> program arguments to send to script (default:None)\n' \
' -p or --processes=<int> number of sub-processes to use (default=%i)\n' % (processes,) + \
' -j or --join (bool) when specified, the output of each subprocess\n' \
' is written to master process stdout & stderr.\n' \
' effectively joining all child process output pipes.\n' \
' -b or --bufsize=<int> subprocess pipe buffer size, 1=line, 0=unbuffered,\n' \
' -1=sysdef, large int needed for high throughput.\n' \
' -e or --eta (bool) when specified, write ETA to stderr\n' \
' --interval=<float> how often the eta should be updated, default=%0.1f\n' % (etaInterval,) + \
' -z or --procsbycpu=<str> a limited {op}{val} expression specifying the\n' \
' number of processes in relation to the number\n' \
" of available cpus, e.g.: '-1', '*2' for\n" \
' (n_cpu-1) and (n_cpu * 2) respectively.\n' \
' --nchunks=<int> number of files to divide input.lst into\n' \
" --prefix=<str> alpha file chunk prefix marker (default='%s')\n" % (prefix,) + \
" --chunk_pattern=<str> chunk part regexp (default='%s')\n" % (chunk_pattern,) + \
' --splitonly (bool) split input file if specified and exit.\n')
try:
opts, args = getopt.getopt \
(args=sys.argv[1:], shortopts=SHORT_OPTS, longopts=LONG_OPTS)
except getopt.GetoptError, err:
sys.stderr.write ('GetoptError, %s.\n' % (err,))
sys.exit (1)
for o, a in opts:
if o in ('-i', '--input',):
if not a.strip() or not os.path.exists(a):
sys.stderr.write ('%s file or folder: %s does not exist.\n' % (o, a,))
sys.exit(1)
input=a
elif o in ('-s', '--script',):
is_exe = lambda p: os.path.exists(p) and os.access(p, os.X_OK)
fpath, fname = os.path.split(a)
if fpath and not is_exe(a):
sys.stderr.write ('%s %s: path is not executable.\n' % (o, a,))
sys.exit(1)
elif not fpath and not True in \
[is_exe(os.path.join(p,a)) for p in os.environ["PATH"].split(os.pathsep)]:
sys.stderr.write ('%s %s: program not found in PATH, or is not executable\n' % (o, a,))
sys.exit(1)
script=a
elif o in ('-a', '--args',):
p_args = shlex.split(a)
elif o in ('-b', '--bufsize',):
try:
bufsize = int(a)
except ValueError, e:
sys.stderr.write ('%s %s: ValueError: %s.\n' % (o, a, e,))
sys.exit(1)
elif o in ('-p', '--processes',):
try:
processes = int(a)
if processes <= 0:
raise ValueError, 'not enough processes for execution'
except ValueError, e:
sys.stderr.write ('%s %s: ValueError: %s.\n' % (o, a, e,))
sys.exit(1)
elif o in ('-z', '--procsbycpu',):
try:
op, val = a.strip()[0], a.strip()[1:]
if val.startswith('='): # allow C-like shorthand, '*=3'
val = ''.join(val.split('=')[1:])
val = float(val) # throws ValueError
if op == '*': processes = int(cpus *val)
elif op == '/': processes = int(cpus /val)
elif op == '-': processes = int(cpus -val)
elif op == '+': processes = int(cpus +val)
else:
raise ValueError, "operator %s invalid, must be one of: '*/-+'" (op,)
if processes < 1:
raise ValueError, 'result less than 1 (%s)' % (processes,)
except ValueError, err:
sys.stderr.write ('%s %s: ValueError: %s.\n' % (o, a, err))
sys.exit (1)
elif o in ('--nchunks',):
try:
n_chunks = int(a)
if n_chunks < 1:
raise ValueError, 'not enough input chunks'
except ValueError, e:
sys.stderr.write ('%s %s: ValueError: %s.\n' % (o, a, e,))
sys.exit(1)
elif o in ('--prefix',):
prefix=a.strip()
if not prefix:
sys.stderr.write ('%s %s: ValueError: nil.\n' % (o, prefix,))
sys.exit(1)
elif False in [c.isalpha() for c in prefix]:
sys.stderr.write ('%s %s: ValueError: prefix must be alpha.\n' % (o, prefix,))
sys.exit(1)
elif o in ('--splitonly',):
splitonly = True
elif o in ('-j', '--join',):
aggregate_output = True
elif o in ('-e', '--eta',):
eta = True
elif o in ('--interval',):
etaInterval = float(a)
elif o in ('--chunk_pattern',):
chunk_pattern = a
elif o in ('-h', '--help',):
usage ()
sys.exit(0)
if not input:
sys.stderr.write ('[-i|--input] file or folder must be specified.\n')
usage()
sys.exit(1)
if not script:
sys.stderr.write ('[-s|--script] argument not supplied or nil.\n')
usage()
sys.exit(1)
try:
re_chunk = re.compile(chunk_pattern)
except:
sys.stderr.write ('Invalid regular expression: %s\n' % (chunk_pattern,))
sys.exit(1)
# instead of managing a list of global job chunks and delegating the next
# available job to the next spawned process, each process id is predestined
# to pick a 'stripe' through the set. This simulates the original
# bash-multiproc behavior, but is not ideal when processing time varies wildly
# for each input line, for instance xml transforms on xml files of varying sizes.
# TODO: this should be deprecated in favor of a --stripe option!
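# As a concrete sketch of this striping (chunk names assume the default
# prefix 'x'), with 9 chunks and 3 processes:
#   process 0 takes x1, x4, x7
#   process 1 takes x2, x5, x8
#   process 2 takes x3, x6, x9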
ischunk = lambda f: re_chunk.match(f) and os.path.isfile(os.path.join(folder,f))
def chunkval(f):
match = re_chunk.match(f)
if match:
prefix, value = match.groups()
return int(value)
if os.path.isdir(input):
folder = input
orderedChunks = [file for val, file in sorted([(chunkval(f), f) for f in os.listdir(folder) if ischunk(f)])]
if eta:
sys.stderr.write ('calculating input size for eta ... ')
rows = numRows([os.path.join(input,chunk) for chunk in orderedChunks])
sys.stderr.write ('%i \n' % (rows,))
else:
folder, rows = createJobFolder(input, n_chunks, prefix)
orderedChunks = [file for val, file in sorted([(chunkval(f), f) for f in os.listdir(folder) if ischunk(f)])]
for chk_folder in (os.path.join(folder,'processing'), os.path.join(folder,'failed')):
if os.path.exists(chk_folder) and os.path.isdir(chk_folder) and os.listdir (chk_folder):
files = sorted(os.listdir(chk_folder))
sys.stderr.write ('error: files in %s/ (%s)\n' % (chk_folder, ','.join(files),))
sys.stderr.write ('\n(m)ove files to %s/ and continue, (c)ontinue, or (e)xit? [e]: ' % (folder,))
opt = sys.stdin.readline().strip().lower()
if opt == 'm':
for file in files:
shutil.move (src=os.path.join(chk_folder,file), dst=folder)
elif opt == 'c':
pass
else:
sys.exit(0)
sys.stderr.write ('\n')
stripe = lambda n: [orderedChunks[o-1] for o in range(n+1, len(orderedChunks)+1, processes)]
if splitonly:
sys.exit(0)
# procs is list of child processes, up to --processes
procs = []
# Multiprocessing, each 'p' instance is a new subprocess
for n in range (processes):
p = Multiproc (n, script, p_args, bufsize)
try:
p.load (folder, [fs for fs in stripe(n) if fs])
procs.append (p)
except IndexError:
pass # not enough input files for proc? oh well
received = lastReceived = 0 # number of lines received by child processes
beginTime = lastUpdate = time.time()
# loop until all processes are complete.
while [True]*len(procs) != [p.completed for p in procs]:
if eta and time.time() - lastUpdate > etaInterval:
if received - lastReceived > 0:
# calculate lines/second since last update
lines_persecond = float(received -lastReceived) / (time.time() - lastUpdate)
else:
# no change! then calculate global lines/second
lines_persecond = float(received) / (time.time() - beginTime)
remaining = float(rows -received)
eta_str = lines_persecond > 0 and asctime(remaining / lines_persecond) or '???'
outline = '[%s of %s] %0.1f lps, %s eta ' \
% (received, rows, lines_persecond, eta_str)
sys.stderr.write (outline + len(outline)*'\b')
lastUpdate = time.time()
lastReceived = received
# 'multiplexing': wait up to EXIT_POLL for any pipes available for r/w
(ready_read, ready_write, ready_x) = \
select.select ([p.stdout for p in procs if not p.stdout.closed]+[p.stderr for p in procs if not p.stderr.closed],
[p.stdin for p in procs if not p.stdin.closed], [], EXIT_POLL)
# check stdout pipe for output (line-buffered)
for p in [p for p in procs if p.stdout in ready_read]:
out = None
rr = [p.stdout]
while p.stdout in rr and out != '':
out = p.stdout.read()
if out:
p.log (out)
if aggregate_output:
sys.stdout.write (out)
sys.stdout.flush ()
else:
# empty read: EOF, or no complete line buffered yet
continue
# continue checking for more lines on stdout
rr, rw, rx = select.select([p.stdout],[],[],0)
out=None # free()
(ready_read, ready_write, ready_x) = \
select.select ([p.stdout for p in procs if not p.stdout.closed]+[p.stderr for p in procs if not p.stderr.closed],
[p.stdin for p in procs if not p.stdin.closed], [], EXIT_POLL)
# check stderr pipe for output (unbuffered)
for p in procs:
if p.stderr in ready_read:
err = p.stderr.read()
p.err (err)
if aggregate_output:
sys.stderr.write (err)
err=None # free()
# check stdin for writability
for p in procs:
if p.stdin in ready_write: # and p.poll() != None and not p.stdin.closed:
# all conditions check to write to stdin of child process,
try:
p.stdin.write (p.next()) # write to child process
received += 1
except IndexError:
# no lines remain on input, close its input pipe
p.stdin.close ()
except IOError:
# if the pipe is broken (due to early termination)
# we must indicate that the line pushed with .next() was
# not received, and decrement the index
p.index -= 1
# check for sub-process exit
for (ret, p) in [(p.poll(), p) for p in procs if not p.completed and p.poll() != None]:
# poll for remaining output
(ready_read, ready_write, ready_x) = \
select.select ([p.stdout, p.stderr], [], [], 0)
# read & record all remaining lines
if p.stdout in ready_read:
out = p.stdout.read() #'\n'.join(p.stdout.readlines())
p.log (out)
if aggregate_output:
sys.stdout.write (out)
if not p.stdout.closed:
p.stdout.close ()
out=None
if p.stderr in ready_read:
err = p.stderr.read() #'\n'.join(p.stderr.readlines())
p.err (err)
if aggregate_output:
sys.stderr.write (err)
if not p.stderr.closed:
p.stderr.close ()
err=None
p.close ()
if p.index < p.length:
sys.stderr.write ('[%s/%s]: %s prematurely exhausted (%i/%i)\n' \
% (p.n, p.pid, p.fp_in.name, p.index, p.length))
# account for the unread remainder of this chunk
received += (p.length -p.index)
if ret == 0:
ret = -1 # even with a return code of 0, fake a return status
# of '-1' so that the chunk is moved to failed/
p.complete (ret)
# if any child process returns a non-zero exit status, stop all pipe activity
# and wait up to timeout for the user to read errors and choose to exit
if ret not in (0,-1):
sys.stderr.write ('[%i/%i]: %s returned non-zero: %i.\n' \
% (p.n, p.pid, p.fp_in.name, ret))
sys.stderr.write ('\n')
timeout = 10
outline = ''
while timeout > -1:
sys.stderr.write ('\b' * len(outline))
outline = '(c)ontinue or (e)xit? (%i) [c]: ' % (timeout,)
sys.stderr.write (outline)
if select.select([sys.stdin], [], [], 1) == ([sys.stdin], [], []):
opt = sys.stdin.readline().strip().lower()
if opt == 'e':
sys.stderr.write ('\n')
sys.exit(ret)
break
timeout -= 1
sys.stderr.write ('\n')
# re-investigate available files for stripe
files = [fs for fs in stripe(p.n) if fs]
# spawn new sub-process on new file chunk
n_p = Multiproc(p.n, script, p_args, bufsize)
try:
# load remaining files into new process
n_p.load (folder, files)
except IndexError:
sys.stderr.write ('[%i] complete: file stripe exhausted.\n' % (p.n,))
continue
# replace procs[] reference with new sub-process
procs[p.n] = n_p
sys.stdout.write ('All processes complete (%i lines processed in %i seconds, %i lines/second).\n' \
% (received, time.time()-beginTime, received/(time.time()-beginTime)))
class Multiproc(subprocess.Popen):
""" The Multiproc class is derived from subprocess.Popen, with additional methods
for managing file part chunks and folder states. """
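# typical lifecycle, as driven by main(): construct, load() a stripe of chunk
# files, feed lines to the child with next() until IndexError, then close()
# the logfile and complete(ret) to move the chunk to processed/ or failed/.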
completed=False
index=0
length=0
chunk=None
files=[]
fp_in=fp_out=None
folder=None
def __init__(self, n, script, p_args, bufsize=1):
self.n = n # our multiproc subprocess id, and our index in global procs[]
subprocess.Popen.__init__ \
(self, args=[script] + p_args, bufsize=bufsize, executable=script,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
close_fds=True, universal_newlines=True)
# prevent blocking on .read() even if select says the pipe is ready for
# reading. not actually sure if this occurred; it was added when rsync
# seemed to block on stderr, but in actuality it was probably the
# underlying smbfs -- more likely, it occurred because of .readline()!
for fd in [self.stdout, self.stderr]:
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def load(self, folder, inputlist):
" load in input list of files, must call loadPart() before next() "
self.folder = folder
self.files = inputlist
self.completed = False
self.loadPart (0)
def loadPart(self, n=0):
"""
load next chunk of input list as file object fp_in, for line-buffered
reading by next(), open a pairing logfile named fp_out,
raises IndexError when all input files are exausted
"""
# loop until next part is owned, raise IndexError on empty,
while True:
self.index = self.length = 0
try:
chunk = self.files.pop(n)
except IndexError, e:
self.stdin.close() # close process input pipe
raise IndexError, 'input files exhausted: %s' % (e,)
if not os.path.exists(os.path.join(self.folder,chunk)):
continue
self.chunk = chunk
processing = os.path.join(self.folder, 'processing', chunk)
self.process()
self.fp_in = open(processing,'r')
self.length = len(self.fp_in.readlines()) # count input lines
self.fp_in.seek (0) # rewind for next()
logfile = os.path.join(self.folder, 'output', chunk + '.out')
if not os.path.exists(os.path.dirname(logfile)):
os.makedirs(os.path.dirname(logfile))
self.fp_out = open(logfile, 'w', 1) # open logfile, line-buffered
break
def next(self):
" return next input line, throws IndexError on empty "
if not self.fp_in.closed:
line = self.fp_in.readline()
if line != '':
self.index += 1
return line
self.fp_in.close () # close input file
raise IndexError, 'input exhausted'
def close(self):
" close log file output, and input if opened "
self.fp_out.close ()
if not self.fp_in.closed:
self.fp_in.close ()
def complete(self, ret=0):
" move input file to processed/ or failed/ folder with return code ret "
processing = os.path.join(self.folder,'processing',self.chunk)
if not os.path.exists (processing):
sys.stdout.write ('[%i/%i]: race condition, src=%s disappeared, cannot move\n' % (self.n, self.pid, processing,))
if ret != 0:
failed = os.path.join(self.folder,'failed',self.chunk)
if not os.path.exists(os.path.dirname(failed)):
os.makedirs(os.path.dirname(failed))
shutil.move (src=processing, dst=failed)
else:
processed = os.path.join(self.folder,'processed',self.chunk)
if not os.path.exists(os.path.dirname(processed)):
os.makedirs(os.path.dirname(processed))
shutil.move (src=processing, dst=processed)
self.completed = True
def process(self):
chunkpath = os.path.join(self.folder,self.chunk)
if not os.path.exists (chunkpath):
sys.stdout.write ('[%i/%i]: %s race condition, cannot move to processing/\n' \
% (self.n, self.pid, self.chunk,))
else:
processing = os.path.join(self.folder,'processing',self.chunk)
if not os.path.exists(os.path.dirname(processing)):
os.makedirs (os.path.dirname(processing))
shutil.move (src=chunkpath, dst=processing)
self.completed = False
def log(self, data):
if data:
self.fp_out.write (data)
self.fp_out.flush ()
def err(self, data):
if data:
self.log (data)
sys.stderr.write ('[%i/%i]: %s stderr: %s\n' % (self.n, self.pid, self.fp_out.name, data.rstrip()))
def numRows(filepaths):
rows = 0
for filepath in filepaths:
fp = open(filepath, 'r')
# sys.stderr.write ('determining input file length: ')
rows += sum([1 for line in iter(fp.readline, '')])
# sys.stderr.write ('%i rows.\n' % (rows,))
fp.close ()
return rows
def createJobFolder (filepath, n_chunks=999, prefix='x'):
# determine chunksize; we target up to, but no more than, n_chunks parts.
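# a worked example (numbers illustrative): a 1000-row input with the default
# n_chunks=999 gives chunksize = (1000/999)+1 = 2, so 500 chunk files
# x001..x500 are written to a folder named {input}.job-1000.2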
rows = numRows([filepath])
chunksize = (rows / n_chunks) +1
if chunksize < 1:
chunksize=1
sys.stderr.write ('warning: altering chunk size to 1: ' \
'input file smaller than --nchunks: %i\n' % (n_chunks,))
# determine job folder path for file chunks
folder = filepath + '.job-%i.%i' % (rows, chunksize)
if not os.path.exists(folder):
# create job folder
os.makedirs(folder)
sys.stderr.write ('created folder: %s\n' % (folder,))
# create input files, a simple gnu-split, with no gnu-split dependency :)
sys.stderr.write ('splitting %s into up to %i chunks, %i rows each\n' % (filepath, n_chunks, chunksize))
fi = open(filepath, 'r')
# macro to find output filename for index number
fn_output = lambda idx: '%s%0*i' % (prefix, len(str(n_chunks)), idx)
n=1
while fi and n <= n_chunks:
# create x001 -> x999 for n_chunks=999
fo = open(os.path.join(folder, fn_output(n)), 'w')
changed=False
for row in range(chunksize):
line = fi.readline()
if not line:
fo.close()
if not changed: # rm unwritten file
os.unlink (os.path.join(folder, fn_output(n)))
else:
sys.stderr.write ('warning: %s incomplete size %i (chunksize=%i)\n' % (fn_output(n), row, chunksize,))
fi.close()
fi=None
break
else:
fo.write (line)
changed = True
if not line:
break
fo.close()
n+=1
elif os.path.isdir(folder): # exists as a folder
sys.stderr.write ('warning: job folder already exists: %s\n' \
'continuing work. specify the job folder as input to suppress this warning.\n' % (folder,))
else: # exists, but not as a folder
sys.stderr.write ('job folder conflict: %s exists and is not a folder!\n' % (folder,))
sys.exit(1)
return folder, rows
def asctime(seconds):
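""" format seconds as a short human-readable duration,
e.g. asctime(90061) returns '1d 1h 1m' """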
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
weeks, days = divmod(days, 7)
years, weeks = divmod(weeks, 52)
if years > 0: return '%iy %iw %id' % (years, weeks, days,)
if weeks > 0: return '%iw %id %ih' % (weeks, days, hours,)
if days > 0: return '%id %ih %im' % (days, hours, minutes)
if hours > 0: return '%ih %im %is' % (hours, minutes, seconds)
if minutes > 0: return '%im %is' % (minutes, seconds,)
else: return '%is' % (seconds,)
# ripped from
# http://stackoverflow.com/questions/1006289/how-to-find-out-the-number-of-cpus-in-python
def n_cpus(def_ncpus=2):
# def_ncpus will be returned when #cpus cannot be determined
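# returns a (count, origin) tuple, e.g. (count illustrative) an 8-core
# POSIX host might yield (8, 'posix SC_NPROCESSORS_ONLN')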
try: # python 2.6+
import multiprocessing
return multiprocessing.cpu_count(), 'python2.6-native'
except (ImportError, NotImplementedError): pass
try: # POSIX
res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
if res > 0:
return res, 'posix SC_NPROCESSORS_ONLN'
except (AttributeError, ValueError): pass
try: # jython
from java.lang import Runtime
runtime = Runtime.getRuntime()
res = runtime.availableProcessors()
if res > 0:
return res, 'java runtime'
except ImportError: pass
try: # BSD
sysctl = subprocess.Popen(['sysctl', '-n', 'hw.ncpu'], stdout=subprocess.PIPE)
scStdout = sysctl.communicate()[0]
res = int(scStdout)
if res > 0:
return res, 'BSD sysctl'
except (OSError, ValueError): pass
try: # Linux
res = open('/proc/cpuinfo').read().count('processor\t:')
if res > 0:
return res, '/proc/cpuinfo'
except IOError: pass
try: # Other UNIXes (heuristic)
try:
dmesg = open('/var/run/dmesg.boot').read()
except IOError:
dmesgProcess = subprocess.Popen(['dmesg'], stdout=subprocess.PIPE)
dmesg = dmesgProcess.communicate()[0]
res = 0
while '\ncpu' + str(res) + ':' in dmesg:
res += 1
if res > 0:
return res, 'dmesg'
except OSError: pass
return def_ncpus, 'Cannot determine n_cpus.'
if __name__ == '__main__':
main()