FastAPI – слушайте Кафку в отдельном потоке [дубликат]Python

Программы на Python
Ответить
Anonymous
 FastAPI – слушайте Кафку в отдельном потоке [дубликат]

Сообщение Anonymous »

У меня есть приложение FastAPI. Приложение подписывается на тему kafka и обрабатывает сообщения (все работает)
Но когда я добавил конечные точки http, заметил, что не могу вызвать конечные точки, потому что слушаю kafka бесконечно цикл.
Как исправить мой код, чтобы включить конечные точки http? Я думаю, подписка на Кафку должна работать в отдельном потоке.
Мой код:
# main.py

from pathlib import Path

from fastapi import FastAPI, APIRouter

from app.api.v1.api import api_router
from app.core.config import settings

import logging

from aiokafka import AIOKafkaConsumer
from fastapi import FastAPI
import hashlib

log = logging.getLogger("uvicorn")

router = APIRouter(prefix="/kafka_consumer")

async def consume():
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
print("TODO: process msg ", msg.value.decode())

async def check_kafka() -> bool:
"""
Checks if Kafka is available by fetching all metadata from the Kafka client.

Returns:
bool: True if Kafka is available, False otherwise.
"""
try:
await consumer.client.fetch_all_metadata()
except Exception as exc:
logging.error(f'Kafka is not available: {exc}')
else:
return True
return False

@router.get("/healthz/ready")
async def ready_check() -> int:
"""
Check the health of application dependencies.

Returns:
int: Representation of the HTTP status code for a successful response.
- 200 if the server is ready.
- 503 if the server is not ready.
"""
readiness_probes = await asyncio.gather(
*[component for component in [check_kafka(), http_client.is_ready()]],
)
ready = all(probe for probe in readiness_probes)
status = HTTPStatus.OK if ready else HTTPStatus.SERVICE_UNAVAILABLE
return status

def create_application() -> FastAPI:
"""Create FastAPI application and set routes.

Returns:
FastAPI: The created FastAPI instance.
"""
application = FastAPI(openapi_url="/kafka_consumer/openapi.json", docs_url="/kafka_consumer/docs")
application.include_router(router, tags=["consumer"])
return application

def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.

Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""

return AIOKafkaConsumer(
settings.kafka_topics,
bootstrap_servers=settings.kafka_instance,
)

app = create_application()
consumer = create_consumer()

@app.on_event("startup")
async def startup_event():
"""Start up event for FastAPI application."""

log.info("Starting up...")
await consumer.start()
log.info("Start consume")
await consume()
log.info("XXX Start app") # Don't see this log

@app.on_event("shutdown")
async def shutdown_event():
"""Shutdown event for FastAPI application."""

log.info("Shutting down...")
await consumer.stop()


Подробнее здесь: https://stackoverflow.com/questions/792 ... ate-thread
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»