Параллельные, глубинные операции в Dagster с объединением операций, графиков и заданий.Python

Программы на Python
Ответить
Anonymous
 Параллельные, глубинные операции в Dagster с объединением операций, графиков и заданий.

Сообщение Anonymous »

(также опубликовано на r/dagster)
Dagster N00b здесь.
У меня очень конкретный вариант использования. Мой ETL выполняет следующие шаги:
  • Запрашивает базу данных, чтобы получить список файлов CSV
  • Переходит в файловую систему и для каждого CSV-файла:
  • загрузите его в DuckDB
  • преобразуйте некоторые столбцы на сегодняшний день
  • преобразовать некоторые числовые коды в текстовые категории
  • экспортировать чистую таблицу в файл .parquet
  • запустить профильный отчет для чистых данных
Для удобства таблицы DuckDB названы так же, как файлы CSV.
2a–2e можно выполнять параллельно ДЛЯ КАЖДОГО CSV-ФАЙЛА. В контексте одного CSV-файла их необходимо запускать ПОСЛЕДОВАТЕЛЬНО.
Мой текущий код:
@op
def get_csv_filenames(context) -> List[str]:

@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)

def load_csv_into_duckdb(context, csv_filename)

def transform_dates(context, csv_filename)

def from_code_2_categories(context, csv_filename)

def export_2_parqu


Подробнее здесь: https://stackoverflow.com/questions/791 ... s-together
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»