Created
March 26, 2019 22:14
-
-
Save AHartTN/145581998e0ecb3a4837cf7231927b84 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime
from os import listdir
from os.path import isfile, join

import intrinio
from dateutil.relativedelta import relativedelta

from mssql import MSSQL
from utility import Utility
class StockData(object):
    """Fetch Intrinio stock data and cache it on disk and in MSSQL.

    The ``retrieve_*`` methods look for data in three places, in order:
    the MSSQL database, a JSON file under ``STOCK_DATA_PATH``, and finally
    the Intrinio API. Whatever tier supplied the data is then copied into
    the tiers that were missing it.
    """

    # Database connection settings. The connection object is created once,
    # when the class body is executed, and shared by every instance.
    MSSSQL_SERVER = "ANTHONY-DESKTOP"
    MSSSQL_DATABASE = "Intrinio"
    SQL = MSSQL(MSSSQL_SERVER, MSSSQL_DATABASE)

    # On-disk cache layout.
    STOCK_DATA_PATH = "C:/Users/Public/Documents/"
    COMPANY_INFO_DIRECTORY = "Company Info/"
    SECURITY_DIRECTORY = "Security/"
    COMPANY_FILE_NAME = "COMPANIES.json"
    SECURITY_FILE_NAME = "SECURITIES.json"
    COMPANY_FILE_PATH = join(STOCK_DATA_PATH, COMPANY_FILE_NAME)
    SECURITY_FILE_PATH = join(STOCK_DATA_PATH, SECURITY_FILE_NAME)
    COMPANY_INFO_PATH = join(STOCK_DATA_PATH, COMPANY_INFO_DIRECTORY)
    SECURITY_PATH = join(STOCK_DATA_PATH, SECURITY_DIRECTORY)

    # Date window. These are evaluated once, when the class is defined, so
    # they do not advance while a long-running process stays alive.
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12; switching to an aware datetime would
    # change the type seen by consumers, so it is left as-is here.
    YEARS_OF_DATA = 20
    CURRENT_DATE = datetime.utcnow()
    START_DATE = CURRENT_DATE - relativedelta(years=YEARS_OF_DATA)
    NUMBER_OF_DAYS = (CURRENT_DATE - START_DATE).days

    # Table names passed to the MSSQL helper.
    COMPANY_TABLE_NAME = "Company"
    COMPANY_INFO_TABLE_NAME = "CompanyInfo"
    COMPANY_INFO_SECURITY_TABLE_NAME = "CompanyInfoSecurity"
    SECURITY_TABLE_NAME = "[Security]"

    # Keys extracted (in this order) from the API/file records before they
    # are written to the corresponding table.
    COMPANY_COLUMNS = ("name", "cik", "lei", "ticker", "latest_filing_date")
    COMPANY_INFO_COLUMNS = (
        "ticker", "stock_exchange", "name", "legal_name", "cik", "sic", "lei",
        "entity_status", "entity_legal_form", "sector", "industry_category",
        "industry_group", "latest_filing_date", "standardized_active",
        "template", "ceo", "employees", "short_description",
        "long_description", "inc_country", "inc_state", "hq_address1",
        "hq_address2", "hq_address_city", "hq_state",
        "hq_address_postal_code", "hq_country", "mailing_address",
        "business_address", "business_phone_no", "company_url",
    )
    COMPANY_INFO_SECURITY_COLUMNS = (
        "figi_unique_id", "last_crsp_adj_date", "composite_figi_ticker",
        "security_type", "delisted_security", "stock_exchange",
        "security_name", "share_class_figi", "primary_listing", "figi",
        "figi_ticker", "figi_exch_cntry", "currency", "composite_figi",
        "market_sector", "mic", "etf", "exch_symbol", "ticker",
    )
    SECURITY_COLUMNS = (
        "ticker", "figi_ticker", "figi", "security_name", "market_sector",
        "security_type", "stock_exchange", "last_crsp_adj_date",
    )
    COMPANY_COLUMNS_COUNT = len(COMPANY_COLUMNS)
    COMPANY_INFO_COLUMNS_COUNT = len(COMPANY_INFO_COLUMNS)
    COMPANY_INFO_SECURITY_COLUMNS_COUNT = len(COMPANY_INFO_SECURITY_COLUMNS)

    def __init__(self, username: str, password: str):
        """Store the Intrinio credentials on the module-level client.

        The credentials are set on ``intrinio.client`` itself, so they are
        shared by every ``StockData`` instance in the process.
        """
        intrinio.client.username = username
        intrinio.client.password = password
        print("Stock Data Helper Initialized!")

    # ------------------------------------------------------------------
    # Intrinio API access
    # ------------------------------------------------------------------
    def get_company(self, ticker: str = None):
        """Query ``intrinio.companies`` and return one dict per result row.

        Each dict maps a column name of the result to that row's value.
        ``ticker`` is forwarded unchanged; ``retrieve_companies`` calls this
        without a ticker to obtain the full company list.
        """
        company = intrinio.companies(ticker)
        return [dict(zip(company.columns, row)) for row in company.values]

    def get_security(self, symbol: str = None):
        """Query ``intrinio.securities`` and return one dict per result row.

        Each dict maps a column name of the result to that row's value.
        ``symbol`` is forwarded unchanged; ``retrieve_securities`` calls this
        without a symbol to obtain the full security list.
        """
        security = intrinio.securities(symbol)
        return [dict(zip(security.columns, row)) for row in security.values]

    # ------------------------------------------------------------------
    # MSSQL writes
    # ------------------------------------------------------------------
    def write_securities_to_mssql(self, securities):
        """Insert ``securities`` into the security table and return its rows.

        NOTE(review): the target column names come from
        ``self.SQL.SECURITY_COLUMNS`` while the values are extracted with
        this class's ``SECURITY_COLUMNS`` -- confirm the MSSQL helper
        really exposes that attribute.
        """
        rows = Utility.get_values_list(self.SECURITY_COLUMNS, securities)
        self.SQL.write_list(
            self.SECURITY_TABLE_NAME, self.SQL.SECURITY_COLUMNS, rows)
        return self.read_securities_from_sql()

    def write_companies_to_mssql(self, companies):
        """Insert ``companies`` into the company table and return its rows.

        NOTE(review): the target column names come from
        ``self.SQL.COMPANY_COLUMNS`` -- confirm the MSSQL helper exposes it.
        """
        rows = Utility.get_values_list(self.COMPANY_COLUMNS, companies)
        self.SQL.write_list(
            self.COMPANY_TABLE_NAME, self.SQL.COMPANY_COLUMNS, rows)
        return self.read_companies_from_sql()

    def write_company_info_to_mssql(self, company_id: int, info):
        """Insert one company-info record together with its securities.

        ``info`` must support ``.get``; its ``"securities"`` entry (if any)
        is written to the company-info-security table.

        Returns a dict with the keys ``company_id``, ``company_info_id``
        (the value returned by the SQL write) and ``security_ids`` (the
        result of ``write_company_info_securities_to_mssql``).
        """
        # The company id is prepended to the extracted column values.
        values = (company_id,) + Utility.get_values(
            self.COMPANY_INFO_COLUMNS, info)
        company_info_id = self.SQL.write(
            self.COMPANY_INFO_TABLE_NAME,
            self.SQL.COMPANY_INFO_COLUMNS,
            values)
        security_ids = self.write_company_info_securities_to_mssql(
            company_id, company_info_id, info.get("securities"))
        return {
            "company_id": company_id,
            "company_info_id": company_info_id,
            "security_ids": security_ids,
        }

    def write_company_info_securities_to_mssql(
            self, company_id: int, company_info_id: int, securities):
        """Insert the securities attached to a company-info record.

        Returns the rows read back for this ``company_id`` /
        ``company_info_id`` pair, or an empty list when ``securities`` is
        empty or ``None`` (nothing is written in that case).
        """
        if not securities:
            return []
        # Both foreign keys are prepended to every security's values.
        key_prefix = (company_id, company_info_id)
        values = tuple(
            key_prefix + Utility.get_values(
                self.COMPANY_INFO_SECURITY_COLUMNS, security)
            for security in securities)
        self.SQL.write_list(
            self.COMPANY_INFO_SECURITY_TABLE_NAME,
            self.SQL.COMPANY_INFO_SECURITY_COLUMNS,
            values)
        return self.SQL.read(
            self.COMPANY_INFO_SECURITY_TABLE_NAME,
            CompanyID=company_id,
            CompanyInfoID=company_info_id)

    # ------------------------------------------------------------------
    # MSSQL reads
    # ------------------------------------------------------------------
    def read_companies_from_sql(self):
        """Return whatever the SQL helper reads from the company table."""
        return self.SQL.read(self.COMPANY_TABLE_NAME)

    def read_securities_from_sql(self):
        """Return whatever the SQL helper reads from the security table."""
        return self.SQL.read(self.SECURITY_TABLE_NAME)

    def get_company_id(self, ticker):
        """Return the ``"ID"`` value of the first company row for ``ticker``.

        Raises:
            IndexError: if the company table has no row for ``ticker``.
        """
        results = self.SQL.read(self.COMPANY_TABLE_NAME, Ticker=ticker)
        if not results:
            # Same exception type the bare results[0] used to produce, but
            # with a message that says what was actually missing.
            raise IndexError(
                "No company found in the database for ticker %r" % (ticker,))
        return results[0].get("ID")

    def read_company_info_from_sql(self, ticker):
        """Return the company-info rows stored for ``ticker``."""
        return self.SQL.read(self.COMPANY_INFO_TABLE_NAME, Ticker=ticker)

    # ------------------------------------------------------------------
    # Tiered retrieval (SQL -> file -> API)
    # ------------------------------------------------------------------
    def _retrieve_cached(self, read_sql, file_path, fetch, write_sql,
                         error_message, force_update):
        """Shared SQL -> file -> API lookup used by the list retrievers.

        ``read_sql``, ``fetch`` and ``write_sql`` are callables for the SQL
        read, the API call and the SQL write respectively; ``file_path`` is
        the JSON cache file. Raises ``ValueError(error_message, items)``
        when no tier produced any data.

        NOTE(review): ``force_update`` only forces the file and SQL writes;
        it does not bypass the SQL/file caches, so the data written back is
        whatever the first non-empty tier returned. Confirm this is the
        intended meaning of "force".
        """
        update_sql = force_update
        update_file = force_update

        items = read_sql()
        if not items:
            update_sql = True
            items = Utility.read_from_file(file_path)
        if not items:
            update_file = True
            items = fetch()
        if not items:
            raise ValueError(error_message, items)

        if update_file:
            Utility.write_to_file(items, file_path)
        if update_sql:
            write_sql(items)
        if update_file or update_sql:
            # Return the database's view of the data after any write.
            items = read_sql()
        return items

    def retrieve_securities(self, force_update=False):
        """Return the security list from SQL, the cache file or the API.

        Raises:
            ValueError: if none of the three sources returned any data.
        """
        return self._retrieve_cached(
            self.read_securities_from_sql,
            self.SECURITY_FILE_PATH,
            self.get_security,
            self.write_securities_to_mssql,
            "We were unable to retrieve the list of securities",
            force_update)

    def retrieve_companies(self, force_update=False):
        """Return the company list from SQL, the cache file or the API.

        Raises:
            ValueError: if none of the three sources returned any data.
        """
        return self._retrieve_cached(
            self.read_companies_from_sql,
            self.COMPANY_FILE_PATH,
            self.get_company,
            self.write_companies_to_mssql,
            "We were unable to retrieve the list of companies",
            force_update)

    def retrieve_company_info(self, company_id, ticker,
                              force_update=False) -> dict:
        """Return the company-info record for ``ticker``.

        Looks in SQL first, then in the per-company JSON file
        (``<company_id>_<ticker>.json`` under ``COMPANY_INFO_PATH``), then
        calls the API. Missing tiers are filled in from the tier that had
        the data.

        Raises:
            ValueError: if the API returned no record for ``ticker``.
        """
        update_sql = force_update
        update_file = force_update
        file_path = join(
            self.COMPANY_INFO_PATH, "%s_%s.json" % (company_id, ticker))

        company_info = ()
        rows = self.read_company_info_from_sql(ticker)
        if rows:
            company_info = rows[0]
        else:
            print("%s is not in the database" % ticker)

        if not company_info:
            update_sql = True
            raw_company_info = Utility.read_from_file(file_path)
            if raw_company_info:
                company_info = dict(raw_company_info)

        if not company_info:
            update_file = True
            fetched = list(self.get_company(ticker))
            if not fetched:
                # Same failure style as retrieve_companies/securities
                # instead of an anonymous IndexError from fetched[0].
                raise ValueError(
                    "We were unable to retrieve the company info", ticker)
            company_info = fetched[0]

        if update_file:
            Utility.write_to_file(company_info, file_path)
        if update_sql:
            self.write_company_info_to_mssql(company_id, company_info)
        if update_file or update_sql:
            # Re-read so the caller gets the stored row. Take the first
            # row, as on the cached path above, so the return value is a
            # single record in both cases rather than a list here.
            rows = self.read_company_info_from_sql(ticker)
            if rows:
                company_info = rows[0]
        return company_info

    # ------------------------------------------------------------------
    # File system
    # ------------------------------------------------------------------
    def get_stock_files(self):
        """Return the names of the regular files directly in STOCK_DATA_PATH.

        Sub-directories are skipped; the listing is not recursive.
        """
        # Class attributes are not visible as bare names inside a method,
        # so the path has to be reached through self.
        root = self.STOCK_DATA_PATH
        return [name for name in listdir(root) if isfile(join(root, name))]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment