Как продлить срок жизни объекта под asynccontextmanager в фоновой задаче в FastAPI?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как продлить срок жизни объекта под asynccontextmanager в фоновой задаче в FastAPI?

Сообщение Anonymous »

Я использую объект, который требует запуска и процесса удаления (например, загрузки из/сохранения в кэш) в конечных точках FastAPI. Я использовал asynccontextmanager для управления контекстом объекта, но я также хочу обработать объект в более поздней фоновой задаче.
Теперь в моих средах (

Код: Выделить всё

fastapi==0.115.5
) контекст этого объекта заканчивается до ответа на запрос, но обычно это происходит раньше, чем завершение фоновой задачи, поэтому часть фоновой задачи выполняется вне контекста. Например, если в части демонтажа контекстного менеджера есть процесс «сохранения в кэш», последующие изменения в фоновой задаче не будут сохранены, поскольку они запускаются после процесса демонтажа.
Существует минимальный (но все же ~150 строк) рабочий пример по этой сути. Я тоже сюда вставлю.

Код: Выделить всё

from fastapi import FastAPI, Depends, BackgroundTasks, Request
from typing import Annotated, AsyncIterator
from pydantic import BaseModel, Field
from uuid import uuid4
from contextlib import asynccontextmanager
import random
import asyncio

app = FastAPI()

class Chat(BaseModel):
"""
This is a over-simplified Chat History Manager, that can be used in e.g. LangChain-like system
There is an additional `total` field because history are serialized and cached on their own, and we don't want to load all histories when unserialize them from cache/database.
"""

id: str = Field(default_factory=lambda: uuid4().hex)
meta: str = "some meta information"
history: list[str] = []
total: int = 0
uncached: int = 0

def add_message(self, msg: str):
self.history.append(msg)
self.total += 1
self.uncached += 1

async def save(self, cache: dict):
# cache history that are not cached
for imsg in range(-self.uncached, 0):
cache[f"msg:{self.id}:{self.total + imsg}"] = self.history[-self.uncached]
self.uncached = 0
# cache everything except history
cache[f"sess:{self.id}"] = self.model_dump(exclude={"history"})

print(f"saved: {self}")

@classmethod
async def load(cls, sess_id: str, cache: dict, max_read: int = 30):
sess_key = f"sess:{sess_id}"
obj = cls.model_validate(cache.get(sess_key))
for imsg in range(max(0, obj.total - max_read), obj.total):
obj.history.append(cache.get(f"msg:{obj.id}:{imsg}"))

print(f"loaded: {obj}")
return obj

async def chat(self, msg: str, cache: dict):
"""So this"""
self.add_message(msg)

async def get_chat():
resp = []
for i in range(random.randint(3, 5)):
# simulate long network IO
await asyncio.sleep(0.5)
chunk = f"resp{i}:{random.randbytes(2).hex()};"

resp.append(chunk)
yield chunk

self.add_message("".join(resp))

# NOTE to make the message cache work properly, we have to manually save this:
# await self.save(cache)

return get_chat()

# use a simple dict to mimic an actual cache, e.g. Redis
cache = {}

async def get_cache():
return cache

# didn't figure out how to make Chat a dependable
# I have read https://fastapi.tiangolo.com/advanced/advanced-dependencies/#parameterized-dependencies but still no clue
# the problem is: `sess_id` is passed from user, not something we can fix just like this tutorial shows.
# As an alternative, I used this async context manager.
# Theoretically this would automatically save the Chat object after exiting the `async with` block
@asynccontextmanager
async def get_chat_from_cache(sess_id: str, cache: dict):
"""
get object from cache (possibly create one), yield it, then save it back to cache
"""
sess_key = f"sess:{sess_id}"
if sess_key not in cache:
obj = Chat()
obj.id = sess_id
await obj.save(cache)
else:
obj = await Chat.load(sess_id, cache)

yield obj

await obj.save(cache)

async def task(sess_id: str, task_id: int, resp_gen: AsyncIterator[str], cache: dict):
""" """
async for chunk in resp_gen:
# do something with chunk, e.g.  stream it to the client via a websocket
await asyncio.sleep(0.5)
cache[f"chunk:{sess_id}:{task_id}"] = chunk
task_id += 1

@app.get("/{sess_id}/{task_id}/{prompt}")
async def get_chat(
req: Request,
sess_id: str,
task_id: int,
prompt: str,
background_task: BackgroundTasks,
cache: Annotated[dict, Depends(get_cache)],
):
print(f"req incoming: {req.url}")
async with get_chat_from_cache(sess_id=sess_id, cache=cache) as chat:
resp_gen = await chat.chat(f"prompt:{prompt}", cache=cache)

background_task.add_task(
task, sess_id=sess_id, task_id=task_id, resp_gen=resp_gen, cache=cache
)

return "success"

@app.get("/{sess_id}")
async def get_sess(
req: Request, sess_id: str, cache: Annotated[dict, Depends(get_cache)]
):
print(f"req incoming: {req.url}")
return (await Chat.load(sess_id=sess_id, cache=cache)).model_dump()
Я нашел близкое (но не идентичное) обсуждение, в котором говорится о сроке службы надежных устройств. Кажется, что срок службы надежных можно перенести/расширить на фоновые задачи, хотя они считают, что это грубое поведение. У меня была мысль сделать get_chat_from_cache надежным, основанным на доходности, но я не понял, как это сделать правильно. Но в любом случае этот подход, похоже, не рекомендуется разработчиками FastAPI, поскольку фактическое время удаления зависимых компонентов недокументировано и может измениться в будущих версиях.
Я знаю, что, вероятно, я мог бы повторить удаление вручную в фоновой задаче, но это похоже на хак. Я спрашиваю, есть ли более элегантные способы сделать это. Возможно, существуют более эффективные шаблоны проектирования, которые помогут полностью избежать этой проблемы, дайте мне знать.

Подробнее здесь: https://stackoverflow.com/questions/792 ... -a-backgro
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как продлить срок жизни объекта под asynccontextmanager в фоновой задаче в FastAPI?
    Anonymous » » в форуме Python
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Как продлить срок жизни объекта под asynccontextmanager в фоновой задаче в FastAPI?
    Anonymous » » в форуме Python
    0 Ответы
    12 Просмотры
    Последнее сообщение Anonymous
  • Как продлить срок службы объекта под asynccontextmanager в фоновой задаче в fastapi?
    Anonymous » » в форуме Python
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • C# Как продлить время жизни транзакции mongo db
    Anonymous » » в форуме C#
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous
  • Продлить срок выполнения в Spring Batch
    Anonymous » » в форуме JAVA
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»