У меня есть очень простой игрушечный пример, в котором используются Anthropic и asyncio.
Это цикл, который несколько раз запускает функцию asyncio.run().
Иногда возникает ошибка RuntimeError: Event цикл закрыт, во время итераций возникает исключение, и я не понимаю, почему.
Думаю, я неправильно использую asyncio, но не могу понять, где.
Кажется, код работает нормально хотя.
import asyncio
from time import perf_counter
import numpy as np
from anthropic import Anthropic, AsyncAnthropic
async def coroutine1():
prompt = "Who discovered gravity. Answer in 10 words."
client = AsyncAnthropic()
message = await client.messages.create(
model="claude-3-5-sonnet-20240620",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
)
return message.content[0].text
async def coroutine2():
prompt = "Who discovered radioactivity. Answer in 10 words."
client = AsyncAnthropic()
message = await client.messages.create(
model="claude-3-5-sonnet-20240620",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
)
return message.content[0].text
async def main():
async with asyncio.TaskGroup() as tg:
t0 = tg.create_task(coroutine1())
t1 = tg.create_task(coroutine2())
print(t0.result())
print(t1.result())
if __name__ == "__main__":
for i in range(10):
tic = perf_counter()
asyncio.run(main())
toc = perf_counter()
print(f"Elapsed time = {toc - tic:.3f}")
Подробнее здесь: https://stackoverflow.com/questions/791 ... -is-closed
Asyncio вызывает ошибку «RuntimeError: цикл событий закрыт» ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Asyncio.run () Raise "RuntimeError: цикл событий закрыт" в ноутбуке Юпитера
Anonymous » » в форуме Python - 0 Ответы
- 6 Просмотры
-
Последнее сообщение Anonymous
-