Как эффективно использовать диспетчер процессов Python с несколькими процессами?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как эффективно использовать диспетчер процессов Python с несколькими процессами?

Сообщение Anonymous »

Моя проблема — часть очень большого и сложного проекта. Я не хочу раскрывать здесь все подробности, так как это слишком много, но я хочу описать основные части в примере программы.
У меня есть большой набор данных в классе с именем ModelData. . Этот набор данных используется примерно 20 поставщиками услуг. Основная цель — выполнение различных операций над данными для сбора информации. Поставщики устроены таким образом, что они используют данные других поставщиков, поэтому они также взаимодействуют друг с другом.
Проблема сейчас в том, что инициализация поставщиков и сбор данных занимают очень долго. Я хочу оптимизировать этот процесс.
Я создал дерево зависимостей, чтобы определить порядок инициализации поставщиков. Моя идея заключалась в том, чтобы использовать модуль многопроцессорности Python для одновременной инициализации нескольких провайдеров. Я прочитал несколько статей о multiprocessing.Manager Python для обмена данными между несколькими процессами. Затем я придумал решение, которое прикреплю сюда. Я пытался использовать Events для его оптимизации, но теперь это занимает больше времени, чем при инициализации всех провайдеров по одному.
Это файл Main.py

Код: Выделить всё

import sys
from multiprocessing.managers import BaseManager

def generate_random_data(rows, cols):
return pd.DataFrame(np.random.rand(rows, cols), columns=[f'col_{i}' for i in range(cols)])

def main():
input_data = {
f'dataframe_{i}': generate_random_data(3000, 3000)  # 1000 rows and 50 columns for each DataFrame
for i in range(20)
}

BaseManager.register("ModelData", ModelData)
manager = BaseManager()
manager.start()
model_data = manager.ModelData(input_data)

print(f"Size: {sys.getsizeof(model_data)}")
provider_collection = ProviderCollection(model_data)

provider_collection.initialize_providers()

if __name__ == "__main__":
main()
Это ModelData.py

Код: Выделить всё

from typing import  Any
from pandas import DataFrame
import random

class ModelData:
def __init__(self, input_data: dict[str, DataFrame]):
self.data: dict[str, DataFrame] = input_data

def get_len(self):
return len(self.data)

def get_random_data(self, num_items) -> dict[Any, DataFrame]:
keys = list(self.data.keys())
random_keys = random.sample(keys, num_items)
return {key: self.data[key] for key in random_keys}
Это TestProvider.py

Код: Выделить всё

import string
import time
from collections import defaultdict

from ModelData import ModelData
from typing import List
from multiprocessing import Event

from pandas import DataFrame

class TestProvider:
def __init__(self, model_data: ModelData,name: string, dependencys: List['TestProvider'] = None):
self.__input_data: ModelData = model_data
self.dependencys = dependencys if dependencys is not None else []
self.is_initialized = Event()
self.name = name
self.some_values: dict[str, DataFrame] = defaultdict(DataFrame)

def initialize(self) -> None:
print(f"Waiting for {self.name} to be initialized...")
for dependency in self.dependencys:
dependency.wait_for_initialization()
dependency.do_something()

print(f"Starting: {self.name}")
self.is_initialized.set()

def wait_for_initialization(self):
return self.is_initialized.wait()

def do_something(self):
for key, value in self.__input_data.get_random_data(20).items():
self.some_values[key] = value
for dependency in self.dependencys:
dependency.do_something()
Это ProviderCollection.py

Код: Выделить всё

from ModelData import ModelData
from TestProvider import TestProvider
from TestProvider2 import TestProvider2
from multiprocessing import Process
from multiprocessing.managers import BaseManager

class ProviderCollection:
def __init__(self, model_data: ModelData):
self.providers = []
BaseManager.register("TestProvider", TestProvider)
manager = BaseManager()
manager.start()

provider1 = TestProvider(model_data, "1")
self.providers.append(provider1)

provider2 = TestProvider(model_data,"2", dependencys=[provider1])
self.providers.append(provider2)

provider5 = TestProvider(model_data, "5", dependencys=[provider1, provider2])
self.providers.append(provider5)

def initialize_providers(self):
processes = []
for provider in self.get_providers():
process = Process(target=provider.initialize)
processes.append(process)
process.start()

print("Processes Started")
for process in processes:
process.join()

def get_providers(self):
return self.providers
Я думаю, проблема в том, что между разными провайдерами и, следовательно, между разными процессами происходит слишком много вызовов. Как я могу обойти это? Есть ли у кого-нибудь лучшая идея решить эту проблему? Я с нетерпением жду любых ваших предложений.

Подробнее здесь: https://stackoverflow.com/questions/792 ... -processes
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»