Created
February 12, 2020 12:26
-
-
Save gmaze/0b45d925c8492e0c0daaed654e2e1965 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#~/usr/bin/env python | |
# | |
# Useful functions for xarray time series analysis | |
# (c) G. Maze, Ifremer | |
# | |
import numpy as np | |
import xarray as xr | |
import pandas as pd | |
from statsmodels.tsa.seasonal import seasonal_decompose | |
from scipy.signal import detrend, butter, lfilter | |
from sklearn.linear_model import LinearRegression | |
from scipy import signal | |
from scipy import interpolate | |
from scipy.fftpack import rfft, irfft, fftfreq | |
import holoviews as hv | |
from holoviews.streams import Stream, param, Params | |
import panel as pn | |
def vrange(x): | |
return np.nanmin(x), np.nanmax(x) | |
def apply_to_two_periods(da, periods=[1, 2], reduce=np.mean, apply=np.subtract): | |
"""For a given year, reduce to 2 periods and then apply a function to them | |
Ex: Compute mean(JJA) minus mean(JFM) | |
apply_to_two_periods(da, periods=[[6,7,8] , [1,2,3]], reduce=np.mean, apply=np.subtract) | |
""" | |
perA = periods[0] | |
perB = periods[1] | |
iperA = np.isin(da['time.month'].values, perA, assume_unique=False) | |
iperB = np.isin(da['time.month'].values, perB, assume_unique=False) | |
valA = da.isel(time=iperA).reduce(reduce) | |
valB = da.isel(time=iperB).reduce(reduce) | |
return apply(valA, valB) | |
def apply_to_one_period(da, period=[1, 2, 3], reduce=np.mean): | |
"""For a given year, reduce to 1 period and then apply a function to it | |
Ex: | |
Compute annual time series with mean(JJA): | |
apply_to_one_period(da, period=[6,7,8], reduce=np.mean) | |
""" | |
iperA = np.isin(da['time.month'].values, period, assume_unique=False) | |
return da.isel(time=iperA).reduce(reduce) | |
def monthly_to_annual(da, window=None, fct=np.mean, periods=[1, 2], center='01-01'): | |
"""Sub-sample an inter-annual monthly time series using specific operators and time windows | |
Examples: | |
Annual maximum: | |
monthly_to_annual(da, window='year', fct=np.max) | |
Annual maximum over JFM only: | |
monthly_to_annual(da, window='period', periods=[1,2,3], fct=np.max) | |
Annual mean(JJA): | |
monthly_to_annual(da, window='period', periods=[6,7,8], fct=np.mean) | |
Annual mean(JJA) - mean(JFM): | |
monthly_to_annual(da, window='delta', periods=[[6,7,8],[1,2,3]], fct=np.mean) | |
March of the following year versus September of this year: | |
monthly_to_annual(da, window='forward', periods=[9,3], fct=np.mean) | |
This is like: f(Y+1,Mar.) - f(Y,Sep.), output centered on Y | |
JFM of this year versus JJA of the previous year: | |
monthly_to_annual(da, window='backward', periods=[[6,7,8],[1,2,3]], fct=np.mean) | |
This is like: f(Y,Jan./Feb./Mar.) - f(Y-1,Jun./Jul./Aug.), output centered on Y | |
New time series on centered on January 1st by default, you change to another | |
day of the year with 'center': | |
monthly_to_annual(da, window='year', center='06-15') # Center on June 15th | |
gmaze@ifremer.fr | |
""" | |
# New time axis centered on January 1st: | |
new_time_axis = np.arange(np.datetime64(pd.to_datetime(str(da['time'].values[0])).strftime('%Y')), | |
np.datetime64(pd.to_datetime(str(da['time'].values[-1])).strftime('%Y'))\ | |
+ np.timedelta64(1, '1Y'), | |
np.timedelta64(1, '1Y')) | |
# center = '06-01' # June 1st | |
new_time_axis = np.array( | |
[np.datetime64(pd.to_datetime(i + '-' + center)) for i in pd.to_datetime(new_time_axis).strftime('%Y')]) | |
if window == 'year': | |
# By year: | |
x_sub = da.groupby('time.year').apply(fct) | |
x_sub['year'] = new_time_axis | |
elif window == 'period': | |
# By year and period for each years: | |
x_sub = da.groupby('time.year').apply(apply_to_one_period, period=periods, reduce=fct) | |
x_sub['year'] = new_time_axis | |
elif window == 'delta': | |
# By year and 2 periods for each years: | |
x_sub = da.groupby('time.year').apply(apply_to_two_periods, periods=periods, reduce=np.mean, apply=np.subtract) | |
x_sub['year'] = new_time_axis | |
elif window == 'forward': | |
# By year/period and the next year/period: | |
x_center = da.groupby('time.year').apply(apply_to_one_period, period=periods[0], reduce=fct) | |
x_forward = da.groupby('time.year').apply(apply_to_one_period, period=periods[1], reduce=fct) | |
x_forward = x_forward[1:] | |
x_forward['year'] = x_forward['year'] - 1 | |
x_sub = x_forward - x_center | |
x_sub['year'] = new_time_axis[0:-1] | |
elif window == 'backward': | |
# By year/period and the previous year/period: | |
x_backward = da.groupby('time.year').apply(apply_to_one_period, period=periods[0], reduce=fct) | |
x_backward = x_backward[0:-1] | |
x_backward['year'] = x_backward['year'] + 1 | |
x_center = da.groupby('time.year').apply(apply_to_one_period, period=periods[1], reduce=fct) | |
x_sub = x_center - x_backward | |
x_sub['year'] = new_time_axis[1:] | |
else: | |
raise Exception('Unknown window code: {}'.format(window)) | |
# Adjust time axis: | |
x_sub = x_sub.rename({'year': 'time'}) | |
return x_sub | |
def get_seascyc(da, freq='M', tile=True): | |
""" Compute a typical seasonal cycle from a monthly xr.DataArray time series | |
The Seas.Cyc. is computed as the mean of eah months for which annual means | |
have been removed. | |
Input | |
----- | |
xr.DataArray | |
A monthly time series with a 'time' axis | |
Returns | |
------- | |
xr.DataArray | |
the typical seasonal cycle repeated along the orginial time axis | |
gmaze@ifremer.fr | |
""" | |
ts = da.copy(deep=True) # Working copy of the observed time series | |
# Remove annual means: | |
yr_mean = ts.groupby('time.year').mean('time') # Annual mean time series | |
ts_yan = ts.values # To hold the time series with annual means removed | |
years = [int(str(i)) for i in da['time'].values.astype('datetime64[Y]')] | |
for y in yr_mean['year']: | |
v = yr_mean.sel(year=y).values | |
mask = np.argwhere(years == y.values) | |
ts_yan[mask] = ts_yan[mask] - np.tile(v, mask.shape) | |
ts_yan = xr.DataArray(ts_yan, dims='time', coords={'time': ts['time'].values}) | |
# Compute typical seasonal cycle: | |
if freq == 'M': | |
sc = ts_yan.groupby('time.month').mean('time') | |
seq = ts['time'].values.astype('datetime64[M]').astype(int) % 12 + 1 | |
elif freq == 'W': | |
sc = ts_yan.groupby('time.week').mean('time') | |
seq = ts['time'].values.astype('datetime64[W]').astype(int) % 52 + 1 | |
elif freq == 'D': | |
sc = ts_yan.groupby('time.dayofyear').mean('time') | |
seq = ts['time'].values.astype('datetime64[D]').astype(int) % 365 + 1 | |
# Broadcast to full time series: | |
if tile: | |
ts_sc = np.empty((len(ts['time']),)) | |
axis = sc[sc.dims[0]].values | |
for this, ii in zip(axis, np.arange(0, len(axis))): | |
fill_value = sc[ii].values | |
mask = np.argwhere(seq == this) | |
ts_sc[mask] = np.tile(fill_value, len(mask))[:, np.newaxis] | |
# Output | |
return xr.DataArray(ts_sc, dims='time', coords={'time': ts['time'].values}).rename('seasonal') | |
else: | |
# Output | |
return sc.rename('seasonal') | |
def freq_to_scperiod(freq): | |
"""Return one annual cycle length for a given Pandas time index | |
Parameters | |
---------- | |
freq : str or offset | |
Frequency to convert | |
Returns | |
------- | |
period : int | |
Periodicity of freq | |
Notes | |
----- | |
Annual maps to 1, quarterly maps to 4, monthly to 12, weekly to 52. | |
https://github.com/statsmodels/statsmodels/issues/2856 | |
""" | |
freq = freq.rule_code.upper() | |
if freq == 'A' or freq.startswith(('A-', 'AS-')): | |
return 1 | |
elif freq == 'Q' or freq.startswith(('Q-', 'QS-')): | |
return 4 | |
elif freq == 'M' or freq.startswith(('M-', 'MS')): | |
return 12 | |
elif freq == 'W' or freq.startswith('W-'): | |
return 52 | |
elif freq == 'D': | |
return 365 | |
elif freq == 'H': | |
return 365 * 24 | |
else: # pragma : no cover | |
raise ValueError("freq {} not understood. Please report if you " | |
"think this is in error.".format(freq)) | |
def decompose_ts(da, freq='M'): | |
"""Seasonal decomposition using moving averages of a xr.DataArray time series | |
Input | |
----- | |
xr.DataArray | |
A time series with a 'time' axis. | |
freq: str | |
Time series frequency: 'Y': annual M': monthly, 'D': daily | |
Returns | |
------- | |
xr.DataSet | |
A dataset with the input time series additive decomposition | |
""" | |
# Construct a pandas time series with the xarray.DataArray : | |
dt, c = '30 days', 1 | |
if freq is 'Y': dt, c = '365 day', 1 | |
if freq is 'D': dt, c = '1 day', 0 | |
index = pd.date_range( | |
pd.to_datetime(str(da['time'].values[0])).strftime('%Y-%m-%d'), | |
(pd.to_datetime(str(da['time'].values[-1])) + c * pd.Timedelta(dt)).strftime('%Y-%m-%d'), freq=freq) | |
data = pd.Series(da.values, index=index) | |
# Then decompose: | |
result = seasonal_decompose(data, model='additive', extrapolate_trend='freq', | |
freq=freq_to_scperiod(data.index.freq)) | |
# Create a new xarray.dataset with all signals: | |
r = result.trend.to_xarray().rename('trend').to_dataset() | |
r['seasonal'] = result.seasonal.to_xarray() | |
r['resid'] = result.resid.to_xarray() | |
r['observed'] = result.observed.to_xarray() | |
r = r.rename({'index': 'time'}) | |
return r | |
def decompose_tsia(da, freq='M', cutoff=5): | |
"""Additive decomposition of a xr.DataArray time series | |
Decompose a time series into a: | |
- Seasonal Cycle | |
- Linear trend | |
- Low Frequency (lower than cutoff) | |
- Interannual (from 1 year to cutoff) | |
- Residual | |
Filtering is done with a simple moving average | |
Returns | |
------- | |
xr.DataSet | |
A dataset with the input time series additive decomposition | |
gmaze@ifremer.fr | |
""" | |
# Seasonal cycle length for a given frequency: | |
if freq == 'Y': w = 1 | |
if freq == 'M': w = 12 | |
if freq == 'W': w = 52 | |
if freq == 'D': w = 365 | |
# Init output DataSet: | |
RES = xr.DataArray(da.values, | |
dims='time', | |
coords={'time': da['time'].values}, | |
name='observed').to_dataset() | |
# 1st extract the seasonal cycle: | |
if freq != 'Y': | |
RES['seasonal'] = get_seascyc(da, freq=freq, tile=True) | |
else: | |
RES['seasonal'] = xr.DataArray(np.zeros_like(RES['observed'].values), dims='time') | |
# 2nd: Remove linear trend: | |
y = RES['observed'].values - RES['seasonal'].values | |
x = np.arange(0, len(y)) | |
iok = np.argwhere(~np.isnan(y)) | |
model = LinearRegression().fit(x[iok], y[iok]) | |
y_pred = model.predict(x[:, np.newaxis]).squeeze() | |
RES['linear_trend'] = xr.DataArray(y_pred, dims='time') | |
# 3rd: Very Low Frequency filter out above 'cutoff' years: | |
y = RES['observed'].values - RES['seasonal'].values - RES['linear_trend'].values | |
y_pred = xr.DataArray(y, dims='time').rolling(time=cutoff * w, center=True).mean() | |
RES['low_freq'] = y_pred | |
# 4th: Interannual | |
y = RES['observed'].values - RES['seasonal'].values - RES['linear_trend'].values \ | |
- RES['low_freq'].values | |
y_pred = xr.DataArray(y, dims='time').rolling(time=w, center=True).mean() | |
RES['interannual'] = y_pred | |
# Residual: | |
y = RES['observed'].values - RES['seasonal'].values - RES['linear_trend'].values \ | |
- RES['low_freq'].values - RES['interannual'].values | |
y_pred = y # Residual ! | |
RES['residual'] = xr.DataArray(y_pred, dims='time') | |
# Return new xarray.dataset with all signal components: | |
return RES | |
def plot_decompose_tsia(tsa, grid='split', width=600, height=200): | |
w, h = width, height | |
if 'interannual' in tsa: | |
if grid == 'split': | |
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h) | |
+ tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h) | |
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h) | |
+ tsa['interannual'].hvplot(label='Interannual').opts(show_grid=True, width=w, height=h) | |
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h) | |
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h) | |
) | |
layout.cols(2) | |
if grid == 'classic': | |
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h) | |
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h) | |
+ tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h) | |
* tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h) | |
* tsa['interannual'].hvplot(label='Interannual').opts(show_grid=True, width=w, height=h) | |
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h) | |
) | |
layout.cols(2) | |
if grid == 'pair': | |
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h) | |
* tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h) | |
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h) | |
* tsa['interannual'].hvplot(label='Interannual').opts(show_grid=True, width=w, height=h) | |
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h) | |
* tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h) | |
) | |
layout.cols(2) | |
if grid == 'mix': | |
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h) | |
* (tsa['seasonal'] + tsa['linear_trend']).hvplot(label='Linear Trend + Seasonal').opts(show_grid=True, | |
width=w, | |
height=h) | |
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h) | |
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h) | |
) | |
layout.cols(2) | |
else: | |
if grid == 'split': | |
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h) | |
+ tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h) | |
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h) | |
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h) | |
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h) | |
) | |
layout.cols(2) | |
if grid == 'classic': | |
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h) | |
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h) | |
+ tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h) | |
* tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h) | |
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h) | |
) | |
layout.cols(2) | |
if grid == 'pair': | |
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h) | |
* tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h) | |
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h) | |
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h) | |
* tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h) | |
) | |
layout.cols(2) | |
if grid == 'mix': | |
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h) | |
* (tsa['seasonal'] + tsa['linear_trend']).hvplot(label='Linear Trend + Seasonal').opts(show_grid=True, | |
width=w, | |
height=h) | |
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h) | |
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h) | |
) | |
layout.cols(2) | |
return layout | |
class tsia_decomposer: | |
""" Time series decomposition """ | |
def __init__(self, \ | |
filter_type='Hanning', \ | |
window_size=5 * 12, \ | |
cutoff=10 * 12, \ | |
windowIA_size=2 * 12, \ | |
cutoffIA=1 * 12, \ | |
freq='M', \ | |
fill_borders=False): | |
self.filter_type = filter_type | |
self.window_size = window_size | |
self.cutoff = cutoff | |
self.windowIA_size = windowIA_size | |
self.cutoffIA = cutoffIA | |
self.freq = freq | |
self.fill = fill_borders | |
def low_pass_weights_ma(self, window): | |
w = np.repeat(1.0, window) / window | |
return w | |
def low_pass_weights_hanning(self, window): | |
w = signal.hann(window) | |
w = w / sum(w) | |
return w | |
def low_pass_weights_lanczos(self, window, cutoff): | |
order = ((window - 1) // 2) + 1 | |
nwts = 2 * order + 1 | |
w = np.zeros([nwts]) | |
n = nwts // 2 | |
w[n] = 2 * cutoff | |
k = np.arange(1., n) | |
sigma = np.sin(np.pi * k / n) * n / (np.pi * k) | |
firstfactor = np.sin(2. * np.pi * cutoff * k) / (np.pi * k) | |
w[n - 1:0:-1] = firstfactor * sigma | |
w[n + 1:-1] = firstfactor * sigma | |
return w[1:-1] | |
def low_pass_fft(self, y, window): | |
sig = y.values | |
iok = ~np.isnan(sig) | |
sig = sig[iok] | |
N, fs = sig.size, 1 | |
tim = np.arange(N) / fs | |
W = fftfreq(sig.size, d=tim[1] - tim[0]) | |
f_signal = rfft(sig) | |
cut_f_signal = f_signal.copy() | |
cut_f_signal[(W > 1 / window)] = 0 | |
cut_signal = irfft(cut_f_signal) | |
y.values[iok] = cut_signal | |
return y | |
def low_pass_butter(self, y, window): | |
sig = y.values | |
iok = ~np.isnan(sig) | |
sig = sig[iok] | |
fs = 1 | |
cutoff_month = 1 / window | |
w = cutoff_month / (fs / 2) # Normalize the frequency | |
b, a = signal.butter(5, w, 'low') | |
cut_signal = signal.filtfilt(b, a, sig) | |
y.values[iok] = cut_signal | |
return y | |
def low_pass(self, y, window=12, cutoff=1 / 60.): | |
# Get weights: | |
if self.filter_type == 'Moving Average': | |
wgts = self.low_pass_weights_ma(window) | |
# Apply weights with xarray: | |
weight = xr.DataArray(wgts, dims=['window']) | |
y_pred = y.rolling(time=len(wgts), center=True).construct('window').dot(weight) | |
if self.filter_type == 'Lanczos': | |
wgts = self.low_pass_weights_lanczos(window, cutoff) | |
# Apply weights with xarray: | |
weight = xr.DataArray(wgts, dims=['window']) | |
y_pred = y.rolling(time=len(wgts), center=True).construct('window').dot(weight) | |
if self.filter_type == 'Hanning': | |
wgts = self.low_pass_weights_hanning(window) | |
# Apply weights with xarray: | |
weight = xr.DataArray(wgts, dims=['window']) | |
y_pred = y.rolling(time=len(wgts), center=True).construct('window').dot(weight) | |
if self.filter_type == 'FFT': | |
y_pred = self.low_pass_fft(y, window) | |
if self.filter_type == 'Butterworth': | |
y_pred = self.low_pass_butter(y, window) | |
# Recenter around the initial time mean: | |
#y_pred = y_pred - y_pred.mean()+y.mean() | |
return y_pred | |
def fill_borders(self, d, method='spline'): | |
x = np.arange(0, len(d.values)) | |
iok = np.argwhere(~np.isnan(d.values)) | |
if self.freq == 'M': c = 12 | |
if self.freq == 'Y': c = 1 | |
# xi = np.concatenate((np.array((-12*2,)),x,np.array((np.max(x)+12*2,))),axis=0) | |
# yi = np.concatenate((np.array((d.values[iok[0]])),d.values,np.array((d.values[iok][-1]))),axis=0) | |
xi = np.concatenate((np.array((-c * 10,)), np.array((-c * 5,)), x, np.array((np.max(x) + c * 5,)), | |
np.array((np.max(x) + c * 10,))), axis=0) | |
yi = np.concatenate((np.array((d.values[iok[0]])), np.array((d.values[iok[0]])), d.values, | |
np.array((d.values[iok][-1])), np.array((d.values[iok][-1]))), axis=0) | |
ioki = np.argwhere(~np.isnan(yi)) | |
if method == 'spline': | |
s = interpolate.InterpolatedUnivariateSpline(xi[ioki], yi[ioki]) | |
yi = s(xi) | |
# d.values = yi[1:-1] | |
d.values = yi[2:-2] | |
if method == 'poly': | |
# poly = np.polyfit(x[iok].squeeze(), d.values[iok], deg=5) | |
# yi = np.polyval(poly, x) | |
# d.values = yi | |
poly = np.polyfit(xi[ioki], yi[ioki], deg=5) | |
yi = np.polyval(poly, xi) | |
d.values = yi[2:-2] | |
return d | |
def fit_predict(self, da): | |
# Init output DataSet: | |
RES = xr.DataArray(da.values, | |
dims='time', | |
coords={'time': da['time'].values}, | |
name='observed').to_dataset() | |
# 1st extract the seasonal cycle: | |
if self.freq != 'Y': | |
RES['seasonal'] = get_seascyc(da, freq=self.freq, tile=True) | |
else: | |
RES['seasonal'] = xr.DataArray(np.zeros_like(RES['observed'].values), dims='time') | |
# 2nd: Remove linear trend: | |
y = RES['observed'].values - RES['seasonal'].values | |
x = np.arange(0, len(y)) | |
iok = np.argwhere(~np.isnan(y)) | |
model = LinearRegression().fit(x[iok], y[iok]) | |
y_pred = model.predict(x[:, np.newaxis]).squeeze() | |
RES['linear_trend'] = xr.DataArray(y_pred, dims='time') | |
# 3rd: Very Low Frequency filter out above X years: | |
y = RES['observed'] - RES['seasonal'] - RES['linear_trend'] | |
y_pred = self.low_pass(y, window=self.window_size, cutoff=1 / self.cutoff) | |
if self.fill: | |
y_pred = self.fill_borders(y_pred, method='spline') # Extrapolate start/end points | |
RES['low_freq'] = y_pred | |
if self.windowIA_size != np.Inf: | |
# 4th: Interannual | |
y = RES['observed'] - RES['seasonal'] - RES['linear_trend'] - RES['low_freq'] | |
y_pred = self.low_pass(y, window=self.windowIA_size, cutoff=1 / self.cutoffIA) | |
if self.fill: | |
y_pred = self.fill_borders(y_pred, method='spline') # Extrapolate start/end points | |
RES['interannual'] = y_pred | |
# Residual: | |
if self.windowIA_size != np.Inf: | |
y = RES['observed'].values - RES['seasonal'].values - RES['linear_trend'].values \ | |
- RES['low_freq'].values - RES['interannual'].values | |
else: | |
y = RES['observed'].values - RES['seasonal'].values - RES['linear_trend'].values \ | |
- RES['low_freq'].values | |
y_pred = y # Residual ! | |
RES['residual'] = xr.DataArray(y_pred, dims='time') | |
return RES | |
class VariableExplorer_2comp(param.Parameterized): | |
preprocessing_rolling_mean_width = param.Integer(default=3, bounds=(0, 6), doc="Pre-processing") | |
low_frequency_window_size = param.Integer(default=4 * 12, bounds=(1, 12 * 5)) | |
interannual_window_size = param.Integer(default=1 * 12, bounds=(1, 12 * 5)) | |
variable = param.ObjectSelector(default="", objects=list()) | |
filter_type = param.ObjectSelector(default='Butterworth', objects=['Moving Average', 'Hanning', 'Butterworth']) | |
def __init__(self, ds, freq='M', default="", windows=[3, 4 * 12, 1 * 12], width=850, height=200, ncols=1): | |
self.ds = ds | |
self.freq = freq | |
self.geo = (ncols,width,height) | |
self.param['preprocessing_rolling_mean_width'].default = windows[0] | |
self.param['low_frequency_window_size'].default = windows[1] | |
self.param['interannual_window_size'].default = windows[2] | |
Variable_list = list(ds.data_vars) | |
self.param['variable'].objects = Variable_list | |
if default == "": | |
self.param['variable'].default = Variable_list[0] | |
else: | |
self.param['variable'].default = Variable_list[Variable_list.index(default)] | |
@param.depends('variable', 'low_frequency_window_size', 'interannual_window_size', 'filter_type', | |
'preprocessing_rolling_mean_width') | |
def load_variable(self): | |
da = self.ds[self.variable].copy() | |
da.values = da.values - np.mean(da.values) | |
# if 'volume' in self.variable: | |
# da.values = da.values / svy | |
# da.attrs['units'] = 'Svy' | |
if 'units' not in da.attrs: | |
da.attrs['units'] = '[?]' | |
if 'long_name' not in da.attrs: | |
da.attrs['long_name'] = self.variable | |
# Pre-processing: | |
if self.preprocessing_rolling_mean_width > 0: | |
da.values = (da.rolling(time=self.preprocessing_rolling_mean_width, center=True).mean()).values | |
# Decomposition: | |
RES = tsia_decomposer(filter_type=self.filter_type, | |
window_size=self.low_frequency_window_size, | |
windowIA_size=self.interannual_window_size, | |
freq=self.freq, | |
fill_borders=False).fit_predict(da) | |
# Now compose a Plot: | |
# y_obs = {'Time': da['time'].values, da.units: RES['observed'].values} | |
y_obs = {'Time': da['time'].values, da.units: RES['observed'].values - RES['seasonal'].values} | |
y_seas = {'Time': da['time'].values, da.units: RES['seasonal'].values} | |
y_ltrn = {'Time': da['time'].values, da.units: RES['linear_trend'].values} | |
# y_lowf = {'Time': da['time'].values, da.units: RES['low_freq'].values} | |
y_lowf = {'Time': da['time'].values, da.units: RES['linear_trend'].values + RES['low_freq'].values} | |
y_inta = {'Time': da['time'].values, da.units: RES['interannual'].values} | |
y_resd = {'Time': da['time'].values, da.units: RES['residual'].values} | |
ydim = hv.Dimension(da.units, range=vrange(y_obs[da.units])) | |
cv_obs = hv.Curve(y_obs, 'Time', ydim, label='Observation - Seas. Cyc.', group=da.long_name).opts(color='gray', | |
framewise=True) | |
# cv_ltrn = hv.Curve(y_ltrn, 'Time', ydim, label='Linear Trend', group=da.long_name).opts(color='orange', framewise=True) | |
cv_lowf = hv.Curve(y_lowf, 'Time', ydim, label='Linear Trend + Low Freq.', group=da.long_name).opts(color='red', | |
framewise=True) | |
cv_inta = hv.Curve(y_inta, 'Time', ydim, label='Interannual', group=da.long_name).opts(color='blue', | |
framewise=True) | |
# ydim = hv.Dimension(da.units, range=vrange(y_lowf[da.units])) | |
ydim = hv.Dimension(da.units, range=vrange(y_seas[da.units])) | |
cv_ltrn = hv.Curve(y_ltrn, 'Time', ydim, label='Linear Trend', group=da.long_name).opts(color='orange', | |
framewise=True) | |
cv_seas = hv.Curve(y_seas, 'Time', ydim, label='Seasonal', group=da.long_name).opts(color='blue', | |
framewise=True) | |
cv_resd = hv.Curve(y_resd, 'Time', ydim, label='Residual', group=da.long_name).opts(color='gray', | |
framewise=True) | |
w,h = self.geo[1], self.geo[2] | |
# layout = (cv_obs * cv_inta * cv_lowf.opts(show_grid=True, framewise=True, width=w) | |
# + cv_resd * cv_seas) | |
# layout.cols(2) | |
# layout = (cv_obs * cv_ltrn * cv_seas.opts(show_grid=True, framewise=True, width=w) | |
# + cv_inta * cv_lowf.opts(show_grid=True, framewise=True, width=w) | |
# + cv_resd.opts(show_grid=True, framewise=True, width=w)) | |
# layout.cols(1) | |
# layout = (cv_obs | |
# * cv_lowf * cv_inta.opts(show_grid=True, framewise=True, width=w) | |
# + cv_ltrn * cv_seas * cv_resd.opts(show_grid=True, framewise=True, width=w)) | |
layout = hv.Layout(( | |
hv.Overlay( (cv_obs, cv_lowf, cv_inta) ).opts(show_grid=True, framewise=True, width=w)\ | |
.opts(toolbar='above', legend_position='top'),\ | |
hv.Overlay( (cv_ltrn, cv_seas, cv_resd) ).opts(show_grid=True, framewise=True, width=w)\ | |
.opts(toolbar='above', legend_position='bottom') | |
)) | |
layout.cols(self.geo[0]) | |
return layout | |
class VariableExplorer_1comp(param.Parameterized): | |
preprocessing_rolling_mean_width = param.Integer(default=3, bounds=(0, 6), doc="Pre-processing") | |
low_frequency_window_size = param.Integer(default=1 * 12, bounds=(1, 12 * 10)) | |
variable = param.ObjectSelector(default="", objects=list()) | |
filter_type = param.ObjectSelector(default='Butterworth', objects=['Moving Average', 'Hanning', 'Butterworth']) | |
def __init__(self, ds, freq='M', default="", windows=[3, 1 * 12], width=850, height=200, ncols=1): | |
self.ds = ds | |
self.freq = freq | |
self.geo = (ncols,width,height) | |
self.param['preprocessing_rolling_mean_width'].default = windows[0] | |
self.param['low_frequency_window_size'].default = windows[1] | |
Variable_list = list(ds.data_vars) | |
self.param['variable'].objects = Variable_list | |
if default == "": | |
self.param['variable'].default = Variable_list[0] | |
else: | |
self.param['variable'].default = Variable_list[Variable_list.index(default)] | |
@param.depends('variable', 'low_frequency_window_size', 'filter_type', | |
'preprocessing_rolling_mean_width') | |
def load_variable(self): | |
da = self.ds[self.variable].copy() | |
da.values = da.values - np.mean(da.values) | |
if 'units' not in da.attrs: | |
da.attrs['units'] = '[?]' | |
if 'long_name' not in da.attrs: | |
da.attrs['long_name'] = self.variable | |
# Pre-processing: | |
if self.preprocessing_rolling_mean_width > 0: | |
da.values = (da.rolling(time=self.preprocessing_rolling_mean_width, center=True).mean()).values | |
# Decomposition: | |
RES = tsia_decomposer(filter_type=self.filter_type, | |
window_size=self.low_frequency_window_size, | |
windowIA_size=np.Inf, | |
freq=self.freq, | |
fill_borders=False).fit_predict(da) | |
# Now compose a Plot: | |
# y_obs = {'Time': da['time'].values, da.units: RES['observed'].values} | |
y_obs = {'Time': da['time'].values, da.units: RES['observed'].values - RES['seasonal'].values} | |
y_seas = {'Time': da['time'].values, da.units: RES['seasonal'].values} | |
y_ltrn = {'Time': da['time'].values, da.units: RES['linear_trend'].values} | |
# y_lowf = {'Time': da['time'].values, da.units: RES['low_freq'].values} | |
y_lowf = {'Time': da['time'].values, da.units: RES['linear_trend'].values + RES['low_freq'].values} | |
y_resd = {'Time': da['time'].values, da.units: RES['residual'].values} | |
ydim = hv.Dimension(da.units, range=vrange(y_obs[da.units])) | |
cv_obs = hv.Curve(y_obs, 'Time', ydim, label='Observation - Seas. Cyc.', group=da.long_name).opts(color='gray', | |
framewise=True) | |
# cv_ltrn = hv.Curve(y_ltrn, 'Time', ydim, label='Linear Trend', group=da.long_name).opts(color='orange', framewise=True) | |
cv_lowf = hv.Curve(y_lowf, 'Time', ydim, label='Linear Trend + Low Freq.', group=da.long_name).opts(color='red', | |
framewise=True) | |
ydim = hv.Dimension(da.units, range=vrange(y_seas[da.units])) | |
cv_ltrn = hv.Curve(y_ltrn, 'Time', ydim, label='Linear Trend', group=da.long_name).opts(color='orange', | |
framewise=True) | |
cv_seas = hv.Curve(y_seas, 'Time', ydim, label='Seasonal', group=da.long_name).opts(color='blue', | |
framewise=True) | |
cv_resd = hv.Curve(y_resd, 'Time', ydim, label='Residual', group=da.long_name).opts(color='gray', | |
framewise=True) | |
w,h = self.geo[1], self.geo[2] | |
# layout = (cv_obs * cv_inta * cv_lowf.opts(show_grid=True, framewise=True, width=w) | |
# + cv_resd * cv_seas) | |
# layout.cols(2) | |
# layout = (cv_obs * cv_ltrn * cv_seas.opts(show_grid=True, framewise=True, width=w) | |
# + cv_inta * cv_lowf.opts(show_grid=True, framewise=True, width=w) | |
# + cv_resd.opts(show_grid=True, framewise=True, width=w)) | |
# layout.cols(1) | |
layout = hv.Layout(( | |
hv.Overlay( (cv_obs,cv_lowf) ).opts(show_grid=True, framewise=True, width=w)\ | |
.opts(toolbar='above', legend_position='top'),\ | |
hv.Overlay( (cv_seas,cv_resd) ).opts(show_grid=True, framewise=True, width=w)\ | |
.opts(toolbar='above', legend_position='bottom') | |
)) | |
layout.cols(self.geo[0]) | |
return layout | |
def dashboard(*args, **kwargs): | |
"""Open an interactive dashboard to visualise time decomposition of xr.DataSet time series | |
Examples: | |
dashboard(ds, default='ohc', windows=[6,48,8]) | |
dashboard(ds, freq='M', default='MW_volume', windows=[3,48,12], ncols=1, width=1200) | |
""" | |
if 'windows' in kwargs: | |
windows = kwargs.get('windows') | |
if len(windows)==2: | |
explorer = VariableExplorer_1comp(*args, **kwargs) | |
elif len(windows)==3: | |
explorer = VariableExplorer_2comp(*args, **kwargs) | |
var_dmap = hv.DynamicMap(explorer.load_variable) | |
panel = pn.Row(explorer.param, var_dmap) | |
return panel |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment