import polars as pl
df = pl.DataFrame(
{
"timestamp": [1, 2, 3, 4, 5, 6, 7, 8],
"threshold": [8, None, None, None, 5, None, None, 8],
"value": [2, 3, 4, 5, 6, 7, 8, 9],
"event": [1, 0, 0, 0, 1, 0, 0, 1],
"start_ts": [1, None, None, None, 5, None, None, 8],
"end_ts": [6, None, None, None, 8, None, None, 8],
"event_id": [0, None, None, None, 1, None, None, 2],
}
).with_columns(pl.col("end_ts").sub(pl.col("start_ts")).alias("event_span"))
print(df)
shape: (8, 8)
┌───────────┬───────────┬───────┬───────┬──────────┬────────┬──────────┬────────────┐
│ timestamp ┆ threshold ┆ value ┆ event ┆ start_ts ┆ end_ts ┆ event_id ┆ event_span │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 │
╞═══════════╪═══════════╪═══════╪═══════╪══════════╪════════╪══════════╪════════════╡
│ 1 ┆ 8 ┆ 2 ┆ 1 ┆ 1 ┆ 6 ┆ 0 ┆ 5 │
│ 2 ┆ null ┆ 3 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 3 ┆ null ┆ 4 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 4 ┆ null ┆ 5 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 5 ┆ 5 ┆ 6 ┆ 1 ┆ 5 ┆ 8 ┆ 1 ┆ 3 │
│ 6 ┆ null ┆ 7 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 7 ┆ null ┆ 8 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 8 ┆ 8 ┆ 9 ┆ 1 ┆ 8 ┆ 8 ┆ 2 ┆ 0 │
└───────────┴───────────┴───────┴───────┴──────────┴────────┴──────────┴────────────┘
- timestamp — это реальная временная метка.
- threshold — это значение, которого должно достичь или превысить значение каждого события в течение периода события.
- value — это значение для каждой временной метки, мы можем иметь повторяющиеся значения.
- event — двоичный столбец, указывающий, генерирует ли определенная временная метка событие.
- start_ts — это начальная временная метка события. Например, значение start_ts, равное 1, означает, что событие начнется в конце временной метки1, в начале временной метки2
- end_ts — это временная метка окончания события.
- event_id — уникальный идентификатор каждого события.
- event_span — это количество временных меток, охватываемых событием.
Я хочу определить:
- event outcome: двоичное значение, указывающее, достигается ли пороговое значение каждого события по значению в течение каждого периода событий.
- event outcome timestamp: временная метка, когда первое значение достигает порогового значения
- Событие 0 охватывает [2, 3, 4, 5, 6], событие 1 охватывает [6, 7, 8], а событие 2 не охватывает ничего.
- события не будут выходить за рамки имеющихся у нас данных (например, end_ts 0)
.with_columns(
pl.int_ranges(pl.col("start_ts")+1, pl.col("end_ts")+1) # Map event full path
.alias("event_timestamps")
)
.explode("event_timestamps") # Generate event full path
.drop(pl.col("value"))
.join(
df
.select(pl.col("timestamp"), pl.col("value"))
.rename({"timestamp": "event_timestamps"}),
on="event_timestamps", how="left") # Get the value of the full path
.with_columns(# Map event outcome based on threshold
pl.when(pl.col("value") >= pl.col("threshold"))
.then(1)
.otherwise(None)
.alias("event_outcome")
)
.with_columns(# Get event outcome timestamp
pl.when(pl.col("event_outcome") == 1)
.then(pl.col("event_timestamps"))
.otherwise(None)
.alias("event_outcome_timestamp")
)
.with_columns(# Map event outcome to the event start timestamp
pl.col("event_outcome", "event_outcome_timestamp")
.fill_null(strategy="backward")
.over("event_id")
)
.unique("event_id")
.sort("event_id")
.with_columns(# Take care of the events that have not exceeded the threshold
pl.when(pl.col("event_outcome").is_null())
.then(0)
.otherwise(pl.col("event_outcome"))
.alias("event_outcome")
)
.select(pl.col("event_id", "event_outcome", "event_outcome_timestamp"))
)
event_df
shape: (2, 3)
┌──────────┬───────────────┬─────────────────────────┐
│ event_id ┆ event_outcome ┆ event_outcome_timestamp │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i32 ┆ i64 │
╞══════════╪═══════════════╪═════════════════════════╡
│ 0 ┆ 0 ┆ null │
│ 1 ┆ 1 ┆ 6 │
└──────────┴───────────────┴─────────────────────────┘
df = df.join(event_df, on="event_id", how="left")
Подробнее здесь: https://stackoverflow.com/questions/791 ... alculation