Как обрабатывать исключения aiokafka getmany() в случае остановки KafkaPython

Программы на Python
Ответить
Anonymous
 Как обрабатывать исключения aiokafka getmany() в случае остановки Kafka

Сообщение Anonymous »

Я запускаю потребителя aiokafka в приложении fastAPI
В основном файле я запускаю потребителя

Код: Выделить всё

from aiokafka import AIOKafkaConsumer

def create_application() -> FastAPI:
...
return FastAPI()

app = create_application()
consumer = create_consumer()

@app.on_event("startup")
async def startup_event():
"""Start up event for FastAPI application."""
global_items["logger"].info("Starting up...")
await consumer.start()

asyncio.gather(
consume(),
# some other tasks
return_exceptions=True
)

async def consume(db: Session = next(get_db())):
"""Consume and process messages from Kafka."""

while True:
try:
print("Try consume")
data = await consumer.getmany(timeout_ms=10000)
for tp, msgs in data.items():
if msgs:
for msg in msgs:
await process_message(msg, db)
await consumer.commit({tp: msgs[-1].offset+1})
except Exception as e:
# printing ERROR LOG

finally:
await asyncio.sleep(settings.common.consumer_pause_sec)
Я хочу напечатать ОШИБКУ, если Kafka была остановлена
Мои действия
  • Включить Kafka и приложение fastAPI — просмотреть действительные журналы
  • Отключить Kafka (через Docker Stop
Текущий результат после действий — я вижу журналы «Попробуйте использовать»
Ожидаемый результат — я хочу просмотреть журнал ошибок (в обработчике исключений)

Подробнее здесь: https://stackoverflow.com/questions/798 ... pped-kafka
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»