Traceback (most recent call last):
File "", line 198, in
some_task_op() >> Transfer >> short_circuit() >> Process >> Summarize
^^^^^^^^^^^^^^
File "/python/env/lib/python3.12/site-packages/airflow/decorators/base.py", line 372, in __call__
op = self.operator_class(
^^^^^^^^^^^^^^^^^^^^
TypeError: airflow.decorators.sensor.DecoratedSensorOperator() got multiple values for keyword argument 'op_kwargs'
Эта ошибка выдается для любого оператора Python в моей группе обеспечения доступности баз данных, и это весьма озадачивает, поскольку я предоставляю декоратору op_kwargs только один раз.
- Работа с воздушным потоком 2.9.0
- Python 3.12
default_args = {
'depends_on_past': True,
'email': ['admin@admin.test'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'start_date': pendulum.datetime(2024, 10, 2),
}
with DAG(
dag_id='mydag',
schedule='0 14 * * *',
catchup=True,
default_args=default_args,
params={
'snapshot': Param(None, type=['null', 'string'], format='date'),
'input_base': Param(Variable.get('some-home', '/path/to/home'), type='string'),
'output_base': Param(Variable.get('some-home', '/path/to/home'), type='string'),
'overwrite': Param(False, type='boolean'),
'deploy_env': Param('production', type=['string'], enum=['staging', 'production'])
}
) as dag:
@task.sensor(
dag=dag,
task_id='some_task_op',
mode='reschedule', # default is 'poke'
poke_interval=10*60,
timeout=4*60*60,
op_kwargs={
'creds': '{{ var.value.get("api-credentials") }}',
},
)
def some_task_op(params, data_interval_end, creds=None):
pass
Подробнее здесь: https://stackoverflow.com/questions/790 ... rgument-op