Как оптимизировать вычисление полей для большого набора данных в записных книжках Azure Synapse Spark с накопительной суPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как оптимизировать вычисление полей для большого набора данных в записных книжках Azure Synapse Spark с накопительной су

Сообщение Anonymous »

У меня есть набор данных с более чем 3 миллионами записей и несколькими столбцами. Вот образец моего набора данных:


< tr>
item
item_base
дата
quantity_1
quantity_2




1
20
202410
600
7493


< td>1
20
202411
17000
16431



Каждый item-item_base-date создает уникальный ключ. Мне нужно вычислить новый столбец «фактическое_значение» на основе следующей логики:
Предположим, у нас есть операция ранжирования таблицы для item-item_base и порядок по дате, тогда

Для ранга = 1 фактическое_значение = количество_1,
Для ранга = 2 фактическое_значение = количество_1 – количество_2.
Для ранга > 2 фактическое_значение = количество_1 - сумма(все предыдущее количество_1) - сумма(все предыдущее количество_2) - сумма(все предыдущее фактическое_значение)
Вот мой подход к решению этой проблемы:
Сначала я создаю 2 дополнительных столбца cumulative_1 и cumulative_2, которые по сути представляют собой сумму количества_1 и количество_2 с использованием окон sql.

Код: Выделить всё

SUM(quantity_1) OVER(PARTITION BY item, item_base ORDER BY date ROWS BETWEEN UNBOUNDED PRECEIDING AND 1 PRECEDING) as cumulative_1 и так далее. Кроме того, я создаю столбец ранга в качестве идентификатора row_no.
Spark не поддерживает рекурсивные CTE, поэтому реализация sum(all prev fact_value)
утомительна. Мне пришлось переключиться на фрейм данных pandas, чтобы завершить расчеты. Вот мой код:

Код: Выделить всё

my_df = df.toPandas()
my_df['actual_value'] = 0.0

for i in range(len(my_df)):
if my_df.at[i, 'rank'] == 1:
my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1']
elif my_df.at[i, 'rank'] == 2:
my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1'] - my_df.at[i, 'quantity_2']
else:
previous_actual_values = my_df.loc[(my_df['item'] == my_df.at[i, 'item']) &
(my_df['item_base'] == my_df.at[i, 'item_base']) &
(my_df['date'] < my_df.at[i, 'date']), 'actual_value'].sum()

my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1'] - my_df.at[i, 'cumulative_2'] - my_df.at[i, 'cumulative_1'] - previous_actual_values

if my_df.at[i, 'actual_value'] < 0:
my_df.at[i, 'actual_value'] = 0

Код выполняет свою работу и выдает правильный результат.

Код: Выделить всё

item        | item_base| date    | quantity_1 | quantity_2 | cumulative_1 | cumulative_2 | rank | actual_value
------------|----------|---------|------------|------------|--------------|--------------|------|--------------
1           | 20       | 202410  | 600        | 7493       |              |              | 1    | 600
1           | 20       | 202411  | 17000      | 16431      | 600          | 7493         | 2    | 569
1           | 20       | 202412  | 785        | 24456      | 17600        | 23924        | 3    | 0
1           | 20       | 202501  | 0          | 25775      | 18385        | 48380        | 4    | 0
1           | 20       | 202502  |            | 26131      | 18385        | 74155        | 5    |
1           | 20       | 202503  | 0          | 39452      | 18385        | 100286       | 6    | 0
1           | 20       | 202504  |            | 38087      | 18385        | 139738       | 7    |
1           | 20       | 202505  | 2856       | 28916      | 18385        | 177825       | 8    | 0
1           | 20       | 202506  | 500000     | 42254      | 21241        | 206741       | 9    | 270849
1           | 20       | 202507  |            | 36776      | 521241       | 248995       | 10   |
1           | 20       | 202508  | 660        | 23523      | 521241       | 285771       | 11   | 0
1           | 20       | 202509  | 1316000    | 25543      | 521901       | 309294       | 12   | 212787
1           | 20       | 202510  | 265220     | 30589      | 1837901      | 334837       | 13   | 0
1           | 20       | 202511  | 47580      |            | 1864421      | 365426       | 14   | 0
Теперь проблема. Поскольку мне приходится использовать pandas, коду требуется целая вечность, чтобы работать с большими наборами данных. Мне нужно либо найти способ сделать это в самом Spark, либо повысить эффективность приведенного выше кода. Я рассматривал возможность векторизации вычислений, но изо всех сил пытаюсь найти эффективный способ вычисления совокупного фактического_значения для строк, где ранг > 2.
РЕДАКТИРОВАТЬ: я не могу исправить формат таблицу вывода, вот скриншот вывода:
Изображение


Подробнее здесь: https://stackoverflow.com/questions/792 ... park-noteb
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как реализовать контроль выполнения ячеек и оповещения JavaScript в записных книжках Azure Databricks?
    Anonymous » » в форуме Python
    0 Ответы
    31 Просмотры
    Последнее сообщение Anonymous
  • Чтение PDF-файла с помощью записных книжек Azure Synapse
    Гость » » в форуме Python
    0 Ответы
    11 Просмотры
    Последнее сообщение Гость
  • Проблемы с подключением к Infor Data Lake с использованием Spark JDBC в Azure Synapse Spark Notebook
    Anonymous » » в форуме Python
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous
  • Использование %debug в записных книжках VSCode/Jupyter
    Anonymous » » в форуме Python
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Как свернуть все ячейки по умолчанию в записных книжках Jupyter в коде Visual Studio?
    Anonymous » » в форуме Python
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»