Instructions
-
-
Save vividvilla/5b548e42176edccb4eb1f274bfc59146 to your computer and use it in GitHub Desktop.
-- Database that stores the captured ticks.
CREATE DATABASE ticks;

-- One row per stored tick: instrument token, timestamp and price.
-- Create this table while connected to the "ticks" database.
CREATE TABLE ticks (
    token INTEGER NOT NULL,
    date  TIMESTAMP WITHOUT TIME ZONE,
    price DOUBLE PRECISION
);
# Run celery workers | |
# celery -A db worker --loglevel=info | |
import sys | |
import json | |
import psycopg2 | |
import logging | |
from celery import Celery | |
from datetime import datetime | |
# Root logger: DEBUG and above, written to stdout.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
    stream=sys.stdout,
)

# Celery application. Replace the broker URL with your own.
app = Celery("tasks", broker="redis://localhost:6379/4")

# Module-level database connection, opened once when this module is imported
# and shared by every task invocation in the process.
db = psycopg2.connect(
    host="127.0.0.1",
    port="5432",
    database="ticks",
    user="username",
    password="password",
)

# Parameterized INSERT; the named placeholders are filled in by the driver.
insert_tick_statement = (
    "INSERT INTO ticks (date, token, price) "
    "VALUES (%(date)s, %(token)s, %(price)s)"
)
# Celery task that writes a batch of ticks to the PostgreSQL "ticks" table.
@app.task
def insert_ticks(ticks):
    """Insert a batch of ticks into the ``ticks`` table in one transaction.

    Args:
        ticks: Sequence of dicts, each providing ``instrument_token`` and
            ``last_price``. The batch is logged with ``json.dumps``, so it
            must be JSON-serialisable.

    The ``date`` column is set to the time the worker builds the row, not
    to a timestamp carried by the tick.

    Any failure while building rows, executing the inserts or committing is
    logged and the transaction is rolled back; the exception is not
    re-raised.
    """
    if not ticks:
        # Nothing to write; avoid opening a cursor and committing for nothing.
        return

    logging.info("Inserting ticks to db : {}".format(json.dumps(ticks)))

    try:
        rows = [
            {
                "date": datetime.now(),
                "token": tick["instrument_token"],
                "price": tick["last_price"],
            }
            for tick in ticks
        ]
        # The cursor is closed when the ``with`` block exits, even on error.
        with db.cursor() as cursor:
            cursor.executemany(insert_tick_statement, rows)
        db.commit()
    except Exception:
        # ``db`` is shared by all task runs, so a failed statement must be
        # rolled back here; otherwise the connection stays in an aborted
        # transaction and later inserts fail too.
        db.rollback()
        logging.exception("Couldn't write ticks to db: ")
import sys | |
import json | |
import logging | |
# Root logger: DEBUG and above, written to stdout.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
    stream=sys.stdout,
)
import time | |
from db import insert_ticks | |
from kiteconnect import WebSocket | |
# Initialise the WebSocket client. Replace the placeholders with your own
# API key, public token and Zerodha user id.
kws = WebSocket("api_key", "public_token", "zerodha_user_id")

# Instrument tokens to subscribe to, in this order:
# RELIANCE BSE, RELIANCE NSE, NIFTY 50, SENSEX
# NOTE(review): the RELIANCE NSE entry was 73856, which looks like a dropped
# digit; 738561 is the token used for RELIANCE in the Kite Connect examples.
# Confirm against the current instruments dump.
tokens = [128083204, 738561, 256265, 265]
# Tick callback: log the batch, then queue it for the Celery worker.
def on_tick(ticks, ws):
    """Log the received ticks and hand them to ``insert_ticks`` asynchronously.

    Args:
        ticks: Batch of ticks delivered by the WebSocket client; it is
            serialised with ``json.dumps`` for the log line.
        ws: Second argument supplied by the client; unused here.
    """
    serialised = json.dumps(ticks)
    logging.info("on tick - {}".format(serialised))
    # ``delay`` enqueues the task instead of running the insert inline.
    insert_ticks.delay(ticks)
# Connection callback: only records that the connection succeeded.
def on_connect(ws):
    """Log that the WebSocket connection has been established.

    Args:
        ws: Argument supplied by the client; unused here.
    """
    message = "Successfully connected to WebSocket"
    logging.info(message)
def on_close(*args):
    """Log that the WebSocket connection was closed.

    Args:
        *args: Whatever the client passes to the callback; ignored.

    NOTE(review): ``on_tick`` and ``on_connect`` in this file both receive a
    ``ws`` argument, so the client presumably passes arguments here as well.
    Accepting ``*args`` avoids a ``TypeError`` in that case and still allows
    a zero-argument call.
    """
    logging.info("WebSocket connection closed")
def on_error(*args):
    """Log a WebSocket error, including any details the client supplies.

    Args:
        *args: Whatever the client passes to the callback (presumably the
            error and/or the client instance -- confirm against the
            kiteconnect version in use). Logged as-is when present.

    Accepting ``*args`` avoids a ``TypeError`` if the client calls this with
    arguments, and still allows a zero-argument call. The message is logged
    at ERROR level so it is not lost among routine INFO output.
    """
    if args:
        logging.error("WebSocket connection thrown error - %r", args)
    else:
        logging.error("WebSocket connection thrown error")
# Register the handlers on the client before connecting.
kws.on_connect = on_connect
kws.on_tick = on_tick
kws.on_error = on_error
kws.on_close = on_close

# Connect with threaded=True so that the subscription loop further down
# can run.
kws.connect(threaded=True)
# kws.connect(disable_ssl_verification=True) # for ubuntu
# Index into ``tokens`` of the next instrument to subscribe to.
count = 0

# Runs forever: every 5 seconds subscribe to one more token (in LTP mode)
# while the socket is connected. Once every token is subscribed the loop
# only logs and sleeps.
while True:
    logging.info("This is main thread. Will subscribe to each token in tokens list with 5s delay")

    has_pending = count < len(tokens)
    if has_pending and not kws.is_connected():
        # Not connected yet; try again on the next pass.
        logging.info("Connecting to WebSocket...")
    elif has_pending:
        logging.info("Subscribing to: {}".format(tokens[count]))
        kws.subscribe([tokens[count]])
        kws.set_mode(kws.MODE_LTP, [tokens[count]])
        count += 1

    time.sleep(5)
Hi Vividvilla, thanks for posting this. Looking forward to more content 👍
In insert_ticks.delay(), what is the delay() function here? Doesn't insert_ticks(ticks) work?
In insert_ticks.delay(), what is the delay() function here? Doesn't insert_ticks(ticks) work?
You have to run this file as a Celery job. task.delay
makes the insertion async.
Hi vividvilla. Firstly, thanks for such eloquent code. How can we write the tick data to a CSV file instead of an SQL database? Can you please help?
You just need to change the Celery job so that it writes the data to a CSV file on disk.
Hi vividvilla, how do we do this in SQLite3 as part of Python? When I try with SQLite3, it only shows the data being dumped into redis rdb and no data is being inserted into the sqlite3 table.
Hi Vividvilla,
I tried applying it but when I run it I get
2023-08-11 08:07:06.094807 WS Started
2023-08-11 08:07:06,658 - celery.utils.functional - DEBUG -
def insert_ticks(ticks):
return 1
Can you please suggest why it is not working? I am trying to save tick data as redis dictionary
I am not sure whether this script will work with the latest Celery; you might need to check that part. Also, to debug, add logging inside insert_ticks
to see whether you are receiving ticks before writing them to Redis.
Hi vividvilla. Firstly, thanks for such eloquent code. How can we write the tick data to a CSV file instead of an SQL database? Can you please help?
Thanks