Skip to content

Instantly share code, notes, and snippets.

@jtwyles
Last active April 27, 2023 06:18
Show Gist options
  • Save jtwyles/517becb2deebf9e3b2874d8c26c4c99f to your computer and use it in GitHub Desktop.
Save jtwyles/517becb2deebf9e3b2874d8c26c4c99f to your computer and use it in GitHub Desktop.
Calculate SMA and EMA in Python
#
# See how to use this script and what type of input to use at:
# https://steemit.com/cryptocurrency/@jwyles/moving-averages-sma-ema-how-to-use-and-how-to-calculate-with-spreadsheet-and-code-snippets
#
import csv
import re
from functools import reduce
from dateutil import parser
# Path of the CSV file that read_candles() opens.
CSV_FILE = 'input.csv'
# Number of candles in each window passed to the SMA/EMA calculation.
EMA_LENGTH = 5
# Candle field ('low', 'high', 'open' or 'close') whose price is averaged.
EMA_SOURCE = 'close'
# Module-level list that read_candles() appends the parsed candles to.
candles = []
# Reads the input file and saves to `candles` all the candles found. Each candle is
# a dict with the timestamp and the OHLC values.
def read_candles():
    """Parse CSV_FILE and append one candle dict per valid row to `candles`.

    Each row is read as: timestamp, low, high, open, close. Prices may
    contain thousands separators ("1,234.5"), which are stripped before
    conversion. A candle is a dict with the keys 'ts', 'low', 'high', 'open'
    and 'close'.

    Rows that cannot be parsed (a header line, too few columns, malformed
    numbers or dates) are reported on stdout and skipped.
    """
    def to_price(text):
        # Drop thousands separators so float() accepts the value.
        return float(text.replace(",", ""))

    # newline='' lets the csv module do its own newline handling, as
    # recommended by the csv documentation.
    with open(CSV_FILE, 'r', newline='') as csvfile:
        for row in csv.reader(csvfile, delimiter=','):
            try:
                candle = {
                    'ts': parser.parse(row[0]),
                    'low': to_price(row[1]),
                    'high': to_price(row[2]),
                    'open': to_price(row[3]),
                    'close': to_price(row[4]),
                }
            except (ValueError, IndexError, OverflowError):
                # ValueError: bad number or date; IndexError: missing column;
                # OverflowError: date out of range. Anything else (including
                # KeyboardInterrupt) is a real problem and must propagate.
                print('Error parsing {}'.format(row))
            else:
                candles.append(candle)
# Calculates the SMA of an array of candles using the `source` price.
def calculate_sma(candles, source):
    """Return the simple moving average of the `source` price over `candles`.

    Args:
        candles: non-empty sequence of candle dicts.
        source: key of the price to average (e.g. 'close').

    Returns:
        The arithmetic mean of `candle[source]` over all given candles.

    Raises:
        ValueError: if `candles` is empty.
    """
    if not candles:
        raise ValueError('cannot calculate the SMA of an empty list of candles')
    # Sum the prices directly instead of folding throwaway dicts with
    # reduce(); this also avoids shadowing the builtin `sum`.
    total = sum(candle[source] for candle in candles)
    return total / len(candles)
# Calculates the EMA of an array of candles using the `source` price.
def calculate_ema(candles, source):
    """Return the exponential moving average for the newest candle of a window.

    Args:
        candles: non-empty window of candle dicts ordered newest first:
            `candles[0]` is the candle the EMA is calculated for and
            `candles[1]` is the candle immediately before it. The window
            length is used as the EMA period.
        source: key of the price to use (e.g. 'close').

    Returns:
        The EMA of the newest candle. When the previous candle has no 'ema'
        value yet, the SMA of the window is returned as the seed value.

    Raises:
        ValueError: if `candles` is empty.
    """
    if not candles:
        raise ValueError('cannot calculate the EMA of an empty list of candles')

    length = len(candles)
    target = candles[0]

    # A one-candle window has no previous candle to look at. The multiplier
    # would be 2 / (1 + 1) = 1, so both the EMA and the SMA seed are simply
    # the price of that candle.
    if length == 1:
        return target[source]

    previous_ema = candles[1].get('ema')
    # If there is no previous EMA calculated, then EMA = SMA.
    if previous_ema is None:
        return calculate_sma(candles, source)

    # multiplier: 2 / (length + 1)
    # EMA: price * multiplier + EMA(previous) * (1 - multiplier), which is
    # equivalent to (price - EMA(previous)) * multiplier + EMA(previous).
    multiplier = 2.0 / (length + 1)
    return (target[source] * multiplier) + (previous_ema * (1 - multiplier))
def calculate(candles, source):
    """Compute the SMA and EMA of a window and store them on its first candle.

    The values are calculated from the `source` price of `candles` and
    written to `candles[0]` under the keys 'sma' and 'ema'. Nothing is
    returned.
    """
    # Both indicators are computed before the candle is touched, so the EMA
    # calculation never sees a half-updated first candle.
    indicators = {
        'sma': calculate_sma(candles, source),
        'ema': calculate_ema(candles, source),
    }
    candles[0].update(indicators)
if __name__ == '__main__':
    read_candles()

    # Slide a window of EMA_LENGTH candles over the series, one candle at a
    # time. Each window is handed over in reverse order so that the candle
    # receiving the indicators is the first element of the window.
    for start in range(len(candles) - EMA_LENGTH + 1):
        window = candles[start:start + EMA_LENGTH]
        window.reverse()
        calculate(window, EMA_SOURCE)

    # Candles before the first full window never received any indicators.
    for candle in candles:
        if 'sma' not in candle:
            continue
        print('{}: sma={} ema={}'.format(candle['ts'], candle['sma'], candle['ema']))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment