Dagster N00b здесь.
У меня очень конкретный вариант использования. Мой ETL выполняет следующие шаги:
- Запрашивает базу данных, чтобы получить список файлов CSV
- Переходит в файловую систему и для каждого CSV-файла:
- загрузите его в DuckDB
- преобразуйте некоторые столбцы на сегодняшний день
- преобразовать некоторые числовые коды в текстовые категории
- экспортировать чистую таблицу в файл .parquet
- запустить профильный отчет для чистых данных
2a–2e можно выполнять параллельно ДЛЯ КАЖДОГО CSV-ФАЙЛА. В контексте одного CSV-файла их необходимо запускать ПОСЛЕДОВАТЕЛЬНО.
Мой текущий код:
Код: Выделить всё
@op
def get_csv_filenames(context) -> List[str]:
@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)
def load_csv_into_duckdb(context, csv_filename)
def transform_dates(context, csv_filename)
def from_code_2_categories(context, csv_filename)
def export_2_parquet(context, csv_filename)
def profile_dataset(context, csv_filename)
@op
def process(context, csv_filename:str):
load_csv_into_duckdb(context, csv_filename)
transform_dates(context, csv_filename)
from_code_2_categories(context, csv_filename)
export_2_parquet(context, csv_filename)
profile_dataset(context, csv_filename)
@job
def pipeline():
csv_filename_list = get_csv_filenames()
generate_subtasks(csv_filename_list).map(process)
Конвейер работает, но функции, которые фактически выполняют загрузку в DuckDB, преобразования и экспорт в паркет, «спрятаны» в функцииprocess() op.
p>
Есть ли способ правильно модульно структурировать это, следуя лучшим практикам Dagster? Я хотел бы определить мой процесс() как график и мою работу как просто выполнение графика, имея при этом возможность видеть отдельные задачи в DagsterUI, чтобы я мог повторно запустить только те, которые потерпели неудачу.
Я попробовал генерировать_подзадачи(csv_filename_list).map(load_csv_into_duckdb).map(transform_dates).map(from_code_2_categories). map(...) маршрут, но задачи не ждут завершения предыдущего перед запуском.
Хотите протянуть руку помощи?
Подробнее здесь: https://stackoverflow.com/questions/791 ... ible-in-ui
Мобильная версия