Мы работаем над задачей параллелизма, где один объект «выборки» имеет несколько зависимых задач, которые должны выполняться поэтапно. Например, этап 1 имеет задачи (1a, 1b), этап 2 имеет задачи (2a, 2b) и т. Д. Каждый этап может начинаться только после того, как все задачи на предыдущем этапе завершены. закончили-т. Е. Если 1a и 1b помечены как завершенные, то мы запускаем этап 2. Однако в контексте многопроцессов эти ссылки забираются и передаются каждому работнику. Это означает, что каждая задача работает на копии образца, а не на общей изменяемой ссылке. После завершения задач у нас остается несколько копий, состояние которого мы должны примириться вручную. < /P>
Я хотел бы знать: < /p>
[*] Лучшие практики для оркестровки зависимых задач, так что, когда все задачи
на стадии 1 завершены, я могу начать стадию 2, не теряя
rack of the сделано. < /li>
Как избежать проблемы «потерянной изменчивости», где каждый процесс изменяет копию, и мне нужно их слияние. Существуют ли рекомендуемые шаблоны или структуры данных (например, Multiprocessing.Manager или какая -то форма общей памяти), которые упрощают это
< /li>
Как обрабатывать сценарий, где каждая задача изменяет то же самое
Пример объекта, но мы хотим только окончательные, агрегированные результаты в одном
place.
< /ul>
ниже приведен упрощенный пример кода. В реальном коде каждая задача изменяет внутренние данные выборки, но как только мы используем ProcessPoolexeCutor, ссылки на образцы объекта становятся отключенными копиями. < /P>
import concurrent.futures
class Sample:
def __init__(self, sample_id):
self.sample_id = sample_id
# For illustration, let's track stages like {'1': [False, False], '2': [False, False], ...}
self.stage_completion = {
'1': [False, False],
'2': [False, False],
'3': [False, False]
}
def do_task(self, stage, sub_idx):
# Do some work here
print(f"Doing {stage}{sub_idx} for sample {self.sample_id}")
self.stage_completion[stage][sub_idx] = True
return self # Return self for convenience
def run_task(sample_obj, stage, sub_idx):
return sample_obj.do_task(stage, sub_idx)
def main():
sample = Sample(sample_id=123)
with concurrent.futures.ProcessPoolExecutor() as executor:
# Submit tasks 1a and 1b (equivalent to stage '1' indexes [0, 1])
future1 = executor.submit(run_task, sample, '1', 0)
future2 = executor.submit(run_task, sample, '1', 1)
# Wait for them to finish
result1 = future1.result()
result2 = future2.result()
# Now I'd like to check if stage 1 is fully done before scheduling stage 2
# But result1 and result2 are separate copies with their own state
# This is where merging states or having a centralized tracking is tricky
print("Stage 1 results from result1:", result1.stage_completion)
print("Stage 1 results from result2:", result2.stage_completion)
if __name__ == "__main__":
main()
< /code>
Как вы можете видеть, каждый возвращенный образцо -объект может иметь частичное представление об общем состоянии. Я бы предпочел решение, которое держит их в синхронизации или легко объединяет их, не обращаясь к руководству по написанию «Функций слияния» для каждой внутренней структуры данных. В Python для управления (и в конечном итоге примирение) изменяемого состояния по параллельным задачам, чтобы я мог координировать зависимые задачи, не теряя единого состояния общего объекта? Советы, примеры с использованием многопроцессорной, одновременной. Обработки или более подходящую библиотеку, будут очень оценены. Все вызовы этой базы данных могут сделать это медленным ...
Подробнее здесь: https://stackoverflow.com/questions/793 ... tasks-when
Как я могу сохранить и примирить изменяемое состояние объектного объекта через параллельные задачи, когда мариновано нар ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение