#!/bin/bash
# init.sh - Script to initialize the HFT project structure and files
# Exit immediately if a command exits with a non-zero status
set -e
# Create project directory
mkdir -p hft_system
cd hft_system
# Create directories
mkdir -p hft_system hft_system/tests hft_system/data hft_system/models hft_system/logs
# Create __init__.py files
touch hft_system/__init__.py
touch hft_system/tests/__init__.py
# Create main application file
cat <<EOL > app.py
# app.py - Main Streamlit application
import os
import threading
import logging
import streamlit as st
import plotly.express as px
from hft_system.data_acquisition import DataAcquisition
from hft_system.data_processing import DataProcessing
from hft_system.feature_engineering import FeatureEngineering
from hft_system.models import DRLAgent, SubModels
from hft_system.execution import ExecutionEngine
from hft_system.risk_management import RiskManager
from hft_system.utils import load_config
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
def main():
"""
Main function to run the Streamlit application with multi-tab support.
"""
st.set_page_config(page_title="Advanced HFT System", layout="wide")
# Sidebar for navigation
st.sidebar.title("Navigation")
app_mode = st.sidebar.selectbox("Choose the app mode",
["Dashboard", "Trading Parameters", "Data Visualization",
"Model Management", "Execution Control", "User Documentation"])
if app_mode == "Dashboard":
show_dashboard()
elif app_mode == "Trading Parameters":
show_trading_parameters()
elif app_mode == "Data Visualization":
show_data_visualization()
elif app_mode == "Model Management":
show_model_management()
elif app_mode == "Execution Control":
show_execution_control()
elif app_mode == "User Documentation":
show_user_documentation()
def show_dashboard():
"""
Display the main dashboard with key performance metrics.
"""
st.title("System Dashboard")
# Display portfolio performance
# Assume we have a function get_portfolio_performance()
performance_data = get_portfolio_performance()
fig = px.line(performance_data, x='Time', y='Portfolio Value', title='Portfolio Value Over Time')
st.plotly_chart(fig)
# Display other key metrics
st.write("Additional metrics will be displayed here.")
def get_portfolio_performance():
"""
Fetch or calculate portfolio performance data.
Returns:
DataFrame: Portfolio performance data.
"""
# Placeholder implementation
import pandas as pd
import numpy as np
dates = pd.date_range(start="2021-01-01", periods=100, freq='T')
values = np.cumsum(np.random.randn(100)) + 100000 # Starting with $100,000
data = pd.DataFrame({'Time': dates, 'Portfolio Value': values})
return data
def show_trading_parameters():
"""
Allow users to adjust trading parameters.
"""
st.title("Trading Parameters")
# Parameters can be adjusted here
# Use session state to store parameters
st.session_state['data_interval'] = st.selectbox("Data Interval", ['1s', '5s', '15s', '1m'])
st.session_state['lookback_period'] = st.slider("Lookback Period", 10, 120, 60)
st.session_state['trading_symbol'] = st.text_input("Trading Symbol", 'AAPL')
st.session_state['initial_capital'] = st.number_input("Initial Capital ($)", 1000, step=1000)
st.session_state['max_position'] = st.number_input("Max Position Size", 1, step=10)
st.session_state['transaction_cost'] = st.number_input("Transaction Cost (%)", 0.0, 1.0, 0.001)
st.session_state['learning_rate'] = st.number_input("Learning Rate", 0.0001, 0.01, 0.001)
st.session_state['gamma'] = st.number_input("Discount Factor (Gamma)", 0.8, 0.99, 0.95)
st.session_state['epochs'] = st.number_input("Training Epochs", 1, 20, 5)
st.success("Parameters updated successfully.")
def show_data_visualization():
"""
Display real-time data visualizations.
"""
st.title("Data Visualization")
# Fetch data
data_acquisition = DataAcquisition()
market_data = data_acquisition.get_real_time_data(st.session_state.get('trading_symbol', 'AAPL'))
# Plot price data
fig = px.line(market_data, x='Timestamp', y='Price', title='Real-Time Price Data')
st.plotly_chart(fig)
# Display order book depth
st.write("Order book depth visualization will be here.")
def show_model_management():
"""
Manage models: train, optimize hyperparameters, and view performance.
"""
st.title("Model Management")
if st.button("Train Model"):
st.write("Training model...")
# Implement model training logic
if st.button("Optimize Hyperparameters"):
st.write("Optimizing hyperparameters...")
# Implement hyperparameter optimization
# Display model performance metrics
st.write("Model performance metrics will be displayed here.")
def show_execution_control():
"""
Control the execution: start/stop trading sessions.
"""
st.title("Execution Control")
if st.button("Start Trading Session"):
st.write("Starting trading session...")
# Implement trading session start logic
if st.button("Stop Trading Session"):
st.write("Stopping trading session...")
# Implement trading session stop logic
# Display execution logs
st.write("Execution logs will be displayed here.")
def show_user_documentation():
"""
Display detailed user documentation within the UI.
"""
st.title("User Documentation")
st.markdown("""
# Welcome to the Advanced HFT System
This documentation provides a comprehensive guide on how to use the system.
## Sections
- **Dashboard**: Overview of system performance.
- **Trading Parameters**: Adjust trading settings.
- **Data Visualization**: View real-time data.
- **Model Management**: Train and manage models.
- **Execution Control**: Start/stop trading sessions.
## Getting Started
1. Navigate to **Trading Parameters** to set your preferences.
2. Fetch and visualize data in the **Data Visualization** tab.
3. Train the model in the **Model Management** tab.
4. Start a trading session in the **Execution Control** tab.
**Note**: Ensure that you have updated the `.env` file with your API keys and configured the necessary services.
""")
if __name__ == "__main__":
main()
EOL
# Create data_acquisition.py
cat <<EOL > hft_system/data_acquisition.py
# data_acquisition.py - Module for data acquisition
import os
import logging
import pandas as pd
from alpaca_trade_api.rest import REST, TimeFrame
import tweepy
from dotenv import load_dotenv
from datetime import datetime, timedelta
load_dotenv()
class DataAcquisition:
def __init__(self):
"""
Initialize the DataAcquisition class by setting up API clients.
"""
# Alpaca API credentials
self.alpaca_api_key = os.getenv('ALPACA_API_KEY')
self.alpaca_secret_key = os.getenv('ALPACA_SECRET_KEY')
self.alpaca_api = REST(self.alpaca_api_key, self.alpaca_secret_key, api_version='v2')
# Twitter API credentials
self.twitter_auth = tweepy.OAuth1UserHandler(
os.getenv('TWITTER_CONSUMER_KEY'),
os.getenv('TWITTER_CONSUMER_SECRET'),
os.getenv('TWITTER_ACCESS_TOKEN'),
os.getenv('TWITTER_ACCESS_SECRET')
)
self.twitter_api = tweepy.API(self.twitter_auth)
logging.info("DataAcquisition initialized.")
def get_market_data(self, symbol, start, end, timeframe):
"""
Fetch historical market data from Alpaca API.
Args:
symbol (str): The trading symbol.
start (str): Start date in ISO format.
end (str): End date in ISO format.
timeframe (TimeFrame): Timeframe for the bars.
Returns:
pd.DataFrame: Market data as a DataFrame.
"""
logging.info(f"Fetching market data for {symbol}")
bars = self.alpaca_api.get_bars(symbol, timeframe, start, end).df
bars = bars[bars['symbol'] == symbol]
return bars
def get_real_time_data(self, symbol):
"""
Fetch real-time market data for a symbol.
Args:
symbol (str): The trading symbol.
Returns:
pd.DataFrame: Real-time market data.
"""
logging.info(f"Fetching real-time data for {symbol}")
end = datetime.utcnow()
start = end - timedelta(minutes=15) # Last 15 minutes
bars = self.get_market_data(symbol, start.isoformat(), end.isoformat(), TimeFrame.Minute)
bars.reset_index(inplace=True)
bars.rename(columns={'timestamp': 'Timestamp', 'close': 'Price'}, inplace=True)
return bars[['Timestamp', 'Price']]
def get_order_book_data(self, symbol):
"""
Fetch order book data (Level II) from Alpaca API.
Args:
symbol (str): The trading symbol.
Returns:
pd.DataFrame: Order book data.
"""
logging.info(f"Fetching order book data for {symbol}")
# Alpaca API does not provide Level II data in the free tier.
# You may need to use a premium data provider or adjust this method accordingly.
raise NotImplementedError("Order book data retrieval not implemented.")
def get_sentiment_data(self, symbol):
"""
Fetch sentiment data from Twitter API.
Args:
symbol (str): The trading symbol.
Returns:
float: Average sentiment score.
"""
logging.info(f"Fetching sentiment data for {symbol}")
query = f"\${symbol} -filter:retweets"
tweets = self.twitter_api.search_tweets(q=query, lang='en', count=100)
sentiments = [TextBlob(tweet.text).sentiment.polarity for tweet in tweets]
avg_sentiment = sum(sentiments) / len(sentiments) if sentiments else 0
return avg_sentiment
EOL
# Create data_processing.py
cat <<EOL > hft_system/data_processing.py
# data_processing.py - Module for data processing and storage
import pandas as pd
import logging
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from dotenv import load_dotenv
import os
load_dotenv()
class DataProcessing:
def __init__(self):
"""
Initialize the DataProcessing class by setting up the InfluxDB client.
"""
self.influxdb_url = os.getenv('INFLUXDB_URL')
self.influxdb_token = os.getenv('INFLUXDB_TOKEN')
self.influxdb_org = os.getenv('INFLUXDB_ORG')
self.influxdb_bucket = os.getenv('INFLUXDB_BUCKET')
self.client = InfluxDBClient(
url=self.influxdb_url,
token=self.influxdb_token,
org=self.influxdb_org
)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
logging.info("DataProcessing initialized.")
def clean_data(self, data):
"""
Clean the data by handling missing values.
Args:
data (pd.DataFrame): The data to clean.
Returns:
pd.DataFrame: Cleaned data.
"""
logging.info("Cleaning data")
data = data.dropna()
return data
def normalize_data(self, data):
"""
Normalize the data for consistent model input.
Args:
data (pd.DataFrame): The data to normalize.
Returns:
pd.DataFrame: Normalized data.
"""
logging.info("Normalizing data")
numeric_cols = data.select_dtypes(include=['float64', 'int64']).columns
data[numeric_cols] = (data[numeric_cols] - data[numeric_cols].mean()) / data[numeric_cols].std()
return data
def store_data(self, data, measurement):
"""
Store data in InfluxDB.
Args:
data (pd.DataFrame): The data to store.
measurement (str): The measurement name.
Returns:
None
"""
logging.info(f"Storing data in InfluxDB under measurement {measurement}")
records = data.to_dict('records')
points = []
for record in records:
point = {
"measurement": measurement,
"tags": {},
"fields": record,
"time": record.get('Timestamp') # Ensure 'Timestamp' is in the data
}
points.append(point)
self.write_api.write(bucket=self.influxdb_bucket, org=self.influxdb_org, record=points)
EOL
# Create feature_engineering.py
cat <<EOL > hft_system/feature_engineering.py
# feature_engineering.py - Module for feature engineering
import pandas as pd
import logging
import ta
from transformers import pipeline
class FeatureEngineering:
def __init__(self):
"""
Initialize the FeatureEngineering class by setting up NLP models.
"""
self.sentiment_model = pipeline('sentiment-analysis')
logging.info("FeatureEngineering initialized.")
def generate_technical_indicators(self, data):
"""
Generate technical indicators from market data.
Args:
data (pd.DataFrame): Market data.
Returns:
pd.DataFrame: Data with technical indicators.
"""
logging.info("Generating technical indicators")
data['SMA'] = ta.trend.SMAIndicator(close=data['close']).sma_indicator()
data['EMA'] = ta.trend.EMAIndicator(close=data['close']).ema_indicator()
data['RSI'] = ta.momentum.RSIIndicator(close=data['close']).rsi()
bollinger = ta.volatility.BollingerBands(close=data['close'])
data['Bollinger_High'] = bollinger.bollinger_hband()
data['Bollinger_Low'] = bollinger.bollinger_lband()
data['MACD'] = ta.trend.MACD(close=data['close']).macd()
data['Stochastic'] = ta.momentum.StochasticOscillator(high=data['high'], low=data['low'], close=data['close']).stoch()
return data
def generate_order_book_features(self, order_book_data):
"""
Generate features from order book data.
Args:
order_book_data (pd.DataFrame): Order book data.
Returns:
pd.DataFrame: Data with order book features.
"""
logging.info("Generating order book features")
# Implement order book feature engineering
# For example, calculate bid-ask spread, order imbalance, etc.
raise NotImplementedError("Order book feature engineering not implemented.")
def generate_sentiment_scores(self, text_data):
"""
Generate sentiment scores from text data.
Args:
text_data (list): List of text data (e.g., tweets).
Returns:
list: Sentiment scores.
"""
logging.info("Generating sentiment scores")
sentiments = self.sentiment_model(text_data)
scores = [s['score'] if s['label'] == 'POSITIVE' else -s['score'] for s in sentiments]
return scores
EOL
# Create models.py
cat <<EOL > hft_system/models.py
# models.py - Module for model implementations
import torch
import torch.nn as nn
import logging
class DRLAgent(nn.Module):
def __init__(self, input_size, output_size):
"""
Initialize the DRLAgent with a Transformer architecture.
Args:
input_size (int): Number of input features.
output_size (int): Number of output actions.
"""
super(DRLAgent, self).__init__()
self.embedding = nn.Linear(input_size, 64)
encoder_layer = nn.TransformerEncoderLayer(d_model=64, nhead=8)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=2)
self.fc = nn.Linear(64, output_size)
self.softmax = nn.Softmax(dim=-1)
logging.info("DRLAgent initialized.")
def forward(self, x):
"""
Forward pass through the network.
Args:
x (torch.Tensor): Input tensor.
Returns:
torch.Tensor: Action probabilities.
"""
x = self.embedding(x)
x = self.transformer(x)
x = x.mean(dim=0) # Global average pooling
x = self.fc(x)
return self.softmax(x)
class SubModels:
def __init__(self):
"""
Initialize sub-models for trend, volatility, and sentiment prediction.
"""
self.trend_model = None
self.volatility_model = None
self.sentiment_model = None
logging.info("SubModels initialized.")
def initialize_models(self, input_size):
"""
Initialize sub-models with given input size.
Args:
input_size (int): Number of input features.
Returns:
None
"""
self.trend_model = self._create_model(input_size)
self.volatility_model = self._create_model(input_size)
self.sentiment_model = self._create_model(input_size)
logging.info("Sub-models initialized.")
def _create_model(self, input_size):
"""
Create a simple feedforward neural network.
Args:
input_size (int): Number of input features.
Returns:
nn.Module: A neural network model.
"""
model = nn.Sequential(
nn.Linear(input_size, 64),
nn.ReLU(),
nn.Linear(64, 1)
)
return model
EOL
# Create execution.py
cat <<EOL > hft_system/execution.py
# execution.py - Module for execution and order management
import logging
from alpaca_trade_api.rest import REST, TimeInForce, OrderType, Side
import os
from dotenv import load_dotenv
load_dotenv()
class ExecutionEngine:
def __init__(self):
"""
Initialize the ExecutionEngine with Alpaca API client.
"""
self.alpaca_api = REST(
os.getenv('ALPACA_API_KEY'),
os.getenv('ALPACA_SECRET_KEY'),
api_version='v2'
)
logging.info("ExecutionEngine initialized.")
def execute_order(self, symbol, qty, side, order_type='market', time_in_force='gtc'):
"""
Execute an order using Alpaca API.
Args:
symbol (str): Trading symbol.
qty (float): Quantity to trade.
side (str): 'buy' or 'sell'.
order_type (str): Type of order ('market', 'limit', etc.).
time_in_force (str): Order time in force ('gtc', 'day', etc.).
Returns:
dict: Order information.
"""
logging.info(f"Executing order: {side} {qty} shares of {symbol}")
try:
order = self.alpaca_api.submit_order(
symbol=symbol,
qty=qty,
side=side,
type=order_type,
time_in_force=time_in_force
)
logging.info(f"Order submitted: {order}")
return order
except Exception as e:
logging.error(f"Order execution failed: {e}")
return None
EOL
# Create risk_management.py
cat <<EOL > hft_system/risk_management.py
# risk_management.py - Module for risk management
import logging
import numpy as np
class RiskManager:
def __init__(self):
"""
Initialize the RiskManager with default position limits.
"""
self.position_limits = {}
logging.info("RiskManager initialized.")
def calculate_var(self, portfolio_returns, confidence_level=0.95):
"""
Calculate Value at Risk (VaR) for the portfolio.
Args:
portfolio_returns (list): Historical portfolio returns.
confidence_level (float): Confidence level for VaR.
Returns:
float: Calculated VaR.
"""
var = np.percentile(portfolio_returns, (1 - confidence_level) * 100)
logging.info(f"Calculated VaR at {confidence_level} confidence level: {var}")
return var
def check_position_limits(self, symbol, position_size):
"""
Check if the position size exceeds limits.
Args:
symbol (str): Trading symbol.
position_size (float): Current position size.
Returns:
bool: True if within limits, False otherwise.
"""
limit = self.position_limits.get(symbol, 1000) # Default limit
if abs(position_size) > limit:
logging.warning(f"Position limit exceeded for {symbol}")
return False
return True
def update_position_limits(self, symbol, limit):
"""
Update the position limit for a symbol.
Args:
symbol (str): Trading symbol.
limit (float): New position limit.
Returns:
None
"""
self.position_limits[symbol] = limit
logging.info(f"Position limit for {symbol} updated to {limit}")
EOL
# Create utils.py
cat <<EOL > hft_system/utils.py
# utils.py - Utility functions
import yaml
import logging
def load_config(config_file='config.yaml'):
"""
Load configuration from a YAML file.
Args:
config_file (str): Path to the configuration file.
Returns:
dict: Configuration data.
"""
with open(config_file, 'r') as file:
config = yaml.safe_load(file)
logging.info("Configuration loaded.")
return config
EOL
# Create tests/test_hft_system.py
cat <<EOL > hft_system/tests/test_hft_system.py
# test_hft_system.py - Unit tests for the HFT system
import unittest
from hft_system.data_acquisition import DataAcquisition
from hft_system.data_processing import DataProcessing
from hft_system.feature_engineering import FeatureEngineering
from hft_system.models import DRLAgent, SubModels
from hft_system.execution import ExecutionEngine
from hft_system.risk_management import RiskManager
class TestHFTSystem(unittest.TestCase):
def setUp(self):
"""
Set up test environment by initializing components.
"""
self.data_acquisition = DataAcquisition()
self.data_processing = DataProcessing()
self.feature_engineering = FeatureEngineering()
self.execution_engine = ExecutionEngine()
self.risk_manager = RiskManager()
def test_data_acquisition(self):
"""
Test data acquisition functionality.
"""
data = self.data_acquisition.get_market_data('AAPL', '2021-01-01', '2021-01-02', '1Min')
self.assertIsNotNone(data)
self.assertFalse(data.empty)
def test_data_processing(self):
"""
Test data processing functionality.
"""
data = self.data_acquisition.get_market_data('AAPL', '2021-01-01', '2021-01-02', '1Min')
clean_data = self.data_processing.clean_data(data)
self.assertFalse(clean_data.isnull().values.any())
def test_feature_engineering(self):
"""
Test feature engineering functionality.
"""
data = self.data_acquisition.get_market_data('AAPL', '2021-01-01', '2021-01-02', '1Min')
features = self.feature_engineering.generate_technical_indicators(data)
self.assertIn('SMA', features.columns)
def test_execution_engine(self):
"""
Test execution engine functionality.
"""
# Caution: This test will place a real order if not using a paper trading account.
# Ensure you are using a paper trading account for testing.
order = self.execution_engine.execute_order('AAPL', 1, 'buy')
self.assertIsNotNone(order)
def test_risk_manager(self):
"""
Test risk management functionality.
"""
is_within_limit = self.risk_manager.check_position_limits('AAPL', 500)
self.assertTrue(is_within_limit)
if __name__ == '__main__':
unittest.main()
EOL
# Create setup.py
cat <<EOL > setup.py
from setuptools import setup, find_packages
setup(
name='hft_system',
version='0.1.0',
packages=find_packages(),
install_requires=[
'alpaca-trade-api==2.3.0',
'numpy==1.21.0',
'pandas==1.3.0',
'tweepy==4.3.0',
'ta==0.7.0',
'textblob==0.15.3',
'torch==1.9.0',
'stable-baselines3==1.1.0',
'deap==1.3.1',
'streamlit==1.2.0',
'plotly==5.3.1',
'python-dotenv==0.19.0',
'influxdb-client==1.18.0',
'nltk==3.6.5',
'transformers==4.12.0',
'PyYAML==5.4.1'
],
description='Advanced High-Frequency Trading System with Streamlit UI',
author='Your Name',
author_email='your.email@example.com',
)
EOL
# Create pyproject.toml
cat <<EOL > pyproject.toml
[tool.poetry]
name = "hft_system"
version = "0.1.0"
description = "Advanced High-Frequency Trading System with Streamlit UI"
authors = ["Your Name <your.email@example.com>"]
[tool.poetry.dependencies]
python = "^3.8"
alpaca-trade-api = "2.3.0"
numpy = "1.21.0"
pandas = "1.3.0"
tweepy = "4.3.0"
ta = "0.7.0"
textblob = "0.15.3"
torch = "1.9.0"
stable-baselines3 = "1.1.0"
deap = "1.3.1"
streamlit = "1.2.0"
plotly = "5.3.1"
python-dotenv = "0.19.0"
influxdb-client = "1.18.0"
nltk = "3.6.5"
transformers = "4.12.0"
PyYAML = "5.4.1"
[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"
EOL
# Create .env file with instructions
cat <<EOL > .env
# .env - Environment variables
# Replace the empty strings with your actual API keys and credentials.
# Alpaca API credentials
ALPACA_API_KEY=""
ALPACA_SECRET_KEY=""
# Twitter API credentials
TWITTER_CONSUMER_KEY=""
TWITTER_CONSUMER_SECRET=""
TWITTER_ACCESS_TOKEN=""
TWITTER_ACCESS_SECRET=""
# InfluxDB configuration
INFLUXDB_URL="http://localhost:8086"
INFLUXDB_TOKEN=""
INFLUXDB_ORG=""
INFLUXDB_BUCKET=""
EOL
# Create README.md
cat <<EOL > README.md
# Advanced High-Frequency Trading System
This project implements an advanced high-frequency trading (HFT) system with a Streamlit user interface (UI). It integrates data acquisition, data processing, feature engineering, model training, execution, and risk management into a cohesive application.
## Features
- **Data Acquisition**: Fetches high-resolution market data, order book data, and sentiment data.
- **Data Processing**: Cleans, normalizes, and stores data efficiently using InfluxDB.
- **Feature Engineering**: Generates technical indicators, order book features, and sentiment scores.
- **Deep Reinforcement Learning**: Employs advanced architectures for trading decisions.
- **Execution Engine**: Integrates with Alpaca API for real trading execution.
- **Risk Management**: Implements advanced strategies including VaR calculations.
- **Streamlit UI**: Provides an interactive interface with multi-tabs for parameter tuning, monitoring, and management.
- **Deployment**: Includes setup scripts and configuration files for easy deployment.
- **Testing**: Contains unit tests to validate the functionality of each component.
## Setup and Installation
### Prerequisites
- Python 3.8+
- InfluxDB installed and running
- API keys for Alpaca and Twitter
### Installation Steps
1. **Clone the Repository**
```bash
git clone https://github.com/yourusername/hft_system.git
cd hft_system
This project is intended for educational and illustrative purposes only. Trading in financial markets carries significant risk, and this code does not constitute financial advice.
cat < requirements.txt
alpaca-trade-api==2.3.0
numpy==1.21.0
pandas==1.3.0
tweepy==4.3.0
ta==0.7.0
textblob==0.15.3
torch==1.9.0
stable-baselines3==1.1.0
deap==1.3.1
streamlit==1.2.0
plotly==5.3.1
python-dotenv==0.19.0
influxdb-client==1.18.0
nltk==3.6.5
transformers==4.12.0
PyYAML==5.4.1
EOL
echo "Project initialization complete. Please update the .env file with your API keys and credentials."
This implementation is intended for educational and illustrative purposes only. Trading in financial markets carries significant risk, and this code does not constitute financial advice. Users should conduct thorough research and consult professional financial advisors before deploying any trading system.