Last active
April 27, 2023 06:18
-
-
Save jtwyles/517becb2deebf9e3b2874d8c26c4c99f to your computer and use it in GitHub Desktop.
Calculate SMA and EMA in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# See how to use this script and what type of input to use at: | |
# https://steemit.com/cryptocurrency/@jwyles/moving-averages-sma-ema-how-to-use-and-how-to-calculate-with-spreadsheet-and-code-snippets | |
# | |
import csv | |
import re | |
from functools import reduce | |
from dateutil import parser | |
CSV_FILE = 'input.csv' | |
EMA_LENGTH = 5 | |
EMA_SOURCE = 'close' | |
candles = [] | |
# Reads the input file and saves to `candles` all the candles found. Each candle is | |
# a dict with the timestamp and the OHLC values. | |
def read_candles(): | |
with open(CSV_FILE, 'r') as csvfile: | |
reader = csv.reader(csvfile, delimiter=',') | |
for row in reader: | |
try: | |
candles.append({ | |
'ts': parser.parse(row[0]), | |
'low': float(re.sub(",", "", row[1])), | |
'high': float(re.sub(",", "", row[2])), | |
'open': float(re.sub(",", "", row[3])), | |
'close': float(re.sub(",", "", row[4])) | |
}) | |
except: | |
print('Error parsing {}'.format(row)) | |
# Calculates the SMA of an array of candles using the `source` price. | |
def calculate_sma(candles, source): | |
length = len(candles) | |
sum = reduce((lambda last, x: { source: last[source] + x[source] }), candles) | |
sma = sum[source] / length | |
return sma | |
# Calculates the EMA of an array of candles using the `source` price. | |
def calculate_ema(candles, source): | |
length = len(candles) | |
target = candles[0] | |
previous = candles[1] | |
# if there is no previous EMA calculated, then EMA=SMA | |
if 'ema' not in previous or previous['ema'] == None: | |
return calculate_sma(candles, source) | |
else: | |
# multiplier: (2 / (length + 1)) | |
# EMA: (close - EMA(previous)) x multiplier + EMA(previous) | |
multiplier = 2 / (length + 1) | |
ema = (target[source] * multiplier) + (previous['ema'] * (1 - multiplier)) | |
# Formula updated from the original one to be clearer, both give the same results. Old formula: | |
# ema = ((target[source] - previous['ema']) * multiplier) + previous['ema'] | |
return ema | |
def calculate(candles, source): | |
sma = calculate_sma(candles, source) | |
ema = calculate_ema(candles, source) | |
candles[0]['sma'] = sma | |
candles[0]['ema'] = ema | |
if __name__ == '__main__': | |
read_candles() | |
# progress through the array of candles to calculate the indicators for each | |
# block of candles | |
position = 0 | |
while position + EMA_LENGTH <= len(candles): | |
current_candles = candles[position:(position+EMA_LENGTH)] | |
current_candles = list(reversed(current_candles)) | |
calculate(current_candles, EMA_SOURCE) | |
position += 1 | |
for candle in candles: | |
if 'sma' in candle: | |
print('{}: sma={} ema={}'.format(candle['ts'], candle['sma'], candle['ema'])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment