OP_KWARGS не заполняется параметрами в шаблоне Jinja для генератора DAG воздушного потокаPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 OP_KWARGS не заполняется параметрами в шаблоне Jinja для генератора DAG воздушного потока

Сообщение Anonymous »

Я строю автоматический генератор DAG из конфигурации YAML. Моя проблема в том, что при генерации DAG значение, которое должно появляться в «Op_kwargs», не отображается. Другие значения настроены правильно, но это не так. Я делюсь частью шаблона Jinja, где я обращаюсь с этим, а также файл Python, где я управляю параметрами и конфигурацией YAML. < /P>
Шаблон Jinja: < /p>

Код: Выделить всё

with DAG(
dag_id="{{ dag_id }}",
default_args={{ default_args | tojson }},
schedule_interval="{{ schedule_interval }}",
catchup={{ catchup }}
) as dag:

# Definir las tareas
{% for task_id, task_info in tasks.items() %}
{% set params = task_info['params'] if task_info['params'] is defined else {} %}
{% if task_info['type'] == "python_callable" %}
{{ task_id }} = PythonOperator(
task_id="{{ task_id }}",
python_callable={{ task_info['command'] }},
op_kwargs={{ params | tojson}},
dag=dag
)
< /code>
python file: < /p>
def _create_python_callable_task(self, task_id, task_info):
"""
Crear una tarea que ejecuta una función Python.
"""
# Obtener el nombre de la función a ejecutar (de acuerdo al comando en el YAML)
command = task_info.get("command")
command = FUNCTIONS_MAP.get(command)
args = task_info.get("params", {})  # Asegúrate de que "params" esté correctamente asignado
print(f"Creating task {task_id} with params: {args}")  # Línea de depuración
task = PythonOperator(
task_id=task_id,
python_callable=command,
op_kwargs=args,
dag=self.dag
)
self.task_list[task_id] = {'task': task, 'type': 'python_callable', 'info': task_info}
< /code>
Конфигурация yaml < /p>
my_workflow:
airflow_config:
dag_id: "ruedas"
default_args:
owner: "Marcos"
retries: 3
start_date: "2024-10-18"
schedule_interval: "@daily"
catchup: False
tasks:
extract_data:
type: "python_operator"
command: "data_extract_and_load"
dependencies: []  # Depende de la tarea definida ahi
params:  # Definir un diccionario de parámetros
config_path: "/opt/airflow/dags/scripts/config/config.yml"

extract_data2:
type: "python_operator"
command: "data_extract_and_load"
dependencies: [extract_data]  # Depende de la tarea definida ahi
params:  # Definir un diccionario de parámetros
config_path: "/opt/airflow/dags/scripts/config/config.yml"
< /code>
final dag: < /p>
with DAG(
dag_id="ruedas",
default_args={"owner": "Marcos", "retries": 3, "start_date": "2024-10-18 00:00:00"},
schedule_interval="@daily",
catchup=False
) as dag:

extract_data = PythonOperator(
task_id="extract_data",
python_callable=data_extract_and_load,
op_kwargs={},
dag=dag
)

extract_data2 = PythonOperator(
task_id="extract_data2",
python_callable=data_extract_and_load,
op_kwargs={},
dag=dag
)
Я пробовал много вещей, таких как изменение шаблона, файл Python и т. Д., Но ничего не работает.

Подробнее здесь: https://stackoverflow.com/questions/792 ... -dag-gener
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Op_kwargs не заполняется параметрами в шаблоне Jinja для генератора DAG Airflow
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Импорт воздушного потока автоматически создает каталог воздушного потока
    Anonymous » » в форуме Python
    0 Ответы
    52 Просмотры
    Последнее сообщение Anonymous
  • Импорт воздушного потока автоматически создает каталог воздушного потока
    Anonymous » » в форуме Python
    0 Ответы
    56 Просмотры
    Последнее сообщение Anonymous
  • DAG/задача воздушного потока зависла в рабочем состоянии
    Гость » » в форуме Python
    0 Ответы
    32 Просмотры
    Последнее сообщение Гость
  • DAG/задача воздушного потока зависла в рабочем состоянии
    Anonymous » » в форуме Python
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»