Last active
April 23, 2024 03:54
-
-
Save hzhu212/16ef61d3d78d8ad1b6ab55150998580b to your computer and use it in GitHub Desktop.
Python competitive parallelism: racing concurrent tasks with ThreadPool / ProcessPool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import as_completed | |
from concurrent.futures.process import ProcessPoolExecutor | |
import os | |
import psutil | |
import time | |
def kill_child_processes(parent_pid):
    """Terminate every descendant process of *parent_pid* (recursively).

    If *parent_pid* no longer exists, a message is printed and the function
    returns without doing anything.

    A child that exits on its own between listing and terminating raises
    ``psutil.NoSuchProcess``. That is a benign race (the process is already
    gone), so it is skipped rather than aborting the loop. Any other failure
    is reported and re-raised with its original traceback.

    Args:
        parent_pid: pid whose children (and their children) are terminated.

    Raises:
        Exception: whatever ``process.terminate()`` raised, other than
            ``psutil.NoSuchProcess``.
    """
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess as e:
        print(f'kill_child_processes exception: {e}')
        return
    children = parent.children(recursive=True)
    for process in children:
        try:
            process.terminate()
        except psutil.NoSuchProcess:
            # Already exited since children() was called; nothing left to stop.
            continue
        except Exception:
            # status() can itself fail (e.g. the process just vanished), so
            # don't let it replace the exception we are about to re-raise.
            try:
                status = process.status()
            except psutil.Error:
                status = 'unknown'
            print(f'exception occured when killing subprocess(pid={process.pid},status={status})')
            raise
def task(n: int, delay: float = 3) -> int:
    """Simulate a slow job: announce it, sleep, then return ``n ** 2``.

    Args:
        n: task index; it is also the value that gets squared.
        delay: seconds to sleep before returning. The default of 3 matches
            the original hard-coded value; pass 0 for fast tests.

    Returns:
        ``n ** 2``.
    """
    print(f'running task {n}')
    time.sleep(delay)
    return n**2
def _stop_remaining(future_to_idx):
    """Cancel all not-yet-started futures, then terminate this process's children.

    ``Future.cancel()`` only succeeds for futures that have not started.
    Futures already handed to a worker ignore it, so their worker processes
    are terminated instead.
    """
    # ProcessPoolExecutor.shutdown(cancel_futures=True) requires 3.9+, and deadlocks with `as_completed`: https://tiewkh.github.io/blog/python-thread-pool-executor/
    # so we cancel all futures manually: https://stackoverflow.com/questions/66923690/does-concurrent-futures-as-completed-yield-for-cancelled-futures
    print('cancelling all pending futures ...')
    for fut in future_to_idx:
        fut.cancel()
    print('cancelled all pending futures')
    # NOTE: `_state` is a private Future attribute, printed for debugging only.
    print([fut._state for fut in future_to_idx])
    # Running futures don't stop when cancelled; they have to be stopped manually:
    # option 1. kill: https://stackoverflow.com/questions/42782953/python-concurrent-futures-how-to-make-it-cancelable
    # option 2. stop safely: https://superfastpython.com/multiprocessing-pool-stop-all-tasks/
    # NOTE(review): this terminates *every* descendant of the current process,
    # not only this pool's workers. Any other subprocess the caller started
    # is killed too.
    print('killing running tasks ...')
    kill_child_processes(os.getpid())
    print('killed running tasks')
    print([fut._state for fut in future_to_idx])


def main(n_tasks=20, n_results=3, max_workers=4):
    """Race *n_tasks* calls of ``task`` in a process pool and keep the first *n_results*.

    Once *n_results* results have arrived, or if collecting results raises,
    every pending future is cancelled and the pool's worker processes are
    terminated. Without this, leaving the ``with`` block would wait for every
    remaining task.

    Args:
        n_tasks: number of ``task(i)`` calls to submit, for ``i`` in ``range(n_tasks)``.
        n_results: stop once this many results have been collected.
        max_workers: number of worker processes in the pool.

    Returns:
        list of collected results, in completion order.
    """
    collector = []
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        future_to_idx = {pool.submit(task, i): i for i in range(n_tasks)}
        all_done = False
        try:
            for future in as_completed(future_to_idx):
                print([f._state for f in future_to_idx])
                idx = future_to_idx[future]
                res = future.result()
                print(f'task {idx} returned: {res}')
                collector.append(res)
                if len(collector) >= n_results:
                    print(f'successfully got {n_results} results')
                    break
            else:
                all_done = True
        finally:
            # If we called pool.shutdown(wait=False) here and left running and pending
            # tasks alone, the main process could hang forever in a deadlock. To avoid
            # this we must either wait for all tasks to finish, or cancel the pending
            # ones and terminate the running ones manually.
            # Doing it in `finally` also covers the case where a task raised
            # (future.result()) or the user interrupted the loop.
            if not all_done:
                _stop_remaining(future_to_idx)
        print('exiting ProcessPoolExecutor')
    print('exited ProcessPoolExecutor')
    return collector
if __name__ == '__main__':
    # Script entry point: run the race demo, then show whatever it collected.
    res = main()
    print(res)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment