Skip to content

Instantly share code, notes, and snippets.

@simion
Created October 18, 2016 14:52
Show Gist options
  • Save simion/4447668fd69ae191b5f42895fa26017f to your computer and use it in GitHub Desktop.
Django management command - multiprocessing helper
import logging
import multiprocessing
import time
from functools import wraps

from django.core.management import BaseCommand
from django.core.management.base import CommandError
from django.db import connection
def multiprocess_func(view_func):
    """Decorator that closes the database connection before running the wrapped function.

    Presumably needed because forked pool workers inherit the parent's DB
    connection, which cannot be shared across processes; closing it lets
    Django reconnect lazily inside each worker. Also unpacks a list/tuple
    argument so the wrapped function can receive multiple positional
    arguments even though ``Pool.imap_unordered`` passes a single object
    per task.

    :param view_func: function to wrap; must be importable by module path
        (declared at module level) so multiprocessing can pickle it.
    :return: the wrapped function (metadata preserved via ``functools.wraps``).
    """
    def _decorator(args):
        # Drop the inherited connection; a fresh one is opened on next use.
        connection.close()
        # unpack args if list/tuple (which means multiple arguments were given)
        if isinstance(args, (list, tuple)):
            return view_func(*args)
        # single argument, just send it
        return view_func(args)
    return wraps(view_func)(_decorator)
# NOTE: the progress_start/progress_set/progress_finish methods used below come
# from another helper not included here; you can substitute tqdm instead.
class BaseManagementCommand(BaseCommand):
    """Management-command base class providing a multiprocessing helper."""

    def multiprocess(self, target, args_list, processes=3, extra_args=None,
                     return_imap=False, progress=True):
        """Run ``target`` over ``args_list`` in a pool of worker processes.

        :param target: function (IMPORTANT! must be importable by path, so
            declared outside of classes, or multiprocessing cannot pickle it)
        :param args_list: iterable of task arguments, one element per task
        :param processes: number of parallel worker processes
        :param extra_args: list/tuple appended to each element of args_list;
            target must unpack the combined arguments manually
        :param return_imap: if True, the IMapUnorderedIterator is returned
            instead of a list containing the results
        :param progress: whether to show a progress bar while waiting
        :return: an iterable of results given by target (iterator or list)
        :raises TypeError: if extra_args is not a list or tuple
        :raises CommandError: if interrupted with Ctrl-C
        """
        # Materialize so len() works for arbitrary iterables (e.g. generators).
        args_list = list(args_list)
        if extra_args:
            # Explicit raise instead of assert: asserts vanish under `python -O`.
            if not isinstance(extra_args, (list, tuple)):
                raise TypeError('extra_args must be a list')
            args_list = [[arg] + extra_args for arg in args_list]
        count = len(args_list)
        logger = logging.getLogger(__name__)
        logger.info("Starting multiprocess Pool: target={}, processes={}, for {} args".format(
            target, processes, count))
        if progress:
            self.progress_start(count)
        # NOTE(review): NoDaemonPool is a project-local Pool subclass not shown
        # in this file — confirm it is importable where this class is used.
        pool = NoDaemonPool(processes=processes)
        imap_result = pool.imap_unordered(target, args_list)
        try:
            # Poll the iterator's private _index (count of completed tasks) so
            # the progress bar can advance without consuming the results.
            # NOTE(review): _index is a CPython implementation detail — verify
            # it exists on the multiprocessing version in use.
            while True:
                index = getattr(imap_result, '_index')
                if index == count:
                    break
                if progress:
                    self.progress_set(index + 1)
                time.sleep(0.1)  # cheap busy-wait; real work runs in workers
            if progress:
                self.progress_finish()
        except KeyboardInterrupt:
            # Ctrl-C: terminate workers immediately rather than draining them.
            pool.terminate()
            pool.join()
            raise CommandError('Caught KeyboardInterrupt, Pool workers terminated')
        else:
            pool.close()
            pool.join()
        if return_imap:
            return imap_result
        return [result for result in imap_result]
# Example management command using the helpers above.
import time

from base_command import multiprocess_func, BaseManagementCommand  # typo fixed: was "BaseManagementommand"
from app.utils import BaseCommand  # NOTE(review): appears unused in this file — confirm before removing
from myapp.models import UberModel
@multiprocess_func
def prepare_instance_name(uber_instance):
    """Example task: return the name of one instance.

    Put the per-item logic you want to parallelize here; returning a
    result is optional (results are collected by the pool).
    """
    time.sleep(0.1)  # simulate some time consuming work
    name = uber_instance.name
    return name
@multiprocess_func
def prepare_instance_name_2(uber_instance, prefix, suffix):
    """Example task taking extra arguments (paired with ``extra_args``)."""
    time.sleep(0.1)  # simulate some time consuming work
    label = u'{}: {} ({})'.format(prefix, uber_instance.name, suffix)
    return label
class Command(BaseManagementCommand):
    """Usage example of the multiprocess helper."""

    def handle(self, *args, **options):
        """Process 20 UberModel instances in parallel, twice."""
        qs = UberModel.objects.all()[:20]
        # start processing in parallel
        prepared_names = self.multiprocess(prepare_instance_name, qs)
        # when code reaches here, all tasks have executed and the results
        # are stored in "prepared_names"
        # print() with a single argument behaves identically on Python 2 and 3.
        print(prepared_names)
        # now an example with more than one argument
        prepared_names_2 = self.multiprocess(
            prepare_instance_name_2, qs, extra_args=['Foo', 'Bar'])
        print(prepared_names_2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment