Как переписать функцию группового скользящего агрегирования на основе DataFrame для использования выражений Polars?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как переписать функцию группового скользящего агрегирования на основе DataFrame для использования выражений Polars?

Сообщение Anonymous »

Мне нужно применить пользовательскую функцию группового скользящего агрегирования к каждой n-й строке для определенных столбцов DataFrame. Моя текущая реализация работает с DataFrame в качестве аргумента и возвращает измененный DataFrame.
У меня есть несколько проблем с текущим подходом:

[*]Функция требует указания многих имен столбцов.
[*]Нерелевантные столбцы (если они есть) обрабатываются, даже если они не нужны.
< /ul>
Я хотел бы переписать эту функцию, чтобы она работала с выражения в качестве аргументов, а также возвращает результат в виде выражения. Я не уверен, что этот подход более эффективен, но кажется, что он сделает обработку более чистой.
К сожалению, у меня возникли проблемы с выяснением того, как реализовать это с помощью полярных выражений. .
Может ли кто-нибудь подсказать мне, как преобразовать этот подход, основанный на DataFrame, в подход, основанный на выражениях?
Вот моя текущая функция, которая работает с Кадры данных:
from typing import Callable, Sequence
import numpy as np
import polars as pl
from numba import guvectorize

@guvectorize(['(float64[:], int64, float64[:])'], '(n),()->(n)')
def rolling_func(input_array, window_size, out):
"""Example for a custom rolling function with a specified window size."""
n = len(input_array)
for i in range(n):
start = max(i - window_size + 1, 0)
out = np.mean(input_array[start:i+1])

def apply_rolling_gathered_agg(
df,
func: Callable,
window_size: int,
*func_args,
group_col: str | list[str] | None = None,
value_col: str | None = None,
result_col: str = 'result',
every_nth: int = 1,
window_buffer: int = 0,
return_dtype: pl.DataType = pl.Float64) -> pl.DataFrame:
"""
Apply a custom rolling aggregation function to a DataFrame, with grouping and every nth value selection.

This function performs a rolling aggregation on a specified value column in a Polars DataFrame. It allows
grouping by one or more columns, gathering every nth value, and applying a custom aggregation function
(e.g., `rolling_func`) with a specified window size and optional buffer.

Args:
df (pl.DataFrame): The DataFrame to operate on.
func (Callable): The aggregation function to apply to each rolling window.
window_size (int): The size of the window over which to apply the aggregation function.
*func_args: Additional arguments to pass to the custom function.
group_col (str | list[str] | None, optional): The column(s) to group by. If `None`, the first column is used.
value_col (str | None, optional): The column to apply the rolling function to. If `None`, the last column is used.
result_col (str, optional): The name of the result column in the output DataFrame. Default is 'result'.
every_nth (int, optional): The step size for gathering values within each group. Default is 1.
window_buffer (int, optional): A buffer to add around the rolling window, extending the window on both ends. Default is 0.
return_dtype (pl.DataType, optional): The desired data type for the result column. Default is `pl.Float64`.

Returns:
pl.DataFrame: A DataFrame containing the results of the rolling aggregation, with one row per group.

Example:
# Create a sample DataFrame with two groups 'A' and 'B', and values from 0 to 99
df = pl.DataFrame({
'group': np.repeat(['A', 'B'], 100), # Repeat 'A' and 'B' for each group
'value': np.tile(np.arange(100), 2) # Tile the values 0 to 99 for each group
})
func_args = []
res = apply_rolling_gathered_agg(
df,
func=rolling_func,
window_size=3,
*func_args,
group_col='group',
value_col='value',
every_nth=10,
window_buffer=0,
return_dtype=pl.Float64,
)
print(res)
res_pd = res.to_pandas()
"""
# Handle cases where group_col or value_col might not be passed
cols = df.columns
group_col = group_col or cols[0]
value_col = value_col or cols[-1]

# If group_col is a list, ensure it is processed correctly
if isinstance(group_col, list):
group_by = group_col
else:
group_by = [group_col]

# Temporary index column for rolling aggregation
index_col = '_index'

# Calculate the total window size
total_window = every_nth * (window_size + window_buffer)
period = f'{total_window}i'

# Apply rolling aggregation
result = (
df
.with_row_index(name=index_col)
.rolling(index_column=index_col, period=period, group_by=group_by)
.agg(
pl.all().last(), # pass the last element of all present columns
pl.col(value_col)
.reverse().gather_every(every_nth).reverse()
.map_batches(lambda batch: func(batch, window_size, *func_args), return_dtype=return_dtype)
.last().alias(result_col)) # This is the desired expression
.drop(index_col)
)
return result

Я хочу преобразовать эту функцию в функцию на основе выражений, похожую на:
def expr_apply_rolling_gathered_agg(
group_expr: pl.Expr | Sequence[pl.Expr], # Single or list of group column expressions
value_expr: pl.Expr, # Expression for the value column (series/column)
func: Callable, # The rolling aggregation function
window_size: int, # Size of the rolling window
*func_args, # Additional arguments for the rolling function
every_nth: int = 1, # Step size for gathering values
window_buffer: int = 0, # Buffer size around the window
return_dtype: pl.DataType = pl.Float64 # Output data type
) -> pl.Expr:
pass

РЕДАКТИРОВАТЬ: подходит лиroll_map в качестве замены прокатки и Map_batches? Rolling_map, похоже, ожидает пользовательскую функцию агрегирования. Она возвращает первый результат для каждого скользящего окна, если функция не является агрегацией, как в этом случае:
series = pl.Series(pl.arange(10, eager=True)).rolling_map(lambda x: x+1, window_size=3)
print(series.to_list())
[None, None, 1, 2, 3, 4, 5, 6, 7, 8]


Подробнее здесь: https://stackoverflow.com/questions/792 ... to-use-pol
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»