Как правильно использовать и анализировать список диктов XCom в AirflowPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как правильно использовать и анализировать список диктов XCom в Airflow

Сообщение Anonymous »

Я пишу программу Airflow, которая генерирует список слов на основе параметров запуска, которые я передаю в формате json с помощью пользовательского интерфейса, а затем анализирую и использую его элементы внутри KubernetesPodOperator. В приведенном ниже коде я упростил свой вариант использования до простого echo.
Это мой код

Код: Выделить всё

from datetime import datetime

import yaml
from airflow.decorators import dag, task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

config = yaml.safe_load(open('/opt/airflow/${dag_config_filename}', 'r'))
namespace = config['kubernetes']['namespace']

files = config['sftp']['files']  # list of 4 names

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': config['alerting_email'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'executor_config': {
'KubernetesExecutor': {
'service_account_name': '${airflow_worker}',
'namespace': f'{namespace}',
}
},
}

execution_date = '{{ ds_nodash  }}'
wizaly_files_date_ingestion = '{{ dag_run.start_date.strftime("%Y%m%d") }}'

@dag(
dag_id=config['dag_id'],
default_args=default_args,
description='Pipeline for Perfmarket ETL',
tags=['perfmarket'],
schedule_interval=config['schedule_interval'],
start_date=datetime(2024, 10, 22),
catchup=False,
params={
'entry_param': Param(
default='0',
type='string',
enum=['0', '1', '2'],
description='Run type: 0=daily, 1=rattrapage, 2=explo'
)
}
)
def test_dag() -> None:
start_task = EmptyOperator(task_id='start')

@task
def define_run_confs(**context):
params = context['params']
entry_param = params['entry_param']
if entry_param == '0':  # daily (default)
files_typed = [file + '_option1' for file in config['sftp']['files']]
elif entry_param == '1':
files_typed = [file + '_option2' for file in config['sftp']['files']]
else:
files_typed = [file + '_option3' for file in config['sftp']['files']]
zip_files = [
{
'first_key': f'{file_typed}.zip',
'second_key': 'ch1' if '_type1' in file else 'ch2',
'task_id': f'import_{file}',
}
for file, file_typed in zip(files, files_typed)
]
return zip_files

configs = define_run_confs() # execution of the aboce function to generate my dict list.

import_zip_tasks = []

# I parse my dict list and for each dict I print keys
for file_info in configs:
import_task = KubernetesPodOperator(
task_id=f'import_zip_{file_info["task_id"]}',
name='import_zip',
cmds=['bash', '-cx'],
arguments=[
f"""echo {file_info['first_key']};
echo {file_info['second_key']} \
"""
]
)
import_zip_tasks.append(import_task)

end_task = EmptyOperator(task_id='end')

start_task >> configs >> import_zip_tasks >> end_task

test_dag()
Проблема в том, что я не могу проанализировать свой список из-за: Ошибка типа: объект «XComArg» не повторяется
Похоже, что код не может проанализировать переменную конфигурации. Как я могу преобразовать ее в обычную переменную списка?
Я попробовал несколько методов, но не могу проанализировать список.

Подробнее здесь: https://stackoverflow.com/questions/791 ... in-airflow
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Airflow Xcom Pull - строка Jinja заменить кавычки
    Anonymous » » в форуме Python
    0 Ответы
    51 Просмотры
    Последнее сообщение Anonymous
  • Airflow DAG - контекст доступа с использованием Simplehttpoperator, чтобы включить xcom Pull
    Anonymous » » в форуме Python
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Airflow DAG - контекст доступа с использованием Simplehttpoperator, чтобы включить xcom Pull
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Как отфильтровать список диктов со списком строк в Python? [дубликат]
    Anonymous » » в форуме Python
    0 Ответы
    27 Просмотры
    Последнее сообщение Anonymous
  • Когда статус задачи Airflow Neptune — «LOAD_IN_QUEUE», Airflow пытается повторить попытку.
    Anonymous » » в форуме Python
    0 Ответы
    42 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»