@ggalmazor
Created August 19, 2024 10:39
Ruby script that compares downsampling algorithms that keep local extrema
# frozen_string_literal: true

require 'gnuplot'
require 'narray'
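
# NOTE: assumes the `gnuplot` and `narray` gems are installed (e.g. `gem install gnuplot narray`)
# and that a gnuplot executable is available on the PATH, since the gnuplot gem shells out to it.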

# Function to calculate the perpendicular distance from a point to a line
def perpendicular_distance_pip(x0, y0, x1, y1, xp, yp)
  num = ((y1 - y0) * xp - (x1 - x0) * yp + x1 * y0 - y1 * x0).abs
  denom = Math.sqrt((y1 - y0)**2 + (x1 - x0)**2)
  num / denom
end

# PIP downsampling function
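# Greedy variant: start with the first and last points, then repeatedly add the raw point
# with the largest perpendicular distance to the segment joining its nearest already-selected
# neighbours, until n_out points have been kept.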
def pip_downsample(time, series, n_out)
  return [time, series] if n_out >= time.size

  pip_time = [time[0], time[-1]]
  pip_series = [series[0], series[-1]]

  while pip_time.size < n_out
    max_dist = 0
    index_to_add = nil

    (0...time.size).each do |i|
      next if pip_time.include?(time[i])

      prev_index = pip_time.rindex { |t| t < time[i] }
      next_index = pip_time.index { |t| t > time[i] }
      x0 = pip_time[prev_index]
      y0 = pip_series[prev_index]
      x1 = pip_time[next_index]
      y1 = pip_series[next_index]
      x = time[i]
      y = series[i]

      dist = perpendicular_distance_pip(x0, y0, x1, y1, x, y)
      if dist > max_dist
        max_dist = dist
        index_to_add = i
      end
    end

    # Stop early if no remaining point adds information (all are collinear with the selected ones)
    break if index_to_add.nil?

    pip_time.insert(pip_time.index { |t| t > time[index_to_add] }, time[index_to_add])
    pip_series.insert(pip_time.index(time[index_to_add]), series[index_to_add])
  end

  [pip_time, pip_series]
end

# LTTB downsampling function
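# Keeps the first and last points, splits the rest into roughly equal buckets, and from each
# bucket picks the point forming the largest triangle with the previously selected point and
# the average of the following bucket.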
def lttb_downsample(time, series, n_out)
  return [time, series] if n_out >= time.size

  bucket_size = (time.size.to_f / (n_out - 2)).ceil
  downsampled_time = [time[0]]
  downsampled_series = [series[0]]

  (1...(n_out - 1)).each do |i|
    bucket_start = (i - 1) * bucket_size
    bucket_end = [i * bucket_size, time.size].min
    next if bucket_end <= bucket_start # Skip if the bucket is invalid or empty

    # LTTB anchors the triangle on the average of the *next* bucket
    # (falling back to the last point when there is no next bucket)
    next_start = i * bucket_size
    next_end = [(i + 1) * bucket_size, time.size].min
    if next_start < next_end
      avg_x = time[next_start...next_end].sum / (next_end - next_start)
      avg_y = series[next_start...next_end].sum / (next_end - next_start)
    else
      avg_x = time[-1]
      avg_y = series[-1]
    end

    max_area = -1
    selected_index = nil
    (bucket_start...bucket_end).each do |j|
      area = ((time[j] - downsampled_time[-1]) * (avg_y - downsampled_series[-1]) -
              (avg_x - downsampled_time[-1]) * (series[j] - downsampled_series[-1])).abs
      if area > max_area
        max_area = area
        selected_index = j
      end
    end

    if selected_index
      downsampled_time << time[selected_index]
      downsampled_series << series[selected_index]
    end
  end

  downsampled_time << time[-1]
  downsampled_series << series[-1]
  [downsampled_time, downsampled_series]
end

# Function to find local maxima and minima
def find_extremes_top_k(time, series)
  peaks = []
  troughs = []
  (1...(series.size - 1)).each do |i|
    if series[i] > series[i - 1] && series[i] > series[i + 1]
      peaks << i
    elsif series[i] < series[i - 1] && series[i] < series[i + 1]
      troughs << i
    end
  end
  [peaks, troughs]
end

# Top-K Significant Extremes Downsampling function
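# Keeps the first and last points plus local extrema; when there are more extrema than the
# budget allows, it ranks them by prominence (minimum height difference to their immediate
# neighbours) and keeps the n_out - 2 most prominent ones.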
def top_k_extremes_downsample(time, series, n_out)
  peaks, troughs = find_extremes_top_k(time, series)
  extremes = peaks + troughs

  if extremes.size + 2 <= n_out
    selected_indices = extremes
  else
    prominences = extremes.map do |index|
      left = index - 1
      right = index + 1
      [(series[index] - series[left]).abs, (series[index] - series[right]).abs].min
    end
    selected_indices = extremes.zip(prominences)
                               .sort_by { |_, prominence| -prominence }
                               .take(n_out - 2)
                               .map(&:first)
  end

  selected_indices = [0] + selected_indices.sort + [series.size - 1]
  [selected_indices.map { |i| time[i] }, selected_indices.map { |i| series[i] }]
end

# Function to find local maxima and minima
def find_extremes_eps(time, series)
  peaks = []
  troughs = []
  (1...(series.size - 1)).each do |i|
    if series[i] > series[i - 1] && series[i] > series[i + 1]
      peaks << i
    elsif series[i] < series[i - 1] && series[i] < series[i + 1]
      troughs << i
    end
  end
  [peaks, troughs]
end

# EPS (Extreme Point Sampling) Downsampling function
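# Keeps the first and last points plus local extrema; when there are more extrema than the
# budget allows, it keeps a random sample of n_out - 2 of them.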
def eps_downsample(time, series, n_out)
  peaks, troughs = find_extremes_eps(time, series)
  extremes = peaks + troughs

  if extremes.size + 2 <= n_out
    selected_indices = extremes
  else
    selected_indices = extremes.sample(n_out - 2)
  end

  selected_indices = [0] + selected_indices.sort + [series.size - 1]
  [selected_indices.map { |i| time[i] }, selected_indices.map { |i| series[i] }]
end

# Function to find local maxima and minima
def find_extremes_ler(time, series)
  peaks = []
  troughs = []
  (1...(series.size - 1)).each do |i|
    if series[i] > series[i - 1] && series[i] > series[i + 1]
      peaks << i
    elsif series[i] < series[i - 1] && series[i] < series[i + 1]
      troughs << i
    end
  end
  [peaks, troughs]
end

# LER (Local Extrema Retention) Downsampling function
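# Like EPS, but first filters the extrema by a prominence threshold (in absolute value units),
# then randomly samples n_out - 2 of the surviving extrema if there are still too many.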
def ler_downsample(time, series, n_out, threshold = 0.05)
  peaks, troughs = find_extremes_ler(time, series)
  extremes = peaks + troughs

  prominences = extremes.map do |index|
    left = index - 1
    right = index + 1
    [(series[index] - series[left]).abs, (series[index] - series[right]).abs].min
  end

  significant_extremes = extremes.zip(prominences)
                                 .select { |_, prominence| prominence >= threshold }
                                 .map(&:first)

  if significant_extremes.size + 2 <= n_out
    selected_indices = significant_extremes
  else
    selected_indices = significant_extremes.sample(n_out - 2)
  end

  selected_indices = [0] + selected_indices.sort + [series.size - 1]
  [selected_indices.map { |i| time[i] }, selected_indices.map { |i| series[i] }]
end

# Plotting the original and downsampled series
def plot(time, series, time_downsampled, series_downsampled, title, output_file)
  Gnuplot.open do |gp|
    Gnuplot::Plot.new(gp) do |plot|
      plot.terminal "pngcairo size 800,600"
      plot.output output_file
      plot.title title
      plot.xlabel "Time"
      plot.ylabel "Value"
      plot.grid

      plot.data << Gnuplot::DataSet.new([time.to_a, series.to_a]) do |ds|
        ds.with = "lines"
        ds.title = "Original Series"
        ds.linewidth = 1
        ds.linecolor = "rgb '#FFB6C1'"
      end

      plot.data << Gnuplot::DataSet.new([time_downsampled, series_downsampled]) do |ds|
        ds.with = "lines"
        ds.title = "Downsampled Series"
        ds.linewidth = 2
        ds.linecolor = "rgb 'red'"
      end
    end
  end
end

# Perpendicular distance from a point to the line through line_start and line_end
def perpendicular_distance_rdp(point, line_start, line_end)
  x0, y0 = point
  x1, y1 = line_start
  x2, y2 = line_end
  num = ((y2 - y1) * x0 - (x2 - x1) * y0 + x2 * y1 - y2 * x1).abs
  denom = Math.sqrt((y2 - y1)**2 + (x2 - x1)**2)
  num / denom
end

# RDP (Ramer-Douglas-Peucker) Downsampling function
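# Recursive simplification: keep the endpoints, find the interior point farthest from the
# segment joining them, and recurse on both halves while that distance exceeds epsilon.
# Unlike the other functions it is controlled by epsilon rather than a target point count,
# and it returns [time, value] pairs instead of two parallel arrays.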
def rdp(time, series, epsilon)
  return [[time[0], series[0]], [time[-1], series[-1]]] if time.size < 3

  line_start = [time[0], series[0]]
  line_end = [time[-1], series[-1]]
  max_distance = 0
  index_of_max = 0

  (1...time.size - 1).each do |i|
    point = [time[i], series[i]]
    distance = perpendicular_distance_rdp(point, line_start, line_end)
    if distance > max_distance
      max_distance = distance
      index_of_max = i
    end
  end

  if max_distance > epsilon
    left_results = rdp(time[0..index_of_max], series[0..index_of_max], epsilon)
    right_results = rdp(time[index_of_max..-1], series[index_of_max..-1], epsilon)
    left_results[0...-1] + right_results
  else
    [[time[0], series[0]], [time[-1], series[-1]]]
  end
end

# Function to generate a synthetic time series resembling stock prices
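# Geometric random walk: each price is the previous price times (1 + trend + uniform noise
# in [-volatility, +volatility]), which loosely mimics a drifting stock price.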
def generate_synthetic_series(length: 100, start_price: 100.0, volatility: 0.02, trend: 0.001)
  series = NArray.float(length)
  series[0] = start_price
  (1...length).each do |i|
    random_walk = (rand - 0.5) * 2 * volatility
    series[i] = series[i - 1] * (1 + trend + random_walk)
  end
  series
end

# Parameters for the synthetic series
length = 500
start_price = 100.0
volatility = 0.02 # Volatility factor; higher values lead to larger fluctuations
trend = 0.001     # Drift per step; positive values create an upward trend, negative values a downward one

# Generate the synthetic time series
time = NArray.sfloat(length).indgen!
series = generate_synthetic_series(length: length, start_price: start_price, volatility: volatility, trend: trend)

n_out = 50

# Downsample using the PIP algorithm
pip_time, pip_series = pip_downsample(time.to_a, series.to_a, n_out)
plot(time, series, pip_time, pip_series, "PIP (Perceptually Important Points) Downsampling", "pip.png")

# Downsample using the LTTB algorithm
lttb_time, lttb_series = lttb_downsample(time.to_a, series.to_a, n_out)
plot(time, series, lttb_time, lttb_series, "LTTB (Largest-Triangle, Three-Buckets) Downsampling", "lttb.png")

# Downsample using the Top-K Significant Extremes algorithm
topk_time, topk_series = top_k_extremes_downsample(time.to_a, series.to_a, n_out)
plot(time, series, topk_time, topk_series, "Top-K Significant Extremes Downsampling", "topk.png")

# Downsample using the EPS algorithm
eps_time, eps_series = eps_downsample(time.to_a, series.to_a, n_out)
plot(time, series, eps_time, eps_series, "EPS (Extreme Point Sampling) Downsampling", "eps.png")

# Downsample using the LER algorithm
ler_time, ler_series = ler_downsample(time.to_a, series.to_a, n_out)
plot(time, series, ler_time, ler_series, "LER (Local Extrema Retention) Downsampling", "ler.png")

# Downsample using the RDP algorithm with a chosen epsilon value
epsilon = 3
rdp_result = rdp(time.to_a, series.to_a, epsilon)
rdp_time, rdp_series = rdp_result.transpose
plot(time, series, rdp_time, rdp_series, "RDP (Ramer-Douglas-Peucker) Downsampling", "rdp_epsilon_#{epsilon}.png")
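
# Running the script writes pip.png, lttb.png, topk.png, eps.png, ler.png and
# rdp_epsilon_3.png to the current working directory, one chart per algorithm.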
@ggalmazor (Author)
This code was generated by ChatGPT, with a few minor tweaks from me

@ggalmazor (Author)
This gist is linked from a blog post on my personal website 👉 https://ggalmazor.com/blog/evaluating_downsampling_algorithms.html
