Это MRE того, что я на самом деле делаю, но у него та же проблема с памятью. Я использую многопроцессорную очередь, но тестовая очередь asyncio здесь такая же. По сути, похоже, что очередь не освобождает память после максимальной отметки.
Ключевым моментом здесь является то, что процесс управления долговечен, и я не хочу, чтобы он потреблял много памяти.
Python 3.12.3
Linux Kernel 6.8.0-51
Вот код:
Код: Выделить всё
from __future__ import annotations
import asyncio
from pathlib import Path
from socket import AF_INET
import psutil
import aiofiles.os
from aiohttp import (
ClientError,
ClientSession,
ClientTimeout,
TCPConnector,
)
async def write_to_disk(dest: Path, queue: asyncio.Queue) -> None:
async with aiofiles.open(dest, "wb") as f:
while data := await queue.get():
# give up control so the http reader gets priority, i.e.
# simulate time consuming work
await asyncio.sleep(0)
await f.write(data)
# this does nothing
queue.task_done()
# last "data" is empty sentinel. This also does nothing
queue.task_done()
print("write to disk finished")
async def log_mem():
while True:
process = psutil.Process()
used = round(process.memory_info().rss / 1024**2, 2)
print(f"mem used: {used} MiB")
await asyncio.sleep(5)
async def start_http_stream(
queue: asyncio.Queue,
connect_timeout: int = 3,
read_timeout: int = 10,
) -> None:
timeout = ClientTimeout(connect=connect_timeout, sock_read=read_timeout)
conn = TCPConnector(family=AF_INET)
url = "http://somedownloadurl.com/filetodownload"
extra_params = {"json": {}}
async with ClientSession(
connector=conn,
) as session:
try:
async with session.post(
url, timeout=timeout, **extra_params
) as resp:
bytes_downloaded = 0
percent_logged = 0
content_length = int(resp.headers.get("Approx-Content-Length"))
print(
"Download size:",
round(content_length / 1024 / 1024 / 1024, 2),
"GiB",
)
async for chunk, _ in resp.content.iter_chunks():
if not chunk:
# we can get empty chunks - which threw me to start with. If we pass these through,
# the other end thinks the stream is finished.
continue
bytes_downloaded += len(chunk)
percent = int((bytes_downloaded / content_length) * 100)
if percent > percent_logged and percent % 5 == 0:
percent_logged = percent
print(
f"Downloaded: {percent}%, q size: {queue.qsize()}"
)
# using this as poor mans backpressure. I had problems with
# read timeouts on the resp when using a bounded queue
if queue.qsize() >= 8192:
await asyncio.sleep(0.05)
await queue.put(chunk)
# Sentinel
await queue.put(b"")
except (ClientError, OSError) as e:
print("EXC", e)
print("Http stream finished")
async def main():
q = asyncio.Queue()
stream_coro = start_http_stream(q)
write_coro = write_to_disk("file_download.tar", q)
log_mem_task = asyncio.create_task(log_mem())
await asyncio.gather(stream_coro, write_coro)
print("tasks finished... sleeping 30 seconds")
await asyncio.sleep(30)
print("Current q size:", q.qsize())
print("deleting queue and sleeping 30 seconds")
del q
await asyncio.sleep(30)
log_mem_task.cancel()
try:
await log_mem_task
except asyncio.CancelledError:
print("log mem cancelled")
asyncio.run(main())
Код: Выделить всё
mem used: 31.62 MiB
Download size: 36.09 GiB
Downloaded: 5%, q size: 5799
mem used: 1534.69 MiB
mem used: 2049.82 MiB
Downloaded: 10%, q size: 8168
mem used: 2049.82 MiB
Downloaded: 15%, q size: 8175
mem used: 2048.55 MiB
Downloaded: 20%, q size: 8180
mem used: 2052.88 MiB
mem used: 2052.88 MiB
Downloaded: 80%, q size: 8113
mem used: 2052.88 MiB
Downloaded: 85%, q size: 8112
mem used: 2052.88 MiB
Downloaded: 90%, q size: 8148
mem used: 2052.45 MiB
mem used: 2052.45 MiB
Downloaded: 95%, q size: 8042
mem used: 2052.45 MiB
Downloaded: 100%, q size: 8179
Http stream finished
mem used: 2052.45 MiB
write to disk finished
tasks finished... sleeping 30 seconds
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
Current q size: 0
deleting queue and sleeping 30 seconds
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
mem used: 2049.34 MiB
log mem cancelled
Я понятия не имею, как это дальше отлаживать или как освободить память. Я собираюсь перезапустить приложение после завершения загрузки, что не очень хорошо.
Что я могу здесь сделать? Это кажется довольно простой вещью, я просто этого не понимаю.
Изменить:
Я также пробовал использовать явный сборщик мусора. Collect(), то же самое.
Код: Выделить всё
Taking out the trash and sleeping 30 seconds
mem used: 1990.68 MiB
mem used: 1990.68 MiB
mem used: 1990.68 MiB
mem used: 1990.68 MiB
mem used: 1990.68 MiB
mem used: 1990.68 MiB
log mem cancelled
Вот снимок экрана после удаления очереди:
Вот скриншот после удаления очереди:
Дальнейшее редактирование. p>

Здесь это скриншот после выхода из процесса, вы можете увидеть память выпущено:

Подробнее здесь: https://stackoverflow.com/questions/793 ... ing-memory
Мобильная версия