Код: Выделить всё
@op
def get_csv_filenames(context) -> List[str]:
@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)
@op # no dep since 1st task
def load_csv_into_duckdb(context, csv_filename)
@op(ins={"start":In(Nothing)}
def transform_dates(context, csv_filename)
@op(ins={"start":In(Nothing)}
def from_code_2_categories(context, csv_filename)
@op(ins={"start":In(Nothing)}
def export_2_parquet(context, csv_filename)
@op(ins={"start":In(Nothing)}
def profile_dataset(context, csv_filename)
@graph
def process(context, csv_filename:str):
task1 = load_csv_into_duckdb(context=context, csv_filename=csv_filename)
task2 = transform_dates(start=task1, context=context, csv_filename=csv_filename)
task3 = from_code_2_categories(start=task2, context=context, csv_filename=csv_filename)
task4 = export_2_parquet(start=task3, context=context, csv_filename=csv_filename)
profile_dataset(start=task4, context=context, csv_filename=csv_filename)
@job
def pipeline():
csv_filename_list = get_csv_filenames()
generate_subtasks(csv_filename_list).map(process)
Я также пробовал, чтобы @graph возвращал список отдельных задач. возвращаемые значения, но DagsterUI жалуется, что функция, оформленная графом, должна возвращать словарь с ключами сопоставления. Я мог бы построить это, но чувствую, что вместо этого мне следует взять это из контекста выполнения Dagster, к которому я не знаю, как получить доступ из функции графика.
Есть ли у кого-нибудь подсказки? ?
Подробнее здесь: https://stackoverflow.com/questions/791 ... ster-graph
Мобильная версия