Skip to content

Instantly share code, notes, and snippets.

@Nikolaj-K
Created January 28, 2024 22:00
Show Gist options
  • Save Nikolaj-K/70461b139f12c02d6e3b09095fec9329 to your computer and use it in GitHub Desktop.
Save Nikolaj-K/70461b139f12c02d6e3b09095fec9329 to your computer and use it in GitHub Desktop.
Basic async io example
"""
Script discussed in the video
https://youtu.be/RltoGfNhex4
asyncio enables interleaved execution of several functions within one thread.
This makes sense, e.g., if you want the results of less intensive computations earlier (for IO), or
if some awaited computation is done in some external thread (e.g. info from a server).
"""
import asyncio
class Config:
    """Settings shared by the sequential and the asyncio variants of the demo."""

    # Each entry e is used as the power of ten 10**e that bounds one summation.
    # The order is deliberately unsorted.
    EXPONENTS = [3, 5, 1, 4, 3, 2]
def pyramid_sum_function_analytical(exponent: int) -> int:
    """Return 0 + 1 + ... + (10**exponent - 1) using the closed-form formula.

    Args:
        exponent: Power of ten that gives the (exclusive) upper bound n.

    Returns:
        The exact integer n * (n - 1) / 2 with n = 10**exponent.
    """
    n = 10**exponent
    # n * (n - 1) is a product of consecutive integers and therefore even, so
    # floor division is exact. True division would return a float, which
    # contradicts the annotation and loses precision for large exponents.
    return n * (n - 1) // 2
def pyramid_sum_function(exponent: int) -> int:
    """Add up all integers below 10**exponent in one blocking call.

    The total is checked against pyramid_sum_function_analytical, logged to
    stdout and then returned.
    """
    upper_bound = 10**exponent
    total = sum(range(upper_bound))
    assert total == pyramid_sum_function_analytical(exponent)
    print(f"\t[pyramid_sum_function] The sum of number up to 10^{exponent} equals {total}.")
    return total
def compute_sums_functions():
    """Run pyramid_sum_function sequentially for every entry of Config.EXPONENTS.

    Returns:
        A list with one sum per exponent, in the order of Config.EXPONENTS.
    """
    # Evaluate eagerly and collect the return values in a list.
    sums = [pyramid_sum_function(exponent) for exponent in Config.EXPONENTS]
    # The 'f' prefix must precede the opening quote, otherwise '{sums}' is
    # printed literally instead of being interpolated.
    print(f"[compute_sums_functions] finished with {sums}.\n")
    return sums
async def bonus_sleeping_log(message, time):
    """Sleep for `time` seconds, then print `message` unless it is empty.

    Awaiting this coroutine suspends the caller for the duration of the
    sleep, which lets other scheduled coroutines make progress meanwhile.
    """
    await asyncio.sleep(time)
    if not message:
        # Nothing to log; the call was only used as a suspension point.
        return
    print(f"[bonus_sleeping_log] message: {message}\n")
async def pyramid_sum_coroutine(exponent: int) -> int:
    """Sum all integers below 10**exponent, periodically yielding to the event loop.

    Args:
        exponent: Power of ten that gives the (exclusive) upper bound.

    Returns:
        The exact integer sum 0 + 1 + ... + (10**exponent - 1).
    """
    # Here 'async' enables the function to be gathered into the event loop.
    n = 10**exponent
    s = 0
    for k in range(n):
        s += k
        if k % 100 == 0:
            # Make a "productive waiting until picking it up again" call, which enables
            # progression in the event loop of all gathered coroutines.
            # (In this example, the coroutines are all instances of
            # pyramid_sum_coroutine with different arguments.)
            # Warning: this switching between functions comes with some time overhead.
            await bonus_sleeping_log("", 0)  # Note: If this line is 'pass' instead, then no async happens
    # Compare against the closed form using integer floor division: n * (n - 1)
    # is always even, so the check stays exact instead of going through a float.
    assert s == n * (n - 1) // 2
    print(f"\tThe sum of number up to 10^{exponent} equals {s}.")
    return s
async def gather_coroutines(print_log):
    """Run one pyramid_sum_coroutine per entry of Config.EXPONENTS concurrently.

    When `print_log` is truthy, intermediate task/future objects are printed
    and two auxiliary bonus_sleeping_log tasks are created for illustration.
    Returns the list of sums in the order of Config.EXPONENTS.
    """
    # 1. Describe the coroutine objects lazily; they are only created when
    #    unpacked below and only executed once the event loop drives them.
    pending_coroutines = (
        pyramid_sum_coroutine(exponent) for exponent in Config.EXPONENTS
    )
    if print_log:
        # Auxiliary log.
        # Note: the task is scheduled but never awaited in this function, so
        # nothing here waits for its 3-second sleep to elapse.
        hello_task = asyncio.create_task(bonus_sleeping_log("HELLO", 3))
        print(f"\n[gather_coroutines] t_hello = {hello_task}\n")  # <Task pending coro=...>

    # 2. Bundle all coroutines into one awaitable that runs them concurrently.
    sums_future = asyncio.gather(*pending_coroutines)
    if print_log:
        print(f"[gather_coroutines] awaitable_sums = {sums_future}\n")  # <_GatheringFuture pending>

    # 3. Suspend until every gathered coroutine has returned.
    sums = await sums_future
    if not print_log:
        return sums

    print(f"\n[gather_coroutines] awaitable_sums = {sums_future}\n")  # <_GatheringFuture finished result=...>
    # Auxiliary log.
    # Note: this one is awaited, so its message is printed before returning.
    goodbye_task = asyncio.create_task(bonus_sleeping_log("GOODBYE", 2))
    print(f"[gather_coroutines] t_goodbye = {goodbye_task}\n")  # <Task pending coro=...>
    await goodbye_task
    print(f"[gather_coroutines] t_goodbye = {goodbye_task}\n")  # <Task finished coro=...>
    return sums
async def wait_coroutines():
    """Run the pyramid_sum_coroutine instances via the lower-level asyncio.wait.

    Unlike asyncio.gather, asyncio.wait yields sets of done and pending tasks
    rather than a list of results, so this coroutine returns None.
    """
    RETURN_WHEN = asyncio.FIRST_COMPLETED  # = asyncio.ALL_COMPLETED also possible
    # asyncio.wait requires Task/Future objects: passing bare coroutine objects
    # was deprecated in Python 3.8 and raises TypeError from Python 3.11 on.
    tasks = [
        asyncio.create_task(pyramid_sum_coroutine(exponent))
        for exponent in Config.EXPONENTS
    ]
    if not tasks:
        # asyncio.wait raises ValueError on an empty collection.
        return
    # Using .wait instead of .gather. Lower level.
    _done, _pending = await asyncio.wait(tasks, return_when=RETURN_WHEN)
def legacy_run(coro):
    """Run `coro` to completion using the pre-`asyncio.run` event-loop API.

    The thread's current event loop is reused when one is set and still open.
    Otherwise a new loop is created and installed as the current one. The loop
    is left open afterwards so that later calls can reuse it.

    Args:
        coro: The coroutine (or other awaitable) to run.

    Returns:
        Whatever `coro` returns.
    """
    try:
        event_loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop is set, e.g. after asyncio.run() has reset it or
        # when called outside the main thread.
        event_loop = None
    if event_loop is None or event_loop.is_closed():
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
    return event_loop.run_until_complete(coro)
if __name__ == "__main__":
    # Horizontal rule printed before each section of the demo.
    RULE = 80 * "-"

    # Baseline: plain sequential evaluation.
    print(RULE + "\n[main] Run sequential compute_sums_functions")
    sums = compute_sums_functions()

    # Same computation with interleaved coroutines; both runners must agree
    # with the sequential result.
    USE_LEGACY_RUN = True
    print(RULE + f"\n[main] Run gather_coroutines (with USE_LEGACY_RUN={USE_LEGACY_RUN})")
    async_sums = gather_coroutines(print_log=True)
    if USE_LEGACY_RUN:
        res_sums = legacy_run(async_sums)
    else:
        res_sums = asyncio.run(async_sums)
    assert res_sums == sums

    # Alternative. Note: here .run does not return the list of ints.
    print(RULE + "\n[main] Run wait_coroutines")
    asyncio.run(wait_coroutines())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment