Почему мое использование памяти приложения FastAPI значительно растет при потоковой передаче нескольких файлов с помощьюPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Почему мое использование памяти приложения FastAPI значительно растет при потоковой передаче нескольких файлов с помощью

Сообщение Anonymous »

Я транслирую несколько файлов в виде архива Zip в конечной точке FastAPI с использованием async_stream_zip от Stream-zip (https://pypi.org/project/stream-zip/). Хотя это должно быть эффективным для памяти, я наблюдаю за значительным ростом памяти при потоковой передаче многих файлов. Данные обычно читаются через Pandas и преобразуются с использованием таких пакетов, как Orjson . Я использую процессоры для изоляции обработки в асинхронных конечных точках - чтобы избежать блокировки потока и лучшего изолята и очистки памяти. Я реализовал использование потокового Zip (версия 0.0.83), чтобы получить один файл за раз в папке Zipped. Это должно быть эффективным памятью. Но я вижу огромный рост памяти при потоковой передаче многих больших файлов.

Код: Выделить всё

gunicorn
Версия 23.0.0, Uvicorn-Worker версия 0.2.0):

Код: Выделить всё

CMD ["gunicorn", "-w", "3", "-k", "uvicorn.workers.UvicornWorker", "webservice.main:app", "--bind", "0.0.0.0:8000", "--max-requests", "500", "--max-requests-jitter", "100", "--access-logfile", "-"]

теперь - вот фиктивный пример настройки для лучшего контекста, используя FastApi версию 0.115.4:

Код: Выделить всё

from fastapi import FastAPI, Query
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime
from stream_zip import async_stream_zip, ZIP_32, stream_zip
from stat import S_IFREG
import orjson
from fastapi import APIRouter, FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI(
title="test",
version="V1.0.0"
)

def generate_file(n=200000):
data_line = "2024-01-01 00:00:00 001  10000.00   2000.00  50000.00  51000.00\n"
return data_line * n

def parse(content):
data = []
for line in content.splitlines():
parts = line.split()
if len(parts) >= 7:
data.append({
"date": parts[0],
"time": parts[1],
"doy": int(parts[2]),
"a": float(parts[3]),
"b": float(parts[4]),
"c": float(parts[5]),
"d": float(parts[6]),
})
return orjson.dumps(data)

async def json_bytes_streamer(json_bytes: bytes, chunk_size: int = 8192):
for i in range(0, len(json_bytes), chunk_size):
yield json_bytes[i:i+chunk_size]

async def yield_data(num_files=50, pool='process'):

loop = asyncio.get_running_loop()
now = datetime.now()
content = generate_file()

executor_class = ProcessPoolExecutor if pool == 'process' else ThreadPoolExecutor
with executor_class(max_workers=1) as executor:
json_bytes = await loop.run_in_executor(executor, parse, content)
for i in range(num_files):
filename = f"data_{i+1:02d}.json"
yield (
filename,
now,
S_IFREG | 0o600,
ZIP_32,
json_bytes_streamer(json_bytes)
)

router = APIRouter(tags=["test"])

@router.get("/generate-zip-process-pool")
async def generate_zip_response_pp(num_files: int = Query(100, ge=1)):
return StreamingResponse(
async_stream_zip(yield_data(num_files=num_files)),
media_type="application/zip",
headers={"Content-Disposition": "attachment; filename=test_data.zip"},
)

app.include_router(router)

С одной из моих конечных точек, которые дают почасовые данные в ежегодных файлах, память, исчерпанная для запроса, может содержать 50-100 мин, но локальное профилирование показывает больше похоже на использование памятью 5-10 мибов (с помощью psutils ). Для небольших запросов (скажем, только 1 месяц данных), проблема на самом деле не присутствует, так что ссылки на память хранятся где -то? Это проблема Docker? Файлы, прочитанные, взяты из монтирования GPFS, если это дает какие -либо подсказки. Я использую Grafana для профила развертывания. Я пытался установить строгие глобальные настройки сбора мусора и использовать Del и вызовать gc.collect () всякий раз, когда я могу. Рост памяти гораздо более минимален с использованием ThreadPool , но затем память никогда не выпускается после (~ 10mb PERED) - опять же, может быть, подсказка.async def check_data(agen):

try:
first_item = await agen.__anext__()
except StopAsyncIteration:
return None, None

async def new_gen():
yield first_item
async for item in agen:
yield item

return first_item, new_gen()
< /code>
Если у кого -то есть какие -либо идеи, пожалуйста, дайте мне знать - я пробовал все, о чем я могу придумать. Я мог бы упустить что -то супер очевидное, и я пытаюсь узнать больше!

Подробнее здесь: https://stackoverflow.com/questions/796 ... ng-multipl
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»