Объединение Dask по нескольким осям данных = 1Python

Программы на Python
Ответить
Anonymous
 Объединение Dask по нескольким осям данных = 1

Сообщение Anonymous »

Я новичок в Dask. Пытаясь запустить concat для списка DataFrames, я заметил, что он требует больше времени, ресурсов и задач, чем ожидалось. Вот подробности моего запуска:
Планировщик (тот же, что и клиент)
Два рабочих узла
У меня есть шесть разделенных паркетом каталогов, каждый из которых содержит четыре разделенных файла. Выходные данные состоят из 25 строк и 87 000 столбцов. Каждая входная папка содержит 1–10 строк и от 10 000 до 20 000 столбцов, при этом каждый каталог имеет размер менее 1 ГБ.
Ниже приведен код, который я выполнил, и журналы. Не могли бы вы подсказать, что я делаю неправильно и как я могу сократить время, ресурсы и количество необходимых шагов?
Журналы:
Предупреждение пользователя: отправка большого графика размером 11,14 МБ. .
Это может привести к некоторому замедлению работы.
Рассмотрите возможность загрузки данных напрямую с помощью Dask
или использования фьючерсов или отложенных объектов для встраивания данных в график без повторения.
См. также https://docs.dask.org/en/stable/best-pr ... -with-dask для получения дополнительной информации.
warnings.warn(
client.py:3371: UserWarning : Отправка большого графика размером 10,33 МБ.
Это может вызвать некоторое замедление работы.
Рассмотрите возможность загрузки данных напрямую с помощью Dask
или использования фьючерсов или отложенных объектов. чтобы встроить данные в график без повторения.
Для получения дополнительной информации см. также https://docs.dask.org/en/stable/best-pr ... -with-dask.
warnings.warn(
Длительность в конце 803,7652008533478 секунд
запись в файл завершена

Код: Выделить всё

import dask.dataframe as dd
from dask.distributed import Client
import sys
import os
import glob
import time

# Start the timer
start_time = time.time()

# Connect to the Dask distributed cluster
client = Client('IP:8786')  # Replace with your scheduler address and port

dirs = sys.argv[1]
directory = dirs

# Use a wildcard pattern to get all Parquet file paths in the directory
parquet_files = glob.glob(os.path.join(directory, '*.parquet'))
print(parquet_files)

df_list = [dd.read_parquet(file) for file in parquet_files]

df_list = [df.set_index('ID') for df in df_list]

df_list = [df.persist() for df in df_list]

concatenated_df = dd.concat(df_list, axis=1)

output_path = 'output.parquet'

# Write the DataFrame to Parquet files in parallel
try:
concatenated_df.to_parquet(output_path, write_index=True)
except Exception as e:
print(f"Error writing Parquet files: {e}")
raise

# Print duration and close client
end_time = time.time()
print(f"Duration at the end {end_time-start_time} seconds")
print("completed writing to file")

client.close()

Примечание. Я начинаю с небольших DataFrames и понимаю, что использовать pandas для них будет быстрее и проще. Однако я стремлюсь расширить это решение для обработки очень больших DataFrames, поэтому сначала я тестирую небольшие DataFrames.
Быстрее запускайте код, используйте все необходимые ресурсы и сокращайте количество выполняемых задач.< /п>

Подробнее здесь: https://stackoverflow.com/questions/792 ... ame-axis-1
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»