У меня есть приложение FastAPI. Приложение подписывается на тему kafka и обрабатывает сообщения (все работает)
Но когда я добавил конечные точки http, заметил, что не могу вызвать конечные точки, потому что слушаю kafka бесконечно цикл.
Как исправить мой код, чтобы включить конечные точки http? Я думаю, подписка на Кафку должна работать в отдельном потоке.
Мой код:
# main.py
from pathlib import Path
from fastapi import FastAPI, APIRouter
from app.api.v1.api import api_router
from app.core.config import settings
import logging
from aiokafka import AIOKafkaConsumer
from fastapi import FastAPI
import hashlib
log = logging.getLogger("uvicorn")
router = APIRouter(prefix="/kafka_consumer")
async def consume():
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
print("TODO: process msg ", msg.value.decode())
async def check_kafka() -> bool:
"""
Checks if Kafka is available by fetching all metadata from the Kafka client.
Returns:
bool: True if Kafka is available, False otherwise.
"""
try:
await consumer.client.fetch_all_metadata()
except Exception as exc:
logging.error(f'Kafka is not available: {exc}')
else:
return True
return False
@router.get("/healthz/ready")
async def ready_check() -> int:
"""
Check the health of application dependencies.
Returns:
int: Representation of the HTTP status code for a successful response.
- 200 if the server is ready.
- 503 if the server is not ready.
"""
readiness_probes = await asyncio.gather(
*[component for component in [check_kafka(), http_client.is_ready()]],
)
ready = all(probe for probe in readiness_probes)
status = HTTPStatus.OK if ready else HTTPStatus.SERVICE_UNAVAILABLE
return status
def create_application() -> FastAPI:
"""Create FastAPI application and set routes.
Returns:
FastAPI: The created FastAPI instance.
"""
application = FastAPI(openapi_url="/kafka_consumer/openapi.json", docs_url="/kafka_consumer/docs")
application.include_router(router, tags=["consumer"])
return application
def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.
Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""
return AIOKafkaConsumer(
settings.kafka_topics,
bootstrap_servers=settings.kafka_instance,
)
app = create_application()
consumer = create_consumer()
@app.on_event("startup")
async def startup_event():
"""Start up event for FastAPI application."""
log.info("Starting up...")
await consumer.start()
log.info("Start consume")
await consume()
log.info("XXX Start app") # Don't see this log
@app.on_event("shutdown")
async def shutdown_event():
"""Shutdown event for FastAPI application."""
log.info("Shutting down...")
await consumer.stop()
Подробнее здесь: https://stackoverflow.com/questions/792 ... ate-thread
FastAPI – слушайте Кафку в отдельном потоке [дубликат] ⇐ Python
Программы на Python
-
Anonymous
1732939905
Anonymous
У меня есть приложение FastAPI. Приложение подписывается на тему kafka и обрабатывает сообщения (все работает)
Но когда я добавил конечные точки http, заметил, что не могу вызвать конечные точки, потому что слушаю kafka бесконечно цикл.
Как исправить мой код, чтобы включить конечные точки http? Я думаю, подписка на Кафку должна работать в отдельном потоке.
Мой код:
# main.py
from pathlib import Path
from fastapi import FastAPI, APIRouter
from app.api.v1.api import api_router
from app.core.config import settings
import logging
from aiokafka import AIOKafkaConsumer
from fastapi import FastAPI
import hashlib
log = logging.getLogger("uvicorn")
router = APIRouter(prefix="/kafka_consumer")
async def consume():
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
print("TODO: process msg ", msg.value.decode())
async def check_kafka() -> bool:
"""
Checks if Kafka is available by fetching all metadata from the Kafka client.
Returns:
bool: True if Kafka is available, False otherwise.
"""
try:
await consumer.client.fetch_all_metadata()
except Exception as exc:
logging.error(f'Kafka is not available: {exc}')
else:
return True
return False
@router.get("/healthz/ready")
async def ready_check() -> int:
"""
Check the health of application dependencies.
Returns:
int: Representation of the HTTP status code for a successful response.
- 200 if the server is ready.
- 503 if the server is not ready.
"""
readiness_probes = await asyncio.gather(
*[component for component in [check_kafka(), http_client.is_ready()]],
)
ready = all(probe for probe in readiness_probes)
status = HTTPStatus.OK if ready else HTTPStatus.SERVICE_UNAVAILABLE
return status
def create_application() -> FastAPI:
"""Create FastAPI application and set routes.
Returns:
FastAPI: The created FastAPI instance.
"""
application = FastAPI(openapi_url="/kafka_consumer/openapi.json", docs_url="/kafka_consumer/docs")
application.include_router(router, tags=["consumer"])
return application
def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.
Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""
return AIOKafkaConsumer(
settings.kafka_topics,
bootstrap_servers=settings.kafka_instance,
)
app = create_application()
consumer = create_consumer()
@app.on_event("startup")
async def startup_event():
"""Start up event for FastAPI application."""
log.info("Starting up...")
await consumer.start()
log.info("Start consume")
await consume()
log.info("XXX Start app") # Don't see this log
@app.on_event("shutdown")
async def shutdown_event():
"""Shutdown event for FastAPI application."""
log.info("Shutting down...")
await consumer.stop()
Подробнее здесь: [url]https://stackoverflow.com/questions/79236616/fastapi-listen-to-kafka-in-separate-thread[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия