Мой код:
Код: Выделить всё
def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.
Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""
return AIOKafkaConsumer(
settings.kafka_consumer_topic,
bootstrap_servers=f"{settings.kafka_consumer_host}:{settings.kafka_consumer_port}"
)
app = FastAPI()
consumer = create_consumer()
@app.on_event("startup")
async def startup_event():
"""Start up event for FastAPI application."""
log.info("Starting up...")
await consumer.start()
asyncio.create_task(consume())
async def consume(db: Session = next(get_db())):
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
...
Код: Выделить всё
import pytest
from fastapi.testclient import TestClient
from app.main import get_task_files
#from app.main import app
client = TestClient(app) # ERR: AIOKafkaConsumer The object should be created within an async function or provide loop directly.
Объект должен быть создан внутри асинхронную функцию или предоставить цикл напрямую.
Как правильно протестировать приложение? Похоже, мне придется издеваться над функциональностью kafka.
Подробнее здесь: https://stackoverflow.com/questions/793 ... -within-an
Мобильная версия