Вот мои конфигурации сельдерея:
Код: Выделить всё
configs.py
Код: Выделить всё
...
# Celery configuration
@property
def CELERY_BROKER_URL(self) -> str:
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/1"
@property
def CELERY_RESULT_BACKEND(self) -> str:
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/2"
@property
def WS_MESSAGE_QUEUE(self) -> str:
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/3"
CELERY_BEAT_SCHEDULE: dict = {
"news-timer-scheduler": {
"task": "news_parser",
"schedule": 1800, # 1800/60=30 minutes
},
"weather-timer-scheduler": {
"task": "weather_parser",
"schedule": 300.0, # 300/60=5 minutes
}
}
CELERY_TASK_DEFAULT_QUEUE: str = "default"
# Force all queues to be explicitly listed in `CELERY_TASK_QUEUES` to help prevent typos
CELERY_TASK_CREATE_MISSING_QUEUES: bool = False
CELERY_TIMEZONE: str = 'UTC'
CELERY_TASK_RESULT_EXPIRES: int = 120 # 2 minutes
Код: Выделить всё
celery_app.py
Код: Выделить всё
from celery import current_app as current_celery_app
from app.core.config import configs
def create_celery():
celery = current_celery_app
celery.config_from_object(configs, namespace="CELERY")
celery.autodiscover_tasks(['app.celery.tasks'])
return celery
celery_app = create_celery()
Код: Выделить всё
tasks.py
Код: Выделить всё
import asyncio
import gc
import psutil
from app.celery.celery_app import celery_app
from app.main import container
BATCH_SIZE = 10
@celery_app.task(name="news_parser")
def news_parser():
print("====================RUNNING news_parser====================")
process = psutil.Process()
print(f"news_parser memory usage start:::::::: {process.memory_info().rss / 1024 ** 2} MB")
sites = asyncio.run(container.site_service().load_actives()) # Get list of active site IDs
batch = sites[i:i + BATCH_SIZE]
asyncio.run(container.news_timer_service().run_task(batch))
del batch
gc.collect() # Collect garbage after processing each batch
del sites
gc.collect()
print(f"news_parser memory usage end:::::::::: {process.memory_info().rss / 1024 ** 2} MB")
return {"news": "parsed"}
@celery_app.task(name="weather_parser")
def weather_parser():
print("====================RUNNING weather_parser====================")
process = psutil.Process()
asyncio.run(container.weather_service().run_task())
gc.collect()
return {"weather": "parse"}
Код: Выделить всё
celery -A app.celery.celery_app worker --loglevel=info --pool=solo &
celery -A app.celery.celery_app beat --loglevel=info --scheduler=celery.beat:PersistentScheduler
ПРОБЛЕМА: использование оперативной памяти увеличивается со временем время, и в конечном итоге процесс пополняет оперативную память. В конце сервер зависает, и его приходится перезагружать. Как очистить используемую память после завершения процесса или установить некоторые ограничения, если достигнута эта ограниченная память, автоматически перезапустите работника сельдерея.... ?
Я пытаюсь запускать команды, как показано ниже:
Я пытаюсь запускать команды, как показано ниже:
p>
Код: Выделить всё
celery -A app.celery.celery_app worker --loglevel=info --max-memory-per-child=350000 --concurrency=3
Код: Выделить всё
celery -A app.celery.celery_app worker --loglevel=info --max-memory-per-child=350000 --max-tasks-per-child=3
inContainer.news_timer_service().run_task (пакетный) используйте gc.collect() и удалите все переменные после использования.
Подробнее здесь: https://stackoverflow.com/questions/790 ... nishes-ram