Как запустить Pyspark UDF отдельно для групп данныхPython

Программы на Python
Ответить
Anonymous
 Как запустить Pyspark UDF отдельно для групп данных

Сообщение Anonymous »

Группирование кадра данных Pyspark, применение UDF анализа временных рядов к каждой группе

РЕШЕНО См. ниже

У меня есть процесс Pyspark, который принимает кадр данных временных рядов для сайта и вычисляет/добавляет функции для обнаружения аномалий. Оконные функции используются для сравнения опережающих и отстающих чтений, а также используются агрегаты, такие как среднее значение, медиана, стандартное отклонение.
Это выполняется по одному сайту за раз, но я хотел бы запустить это на одном большом фрейме данных, содержащем данные для многих сайтов, применяя функции к каждой группе фрейма данных отдельно и возвращая один измененный фрейм данных.
# Illustrative functions
window = Window.partitionBy("Site").orderBy("DateTime").rowsBetween(-2, 0)
val_mean = df.agg(mean("Value")).collect()[0][0]

df = df.withColumn("NewCol", col("Value") - val_mean)
df = df.withColumn("rolling_mean", avg(col("Value")).over(window))

Я пытаюсь избегать циклов For и Pandas для масштабируемости и параллельной обработки, но не нашел подходящего способа сделать это с помощью PySpark.
Какие здесь варианты?
(Выполнение на Azure Databricks, если это имеет значение)
Пример желаемого поведения



Сайт
DateTime
Значение
NewCol
rolling_mean




A
01.04.2026
1
-0,8
null


A
01.05.2026
2
-0.2
null


A
01.06.2026
3
1.2
0.2


A
01.07.2026
1
-0.8
0.2


A
01.08.2026
2
0.2
0.2


B
01.03.2026
4
-6.4
null


B
01.04.2026
5
-5.4
null


B
01.05.2026
34
23,6
3,93


B
01.06.2026
5
-5,4
4,26


B
01.07.2026
4
-6.4
3.93



РЕШЕНО
Производительность Pandas была намного хуже, чем у чистой реализации Pyspark, и приводила к дополнительной сложности плана выполнения (вызывая ошибки в некоторых кластерах, например, в бессерверном режиме).
Использование concurrent.futures позволило применять существующие функции с минимальной настройкой, а производительность выполнения была во много раз выше, чем реализация Pandas.
# Divide task across workers
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: 
   results = {executor.submit(anomaly_func, raw_df.filter(raw_df.Id == key)) for key in IdList}

# Union the results dataframe
df = None
for r in results:
   df = r.result()
   if df is None:
       df = df
   else:
       df = df.union(df)


Подробнее здесь: https://stackoverflow.com/questions/798 ... ame-groups
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»