Я использую приложение Fastapi с несколькими работниками. Приложение имеет одну конечную точку, которая занимает небольшое количество текста и использует для его встраивания Sentencetransformer. Когда я запускаю это приложение на одного работника, все работает отлично. Он может обрабатывать около 25 запросов одновременно без какого -либо единственного запроса, занимающего больше времени, чем одна секунда. прохладный. Я хочу увеличить этот номер, поэтому я запускаю сервер с большим количеством работников. Тем не менее, выполнение этого иногда бросает запросы, потому что во время тестирования я получаю тайм-ауты (мой тайм-аут-10 с) (все запросы на время не чтения были выполнены за секунду). Я думал, что это было потому, что Fastapi занимал больше времени, чем обычно, по какой -то причине выполнять задачи, однако, во время отладки, кажется, что конечная точка даже не получила удара (или, по крайней мере, печатным заявлением на Начало конечной точки никогда не напечатано). Даже после того, как секунда прошла, и остальные 25+ запросов завершились за секунду, остальные 2 или около того запросов, которые по -прежнему никогда не проходили. < /p>
для контекста у меня 4 CORE I7 ЦП. p> ниже воспроизводимый main.py . Вы можете запустить его с помощью FastApi Run -Workers main.py .
Обратите внимание, что это происходит, даже если редактировать код, чтобы моя конечная точка синхронизировалась вместо Async (избавившись от adder_thread и напрямую вызов queue.put () in def enced ()) < /p>
import asyncio
import os
import time
from concurrent.futures import Future
from contextlib import asynccontextmanager
from queue import Queue
from threading import Thread
from fastapi import FastAPI, Query
from numpy import ndarray
from pydantic import BaseModel, ConfigDict
from sentence_transformers import SentenceTransformer
class TextEmbedQueueItem(BaseModel):
"""queue item for dense embedder"""
model_config = ConfigDict(arbitrary_types_allowed=True)
text: str
future: Future
def thread_queue_conumser(
model: SentenceTransformer,
queue: Queue[TextEmbedQueueItem | None],
):
while True:
item = queue.get()
if item is None:
break
if item.future.cancelled():
continue
result = model.encode([item.text])
if not item.future.cancelled():
try:
item.future.set_result(result)
except:
pass
class TextEmbedder:
"""
Dense Model Controler
"""
def __init__(self):
self.local_model = SentenceTransformer(
"nomic-ai/nomic-embed-text-v1",
trust_remote_code=True,
)
self.task_runner_name = "TextEmbedder"
self.queue = Queue()
self.thread = Thread(
target=thread_queue_conumser,
args=(self.local_model, self.queue),
daemon=True,
)
self._loop = asyncio.new_event_loop()
self.thread.start()
self.adder_thread = Thread(
target=self.run_add_to_queue,
daemon=True,
)
self.adder_thread.start()
def run_add_to_queue(self):
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
self._loop.close()
def stop(self):
print("stopping!")
while not self.queue.empty():
try:
self.queue.get(timeout=1)
except:
break
print("adding none!")
self.queue.put(None)
self.thread.join()
print("stop done!")
def _add_embed(self, text: str, future: Future[ndarray]):
self.queue.put(TextEmbedQueueItem(text=text, future=future))
def embed(self, text: str):
"""embed text. if returns None, then model is not ready for x seconds!"""
future: Future[ndarray] = Future()
self._loop.call_soon_threadsafe(self._add_embed, text, future)
return future
dense_embedder = TextEmbedder()
def teardown():
dense_embedder.stop()
@asynccontextmanager
async def lifespan(fast_app: FastAPI):
"""startup and shutdown tasks for the app"""
# start up
# =======
yield
# everything after here runs after shutdown
# end all threads and loops
teardown()
app = FastAPI(lifespan=lifespan)
@app.post("/search")
async def search(
text: str = Query(...),
elastic_index: str = Query(...),
page: int = Query(1, ge=1),
force_embed: bool = Query(False),
num: int = Query(...),
):
print("here", str(num))
st = time.time()
text_embedding_task: asyncio.Future[ndarray] | None = None
text_embedding_task = asyncio.wrap_future(dense_embedder.embed(text))
print("queued up task", str(num))
text_embeding = await text_embedding_task
print("returning", str(num))
return {
"time": round(time.time() - st, 2),
}
< /code>
и ниже приведен файл, который я использую для его проверки (это может потребоваться пара прогонов, чтобы получить немного времени Out): < /p>
import asyncio
from typing import List
from httpx import AsyncClient, Limits, Response
async def main():
limits = Limits(max_keepalive_connections=100, max_connections=100)
async with AsyncClient(
base_url="http://localhost:8000", limits=limits
) as client:
tasks: List = []
number_of_total_requests = 30
number_of_requests_per_second = 30
for i in range(number_of_total_requests):
tasks.append(
asyncio.create_task(
client.post(
"/search?text=testing%20a%20search&elastic_index=test&page=1&num="
+ str(i),
timeout=10,
)
)
)
if i != 0 and i % number_of_requests_per_second == 0:
await asyncio.sleep(1)
responses = await asyncio.gather(*tasks, return_exceptions=True)
count = -1
success_times = 0
all_times = 0
num_greater_than_second = 0
num_less_than_second = 0
num_requets_timeouts = 0
for response in responses:
count += 1
if isinstance(response, Exception):
num_requets_timeouts += 1
continue
assert (
response.status_code == 200
) # Adjust based on expected status
response_json = response.json()
did_in_under_a_second = response_json["time"] < 1
if not did_in_under_a_second:
num_greater_than_second += 1
else:
num_less_than_second += 1
success_times += response_json["time"]
all_times += response_json["time"]
print(count, response_json["time"])
print(f"Number over a second: {num_greater_than_second}")
print(f"Number of request timeouts: {num_requets_timeouts}")
if num_less_than_second != 0:
print(
f"Average time of successful requests (meaining requests that took less than a second): {success_times / (num_less_than_second)}"
)
if num_requets_timeouts != number_of_total_requests:
print(
f"Average time of all requests: {all_times / (number_of_total_requests - num_requets_timeouts)}"
)
if __name__ == "__main__":
asyncio.run(main())
Редактировать: я знаю, что люди будут думать, что это проблема асинхронна против синхронизации, так что это редактирование Main.py , которая использует синхронизацию. Та же проблема возникает.
import asyncio
import os
import time
from concurrent.futures import Future
from contextlib import asynccontextmanager
from queue import Queue
from threading import Thread
from fastapi import FastAPI, Query
from numpy import ndarray
from pydantic import BaseModel, ConfigDict
from sentence_transformers import SentenceTransformer
class TextEmbedQueueItem(BaseModel):
"""queue item for dense embedder"""
model_config = ConfigDict(arbitrary_types_allowed=True)
text: str
future: Future
def thread_queue_conumser(
model: SentenceTransformer,
queue: Queue[TextEmbedQueueItem | None],
):
while True:
item = queue.get()
if item is None:
break
if item.future.cancelled():
continue
result = model.encode([item.text])
if not item.future.cancelled():
try:
item.future.set_result(result)
except:
pass
class TextEmbedder:
"""
Dense Model Controler
"""
def __init__(self):
self.local_model = SentenceTransformer(
"nomic-ai/nomic-embed-text-v1",
trust_remote_code=True,
)
self.task_runner_name = "TextEmbedder"
self.queue = Queue()
self.thread = Thread(
target=thread_queue_conumser,
args=(self.local_model, self.queue),
daemon=True,
)
self._loop = asyncio.new_event_loop()
self.thread.start()
self.adder_thread = Thread(
target=self.run_add_to_queue,
daemon=True,
)
self.adder_thread.start()
def run_add_to_queue(self):
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
self._loop.close()
def stop(self):
print("stopping!")
while not self.queue.empty():
try:
self.queue.get(timeout=1)
except:
break
print("adding none!")
self.queue.put(None)
self.thread.join()
print("stop done!")
def embed(self, text: str):
"""embed text. if returns None, then model is not ready for x seconds!"""
future: Future[ndarray] = Future()
self.queue.put(TextEmbedQueueItem(text=text, future=future))
return future
dense_embedder = TextEmbedder()
def teardown():
dense_embedder.stop()
@asynccontextmanager
async def lifespan(fast_app: FastAPI):
"""startup and shutdown tasks for the app"""
# start up
# =======
yield
# everything after here runs after shutdown
# end all threads and loops
teardown()
app = FastAPI(lifespan=lifespan)
@app.post("/search")
def search(
text: str = Query(...),
elastic_index: str = Query(...),
page: int = Query(1, ge=1),
force_embed: bool = Query(False),
num: int = Query(...),
):
print("here", str(num))
st = time.time()
print("queued up task", str(num))
text_embeding = dense_embedder.embed(text).result()
print("returning", str(num))
return {
"time": round(time.time() - st, 2),
}
Подробнее здесь: https://stackoverflow.com/questions/794 ... rkers-only
FastaPi сбрасывает (или не получает запросы?) Запросы только при использовании нескольких работников ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
PHP CLI получает ввод от пользователя, а затем сбрасывает его в переменную?
Anonymous » » в форуме Php - 0 Ответы
- 21 Просмотры
-
Последнее сообщение Anonymous
-
-
-
PHP CLI получает ввод от пользователя, а затем сбрасывает в возможную переменную?
Anonymous » » в форуме Php - 0 Ответы
- 9 Просмотры
-
Последнее сообщение Anonymous
-