Мой планировщик в приложении для умного дома Flask с микросервисной архитектурой не работает должным образом.Python

Программы на Python
Ответить Пред. темаСлед. тема
Гость
 Мой планировщик в приложении для умного дома Flask с микросервисной архитектурой не работает должным образом.

Сообщение Гость »

Я разместил свое основное приложение пользовательского интерфейса Flask и приложение Flask Scheduler на одном экземпляре T2.micro, и я столкнулся с проблемой, заключающейся в том, что планировщик не публикует события должным образом. Он сохраняет данные в базе данных, но не публикует их.
Я создал микросервис с двумя потоками. Основной поток — это приложение Flask, а вторичный поток — планировщик. Я использовал библиотеку расписаний Python для реализации базового планировщика, который планирует события, полученные через API. Планировщик также одновременно обновляет базу данных. Проблема в том, что планировщик планирует задачу, а также сохраняет ее в базе данных, но не публикует ее, несмотря на упоминание правильного API.
вот код Scheduler_func.py
вот код Scheduler_func.py
р>
import requests
import schedule
from . import app
from . api_urls import urls
from .models import db, Schedules
from datetime import datetime
import time

def get_schedule_id():
"""
This function is used to get the next available schedule ID.
"""
last_schedule = db.session.query(Schedules).order_by(Schedules.id.desc()).first()
if last_schedule is None:
return 1001
else:
return last_schedule.id+1

schedule_id = 1001

def scheduler_thread():
"""
This function is a background thread that runs the schedule library to periodically execute scheduled tasks.
It continuously loops, checking if any scheduled tasks are due to run, and if so, executes them.
"""
with app.app_context():
while True:
schedule.run_pending()
time.sleep(1)

def publish_job(message, topic):
"""
This function is used to publish a message to a specific MQTT topic.

Args:
message (str): The message to be published.
topic (str): The MQTT topic to which the message should be published.

Returns:
requests.models.Response: The response from the MQTT server.
"""
response = requests.get(urls['publish_schedule']+f'?topic={topic}&pub_message={message}')

def do_job(message,topic):
with app.app_context():
publish_job(message,topic)

def do_once(message, topic, schedule_id):
"""
This function is used to publish a message to a specific MQTT topic once, and then delete the schedule.

Args:
message (str): The message to be published.
topic (str): The MQTT topic to which the message should be published.
schedule_id (int): The ID of the schedule.

Returns:
schedule.CancelJob: A cancel job that can be used to cancel the scheduled task.
"""
with app.app_context():
publish_job(message, topic)
delete_schedule(schedule_id)
return schedule.CancelJob

def create_schedule(data):
"""
This function creates a new schedule in the database and schedules it to run.

Args:
data (dict): The data required to create the schedule.

Returns:
dict: An acknowledgment indicating whether the schedule was created successfully.

"""
global schedule_id

acknowledgment = { 'success': True,
'code': 200,
'message': 'received'}
scheduled_message = data['data']['message']
scheduled_topic = data['topic']
scheduled_time = data['data']['time']
scheduled_repeat = data['data']['repeat']
scheduled_days = data['data']['days']

if scheduled_repeat == 'once':

schedule.every().day.at(scheduled_time).do(do_once, scheduled_message,scheduled_topic,schedule_id).tag(schedule_id)

elif scheduled_repeat == 'everyday' and scheduled_days:
for day in scheduled_days:
if day == 'sun':
schedule.every().sunday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'mon':
schedule.every().monday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'tue':
schedule.every().tuesday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'wed':
schedule.every().wednesday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'thu':
schedule.every().thursday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'fri':
schedule.every().friday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)
elif day == 'sat':
schedule.every().saturday.at(scheduled_time).do(do_job, scheduled_message,scheduled_topic).tag(schedule_id)

days = ''
for day in scheduled_days:
days += day + ' '

repeat = True if scheduled_repeat == 'everyday' else False

db.session.add(Schedules(id = schedule_id, topic = scheduled_topic, message = scheduled_message, repeat = repeat, Days = days, Time = scheduled_time))
db.session.commit()
schedule_id = get_schedule_id()

return {'ack':acknowledgment}

def delete_schedule(schedule_id):
"""
Delete a schedule from the database and clear the schedule from the schedule library.

Args:
schedule_id (int): The ID of the schedule to be deleted.

Returns:
None

"""
with app.app_context():
db.session.query(Schedules).filter(Schedules.id == schedule_id).delete()
db.session.commit()
schedule.clear(schedule_id)

вот код для маршрутизации
from flask import request, jsonify
import requests
import json
from .schedule_func import create_schedule, delete_schedule
from . import app
from .models import db, Schedules
from .api_urls import urls

@app.route('/get_schedule',methods = ['GET', 'POST'])
def get_schedule():
if request.method == 'POST':
data = request.json
acknowledgment = create_schedule(data)['ack']
return jsonify(acknowledgment)

@app.route('/send_data',methods = ['GET', 'POST'])
def send_data():
"""
This function is used to send data to the database.

Parameters:
topic (str): The topic of the data to be sent.

Returns:
dict: A JSON object containing the status, message, topic, and data.
"""
if request.method == 'GET':
topic = request.args.get('topic')
if topic == '':
data = db.session.query(Schedules).all()
else:
data = db.session.query(Schedules).filter(Schedules.topic == topic).all()

data_list = [element.to_dict() for element in data]
packet = {
'status' : True,
'message' : 'success',
'topic' : topic,
'data' : data_list
}
return jsonify(packet)

@app.route('/delete_data', methods = ['GET'])
def delete_data():
"""
Delete a schedule from the database

Args:
schedule_id (int): The id of the schedule to delete

Returns:
dict: A JSON object with a status key indicating success or failure, and a message key indicating the result of the operation
"""
if request.method == 'GET':
schedule_id = request.args.get('schedule_id')
delete_schedule(int(schedule_id))

return jsonify({'status' : True, 'message' : 'schedule_deleted'})

вот основной файл
from scheduler_app import app,db
import threading
import time
from scheduler_app.schedule_func import scheduler_thread

def driver_function():
db.create_all()
worker_thread = threading.Thread(target = scheduler_thread, daemon = True)
worker_thread.start()
return app

if __name__ == '__main__':
app.app_context().push()
driver_function().run(host = '0.0.0.0', port = 8002)


Подробнее здесь: https://stackoverflow.com/questions/781 ... ure-is-not
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как обмениваться моделями Java между микросервисами в микросервисной архитектуре
    Anonymous » » в форуме JAVA
    0 Ответы
    23 Просмотры
    Последнее сообщение Anonymous
  • Как получить планировщик Cron, как планировщик в Python?
    Anonymous » » в форуме Python
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous
  • Почему мой планировщик PyTorch не работает должным образом?
    Anonymous » » в форуме Python
    0 Ответы
    19 Просмотры
    Последнее сообщение Anonymous
  • Почему мой планировщик PyTorch не работает должным образом?
    Anonymous » » в форуме Python
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous
  • Реализуется ли Context Manager (должным образом) HuggingFace Accelerate's init_empty_ewheways (должным образом)?
    Anonymous » » в форуме Python
    0 Ответы
    2 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»