Я использую FastAPI и AsyncIOScheduler.
Мое условие планировщика: запускать задачу каждый день в 6 утра с start_time = 6 утра и временем окончания — 8 утра.
Я добавил код планировщика в lifespan, в котором ваш код выполняется только один раз при запуске приложения.
Пожалуйста, обратитесь к коду, где я добавил datetime.now() чтобы получить время начала и окончания, но оно будет выполняться только один раз, потому что оно добавлено в течение жизненного цикла, так как я могу выполнять задание каждый день.
@asynccontextmanager
async def lifespan(app: FastAPI):
# Run at startup
def print_hello():
print("Hello", datetime.now())
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime, timedelta
scheduler = AsyncIOScheduler()
print("Datetime.......",datetime.now())
start_time = datetime.now() + timedelta(seconds=5) # Start in 10 seconds
end_time = datetime.now() + timedelta(seconds=15) # End in 60 seconds
scheduler.add_job(print_hello, "interval", start =start_time, end = end_time, seconds=1)
scheduler.add_job(print_hello, 'cron', hour=6, minute=0)
scheduler.start()
yield
# Run on shutdown (if required)
scheduler.shutdown()
Подробнее здесь: https://stackoverflow.com/questions/790 ... heduler-in
Как запускать задание планировщика ежедневно с указанием времени начала и окончания, используя AsyncIOScheduler в FastAP ⇐ Python
Программы на Python
1727456438
Anonymous
Я использую FastAPI и AsyncIOScheduler.
Мое условие планировщика: запускать задачу каждый день в 6 утра с start_time = 6 утра и временем окончания — 8 утра.
Я добавил код планировщика в [b]lifespan[/b], в котором ваш код выполняется только один раз при запуске приложения.
Пожалуйста, обратитесь к коду, где я добавил datetime.now() чтобы получить время начала и окончания, но оно будет выполняться только один раз, потому что оно добавлено в течение жизненного цикла, так как я могу выполнять задание каждый день.
@asynccontextmanager
async def lifespan(app: FastAPI):
# Run at startup
def print_hello():
print("Hello", datetime.now())
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime, timedelta
scheduler = AsyncIOScheduler()
print("Datetime.......",datetime.now())
start_time = datetime.now() + timedelta(seconds=5) # Start in 10 seconds
end_time = datetime.now() + timedelta(seconds=15) # End in 60 seconds
scheduler.add_job(print_hello, "interval", start =start_time, end = end_time, seconds=1)
scheduler.add_job(print_hello, 'cron', hour=6, minute=0)
scheduler.start()
yield
# Run on shutdown (if required)
scheduler.shutdown()
Подробнее здесь: [url]https://stackoverflow.com/questions/79031027/how-to-run-scheduler-job-daily-with-start-and-end-time-using-asyncioscheduler-in[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия