У меня есть процесс Pyspark, который берет кадр данных временных рядов для сайта и вычисляет/добавляет функции для обнаружения аномалий. Оконные функции используются для сравнения опережающих и отстающих чтений, а также используются агрегаты, такие как среднее значение, медиана, стандартное отклонение.
Это выполняется по одному сайту за раз, но я хотел бы запустить это на одном большом фрейме данных, содержащем данные для многих сайтов, применяя функции к каждой группе фрейма данных отдельно и возвращая один измененный фрейм данных.
Код: Выделить всё
# Illustrative functions
window = Window.partitionBy("Site").orderBy("DateTime").rowsBetween(-2, 0)
val_mean = df.agg(mean("Value")).collect()[0][0]
df = df.withColumn("NewCol", col("Value") - val_mean)
df = df.withColumn("rolling_mean", avg(col("Value")).over(window))
Какие здесь варианты?
(Выполнение на Azure Databricks, если это имеет значение)
Пример желаемого поведения
Сайт
DateTime
Значение
NewCol
rolling_mean
A
01.04.2026
1
-0,8
null
A
01.05.2026
2
-0.2
null
A
01.06.2026
3
1.2
0.2
A
01.07.2026
1
-0.8
0.2
A
01.08.2026
2
0.2
0.2
B
01.03.2026
4
-6.4
null
B
01.04.2026
5
-5.4
null
B
01.05.2026
34
23,6
3,93
B
01.06.2026
5
-5,4
4,26
B
01.07.2026
4
-6,4
3,93
Подробнее здесь: https://stackoverflow.com/questions/798 ... ame-groups
Мобильная версия