Skip to content

Instantly share code, notes, and snippets.

@qharlie
Forked from bdowling/NATs-Replay.py
Created August 20, 2019 02:25
Show Gist options
  • Save qharlie/ef43db549629ded20103d8cf853f14d3 to your computer and use it in GitHub Desktop.
Save qharlie/ef43db549629ded20103d8cf853f14d3 to your computer and use it in GitHub Desktop.
These are sample scripts that dump the NATS stream to a file and replay it to a local NATS server.
#!/usr/bin/env python
from nats.aio.client import Client as NATS
import json
import asyncio
import logging
import argparse
import gzip
import glob
import time
import os
import re
def _format_ms(ms):
    """Format epoch milliseconds as local 'MM/DD HH:MM:SS'; return '-' if missing."""
    if ms is None:
        return '-'
    return time.strftime("%m/%d %H:%M:%S", time.localtime(ms / 1000))


async def main(loop, nc, datafiles):
    """Replay recorded events from gzipped JSON-lines files onto NATS.

    Every line of every file is parsed as a JSON object carrying ``ts``
    (epoch milliseconds), ``channel`` and ``data``.  The event payload is
    published on ``event['channel']`` using the short field names
    (``o``, ``h``, ``l``, ...); payloads that only carry the long names
    (``open``, ``high``, ...) are translated back to the short ones first.

    Filtering and pacing are driven by the module-level ``opt`` namespace
    (``nats``, ``all``, ``match``, ``speedup``, ``maxsleep``).

    Args:
        loop: Accepted for backward compatibility only.  It is no longer
            forwarded: ``asyncio.sleep`` dropped its ``loop`` argument in
            Python 3.10, and the coroutine already runs on the right loop.
        nc: NATS client; ``connect``, ``publish``, ``flush`` and ``close``
            are awaited on it.
        datafiles: Iterable of paths to gzip-compressed JSON-lines files.

    A failure while reading one file is reported and replay continues with
    the next file.  The client is always closed on the way out.
    """
    await nc.connect(servers=opt.nats)
    long_names = {
        "sym": "symbol",
        "a": "average",
        "c": "close",
        "h": "high",
        "k": "transactions",
        "l": "low",
        "o": "open",
        "t": "totalvalue",
        "x": "exchange",
        "v": "volume",
        "s": "start",
        "e": "end",
        "vw": "vwap",
        "av": "totalvolume",
        "op": "dailyopen",  # deprecated? stream often has 0 for op
    }
    needed_keys = {'o', 'h', 'l', 'c', 'v'}
    short_names = {v: k for k, v in long_names.items()}
    prevts = None
    try:
        for file in datafiles:
            print(f"Streaming contents of {file}...")
            try:
                with gzip.GzipFile(file, 'rb') as feed:
                    for line in feed:
                        event = json.loads(line)
                        if (not opt.all) and 'A.' in event['channel']:
                            continue
                        if (opt.match) and not opt.match.search(event['channel']):
                            continue
                        ts = int(event['ts'])
                        payload = event['data']
                        if needed_keys.intersection(payload):
                            # Already keyed by the short names.
                            data = payload
                        else:
                            data = {
                                short_names[k]: v
                                for k, v in payload.items() if k in short_names
                            }
                        # .get() keeps non-bar events (no o/h/l/c/v) from
                        # aborting the rest of the file with a KeyError.
                        print(
                            f'{_format_ms(ts)} {event["channel"]}'
                            f' {data.get("o")} {data.get("h")} {data.get("l")}'
                            f' {data.get("c")} {data.get("v")}'
                            f' {_format_ms(data.get("s"))} - '
                            f' {_format_ms(data.get("e"))}'
                        )
                        if opt.speedup > 0 and prevts and prevts < ts:
                            # NOTE(review): ts is in milliseconds, so /100
                            # yields tenths of a second rather than seconds;
                            # scaling kept as-is - confirm it is intended.
                            sleep = min(((ts - prevts) / 100) * opt.speedup,
                                        opt.maxsleep)
                            print(f' - sleep {sleep:0.2f}')
                            await asyncio.sleep(sleep)
                        await nc.publish(event['channel'],
                                         json.dumps(data).encode())
                        await nc.flush()
                        prevts = ts
            except Exception as e:
                # Best effort: report the failure and move on to the next file.
                print(f'Error replaying {file}: {e!r}')
    finally:
        await nc.close()
def _find_datafiles(datadir):
    """Return the *.json.gz files found in one directory or a list of directories.

    ``--datadir`` is declared with ``nargs='*'``, so argparse yields a plain
    string for the default but a list once the option is given on the command
    line; both shapes are accepted.  Files are sorted within each directory so
    the replay order is deterministic.
    """
    directories = [datadir] if isinstance(datadir, str) else list(datadir)
    found = []
    for directory in directories:
        found.extend(sorted(glob.glob(os.path.join(directory, '*.json.gz'))))
    return found


if __name__ == '__main__':
    logging.basicConfig()
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--datadir",
        help="Specify a directory containing *.json.gz files to replay",
        type=str,
        nargs='*',
        default='data/')
    parser.add_argument(
        "--datafiles",
        help="Specify the file(s) to replay",
        type=str,
        nargs='*')
    parser.add_argument(
        "--speedup",
        help="Change the time multiplier to speedup (or slow down >1) the event flow",
        default=0.002,
        type=float)
    parser.add_argument(
        "--maxsleep",
        help="Max time to sleep between events",
        default=2.0,
        type=float)
    # Shares dest='speedup' with --speedup: giving the bare flag stores 0,
    # which main() treats as "do not sleep between events".
    parser.add_argument(
        "--nosleep", "--fast",
        help="Stream the data as fast as possible (no speedup multiplier used)",
        const=0,
        dest='speedup',
        nargs='?',
        type=float)
    parser.add_argument(
        "--match",
        help="Include events wtih channel matching supplied regular expression",
        type=re.compile)
    parser.add_argument(
        "--nats",
        help="Specify the destination NATS server(s)",
        type=str,
        nargs='*',
        default=["nats://192.168.0.100:4222"])
    parser.add_argument(
        "--all",
        help="Include A.* events as well as AM.*",
        action='store_true')
    opt = parser.parse_args()
    if not opt.datafiles:
        opt.datafiles = _find_datafiles(opt.datadir)
    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated.  It is installed as the current loop before
    # the client is constructed so both end up on the same loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    nc = NATS()
    try:
        loop.run_until_complete(main(loop, nc, opt.datafiles))
    finally:
        loop.close()
#!/usr/bin/env python
from alpaca_trade_api.stream2 import StreamConn
import time
import json
# Stream connection shared by this script: the handler is registered on it
# with @conn.on and it is started with conn.run() in the __main__ guard.
# NOTE(review): constructed with no arguments - presumably credentials and
# endpoint are picked up from the environment; confirm.
conn = StreamConn()
# Short wire keys -> descriptive field names.  Named FIELD_NAMES rather than
# `map` so the builtin map() is not shadowed at module level.
FIELD_NAMES = {
    "sym": "symbol",
    "a": "average",
    "c": "close",
    "h": "high",
    "k": "transactions",
    "l": "low",
    "o": "open",
    "t": "totalvalue",
    "x": "exchange",
    "v": "volume",
    "s": "start",
    "e": "end",
    "vw": "vwap",
    "av": "totalvolume",
    "op": "dailyopen",
}
# Inverse lookup: descriptive field name -> short wire key.
reverse_pam = {long_name: short for short, long_name in FIELD_NAMES.items()}
# Undo the key renaming applied by polygon/stream. That module could be
# bypassed entirely, but it takes care of the asyncio plumbing and keeps this
# script short.
def reverse_map(data):
    """Return a copy of *data* keyed by short names; unknown keys are dropped."""
    shortened = {}
    for long_name, value in data.items():
        if long_name in reverse_pam:
            shortened[reverse_pam[long_name]] = value
    return shortened
@conn.on(r'.*')
async def on_data(conn, channel, data):
    """Print one JSON line per received message.

    The line holds the receive time in epoch milliseconds (``ts``), the
    channel, and the message's raw payload converted back to short keys.
    """
    received_ms = round(time.time_ns() / 1000000)
    record = {
        'ts': received_ms,
        'channel': channel,
        'data': reverse_map(data._raw),
    }
    print(json.dumps(record))
if __name__ == '__main__':
    # Start the stream connection on the 'AM.*' and 'C.*' channel patterns.
    channels = ['AM.*', 'C.*']
    conn.run(channels)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment