Запись файлов одновременно с другими задачами, связанными с процессором, с использованием многопроцессорной обработки илPython

Программы на Python
Ответить
Anonymous
 Запись файлов одновременно с другими задачами, связанными с процессором, с использованием многопроцессорной обработки ил

Сообщение Anonymous »

У меня есть рабочая станция с 72 ядрами (на самом деле 36 многопоточных процессоров, multiprocessing.cpu_count() показывает как 72 ядра).

Я попробовал оба варианта multiprocessing и ray для одновременной обработки пакетами из миллионов небольших файлов, и я хотел бы одновременно писать несколько выходных файлов во время этой обработки.

Меня смущает блокировка методов .get(), связанных, например, с apply_async() (в многопроцессорной обработке) и ray.get().

С помощью ray у меня есть удаленная функция (

Код: Выделить всё

process_group()
), который параллельно обрабатывает группы данных в цикле. Далее в комментариях также приводится версия кода с использованием модуля multiprocessing.

Код: Выделить всё

import ray
import pandas as pd
# from multiprocessing import Pool

ray.init(num_cpus=60)
# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
##-----------------------
## With ray :
df_list = ray.get([process_group.remote(data) for data in data_list])
##-----------------------
## With multiprocessing :
#f_list = pool.map(process_group, list_of_indices_into_data_list)
##
##      data are both known from the parent process
##      and I use copy-on-write semantic to avoid having 60 copies.
##      All the function needs are a list of indices
##      of where to fetch slices of the read-only data.
##
very_big_df = pd.concatenate(df_list)
##-----------------------
## Write to file :
very_big_df.to_parquet(outputfile)
Поэтому на каждой итерации цикла мне приходится собирать выходные данные многих процессовprocess_group(), которые вычислялись одновременно, в виде списка кадров данных df_list< /code> для объединения в один более крупный

Код: Выделить всё

very_big_df
[/b] кадр данных. Последний необходимо записать на диск (обычно размеры от ~1 до ~3 ГБ). Написание одного такого файла занимает около

Код: Выделить всё

10-30 [s]
[/b], пока это займет около [/b] для удаленного управления Process_group для обработки. Существуют тысячи итераций цикла. Таким образом, это займет несколько дней.

Можно ли записать файл на диск неблокирующим способом, пока цикл продолжается, чтобы сэкономить около 10% времени (что позволит сэкономить около одного дня вычислений)?

К тому времени, когда параллельные процессы следующей итерации цикла завершатся, времени будет достаточно для выходные данные предыдущей итерации, которые будут записаны.
Похоже, что все задействованные здесь ядра работают почти на 100%, поэтому модуль Threading, вероятно, также не рекомендуется. multiprocessing.apply_async() расстраивает еще больше, поскольку ему не нужен мой невыбираемый вывод

Код: Выделить всё

very_big_df
[/b] фрейм данных, которым мне пришлось бы поделиться с некоторыми более сложными деталями, что может стоить мне времени, которое я пытаюсь сэкономить, и я надеялся, что ray справится с чем-то подобным эффективно.

[ОБНОВЛЕНИЕ] Для простоты я не упомянул, что между всеми процессами существует большая общая переменная (именно поэтому я назвал ее параллельным процессом, а также одновременная запись файла). В результате мой заглавный вопрос был отредактирован.
Итак, на самом деле перед заданиями параллельного луча есть такой фрагмент кода:

Код: Выделить всё

shared_array_id = ray.put(shared_array)
df_list = ray.get([process_group.remote(shared_array, data) for data in data_list])
Однако не уверен, делает ли это это скорее «параллельным» выполнением, а не просто параллельными операциями.

[ОБНОВЛЕНИЕ 2] Общий массив представляет собой таблицу поиска, т.е. доступен только для чтения для параллельных рабочих процессов.

[ОБНОВЛЕНИЕ 3] Я попробовал оба предложенных решения: Threading и Ray/compute()
Для последнего было предложено использовать функцию записи в качестве удаленно и отправить операцию записи асинхронно внутри цикла for, что, как я изначально думал, возможно только через .get(), который будет блокироваться.

Итак, в случае с Рэем показаны оба решения:

Код: Выделить всё

@ray.remote
def write_to_parquet(df_list, filename):
df = pd.concat(df_list)
df.to_parquet(filename, engine='pyarrow', compression=None)

# Share array created outside the loop, read-only (big lookup table).
# About 600 MB
shared_array_id = ray.put(shared_array)

for data_list in many_data_lists:

new_df_list = ray.get([process_group.remote(shared_array_id, data) for data in data_list])
write_to_parquet.remote(df_list, my_filename)

## Using threading, one would remove the ray decorator:
# write_thread = threading.Thread(target=write_to_parquet, args=(new_df_list, tinterval.left))
# write_thread.start()
Для решения RAY это требовало, однако для увеличения object_store_memory значения по умолчанию было недостаточно: 10% памяти узла ~ 37 ГБ (у меня 376 ГБ оперативной памяти) который затем ограничивается 20 ГБ, а единственные объекты, хранящиеся в общей сложности около 22 ГБ: один список кадров данных df_list (около 11 ГБ) и результат их объединения внутри функции записи (около 11 ГБ) тогда 11 ГБ), при условии, что во время конкатенации существует копия. Если нет, то эта проблема с памятью не имеет смысла, и мне интересно, смогу ли я передать количество представлений, что, как я думал, происходит по умолчанию. Это довольно разочаровывающий аспект RAY, поскольку я не могу точно предсказать, сколько памяти будет занимать каждый df_list, оно может варьироваться от 1x до 3x...

В конце концов, использование многопроцессорной обработки с использованием Threading оказывается наиболее эффективным решением, поскольку часть обработки (без ввода-вывода) выполняется быстрее:

Код: Выделить всё

from multiprocessing import Pool

# Create the shared array in the parent process & exploit copy-on-write (fork) semantics
shared_array = create_lookup_table(my_inputs)

def process_group(my_data):
# Process a new dataframe here using my_data and some other data inside shared_array
...
return my_df

n_workers = 60
with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
# data_list contains thousands of elements. I choose a chunksize of 10
df_list = pool.map(process_group, data_list, 10)
write_thread = threading.Thread(target=write_to_parquet, args=(group_df_list, tinterval.left))
write_thread.start()
На каждой итерации цикла обычно len(many_data_lists) = 7000 и каждый список содержит 7 числовых массивов размером (3, 9092). Таким образом, эти 7000 списков отправляются 60 рабочим:

время для всех параллельных групп процессов на итерацию цикла:

РЭЙ: [/b]

Многопроцессорность: [/b]

Ввод-вывод: запись файла паркета размером 5 ГБ на внешний вращающийся диск USB 3 занимает около 35 секунд. Около 10 секунд на внутреннем вращающемся диске.

Рэй: затраты ~5 секунд на создание будущего с помощью write_to_parquet.remote(), который блокирует цикл. Это все еще 50% времени, которое потребуется для записи на вращающийся диск. Это не идеально.

многопроцессорность: измеренные издержки 0 с.

общее время у стены:

Рэй: [/b]

Многопроцессорность: [/b]

Я повторял это несколько раз, различия между Ray и Multiprocessing постоянно показывает ускорение многопроцессорной обработки примерно на 50 секунд. Это существенная разница, которая также озадачивает, поскольку Рэй рекламирует более высокую эффективность.

Я проведу это большее количество итераций и отчитаюсь о стабильности (память, потенциальные проблемы со сбором мусора,...)

Подробнее здесь: https://stackoverflow.com/questions/602 ... sing-or-ra
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»