Планировщик (тот же, что и клиент)
Два рабочих узла
У меня есть шесть разделенных паркетом каталогов, каждый из которых содержит четыре разделенных файла. Выходные данные состоят из 25 строк и 87 000 столбцов. Каждая входная папка содержит 1–10 строк и от 10 000 до 20 000 столбцов, при этом каждый каталог имеет размер менее 1 ГБ.
Ниже приведен код, который я выполнил, и журналы. Не могли бы вы подсказать, что я делаю неправильно и как я могу сократить время, ресурсы и количество необходимых шагов?
Журналы:
Предупреждение пользователя: отправка большого графика размером 11,14 МБ. .
Это может привести к некоторому замедлению работы.
Рассмотрите возможность загрузки данных напрямую с помощью Dask
или использования фьючерсов или отложенных объектов для встраивания данных в график без повторения.
См. также https://docs.dask.org/en/stable/best-pr ... -with-dask для получения дополнительной информации.
warnings.warn(
client.py
Это может вызвать некоторое замедление работы.
Рассмотрите возможность загрузки данных напрямую с помощью Dask
или использования фьючерсов или отложенных объектов. чтобы встроить данные в график без повторения.
Для получения дополнительной информации см. также https://docs.dask.org/en/stable/best-pr ... -with-dask.
warnings.warn(
Длительность в конце 803,7652008533478 секунд
запись в файл завершена
Код: Выделить всё
import dask.dataframe as dd
from dask.distributed import Client
import sys
import os
import glob
import time
# Start the timer
start_time = time.time()
# Connect to the Dask distributed cluster
client = Client('IP:8786') # Replace with your scheduler address and port
dirs = sys.argv[1]
directory = dirs
# Use a wildcard pattern to get all Parquet file paths in the directory
parquet_files = glob.glob(os.path.join(directory, '*.parquet'))
print(parquet_files)
df_list = [dd.read_parquet(file) for file in parquet_files]
df_list = [df.set_index('ID') for df in df_list]
df_list = [df.persist() for df in df_list]
concatenated_df = dd.concat(df_list, axis=1)
output_path = 'output.parquet'
# Write the DataFrame to Parquet files in parallel
try:
concatenated_df.to_parquet(output_path, write_index=True)
except Exception as e:
print(f"Error writing Parquet files: {e}")
raise
# Print duration and close client
end_time = time.time()
print(f"Duration at the end {end_time-start_time} seconds")
print("completed writing to file")
client.close()
Быстрее запускайте код, используйте все необходимые ресурсы и сокращайте количество выполняемых задач.< /п>
Подробнее здесь: https://stackoverflow.com/questions/792 ... ame-axis-1
Мобильная версия