Утечка памяти в асинхронных HTTP-запросах с помощью aiohttp в DjangoPython

Программы на Python
Ответить
Anonymous
 Утечка памяти в асинхронных HTTP-запросах с помощью aiohttp в Django

Сообщение Anonymous »

Я столкнулся с проблемой утечки памяти в своем приложении Django, которое использует aiohttp для выполнения асинхронных HTTP-запросов. Я настроил функцию для получения данных из REST API, но, несмотря на мои попытки эффективно управлять памятью, я наблюдаю значительное увеличение использования памяти с каждым обработанным пакетом запросов.
Вот упрощенная версия моего кода:

Код: Выделить всё

    import logging
from django.http import JsonResponse
import aiohttp
import asyncio
import gc
from memory_profiler import profile

logger = logging.getLogger(__name__)

async def fetch(session, url):
async with session.get(url, ssl=False) as response:
response.raise_for_status()
return await response.json()

@profile
async def main():
URL = "https://jsonplaceholder.typicode.com/posts"
REQUEST_COUNT = 1000
CONCURRENT_LIMIT = 50
BATCH_SIZE = 100

async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)) as session:
semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)

async def fetch_with_limit():
async with semaphore:
return await fetch(session, URL)

for i in range(0, REQUEST_COUNT, BATCH_SIZE):
tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
results = await asyncio.gather(*tasks)
logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
tasks.clear()

await session.close()
del session
gc.collect()
return

async def test_httpx_view(request):
asyncio.create_task(main())
return JsonResponse({'status': 'Success'}, status=200)
Результаты профилирования
Я запустил профилирование памяти для функции main(), и результаты показывают значительный прирост памяти при каждой обработке каждого пакета:

Код: Выделить всё

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
288    171.8 MiB    171.8 MiB           1   @profile
289                                         async def main():
290    171.8 MiB      0.0 MiB           1       URL = "https://jsonplaceholder.typicode.com/posts"
291    171.8 MiB      0.0 MiB           1       REQUEST_COUNT = 1000
292    171.8 MiB      0.0 MiB           1       CONCURRENT_LIMIT = 50
293    171.8 MiB      0.0 MiB           1       BATCH_SIZE = 100
294
295                                             # Initialize local session to ensure closure after each batch
296    171.8 MiB      0.0 MiB           2       async with aiohttp.ClientSession(
297    171.8 MiB      0.0 MiB           1           connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
298    171.8 MiB      0.0 MiB           1       ) as session:
299    171.8 MiB      0.0 MiB           1           semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
300
301    191.0 MiB    -18.9 MiB        1001           async def fetch_with_limit():
302    191.0 MiB    -14.6 MiB        1500               async with semaphore:
303    191.0 MiB   -101.7 MiB        5191                   return await fetch(session, URL)
304
305    191.0 MiB      0.0 MiB          12           for i in range(0, REQUEST_COUNT, BATCH_SIZE):
306    191.0 MiB      0.0 MiB        1030               tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
307    191.0 MiB     -0.3 MiB          20               results = await asyncio.gather(*tasks)
308
309                                                     # Here you could process the results from each batch if needed
310    191.0 MiB      0.0 MiB          10               logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
311
312                                                     # Clean up after each batch
313    191.0 MiB      0.0 MiB          10               tasks.clear()
314
315    191.0 MiB      0.0 MiB           1       await session.close()
316    191.0 MiB      0.0 MiB           1       del session
317    191.0 MiB      0.0 MiB           1       gc.collect()
318    191.0 MiB      0.0 MiB           1       return

INFO:      127.0.0.1:41392 - "GET /test_aiohttp/ HTTP/1.1" 200 OK

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
288    190.8 MiB    190.8 MiB           1   @profile
289                                         async def main():
290    190.8 MiB      0.0 MiB           1       URL = "https://jsonplaceholder.typicode.com/posts"
291    190.8 MiB      0.0 MiB           1       REQUEST_COUNT = 1000
292    190.8 MiB      0.0 MiB           1       CONCURRENT_LIMIT = 50
293    190.8 MiB      0.0 MiB           1       BATCH_SIZE = 100
294
295                                             # Initialize local session to ensure closure after each batch
296    190.8 MiB      0.0 MiB           2       async with aiohttp.ClientSession(
297    190.8 MiB      0.0 MiB           1           connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
298    190.8 MiB      0.0 MiB           1       ) as session:
299    190.8 MiB      0.0 MiB           1           semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
300
301    191.4 MiB      0.0 MiB        1001           async def fetch_with_limit():
302    191.4 MiB      0.1 MiB        1500               async with semaphore:
303    191.4 MiB      0.5 MiB        5134                   return await fetch(session, URL)
304
305    191.4 MiB      0.0 MiB          12           for i in range(0, REQUEST_COUNT, BATCH_SIZE):
306    191.4 MiB      0.0 MiB        1030               tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
307    191.4 MiB      0.0 MiB          20               results = await asyncio.gather(*tasks)
308
309                                                     # Here you could process the results from each batch if needed
310    191.4 MiB      0.0 MiB          10               logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
311
312                                                     # Clean up after each batch
313    191.4 MiB      0.0 MiB          10               tasks.clear()
314
315    191.4 MiB      0.0 MiB           1       await session.close()
316    191.4 MiB      0.0 MiB           1       del session
317    191.4 MiB      0.0 MiB           1       gc.collect()
318    191.4 MiB      0.0 MiB           1       return

INFO:     127.0.0.1:40942 - "GET /test_aiohttp/ HTTP/1.1" 200 OK
INFO:     127.0.0.1:40942 - "GET /test_aiohttp/ HTTP/1.1"  200 OK

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
288    191.4 MiB    191.4 MiB           2   @profile
289                                         async def main():
290    191.4 MiB      0.0 MiB           2       URL = "https://jsonplaceholder.typicode.com/posts"
291    191.4 MiB      0.0 MiB           2       REQUEST_COUNT = 1000
292    191.4 MiB      0.0 MiB           2       CONCURRENT_LIMIT = 50
293    191.4 MiB      0.0 MiB           2       BATCH_SIZE = 100
294
295                                             # Initialize local session to ensure closure after each batch
296    191.4 MiB      0.0 MiB           4       async with aiohttp.ClientSession(
297    191.4 MiB      0.0 MiB           2           connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
298    191.4 MiB      0.0 MiB           2       ) as session:
299    191.4 MiB      0.0 MiB           2           semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
300
301    206.5 MiB    -72.7 MiB        1902           async def fetch_with_limit():
302    206.5 MiB   -107.7 MiB        2850               async with semaphore:
303    206.5 MiB   -364.5 MiB        9791                   return await fetch(session, URL)
304
305    206.5 MiB     -0.7 MiB          21           for i in range(0, REQUEST_COUNT, BATCH_SIZE):
306    206.5 MiB    -74.8 MiB        1957               tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
307    206.5 MiB     -1.5 MiB          37               results = await asyncio.gather(*tasks)
308
309                                                     # Here you could process the results from each batch if needed
310    206.5 MiB     -0.7 MiB          18               logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
311
312                                                     # Clean up after each batch
313    206.5 MiB     -0.7 MiB          18               tasks.clear()
314
315    206.5 MiB      0.0 MiB           1       await session.close()
316    206.5 MiB      0.0 MiB           1       del session
317    206.5 MiB      0.0 MiB           1       gc.collect()
318    206.5 MiB      0.0 MiB           1       return
Наблюдения
Использование памяти начинается примерно со 171,8 МБ, но увеличивается с каждым обработанным пакетом.
Я явно включил сбор мусора, но использование памяти продолжает расти.
Инструкция async with гарантирует закрытие ClientSession после каждого пакета, но, похоже, она не освобождает память эффективно.
Дополнительная информация
Я пробовал использовать Python версий 3.12.3, 3.10.15 и 3.9.2. Для Python 3.9.2 используются следующие версии библиотеки:
aiohttp: версия 3.10.10
multidict: версия 6.1.0
yarl: версия 1.17.1
async_timeout: версия 4.0.3
Вопрос
Что может быть причиной утечки памяти в этой настройке?
Является ли это Могу ли я предпринять какие-либо дополнительные шаги, чтобы обеспечить правильное управление памятью с помощью aiohttp в асинхронном контексте?

Подробнее здесь: https://stackoverflow.com/questions/791 ... -in-django
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»