Вызов AIOKafkaConsumer через FastAPI вызывает ошибку «объект должен быть создан внутри асинхронной функции или обеспечитPython

Программы на Python
Ответить
Anonymous
 Вызов AIOKafkaConsumer через FastAPI вызывает ошибку «объект должен быть создан внутри асинхронной функции или обеспечит

Сообщение Anonymous »

У меня есть приложение FastAPI, которое подписывается на тему Kafka в асинхронном режиме. Мне нужно создать модульный тест для моего приложения.
Мой код:

Код: Выделить всё

def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.

Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""
return AIOKafkaConsumer(
settings.kafka_consumer_topic,
bootstrap_servers=f"{settings.kafka_consumer_host}:{settings.kafka_consumer_port}"
)

app = FastAPI()
consumer = create_consumer()

@app.on_event("startup")
async def startup_event():
"""Startup event for FastAPI application."""

log.info("Starting up...")
await consumer.start()
asyncio.create_task(consume())

async def consume(db: Session = next(get_db())):
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
...
Я использую TestClient FastAPI:

Код: Выделить всё

import pytest

from fastapi.testclient import TestClient
from app.main import get_task_files
#from app.main import app

client = TestClient(app) # ERR: AIOKafkaConsumer The object should be created within an async function or provide loop directly.
Однако при установке AIOKafkaConsumer я получаю следующую ошибку:

Объект должен быть создан внутри асинхронной функции или напрямую обеспечить цикл.

Как правильно протестировать приложение? Похоже, мне придется издеваться над функциональностью kafka.

Подробнее здесь: https://stackoverflow.com/questions/793 ... -within-an
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»