Dagster N00b здесь.
У меня очень конкретный вариант использования. Мой ETL выполняет следующие шаги:
- Запрашивает базу данных, чтобы получить список файлов CSV
- Переходит в файловую систему и для каждого CSV-файла:
- загрузите его в DuckDB
- преобразуйте некоторые столбцы на сегодняшний день
- преобразовать некоторые числовые коды в текстовые категории
- экспортировать чистую таблицу в файл .parquet
- запустить профильный отчет для чистых данных
2a–2e можно выполнять параллельно ДЛЯ КАЖДОГО CSV-ФАЙЛА. В контексте одного CSV-файла их необходимо запускать ПОСЛЕДОВАТЕЛЬНО.
Мой текущий код:
@op
def get_csv_filenames(context) -> List[str]:
@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)
def load_csv_into_duckdb(context, csv_filename)
def transform_dates(context, csv_filename)
def from_code_2_categories(context, csv_filename)
def export_2_parqu
Подробнее здесь: https://stackoverflow.com/questions/791 ... s-together
Мобильная версия