Я также полагаюсь на синтаксис потока задач и возможность автоматически сопоставлять параметры функций/возвращаемые значения с xcom.
А ещё

Имея в виду этот сценарий, я пытаюсь понять, что лучше способ описания моих дагов и групп задач.
Позвольте мне поделиться (упрощенным) примером моих дагов:
Код: Выделить всё
@task
def source() -> str:
return 'blablabla'
@task
def task_len(data: str) -> int:
return len(data)
@task
def task_mul(times: int) -> str:
return 'x' * times
@task_group
def tg(data: str) -> str:
return task_mul(task_len(data))
@task
def sink(data: str) -> None:
print(data)
@dag()
def dag_tg() -> None:
sink(tg(source()))
dag_tg()
У меня также есть группа задач, в которую я хочу захватить нужен ввод xcom и описание вывода.
В данном случае моя цель — «использовать» группу задач как задачу, которая просто потребляет и производит xcom «напрямую».
Эта настройка «работает», так как она правильно загружена в воздушный поток, он работает как ожидалось и т. д.
Тем не менее, у меня есть куча ошибок от mypy: в определении tg у меня есть ошибка. Значение переменной типа "FReturn" из "task_group" не может быть "str", а в dag, где используется tg, у меня Аргумент 1 имеет несовместимый тип "DAGNode"; ожидается "str".
Похоже, что - по крайней мере, с точки зрения ввода - декоратор группы задач ожидает функцию, возвращающую DAGNode (и это также видно на https: //github.com/apache/airflow/blob/main/airflow/decorators/task_group.py#L182).
Поэтому у меня двойной вопрос:
- Предполагая, что аннотации воздушного потока верны, могу ли я использовать группу задач «неправильно»? Следует ли мне избегать использования функций в стиле потока задач в группе? Случайно ли этот даг работает?
- Предполагая, что я использую правильно, следует ли улучшить аннотации воздушного потока для поддержки этого сценария?
Подробнее здесь: https://stackoverflow.com/questions/793 ... -based-dag