Подходы и проблемы с массовой одновременной обработкой с использованием concurrent.futures и/или asyncioPython

Программы на Python
Ответить
Anonymous
 Подходы и проблемы с массовой одновременной обработкой с использованием concurrent.futures и/или asyncio

Сообщение Anonymous »

Я хочу запускать миллиарды задач, связанных с вводом-выводом, в пуле из тысяч потоков. Вот мои задачи:

[*]Уменьшить использование памяти. Пул потоков из concurrent.futures использует несвязанную очередь. Слишком много сообщений и всплывающих сообщений об использовании памяти, но слишком мало и недостаточно работы.
[*]Соберите результаты. Ни пул, ни его контекстный менеджер не хранят завершенные фьючерсы, поэтому сбор результатов не требуется для уменьшения использования памяти. Мне бы хотелось иметь возможность собирать результаты, если они мне когда-нибудь понадобятся.
[*]Обработка исключений. Я хотел бы, чтобы исключения Python в любой из задач распространялись на основной поток.
[*]Включите asyncio через цикл.run_in_executor()
< /ul>
Каждое будущее из concurrent.futures может иметь add_done_callback, который собирает результат и обрабатывает любые исключения. Исключения, возникающие в этих обратных вызовах, игнорируются, но поскольку обратные вызовы выполняются из основного потока, исключения могут быть сохранены как глобальная переменная и повторно вызваны из цикла отправки. Это не помогает уменьшить использование памяти, но я не понял, как это сделать даже с помощью asyncio (см. ниже).
Чтобы включить asyncio, я создаю потребительскую задачу и Задача продюсера. Производитель отправляет вызываемые объекты исполнителю цикла в синхронном цикле, помещая полученные фьючерсы в несвязанную очередь.
Потребитель поддерживает набор ожидающих фьючерсов, ожидая первого результата из любого из этих наборов. или очередь. Он ожидает результата asyncio.wait() как для очереди, так и для второй сопрограммы asyncio.wait() для набора ожидающих фьючерсов. Если результат приходит из очереди, он добавляет его в набор и возобновляет ожидание. Если результатом является завершенное будущее, он обрабатывает результаты и обрабатывает исключения, прежде чем возобновить ожидание. Задача будет завершена, когда очередь отключится.
Не слишком ли я усложняю задачу? Есть ли более простой подход?
Я не нашел аккуратного способа уменьшить использование памяти. Лучшее, что я могу придумать, это проверить _work_queue.qsize() пула потоков и, если он превысит пороговое значение, просто дождаться asyncio.sleep(). Неужели должен быть лучший способ?

Подробнее здесь: https://stackoverflow.com/questions/792 ... current-fu
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»