Группирование кадра данных Pyspark, применение UDF анализа временных рядов к каждой группе
РЕШЕНО См. ниже
У меня есть процесс Pyspark, который принимает кадр данных временных рядов для сайта и вычисляет/добавляет функции для обнаружения аномалий. Оконные функции используются для сравнения опережающих и отстающих чтений, а также используются агрегаты, такие как среднее значение, медиана, стандартное отклонение.
Это выполняется по одному сайту за раз, но я хотел бы запустить это на одном большом фрейме данных, содержащем данные для многих сайтов, применяя функции к каждой группе фрейма данных отдельно и возвращая один измененный фрейм данных.
# Illustrative functions
window = Window.partitionBy("Site").orderBy("DateTime").rowsBetween(-2, 0)
val_mean = df.agg(mean("Value")).collect()[0][0]
df = df.withColumn("NewCol", col("Value") - val_mean)
df = df.withColumn("rolling_mean", avg(col("Value")).over(window))
Я пытаюсь избегать циклов For и Pandas для масштабируемости и параллельной обработки, но не нашел подходящего способа сделать это с помощью PySpark.
Какие здесь варианты?
(Выполнение на Azure Databricks, если это имеет значение)
Пример желаемого поведения
Сайт
DateTime
Значение
NewCol
rolling_mean
A
01.04.2026
1
-0,8
null
A
01.05.2026
2
-0.2
null
A
01.06.2026
3
1.2
0.2
A
01.07.2026
1
-0.8
0.2
A
01.08.2026
2
0.2
0.2
B
01.03.2026
4
-6.4
null
B
01.04.2026
5
-5.4
null
B
01.05.2026
34
23,6
3,93
B
01.06.2026
5
-5,4
4,26
B
01.07.2026
4
-6.4
3.93
РЕШЕНО
Производительность Pandas была намного хуже, чем у чистой реализации Pyspark, и приводила к дополнительной сложности плана выполнения (вызывая ошибки в некоторых кластерах, например, в бессерверном режиме).
Использование concurrent.futures позволило применять существующие функции с минимальной настройкой, а производительность выполнения была во много раз выше, чем реализация Pandas.
# Divide task across workers
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = {executor.submit(anomaly_func, raw_df.filter(raw_df.Id == key)) for key in IdList}
# Union the results dataframe
df = None
for r in results:
df = r.result()
if df is None:
df = df
else:
df = df.union(df)
Подробнее здесь: https://stackoverflow.com/questions/798 ... ame-groups
Как запустить Pyspark UDF отдельно для групп данных ⇐ Python
Программы на Python
-
Anonymous
1768422161
Anonymous
Группирование кадра данных Pyspark, применение UDF анализа временных рядов к каждой группе
[b]РЕШЕНО[/b] См. ниже
У меня есть процесс Pyspark, который принимает кадр данных временных рядов для сайта и вычисляет/добавляет функции для обнаружения аномалий. Оконные функции используются для сравнения опережающих и отстающих чтений, а также используются агрегаты, такие как среднее значение, медиана, стандартное отклонение.
Это выполняется по одному сайту за раз, но я хотел бы запустить это на одном большом фрейме данных, содержащем данные для многих сайтов, применяя функции к каждой группе фрейма данных отдельно и возвращая один измененный фрейм данных.
# Illustrative functions
window = Window.partitionBy("Site").orderBy("DateTime").rowsBetween(-2, 0)
val_mean = df.agg(mean("Value")).collect()[0][0]
df = df.withColumn("NewCol", col("Value") - val_mean)
df = df.withColumn("rolling_mean", avg(col("Value")).over(window))
Я пытаюсь избегать циклов For и Pandas для масштабируемости и параллельной обработки, но не нашел подходящего способа сделать это с помощью PySpark.
Какие здесь варианты?
(Выполнение на Azure Databricks, если это имеет значение)
Пример желаемого поведения
Сайт
DateTime
Значение
NewCol
rolling_mean
A
01.04.2026
1
-0,8
[i]null[/i]
A
01.05.2026
2
-0.2
[i]null[/i]
A
01.06.2026
3
1.2
0.2
A
01.07.2026
1
-0.8
0.2
A
01.08.2026
2
0.2
0.2
B
01.03.2026
4
-6.4
[i]null[/i]
B
01.04.2026
5
-5.4
[i]null[/i]
B
01.05.2026
34
23,6
3,93
B
01.06.2026
5
-5,4
4,26
B
01.07.2026
4
-6.4
3.93
[b]РЕШЕНО[/b]
Производительность Pandas была намного хуже, чем у чистой реализации Pyspark, и приводила к дополнительной сложности плана выполнения (вызывая ошибки в некоторых кластерах, например, в бессерверном режиме).
Использование concurrent.futures позволило применять существующие функции с минимальной настройкой, а производительность выполнения была во много раз выше, чем реализация Pandas.
# Divide task across workers
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = {executor.submit(anomaly_func, raw_df.filter(raw_df.Id == key)) for key in IdList}
# Union the results dataframe
df = None
for r in results:
df = r.result()
if df is None:
df = df
else:
df = df.union(df)
Подробнее здесь: [url]https://stackoverflow.com/questions/79863218/how-to-run-pyspark-udf-separately-over-dataframe-groups[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия