Я выполняю сложную асинхронную задачу в FastAPI, и мне нужно вернуть ответ пользователю, продолжая выполнение задачи в фоновом режиме с помощью asyncio. Когда я жду выполнения задачи, она работает отлично, но когда я добавляю ее в цикл задач, она случайным образом прекращает выполнение.
Есть какие-нибудь идеи о лучших методах решения этой проблемы?Сквозная функция с абстракциями ниже
async def from_template_to_conv(
transaction_id: str,
db,
):
print("Getting templates")
transaction = await fetch_transaction_with_workstreams(db, transaction_id)
if len(transaction.workstreams) > 0:
print("Transaction already has workstreams. Aborting.")
return
# If transaction is not found, raise a 404 error
print("Creating workstreams")
response = await process_transaction_based_on_template(db, transaction=transaction)
print("Workstream created successfully.")
questions = await fetch_transaction_questions(db, transaction_id)
question_dics = [
{
"id": q.id,
"content": q.content,
}
for q in questions
]
executor = ThreadPoolExecutor(max_workers=1)
# Define the run_heavy_task function to process all questions
async def run_heavy_task(transaction_id, questions, db, executor):
for question in questions:
transaction = await get_transaction(db, transaction_id)
print(f"Fetched transaction {transaction_id}.")
users = await get_users_by_organization(db, transaction.organization_id)
print(f"Fetched users for transaction {transaction_id}.")
user_ids = [u.id for u in users]
print(user_ids)
print(f"Automating questionnaire for question {question['id']}.")
conversation = await create_automation_conversation(
db, user_ids, "deals", transaction.organization_id, transaction
)
print(f"Created conversation for transaction {transaction_id}.")
response = await process_message_conversation(
question["id"],
conversation,
conversation.id,
question["content"],
user_ids,
db,
)
print(f"Processed message for question {question['id']}.")
final_message = None
async for message in response.body_iterator:
final_message = message
if final_message:
print(f"Final message received for question {question['id']}: {schema.Message.parse_raw(final_message)}")
else:
raise HTTPException(status_code=500, detail="Internal server error")
# Create a single task to process all questions
task = asyncio.create_task(run_heavy_task(transaction_id, question_dics, db, executor))
# await task # Ensure the task is awaited and executed
return response
Подробнее здесь: https://stackoverflow.com/questions/790 ... ound-tasks
FastAPI с asyncio | Запуск длительных фоновых задач ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение