Last active
March 22, 2021 03:14
-
-
Save Voyz/09652dd37e499547bf4fcbe85954d1e6 to your computer and use it in GitHub Desktop.
Databay Inlet for IBKR CP Web API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# strftime layout for timestamps: date and time joined by 'T', with a comma
# before the fractional seconds (comma works for mongodb and is valid ISO).
# NOTE: %f expands to six digits (microseconds); datetimeindex_to_UTC slices
# off the last three characters of the formatted string afterwards.
TS_DATE_FORMAT_DB_ISO = '%Y-%m-%dT%H:%M:%S,%f'
def datetimeindex_to_UTC(_dti):
    """Format datetimes as UTC strings with millisecond precision.

    Args:
        _dti: A pandas Series of datetimes, or a DatetimeIndex. Naive values
            are labelled as UTC without shifting them; timezone-aware values
            are converted to UTC. The input object is not modified.

    Returns:
        A Series of strings laid out per ``TS_DATE_FORMAT_DB_ISO`` with the
        fractional part cut to three digits, e.g.
        ``'1970-01-01T00:00:01,500'``. For Series input the original index
        is kept as-is.
    """
    values = _dti.copy(deep=True)

    # Only an Index offers ``to_series``; wrapping it in a Series lets both
    # kinds of input go through the same ``.dt`` accessor below.
    if hasattr(values, 'to_series'):
        values = values.to_series()

    # The timezone must be read from, and applied to, the values themselves:
    # ``'tz' in series`` is a label lookup, and ``Series.tz_localize`` /
    # ``Series.tz_convert`` act on the index rather than on the values that
    # get formatted.
    if values.dt.tz is None:
        values = values.dt.tz_localize('UTC')
    else:
        values = values.dt.tz_convert('UTC')

    # %f yields six digits; dropping the last three leaves milliseconds.
    formatted = values.dt.strftime(TS_DATE_FORMAT_DB_ISO).astype(str)
    return formatted.str.slice(0, -3)
def timestamp_to_UTC(df, key:str = 't'):
    """Replace a millisecond-timestamp column with formatted UTC strings.

    The frame is modified in place: the ``key`` column is parsed with
    ``unit='ms'``, moved into the index, run through
    ``datetimeindex_to_UTC`` and then copied back into a column named
    ``key``, which ends up as the last column.

    Args:
        df: DataFrame holding the ``key`` column.
        key: Name of the timestamp column.

    Returns:
        The same DataFrame object that was passed in.
    """
    parsed = pd.to_datetime(df[key], unit='ms')
    df[key] = parsed

    # Moving the column into the index removes it from the columns; it is
    # re-created from the formatted index further down.
    df.set_index(key, drop=True, inplace=True)

    utc_labels = datetimeindex_to_UTC(df.index.to_series())
    df.index = utc_labels
    df[key] = df.index
    return df
def parse_data(data):
    """Turn raw bar data into a list of record dicts.

    The short column names ``o``, ``h``, ``l``, ``c``, ``v`` and ``t`` are
    renamed to ``open``, ``high``, ``low``, ``close``, ``volume`` and
    ``date``; ``date`` is parsed as a millisecond timestamp and then
    formatted by ``timestamp_to_UTC``.

    Args:
        data: Anything ``pd.DataFrame`` accepts.

    Returns:
        A list with one dict per row, keyed ``date``, ``open``, ``high``,
        ``low``, ``close`` and ``volume``, or an empty list when ``data``
        produces an empty frame.
    """
    frame = pd.DataFrame(data)
    if frame.empty:
        return []

    long_names = {
        'o': 'open',
        'h': 'high',
        'l': 'low',
        'c': 'close',
        'v': 'volume',
        't': 'date',
    }
    frame = frame.rename(columns=long_names)
    frame['date'] = pd.to_datetime(frame['date'], unit='ms')

    # do your own data parsing here if needed

    frame = timestamp_to_UTC(frame, key='date')

    # Fix the column order (and drop anything else) before exporting.
    ordered = frame[['date', 'open', 'high', 'low', 'close', 'volume']]
    return ordered.to_dict(orient='records')
class IbkrAsyncInlet(HttpInlet):
    """HTTP inlet that requests market history bars for one contract id.

    Each pull goes to ``BASE_URL + MARKET_HISTORY_URL`` with ``conid``,
    ``period`` and ``bar`` as request parameters. The ``'data'`` entry of the
    result is converted by ``parse_data`` and, when a filter was supplied,
    passed through ``filter.run``.
    """

    def __init__(self, conid:str, period:str, bar:str, cacert:Union[bool, str, os.PathLike]=False, filter:Filter=None, *args, **kwargs):
        """Store the contract id and filter, then configure the base inlet.

        Args:
            conid: Contract id, sent as the ``conid`` request parameter.
            period: Sent as the ``period`` request parameter.
            bar: Sent as the ``bar`` request parameter.
            cacert: Forwarded unchanged to ``HttpInlet``.
                NOTE(review): presumably a certificate path or verification
                switch - confirm against ``HttpInlet``.
            filter: Optional object whose ``run(data)`` is applied to the
                parsed records on every pull.
            *args: Extra positional arguments for ``HttpInlet.__init__``.
            **kwargs: Extra keyword arguments for ``HttpInlet.__init__``.
        """
        self.conid = conid
        self.filter = filter

        query = dict(conid=conid, period=period, bar=bar)
        endpoint = BASE_URL + MARKET_HISTORY_URL
        super().__init__(*args,
                         url=endpoint,
                         json=True,
                         cacert=cacert,
                         params=query,
                         **kwargs)

    async def pull(self, update:Update):
        """Fetch, parse and optionally filter one batch of bars.

        Args:
            update: Passed straight through to ``HttpInlet.pull``.

        Returns:
            The parsed records (after ``filter.run`` when a filter is set),
            or an empty list when the result has no ``'data'`` key or the
            gateway cannot be reached.

        Raises:
            requests.exceptions.ConnectionError: When the message matches
                neither known "cannot connect" pattern.
            aiohttp.client_exceptions.ClientConnectorError: When the message
                does not contain ``'Connect call failed'``.
        """
        try:
            result = await super().pull(update)
            if 'data' not in result:
                _LOGGER.error(f'Result does\'t contain data: {result}')
                return []

            records = parse_data(result['data'])
            if self.filter is None:
                return records
            return self.filter.run(records)
        except requests.exceptions.ConnectionError as e:
            message = str(e)
            refused = 'the target machine actively refused it' in message
            aborted = 'Connection aborted.' in message
            if not (refused or aborted):
                raise e
            # An unreachable gateway is logged and reported as "no data".
            _LOGGER.error(f'Cannot connect to Gateway at: {BASE_URL}')
            return []
        except aiohttp.client_exceptions.ClientConnectorError as e:
            if 'Connect call failed' not in str(e):
                raise e
            _LOGGER.error(f'Cannot connect to Gateway at: {BASE_URL}')
            return []
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment