Вот минимальный пример:
Код: Выделить всё
import asyncio
async def source():
for i in range(5):
print(f"yielding {i}")
await asyncio.sleep(0.1)
yield i
async def consumer(name, agen):
async for x in agen:
print(f"{name} got {x}")
await asyncio.sleep(0.05)
return f"{name} done"
async def main():
agen = source()
# Run two consumers concurrently on the same async generator
results = await asyncio.gather(
consumer("A", agen),
consumer("B", agen)
)
print("Results:", results)
asyncio.run(main())
- Иногда элементы получает только один потребитель.
- Иногда второй потребитель ничего не получает.
- Иногда программа зависает навсегда после получения 4.
- Редко оба потребителя выполняются частично, но не до конца.
Мой вопросы:
- Почему ожидание асинхронного генератора из нескольких задач приводит к недетерминированным зависаниям?
- Какая именно часть асинхронной итерационной модели Python делает это небезопасным или неопределенным?
- Существует ли правильный шаблон для «многоадресная рассылка» асинхронного генератора нескольким потребителям одновременно без предварительного сохранения всех результатов в памяти?
- Безопасно ли обернуть генератор чем-то вроде asyncio.Queue или это приведет к проблемам с противодавлением/упорядочением?
- Какое решение с наименьшими накладными расходами сохраняет поведение потоковой передачи и позволяет избежать тупиковые ситуации?
Подробнее здесь: https://stackoverflow.com/questions/798 ... terministi
Мобильная версия