Я попробовал оба варианта multiprocessing и ray для одновременной обработки пакетами из миллионов небольших файлов, и я хотел бы одновременно писать несколько выходных файлов во время этой обработки.
Меня смущает блокировка методов .get(), связанных, например, с apply_async() (в многопроцессорной обработке) и ray.get().
С помощью ray у меня есть удаленная функция (
Код: Выделить всё
process_group()Код: Выделить всё
import ray
import pandas as pd
# from multiprocessing import Pool
ray.init(num_cpus=60)
# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
##-----------------------
## With ray :
df_list = ray.get([process_group.remote(data) for data in data_list])
##-----------------------
## With multiprocessing :
#f_list = pool.map(process_group, list_of_indices_into_data_list)
##
## data are both known from the parent process
## and I use copy-on-write semantic to avoid having 60 copies.
## All the function needs are a list of indices
## of where to fetch slices of the read-only data.
##
very_big_df = pd.concatenate(df_list)
##-----------------------
## Write to file :
very_big_df.to_parquet(outputfile)
Код: Выделить всё
very_big_dfКод: Выделить всё
10-30 [s]Код: Выделить всё
180 [s]Можно ли записать файл на диск неблокирующим способом, пока цикл продолжается, чтобы сэкономить около 10% времени (что позволит сэкономить около одного дня вычислений)?
К тому времени, когда параллельные процессы следующей итерации цикла завершатся, времени будет достаточно для выходные данные предыдущей итерации, которые будут записаны.
Похоже, что все задействованные здесь ядра работают почти на 100%, поэтому модуль Threading, вероятно, также не рекомендуется. multiprocessing.apply_async() расстраивает еще больше, поскольку ему не нужен мой невыбираемый вывод
Код: Выделить всё
very_big_df[ОБНОВЛЕНИЕ] Для простоты я не упомянул, что между всеми процессами существует большая общая переменная (именно поэтому я назвал ее параллельным процессом, а также одновременная запись файла). В результате мой заглавный вопрос был отредактирован.
Итак, на самом деле перед заданиями параллельного луча есть такой фрагмент кода:
Код: Выделить всё
shared_array_id = ray.put(shared_array)
df_list = ray.get([process_group.remote(shared_array, data) for data in data_list])
[ОБНОВЛЕНИЕ 2] Общий массив представляет собой таблицу поиска, т.е. доступен только для чтения для параллельных рабочих процессов.
[ОБНОВЛЕНИЕ 3] Я попробовал оба предложенных решения: Threading и Ray/compute()
Для последнего было предложено использовать функцию записи в качестве удаленно и отправить операцию записи асинхронно внутри цикла for, что, как я изначально думал, возможно только через .get(), который будет блокироваться.
Итак, в случае с Рэем показаны оба решения:
Код: Выделить всё
@ray.remote
def write_to_parquet(df_list, filename):
df = pd.concat(df_list)
df.to_parquet(filename, engine='pyarrow', compression=None)
# Share array created outside the loop, read-only (big lookup table).
# About 600 MB
shared_array_id = ray.put(shared_array)
for data_list in many_data_lists:
new_df_list = ray.get([process_group.remote(shared_array_id, data) for data in data_list])
write_to_parquet.remote(df_list, my_filename)
## Using threading, one would remove the ray decorator:
# write_thread = threading.Thread(target=write_to_parquet, args=(new_df_list, tinterval.left))
# write_thread.start()
В конце концов, использование многопроцессорной обработки с использованием Threading оказывается наиболее эффективным решением, поскольку часть обработки (без ввода-вывода) выполняется быстрее:
Код: Выделить всё
from multiprocessing import Pool
# Create the shared array in the parent process & exploit copy-on-write (fork) semantics
shared_array = create_lookup_table(my_inputs)
def process_group(my_data):
# Process a new dataframe here using my_data and some other data inside shared_array
...
return my_df
n_workers = 60
with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
# data_list contains thousands of elements. I choose a chunksize of 10
df_list = pool.map(process_group, data_list, 10)
write_thread = threading.Thread(target=write_to_parquet, args=(group_df_list, tinterval.left))
write_thread.start()
время для всех параллельных групп процессов на итерацию цикла:
РЭЙ:
Код: Выделить всё
250 [s]Многопроцессорность:
Код: Выделить всё
233 [s]Ввод-вывод: запись файла паркета размером 5 ГБ на внешний вращающийся диск USB 3 занимает около 35 секунд. Около 10 секунд на внутреннем вращающемся диске.
Рэй: затраты ~5 секунд на создание будущего с помощью write_to_parquet.remote(), который блокирует цикл. Это все еще 50% времени, которое потребуется для записи на вращающийся диск. Это не идеально.
многопроцессорность: измеренные издержки 0 с.
общее время у стены:
Рэй:
Код: Выделить всё
486 [s]Многопроцессорность:
Код: Выделить всё
436 [s]Я повторял это несколько раз, различия между Ray и Multiprocessing постоянно показывает ускорение многопроцессорной обработки примерно на 50 секунд. Это существенная разница, которая также озадачивает, поскольку Рэй рекламирует более высокую эффективность.
Я проведу это большее количество итераций и отчитаюсь о стабильности (память, потенциальные проблемы со сбором мусора,...)
Подробнее здесь: https://stackoverflow.com/questions/602 ... sing-or-ra
Мобильная версия