В основном файле я запускаю потребителя
Код: Выделить всё
from aiokafka import AIOKafkaConsumer
def create_application() -> FastAPI:
...
return FastAPI()
app = create_application()
consumer = create_consumer()
@app.on_event("startup")
async def startup_event():
"""Start up event for FastAPI application."""
global_items["logger"].info("Starting up...")
await consumer.start()
asyncio.gather(
consume(),
# some other tasks
return_exceptions=True
)
async def consume(db: Session = next(get_db())):
"""Consume and process messages from Kafka."""
while True:
try:
print("Try consume")
data = await consumer.getmany(timeout_ms=10000)
for tp, msgs in data.items():
if msgs:
for msg in msgs:
await process_message(msg, db)
await consumer.commit({tp: msgs[-1].offset+1})
except Exception as e:
# printing ERROR LOG
finally:
await asyncio.sleep(settings.common.consumer_pause_sec)
Мои действия
- Включить Kafka и приложение fastAPI — просмотреть действительные журналы
- Отключить Kafka (через Docker Stop
Ожидаемый результат — я хочу просмотреть журнал ошибок (в обработчике исключений)
Подробнее здесь: https://stackoverflow.com/questions/798 ... pped-kafka
Мобильная версия