Last active
August 29, 2015 14:19
-
-
Save asvetlov/b53c3d8a2493197c902b to your computer and use it in GitHub Desktop.
async/await
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
class CM: | |
async def __aenter__(self): | |
print(1) | |
await asyncio.sleep(0.01) | |
print(2) | |
return self | |
async def __aexit__(self, exc_type, exc_val, exc_tb): | |
print(exc_type, exc_val, exc_tb) | |
await asyncio.sleep(0.01) | |
class CM2: | |
@asyncio.coroutine | |
def __aenter__(self): | |
print(1) | |
yield from asyncio.sleep(0.01) | |
print(2) | |
return self | |
@asyncio.coroutine | |
def __aexit__(self, exc_type, exc_val, exc_tb): | |
print(exc_type, exc_val, exc_tb) | |
yield from asyncio.sleep(0.01) | |
async def f(): | |
cm = CM() | |
async with cm as c: | |
assert cm is c | |
await asyncio.sleep(0.01) | |
raise RuntimeError('a bug') | |
print('done') | |
async def f2(): | |
cm = CM() | |
async with cm as c: | |
assert cm is c | |
await asyncio.sleep(0.01) | |
raise RuntimeError('a bug') | |
print('done') | |
loop = asyncio.get_event_loop() | |
try: | |
loop.run_until_complete(f()) | |
except RuntimeError as exc: | |
print(exc) | |
try: | |
loop.run_until_complete(f2()) | |
except RuntimeError as exc: | |
print(exc) | |
loop.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
async def f(fut): | |
val = await fut | |
return val | |
loop = asyncio.get_event_loop() | |
fut = asyncio.Future(loop=loop) | |
loop.call_soon(fut.set_result, 123) | |
ret = loop.run_until_complete(f(fut)) | |
loop.close() | |
print(ret) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
END = b'Bye-bye!\n' | |
async def echo_client(): | |
reader, writer = await asyncio.open_connection('localhost', 8000) | |
writer.write(b'Hello, world\n') | |
writer.write(b'What a fine day it is.\n') | |
writer.write(END) | |
while True: | |
line = await reader.readline() | |
print('received:', line) | |
if line == END or not line: | |
break | |
writer.close() | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(echo_client()) | |
loop.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
async def echo_server(): | |
await asyncio.start_server(handle_connection, 'localhost', 8000) | |
async def handle_connection(reader, writer): | |
while True: | |
data = await reader.read(8192) | |
if not data: | |
break | |
writer.write(data) | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(echo_server()) | |
try: | |
loop.run_forever() | |
finally: | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment