Skip to content

Instantly share code, notes, and snippets.

@hzhu212
Last active April 23, 2024 03:54
Show Gist options
  • Save hzhu212/16ef61d3d78d8ad1b6ab55150998580b to your computer and use it in GitHub Desktop.
Save hzhu212/16ef61d3d78d8ad1b6ab55150998580b to your computer and use it in GitHub Desktop.
Python: race parallel tasks, keep the first results and stop the rest (ThreadPool / ProcessPool)
from concurrent.futures import as_completed
from concurrent.futures.process import ProcessPoolExecutor
import os
import psutil
import time
def kill_child_processes(parent_pid):
    """Terminate every descendant process of ``parent_pid``.

    Descendants are collected recursively with ``psutil`` and each one is sent
    ``Process.terminate()``. This function does not wait for them to exit.

    Args:
        parent_pid: PID whose child processes (recursively) are terminated.

    Raises:
        Exception: any error other than ``psutil.NoSuchProcess`` raised while
            terminating a child is printed and re-raised.
    """
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess as e:
        # The parent itself is gone, so there is nothing left to clean up.
        print(f'kill_child_processes exception: {e}')
        return
    children = parent.children(recursive=True)
    for process in children:
        try:
            process.terminate()
        except psutil.NoSuchProcess:
            # The child exited on its own between listing and terminating it.
            # That is the outcome we wanted, so it is not an error.
            continue
        except Exception:
            # Querying the status can itself fail (e.g. the process vanished);
            # guard it so the original exception is the one that propagates.
            try:
                status = process.status()
            except psutil.Error:
                status = 'unknown'
            print(f'exception occured when killing subprocess(pid={process.pid},status={status})')
            raise
def task(n: int, delay: float = 3) -> int:
    """Simulate a slow unit of work and return ``n`` squared.

    Args:
        n: Task index; also the value that gets squared.
        delay: Seconds to sleep to simulate work. Defaults to 3, the
            original hard-coded duration.

    Returns:
        ``n ** 2``.
    """
    print(f'running task {n}')
    time.sleep(delay)
    return n**2
def future_state(future):
    """Return a state label for ``future`` using only the public Future API.

    The labels mirror the private ``Future._state`` values: 'CANCELLED',
    'RUNNING', 'FINISHED' or 'PENDING'. The internal 'CANCELLED_AND_NOTIFIED'
    state is reported as 'CANCELLED'.
    """
    # cancelled() must be checked before done(): a cancelled future is also done.
    if future.cancelled():
        return 'CANCELLED'
    if future.running():
        return 'RUNNING'
    if future.done():
        return 'FINISHED'
    return 'PENDING'


def main(n_results=3, n_tasks=20, max_workers=4):
    """Race ``n_tasks`` tasks in a process pool; stop once ``n_results`` are in.

    Results are collected in completion order. Once enough results are in,
    pending futures are cancelled and child processes are terminated, so the
    remaining tasks do not keep running.

    Args:
        n_results: Number of results to collect before stopping the rest.
            Must be at least 1.
        n_tasks: Number of ``task`` calls to submit (indices ``0..n_tasks-1``).
        max_workers: Size of the process pool.

    Returns:
        List of collected ``task`` results, in completion order.

    Raises:
        ValueError: if ``n_results`` is less than 1.
    """
    if n_results < 1:
        raise ValueError(f'n_results must be >= 1, got {n_results}')
    # stop all tasks once the first `n_results` results have arrived
    collector = []
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        future_to_idx = {pool.submit(task, i): i for i in range(n_tasks)}
        all_done = False
        for future in as_completed(future_to_idx):
            print([future_state(f) for f in future_to_idx])
            idx = future_to_idx[future]
            res = future.result()
            print(f'task {idx} returned: {res}')
            collector.append(res)
            if len(collector) >= n_results:
                print(f'successfully got {n_results} results')
                break
        else:
            # as_completed() was exhausted: every task finished, nothing to stop.
            all_done = True
        # If we called pool.shutdown(wait=False) here while tasks are still running
        # or pending, the main process could hang forever because of a deadlock.
        # To avoid this we must either wait for all tasks to finish, or cancel all
        # pending tasks and terminate all running ones manually.
        if not all_done:
            # ProcessPoolExecutor.shutdown(cancel_futures=True) requires 3.9+, and deadlocks with `as_completed`: https://tiewkh.github.io/blog/python-thread-pool-executor/
            # so we cancel all futures manually: https://stackoverflow.com/questions/66923690/does-concurrent-futures-as-completed-yield-for-cancelled-futures
            print('cancelling all pending futures ...')
            for pending in future_to_idx:
                pending.cancel()
            print('cancelled all pending futures')
            print([future_state(f) for f in future_to_idx])
            # Running futures are not stopped by cancel(); they must be stopped manually:
            # option 1. kill: https://stackoverflow.com/questions/42782953/python-concurrent-futures-how-to-make-it-cancelable
            # option 2. stop safely: https://superfastpython.com/multiprocessing-pool-stop-all-tasks/
            # NOTE(review): this terminates EVERY descendant of the current process,
            # not only the pool workers. With the 'spawn'/'forkserver' start methods
            # that may include multiprocessing helper processes (e.g. the resource
            # tracker). On Python 3.14+, pool.terminate_workers() is a scoped
            # alternative; confirm before switching.
            print('killing running tasks ...')
            kill_child_processes(os.getpid())
            print('killed running tasks')
            print([future_state(f) for f in future_to_idx])
        print('exiting ProcessPoolExecutor')
    print('exited ProcessPoolExecutor')
    return collector
if __name__ == '__main__':
    # Script entry point: run the race demo, then show what was collected.
    results = main()
    print(results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment