Сбор/объединение/ожидание параллельных операций в глубину в DagsterPython

Программы на Python
Ответить
Anonymous
 Сбор/объединение/ожидание параллельных операций в глубину в Dagster

Сообщение Anonymous »

После высоко оцененной помощи @zyd в этом ответе на параллельное, глубокое выполнение в Dagster, я теперь ищу способ запустить @op для собранных результатов запуска графа, или, по крайней мере, тот, который ждет, пока все они завершатся, поскольку у них нет жестких зависимостей как таковых. Мой рабочий код выглядит следующим образом:

Код: Выделить всё

@op
def get_csv_filenames(context) -> List[str]:

@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)

@op # no dep since 1st task
def load_csv_into_duckdb(context, csv_filename)

@op(ins={"start":In(Nothing)}
def transform_dates(context, csv_filename)

@op(ins={"start":In(Nothing)}
def from_code_2_categories(context, csv_filename)

@op(ins={"start":In(Nothing)}
def export_2_parquet(context, csv_filename)

@op(ins={"start":In(Nothing)}
def profile_dataset(context, csv_filename)

@graph
def process(context, csv_filename:str):
task1 = load_csv_into_duckdb(context=context, csv_filename=csv_filename)
task2 = transform_dates(start=task1, context=context, csv_filename=csv_filename)
task3 = from_code_2_categories(start=task2, context=context, csv_filename=csv_filename)
task4 = export_2_parquet(start=task3, context=context, csv_filename=csv_filename)
profile_dataset(start=task4, context=context, csv_filename=csv_filename)

@job
def pipeline():
csv_filename_list = get_csv_filenames()
generate_subtasks(csv_filename_list).map(process)
Я попробовал подход .map(process).collect(), но Дагстер жалуется, что Nonetype не имеет атрибута Collect. Однако я видел в Интернете несколько примеров такого же подхода, и, очевидно, он должен работать.
Я также пробовал, чтобы @graph возвращал список отдельных задач. возвращаемые значения, но DagsterUI жалуется, что функция, оформленная графом, должна возвращать словарь с ключами сопоставления. Я мог бы построить это, но чувствую, что вместо этого мне следует взять это из контекста выполнения Dagster, к которому я не знаю, как получить доступ из функции графика.
Есть ли у кого-нибудь подсказки? ?

Подробнее здесь: https://stackoverflow.com/questions/791 ... in-dagster
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»