Что может быть причиной того, что моя задача Celery не сможет использовать подключение к базе данных Mysql?Python

Программы на Python
Ответить
Anonymous
 Что может быть причиной того, что моя задача Celery не сможет использовать подключение к базе данных Mysql?

Сообщение Anonymous »

Я новичок в работе с сельдереем и создал несколько простых операций с задачами, используя Celery, Redbeat, Flask, базовую аутентификацию Duo и базу данных MySQL. Единственная проблема заключается в том, что в задаче он не может подключиться к моей базе данных, поскольку считает, что строка подключения имеет значение «Нет», как показано ниже.
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2003, "Can't connect to MySQL server on 'None' ([Errno 8] nodename nor servname provided, or not known)")
(Background on this error at: https://sqlalche.me/e/20/e3q8)


Я проверил, как настроил Celery, и вроде все в порядке. при запуске приложения все используемые переменные также кажутся в порядке. Вне задач я могу правильно использовать базу данных. Итак, я думаю, что каким-то образом испортил часть конфигурации, которая не позволяет задачам в их контексте получить доступ к соединению с базой данных, но я не могу это точно определить. Если бы кто-нибудь мог предложить мне дополнительный взгляд, чтобы увидеть, в чем проблема и почему она возникает, я был бы благодарен. Я прикрепил приведенный ниже код установки и код соответствующей задачи, которая не удалась.
Вот application.py
from flask import Flask, redirect, url_for
from flask_cors import CORS
from werkzeug.middleware.proxy_fix import ProxyFix
import config.const as CONSTANTS
from authlib.integrations.flask_client import OAuth
from extensions import db, jwt, ma, socketio
from web.authentication import auth
from web.tasks import tasks
from celery_config import make_celery

def create_app():
app = Flask(__name__)
app.config.from_mapping(
SECRET_KEY=CONSTANTS.SECRET,
SQLALCHEMY_DATABASE_URI=CONSTANTS.DB_CONNECTION_STRING,
SQLALCHEMY_TRACK_MODIFICATIONS=False,
PREFERRED_URL_SCHEME='https',
CELERY_BROKER_URL=f'{CONSTANTS.broker_dev}',
CELERY_RESULT_BACKEND=f'{CONSTANTS.backend_dev}'
)

CORS(app)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_prefix=1)

# Initialize and register all Flask extensions
db.init_app(app)
jwt.init_app(app)
ma.init_app(app)
socketio.init_app(app, cors_allowed_origins="*")

# OAuth configuration
oauth = OAuth(app)
app.config['GOTO'] = oauth.register(
'goto',
client_id=CONSTANTS.CLIENT_ID,
authorize_url='CONSTANT.AUTH_URL',
authorize_params=None,
access_token_params=None,
refresh_token_url=None,
redirect_uri='https://e6d8-70-57-86-187.ngrok-free.app'
)

# Register all application blueprints
app.register_blueprint(auth, url_prefix='/auth')
app.register_blueprint(tasks, url_prefix='/task')

# Celery configuration
app.celery = make_celery(app)

return app


Вот celery_config.py
from celery import Celery
import config.const as CONST

def make_celery(app):
backend = CONST.backend_dev
broker = CONST.backend_dev

celery = Celery(app.import_name, backend=backend, broker=broker)
celery.conf.update({
'broker_url': app.config['CELERY_BROKER_URL'],
'result_backend': app.config['CELERY_RESULT_BACKEND'],
'sqlalchemy_database_uri': app.config['SQLALCHEMY_DATABASE_URI'],
'beat_scheduler': 'redbeat.RedBeatScheduler',
'redbeat_redis_url': backend,
'redbeat_key_prefix': 'redbeat:',
'beat_max_loop_interval': 10,
'beat_schedule': {},
'imports': ('tasks.celery_tasks',)
})
celery.set_default()
app.extensions['celery'] = celery
if app is not None:
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
print("Task is running with app context")
return self.run(*args, **kwargs)
celery.Task = ContextTask

return celery

И make_celery.py, где я запускаю приложение
from flask import redirect, url_for, request

from application import create_app

app = create_app()
celery_app = app.extensions['celery']
celery_app.control.purge()

@app.route("/")
def default_route():
print("This print statements means we hit our default route in make celery")
return redirect(url_for('auth.login'))

@app.after_request
def after_request(response):
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
return response

if __name__ == '__main__':
app.run(debug=True, use_reloader=False, port=5009)


И, наконец, вот раздел задачи, который не работает. Когда он пытается запросить токен (я использую sqlalchemy), он выдает ошибку.
@shared_task(name="run_http_task")
def run_http_task(task_id=None, task_url=None, task_name=None, search_string=None, success_text=None, fail_text=None, phone_number=None):
token = Token.query.first()


Подробнее здесь: https://stackoverflow.com/questions/791 ... tabase-con
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»