|
import argparse |
|
import asyncio |
|
import gzip |
|
import os |
|
import shutil |
|
import tempfile |
|
import time |
|
from datetime import datetime |
|
from io import BytesIO |
|
|
|
import aiohttp |
|
import numpy as np |
|
import pandas as pd |
|
|
|
|
|
class App:
    """Download Bybit public trade history and archive it as OHLCV candles.

    Fetches daily ``.csv.gz`` trade dumps from
    https://public.bybit.com/trading/<symbol>/, resamples each day's trades
    into candles at a user-chosen interval, writes one candle CSV per day
    into a temporary directory, and finally zips them all into
    ``<output_dir>/<symbol>.zip``.
    """

    # Root of Bybit's public trade-history file server.
    base_url = "https://public.bybit.com/trading"

    def __init__(self) -> None:
        # Build and run the CLI parser.
        self._setup_parser()
        self.args = self.parser.parse_args()

        # Unpack parsed arguments onto the instance for convenience.
        self.symbol = self.args.symbol          # trading symbol, e.g. "BTCUSD"
        self.start = self.args.start            # start date string, YYYYMMDD
        self.end = self.args.end                # end date string, YYYYMMDD
        self.interval = self.args.interval      # candle interval in seconds
        self.output_dir = self.args.output_dir
        self.log_dir = self.args.log_dir

        self.start_dt = datetime.strptime(self.start, "%Y%m%d")
        self.end_dt = datetime.strptime(self.end, "%Y%m%d")
        # Inclusive daily range from start to end.
        self.daterange = pd.date_range(self.start_dt, self.end_dt, freq="D")

        # Download in chunks of 30 days so we never have an unbounded
        # number of concurrent HTTP requests in flight at once.
        self.chunked_daterange = [
            self.daterange[i : i + 30] for i in range(0, len(self.daterange), 30)
        ]

        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(self.log_dir, exist_ok=True)

    def _setup_parser(self) -> None:
        """Create ``self.parser`` and register all CLI arguments."""

        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("symbol", type=str, help="trading symbol")
        self.parser.add_argument("start", type=str, help="start date in YYYYMMDD")
        self.parser.add_argument("end", type=str, help="end date in YYYYMMDD")
        self.parser.add_argument("interval", type=int, help="candle interval in seconds")
        self.parser.add_argument("-o", "--output_dir", type=str, default="./", help="output directory (default: ./)")
        self.parser.add_argument("-l", "--log_dir", type=str, default="./", help="log directory (default: ./)",)

    def _convert_to_candle(self, df: pd.DataFrame, interval: str) -> pd.DataFrame:
        """
        Convert trade data to OHLCV candle data.

        Required columns of df: ['datetime', 'side', 'size', 'price']

        df:
        - datetime(pd.datetime64[ns]): timestamp of the trade
        - side(str): 'Buy' or 'Sell'
        - size(float): size of the trade
        - price(float): price of the trade

        interval: a pandas offset alias such as "60s".

        Returns a DataFrame indexed by the floored candle timestamp with
        columns: open, high, low, close, volume, buyVolume, sellVolume.
        """

        # Work on an explicit copy: the column subset below may return a
        # view, and the writes that follow would otherwise trigger
        # SettingWithCopyWarning (and not reach the caller's frame anyway).
        df = df[["datetime", "side", "size", "price"]].copy()

        # Plain column assignment (df["col"] = ...) is required here:
        # df.loc[:, ["col"]] raises KeyError for not-yet-existing columns
        # on recent pandas versions.
        df["buySize"] = np.where(df["side"] == "Buy", df["size"], 0)
        df["sellSize"] = np.where(df["side"] == "Sell", df["size"], 0)
        # Floor each trade timestamp to its candle bucket.
        df["datetime"] = df["datetime"].dt.floor(interval)

        df = df.groupby("datetime").agg(
            {
                "price": ["first", "max", "min", "last"],  # open/high/low/close
                "size": "sum",
                "buySize": "sum",
                "sellSize": "sum",
            }
        )

        # Flatten the (column, agg) MultiIndex into single-level names.
        df.columns = ["_".join(col) for col in df.columns]
        df = df.rename(
            columns={
                "price_first": "open",
                "price_max": "high",
                "price_min": "low",
                "price_last": "close",
                "size_sum": "volume",
                "buySize_sum": "buyVolume",
                "sellSize_sum": "sellVolume",
            }
        )

        return df

    async def _get_as_bytes(self, session: "aiohttp.ClientSession", url: str, date: datetime) -> tuple[datetime, bytes]:
        """GET ``url`` and return ``(date, raw response body)``.

        Raises aiohttp.ClientResponseError on HTTP error statuses (e.g. a
        404 for a missing day) so the failure is reported instead of the
        error page being parsed as gzip data downstream.
        """

        async with session.get(url) as response:
            response.raise_for_status()
            data = await response.read()

        return (date, data)

    async def _download(self, daterange: pd.DatetimeIndex) -> list[tuple[datetime, bytes]]:
        """Download daily trade dumps for every date in ``daterange``.

        Downloads trading historical data from bybit
        (https://public.bybit.com/trading/). Issues all GET requests for the
        chunk concurrently and returns a list of (date, gzipped-csv-bytes)
        tuples in the same order as ``daterange``.
        """

        async with aiohttp.ClientSession() as session:
            tasks = []
            for date in daterange:
                filename = f'{self.symbol}{date.strftime("%Y-%m-%d")}.csv.gz'
                # Build the URL with explicit "/" separators: os.path.join
                # would produce backslashes on Windows and is not a URL tool.
                url = f"{self.base_url}/{self.symbol}/{filename}"
                tasks.append(self._get_as_bytes(session, url, date))

            results = await asyncio.gather(*tasks)

        return results

    async def run(self):
        """Download, convert, and archive candles for the whole date range."""

        start_time = time.time()

        with tempfile.TemporaryDirectory() as temp_dir:
            for daterange in self.chunked_daterange:
                results = await self._download(daterange)

                for dt, data in results:
                    # Each dump is a gzipped CSV of individual trades.
                    with gzip.open(BytesIO(data), "rt") as f:
                        df = pd.read_csv(f)
                    # Plain assignment: .loc with a new column label raises
                    # KeyError on recent pandas versions.
                    df["datetime"] = pd.to_datetime(df["timestamp"], unit="s")
                    df = self._convert_to_candle(df, f"{self.interval}s")

                    # One candle CSV per day, named by date.
                    df.to_csv(os.path.join(
                        temp_dir, f"{dt.strftime('%Y-%m-%d')}.csv"
                    ))

            # Zip all per-day CSVs into <output_dir>/<symbol>.zip.
            archive_path = os.path.join(self.output_dir, self.symbol)
            shutil.make_archive(archive_path, "zip", temp_dir)

        print(f"Time: {time.time() - start_time}")
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the app from CLI args and run it to completion.
    asyncio.run(App().run())