Skip to content

Instantly share code, notes, and snippets.

@nngogol
Created May 7, 2020 12:36
Show Gist options
  • Save nngogol/15d51db5b979c8f9ae05ea1bd7a67754 to your computer and use it in GitHub Desktop.
Save nngogol/15d51db5b979c8f9ae05ea1bd7a67754 to your computer and use it in GitHub Desktop.
(async)Racing show in less then 100 lines
'''
Made at 2020-05-07 15:31:52
Asyncio Racing Show
run with:
$ clear; python3 -uB comp_simple.py
'''
import asyncio, random, queue
failed_runners = queue.Queue()
flag = False
# first runner
async def runner1(name):
global flag, failed_runners
for i in range(5):
# print(f'RUNNING Mr.{name} at {i} meters')
await asyncio.sleep(random.randint(1,2)/100)
if not flag: # if nobody grabbed the flag -> I won.
flag = f' Mister {name}'
print(f' flag is taken by Mr.{name}\n')
else: # if flag is gone -> I failed.
failed_runners.put(f'Finished, but failed "Mr.{name}"')
# other runners
async def runner2(name):
# thesame code, as in runner1
global flag, failed_runners
for i in range(5):
# print(f'RUNNING Mr.{name} at {i} meters')
await asyncio.sleep(random.randint(1,2)/100)
if not flag: # if nobody grabbed the flag -> I won.
flag = f' Mister {name}'
print(f' flag is taken by Mr.{name}\n')
else: # if flag is gone -> I failed.
failed_runners.put(f'Finished, but failed "Mr.{name}"')
async def runner3(name):
# thesame code, as in runner1
global flag, failed_runners
for i in range(5):
# print(f'RUNNING Mr.{name} at {i} meters')
await asyncio.sleep(random.randint(1,2)/100)
if not flag: # if nobody grabbed the flag -> I won.
flag = f' Mister {name}'
print(f' flag is taken by Mr.{name}\n')
else: # if flag is gone -> I failed.
failed_runners.put(f'Finished, but failed "Mr.{name}"')
# a while-true checker + little async sleep.
async def checker():
global flag, failed_runners
print('START of COMPETITION')
# Let the race BEGIN!
while not flag:
# let's wait for flag state more
await asyncio.sleep(.001)
# Race Finished! Let's do conclusions:
# show winner
print(' ', '-'*30, f'\n WINNER is {flag.strip()}! Congrats!')
# IF there are failers - show them
if not failed_runners.empty():
print(' list of failers:')
while not failed_runners.empty():
item = failed_runners.get()
if item is not None: print(f' >>> {item}')
else: break
print('\nEND OF COMPETITION')
print('global START\n')
loop = asyncio.get_event_loop()
# in 4 tasks in asyncs loop
loop.run_until_complete(asyncio.gather(
runner1(1), runner2(2), runner3(3)
,checker()))
loop.close()
print('\nglobal END')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment