выше, как я определил значения ввода в моем файле конфигурации yaml.
Код: Выделить всё
with TaskGroup(group_id='full_data_refresh') as full_data_refresh:
for item in itertools.product(dag_config['src_config_id'],dag_config['ctry_config_id']):
config_run_ids = f'{item}'
exec_funs_mti_all_functions_run = PostgresOperator(
task_id= f'exec_funs_mti_all_functions_run{config_run_ids}',
postgres_conn_id='DDL_CONN_WRT_CSB',
sql=f'select funs_mti_all_functions_run{config_run_ids}';'
)
select funs_mti_all_functions_run('1','1');
select funs_mti_all_functions_run('1','2');
select funs_mti_all_functions_run('1','6');
select funs_mti_all_functions_run('1','10');
< /code>
Реализация файла Python для определения DAG, хотя ошибки с сообщением: < /p>
"airflow.exceptions.airflowexcept РАБОТЫ, точки и подчеркивание исключительно "
Может ли кто -нибудь помочь здесь?
Подробнее здесь: https://stackoverflow.com/questions/795 ... ble-inputs