Процесс Python celerybeat[worker] пополняет оперативную памятьPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Процесс Python celerybeat[worker] пополняет оперативную память

Сообщение Anonymous »

У меня есть проект fastapi с ритмом сельдерея, который запускается каждые 30 минут. Он анализирует новости с RSS-APIS веб-сайта и записывает их в базу данных.
Вот мои конфигурации сельдерея:

Код: Выделить всё

configs.py

Код: Выделить всё

    ...
# Celery configuration
@property
def CELERY_BROKER_URL(self) -> str:
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/1"

@property
def CELERY_RESULT_BACKEND(self) -> str:
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/2"

@property
def WS_MESSAGE_QUEUE(self) -> str:
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/3"

CELERY_BEAT_SCHEDULE: dict = {
"news-timer-scheduler": {
"task": "news_parser",
"schedule": 1800,  # 1800/60=30 minutes
},
"weather-timer-scheduler": {
"task": "weather_parser",
"schedule": 300.0,  # 300/60=5 minutes
}
}
CELERY_TASK_DEFAULT_QUEUE: str = "default"

# Force all queues to be explicitly listed in `CELERY_TASK_QUEUES` to help prevent typos
CELERY_TASK_CREATE_MISSING_QUEUES: bool = False
CELERY_TIMEZONE: str = 'UTC'
CELERY_TASK_RESULT_EXPIRES: int = 120  # 2 minutes

Код: Выделить всё

celery_app.py

Код: Выделить всё

from celery import current_app as current_celery_app
from app.core.config import configs

def create_celery():
celery = current_celery_app
celery.config_from_object(configs, namespace="CELERY")
celery.autodiscover_tasks(['app.celery.tasks'])
return celery
celery_app = create_celery()

Код: Выделить всё

tasks.py

Код: Выделить всё

import asyncio
import gc
import psutil
from app.celery.celery_app import celery_app
from app.main import container

BATCH_SIZE = 10

@celery_app.task(name="news_parser")
def news_parser():
print("====================RUNNING news_parser====================")
process = psutil.Process()
print(f"news_parser memory usage start:::::::: {process.memory_info().rss / 1024 ** 2} MB")
sites = asyncio.run(container.site_service().load_actives())  # Get list of active site IDs
batch = sites[i:i + BATCH_SIZE]
asyncio.run(container.news_timer_service().run_task(batch))
del batch
gc.collect()  # Collect garbage after processing each batch
del sites
gc.collect()
print(f"news_parser memory usage end:::::::::: {process.memory_info().rss / 1024 ** 2} MB")
return {"news": "parsed"}

@celery_app.task(name="weather_parser")
def weather_parser():
print("====================RUNNING weather_parser====================")
process = psutil.Process()
asyncio.run(container.weather_service().run_task())
gc.collect()
return {"weather": "parse"}
Я использовал эту команду, чтобы запустить работника сельдерея и победить:

Код: Выделить всё

celery -A app.celery.celery_app worker --loglevel=info --pool=solo &
celery -A app.celery.celery_app beat --loglevel=info --scheduler=celery.beat:PersistentScheduler
Необходимо проанализировать RSS-файлы более 70 сайтов, а выполнение задачи news_parser занимает много времени.
ПРОБЛЕМА: использование оперативной памяти увеличивается со временем время, и в конечном итоге процесс пополняет оперативную память. В конце сервер зависает, и его приходится перезагружать. Как очистить используемую память после завершения процесса или установить некоторые ограничения, если достигнута эта ограниченная память, автоматически перезапустите работника сельдерея.... ?
Я пытаюсь запускать команды, как показано ниже:
Я пытаюсь запускать команды, как показано ниже:
p>

Код: Выделить всё

celery -A app.celery.celery_app worker --loglevel=info --max-memory-per-child=350000 --concurrency=3

Код: Выделить всё

celery -A app.celery.celery_app worker --loglevel=info --max-memory-per-child=350000 --max-tasks-per-child=3
но при каждом запуске такта используется следующий рабочий.

inContainer.news_timer_service().run_task (пакетный) используйте gc.collect() и удалите все переменные после использования.

Подробнее здесь: https://stackoverflow.com/questions/790 ... nishes-ram
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»