Почему ожидание асинхронного генератора внутри asyncio.gather приводит к недетерминированным зависаниям и как я могу выпPython

Программы на Python
Ответить
Anonymous
 Почему ожидание асинхронного генератора внутри asyncio.gather приводит к недетерминированным зависаниям и как я могу вып

Сообщение Anonymous »

Я столкнулся со странной проблемой с asyncio: использование asyncio.gather() для функций, использующих один и тот же асинхронный генератор, иногда приводит к зависанию программы на неопределенный срок. Поведение недетерминировано — иногда оно завершается мгновенно, иногда зависает навсегда.
Вот минимальный пример:

Код: Выделить всё

import asyncio

async def source():
for i in range(5):
print(f"yielding {i}")
await asyncio.sleep(0.1)
yield i

async def consumer(name, agen):
async for x in agen:
print(f"{name} got {x}")
await asyncio.sleep(0.05)
return f"{name} done"

async def main():
agen = source()

# Run two consumers concurrently on the same async generator
results = await asyncio.gather(
consumer("A", agen),
consumer("B", agen)
)

print("Results:", results)

asyncio.run(main())
Наблюдаемое поведение:
  • Иногда элементы получает только один потребитель.
  • Иногда второй потребитель ничего не получает.
  • Иногда программа зависает навсегда после получения 4.
  • Редко оба потребителя выполняются частично, но не до конца.
Я ожидал, что оба потребителя будут независимо перебирать генератор — аналогично тому, как две обычные функции могут циклически обрабатывать один и тот же список — но это явно не то, что происходит.

Мой вопросы:
  • Почему ожидание асинхронного генератора из нескольких задач приводит к недетерминированным зависаниям?
  • Какая именно часть асинхронной итерационной модели Python делает это небезопасным или неопределенным?
  • Существует ли правильный шаблон для «многоадресная рассылка» асинхронного генератора нескольким потребителям одновременно без предварительного сохранения всех результатов в памяти?
  • Безопасно ли обернуть генератор чем-то вроде asyncio.Queue или это приведет к проблемам с противодавлением/упорядочением?
  • Какое решение с наименьшими накладными расходами сохраняет поведение потоковой передачи и позволяет избежать тупиковые ситуации?


Подробнее здесь: https://stackoverflow.com/questions/798 ... terministi
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»