Skip to content

Instantly share code, notes, and snippets.

@neilzheng
Created December 12, 2017 12:19
Show Gist options
  • Save neilzheng/72a8281e6e825f3df884212fe4c3c870 to your computer and use it in GitHub Desktop.
Save neilzheng/72a8281e6e825f3df884212fe4c3c870 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import sys
import asyncio
import aiohttp
class Server:
    """Forward TCP connections over a WebSocket.

    For every accepted TCP connection a WebSocket connection to ``ws_uri``
    is opened. Bytes read from the socket are sent as binary WebSocket
    messages, and binary WebSocket messages are written back to the socket.
    """

    def __init__(self, ws_uri, loop=None, **wskwargs):
        """Store the forwarding configuration.

        Args:
            ws_uri: URI handed to ``session.ws_connect`` for each connection.
            loop: Event loop used to schedule the WebSocket-to-socket task;
                defaults to ``asyncio.get_event_loop()``.
            **wskwargs: Extra keyword arguments forwarded to ``ws_connect``.
        """
        super().__init__()
        self.ws_uri = ws_uri
        self.wskwargs = wskwargs
        self.loop = loop if loop is not None else asyncio.get_event_loop()

    def listen(self, host, port, **kwargs):
        """Return the ``asyncio.start_server`` coroutine for ``host:port``.

        The coroutine must be awaited (or run with ``run_until_complete``)
        on the loop that should serve the connections. ``**kwargs`` are
        forwarded to ``asyncio.start_server``.
        """
        # The ``loop`` argument of start_server was removed in Python 3.10;
        # the server binds to the loop that runs the returned coroutine.
        return asyncio.start_server(self._server_run, host, port, **kwargs)

    async def _server_run(self, reader, writer):
        """Handle one TCP connection: pump socket data into the WebSocket.

        The opposite direction is handled by a ``_ws_run`` task that is
        started once the WebSocket is connected and cancelled when the
        socket side finishes. Errors are printed, never raised, except for
        cancellation, which is propagated.
        """
        print("Sock connected.")
        try:
            async with aiohttp.ClientSession() as session:
                async with session.ws_connect(self.ws_uri, **self.wskwargs) as client:
                    # Keep a reference so the task can be stopped and cannot
                    # be garbage-collected while it is still running.
                    pump = self.loop.create_task(self._ws_run(writer, client))
                    try:
                        while True:
                            data = await reader.read(65536)
                            if not data:
                                # EOF on the socket ends the tunnel.
                                break
                            await client.send_bytes(data)
                    finally:
                        # Stop the reverse pump before the WebSocket closes.
                        pump.cancel()
                        try:
                            await pump
                        except asyncio.CancelledError:
                            pass
        except asyncio.CancelledError:
            # Listed before ``Exception`` because CancelledError derives from
            # Exception on Python < 3.8; cancellation must not be swallowed.
            raise
        except Exception as err:
            print(err)
        finally:
            # Always release the socket, not only on the error path.
            writer.close()
            print("Sock disconnected.")

    async def _ws_run(self, writer, client):
        """Pump binary WebSocket messages into the socket writer.

        Stops on an empty payload, on the ``TypeError`` raised by
        ``receive_bytes`` (aiohttp documents it for non-binary messages),
        or on any other error, which is printed. The writer is always
        closed on exit.
        """
        print("WS connected.")
        try:
            while True:
                data = await client.receive_bytes()
                if not data:
                    break
                writer.write(data)
                # Apply backpressure so a slow socket peer cannot make the
                # write buffer grow without bound.
                await writer.drain()
        except asyncio.CancelledError:
            raise
        except TypeError:
            # Non-binary message (presumably a close frame): end of stream.
            pass
        except Exception as err:
            print(err)
        finally:
            writer.close()
            print("WS disconnected")
def _usage():
    """Print the command-line synopsis, using ``sys.argv[0]`` as the program name."""
    program = sys.argv[0]
    synopsis = "Usage: {} bind_address bind_port websocket_uri".format(program)
    print(synopsis)
# Script entry point: forward TCP connections on bind_address:bind_port to
# the given WebSocket URI until interrupted.
if __name__ == '__main__':
    if len(sys.argv) != 4:
        _usage()
        # sys.exit instead of the ``exit`` builtin, which is only installed
        # by the ``site`` module and is meant for interactive use.
        sys.exit(1)
    # Create and install the loop explicitly: calling get_event_loop() with
    # no current loop is deprecated on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    listener = None
    try:
        server = Server(sys.argv[3], loop=loop)
        listener = loop.run_until_complete(server.listen(sys.argv[1], sys.argv[2]))
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the forwarder.
        pass
    finally:
        # run_forever() only returns via stop() or an exception, so cleanup
        # has to live in ``finally`` to be reached at all.
        if listener is not None:
            listener.close()
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment