Created April 1, 2017 00:31
# dshelper (dsh) and instrumentdb (idb) are project-specific helper modules
# (data loading, ForecastLocations, CSI database access).
import dshelper as dsh
import instrumentdb as idb
import logging
import numpy as np
import os
import pandas as pd
import psutil
import sys
import tensorflow as tf
import time

from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis as QDA

from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, DateTime, Float, ForeignKey, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

DeclarativeBase = declarative_base()

class Model(DeclarativeBase):
    __tablename__ = 'models'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)

    __table_args__ = (UniqueConstraint('name', name='unco1'),)

class Forecast(DeclarativeBase):
    __tablename__ = 'forecasts'

    id = Column(Integer, primary_key=True)
    model = Column(Integer, ForeignKey('models.id'), nullable=False)
    symbol = Column(String)
    ts = Column(DateTime)
    fore = Column(Float)
    details = Column(String)

    __table_args__ = (UniqueConstraint('model', 'ts', 'symbol', name='unco1'),)
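
# WalkForwardLoop implements walk-forward (rolling out-of-sample) testing: for
# each forecast window supplied by forecast_locations, the model is re-trained
# on the history preceding the window start (capped at max_history rows) and
# then asked to predict the window. Forecasts are persisted to the database so
# a run can be resumed or analyzed later.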
class WalkForwardLoop:
    def __init__(self, model_name, log_file=None, classifier=None, index_format='%Y-%m-%d', db_url=None, scale=True,
                 verbose=False, tensorflow=True):
        self.model_name = model_name  # The model name to use for the database
        self.classifier = classifier  # The classifier object
        self.log_file = log_file
        self.index_format = index_format
        self.db_url = db_url
        self.db_session = None
        self.scale = scale
        if self.db_url is not None:
            self.init_db()
        self.verbose = verbose
        self.tensorflow = tensorflow

    def init_db(self):
        engine = create_engine(self.db_url)
        DeclarativeBase.metadata.create_all(engine)
        Session = sessionmaker(bind=engine)
        self.db_session = Session()
        try:
            self.db_session.add(Model(name=self.model_name))
            self.db_session.commit()
        except Exception:
            # The model is already registered - the unique constraint on the
            # name rejects the insert, so roll back and simply look it up.
            self.db_session.rollback()
        self.model_id = self.db_session.query(Model.id).filter(Model.name == self.model_name).first()[0]
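
    # The non-TensorFlow path. The original gist calls self.fit_predict but
    # never defines it; this is a minimal sketch assuming self.classifier is a
    # scikit-learn-style estimator (e.g. the imported QDA) exposing fit(),
    # predict_proba() and classes_. It mirrors the output layout of
    # tensorflow_fit_predict: one probability column per class, followed by
    # the predicted class label.
    def fit_predict(self, x, y, newx):
        self.classifier.fit(x, np.ravel(y))
        self.classes = self.classifier.classes_
        probs = self.classifier.predict_proba(newx)
        preds = self.classes[np.argmax(probs, axis=1)]
        return np.append(probs, preds.reshape(-1, 1), axis=1)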

    def run(self, features, response, forecast_locations, max_history=1e6, symbol_column=None, tensorflow=None,
            verbose=None):
        assert len(features) == len(response)
        if isinstance(verbose, bool):
            self.verbose = verbose
        if isinstance(tensorflow, bool):
            self.tensorflow = tensorflow
        if self.db_url is not None:
            self.init_db()
        # time.perf_counter() is monotonic and available on all platforms (the
        # original win32/time.clock split predates its removal in Python 3.8)
        timer = time.perf_counter

        for ii in range(0, forecast_locations.len()):
            # Prepare the range for training for this iteration
            history_end = forecast_locations.starts[ii]
            history_start = 0
            if (history_end - history_start + 1) > max_history:
                history_start = int(history_end - max_history + 1)
            xx = features.iloc[history_start:history_end].values
            yy = response.iloc[history_start:history_end].values

            # Scale the data
            if self.scale:
                std_scaler = StandardScaler()
                xx = std_scaler.fit_transform(xx)

            fore_xx = features.iloc[forecast_locations.starts[ii]:(forecast_locations.ends[ii] + 1)].values
            if self.scale:
                fore_xx = std_scaler.transform(fore_xx)

            # Train the model and predict
            start = timer()
            if self.tensorflow:
                fore = self.tensorflow_fit_predict(xx, yy, fore_xx)
            else:
                fore = self.fit_predict(xx, yy, fore_xx)
            forecasting_time = timer() - start

            fore_df = pd.DataFrame(fore, index=features.iloc[
                forecast_locations.starts[ii]:(forecast_locations.ends[ii] + 1)].index)
            # Generate proper column names. Map -1/1 to 'short'/'long'. The third column is the class.
            fore_df.iloc[:, 2] = np.where(fore_df.iloc[:, 2] == -1, 'short', 'long')
            fore_df.columns = np.array(['short_prob', 'long_prob', 'class'])
            fore = fore[:, 2]
            # The confidence metric is the winning class probability (the two
            # probability columns only - the class column holds strings)
            metric = np.round(np.amax(fore_df.iloc[:, 0:2], axis=1), 2)

            # Save results to a database or somewhere else
            if self.db_session is not None:
                for jj in range(len(fore)):
                    row_id = forecast_locations.starts[ii] + jj
                    ts = features.index[row_id]
                    details = fore_df.iloc[[jj]].to_json(orient='split', date_format='iso')
                    if symbol_column is not None:
                        symbol = symbol_column[row_id]
                        rs = self.db_session.query(Forecast.id).filter(Forecast.ts == ts).filter(
                            Forecast.model == self.model_id).filter(Forecast.symbol == symbol).first()
                        if rs is None:
                            ff = Forecast(model=self.model_id, ts=ts, fore=fore[jj], details=details, symbol=symbol)
                            self.db_session.add(ff)
                        else:
                            ff = Forecast(id=rs[0], model=self.model_id, ts=ts, fore=fore[jj], details=details,
                                          symbol=symbol)
                            self.db_session.merge(ff)
                    else:
                        rs = self.db_session.query(Forecast.id).filter(Forecast.ts == ts).filter(
                            Forecast.model == self.model_id).first()
                        if rs is None:
                            ff = Forecast(model=self.model_id, ts=ts, fore=fore[jj], details=details)
                            self.db_session.add(ff)
                        else:
                            ff = Forecast(id=rs[0], model=self.model_id, ts=ts, fore=fore[jj], details=details)
                            self.db_session.merge(ff)

            # Log output
            if self.log_file is not None:
                out_str = "\n" + features.index[forecast_locations.starts[ii]].strftime(self.index_format) + " - " + \
                          features.index[forecast_locations.ends[ii]].strftime(self.index_format) + "\n" + \
                          "=======================\n" + \
                          "   history: from: " + features.index[history_start].strftime(self.index_format) + ", to: " + \
                          features.index[history_end - 1].strftime(self.index_format) + \
                          ", length: " + str(history_end - history_start) + "\n" + \
                          "   forecast length: " + str(forecast_locations.ends[ii] - forecast_locations.starts[ii] + 1) + "\n" + \
                          "   forecast: [" + ','.join(str(round(ff, 2)) for ff in fore) + "]\n" + \
                          "   probs: [" + ','.join(str(round(mm, 2)) for mm in metric) + "]\n" + \
                          "   time [training+forecasting]: " + str(round(forecasting_time, 2)) + " secs\n"
                with open(self.log_file, "a") as log:
                    print(out_str, file=log)

            if self.db_session is not None:
                self.db_session.commit()
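
    # tensorflow_fit_predict trains a small 1-D convolutional network from
    # scratch for every walk-forward step (TensorFlow 1.x API):
    #
    #   input (1 x nfeatures) -> conv 1x3, 32 filters, ReLU
    #                         -> max pool 1x2
    #                         -> fully connected, 128 units, ReLU
    #                         -> fully connected, 128 units, ReLU
    #                         -> softmax over the class labels
    #
    # It returns one row per forecast: the class probabilities followed by the
    # predicted class label.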
    def tensorflow_fit_predict(self, x, y, newx):
        learning_rate = 0.01
        batch_size = 'auto'
        num_passes = 2
        display_step = 1

        if isinstance(batch_size, str):
            if batch_size == 'auto':
                batch_size = min(200, x.shape[0])
            else:
                raise ValueError("'auto' is the only acceptable string for batch_size")
        num_batches = x.shape[0] // batch_size

        # Map the y's to [0, nlevels)
        classes = np.sort(np.unique(y))
        self.classes = classes
        yz = np.searchsorted(classes, y)
        # One hot encode them
        ohe = OneHotEncoder(n_values=len(classes), sparse=False)
        yy = ohe.fit_transform(yz.reshape(-1, 1))

        res = None
        nfeatures = x.shape[1]
        nlabels = yy.shape[1]

        # Define the tensorflow graph. A new graph for each iteration. Otherwise, all iterations use
        # the default graph, and the memory usage explodes quickly.
        with tf.Graph().as_default():
            input = tf.placeholder(tf.float32, [None, nfeatures])
            label = tf.placeholder(tf.float32, [None, nlabels])

            # Convolutional layer: treat the features as a 1 x nfeatures image
            nconv1 = 32
            cw1 = tf.Variable(tf.random_normal([1, 3, 1, nconv1]))
            cb1 = tf.Variable(tf.random_normal([nconv1]))
            conv_input = tf.reshape(input, shape=[-1, 1, nfeatures, 1])
            cl1 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv_input, cw1, strides=[1, 1, 1, 1], padding='SAME'), cb1))
            mp1 = tf.nn.max_pool(cl1, ksize=[1, 1, 2, 1], strides=[1, 1, 2, 1], padding='SAME')

            # First fully connected layer. Its input width is the feature width
            # after the 1x2 max pool (SAME padding halves it, rounding up),
            # times the number of convolution filters. The original hard-coded
            # 378*32, which is this expression for nfeatures = 756.
            nhidden1 = 128
            pooled_width = (nfeatures + 1) // 2
            w1 = tf.Variable(tf.random_normal([pooled_width * nconv1, nhidden1]))
            b1 = tf.Variable(tf.random_normal([nhidden1]))
            fc_input = tf.reshape(mp1, [-1, w1.get_shape().as_list()[0]])
            l1 = tf.nn.relu(tf.add(tf.matmul(fc_input, w1), b1))

            # Second fully connected layer
            nhidden2 = 128
            w2 = tf.Variable(tf.random_normal([nhidden1, nhidden2]))
            b2 = tf.Variable(tf.random_normal([nhidden2]))
            l2 = tf.nn.relu(tf.add(tf.matmul(l1, w2), b2))

            # Output layer (logits)
            w3 = tf.Variable(tf.random_normal([nhidden2, nlabels]))
            b3 = tf.Variable(tf.random_normal([nlabels]))
            model = tf.add(tf.matmul(l2, w3), b3)

            cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=model, labels=label))
            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)

            correct_fores = tf.equal(tf.argmax(model, 1), tf.argmax(label, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_fores, tf.float32))

            # Initializing the variables
            init_op = tf.global_variables_initializer()

            # Split the training data into mini-batches
            all_features = np.array_split(x, num_batches)
            all_labels = np.array_split(yy, num_batches)

            # Launch the graph
            with tf.Session() as sess:
                sess.run(init_op)
                total_batches = num_batches * num_passes

                # Log some data
                first_shape = all_features[0].shape
                last_shape = all_features[len(all_features) - 1].shape
                logging.debug("total_batches = {0}, first shape = {1}, last shape = {2}, x shape = {3}, y shape = {4}".format(
                    total_batches, first_shape, last_shape, x.shape, yy.shape))
                process = psutil.Process(os.getpid())
                logging.debug(process.memory_info().rss)

                for ii in range(total_batches):
                    features = np.ascontiguousarray(all_features[ii % num_batches])
                    labels = np.ascontiguousarray(all_labels[ii % num_batches])
                    _, cc, acc = sess.run([optimizer, cost, accuracy], feed_dict={input: features, label: labels})
                    if ii % display_step == 0 and self.verbose:
                        logging.info("Minibatch: {0:04d}, Loss: {1:.4f}, Accuracy: {2:.2f}%".format(ii + 1, cc, acc * 100))

                # Predict
                out = tf.nn.softmax(model)
                probs = sess.run(out, feed_dict={input: newx})
                if len(probs.shape) == 1:
                    probs = np.reshape(probs, (1, -1))
                # Append the predicted class label to the probabilities
                res = np.append(probs, self.classes[np.argmax(probs, axis=1)].reshape(-1, 1), axis=1)
        return res
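
# stack_series concatenates the per-symbol frames (the entry/response column
# first, then the features) into a single frame, tagging each row with its
# symbol so the walk-forward loop can store per-symbol forecasts.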
def stack_series(all_data, series):
    res = None
    for ss in series:
        tt = pd.concat([all_data[ss]['full']['entry'], all_data[ss]['features']], axis=1).dropna()
        tt.insert(0, 'symbol', pd.Series(ss, index=tt.index))
        if res is None:
            res = tt
        else:
            res = pd.concat([res, tt])
    res = res.sort_index()
    return res

def drive_wfl():
    # symbols = ['HO2']
    symbols = ['HO2', 'CL2']
    all_data = dsh.load('all_data.bin')
    combined = stack_series(all_data, symbols)

    symbol_column = None
    if 'symbol' in combined.columns:
        symbol_column = combined['symbol']
        combined.drop('symbol', axis=1, inplace=True)

    response = combined.iloc[:, 0]
    features = combined.iloc[:, 1:]

    fl = dsh.ForecastLocations(features.index)
    ml = WalkForwardLoop('qda', 'ml.log', db_url='sqlite:///ml.sqlite')
    ml.run(features, response, fl, symbol_column=symbol_column)
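
# extend_price splices an older price series (ts2) onto the front of a newer
# one (ts1). The overlap point is located, then prices are rebuilt backwards
# from returns: if r[i-1] is the return from bar i-1 to bar i, then
# price[i-1] = price[i] / (1 + r[i-1]).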
def extend_price(ts1, ts2):
    first_index = np.where(ts2.index == ts1.index[0])
    if len(first_index) != 1 or len(first_index[0]) != 1:
        raise ValueError('Failed to find the index to stitch the series')
    first_index = first_index[0][0]
    # Prefix ts1 with the older series' returns, shifted so each position
    # holds the return to the following bar
    res = pd.concat([ts2.pct_change().shift(-1)[:first_index], ts1])
    # Walk the series backwards, building the prices from the returns
    for ii in range(first_index, 0, -1):
        res.iloc[ii - 1] = res.iloc[ii] / (1.0 + res.iloc[ii - 1])
    return res

def pinnacle_csv(csv_path):
    ss = pd.read_csv(csv_path, header=None, parse_dates=True, index_col=0)
    ss = ss.iloc[:, 0:4]
    ss.columns = ['open', 'high', 'low', 'close']
    return ss
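
# returns_wfl builds the features for the convolutional model: daily returns
# normalized by an EWMA volatility estimate, sqrt(EWMA(r^2, span=36)), then
# arranged into rolling windows of the last three years (756 trading days).
# The response is the sign of the next day's normalized return.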
def returns_wfl():
    ho_pin = pinnacle_csv('d:/DATA/CLCDATA/HO_REV.CSV')
    ho_pin = ho_pin.iloc[:, 3]

    db = idb.CsiDb()
    ho_csi = db.load_bars('HO2')
    ho_csi = ho_csi.iloc[:, 3]

    # Extend the CSI history backwards using the Pinnacle data
    ho_ext = np.round(extend_price(ho_csi, ho_pin), 4)

    # Volatility-normalized returns
    rets = ho_ext.pct_change()
    erets = rets.pow(2).ewm(span=36).mean().pow(1 / 2)
    arets = (rets / erets).dropna()

    # Each row of mm contains the last history_len normalized returns
    history_len = 3 * 252  # Three years
    nrows = len(arets) - history_len
    mm = np.full((nrows, history_len), np.nan)
    for ii in range(history_len, len(arets)):
        mm[ii - history_len, :] = arets.iloc[(ii - history_len + 1):(ii + 1)]

    response = np.where(arets < 0, -1, 1)
    response = pd.DataFrame(response, index=arets.index)
    # Remove the first history_len + 1 rows. The extra one removed is
    # because we need to shift the features one position forward,
    # to align with the response; thus, we lose one more feature row.
    response = response.tail(-history_len - 1)
    features = mm[:(mm.shape[0] - 1), :]
    features = pd.DataFrame(features, index=response.index)

    fl = dsh.ForecastLocations(features.index, start_date="2011-12-31")
    ml = WalkForwardLoop('tf_conv', log_file='ml.log', db_url='sqlite:///ml.sqlite')
    ml.run(features, response, fl, verbose=False, tensorflow=True)

def main():
    # Init logging
    logging.basicConfig(filename='diag.log', level=logging.DEBUG)
    returns_wfl()

if __name__ == "__main__":
    main()