A short example showing asyncio semaphores
import asyncio
import time
from random import random
async def my_coroutine(sem,task_id):
async with sem:
# critical section of code
print(f"Acquired id:{task_id}, time:{time.strftime('%H:%M:%S', time.localtime())}")
_run_time = .5 + random()
await asyncio.sleep(_run_time)
print(f"Released id:{task_id}, time:{time.strftime('%H:%M:%S', time.localtime())}, {_run_time}")
return _run_time
async def main():
_start_time = time.time()
print(f"Start time:{time.strftime('%H:%M:%S', time.localtime(_start_time))}")
sem = asyncio.Semaphore(3)
tasks = [asyncio.create_task(my_coroutine(sem,i)) for i in range(10)]
result = await asyncio.gather(*tasks)
_end_time = time.time()
print(f"End time:{time.strftime('%H:%M:%S', time.localtime(_end_time))}")
print(f"running time:{_end_time - _start_time}")
_total=0
for _r in result:
_total+=_r
print(f"executuion time: {_total}")
asyncio.run(main)
#if you are in jupyter/ipython
#await main()
output
Start time:09:54:51
Acquired id:0, time:09:54:51
Acquired id:1, time:09:54:51
Acquired id:2, time:09:54:51
Released id:0, time:09:54:51, 0.543004737902306
Acquired id:3, time:09:54:51
Released id:2, time:09:54:52, 0.716452786407052
Acquired id:4, time:09:54:52
Released id:1, time:09:54:52, 0.8981319786398339
Acquired id:5, time:09:54:52
Released id:3, time:09:54:52, 1.0064910961164992
Acquired id:6, time:09:54:52
Released id:4, time:09:54:53, 1.3037485263905855
Acquired id:7, time:09:54:53
Released id:5, time:09:54:53, 1.4120800095210995
Acquired id:8, time:09:54:53
Released id:6, time:09:54:54, 1.315146059710378
Acquired id:9, time:09:54:54
Released id:7, time:09:54:54, 1.2501999443690721
Released id:8, time:09:54:55, 1.3127928613465163
Released id:9, time:09:54:55, 0.8168799396808626
End time:09:54:55
running time:3.688829183578491
executuion time: 10.574927940084205
The semaphore is limiting the number of concurrent executions of my_corouting
to 3. Then we launch 10 coroutines at the same time.
Each coroutine will run for 0.5 + random()
(between .5 and 1.5 seconds).
The total running time is 3.68, thanks to asynchronous parallelization. The combined running time of all coroutines is 10.57.
We can use asyncio.wait_for
and asyncio.wait
to modify the behabiour of the previous example
async def main_timeout():
_start_time = time.time()
print(f"Start time:{time.strftime('%H:%M:%S', time.localtime(_start_time))}")
sem = asyncio.Semaphore(3)
tasks = [asyncio.create_task(my_coroutine(sem,i)) for i in range(10)]
result=[]
for _t in tasks:
try:
_r = await asyncio.wait_for(_t, timeout=1)
except TimeoutError as errn:
print(f"task {_t} timeout {errn.args}")
_end_time = time.time()
print(f"End time:{time.strftime('%H:%M:%S', time.localtime(_end_time))}")
print(f"running time:{_end_time - _start_time}")
_total=0
for _r in result:
_total+=_r
print(f"executuion time: {_total}")
async def main_wait():
_start_time = time.time()
print(f"Start time:{time.strftime('%H:%M:%S', time.localtime(_start_time))}")
sem = asyncio.Semaphore(3)
tasks=[]
for i in range(10):
tasks.append(asyncio.create_task(my_coroutine(sem,i)))
_creation_time = time.time()
_time =time.strftime('%H:%M:%S', time.localtime(_start_time))
print(f"task id:{i} created, time:{_time}")
result, pending = await asyncio.wait(tasks, timeout=3)
_end_time = time.time()
print(f"End time:{time.strftime('%H:%M:%S', time.localtime(_end_time))}")
print(f"running time:{_end_time - _start_time}")
_total=0
for _r in result:
if isinstance(_r,asyncio.Task):
continue
_total+=_r
_total_pending=[]
for _p in pending:
_total_pending.append(_p.cancelled())
print(f"executuion time: {_total}")