Обеспечение постоянной работы потребителя aio_pika вместе с FastAPI. ⇐ Python
-
Гость
Обеспечение постоянной работы потребителя aio_pika вместе с FastAPI.
Я написал потребительскую задачу aio_pika, которая должна работать вечно в приложении FastAPI. Эта задача является частью объекта менеджера, реализующего шаблон публикации/подписки:
из aio_pika import Connect_robust из aio_pika.abc импортировать AbstractIncomingMessage класс MyManager (объект): защита __init__(сам): self.waiter = asyncio.Future() def опубликовать(сам, значение): официант, self.waiter = self.waiter, asyncio.Future() waiter.set_result((значение, self.waiter)) асинхронная подписка по определению (самостоятельная): официант = сам.официант пока правда: значение, официант = ожидание официанта доходность __aiter__ = подписаться async def on_message(self, message: AbstractIncomingMessage) -> Нет: пытаться: асинхронно с message.process(): # выполнить любую десериализацию полученного элемента элемент = json.loads(message.body) # поделитесь записью с подписчиками self.publish(пункт) кроме исключения как e: logger.error(e, exc_info=True) асинхронный запуск (self): соединение = ожидание подключения_робуст( settings.amqp_url, цикл = asyncio.get_running_loop() ) канал = ожидание соединения.канал() my_queue = ожидайте Channel.get_queue('моя-очередь') ожидайте my_queue.consume(self.on_message) ожидайте asyncio.Future() дождитесь соединения.закрыть() Эти потребительские задачи создаются во время запуска приложения FastAPI:
my_manager = asyncio.Future() @app.on_event("запуск") асинхронная защита on_startup(): my_manager.set_result(MyManager()) задача = asyncio.create_task((ожидайте my_manager).run()) Обратите внимание, что экземпляр менеджера создается только во время on_startup, чтобы гарантировать наличие существующего цикла asyncio.
К сожалению, задача перестает работать через несколько недель/месяцев. Мне не удалось записать, какое событие вызвало это. Я не уверен, что задача выходит из строя или соединение с сервером AMQP прервалось без повторного подключения. Я даже не знаю, как/где выявить/зафиксировать проблему.
В чем может быть причина этой проблемы и как ее устранить?
В качестве дополнительного контекста менеджер используется в маршруте событий, отправленных сервером:
@router.get('/items') асинхронное определение items_stream (запрос: запрос): асинхронная защита event_publisher(): пытаться: aiter = (ожидайте my_manager).__aiter__() пока правда: задача = asyncio.create_task(aiter.__anext__()) событие = ожидание asyncio.shield(задача) выход dict (данные = событие) кроме asyncio.CancelledError как e: print(f'Отключено от клиента (через обновление/закрытие) {request.client}') поднять е вернуть EventSourceResponse(event_publisher()) Асинхронный итератор защищен во избежание описанной здесь проблемы.
Я написал потребительскую задачу aio_pika, которая должна работать вечно в приложении FastAPI. Эта задача является частью объекта менеджера, реализующего шаблон публикации/подписки:
из aio_pika import Connect_robust из aio_pika.abc импортировать AbstractIncomingMessage класс MyManager (объект): защита __init__(сам): self.waiter = asyncio.Future() def опубликовать(сам, значение): официант, self.waiter = self.waiter, asyncio.Future() waiter.set_result((значение, self.waiter)) асинхронная подписка по определению (самостоятельная): официант = сам.официант пока правда: значение, официант = ожидание официанта доходность __aiter__ = подписаться async def on_message(self, message: AbstractIncomingMessage) -> Нет: пытаться: асинхронно с message.process(): # выполнить любую десериализацию полученного элемента элемент = json.loads(message.body) # поделитесь записью с подписчиками self.publish(пункт) кроме исключения как e: logger.error(e, exc_info=True) асинхронный запуск (self): соединение = ожидание подключения_робуст( settings.amqp_url, цикл = asyncio.get_running_loop() ) канал = ожидание соединения.канал() my_queue = ожидайте Channel.get_queue('моя-очередь') ожидайте my_queue.consume(self.on_message) ожидайте asyncio.Future() дождитесь соединения.закрыть() Эти потребительские задачи создаются во время запуска приложения FastAPI:
my_manager = asyncio.Future() @app.on_event("запуск") асинхронная защита on_startup(): my_manager.set_result(MyManager()) задача = asyncio.create_task((ожидайте my_manager).run()) Обратите внимание, что экземпляр менеджера создается только во время on_startup, чтобы гарантировать наличие существующего цикла asyncio.
К сожалению, задача перестает работать через несколько недель/месяцев. Мне не удалось записать, какое событие вызвало это. Я не уверен, что задача выходит из строя или соединение с сервером AMQP прервалось без повторного подключения. Я даже не знаю, как/где выявить/зафиксировать проблему.
В чем может быть причина этой проблемы и как ее устранить?
В качестве дополнительного контекста менеджер используется в маршруте событий, отправленных сервером:
@router.get('/items') асинхронное определение items_stream (запрос: запрос): асинхронная защита event_publisher(): пытаться: aiter = (ожидайте my_manager).__aiter__() пока правда: задача = asyncio.create_task(aiter.__anext__()) событие = ожидание asyncio.shield(задача) выход dict (данные = событие) кроме asyncio.CancelledError как e: print(f'Отключено от клиента (через обновление/закрытие) {request.client}') поднять е вернуть EventSourceResponse(event_publisher()) Асинхронный итератор защищен во избежание описанной здесь проблемы.
Мобильная версия