Last active
July 8, 2024 15:04
-
-
Save pssolanki111/07913e9c88ff8e80c8470871345c46c4 to your computer and use it in GitHub Desktop.
This is an advanced use case for streaming a large number of symbols over Polygon websockets. It combines uvloop, orjson, aioredis, batching of messages before transmission, and async streaming to achieve high throughput. It can easily be modified or extended to support simpler or more complex use cases. The key idea is keeping the str…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import polygon | |
from polygon.enums import StreamCluster | |
import config | |
import uvloop | |
from orjson import dumps | |
import aioredis | |
from datetime import datetime, time | |
# Wall-clock time of day after which the streamer stops; it is compared against datetime.now().time()
CLOSE_TIME = time(hour=17, minute=5, second=0)

# Redis endpoint, built from the host and port declared in the local config module
REDIS_URL = f'redis://{config.redis_host}:{config.redis_port}'

# Make asyncio create its event loops through uvloop's policy
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
class StockStreamer:
    """
    Streams stock trades from the polygon stream client and pushes them to redis in batches.

    Messages are buffered in memory. Once ``batch_size`` messages have accumulated, the whole batch is
    serialized with orjson and appended to the redis list ``stocktrades`` with a single ``RPUSH``.
    """

    def __init__(self, batch_size: int = 5000):
        """
        :param batch_size: number of buffered messages that triggers a push to redis. Defaults to 5000
        :raises ValueError: if ``batch_size`` is smaller than 1
        """
        if batch_size < 1:
            raise ValueError(f'batch_size must be a positive integer, got {batch_size!r}')

        self.api_key, self.redis_pool, self.redis = config.polygon_key, None, None
        self.stream_client = polygon.AsyncStreamClient(self.api_key, StreamCluster.STOCKS, max_memory_queue=None,
                                                       ping_timeout=None, ping_interval=None)
        self.messages = []
        self.batch_size = batch_size
        # Strong references to in-flight push tasks. The event loop only keeps weak references to tasks, so a
        # task nobody else references may be garbage collected before it finishes.
        self._pending_tasks = set()

    async def initialize_redis_connections(self):
        """Create the redis connection pool and the redis client bound to it."""
        self.redis_pool = aioredis.ConnectionPool.from_url(REDIS_URL)
        self.redis = aioredis.Redis(connection_pool=self.redis_pool, encoding='utf-8', decode_responses=True)

    async def stock_trades_handler(self, msg: str):
        """
        The handler function for the 'trades' service on stocks cluster

        The push is scheduled as a separate task so the handler returns without waiting on redis.

        :param msg: the message received from stream client.
        """
        task = asyncio.create_task(self.push_to_redis(msg))
        self._pending_tasks.add(task)
        # Drop the reference as soon as the task completes so the set does not grow without bound
        task.add_done_callback(self._pending_tasks.discard)

    async def subscribe(self):
        """Subscribe to stock trades, routing every message to :meth:`stock_trades_handler`."""
        await self.stream_client.subscribe_stock_trades(handler_function=self.stock_trades_handler)
        # await self.stream_client.change_handler('status', self.stock_trades_handler)

    async def run(self):
        """
        Handle stream messages until the local time passes ``CLOSE_TIME``, then push whatever is still buffered.
        """
        while self.time_within_bounds():
            await self.stream_client.handle_messages()

        print(f'current time past the close time: {CLOSE_TIME} | Terminating... See Ya Tomorrow')
        await self._drain()

    async def push_to_redis(self, msg: str):
        """
        Coroutine to push messages to redis queue ``stocktrades``

        The message is only buffered until ``batch_size`` messages are available; the full batch is then pushed.

        :param msg: The message as received by the handler
        """
        self.messages.append(msg)

        if len(self.messages) >= self.batch_size:
            # Swap the buffer out BEFORE awaiting, so messages arriving during the push start a fresh batch
            batch, self.messages = self.messages, []
            await self._push_batch(batch)

    async def _push_batch(self, batch: list):
        """Serialize ``batch`` to a JSON byte string and append it to the ``stocktrades`` redis list."""
        await self.redis.rpush('stocktrades', dumps(batch))  # encode the list of messages to a byte string
        print(f'{len(batch)} msgs pushed to redis at {datetime.now()}')

    async def _drain(self):
        """Wait for in-flight push tasks, then push the partially filled buffer so no message is dropped."""
        if self._pending_tasks:
            results = await asyncio.gather(*self._pending_tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    print(f'push to redis failed during shutdown: {result!r}')

        if self.messages:
            remaining, self.messages = self.messages, []
            await self._push_batch(remaining)

    @staticmethod
    def time_within_bounds() -> bool:
        """
        Tell whether the current time of day is not later than ``CLOSE_TIME``.

        NOTE(review): uses the machine's naive local time - confirm it matches the timezone CLOSE_TIME is meant in.

        :return: ``True`` while streaming should continue, ``False`` once the close time has passed
        """
        return datetime.now().time() <= CLOSE_TIME
async def main():
    """
    Set up the streamer, stream until the close time, then release the redis connections.
    """
    streamer = StockStreamer()
    try:
        await streamer.initialize_redis_connections()
        await streamer.subscribe()
        await streamer.run()
    finally:
        # The pool was created explicitly, so it has to be disconnected explicitly too.
        # It is still None when initialization failed before the pool was created.
        if streamer.redis_pool is not None:
            await streamer.redis_pool.disconnect()


if __name__ == '__main__':
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment