FastAPI макет AIOKafkaConsumer в тестахPython

Программы на Python
Ответить
Anonymous
 FastAPI макет AIOKafkaConsumer в тестах

Сообщение Anonymous »

У меня есть приложение FastAPI, которое подписывается на тему Kafka в асинхронном режиме. Мне нужно создать unittest для моего приложения. Похоже, мне придется издеваться над функциональностью kafka
Мой код:

Код: Выделить всё

def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.

Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""
return AIOKafkaConsumer(
settings.kafka_consumer_topic,
bootstrap_servers=f"{settings.kafka_consumer_host}:{settings.kafka_consumer_port}"
)

app = FastAPI()
consumer = create_consumer()

@app.on_event("startup")
async def startup_event():
"""Start up event for FastAPI application."""

log.info("Starting up...")
await consumer.start()
asyncio.create_task(consume())

async def consume(db: Session = next(get_db())):
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
...
Я использую тестовый клиент fastapi

Код: Выделить всё

import pytest

from fastapi.testclient import TestClient
from app.main import get_task_files
#from app.main import app

client = TestClient(app) # ERR: AIOKafkaConsumer The object should be created within an async function or provide loop directly.

У меня ошибка в строке создания AIOKafkaConsumer:

Объект должен быть создан в асинхронном режиме функцию или предоставить цикл напрямую.

Как правильно протестировать приложение? (Похоже, мне придется издеваться над функциональностью kafka)

Подробнее здесь: https://stackoverflow.com/questions/793 ... r-in-tests
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»