Op_kwargs не заполняется параметрами в шаблоне Jinja для генератора DAG AirflowPython

Программы на Python
Ответить
Anonymous
 Op_kwargs не заполняется параметрами в шаблоне Jinja для генератора DAG Airflow

Сообщение Anonymous »

Я создаю автоматический генератор DAG на основе конфигурации YAML. Моя проблема в том, что при создании группы обеспечения доступности баз данных значение, которое должно появиться в «op_kwargs», не отображается. Остальные значения настроены правильно, а это нет. Я делюсь частью шаблона Jinja, в которой я этим занимаюсь, а также файлом Python, в котором я управляю параметрами и конфигурацией YAML.
ШАБЛОН JINJA:

Код: Выделить всё

with DAG(
dag_id="{{ dag_id }}",
default_args={{ default_args | tojson }},
schedule_interval="{{ schedule_interval }}",
catchup={{ catchup }}
) as dag:

# Definir las tareas
{% for task_id, task_info in tasks.items() %}
{% set params = task_info['params'] if task_info['params'] is defined else {} %}
{% if task_info['type'] == "python_callable" %}
{{ task_id }} = PythonOperator(
task_id="{{ task_id }}",
python_callable={{ task_info['command'] }},
op_kwargs={{ params | tojson}},
dag=dag
)
ФАЙЛ PYTHON:

Код: Выделить всё

def _create_python_callable_task(self, task_id, task_info):
"""
Crear una tarea que ejecuta una función Python.
"""
# Obtener el nombre de la función a ejecutar (de acuerdo al comando en el YAML)
command = task_info.get("command")
command = FUNCTIONS_MAP.get(command)
args = task_info.get("params", {})  # Asegúrate de que "params" esté correctamente asignado
print(f"Creating task {task_id} with params: {args}")  # Línea de depuración
task = PythonOperator(
task_id=task_id,
python_callable=command,
op_kwargs=args,
dag=self.dag
)
self.task_list[task_id] = {'task': task, 'type': 'python_callable', 'info': task_info}
КОНФИГУРАЦИЯ YAML

Код: Выделить всё

my_workflow:
airflow_config:
dag_id: "ruedas"
default_args:
owner: "Marcos"
retries: 3
start_date: "2024-10-18"
schedule_interval: "@daily"
catchup: False
tasks:
extract_data:
type: "python_operator"
command: "data_extract_and_load"
dependencies: []  # Depende de la tarea definida ahi
params:  # Definir un diccionario de parámetros
config_path: "/opt/airflow/dags/scripts/config/config.yml"

extract_data2:
type: "python_operator"
command: "data_extract_and_load"
dependencies: [extract_data]  # Depende de la tarea definida ahi
params:  # Definir un diccionario de parámetros
config_path: "/opt/airflow/dags/scripts/config/config.yml"
ОКОНЧАТЕЛЬНЫЙ DAG:

Код: Выделить всё

with DAG(
dag_id="ruedas",
default_args={"owner": "Marcos", "retries": 3, "start_date": "2024-10-18 00:00:00"},
schedule_interval="@daily",
catchup=False
) as dag:

extract_data = PythonOperator(
task_id="extract_data",
python_callable=data_extract_and_load,
op_kwargs={},
dag=dag
)

extract_data2 = PythonOperator(
task_id="extract_data2",
python_callable=data_extract_and_load,
op_kwargs={},
dag=dag
)
много благодарностей.
Я пробовал много чего, например, менял шаблон, файл Python и т. д., но ничего не помогло.
много благодарностей!
Я пробовал много всего, например, менял шаблон, файл Python и т. д., но ничего не помогло.
п>

Подробнее здесь: https://stackoverflow.com/questions/792 ... -dag-gener
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»