FastAPI с asyncio | Запуск длительных фоновых задачPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 FastAPI с asyncio | Запуск длительных фоновых задач

Сообщение Anonymous »

Я выполняю сложную асинхронную задачу в FastAPI, и мне нужно вернуть ответ пользователю, продолжая выполнение задачи в фоновом режиме с помощью asyncio. Когда я жду выполнения задачи, она работает отлично, но когда я добавляю ее в цикл задач, она случайным образом прекращает выполнение.
Есть какие-нибудь идеи о лучших методах решения этой проблемы?Сквозная функция с абстракциями ниже
async def from_template_to_conv(
transaction_id: str,
db,
):
print("Getting templates")
transaction = await fetch_transaction_with_workstreams(db, transaction_id)
if len(transaction.workstreams) > 0:
print("Transaction already has workstreams. Aborting.")
return

# If transaction is not found, raise a 404 error
print("Creating workstreams")
response = await process_transaction_based_on_template(db, transaction=transaction)
print("Workstream created successfully.")

questions = await fetch_transaction_questions(db, transaction_id)
question_dics = [
{
"id": q.id,
"content": q.content,
}
for q in questions
]
executor = ThreadPoolExecutor(max_workers=1)

# Define the run_heavy_task function to process all questions
async def run_heavy_task(transaction_id, questions, db, executor):

for question in questions:
transaction = await get_transaction(db, transaction_id)
print(f"Fetched transaction {transaction_id}.")

users = await get_users_by_organization(db, transaction.organization_id)
print(f"Fetched users for transaction {transaction_id}.")
user_ids = [u.id for u in users]
print(user_ids)
print(f"Automating questionnaire for question {question['id']}.")
conversation = await create_automation_conversation(
db, user_ids, "deals", transaction.organization_id, transaction
)
print(f"Created conversation for transaction {transaction_id}.")

response = await process_message_conversation(
question["id"],
conversation,
conversation.id,
question["content"],
user_ids,
db,
)
print(f"Processed message for question {question['id']}.")

final_message = None
async for message in response.body_iterator:
final_message = message

if final_message:
print(f"Final message received for question {question['id']}: {schema.Message.parse_raw(final_message)}")
else:
raise HTTPException(status_code=500, detail="Internal server error")

# Create a single task to process all questions
task = asyncio.create_task(run_heavy_task(transaction_id, question_dics, db, executor))

# await task # Ensure the task is awaited and executed

return response


Подробнее здесь: https://stackoverflow.com/questions/790 ... ound-tasks
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»