Невозможно реализовать веб-перехватчик с помощью flask-smorest с использованием asyncPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Невозможно реализовать веб-перехватчик с помощью flask-smorest с использованием async

Сообщение Anonymous »

Я пытаюсь реализовать простой макет API, который реализует веб-перехватчики с использованием flask-smorest.
По сути, я пытаюсь имитировать API, который обрабатывает видео и отправляет ответ на сообщение успешно/ошибочно после завершения обработки видео по введенному URL-адресу веб-перехватчика.
Для этого я использую asyncio. Однако я не могу вызвать асинхронную функцию внутри маршрута публикации.
мой файл Routes.py выглядит так
from flask import abort
from flask.views import MethodView
from flask_smorest import Blueprint
from marshmallow import ValidationError

from server.models import TranslationJob
from server.schema import CreateStatusSchema, StatusResponseSchema
import uuid
from server.services.webhook import WebhookService
from server.models import StatusEnum

# In-memory storage
jobs_store = {}

status_blueprint = Blueprint(
"status",
"status",
url_prefix="/translation",
description="Status of translation jobs",
)

loop = asyncio.get_event_loop()

@status_blueprint.route("/status", methods=["POST", "GET"])
class TranslationCollection(MethodView):
@status_blueprint.arguments(CreateStatusSchema, location="json")
def post(self, job_data):
try:
job = TranslationJob(
duration=job_data["duration"],
webhook_url=job_data["webhook_url"]
)
jobs_store[job.id] = job
# Start webhook monitoring if URL provided
asyncio.create_task(start_monitor_status(job))
return {"message": "Job created successfully", "job_id": str(job.id)}
except ValidationError as e:
return {"message": "Validation error", "errors": e.messages}, 400
except Exception as e:
return {"message": str(e)}, 500

def get(self):
jobs = [job.to_dict() for job in jobs_store.values()]
return {"jobs": jobs}

@status_blueprint.route("/status/", methods=["GET"])
class TranslationItem(MethodView):
@status_blueprint.response(200, StatusResponseSchema)
def get(self, job_id):
try:
job_id = uuid.UUID(job_id)
except ValueError:
abort(400, description="Invalid job ID format")

if job_id not in jobs_store:
abort(404, description=f"Job with ID {job_id} not found")

return jobs_store[job_id].to_dict()

async def start_monitor_status(job: TranslationJob):
await _monitor_job_status(job) # Directly await the monitoring function

async def _monitor_job_status(job: TranslationJob):
previous_status = job.get_status()

while True:
current_status = job.get_status()

if current_status != previous_status:
await WebhookService.send_webhook(job)

if current_status in [StatusEnum.COMPLETED, StatusEnum.ERROR]:
break

previous_status = current_status
await asyncio.sleep(1)


Здесь, когда пользователь публикует видео, он генерирует задание TranslationJob, продолжительность которого просто указывает, как долго обрабатывается видео, и URL-адрес веб-перехватчика для публикации после завершения видео. .
Я создаю задание, добавляю его в хранилище памяти и хочу вызвать функцию, которая постоянно вызывает статус задания, чтобы узнать, завершено ли оно, однако я этого хочу быть асинхронным. Я не хочу ждать, пока видео будет готово, чтобы сообщить пользователю, что задание было успешно создано.
Если я оставлю функцию как есть, я получуRuntimeWarning: coroutine 'start_monitor_status' was never awaited
return {"message": str(e)}, 500

однако, если я сделаю функцию post асинхронной, я получу
The return type must be a string, dict, list, tuple with headers or status, Response instance, or WSGI callable, but it was a coroutine.

как мне изменить это, чтобы оно работало?
мой файл init.py выглядит так
from flask import Flask
from flask_smorest import Api

from server.config import APIConfig
from server.routes import status_blueprint

def create_app() -> Flask:
app = Flask(__name__)
# app.config['ASYNC_MODE'] = True
app.config.from_object(APIConfig)
app.debug = True

api = Api(app)
api.register_blueprint(status_blueprint)

return app


и сценарий входа в программу выглядит следующим образом (также пытаемся использовать async, чтобы он работал)
import asyncio
from server import create_app

app = create_app()

async def main():
await app

if __name__ == "__main__":
asyncio.run(main())


Подробнее здесь: https://stackoverflow.com/questions/791 ... sing-async
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»