Как отменить все задачи в TaskGroupPython

Программы на Python
Ответить
Anonymous
 Как отменить все задачи в TaskGroup

Сообщение Anonymous »

Код: Выделить всё

import asyncio
import random

task_group: asyncio.TaskGroup | None = None

async def coro1():
while True:
await asyncio.sleep(1)
print("coro1")

async def coro2():
while True:
await asyncio.sleep(1)
if random.random() < 0.1:
print("dead")
assert task_group is not None
task_group.cancel() # This function does not exist.
else:
print("Survived another second")

async def main():
global task_group
async with asyncio.TaskGroup() as tg:
task_group = tg
tg.create_task(coro1())
tg.create_task(coro2())
task_group = None

asyncio.run(main())
В этом примере coro1 будет печатать «coro1» каждую секунду, coro2 имеет 10% шанс отменить всю TaskGroup, т. е. отменить как coro1, так и coro2 и выйти из блока async with каждую секунду.
Проблема в том, что я не знаю, как отменить группу задач. Функции TaskGroup.cancel() нет.

Подробнее здесь: https://stackoverflow.com/questions/762 ... -taskgroup
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»