Created
November 21, 2018 17:09
-
-
Save glemaitre/b0f0dddcadd9ebe7df39b657c777ac50 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import random | |
import time | |
from concurrent.futures import ProcessPoolExecutor | |
def simulator_submission(): | |
"""Give ``None`` or a submission id.""" | |
return random.choice([random.randint(0, 1000), None]) | |
async def launch_submission(submission_queue, process_queue): | |
while True: | |
# to be replaced by the database query | |
generated_submission = simulator_submission() | |
await submission_queue.put(generated_submission) | |
# No need to wait, we surely have an item at least: None or the | |
# submission id. | |
submission_id = submission_queue.get_nowait() | |
if submission_id is None: | |
await asyncio.sleep(0) | |
continue | |
print(f'launch the training of the submission {submission_id}') | |
proc = await asyncio.subprocess.create_subprocess_shell( | |
'ssh glemaitre@anakim.u-bourgogne.fr "sleep {}"' | |
.format(random.randint(0, 3)), | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.STDOUT | |
) | |
await process_queue.put(proc) | |
print(f'Queuing the process {proc}') | |
async def collect_result(submission_queue, process_queue): | |
while True: | |
proc = await process_queue.get() | |
if proc.returncode is None: | |
# await process_queue.put(proc) lock proc.returncode to change | |
# status. | |
process_queue.put_nowait(proc) | |
await asyncio.sleep(0) | |
else: | |
print(f'collect the log of the submission {proc}') | |
await proc.communicate() | |
# just to simulate different time processing of collection. | |
await asyncio.sleep(random.randint(0, 3)) | |
if __name__ == "__main__": | |
loop = asyncio.get_event_loop() | |
submission_queue = asyncio.LifoQueue(loop=loop, maxsize=5) | |
process_queue = asyncio.LifoQueue(loop=loop, maxsize=5) | |
launcher = launch_submission(submission_queue, process_queue) | |
collecter = collect_result(submission_queue, process_queue) | |
loop.run_until_complete(asyncio.gather(launcher, collecter)) | |
print('loop completed') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment