Я разместил свое основное приложение пользовательского интерфейса Flask и приложение Flask Scheduler на одном экземпляре T2.micro, и я столкнулся с проблемой, заключающейся в том, что планировщик не публикует события должным образом. Он сохраняет данные в базе данных, но не публикует их.
Я создал микросервис с двумя потоками. Основной поток — это приложение Flask, а вторичный поток — планировщик. Я использовал библиотеку расписаний Python для реализации базового планировщика, который планирует события, полученные через API. Планировщик также одновременно обновляет базу данных. Проблема в том, что планировщик планирует задачу, а также сохраняет ее в базе данных, но не публикует ее, несмотря на упоминание правильного API.
вот код Scheduler_func.py
вот код Scheduler_func.py
р>
import requests
import schedule
from . import app
from . api_urls import urls
from .models import db, Schedules
from datetime import datetime
import time
def get_schedule_id():
"""
This function is used to get the next available schedule ID.
"""
last_schedule = db.session.query(Schedules).order_by(Schedules.id.desc()).first()
if last_schedule is None:
return 1001
else:
return last_schedule.id+1
schedule_id = 1001
def scheduler_thread():
"""
This function is a background thread that runs the schedule library to periodically execute scheduled tasks.
It continuously loops, checking if any scheduled tasks are due to run, and if so, executes them.
"""
with app.app_context():
while True:
schedule.run_pending()
time.sleep(1)
def publish_job(message, topic):
"""
This function is used to publish a message to a specific MQTT topic.
Args:
message (str): The message to be published.
topic (str): The MQTT topic to which the message should be published.
Returns:
requests.models.Response: The response from the MQTT server.
"""
response = requests.get(urls['publish_schedule']+f'?topic={topic}&pub_message={message}')
def do_job(message,topic):
with app.app_context():
publish_job(message,topic)
def do_once(message, topic, schedule_id):
"""
This function is used to publish a message to a specific MQTT topic once, and then delete the schedule.
Args:
message (str): The message to be published.
topic (str): The MQTT topic to which the message should be published.
schedule_id (int): The ID of the schedule.
Returns:
schedule.CancelJob: A cancel job that can be used to cancel the scheduled task.
"""
with app.app_context():
publish_job(message, topic)
delete_schedule(schedule_id)
return schedule.CancelJob
def create_schedule(data):
"""
This function creates a new schedule in the database and schedules it to run.
Args:
data (dict): The data required to create the schedule.
Returns:
dict: An acknowledgment indicating whether the schedule was created successfully.
"""
global schedule_id
acknowledgment = { 'success': True,
'code': 200,
'message': 'received'}
scheduled_message = data['data']['message']
scheduled_topic = data['topic']
scheduled_time = data['data']['time']
scheduled_repeat = data['data']['repeat']
scheduled_days = data['data']['days']
if scheduled_repeat == 'once':
schedule.every().day.at(scheduled_time).do(do_once, scheduled_message,scheduled_topic,schedule_id).tag(schedule_id)
elif scheduled_repeat == 'everyday' and scheduled_days:
for day in scheduled_days:
if day == 'sun':
schedule.every().sunday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'mon':
schedule.every().monday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'tue':
schedule.every().tuesday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'wed':
schedule.every().wednesday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'thu':
schedule.every().thursday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'fri':
schedule.every().friday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'sat':
schedule.every().saturday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
days = ''
for day in scheduled_days:
days += day + ' '
repeat = True if scheduled_repeat == 'everyday' else False
db.session.add(Schedules(id = schedule_id, topic = scheduled_topic, message = scheduled_message, repeat = repeat, Days = days, Time = scheduled_time))
db.session.commit()
schedule_id = get_schedule_id()
return {'ack':acknowledgment}
def delete_schedule(schedule_id):
"""
Delete a schedule from the database and clear the schedule from the schedule library.
Args:
schedule_id (int): The ID of the schedule to be deleted.
Returns:
None
"""
with app.app_context():
db.session.query(Schedules).filter(Schedules.id == schedule_id).delete()
db.session.commit()
schedule.clear(schedule_id)
вот код для маршрутизации
from flask import request, jsonify
import requests
import json
from .schedule_func import create_schedule, delete_schedule
from . import app
from .models import db, Schedules
from .api_urls import urls
@app.route('/get_schedule',methods = ['GET', 'POST'])
def get_schedule():
if request.method == 'POST':
data = request.json
acknowledgment = create_schedule(data)['ack']
return jsonify(acknowledgment)
@app.route('/send_data',methods = ['GET', 'POST'])
def send_data():
"""
This function is used to send data to the database.
Parameters:
topic (str): The topic of the data to be sent.
Returns:
dict: A JSON object containing the status, message, topic, and data.
"""
if request.method == 'GET':
topic = request.args.get('topic')
if topic == '':
data = db.session.query(Schedules).all()
else:
data = db.session.query(Schedules).filter(Schedules.topic == topic).all()
data_list = [element.to_dict() for element in data]
packet = {
'status' : True,
'message' : 'success',
'topic' : topic,
'data' : data_list
}
return jsonify(packet)
@app.route('/delete_data', methods = ['GET'])
def delete_data():
"""
Delete a schedule from the database
Args:
schedule_id (int): The id of the schedule to delete
Returns:
dict: A JSON object with a status key indicating success or failure, and a message key indicating the result of the operation
"""
if request.method == 'GET':
schedule_id = request.args.get('schedule_id')
delete_schedule(int(schedule_id))
return jsonify({'status' : True, 'message' : 'schedule_deleted'})
вот основной файл
from scheduler_app import app,db
import threading
import time
from scheduler_app.schedule_func import scheduler_thread
def driver_function():
db.create_all()
worker_thread = threading.Thread(target = scheduler_thread, daemon = True)
worker_thread.start()
return app
if __name__ == '__main__':
app.app_context().push()
driver_function().run(host = '0.0.0.0', port = 8002)
Подробнее здесь: https://stackoverflow.com/questions/781 ... ure-is-not
Мой планировщик в приложении для умного дома Flask с микросервисной архитектурой не работает должным образом. ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Как обмениваться моделями Java между микросервисами в микросервисной архитектуре
Anonymous » » в форуме JAVA - 0 Ответы
- 23 Просмотры
-
Последнее сообщение Anonymous
-