Я хочу запускать миллиарды задач, связанных с вводом-выводом, в пуле из тысяч потоков. Вот мои задачи:
[*]Уменьшить использование памяти. Пул потоков из concurrent.futures использует несвязанную очередь. Слишком много сообщений и всплывающих сообщений об использовании памяти, но слишком мало и недостаточно работы.
[*]Соберите результаты. Ни пул, ни его контекстный менеджер не хранят завершенные фьючерсы, поэтому сбор результатов не требуется для уменьшения использования памяти. Мне бы хотелось иметь возможность собирать результаты, если они мне когда-нибудь понадобятся.
[*]Обработка исключений. Я хотел бы, чтобы исключения Python в любой из задач распространялись на основной поток.
[*]Включите asyncio через цикл.run_in_executor()
< /ul>
Каждое будущее из concurrent.futures может иметь add_done_callback, который собирает результат и обрабатывает любые исключения. Исключения, возникающие в этих обратных вызовах, игнорируются, но поскольку обратные вызовы выполняются из основного потока, исключения могут быть сохранены как глобальная переменная и повторно вызваны из цикла отправки. Это не помогает уменьшить использование памяти, но я не понял, как это сделать даже с помощью asyncio (см. ниже).
Чтобы включить asyncio, я создаю потребительскую задачу и Задача продюсера. Производитель отправляет вызываемые объекты исполнителю цикла в синхронном цикле, помещая полученные фьючерсы в несвязанную очередь.
Потребитель поддерживает набор ожидающих фьючерсов, ожидая первого результата из любого из этих наборов. или очередь. Он ожидает результата asyncio.wait() как для очереди, так и для второй сопрограммы asyncio.wait() для набора ожидающих фьючерсов. Если результат приходит из очереди, он добавляет его в набор и возобновляет ожидание. Если результатом является завершенное будущее, он обрабатывает результаты и обрабатывает исключения, прежде чем возобновить ожидание. Задача будет завершена, когда очередь отключится.
Не слишком ли я усложняю задачу? Есть ли более простой подход?
Я не нашел аккуратного способа уменьшить использование памяти. Лучшее, что я могу придумать, это проверить _work_queue.qsize() пула потоков и, если он превысит пороговое значение, просто дождаться asyncio.sleep(). Неужели должен быть лучший способ?
Подробнее здесь: https://stackoverflow.com/questions/792 ... current-fu
Подходы и проблемы с массовой одновременной обработкой с использованием concurrent.futures и/или asyncio ⇐ Python
Программы на Python
1734021008
Anonymous
Я хочу запускать миллиарды задач, связанных с вводом-выводом, в пуле из тысяч потоков. Вот мои задачи:
[*]Уменьшить использование памяти. Пул потоков из concurrent.futures использует несвязанную очередь. Слишком много сообщений и всплывающих сообщений об использовании памяти, но слишком мало и недостаточно работы.
[*]Соберите результаты. Ни пул, ни его контекстный менеджер не хранят завершенные фьючерсы, поэтому сбор результатов не требуется для уменьшения использования памяти. Мне бы хотелось иметь возможность собирать результаты, если они мне когда-нибудь понадобятся.
[*]Обработка исключений. Я хотел бы, чтобы исключения Python в любой из задач распространялись на основной поток.
[*]Включите asyncio через цикл.run_in_executor()
< /ul>
Каждое будущее из concurrent.futures может иметь add_done_callback, который собирает результат и обрабатывает любые исключения. Исключения, возникающие в этих обратных вызовах, игнорируются, но поскольку обратные вызовы выполняются из основного потока, исключения могут быть сохранены как глобальная переменная и повторно вызваны из цикла отправки. Это не помогает уменьшить использование памяти, но я не понял, как это сделать даже с помощью asyncio (см. ниже).
Чтобы включить asyncio, я создаю потребительскую задачу и Задача продюсера. Производитель отправляет вызываемые объекты исполнителю цикла в синхронном цикле, помещая полученные фьючерсы в несвязанную очередь.
Потребитель поддерживает набор ожидающих фьючерсов, ожидая первого результата из любого из этих наборов. или очередь. Он ожидает результата asyncio.wait() как для очереди, так и для второй сопрограммы asyncio.wait() для набора ожидающих фьючерсов. Если результат приходит из очереди, он добавляет его в набор и возобновляет ожидание. Если результатом является завершенное будущее, он обрабатывает результаты и обрабатывает исключения, прежде чем возобновить ожидание. Задача будет завершена, когда очередь отключится.
Не слишком ли я усложняю задачу? Есть ли более простой подход?
Я не нашел аккуратного способа уменьшить использование памяти. Лучшее, что я могу придумать, это проверить _work_queue.qsize() пула потоков и, если он превысит пороговое значение, просто дождаться asyncio.sleep(). Неужели должен быть лучший способ?
Подробнее здесь: [url]https://stackoverflow.com/questions/79275864/approaches-and-problems-with-massively-concurrent-processing-using-concurrent-fu[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия