Я строю автоматический генератор DAG из конфигурации YAML. Моя проблема в том, что при генерации DAG значение, которое должно появляться в «Op_kwargs», не отображается. Другие значения настроены правильно, но это не так. Я делюсь частью шаблона Jinja, где я обращаюсь с этим, а также файл Python, где я управляю параметрами и конфигурацией YAML. < /P>
Шаблон Jinja: < /p>
Я строю автоматический генератор DAG из конфигурации YAML. Моя проблема в том, что при генерации DAG значение, которое должно появляться в «Op_kwargs», не отображается. Другие значения настроены правильно, но это не так. Я делюсь частью шаблона Jinja, где я обращаюсь с этим, а также файл Python, где я управляю параметрами и конфигурацией YAML. < /P> Шаблон Jinja: < /p> [code]with DAG( dag_id="{{ dag_id }}", default_args={{ default_args | tojson }}, schedule_interval="{{ schedule_interval }}", catchup={{ catchup }} ) as dag:
# Definir las tareas {% for task_id, task_info in tasks.items() %} {% set params = task_info['params'] if task_info['params'] is defined else {} %} {% if task_info['type'] == "python_callable" %} {{ task_id }} = PythonOperator( task_id="{{ task_id }}", python_callable={{ task_info['command'] }}, op_kwargs={{ params | tojson}}, dag=dag ) < /code> python file: < /p> def _create_python_callable_task(self, task_id, task_info): """ Crear una tarea que ejecuta una función Python. """ # Obtener el nombre de la función a ejecutar (de acuerdo al comando en el YAML) command = task_info.get("command") command = FUNCTIONS_MAP.get(command) args = task_info.get("params", {}) # Asegúrate de que "params" esté correctamente asignado print(f"Creating task {task_id} with params: {args}") # Línea de depuración task = PythonOperator( task_id=task_id, python_callable=command, op_kwargs=args, dag=self.dag ) self.task_list[task_id] = {'task': task, 'type': 'python_callable', 'info': task_info} < /code> Конфигурация yaml < /p> my_workflow: airflow_config: dag_id: "ruedas" default_args: owner: "Marcos" retries: 3 start_date: "2024-10-18" schedule_interval: "@daily" catchup: False tasks: extract_data: type: "python_operator" command: "data_extract_and_load" dependencies: [] # Depende de la tarea definida ahi params: # Definir un diccionario de parámetros config_path: "/opt/airflow/dags/scripts/config/config.yml"
extract_data2: type: "python_operator" command: "data_extract_and_load" dependencies: [extract_data] # Depende de la tarea definida ahi params: # Definir un diccionario de parámetros config_path: "/opt/airflow/dags/scripts/config/config.yml" < /code> final dag: < /p> with DAG( dag_id="ruedas", default_args={"owner": "Marcos", "retries": 3, "start_date": "2024-10-18 00:00:00"}, schedule_interval="@daily", catchup=False ) as dag:
extract_data2 = PythonOperator( task_id="extract_data2", python_callable=data_extract_and_load, op_kwargs={}, dag=dag ) [/code] Я пробовал много вещей, таких как изменение шаблона, файл Python и т. Д., Но ничего не работает.
Я создаю автоматический генератор DAG на основе конфигурации YAML. Моя проблема в том, что при создании группы обеспечения доступности баз данных значение, которое должно появиться в «op_kwargs», не отображается. Остальные значения настроены...
Я заметил, что всякий раз, когда я импортирую airflow в Python, он автоматически создает каталог airflow в моем домашнем каталоге. Буквально только это
$ python
Python 3.11.9 | packaged by conda-forge | (main, Apr 19 2024, 18:36:13) on linux
Type...
Я заметил, что всякий раз, когда я импортирую airflow в Python, он автоматически создает каталог airflow в моем домашнем каталоге. Буквально только это
$ python
Python 3.11.9 | packaged by conda-forge | (main, Apr 19 2024, 18:36:13) on linux
Type...