Created
October 23, 2022 14:48
-
-
Save Voyz/454d55f8f76c2b3b65e52708c9da886e to your computer and use it in GitHub Desktop.
Vertices of a timeseries
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _drop_collinear(values):
    """Drop points lying on a monotonic run, keeping only turning points."""
    above_previous = values > values.shift(1)
    above_next = values > values.shift(-1)
    # A point inside a monotonic run is greater than exactly one neighbour.
    # A kept point is greater than both neighbours or than neither of them.
    # Comparisons against the missing neighbour of an endpoint are False.
    turning = (above_previous == above_next).to_numpy()
    return values[turning]


def _drop_small_moves(values, threshold, mutual):
    """Drop points whose step to their neighbours is below std / threshold.

    With ``mutual`` set, both the incoming and the outgoing step must exceed
    the cutoff for a point to survive; otherwise one large step is enough.
    """
    step_in = (values - values.shift(1)).abs()
    step_out = (values - values.shift(-1)).abs()
    cutoff = values.std() / threshold
    if mutual:
        keep = (step_in > cutoff) & (step_out > cutoff)
    else:
        keep = (step_in > cutoff) | (step_out > cutoff)
    return values[keep.to_numpy()]


def _alternate_vertices(peak_positions, valley_positions):
    """Merge peaks and valleys by position, keeping a strictly alternating run.

    When several vertices of the same kind follow each other, only the first
    one of that run is kept.
    """
    tagged = sorted(
        [(int(pos), True) for pos in peak_positions]
        + [(int(pos), False) for pos in valley_positions]
    )
    kept_peaks = []
    kept_valleys = []
    last_kind = None
    for pos, is_peak in tagged:
        if is_peak == last_kind:
            continue
        (kept_peaks if is_peak else kept_valleys).append(pos)
        last_kind = is_peak
    return kept_peaks, kept_valleys


def timeseries_vertices(ser: pd.Series, distance=15, coeff=3):
    """Locate the peaks and valleys of a time series.

    The series is first simplified to a piecewise-linear outline: points on
    straight runs and points with small moves are removed, the first and last
    samples are put back, and the outline is interpolated over the original
    index. ``scipy.signal.find_peaks`` is then run on that outline.

    Args:
        ser: Input series. It is not modified.
        distance: Minimum spacing between detected extrema, forwarded to
            ``find_peaks``.
        coeff: The prominence required of a prominent extremum is the
            outline's standard deviation divided by this value.

    Returns:
        A tuple ``(peaks, ppeaks, valleys, pvalleys, downsampled)`` where
        ``peaks`` / ``valleys`` are the positions of all extrema found with
        the distance constraint only, ``ppeaks`` / ``pvalleys`` are the
        prominent ones (filtered so that peaks and valleys alternate, with
        position 0 added as the opening vertex), and ``downsampled`` is the
        interpolated outline. All positions are integer offsets into ``ser``.
        If no prominent peak or no prominent valley is found, ``ppeaks`` and
        ``pvalleys`` each hold a single endpoint: the higher end of the
        series is the peak and the lower end the valley.
    """
    # Operating on the Series itself (rather than a one-column DataFrame
    # keyed by ``ser.name``) keeps unnamed series working.
    outline = _drop_collinear(ser)
    outline = _drop_small_moves(outline, 10, False)
    outline = _drop_collinear(outline)
    outline = _drop_small_moves(outline, 15, True)
    outline = _drop_small_moves(outline, 10, True)
    outline = _drop_collinear(outline)

    # The filters may remove the endpoints; restore them so the interpolation
    # below spans the whole index.
    first_value = ser.iloc[0]
    last_value = ser.iloc[-1]
    outline.loc[ser.index[0]] = first_value
    outline.loc[ser.index[-1]] = last_value
    downsampled = (
        outline.sort_index()
        .reindex(ser.index, fill_value=np.nan)
        .interpolate(method='index')
        .ffill()
        .bfill()
    )

    prominence = downsampled.std() / coeff
    inverted = downsampled * -1
    peaks, _ = find_peaks(downsampled, distance=distance)
    ppeaks, _ = find_peaks(downsampled, distance=distance, prominence=prominence, width=3)
    valleys, _ = find_peaks(inverted, distance=distance)
    pvalleys, _ = find_peaks(inverted, distance=distance, prominence=prominence, width=3)

    if len(ppeaks) == 0 or len(pvalleys) == 0:
        # Without both kinds of prominent extrema, describe the series by its
        # two endpoints. The endpoint must go into ``pvalleys`` (the list
        # paired with ``ppeaks``); ``valleys`` is left as detected.
        start = 0
        end = ser.shape[0] - 1
        if first_value > last_value:
            ppeaks, pvalleys = [start], [end]
        else:
            ppeaks, pvalleys = [end], [start]
    else:
        # Position 0 opens the sequence with the kind opposite to the first
        # detected extremum.
        if ppeaks[0] < pvalleys[0]:
            pvalleys = np.insert(pvalleys, 0, 0)
        else:
            ppeaks = np.insert(ppeaks, 0, 0)
        ppeaks, pvalleys = _alternate_vertices(ppeaks, pvalleys)

    return list(peaks), list(ppeaks), list(valleys), list(pvalleys), downsampled
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment