Обеспечение постоянной работы потребителя aio_pika вместе с FastAPI.Python

Программы на Python
Ответить
Гость
 Обеспечение постоянной работы потребителя aio_pika вместе с FastAPI.

Сообщение Гость »


Я написал потребительскую задачу aio_pika, которая должна работать вечно в приложении FastAPI. Эта задача является частью объекта менеджера, реализующего шаблон публикации/подписки:

из aio_pika import Connect_robust из aio_pika.abc импортировать AbstractIncomingMessage класс MyManager (объект): защита __init__(сам): self.waiter = asyncio.Future() def опубликовать(сам, значение): официант, self.waiter = self.waiter, asyncio.Future() waiter.set_result((значение, self.waiter)) асинхронная подписка по определению (самостоятельная): официант = сам.официант пока правда: значение, официант = ожидание официанта доходность __aiter__ = подписаться async def on_message(self, message: AbstractIncomingMessage) -> Нет: пытаться: асинхронно с message.process(): # выполнить любую десериализацию полученного элемента элемент = json.loads(message.body) # поделитесь записью с подписчиками self.publish(пункт) кроме исключения как e: logger.error(e, exc_info=True) асинхронный запуск (self): соединение = ожидание подключения_робуст( settings.amqp_url, цикл = asyncio.get_running_loop() ) канал = ожидание соединения.канал() my_queue = ожидайте Channel.get_queue('моя-очередь') ожидайте my_queue.consume(self.on_message) ожидайте asyncio.Future() дождитесь соединения.закрыть() Эти потребительские задачи создаются во время запуска приложения FastAPI:

my_manager = asyncio.Future() @app.on_event("запуск") асинхронная защита on_startup(): my_manager.set_result(MyManager()) задача = asyncio.create_task((ожидайте my_manager).run()) Обратите внимание, что экземпляр менеджера создается только во время on_startup, чтобы гарантировать наличие существующего цикла asyncio.

К сожалению, задача перестает работать через несколько недель/месяцев. Мне не удалось записать, какое событие вызвало это. Я не уверен, что задача выходит из строя или соединение с сервером AMQP прервалось без повторного подключения. Я даже не знаю, как/где выявить/зафиксировать проблему.

В чем может быть причина этой проблемы и как ее устранить?

В качестве дополнительного контекста менеджер используется в маршруте событий, отправленных сервером:

@router.get('/items') асинхронное определение items_stream (запрос: запрос): асинхронная защита event_publisher(): пытаться: aiter = (ожидайте my_manager).__aiter__() пока правда: задача = asyncio.create_task(aiter.__anext__()) событие = ожидание asyncio.shield(задача) выход dict (данные = событие) кроме asyncio.CancelledError как e: print(f'Отключено от клиента (через обновление/закрытие) {request.client}') поднять е вернуть EventSourceResponse(event_publisher()) Асинхронный итератор защищен во избежание описанной здесь проблемы.
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»