person_id
order_id
item
streak_counter
Алиса
1
A
1
Боб
2
B
1
Алиса
3
B
1
Боб
4
B
2
алиса
5
A
1
боб
6
B
3
Алиса
7
А
2
Боб
8
А
1
В PySpark я бы сделал что-то вроде
Код: Выделить всё
from pyspark.sql import functions as F, Window
partition_cols = [F.col("person_id")]
order_cols = [F.col("order_id")]
window_spec = Window.partitionBy(partition_cols).orderBy(order_cols)
streak_reset = F.coalesce(F.col("item") != sqlf.lag("item").over(window_spec), True)
cond = sqlf.when(streak_reset, 1).otherwise(0)
streak_id = F.sum(cond).over(window_spec)
streak_spec = Window.partitionBy(partition_cols + [streak_id]).orderBy(order_cols)
streak_counter = F.sum(col).over(streak_spec).alias("streak_counter")
df = df.withColumn(streak_counter)
Подробнее здесь: https://stackoverflow.com/questions/764 ... -in-polars
Мобильная версия