Я использую Dask для настройки кластера. Сейчас я настраиваю и планировщик, и воркеры на локальном хосте.
cluster = SSHCluster(["localhost", "localhost"], Connect_options = {"known_hosts": Нет}, worker_options={"n_workers": params["n_workers"], }, Scheduler_options = {"port": 0, "dashboard_address": ":8797"},) клиент = Клиент(кластер) Есть ли способ создать глобальные параметры с сохранением состояния, которые можно инициализировать на стороне рабочих процессов и использовать любые методы worker_methods, которые впоследствии назначаются для вычисления на рабочих процессах?
Я нашел метод client.register_worker_plugin.
def read_only_data(self, jsonfilepath): с open(jsonfilepath, "r") в качестве файла чтения: вернуть json.load(read_file) защита основной(): кластер = SSHCluster(params) # упрощенно клиент = Клиент(кластер) плагин = read_only_data (jsonfilepath) client.register_worker_plugin(плагин, имя="данные только для чтения") Однако ReadOnlyData инициализируется на стороне клиента, следовательно, self.persons и self.persons_len копируются в рабочие процессы (а не инициализируются на стороне рабочих). Хотя это может быть полезно для небольших данных, если набор данных огромен, это повлечет за собой дополнительные накладные расходы на связь при копировании (если только я что-то концептуально не упускаю).
Предположим, ReadOnlyData и файл в «jsonfilepath» были доступны на рабочей стороне. Мы могли бы вызвать его из «worker_method_1» и «worker_method_2», которые имеют произвольную логику; однако это означает, что его придется вызывать каждый раз при вызове рабочих процессов. Есть ли какое-то событие/метод «инициализации», которое происходит на стороне рабочих, сразу после создания рабочих и перед назначением рабочих методов, которое позволило бы нам предварительно загружать некоторые структуры данных в качестве глобальных параметров с отслеживанием состояния, обычно разделяемых среди последующих экземпляров рабочих методов?
Обновить
После того, как вы попробовали предложения @mdurant с файлом JSON размером 280 МБ, код зависал в client.rescribe() более чем на час. Загрузка файла за один процесс без Dask занимает менее 20 секунд. В дашборде рабочие все используют примерно 2 Гб, а память прибавляется. Также записывается сетевая активность.

Внезапно скрипт аварийно завершился с ошибкой:
distributed.comm.core.CommClosedError: в : TimeoutError: [Errno 10] Время соединения истекло
Использование памяти чрезмерно. Единственная память, которая у меня есть, — 280 МБ для JSON. 280 МБ x 6 рабочих должно составлять ок. 1,7гб, и уж точно не по 2гб на каждого рабочего.
Я подозреваю, что JSON копируется всем воркерам. В документации Dask также указано, что данные копируются в рабочие процессы:
реплицировать -> Установить репликацию фьючерсов внутри сети. Скопируйте данные на множество рабочих. Это помогает транслировать часто используемые данные и может повысить устойчивость. При этом выполняется копирование дерева данных по всей сети индивидуально для каждого фрагмента данных. Эта операция блокируется до завершения. Это не гарантирует репликацию данных будущим работникам.
Однако это все же не объясняет чрезмерное использование памяти и то, почему Dask не удается скопировать 280 МБ 6 воркерам менее чем за 1 час.
Мобильная версия