Я пишу программу Airflow, которая генерирует список слов на основе параметров запуска, которые я передаю в формате json с помощью пользовательского интерфейса, а затем анализирую и использую его элементы внутри KubernetesPodOperator. В приведенном ниже коде я упростил свой вариант использования до простого echo. Это мой код
from datetime import datetime
import yaml
from airflow.decorators import dag, task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
config = yaml.safe_load(open('/opt/airflow/${dag_config_filename}', 'r'))
namespace = config['kubernetes']['namespace']
files = config['sftp']['files'] # list of 4 names
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': config['alerting_email'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'executor_config': {
'KubernetesExecutor': {
'service_account_name': '${airflow_worker}',
'namespace': f'{namespace}',
}
},
}
execution_date = '{{ ds_nodash }}'
wizaly_files_date_ingestion = '{{ dag_run.start_date.strftime("%Y%m%d") }}'
@dag(
dag_id=config['dag_id'],
default_args=default_args,
description='Pipeline for Perfmarket ETL',
tags=['perfmarket'],
schedule_interval=config['schedule_interval'],
start_date=datetime(2024, 10, 22),
catchup=False,
params={
'entry_param': Param(
default='0',
type='string',
enum=['0', '1', '2'],
description='Run type: 0=daily, 1=rattrapage, 2=explo'
)
}
)
def test_dag() -> None:
start_task = EmptyOperator(task_id='start')
@task
def define_run_confs(**context):
params = context['params']
entry_param = params['entry_param']
if entry_param == '0': # daily (default)
files_typed = [file + '_option1' for file in config['sftp']['files']]
elif entry_param == '1':
files_typed = [file + '_option2' for file in config['sftp']['files']]
else:
files_typed = [file + '_option3' for file in config['sftp']['files']]
zip_files = [
{
'first_key': f'{file_typed}.zip',
'second_key': 'ch1' if '_type1' in file else 'ch2',
'task_id': f'import_{file}',
}
for file, file_typed in zip(files, files_typed)
]
return zip_files
configs = define_run_confs() # execution of the aboce function to generate my dict list.
import_zip_tasks = []
# I parse my dict list and for each dict I print keys
for file_info in configs:
import_task = KubernetesPodOperator(
task_id=f'import_zip_{file_info["task_id"]}',
name='import_zip',
cmds=['bash', '-cx'],
arguments=[
f"""echo {file_info['first_key']};
echo {file_info['second_key']} \
"""
]
)
import_zip_tasks.append(import_task)
end_task = EmptyOperator(task_id='end')
start_task >> configs >> import_zip_tasks >> end_task
test_dag()
Проблема в том, что я не могу проанализировать свой список из-за: Ошибка типа: объект «XComArg» не повторяется
Похоже, что код не может проанализировать переменную конфигурации. Как я могу преобразовать ее в обычную переменную списка?
Я попробовал несколько методов, но не могу проанализировать список.
Я пишу программу Airflow, которая генерирует список слов на основе параметров запуска, которые я передаю в формате json с помощью пользовательского интерфейса, а затем анализирую и использую его элементы внутри KubernetesPodOperator. В приведенном ниже коде я упростил свой вариант использования до простого echo. [b]Это мой код[/b] [code]from datetime import datetime
import yaml from airflow.decorators import dag, task from airflow.models.param import Param from airflow.operators.empty import EmptyOperator from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
@task def define_run_confs(**context): params = context['params'] entry_param = params['entry_param'] if entry_param == '0': # daily (default) files_typed = [file + '_option1' for file in config['sftp']['files']] elif entry_param == '1': files_typed = [file + '_option2' for file in config['sftp']['files']] else: files_typed = [file + '_option3' for file in config['sftp']['files']] zip_files = [ { 'first_key': f'{file_typed}.zip', 'second_key': 'ch1' if '_type1' in file else 'ch2', 'task_id': f'import_{file}', } for file, file_typed in zip(files, files_typed) ] return zip_files
configs = define_run_confs() # execution of the aboce function to generate my dict list.
import_zip_tasks = []
# I parse my dict list and for each dict I print keys for file_info in configs: import_task = KubernetesPodOperator( task_id=f'import_zip_{file_info["task_id"]}', name='import_zip', cmds=['bash', '-cx'], arguments=[ f"""echo {file_info['first_key']}; echo {file_info['second_key']} \ """ ] ) import_zip_tasks.append(import_task)
test_dag() [/code] Проблема в том, что я не могу проанализировать свой список из-за: Ошибка типа: объект «XComArg» не повторяется Похоже, что код не может проанализировать переменную конфигурации. Как я могу преобразовать ее в обычную переменную списка? Я попробовал несколько методов, но не могу проанализировать список.
Я изо всех сил пытаюсь втянуть xcoms в задачу, используя Simplehttpoperator.
Приведенный ниже DAG предназначен для организации ряда запросов (через функции Google Cloud), выполненные в API третьих лиц, хранить CSV в хранилище и в конечном итоге...
Я изо всех сил пытаюсь втянуть xcoms в задачу, используя Simplehttpoperator .
Приведенный ниже DAG предназначен для организации ряда запросов (через функции Google Cloud), сделанные в API третьей стороны, хранить CSV в хранилище и в конечном итоге...
У меня есть список слов, которые я хотел бы отфильтровать с помощью другого списка, но у меня возникли проблемы с функцией filter()
list_of_dicts =
list_2 =
# filter(lambda name: any(list2) in list_of_dicts , list_of_dicts)
Я работаю на мультитенантной платформе конвейера данных. Итак, на данный момент у нас около 5 арендаторов. В качестве инструмента оркестрации мы используем AWS MWAA
(Apache Airflow). Каждый арендатор имеет отдельный DAG и запускается...