Created
October 18, 2016 14:52
-
-
Save simion/4447668fd69ae191b5f42895fa26017f to your computer and use it in GitHub Desktop.
Django management command - multiprocessing helper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
from django.core.management import BaseCommand | |
def multiprocess_func(view_func):
    """Decorator which closes the database connection before running the wrapped function.

    Also unpacks args for the multiprocessing module: the returned wrapper
    takes exactly ONE positional argument and

    * unpacks it into positional arguments when it is a ``list`` or ``tuple``
      (which means multiple arguments were given);
    * passes it through unchanged as the single argument otherwise.

    Note that a task whose only argument is itself a list/tuple is therefore
    unpacked as well.

    :param view_func: the function to wrap
    :return: the wrapper, carrying ``view_func``'s metadata via ``functools.wraps``
    """
    # Imported locally so the decorator does not depend on module-level
    # imports that the surrounding file does not provide.
    from functools import wraps

    from django.db import connection

    @wraps(view_func)
    def _decorator(args):
        # Close the current connection before doing any work; presumably this
        # keeps a worker process from reusing a connection inherited from its
        # parent -- TODO confirm against the deployment's database backend.
        connection.close()
        # unpack args if list/tuple (which means multiple arguments were given)
        if isinstance(args, (list, tuple)):
            return view_func(*args)
        # single argument, just send it
        return view_func(args)

    return _decorator
# Ignore the progress_* methods: they come from another helper (you can use tqdm instead).
class BaseManagementCommand(BaseCommand):
    """Base management command offering a helper to run a function in a process pool."""

    def multiprocess(self, target, args_list, processes=3, extra_args=None, return_imap=False, progress=True):
        """
        Run ``target`` once per item of ``args_list`` in a pool of worker processes.

        The call blocks, polling every 0.1 seconds, until all tasks have
        completed; only then are the results returned.

        :param target: function (IMPORTANT! function must be importable by path, so declared outside of classes)
        :param args_list: iterable; it is materialised into a list so that its length is known
        :param processes: number of parallel processes
        :param extra_args: list or tuple appended to every item, so each task receives ``[item] + extra_args``.
            target must unpack the arguments manually
        :param return_imap: boolean, if true, IMapUnorderedIterator is returned instead of a list containing results
        :param progress: boolean, whether to show progressbar or not
        :return: an iterable of results given by target (IMapUnorderedIterator or list)
        :raises CommandError: when a KeyboardInterrupt is caught while waiting; workers are terminated first
        """
        # Imported locally so the method does not depend on module-level
        # imports that the surrounding file does not provide.
        import time

        from django.core.management.base import CommandError

        # NOTE(review): `logger`, `NoDaemonPool` and the `progress_*` methods are
        # not defined in the visible source; they are assumed to be provided by
        # the surrounding module/class -- confirm.
        if extra_args:
            assert isinstance(extra_args, (list, tuple)), 'extra_args must be a list'
            # Normalise to a list: `[arg] + <tuple>` would raise TypeError.
            extras = list(extra_args)
            args_list = [[arg] + extras for arg in args_list]
        else:
            # Materialise so len() also works for generators and other
            # length-less iterables.
            args_list = list(args_list)
        count = len(args_list)

        logger.info("Starting multiprocess Pool: target={}, processes={}, for {} args".format(target, processes, count))
        if progress:
            self.progress_start(count)

        pool = NoDaemonPool(processes=processes)
        imap_result = pool.imap_unordered(target, args_list)
        try:
            # Poll instead of blocking on the iterator so that Ctrl-C is
            # delivered to this loop and can be handled below.
            while True:
                # `_index` is a private attribute of the imap iterator; it is
                # compared with `count` to detect that every task has reported.
                index = getattr(imap_result, '_index')
                if index == count:
                    break
                if progress:
                    self.progress_set(index + 1)
                time.sleep(0.1)
            if progress:
                self.progress_finish()
        except KeyboardInterrupt:
            pool.terminate()
            pool.join()
            raise CommandError('Caught KeyboardInterrupt, Pool workers terminated')
        except Exception:
            # Do not leave worker processes behind when the wait loop fails for
            # any other reason (e.g. a progress helper raising).
            pool.terminate()
            pool.join()
            raise
        else:
            pool.close()
            pool.join()

        if return_imap:
            return imap_result
        return list(imap_result)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time

from app.utils import BaseCommand
from base_command import multiprocess_func, BaseManagementCommand
from myapp.models import UberModel
@multiprocess_func
def prepare_instance_name(uber_instance):
    """
    Example worker: put the logic you need to parallelize here.

    Whatever it returns (a return value is optional) is collected as the
    result for this task.
    """
    # simulate some time consuming work
    time.sleep(0.1)
    instance_name = uber_instance.name
    return instance_name
@multiprocess_func
def prepare_instance_name_2(uber_instance, prefix, suffix):
    """
    Example worker taking several arguments.

    Returns the instance name formatted as ``<prefix>: <name> (<suffix>)``.
    """
    # simulate some time consuming work
    time.sleep(0.1)
    template = u'{}: {} ({})'
    return template.format(prefix, uber_instance.name, suffix)
class Command(BaseManagementCommand):
    """Example command showing how to use the ``multiprocess`` helper."""

    def handle(self, *args, **options):
        """ Usage example of multiprocess helper """
        qs = UberModel.objects.all()[:20]

        # start processing in parallel
        prepared_names = self.multiprocess(prepare_instance_name, qs)
        # when code reaches here, all tasks are executed and results are stored in "prepared_names"
        # (single-argument call form: prints the same on Python 2 and Python 3)
        print(prepared_names)

        # now let's show an example with more than one argument
        prepared_names_2 = self.multiprocess(prepare_instance_name_2, qs, extra_args=['Foo', 'Bar'])
        print(prepared_names_2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment