Я пишу программу Airflow, которая генерирует список слов на основе параметров запуска, которые я передаю в формате json с помощью пользовательского интерфейса, а затем анализирую и использую его элементы внутри KubernetesPodOperator. В приведенном ниже коде я упростил свой вариант использования до простого echo. Это мой код
from datetime import datetime
import yaml
from airflow.decorators import dag, task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
config = yaml.safe_load(open('/opt/airflow/${dag_config_filename}', 'r'))
namespace = config['kubernetes']['namespace']
files = config['sftp']['files'] # list of 4 names
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': config['alerting_email'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'executor_config': {
'KubernetesExecutor': {
'service_account_name': '${airflow_worker}',
'namespace': f'{namespace}',
}
},
}
execution_date = '{{ ds_nodash }}'
wizaly_files_date_ingestion = '{{ dag_run.start_date.strftime("%Y%m%d") }}'
@dag(
dag_id=config['dag_id'],
default_args=default_args,
description='Pipeline for Perfmarket ETL',
tags=['perfmarket'],
schedule_interval=config['schedule_interval'],
start_date=datetime(2024, 10, 22),
catchup=False,
params={
'entry_param': Param(
default='0',
type='string',
enum=['0', '1', '2'],
description='Run type: 0=daily, 1=rattrapage, 2=explo'
)
}
)
def test_dag() -> None:
start_task = EmptyOperator(task_id='start')
@task
def define_run_confs(**context):
params = context['params']
entry_param = params['entry_param']
if entry_param == '0': # daily (default)
files_typed = [file + '_option1' for file in config['sftp']['files']]
elif entry_param == '1':
files_typed = [file + '_option2' for file in config['sftp']['files']]
else:
files_typed = [file + '_option3' for file in config['sftp']['files']]
zip_files = [
{
'first_key': f'{file_typed}.zip',
'second_key': 'ch1' if '_type1' in file else 'ch2',
'task_id': f'import_{file}',
}
for file, file_typed in zip(files, files_typed)
]
return zip_files
configs = define_run_confs() # execution of the aboce function to generate my dict list.
import_zip_tasks = []
# I parse my dict list and for each dict I print keys
for file_info in configs:
import_task = KubernetesPodOperator(
task_id=f'import_zip_{file_info["task_id"]}',
name='import_zip',
cmds=['bash', '-cx'],
arguments=[
f"""echo {file_info['first_key']};
echo {file_info['second_key']} \
"""
]
)
import_zip_tasks.append(import_task)
end_task = EmptyOperator(task_id='end')
start_task >> configs >> import_zip_tasks >> end_task
test_dag()
Проблема в том, что я не могу проанализировать свой список из-за: Ошибка типа: объект «XComArg» не повторяется
Похоже, что код не может проанализировать переменную конфигурации. Как я могу преобразовать ее в обычную переменную списка?
Я попробовал несколько методов, но не могу проанализировать список.
Я пишу программу Airflow, которая генерирует список слов на основе параметров запуска, которые я передаю в формате json с помощью пользовательского интерфейса, а затем анализирую и использую его элементы внутри KubernetesPodOperator. В приведенном ниже коде я упростил свой вариант использования до простого echo. [b]Это мой код[/b] [code]from datetime import datetime
import yaml from airflow.decorators import dag, task from airflow.models.param import Param from airflow.operators.empty import EmptyOperator from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
@task def define_run_confs(**context): params = context['params'] entry_param = params['entry_param'] if entry_param == '0': # daily (default) files_typed = [file + '_option1' for file in config['sftp']['files']] elif entry_param == '1': files_typed = [file + '_option2' for file in config['sftp']['files']] else: files_typed = [file + '_option3' for file in config['sftp']['files']] zip_files = [ { 'first_key': f'{file_typed}.zip', 'second_key': 'ch1' if '_type1' in file else 'ch2', 'task_id': f'import_{file}', } for file, file_typed in zip(files, files_typed) ] return zip_files
configs = define_run_confs() # execution of the aboce function to generate my dict list.
import_zip_tasks = []
# I parse my dict list and for each dict I print keys for file_info in configs: import_task = KubernetesPodOperator( task_id=f'import_zip_{file_info["task_id"]}', name='import_zip', cmds=['bash', '-cx'], arguments=[ f"""echo {file_info['first_key']}; echo {file_info['second_key']} \ """ ] ) import_zip_tasks.append(import_task)
test_dag() [/code] Проблема в том, что я не могу проанализировать свой список из-за: Ошибка типа: объект «XComArg» не повторяется Похоже, что код не может проанализировать переменную конфигурации. Как я могу преобразовать ее в обычную переменную списка? Я попробовал несколько методов, но не могу проанализировать список.