Мне нужно применить пользовательскую функцию группового скользящего агрегирования к каждой n-й строке для определенных столбцов DataFrame. Моя текущая реализация работает с DataFrame в качестве аргумента и возвращает измененный DataFrame.
У меня есть несколько проблем с текущим подходом:
[*]Функция требует указания многих имен столбцов.
[*]Нерелевантные столбцы (если они есть) обрабатываются, даже если они не нужны.
< /ul>
Я хотел бы переписать эту функцию, чтобы она работала с выражения в качестве аргументов, а также возвращает результат в виде выражения. Я не уверен, что этот подход более эффективен, но кажется, что он сделает обработку более чистой.
К сожалению, у меня возникли проблемы с выяснением того, как реализовать это с помощью полярных выражений. .
Может ли кто-нибудь подсказать мне, как преобразовать этот подход, основанный на DataFrame, в подход, основанный на выражениях?
Вот моя текущая функция, которая работает с Кадры данных:
from typing import Callable, Sequence
import numpy as np
import polars as pl
from numba import guvectorize
@guvectorize(['(float64[:], int64, float64[:])'], '(n),()->(n)')
def rolling_func(input_array, window_size, out):
"""Example for a custom rolling function with a specified window size."""
n = len(input_array)
for i in range(n):
start = max(i - window_size + 1, 0)
out = np.mean(input_array[start:i+1])
def apply_rolling_gathered_agg(
df,
func: Callable,
window_size: int,
*func_args,
group_col: str | list[str] | None = None,
value_col: str | None = None,
result_col: str = 'result',
every_nth: int = 1,
window_buffer: int = 0,
return_dtype: pl.DataType = pl.Float64) -> pl.DataFrame:
"""
Apply a custom rolling aggregation function to a DataFrame, with grouping and every nth value selection.
This function performs a rolling aggregation on a specified value column in a Polars DataFrame. It allows
grouping by one or more columns, gathering every nth value, and applying a custom aggregation function
(e.g., `rolling_func`) with a specified window size and optional buffer.
Args:
df (pl.DataFrame): The DataFrame to operate on.
func (Callable): The aggregation function to apply to each rolling window.
window_size (int): The size of the window over which to apply the aggregation function.
*func_args: Additional arguments to pass to the custom function.
group_col (str | list[str] | None, optional): The column(s) to group by. If `None`, the first column is used.
value_col (str | None, optional): The column to apply the rolling function to. If `None`, the last column is used.
result_col (str, optional): The name of the result column in the output DataFrame. Default is 'result'.
every_nth (int, optional): The step size for gathering values within each group. Default is 1.
window_buffer (int, optional): A buffer to add around the rolling window, extending the window on both ends. Default is 0.
return_dtype (pl.DataType, optional): The desired data type for the result column. Default is `pl.Float64`.
Returns:
pl.DataFrame: A DataFrame containing the results of the rolling aggregation, with one row per group.
Example:
# Create a sample DataFrame with two groups 'A' and 'B', and values from 0 to 99
df = pl.DataFrame({
'group': np.repeat(['A', 'B'], 100), # Repeat 'A' and 'B' for each group
'value': np.tile(np.arange(100), 2) # Tile the values 0 to 99 for each group
})
func_args = []
res = apply_rolling_gathered_agg(
df,
func=rolling_func,
window_size=3,
*func_args,
group_col='group',
value_col='value',
every_nth=10,
window_buffer=0,
return_dtype=pl.Float64,
)
print(res)
res_pd = res.to_pandas()
"""
# Handle cases where group_col or value_col might not be passed
cols = df.columns
group_col = group_col or cols[0]
value_col = value_col or cols[-1]
# If group_col is a list, ensure it is processed correctly
if isinstance(group_col, list):
group_by = group_col
else:
group_by = [group_col]
# Temporary index column for rolling aggregation
index_col = '_index'
# Calculate the total window size
total_window = every_nth * (window_size + window_buffer)
period = f'{total_window}i'
# Apply rolling aggregation
result = (
df
.with_row_index(name=index_col)
.rolling(index_column=index_col, period=period, group_by=group_by)
.agg(
pl.all().last(), # pass the last element of all present columns
pl.col(value_col)
.gather_every(every_nth, offset=every_nth-1)
.map_batches(lambda batch: func(batch, window_size, *func_args), return_dtype=return_dtype)
.last().alias(result_col)) # This is the desired expression
.drop(index_col)
)
return result
Я хочу преобразовать эту функцию в функцию на основе выражений, похожую на:
def expr_apply_rolling_gathered_agg(
group_expr: pl.Expr | Sequence[pl.Expr], # Single or list of group column expressions
value_expr: pl.Expr, # Expression for the value column (series/column)
func: Callable, # The rolling aggregation function
window_size: int, # Size of the rolling window
*func_args, # Additional arguments for the rolling function
every_nth: int = 1, # Step size for gathering values
window_buffer: int = 0, # Buffer size around the window
return_dtype: pl.DataType = pl.Float64 # Output data type
) -> pl.Expr:
pass
Подробнее здесь: https://stackoverflow.com/questions/792 ... to-use-pol
Как переписать функцию группового скользящего агрегирования на основе DataFrame для использования выражений Polars? ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение