ШАБЛОН JINJA:
Код: Выделить всё
with DAG(
dag_id="{{ dag_id }}",
default_args={{ default_args | tojson }},
schedule_interval="{{ schedule_interval }}",
catchup={{ catchup }}
) as dag:
# Definir las tareas
{% for task_id, task_info in tasks.items() %}
{% set params = task_info['params'] if task_info['params'] is defined else {} %}
{% if task_info['type'] == "python_callable" %}
{{ task_id }} = PythonOperator(
task_id="{{ task_id }}",
python_callable={{ task_info['command'] }},
op_kwargs={{ params | tojson}},
dag=dag
)
Код: Выделить всё
def _create_python_callable_task(self, task_id, task_info):
"""
Crear una tarea que ejecuta una función Python.
"""
# Obtener el nombre de la función a ejecutar (de acuerdo al comando en el YAML)
command = task_info.get("command")
command = FUNCTIONS_MAP.get(command)
args = task_info.get("params", {}) # Asegúrate de que "params" esté correctamente asignado
print(f"Creating task {task_id} with params: {args}") # Línea de depuración
task = PythonOperator(
task_id=task_id,
python_callable=command,
op_kwargs=args,
dag=self.dag
)
self.task_list[task_id] = {'task': task, 'type': 'python_callable', 'info': task_info}
Код: Выделить всё
my_workflow:
airflow_config:
dag_id: "ruedas"
default_args:
owner: "Marcos"
retries: 3
start_date: "2024-10-18"
schedule_interval: "@daily"
catchup: False
tasks:
extract_data:
type: "python_operator"
command: "data_extract_and_load"
dependencies: [] # Depende de la tarea definida ahi
params: # Definir un diccionario de parámetros
config_path: "/opt/airflow/dags/scripts/config/config.yml"
extract_data2:
type: "python_operator"
command: "data_extract_and_load"
dependencies: [extract_data] # Depende de la tarea definida ahi
params: # Definir un diccionario de parámetros
config_path: "/opt/airflow/dags/scripts/config/config.yml"
Код: Выделить всё
with DAG(
dag_id="ruedas",
default_args={"owner": "Marcos", "retries": 3, "start_date": "2024-10-18 00:00:00"},
schedule_interval="@daily",
catchup=False
) as dag:
extract_data = PythonOperator(
task_id="extract_data",
python_callable=data_extract_and_load,
op_kwargs={},
dag=dag
)
extract_data2 = PythonOperator(
task_id="extract_data2",
python_callable=data_extract_and_load,
op_kwargs={},
dag=dag
)
Я пробовал много чего, например, менял шаблон, файл Python и т. д., но ничего не помогло.
много благодарностей!
Я пробовал много всего, например, менял шаблон, файл Python и т. д., но ничего не помогло.
п>
Подробнее здесь: https://stackoverflow.com/questions/792 ... -dag-gener
Мобильная версия