Я использую объект, который требует запуска и процесса удаления (например, загрузки из/сохранения в кэш) в конечных точках FastAPI. Я использовал asynccontextmanager для управления контекстом объекта, но я также хочу обработать объект в более поздней фоновой задаче.
Теперь в моих средах (
) контекст этого объекта заканчивается до ответа на запрос, но обычно это происходит раньше, чем завершение фоновой задачи, поэтому часть фоновой задачи выполняется вне контекста. Например, если в части демонтажа контекстного менеджера есть процесс «сохранения в кэш», последующие изменения в фоновой задаче не будут сохранены, поскольку они запускаются после процесса демонтажа.
Существует минимальный (но все же ~150 строк) рабочий пример по этой сути. Я тоже сюда вставлю.
from fastapi import FastAPI, Depends, BackgroundTasks, Request
from typing import Annotated, AsyncIterator
from pydantic import BaseModel, Field
from uuid import uuid4
from contextlib import asynccontextmanager
import random
import asyncio
app = FastAPI()
class Chat(BaseModel):
"""
This is a over-simplified Chat History Manager, that can be used in e.g. LangChain-like system
There is an additional `total` field because history are serialized and cached on their own, and we don't want to load all histories when unserialize them from cache/database.
"""
id: str = Field(default_factory=lambda: uuid4().hex)
meta: str = "some meta information"
history: list[str] = []
total: int = 0
uncached: int = 0
def add_message(self, msg: str):
self.history.append(msg)
self.total += 1
self.uncached += 1
async def save(self, cache: dict):
# cache history that are not cached
for imsg in range(-self.uncached, 0):
cache[f"msg:{self.id}:{self.total + imsg}"] = self.history[-self.uncached]
self.uncached = 0
# cache everything except history
cache[f"sess:{self.id}"] = self.model_dump(exclude={"history"})
print(f"saved: {self}")
@classmethod
async def load(cls, sess_id: str, cache: dict, max_read: int = 30):
sess_key = f"sess:{sess_id}"
obj = cls.model_validate(cache.get(sess_key))
for imsg in range(max(0, obj.total - max_read), obj.total):
obj.history.append(cache.get(f"msg:{obj.id}:{imsg}"))
print(f"loaded: {obj}")
return obj
async def chat(self, msg: str, cache: dict):
"""So this"""
self.add_message(msg)
async def get_chat():
resp = []
for i in range(random.randint(3, 5)):
# simulate long network IO
await asyncio.sleep(0.5)
chunk = f"resp{i}:{random.randbytes(2).hex()};"
resp.append(chunk)
yield chunk
self.add_message("".join(resp))
# NOTE to make the message cache work properly, we have to manually save this:
# await self.save(cache)
return get_chat()
# use a simple dict to mimic an actual cache, e.g. Redis
cache = {}
async def get_cache():
return cache
# didn't figure out how to make Chat a dependable
# I have read https://fastapi.tiangolo.com/advanced/advanced-dependencies/#parameterized-dependencies but still no clue
# the problem is: `sess_id` is passed from user, not something we can fix just like this tutorial shows.
# As an alternative, I used this async context manager.
# Theoretically this would automatically save the Chat object after exiting the `async with` block
@asynccontextmanager
async def get_chat_from_cache(sess_id: str, cache: dict):
"""
get object from cache (possibly create one), yield it, then save it back to cache
"""
sess_key = f"sess:{sess_id}"
if sess_key not in cache:
obj = Chat()
obj.id = sess_id
await obj.save(cache)
else:
obj = await Chat.load(sess_id, cache)
yield obj
await obj.save(cache)
async def task(sess_id: str, task_id: int, resp_gen: AsyncIterator[str], cache: dict):
""" """
async for chunk in resp_gen:
# do something with chunk, e.g. stream it to the client via a websocket
await asyncio.sleep(0.5)
cache[f"chunk:{sess_id}:{task_id}"] = chunk
task_id += 1
@app.get("/{sess_id}/{task_id}/{prompt}")
async def get_chat(
req: Request,
sess_id: str,
task_id: int,
prompt: str,
background_task: BackgroundTasks,
cache: Annotated[dict, Depends(get_cache)],
):
print(f"req incoming: {req.url}")
async with get_chat_from_cache(sess_id=sess_id, cache=cache) as chat:
resp_gen = await chat.chat(f"prompt:{prompt}", cache=cache)
background_task.add_task(
task, sess_id=sess_id, task_id=task_id, resp_gen=resp_gen, cache=cache
)
return "success"
@app.get("/{sess_id}")
async def get_sess(
req: Request, sess_id: str, cache: Annotated[dict, Depends(get_cache)]
):
print(f"req incoming: {req.url}")
return (await Chat.load(sess_id=sess_id, cache=cache)).model_dump()
Я нашел близкое (но не идентичное) обсуждение, в котором говорится о сроке службы надежных устройств. Кажется, что срок службы надежных можно перенести/расширить на фоновые задачи, хотя они считают, что это грубое поведение. У меня была мысль сделать get_chat_from_cache надежным, основанным на доходности, но я не понял, как это сделать правильно. Но в любом случае этот подход, похоже, не рекомендуется разработчиками FastAPI, поскольку фактическое время удаления зависимых компонентов недокументировано и может измениться в будущих версиях.
Я знаю, что, вероятно, я мог бы повторить удаление вручную в фоновой задаче, но это похоже на хак. Я спрашиваю, есть ли более элегантные способы сделать это. Возможно, существуют более эффективные шаблоны проектирования, которые помогут полностью избежать этой проблемы, дайте мне знать.
Я использую объект, который требует запуска и процесса удаления (например, загрузки из/сохранения в кэш) в конечных точках FastAPI. Я использовал asynccontextmanager для управления контекстом объекта, но я также хочу обработать объект в более поздней фоновой задаче. Теперь в моих средах ([code]fastapi==0.115.5[/code]) контекст этого объекта заканчивается до ответа на запрос, но обычно это происходит раньше, чем завершение фоновой задачи, поэтому часть фоновой задачи выполняется вне контекста. Например, если в части демонтажа контекстного менеджера есть процесс «сохранения в кэш», последующие изменения в фоновой задаче не будут сохранены, поскольку они запускаются после процесса демонтажа. Существует минимальный (но все же ~150 строк) рабочий пример по этой сути. Я тоже сюда вставлю. [code]from fastapi import FastAPI, Depends, BackgroundTasks, Request from typing import Annotated, AsyncIterator from pydantic import BaseModel, Field from uuid import uuid4 from contextlib import asynccontextmanager import random import asyncio
app = FastAPI()
class Chat(BaseModel): """ This is a over-simplified Chat History Manager, that can be used in e.g. LangChain-like system There is an additional `total` field because history are serialized and cached on their own, and we don't want to load all histories when unserialize them from cache/database. """
id: str = Field(default_factory=lambda: uuid4().hex) meta: str = "some meta information" history: list[str] = [] total: int = 0 uncached: int = 0
async def save(self, cache: dict): # cache history that are not cached for imsg in range(-self.uncached, 0): cache[f"msg:{self.id}:{self.total + imsg}"] = self.history[-self.uncached] self.uncached = 0 # cache everything except history cache[f"sess:{self.id}"] = self.model_dump(exclude={"history"})
print(f"saved: {self}")
@classmethod async def load(cls, sess_id: str, cache: dict, max_read: int = 30): sess_key = f"sess:{sess_id}" obj = cls.model_validate(cache.get(sess_key)) for imsg in range(max(0, obj.total - max_read), obj.total): obj.history.append(cache.get(f"msg:{obj.id}:{imsg}"))
async def get_chat(): resp = [] for i in range(random.randint(3, 5)): # simulate long network IO await asyncio.sleep(0.5) chunk = f"resp{i}:{random.randbytes(2).hex()};"
resp.append(chunk) yield chunk
self.add_message("".join(resp))
# NOTE to make the message cache work properly, we have to manually save this: # await self.save(cache)
return get_chat()
# use a simple dict to mimic an actual cache, e.g. Redis cache = {}
async def get_cache(): return cache
# didn't figure out how to make Chat a dependable # I have read https://fastapi.tiangolo.com/advanced/advanced-dependencies/#parameterized-dependencies but still no clue # the problem is: `sess_id` is passed from user, not something we can fix just like this tutorial shows. # As an alternative, I used this async context manager. # Theoretically this would automatically save the Chat object after exiting the `async with` block @asynccontextmanager async def get_chat_from_cache(sess_id: str, cache: dict): """ get object from cache (possibly create one), yield it, then save it back to cache """ sess_key = f"sess:{sess_id}" if sess_key not in cache: obj = Chat() obj.id = sess_id await obj.save(cache) else: obj = await Chat.load(sess_id, cache)
yield obj
await obj.save(cache)
async def task(sess_id: str, task_id: int, resp_gen: AsyncIterator[str], cache: dict): """ """ async for chunk in resp_gen: # do something with chunk, e.g. stream it to the client via a websocket await asyncio.sleep(0.5) cache[f"chunk:{sess_id}:{task_id}"] = chunk task_id += 1
@app.get("/{sess_id}") async def get_sess( req: Request, sess_id: str, cache: Annotated[dict, Depends(get_cache)] ): print(f"req incoming: {req.url}") return (await Chat.load(sess_id=sess_id, cache=cache)).model_dump() [/code] Я нашел близкое (но не идентичное) обсуждение, в котором говорится о сроке службы надежных устройств. Кажется, что срок службы надежных можно перенести/расширить на фоновые задачи, хотя они считают, что это грубое поведение. У меня была мысль сделать get_chat_from_cache надежным, основанным на доходности, но я не понял, как это сделать правильно. Но в любом случае этот подход, похоже, не рекомендуется разработчиками FastAPI, поскольку фактическое время удаления зависимых компонентов недокументировано и может измениться в будущих версиях. Я знаю, что, вероятно, я мог бы повторить удаление вручную в фоновой задаче, но это похоже на хак. Я спрашиваю, есть ли более элегантные способы сделать это. Возможно, существуют более эффективные шаблоны проектирования, которые помогут полностью избежать этой проблемы, дайте мне знать.
Я использую объект, который требует запуска и процесса удаления (например, загрузки из/сохранения в кэш) в конечных точках FastAPI. Я использовал asynccontextmanager для управления контекстом объекта, но я также хочу обработать объект в более...
Я использую объект, который требует запуска и процесса удаления (например, загрузки из/сохранения в кэш) в конечных точках FastAPI. Я использовал asynccontextmanager для управления контекстом объекта, но я также хочу обработать объект в более...
Я использую объект, который требует запуска и процесса удаления (например, загрузки из/сохранения в кэш) в конечных точках fastapi. Я использовал asynccontextmanager для управления контекстом объекта, но я также хочу обработать объект в более...
Как изменить транзакциюLifetimeLimitSeconds в моем наборе облачных реплик.
Сначала я создавал новый сеанс:
var client = new MongoClient(new MongoUrl(_connectionString));
var session= client.StartSession();
У меня большая проблема: Spring Batch + TaskExecutor не выполняет поток более 10 минут, и, конечно, мне нужно, чтобы в конечном итоге пакетное задание при необходимости заняло 2 часа.
Да, просто сделай:
java -jar receip-processor-0.0.1-SNAPSHOT.jar...