Поскольку положительные режимы начинаются с минимумов и заканчиваются пиками, а отрицательные режимы начинаются с пиков и заканчиваются минимумами, режимы никогда не должны перекрываться.
Ниже приведён иллюстрированный пример положительных и отрицательных режимов с порогом 2. Обратите внимание, что первый пик и первый минимум находятся в нейтральном режиме, поскольку до начала первого положительного режима не происходит снижения, превышающего порог 2. Однако последний отрицательный режим должен быть расширен до -0,80, поскольку -0,80 меньше, чем -0,73 (где он в настоящее время заканчивается), и между текущим концом и правильным концом нет роста хотя бы на 2.
Как бы вы реализовали это на Python? Я пробовал использовать scipy.signal.find_peaks, но не смог заставить его работать, и мне кажется, что должен быть более простой способ, который я упускаю.

#test.ipynb
import numpy as np
import pandas as pd
from regimes import plot_regimes, process_time_series
# Build a reproducible noisy sine wave: 100 samples over five full periods,
# with unit-variance Gaussian noise drawn from a fixed seed.
np.random.seed(0)
time = np.arange(100)
value = np.sin(np.linspace(0, 10 * np.pi, 100)) + np.random.normal(0, 1, 100)
df = pd.DataFrame({'Time': time, 'Value': value})

# Detect extrema and derive the regimes, using a height threshold of 2.
df_extrema, regimes_df, final_regimes_df = process_time_series(
    df,
    time_column='Time',
    value_column='Value',
    threshold=2,
)

# Draw the series, its extrema and the shaded final regimes.
plot_regimes(
    df,
    df_extrema,
    final_regimes_df,
    value_column='Value',
    time_column='Time',
)
# regimes.py
import pandas as pd
import numpy as np
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
def find_extrema(df, time_column='Time', value_column='Value'):
    """
    Identifies peaks and troughs in the given time series data.

    Interior extrema come from ``scipy.signal.find_peaks`` applied to the
    values (peaks) and to the negated values (troughs). The first and last
    rows are always included and are classified by comparing each with its
    single neighbour.

    Parameters:
    - df (pd.DataFrame): DataFrame containing time and value columns.
    - time_column (str): Name of the column containing the time data.
      Not referenced by this function; kept for a uniform signature.
    - value_column (str): Name of the column containing the time series values.

    Returns:
    - df_extrema (pd.DataFrame): The rows of ``df`` that are extrema, in
      positional order with a fresh index, plus a 'Type' column holding
      'Peak' or 'Trough'. Empty (but with a 'Type' column) when ``df`` has
      fewer than two rows, because endpoints cannot be classified without
      a neighbour.
    """
    n_rows = len(df)
    if n_rows < 2:
        # No neighbour to compare against: report no extrema rather than
        # failing on an out-of-range positional lookup.
        df_extrema = df.iloc[0:0].copy().reset_index(drop=True)
        df_extrema['Type'] = pd.Series(dtype=object)
        return df_extrema

    values = df[value_column].to_numpy()
    peaks, _ = find_peaks(values)
    troughs, _ = find_peaks(-values)

    # Map position -> type; setdefault keeps the first label seen for a
    # position, so peaks take precedence over troughs.
    type_by_position = {}
    for position in peaks:
        type_by_position.setdefault(int(position), 'Peak')
    for position in troughs:
        type_by_position.setdefault(int(position), 'Trough')

    # Include first and last points as extrema.
    if 0 not in type_by_position:
        type_by_position[0] = 'Peak' if values[0] > values[1] else 'Trough'
    last_idx = n_rows - 1
    if last_idx not in type_by_position:
        type_by_position[last_idx] = (
            'Peak' if values[last_idx] > values[last_idx - 1] else 'Trough'
        )

    # Create the extrema DataFrame in positional order.
    positions = sorted(type_by_position)
    df_extrema = df.iloc[positions].copy().reset_index(drop=True)
    df_extrema['Type'] = [type_by_position[position] for position in positions]
    return df_extrema
def label_regimes(df_extrema, threshold=0.25, value_column='Value', time_column='Time'):
    """
    Labels regimes based on transitions between extrema points.

    Each pair of consecutive extrema is examined; a pair whose absolute
    value difference is at least ``threshold`` becomes one regime. A
    Trough -> Peak transition is labelled 'Positive'; every other
    transition is labelled 'Negative'.

    Parameters:
    - df_extrema (pd.DataFrame): DataFrame containing extrema points and types.
    - threshold (float): Minimum height change to consider as a regime.
    - value_column (str): Column name for time series values.
    - time_column (str): Column name for time values.

    Returns:
    - regimes_df (pd.DataFrame): One row per regime with columns
      Start_Time, End_Time, Start_Type, End_Type, Height_Change and
      Regime_Type. Height_Change is the signed difference for 'Positive'
      regimes and the negated difference for 'Negative' ones. The frame
      is empty (with those columns) when no pair qualifies.
    """
    columns = [
        'Start_Time', 'End_Time', 'Start_Type', 'End_Type',
        'Height_Change', 'Regime_Type',
    ]
    if len(df_extrema) < 2:
        # Fewer than two extrema means there is no transition to label.
        return pd.DataFrame(columns=columns)

    points = list(zip(
        df_extrema[time_column],
        df_extrema[value_column],
        df_extrema['Type'],
    ))

    regimes = []
    # Walk consecutive (current, next) extrema pairs.
    for (start_time, start_value, start_type), (end_time, end_value, end_type) in zip(points, points[1:]):
        height_change = end_value - start_value
        if abs(height_change) < threshold:
            continue
        is_positive = start_type == 'Trough' and end_type == 'Peak'
        regimes.append({
            'Start_Time': start_time,
            'End_Time': end_time,
            'Start_Type': start_type,
            'End_Type': end_type,
            'Height_Change': height_change if is_positive else -height_change,
            'Regime_Type': 'Positive' if is_positive else 'Negative',
        })
    return pd.DataFrame(regimes, columns=columns)
def merge_consecutive_regimes(regimes_df, df, time_column='Time', value_column='Value'):
    """
    Merges consecutive regimes of the same type.

    Rows of ``regimes_df`` are scanned in order. While a row has the same
    'Regime_Type' as the regime being accumulated, the accumulated regime
    is extended to that row's end and its 'Height_Change' is recomputed
    from the series values at its start and end times.

    Parameters:
    - regimes_df (pd.DataFrame): Regimes with Start_Time, End_Time,
      Start_Type, End_Type, Height_Change and Regime_Type columns.
    - df (pd.DataFrame): Original time series, used to look up values.
    - time_column (str): Column name for time values in ``df``.
    - value_column (str): Column name for series values in ``df``.

    Returns:
    - pd.DataFrame: Merged regimes with a fresh index; a copy of
      ``regimes_df`` when it is empty.
    """
    if regimes_df.empty:
        return regimes_df.copy()

    def value_at(moment):
        # First series value recorded at the given time.
        return df.loc[df[time_column] == moment, value_column].values[0]

    merged_regimes = []
    current_regime = regimes_df.iloc[0].copy()
    for _, next_regime in regimes_df.iloc[1:].iterrows():
        if next_regime['Regime_Type'] != current_regime['Regime_Type']:
            # Type changed: close the accumulated regime and start a new one.
            merged_regimes.append(current_regime)
            current_regime = next_regime.copy()
            continue

        # Same type: stretch the accumulated regime to the new end point.
        current_regime['End_Time'] = next_regime['End_Time']
        current_regime['End_Type'] = next_regime['End_Type']
        start_value = value_at(current_regime['Start_Time'])
        end_value = value_at(current_regime['End_Time'])
        if current_regime['Regime_Type'] == 'Positive':
            current_regime['Height_Change'] = end_value - start_value
        else:
            current_regime['Height_Change'] = start_value - end_value

    merged_regimes.append(current_regime)
    return pd.DataFrame(merged_regimes).reset_index(drop=True)
def calculate_whitespace(merged_regimes_df, df, time_column='Time', value_column='Value'):
    """
    Identifies gaps between merged regimes as whitespace.

    For each pair of consecutive regimes where the next regime starts
    strictly after the current one ends, a whitespace row is recorded that
    spans from the current end to the next start. The gap is 'Positive'
    when the series value at the gap's start is lower than at its end,
    otherwise 'Negative'.

    Parameters:
    - merged_regimes_df (pd.DataFrame): Regimes with Start_Time and
      End_Time columns, in the order they should be compared.
    - df (pd.DataFrame): Original time series, used to look up values.
    - time_column (str): Column name for time values in ``df``.
    - value_column (str): Column name for series values in ``df``.

    Returns:
    - pd.DataFrame: One row per gap with columns Start_Time, End_Time,
      Start_Height, End_Height and Whitespace_Type; empty (with those
      columns) when there are no gaps.
    """
    columns = ['Start_Time', 'End_Time', 'Start_Height', 'End_Height', 'Whitespace_Type']

    def value_at(moment):
        # First series value recorded at the given time.
        return df.loc[df[time_column] == moment, value_column].values[0]

    rows = [row for _, row in merged_regimes_df.iterrows()]
    whitespace = []
    # Compare each regime with its immediate successor.
    for current_regime, next_regime in zip(rows, rows[1:]):
        current_end_time = current_regime['End_Time']
        next_start_time = next_regime['Start_Time']
        if not next_start_time > current_end_time:
            continue  # regimes touch or overlap: no gap to record
        current_end_value = value_at(current_end_time)
        next_start_value = value_at(next_start_time)
        whitespace.append({
            'Start_Time': current_end_time,
            'End_Time': next_start_time,
            'Start_Height': current_end_value,
            'End_Height': next_start_value,
            'Whitespace_Type': 'Positive' if current_end_value < next_start_value else 'Negative',
        })
    return pd.DataFrame(whitespace, columns=columns)
def add_whitespace_as_regimes_and_merge(merged_regimes_df, whitespace_df, df, time_column='Time', value_column='Value'):
    """
    Adds whitespace as new regimes and merges consecutive regimes of the same type.

    Every whitespace row is converted into a regime row of the same type
    ('Positive' gaps run Trough -> Peak, 'Negative' gaps run Peak -> Trough),
    the result is combined with the existing regimes, ordered by
    'Start_Time', and passed to ``merge_consecutive_regimes``.

    Parameters:
    - merged_regimes_df (pd.DataFrame): Already merged regimes.
    - whitespace_df (pd.DataFrame): Gaps with Start_Time, End_Time,
      Start_Height, End_Height and Whitespace_Type columns.
    - df (pd.DataFrame): Original time series, forwarded to the merge step.
    - time_column (str): Column name for time values in ``df``.
    - value_column (str): Column name for series values in ``df``.

    Returns:
    - pd.DataFrame: The merged regimes; a copy of ``merged_regimes_df``
      when there is no whitespace.
    """
    if whitespace_df.empty:
        return merged_regimes_df.copy()

    def gap_to_regime(gap):
        # Translate one whitespace row into the regime-row layout.
        kind = gap['Whitespace_Type']
        if kind == 'Positive':
            first_type, last_type = 'Trough', 'Peak'
            delta = gap['End_Height'] - gap['Start_Height']
        else:
            first_type, last_type = 'Peak', 'Trough'
            delta = gap['Start_Height'] - gap['End_Height']
        return {
            'Start_Time': gap['Start_Time'],
            'End_Time': gap['End_Time'],
            'Start_Type': first_type,
            'End_Type': last_type,
            'Height_Change': delta,
            'Regime_Type': kind,
        }

    gap_regimes_df = pd.DataFrame(
        [gap_to_regime(gap) for _, gap in whitespace_df.iterrows()]
    )
    stacked = pd.concat([merged_regimes_df, gap_regimes_df], ignore_index=True)
    ordered = stacked.sort_values(by='Start_Time').reset_index(drop=True)
    return merge_consecutive_regimes(ordered, df, time_column, value_column)
def process_time_series(df, time_column='Time', value_column='Value', threshold=0.25):
    """
    Processes the time series data to identify, merge, and label regimes.

    The input is ordered by time, extrema are located, transitions that
    meet the threshold are labelled, same-type neighbours are merged, and
    the gaps left between regimes are folded back in as regimes.

    Parameters:
    - df (pd.DataFrame): DataFrame containing time and value data.
    - time_column (str): Column name for time data.
    - value_column (str): Column name for time series values.
    - threshold (float): Height change threshold to identify significant regimes.

    Returns:
    - df_extrema (pd.DataFrame): DataFrame of identified peaks and troughs.
    - regimes_df (pd.DataFrame): DataFrame of initially labeled regimes.
    - final_regimes_df (pd.DataFrame): DataFrame of final labeled regimes.

    Raises:
    - ValueError: If either required column is absent from ``df``.
    """
    required_columns = (time_column, value_column)
    if not all(name in df.columns for name in required_columns):
        raise ValueError(f"DataFrame must contain '{time_column}' and '{value_column}' columns.")

    # Work on a time-ordered copy with a clean positional index.
    ordered = df.sort_values(by=time_column).reset_index(drop=True)

    df_extrema = find_extrema(ordered, time_column, value_column)
    regimes_df = label_regimes(df_extrema, threshold, value_column, time_column)

    merged = merge_consecutive_regimes(regimes_df, ordered, time_column, value_column)
    gaps = calculate_whitespace(merged, ordered, time_column, value_column)
    final_regimes_df = add_whitespace_as_regimes_and_merge(
        merged, gaps, ordered, time_column, value_column
    )
    return df_extrema, regimes_df, final_regimes_df
def plot_regimes(df, df_extrema, final_regimes_df, value_column='Value', time_column='Time'):
    """
    Plots the time series with shaded regimes, highlighting peaks and troughs and labeling their heights.

    Parameters:
    - df (pd.DataFrame): Original time series data.
    - df_extrema (pd.DataFrame): DataFrame with identified peaks and troughs.
    - final_regimes_df (pd.DataFrame): DataFrame containing the final regimes.
    - value_column (str): Column name for time series values.
    - time_column (str): Column name for time.

    Returns:
    - None: Displays the plot.
    """
    plt.figure(figsize=(14, 7))
    plt.plot(df[time_column], df[value_column], label=value_column, color='blue')

    # (extremum type, marker format, legend label, text colour, text anchor)
    extremum_styles = (
        ('Peak', 'r^', 'Peaks', 'red', 'bottom'),
        ('Trough', 'gv', 'Troughs', 'green', 'top'),
    )
    subsets = {
        kind: df_extrema[df_extrema['Type'] == kind]
        for kind, *_ in extremum_styles
    }

    # Markers first (peaks, then troughs) ...
    for kind, fmt, legend_label, _, _ in extremum_styles:
        points = subsets[kind]
        plt.plot(points[time_column], points[value_column], fmt, markersize=10, label=legend_label)

    # ... then the height annotations, in the same order.
    for kind, _, _, text_colour, anchor in extremum_styles:
        for _, row in subsets[kind].iterrows():
            plt.text(row[time_column], row[value_column], f"{row[value_column]:.2f}",
                     fontsize=9, color=text_colour, ha='center', va=anchor)

    # Shade each final regime with the colour assigned to its type.
    color_map = {'Positive': 'green', 'Negative': 'orange'}
    for _, regime in final_regimes_df.iterrows():
        plt.axvspan(regime['Start_Time'], regime['End_Time'],
                    color=color_map[regime['Regime_Type']], alpha=0.3)

    # Hand-built legend: line, both marker kinds, then the two shaded patches.
    legend_handles = [
        plt.Line2D([], [], color='blue', label=value_column),
        plt.Line2D([], [], marker='^', color='r', linestyle='None', markersize=10, label='Peaks'),
        plt.Line2D([], [], marker='v', color='g', linestyle='None', markersize=10, label='Troughs'),
        mpatches.Patch(color='green', alpha=0.3, label='Positive Regime'),
        mpatches.Patch(color='orange', alpha=0.3, label='Negative Regime'),
    ]
    plt.legend(handles=legend_handles, loc='upper right')

    # Titles, axis labels and layout.
    plt.title('Time Series with Shaded Positive and Negative Regimes')
    plt.xlabel(time_column)
    plt.ylabel(value_column)
    plt.grid(True)
    plt.tight_layout()
    plt.show()
Подробнее здесь: https://stackoverflow.com/questions/790 ... -from-peak