Маркировка режима Python с использованием явного порога для увеличения/уменьшения от пика до минимумаPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Маркировка режима Python с использованием явного порога для увеличения/уменьшения от пика до минимума

Сообщение Anonymous »

Я пытаюсь найти самые длинные отрезки времени во временном ряду, где значение от начала до конца увеличивается хотя бы на определенный порог без какого-либо снижения за промежуточный период хотя бы на этот порог. Думайте о временных рядах как о биржевом графике. Предположим, порог составляет 50 долларов. Найдите и отметьте самые длительные отрезки времени, когда акции растут как минимум на 50 долларов, но при этом не падают на 50 долларов. Эти длительные периоды затем называют «позитивным режимом». Повторите процесс для снижения как минимум на 50 долларов, не поднимаясь за это время на 50 долларов, и назовите этот «негативный режим».
Поскольку положительные режимы начинаются с минимумов и заканчиваются пиками, а отрицательные режимы начинаться с пиков и заканчиваться минимумами, режимы никогда не должны перекрываться.
Ниже приведен иллюстрированный пример положительных и отрицательных режимов с порогом 2. Обратите внимание, что первый пик и минимум находятся в нейтральном режиме. режиме, поскольку до начала первого положительного режима не происходит снижения, превышающего порог 2. Однако окончательный отрицательный режим должен быть расширен до -0,80, поскольку -0,80 меньше, чем -0,73 (где он в настоящее время останавливается), и между текущим концом и правильным концом нет увеличения хотя бы на 2.
Как бы вы реализовали это на Python? Я пробовал использовать scipy.signal.find_peaks, но не смог заставить его работать, и мне кажется, что должен быть более простой способ, который мне не хватает.
Изображение

#test.ipynb

import numpy as np
import pandas as pd
from regimes import plot_regimes, process_time_series

# Sample Data Generation
np.random.seed(0)
time = np.arange(0, 100, 1)
value = np.sin(np.linspace(0, 10 * np.pi, 100)) + np.random.normal(0, 1, 100)
df = pd.DataFrame({'Time': time, 'Value': value})

# Process the time series data
df_extrema, regimes_df, final_regimes_df = process_time_series(df, value_column='Value', threshold=2, time_column='Time')

# Plot the final regimes along with peaks and troughs
plot_regimes(df, df_extrema, final_regimes_df, value_column='Value', time_column='Time')

# regimes.py
import pandas as pd
import numpy as np
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

def find_extrema(df, time_column='Time', value_column='Value'):
"""
Identifies peaks and troughs in the given time series data.

Parameters:
- df (pd.DataFrame): DataFrame containing time and value columns.
- time_column (str): Name of the column containing the time data.
- value_column (str): Name of the column containing the time series values.

Returns:
- df_extrema (pd.DataFrame): DataFrame with extrema points and their types.
"""
peaks, _ = find_peaks(df[value_column])
troughs, _ = find_peaks(-df[value_column])

extrema_indices = list(peaks) + list(troughs)
extrema_types = ['Peak'] * len(peaks) + ['Trough'] * len(troughs)

# Include first and last points as extrema
if 0 not in extrema_indices:
first_val = df.iloc[0][value_column]
second_val = df.iloc[1][value_column]
extrema_indices.append(0)
extrema_types.append('Peak' if first_val > second_val else 'Trough')

last_idx = len(df) - 1
if last_idx not in extrema_indices:
last_val = df.iloc[last_idx][value_column]
second_last_val = df.iloc[last_idx - 1][value_column]
extrema_indices.append(last_idx)
extrema_types.append('Peak' if last_val > second_last_val else 'Trough')

# Create the extrema DataFrame
extrema = sorted(extrema_indices)
sorted_extrema_types = [extrema_types[extrema_indices.index(idx)] for idx in extrema]
df_extrema = df.iloc[extrema].copy().reset_index(drop=True)
df_extrema['Type'] = sorted_extrema_types

return df_extrema

def label_regimes(df_extrema, threshold=0.25, value_column='Value', time_column='Time'):
"""
Labels regimes based on transitions between extrema points.

Parameters:
- df_extrema (pd.DataFrame): DataFrame containing extrema points and types.
- threshold (float): Minimum height change to consider as a regime.
- value_column (str): Column name for time series values.
- time_column (str): Column name for time values.

Returns:
- regimes_df (pd.DataFrame): DataFrame containing labeled regimes.
"""
regimes = []
for i in range(len(df_extrema) - 1):
current = df_extrema.iloc
next_ = df_extrema.iloc[i + 1]
height_change = next_[value_column] - current[value_column]

if abs(height_change) >= threshold:
regime_type = 'Positive' if current['Type'] == 'Trough' and next_['Type'] == 'Peak' else 'Negative'
regimes.append({
'Start_Time': current[time_column],
'End_Time': next_[time_column],
'Start_Type': current['Type'],
'End_Type': next_['Type'],
'Height_Change': height_change if regime_type == 'Positive' else -height_change,
'Regime_Type': regime_type
})

return pd.DataFrame(regimes)

def merge_consecutive_regimes(regimes_df, df, time_column='Time', value_column='Value'):
"""
Merges consecutive regimes of the same type.
"""
if regimes_df.empty:
return regimes_df.copy()

merged_regimes = []
current_regime = regimes_df.iloc[0].copy()

for i in range(1, len(regimes_df)):
next_regime = regimes_df.iloc

if next_regime['Regime_Type'] == current_regime['Regime_Type']:
current_regime['End_Time'] = next_regime['End_Time']
current_regime['End_Type'] = next_regime['End_Type']
start_value = df.loc[df[time_column] == current_regime['Start_Time'], value_column].values[0]
end_value = df.loc[df[time_column] == current_regime['End_Time'], value_column].values[0]

current_regime['Height_Change'] = (
end_value - start_value if current_regime['Regime_Type'] == 'Positive' else start_value - end_value
)
else:
merged_regimes.append(current_regime)
current_regime = next_regime.copy()

merged_regimes.append(current_regime)
return pd.DataFrame(merged_regimes).reset_index(drop=True)

def calculate_whitespace(merged_regimes_df, df, time_column='Time', value_column='Value'):
"""
Identifies gaps between merged regimes as whitespace.
"""
whitespace = []
for i in range(len(merged_regimes_df) - 1):
current_regime = merged_regimes_df.iloc
next_regime = merged_regimes_df.iloc[i + 1]

current_end_time = current_regime['End_Time']
next_start_time = next_regime['Start_Time']

if next_start_time > current_end_time:
current_end_value = df.loc[df[time_column] == current_end_time, value_column].values[0]
next_start_value = df.loc[df[time_column] == next_start_time, value_column].values[0]
whitespace_type = 'Positive' if current_end_value < next_start_value else 'Negative'

whitespace.append({
'Start_Time': current_end_time,
'End_Time': next_start_time,
'Start_Height': current_end_value,
'End_Height': next_start_value,
'Whitespace_Type': whitespace_type
})

return pd.DataFrame(whitespace)

def add_whitespace_as_regimes_and_merge(merged_regimes_df, whitespace_df, df, time_column='Time', value_column='Value'):
"""
Adds whitespace as new regimes and merges consecutive regimes of the same type.
"""
if whitespace_df.empty:
return merged_regimes_df.copy()

whitespace_regimes = []
for _, ws in whitespace_df.iterrows():
start_time = ws['Start_Time']
end_time = ws['End_Time']
whitespace_type = ws['Whitespace_Type']
start_type = 'Trough' if whitespace_type == 'Positive' else 'Peak'
end_type = 'Peak' if whitespace_type == 'Positive' else 'Trough'
height_change = ws['End_Height'] - ws['Start_Height'] if whitespace_type == 'Positive' else ws['Start_Height'] - ws['End_Height']

whitespace_regimes.append({
'Start_Time': start_time,
'End_Time': end_time,
'Start_Type': start_type,
'End_Type': end_type,
'Height_Change': height_change,
'Regime_Type': whitespace_type
})

whitespace_regimes_df = pd.DataFrame(whitespace_regimes)
combined_regimes = pd.concat([merged_regimes_df, whitespace_regimes_df], ignore_index=True)
combined_regimes = combined_regimes.sort_values(by='Start_Time').reset_index(drop=True)

return merge_consecutive_regimes(combined_regimes, df, time_column, value_column)

def process_time_series(df, time_column='Time', value_column='Value', threshold=0.25):
"""
Processes the time series data to identify, merge, and label regimes.

Parameters:
- df (pd.DataFrame): DataFrame containing time and value data.
- time_column (str): Column name for time data.
- value_column (str): Column name for time series values.
- threshold (float): Height change threshold to identify significant regimes.

Returns:
- df_extrema (pd.DataFrame): DataFrame of identified peaks and troughs.
- regimes_df (pd.DataFrame): DataFrame of initially labeled regimes.
- final_regimes_df (pd.DataFrame): DataFrame of final labeled regimes.
"""
if time_column not in df.columns or value_column not in df.columns:
raise ValueError(f"DataFrame must contain '{time_column}' and '{value_column}' columns.")

df = df.sort_values(by=time_column).reset_index(drop=True)
df_extrema = find_extrema(df, time_column, value_column)
regimes_df = label_regimes(df_extrema, threshold, value_column, time_column)
merged_regimes_df = merge_consecutive_regimes(regimes_df, df, time_column, value_column)
whitespace_df = calculate_whitespace(merged_regimes_df, df, time_column, value_column)
final_regimes_df = add_whitespace_as_regimes_and_merge(merged_regimes_df, whitespace_df, df, time_column, value_column)

return df_extrema, regimes_df, final_regimes_df

def plot_regimes(df, df_extrema, final_regimes_df, value_column='Value', time_column='Time'):
"""
Plots the time series with shaded regimes, highlighting peaks and troughs and labeling their heights.

Parameters:
- df (pd.DataFrame): Original time series data.
- df_extrema (pd.DataFrame): DataFrame with identified peaks and troughs.
- final_regimes_df (pd.DataFrame): DataFrame containing the final regimes.
- value_column (str): Column name for time series values.
- time_column (str): Column name for time.

Returns:
- None: Displays the plot.
"""
plt.figure(figsize=(14, 7))
plt.plot(df[time_column], df[value_column], label=value_column, color='blue')

# Plot peaks and troughs
peaks = df_extrema[df_extrema['Type'] == 'Peak']
troughs = df_extrema[df_extrema['Type'] == 'Trough']

plt.plot(peaks[time_column], peaks[value_column], 'r^', markersize=10, label='Peaks')
plt.plot(troughs[time_column], troughs[value_column], 'gv', markersize=10, label='Troughs')

# Annotate peak and trough heights
for _, row in peaks.iterrows():
plt.text(row[time_column], row[value_column], f"{row[value_column]:.2f}",
fontsize=9, color='red', ha='center', va='bottom')

for _, row in troughs.iterrows():
plt.text(row[time_column], row[value_column], f"{row[value_column]:.2f}",
fontsize=9, color='green', ha='center', va='top')

# Define colors for regimes
color_map = {'Positive': 'green', 'Negative': 'orange'}

# Shade final regimes
for _, regime in final_regimes_df.iterrows():
plt.axvspan(regime['Start_Time'], regime['End_Time'],
color=color_map[regime['Regime_Type']], alpha=0.3)

# Create custom legend for shaded regimes
positive_patch = mpatches.Patch(color='green', alpha=0.3, label='Positive Regime')
negative_patch = mpatches.Patch(color='orange', alpha=0.3, label='Negative Regime')

# Combine all legends
plt.legend(handles=[
plt.Line2D([], [], color='blue', label=value_column),
plt.Line2D([], [], marker='^', color='r', linestyle='None', markersize=10, label='Peaks'),
plt.Line2D([], [], marker='v', color='g', linestyle='None', markersize=10, label='Troughs'),
positive_patch,
negative_patch
], loc='upper right')

# Enhance plot
plt.title('Time Series with Shaded Positive and Negative Regimes')
plt.xlabel(time_column)
plt.ylabel(value_column)
plt.grid(True)
plt.tight_layout()
plt.show()


Подробнее здесь: https://stackoverflow.com/questions/790 ... -from-peak
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»