Код: Выделить всё
@dag(dag_id='my_dag_trigger')
def first_taskflow():
@task(multiple_outputs=True)
def create_some_values():
return {'v1': value1, 'v2', value2}
@task
def trigger_dag(**kwargs):
TriggerDagRunOperator(
task_id='my_second_dag',
conf={
'v1': "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}"
}).execute(kwargs)
create_some_values() >> trigger_dag()
first_taskflow()
# ------- DAG TO BE TRIGGERED -------
@dag(dag_id='my_second_dag')
def secondary_taskflow():
@task()
def secondary_task(**context):
print(context['dag_run'].conf.get('v1'))
secondary_task()
secondary_taskflow()
Значения динамически в конфункте?>
Подробнее здесь: https://stackoverflow.com/questions/794 ... dagrunoper