Python 3.12 – очередь не освобождает памятьPython

Программы на Python
Ответить
Anonymous
 Python 3.12 – очередь не освобождает память

Сообщение Anonymous »

У меня есть, как мне казалось, относительно простой процесс: передача некоторых данных через http в очередь в сопрограмме, чтение из очереди в другой сопрограмме и запись этих данных на диск.
Это MRE того, что я на самом деле делаю, но у него та же проблема с памятью. Я использую многопроцессорную очередь, но тестовая очередь asyncio здесь такая же. По сути, похоже, что очередь не освобождает память после максимальной отметки.
Ключевым моментом здесь является то, что процесс управления долговечен, и я не хочу, чтобы он потреблял много памяти.
Python 3.12.3
Linux Kernel 6.8.0-51
Вот код:
from __future__ import annotations

import asyncio
from pathlib import Path
from socket import AF_INET
import psutil

import aiofiles.os

from aiohttp import (
ClientError,
ClientSession,
ClientTimeout,
TCPConnector,
)

async def write_to_disk(dest: Path, queue: asyncio.Queue) -> None:
async with aiofiles.open(dest, "wb") as f:
while data := await queue.get():
# give up control so the http reader gets priority, i.e.
# simulate time consuming work
await asyncio.sleep(0)
await f.write(data)
# this does nothing
queue.task_done()

# last "data" is empty sentinel. This also does nothing
queue.task_done()

print("write to disk finished")

async def log_mem():
while True:
process = psutil.Process()
used = round(process.memory_info().rss / 1024**2, 2)
print(f"mem used: {used} MiB")
await asyncio.sleep(5)

async def start_http_stream(
queue: asyncio.Queue,
connect_timeout: int = 3,
read_timeout: int = 10,
) -> None:
timeout = ClientTimeout(connect=connect_timeout, sock_read=read_timeout)
conn = TCPConnector(family=AF_INET)

url = "http://somedownloadurl.com/filetodownload"
extra_params = {"json": {}}

async with ClientSession(
connector=conn,
) as session:
try:
async with session.post(
url, timeout=timeout, **extra_params
) as resp:
bytes_downloaded = 0
percent_logged = 0
content_length = int(resp.headers.get("Approx-Content-Length"))

print(
"Download size:",
round(content_length / 1024 / 1024 / 1024, 2),
"GiB",
)

async for chunk, _ in resp.content.iter_chunks():
if not chunk:
# we can get empty chunks - which threw me to start with. If we pass these through,
# the other end thinks the stream is finished.
continue

bytes_downloaded += len(chunk)
percent = int((bytes_downloaded / content_length) * 100)
if percent > percent_logged and percent % 5 == 0:
percent_logged = percent
print(
f"Downloaded: {percent}%, q size: {queue.qsize()}"
)

# using this as poor mans backpressure. I had problems with
# read timeouts on the resp when using a bounded queue
if queue.qsize() >= 8192:
await asyncio.sleep(0.05)

await queue.put(chunk)
# Sentinel
await queue.put(b"")
except (ClientError, OSError) as e:
print("EXC", e)

print("Http stream finished")

async def main():
q = asyncio.Queue()

stream_coro = start_http_stream(q)
write_coro = write_to_disk("file_download.tar", q)
log_mem_task = asyncio.create_task(log_mem())

await asyncio.gather(stream_coro, write_coro)
print("tasks finished... sleeping 30 seconds")
await asyncio.sleep(30)
print("Current q size:", q.qsize())
print("deleting queue and sleeping 30 seconds")
del q
await asyncio.sleep(30)
log_mem_task.cancel()
try:
await log_mem_task
except asyncio.CancelledError:
print("log mem cancelled")

asyncio.run(main())

Вот результат выполнения кода (для краткости я вырезал среднюю часть):
mem used: 31.62 MiB
Download size: 36.09 GiB
Downloaded: 5%, q size: 5799
mem used: 1534.69 MiB
mem used: 2049.82 MiB
Downloaded: 10%, q size: 8168
mem used: 2049.82 MiB
Downloaded: 15%, q size: 8175
mem used: 2048.55 MiB
Downloaded: 20%, q size: 8180

mem used: 2052.88 MiB
mem used: 2052.88 MiB
Downloaded: 80%, q size: 8113
mem used: 2052.88 MiB
Downloaded: 85%, q size: 8112
mem used: 2052.88 MiB
Downloaded: 90%, q size: 8148
mem used: 2052.45 MiB
mem used: 2052.45 MiB
Downloaded: 95%, q size: 8042
mem used: 2052.45 MiB
Downloaded: 100%, q size: 8179
Http stream finished
mem used: 2052.45 MiB
write to disk finished
tasks finished... sleeping 30 seconds
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
Current q size: 0
deleting queue and sleeping 30 seconds
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
log mem cancelled

Как видите, использованная память не освобождается даже после удаления очереди. Я проверил с помощью Tracemalloc, и память высвободилась из кучи, так что можно предположить, что это дело в cpython, да?
Я понятия не имею, как это дальше отлаживать или как освободить память. Я собираюсь перезапустить приложение после завершения загрузки, что не очень хорошо.
Что я могу здесь сделать? Это кажется довольно простой вещью, я просто этого не понимаю.
Изменить:
Я также пробовал использовать явный сборщик мусора. Collect(), то же самое.
Taking out the trash and sleeping 30 seconds
mem used: 1990.68 MiB
mem used: 1990.68 MiB
mem used: 1990.68 MiB
mem used: 1990.68 MiB
mem used: 1990.68 MiB
mem used: 1990.68 MiB
log mem cancelled


Подробнее здесь: https://stackoverflow.com/questions/793 ... ing-memory
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»