Я пытаюсь реализовать простой макет API, который реализует веб-перехватчики с использованием flask-smorest.
По сути, я пытаюсь имитировать API, который обрабатывает видео и отправляет ответ на сообщение успешно/ошибочно после завершения обработки видео по введенному URL-адресу веб-перехватчика.
Для этого я использую asyncio. Однако я не могу вызвать асинхронную функцию внутри маршрута публикации.
мой файл Routes.py выглядит так
from flask import abort
from flask.views import MethodView
from flask_smorest import Blueprint
from marshmallow import ValidationError
from server.models import TranslationJob
from server.schema import CreateStatusSchema, StatusResponseSchema
import uuid
from server.services.webhook import WebhookService
from server.models import StatusEnum
# In-memory storage
jobs_store = {}
status_blueprint = Blueprint(
"status",
"status",
url_prefix="/translation",
description="Status of translation jobs",
)
loop = asyncio.get_event_loop()
@status_blueprint.route("/status", methods=["POST", "GET"])
class TranslationCollection(MethodView):
@status_blueprint.arguments(CreateStatusSchema, location="json")
def post(self, job_data):
try:
job = TranslationJob(
duration=job_data["duration"],
webhook_url=job_data["webhook_url"]
)
jobs_store[job.id] = job
# Start webhook monitoring if URL provided
asyncio.create_task(start_monitor_status(job))
return {"message": "Job created successfully", "job_id": str(job.id)}
except ValidationError as e:
return {"message": "Validation error", "errors": e.messages}, 400
except Exception as e:
return {"message": str(e)}, 500
def get(self):
jobs = [job.to_dict() for job in jobs_store.values()]
return {"jobs": jobs}
@status_blueprint.route("/status/", methods=["GET"])
class TranslationItem(MethodView):
@status_blueprint.response(200, StatusResponseSchema)
def get(self, job_id):
try:
job_id = uuid.UUID(job_id)
except ValueError:
abort(400, description="Invalid job ID format")
if job_id not in jobs_store:
abort(404, description=f"Job with ID {job_id} not found")
return jobs_store[job_id].to_dict()
async def start_monitor_status(job: TranslationJob):
await _monitor_job_status(job) # Directly await the monitoring function
async def _monitor_job_status(job: TranslationJob):
previous_status = job.get_status()
while True:
current_status = job.get_status()
if current_status != previous_status:
await WebhookService.send_webhook(job)
if current_status in [StatusEnum.COMPLETED, StatusEnum.ERROR]:
break
previous_status = current_status
await asyncio.sleep(1)
Здесь, когда пользователь публикует видео, он генерирует задание TranslationJob, продолжительность которого просто указывает, как долго обрабатывается видео, и URL-адрес веб-перехватчика для публикации после завершения видео. .
Я создаю задание, добавляю его в хранилище памяти и хочу вызвать функцию, которая постоянно вызывает статус задания, чтобы узнать, завершено ли оно, однако я этого хочу быть асинхронным. Я не хочу ждать, пока видео будет готово, чтобы сообщить пользователю, что задание было успешно создано.
Если я оставлю функцию как есть, я получуRuntimeWarning: coroutine 'start_monitor_status' was never awaited
return {"message": str(e)}, 500
однако, если я сделаю функцию post асинхронной, я получу
The return type must be a string, dict, list, tuple with headers or status, Response instance, or WSGI callable, but it was a coroutine.
как мне изменить это, чтобы оно работало?
мой файл init.py выглядит так
from flask import Flask
from flask_smorest import Api
from server.config import APIConfig
from server.routes import status_blueprint
def create_app() -> Flask:
app = Flask(__name__)
# app.config['ASYNC_MODE'] = True
app.config.from_object(APIConfig)
app.debug = True
api = Api(app)
api.register_blueprint(status_blueprint)
return app
и сценарий входа в программу выглядит следующим образом (также пытаемся использовать async, чтобы он работал)
import asyncio
from server import create_app
app = create_app()
async def main():
await app
if __name__ == "__main__":
asyncio.run(main())
Подробнее здесь: https://stackoverflow.com/questions/791 ... sing-async
Невозможно реализовать веб-перехватчик с помощью flask-smorest с использованием async ⇐ Python
Программы на Python
1730856116
Anonymous
Я пытаюсь реализовать простой макет API, который реализует веб-перехватчики с использованием flask-smorest.
По сути, я пытаюсь имитировать API, который обрабатывает видео и отправляет ответ на сообщение успешно/ошибочно после завершения обработки видео по введенному URL-адресу веб-перехватчика.
Для этого я использую asyncio. Однако я не могу вызвать асинхронную функцию внутри маршрута публикации.
мой файл Routes.py выглядит так
from flask import abort
from flask.views import MethodView
from flask_smorest import Blueprint
from marshmallow import ValidationError
from server.models import TranslationJob
from server.schema import CreateStatusSchema, StatusResponseSchema
import uuid
from server.services.webhook import WebhookService
from server.models import StatusEnum
# In-memory storage
jobs_store = {}
status_blueprint = Blueprint(
"status",
"status",
url_prefix="/translation",
description="Status of translation jobs",
)
loop = asyncio.get_event_loop()
@status_blueprint.route("/status", methods=["POST", "GET"])
class TranslationCollection(MethodView):
@status_blueprint.arguments(CreateStatusSchema, location="json")
def post(self, job_data):
try:
job = TranslationJob(
duration=job_data["duration"],
webhook_url=job_data["webhook_url"]
)
jobs_store[job.id] = job
# Start webhook monitoring if URL provided
asyncio.create_task(start_monitor_status(job))
return {"message": "Job created successfully", "job_id": str(job.id)}
except ValidationError as e:
return {"message": "Validation error", "errors": e.messages}, 400
except Exception as e:
return {"message": str(e)}, 500
def get(self):
jobs = [job.to_dict() for job in jobs_store.values()]
return {"jobs": jobs}
@status_blueprint.route("/status/", methods=["GET"])
class TranslationItem(MethodView):
@status_blueprint.response(200, StatusResponseSchema)
def get(self, job_id):
try:
job_id = uuid.UUID(job_id)
except ValueError:
abort(400, description="Invalid job ID format")
if job_id not in jobs_store:
abort(404, description=f"Job with ID {job_id} not found")
return jobs_store[job_id].to_dict()
async def start_monitor_status(job: TranslationJob):
await _monitor_job_status(job) # Directly await the monitoring function
async def _monitor_job_status(job: TranslationJob):
previous_status = job.get_status()
while True:
current_status = job.get_status()
if current_status != previous_status:
await WebhookService.send_webhook(job)
if current_status in [StatusEnum.COMPLETED, StatusEnum.ERROR]:
break
previous_status = current_status
await asyncio.sleep(1)
Здесь, когда пользователь публикует видео, он генерирует задание TranslationJob, продолжительность которого просто указывает, как долго обрабатывается видео, и URL-адрес веб-перехватчика для публикации после завершения видео. .
Я создаю задание, добавляю его в хранилище памяти и хочу вызвать функцию, которая постоянно вызывает статус задания, чтобы узнать, завершено ли оно, однако я этого хочу быть асинхронным. Я не хочу ждать, пока видео будет готово, чтобы сообщить пользователю, что задание было успешно создано.
Если я оставлю функцию как есть, я получуRuntimeWarning: coroutine 'start_monitor_status' was never awaited
return {"message": str(e)}, 500
однако, если я сделаю функцию post асинхронной, я получу
The return type must be a string, dict, list, tuple with headers or status, Response instance, or WSGI callable, but it was a coroutine.
как мне изменить это, чтобы оно работало?
мой файл [b]init[/b].py выглядит так
from flask import Flask
from flask_smorest import Api
from server.config import APIConfig
from server.routes import status_blueprint
def create_app() -> Flask:
app = Flask(__name__)
# app.config['ASYNC_MODE'] = True
app.config.from_object(APIConfig)
app.debug = True
api = Api(app)
api.register_blueprint(status_blueprint)
return app
и сценарий входа в программу выглядит следующим образом (также пытаемся использовать async, чтобы он работал)
import asyncio
from server import create_app
app = create_app()
async def main():
await app
if __name__ == "__main__":
asyncio.run(main())
Подробнее здесь: [url]https://stackoverflow.com/questions/79161028/unable-to-implement-webhook-with-flask-smorest-using-async[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия