Способ параллельного выполнения операций Dagster, который также виден в пользовательском интерфейсе.Python

Программы на Python
Ответить
Anonymous
 Способ параллельного выполнения операций Dagster, который также виден в пользовательском интерфейсе.

Сообщение Anonymous »

(также опубликовано на r/dagster)
Dagster N00b здесь.
У меня очень конкретный вариант использования. Мой ETL выполняет следующие шаги:
  • Запрашивает базу данных, чтобы получить список файлов CSV
  • Переходит в файловую систему и для каждого CSV-файла:
  • загрузите его в DuckDB
  • преобразуйте некоторые столбцы на сегодняшний день
  • преобразовать некоторые числовые коды в текстовые категории
  • экспортировать чистую таблицу в файл .parquet
  • запустить профильный отчет для чистых данных
Для удобства таблицы DuckDB названы так же, как файлы CSV.
2a–2e можно выполнять параллельно ДЛЯ КАЖДОГО CSV-ФАЙЛА. В контексте одного CSV-файла их необходимо запускать ПОСЛЕДОВАТЕЛЬНО.
Мой текущий код:

Код: Выделить всё

@op
def get_csv_filenames(context) -> List[str]:

@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)

def load_csv_into_duckdb(context, csv_filename)

def transform_dates(context, csv_filename)

def from_code_2_categories(context, csv_filename)

def export_2_parquet(context, csv_filename)

def profile_dataset(context, csv_filename)

@op
def process(context, csv_filename:str):
load_csv_into_duckdb(context, csv_filename)
transform_dates(context, csv_filename)
from_code_2_categories(context, csv_filename)
export_2_parquet(context, csv_filename)
profile_dataset(context, csv_filename)

@job
def pipeline():
csv_filename_list = get_csv_filenames()
generate_subtasks(csv_filename_list).map(process)
Конвейер работает, но функции, которые фактически выполняют загрузку в DuckDB, преобразования и экспорт в паркет, «спрятаны» в функцииprocess() op.
Конвейер работает, но функции, которые фактически выполняют загрузку в DuckDB, преобразования и экспорт в паркет, «спрятаны» в функцииprocess() op.
p>
Есть ли способ правильно модульно структурировать это, следуя лучшим практикам Dagster? Я хотел бы определить мой процесс() как график и мою работу как просто выполнение графика, имея при этом возможность видеть отдельные задачи в DagsterUI, чтобы я мог повторно запустить только те, которые потерпели неудачу.
Я попробовал генерировать_подзадачи(csv_filename_list).map(load_csv_into_duckdb).map(transform_dates).map(from_code_2_categories). map(...) маршрут, но задачи не ждут завершения предыдущего перед запуском.
Хотите протянуть руку помощи?

Подробнее здесь: https://stackoverflow.com/questions/791 ... ible-in-ui
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»