У меня есть большой набор данных в классе с именем ModelData. . Этот набор данных используется примерно 20 поставщиками услуг. Основная цель — выполнение различных операций над данными для сбора информации. Поставщики устроены таким образом, что они используют данные других поставщиков, поэтому они также взаимодействуют друг с другом.
Проблема сейчас в том, что инициализация поставщиков и сбор данных занимают очень долго. Я хочу оптимизировать этот процесс.
Я создал дерево зависимостей, чтобы определить порядок инициализации поставщиков. Моя идея заключалась в том, чтобы использовать модуль многопроцессорности Python для одновременной инициализации нескольких провайдеров. Я прочитал несколько статей о multiprocessing.Manager Python для обмена данными между несколькими процессами. Затем я придумал решение, которое прикреплю сюда. Я пытался использовать Events для его оптимизации, но теперь это занимает больше времени, чем при инициализации всех провайдеров по одному.
Это файл Main.py
Код: Выделить всё
import sys
from multiprocessing.managers import BaseManager
def generate_random_data(rows, cols):
return pd.DataFrame(np.random.rand(rows, cols), columns=[f'col_{i}' for i in range(cols)])
def main():
input_data = {
f'dataframe_{i}': generate_random_data(3000, 3000) # 1000 rows and 50 columns for each DataFrame
for i in range(20)
}
BaseManager.register("ModelData", ModelData)
manager = BaseManager()
manager.start()
model_data = manager.ModelData(input_data)
print(f"Size: {sys.getsizeof(model_data)}")
provider_collection = ProviderCollection(model_data)
provider_collection.initialize_providers()
if __name__ == "__main__":
main()
Код: Выделить всё
from typing import Any
from pandas import DataFrame
import random
class ModelData:
def __init__(self, input_data: dict[str, DataFrame]):
self.data: dict[str, DataFrame] = input_data
def get_len(self):
return len(self.data)
def get_random_data(self, num_items) -> dict[Any, DataFrame]:
keys = list(self.data.keys())
random_keys = random.sample(keys, num_items)
return {key: self.data[key] for key in random_keys}
Код: Выделить всё
import string
import time
from collections import defaultdict
from ModelData import ModelData
from typing import List
from multiprocessing import Event
from pandas import DataFrame
class TestProvider:
def __init__(self, model_data: ModelData,name: string, dependencys: List['TestProvider'] = None):
self.__input_data: ModelData = model_data
self.dependencys = dependencys if dependencys is not None else []
self.is_initialized = Event()
self.name = name
self.some_values: dict[str, DataFrame] = defaultdict(DataFrame)
def initialize(self) -> None:
print(f"Waiting for {self.name} to be initialized...")
for dependency in self.dependencys:
dependency.wait_for_initialization()
dependency.do_something()
print(f"Starting: {self.name}")
self.is_initialized.set()
def wait_for_initialization(self):
return self.is_initialized.wait()
def do_something(self):
for key, value in self.__input_data.get_random_data(20).items():
self.some_values[key] = value
for dependency in self.dependencys:
dependency.do_something()
Код: Выделить всё
from ModelData import ModelData
from TestProvider import TestProvider
from TestProvider2 import TestProvider2
from multiprocessing import Process
from multiprocessing.managers import BaseManager
class ProviderCollection:
def __init__(self, model_data: ModelData):
self.providers = []
BaseManager.register("TestProvider", TestProvider)
manager = BaseManager()
manager.start()
provider1 = TestProvider(model_data, "1")
self.providers.append(provider1)
provider2 = TestProvider(model_data,"2", dependencys=[provider1])
self.providers.append(provider2)
provider5 = TestProvider(model_data, "5", dependencys=[provider1, provider2])
self.providers.append(provider5)
def initialize_providers(self):
processes = []
for provider in self.get_providers():
process = Process(target=provider.initialize)
processes.append(process)
process.start()
print("Processes Started")
for process in processes:
process.join()
def get_providers(self):
return self.providers
Подробнее здесь: https://stackoverflow.com/questions/792 ... -processes