Вот упрощенная версия моего кода:
Код: Выделить всё
import logging
from django.http import JsonResponse
import aiohttp
import asyncio
import gc
from memory_profiler import profile
logger = logging.getLogger(__name__)
async def fetch(session, url):
async with session.get(url, ssl=False) as response:
response.raise_for_status()
return await response.json()
@profile
async def main():
URL = "https://jsonplaceholder.typicode.com/posts"
REQUEST_COUNT = 1000
CONCURRENT_LIMIT = 50
BATCH_SIZE = 100
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)) as session:
semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
async def fetch_with_limit():
async with semaphore:
return await fetch(session, URL)
for i in range(0, REQUEST_COUNT, BATCH_SIZE):
tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
results = await asyncio.gather(*tasks)
logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
tasks.clear()
await session.close()
del session
gc.collect()
return
async def test_httpx_view(request):
asyncio.create_task(main())
return JsonResponse({'status': 'Success'}, status=200)
Я запустил профилирование памяти для функции main(), и результаты показывают значительный прирост памяти при каждой обработке каждого пакета:
Код: Выделить всё
Line # Mem usage Increment Occurrences Line Contents
=============================================================
288 171.8 MiB 171.8 MiB 1 @profile
289 async def main():
290 171.8 MiB 0.0 MiB 1 URL = "https://jsonplaceholder.typicode.com/posts"
291 171.8 MiB 0.0 MiB 1 REQUEST_COUNT = 1000
292 171.8 MiB 0.0 MiB 1 CONCURRENT_LIMIT = 50
293 171.8 MiB 0.0 MiB 1 BATCH_SIZE = 100
294
295 # Initialize local session to ensure closure after each batch
296 171.8 MiB 0.0 MiB 2 async with aiohttp.ClientSession(
297 171.8 MiB 0.0 MiB 1 connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
298 171.8 MiB 0.0 MiB 1 ) as session:
299 171.8 MiB 0.0 MiB 1 semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
300
301 191.0 MiB -18.9 MiB 1001 async def fetch_with_limit():
302 191.0 MiB -14.6 MiB 1500 async with semaphore:
303 191.0 MiB -101.7 MiB 5191 return await fetch(session, URL)
304
305 191.0 MiB 0.0 MiB 12 for i in range(0, REQUEST_COUNT, BATCH_SIZE):
306 191.0 MiB 0.0 MiB 1030 tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
307 191.0 MiB -0.3 MiB 20 results = await asyncio.gather(*tasks)
308
309 # Here you could process the results from each batch if needed
310 191.0 MiB 0.0 MiB 10 logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
311
312 # Clean up after each batch
313 191.0 MiB 0.0 MiB 10 tasks.clear()
314
315 191.0 MiB 0.0 MiB 1 await session.close()
316 191.0 MiB 0.0 MiB 1 del session
317 191.0 MiB 0.0 MiB 1 gc.collect()
318 191.0 MiB 0.0 MiB 1 return
INFO: 127.0.0.1:41392 - "GET /test_aiohttp/ HTTP/1.1" 200 OK
Line # Mem usage Increment Occurrences Line Contents
=============================================================
288 190.8 MiB 190.8 MiB 1 @profile
289 async def main():
290 190.8 MiB 0.0 MiB 1 URL = "https://jsonplaceholder.typicode.com/posts"
291 190.8 MiB 0.0 MiB 1 REQUEST_COUNT = 1000
292 190.8 MiB 0.0 MiB 1 CONCURRENT_LIMIT = 50
293 190.8 MiB 0.0 MiB 1 BATCH_SIZE = 100
294
295 # Initialize local session to ensure closure after each batch
296 190.8 MiB 0.0 MiB 2 async with aiohttp.ClientSession(
297 190.8 MiB 0.0 MiB 1 connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
298 190.8 MiB 0.0 MiB 1 ) as session:
299 190.8 MiB 0.0 MiB 1 semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
300
301 191.4 MiB 0.0 MiB 1001 async def fetch_with_limit():
302 191.4 MiB 0.1 MiB 1500 async with semaphore:
303 191.4 MiB 0.5 MiB 5134 return await fetch(session, URL)
304
305 191.4 MiB 0.0 MiB 12 for i in range(0, REQUEST_COUNT, BATCH_SIZE):
306 191.4 MiB 0.0 MiB 1030 tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
307 191.4 MiB 0.0 MiB 20 results = await asyncio.gather(*tasks)
308
309 # Here you could process the results from each batch if needed
310 191.4 MiB 0.0 MiB 10 logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
311
312 # Clean up after each batch
313 191.4 MiB 0.0 MiB 10 tasks.clear()
314
315 191.4 MiB 0.0 MiB 1 await session.close()
316 191.4 MiB 0.0 MiB 1 del session
317 191.4 MiB 0.0 MiB 1 gc.collect()
318 191.4 MiB 0.0 MiB 1 return
INFO: 127.0.0.1:40942 - "GET /test_aiohttp/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:40942 - "GET /test_aiohttp/ HTTP/1.1" 200 OK
Line # Mem usage Increment Occurrences Line Contents
=============================================================
288 191.4 MiB 191.4 MiB 2 @profile
289 async def main():
290 191.4 MiB 0.0 MiB 2 URL = "https://jsonplaceholder.typicode.com/posts"
291 191.4 MiB 0.0 MiB 2 REQUEST_COUNT = 1000
292 191.4 MiB 0.0 MiB 2 CONCURRENT_LIMIT = 50
293 191.4 MiB 0.0 MiB 2 BATCH_SIZE = 100
294
295 # Initialize local session to ensure closure after each batch
296 191.4 MiB 0.0 MiB 4 async with aiohttp.ClientSession(
297 191.4 MiB 0.0 MiB 2 connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
298 191.4 MiB 0.0 MiB 2 ) as session:
299 191.4 MiB 0.0 MiB 2 semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
300
301 206.5 MiB -72.7 MiB 1902 async def fetch_with_limit():
302 206.5 MiB -107.7 MiB 2850 async with semaphore:
303 206.5 MiB -364.5 MiB 9791 return await fetch(session, URL)
304
305 206.5 MiB -0.7 MiB 21 for i in range(0, REQUEST_COUNT, BATCH_SIZE):
306 206.5 MiB -74.8 MiB 1957 tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
307 206.5 MiB -1.5 MiB 37 results = await asyncio.gather(*tasks)
308
309 # Here you could process the results from each batch if needed
310 206.5 MiB -0.7 MiB 18 logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
311
312 # Clean up after each batch
313 206.5 MiB -0.7 MiB 18 tasks.clear()
314
315 206.5 MiB 0.0 MiB 1 await session.close()
316 206.5 MiB 0.0 MiB 1 del session
317 206.5 MiB 0.0 MiB 1 gc.collect()
318 206.5 MiB 0.0 MiB 1 return
Использование памяти начинается примерно со 171,8 МБ, но увеличивается с каждым обработанным пакетом.
Я явно включил сбор мусора, но использование памяти продолжает расти.
Инструкция async with гарантирует закрытие ClientSession после каждого пакета, но, похоже, она не освобождает память эффективно.
Дополнительная информация
Я пробовал использовать Python версий 3.12.3, 3.10.15 и 3.9.2. Для Python 3.9.2 используются следующие версии библиотеки:
aiohttp: версия 3.10.10
multidict: версия 6.1.0
yarl: версия 1.17.1
async_timeout: версия 4.0.3
Вопрос
Что может быть причиной утечки памяти в этой настройке?
Могу ли я предпринять какие-либо дополнительные действия обеспечить правильное управление памятью с помощью aiohttp в асинхронном контексте?
Подробнее здесь: https://stackoverflow.com/questions/791 ... -in-django
Мобильная версия