Получение ошибки в неподдерживаемых типах операндов DAG воздушного потока для >>: «список» и «список». Последовательное Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Получение ошибки в неподдерживаемых типах операндов DAG воздушного потока для >>: «список» и «список». Последовательное

Сообщение Anonymous »

Я новичок в Apache airflow и DAG. Всего в группе обеспечения доступности баз данных 6 задач (задача1, задача2, задача3, задача4, задача5, задача6). Но во время запуска DAG мы получаем следующую ошибку:
DAG неподдерживаемые типы операндов для >>: 'list' и 'list' >
Ниже приведен мой код для DAG. Пожалуйста, помогите. Я новичок в области воздушного потока.
from airflow import DAG
from datetime import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

default_args = {
'owner': 'airflow',
'depends_on_past': False
}

dag = DAG('DAG_FOR_TEST',default_args=default_args,schedule_interval=None,max_active_runs=3, start_date=datetime(2020, 7, 14))

#################### CREATE TASK #####################################

task_1 = DatabricksSubmitRunOperator(
task_id='task_1',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_1/task_1.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_1.driver.TestClass1',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)

task_2 = DatabricksSubmitRunOperator(
task_id='task_2',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_2/task_2.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_2.driver.TestClass2',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)

task_3 = DatabricksSubmitRunOperator(
task_id='task_3',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_3/task_3.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_3.driver.TestClass3',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)

task_4 = DatabricksSubmitRunOperator(
task_id='task_4',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_4/task_4.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_4.driver.TestClass4',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)

task_5 = DatabricksSubmitRunOperator(
task_id='task_5',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_5/task_5.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_5.driver.TestClass5',
'parameters' : [
'json ={{ dag_run.conf.json }}'
]
}
)

task_6 = DatabricksSubmitRunOperator(
task_id='task_6',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_6/task_6.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_6.driver.TestClass6',
'parameters' : ['{{ dag_run.conf.json }}'
]
}
)

#################### ORDER OF OPERATORS ###########################

task_1.dag = dag
task_2.dag = dag
task_3.dag = dag
task_4.dag = dag
task_5.dag = dag
task_6.dag = dag

task_1 >> [task_2 , task_3] >> [ task_4 , task_5 ] >> task_6


Подробнее здесь: https://stackoverflow.com/questions/628 ... st-and-lis
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»