У меня есть работа, которая генерирует CSV-файл на основе некоторых данных из озера данных моей компании. Это задание запускается один раз в день с некоторой предопределенной конфигурацией. Это задание реализуется с использованием Spark и Python и выполняется в конвейере Airflow.
CSV позже загружается конкретному клиенту.
Случай< /h2>
Теперь мы хотим привлечь дополнительных клиентов (на данный момент только одного, но в идеале в будущем их будет несколько).
Я думал о том, чтобы настроить конфигурацию и свойства каждого клиента в файле yml и загрузить их в задание на основе входного параметра, который будет определять имя клиента.
Проблема
Чтобы сгенерировать несколько столбцов в окончательном CSV-файле, мне понадобится код Python для разрешения некоторых условий, чтобы получить правильное значение для каждого клиента (значение будет определено в файле конфигурации для каждого клиента).
Однако кажется, что udfs Spark может обрабатывать только данные столбцов, и я не могу передавать туда сложные структуры.
ПримерПредставим, что у меня есть следующее:
- Файл конфигурации. Например, для customer-A.yml:
Код: Выделить всё
x_enabled: True
start_date_x: '01-01-2024'
y_enabled: False
start_date_y: '01-01-2022'
values:
x: 'value-x'
y: 'value-y'
- Модель . В идеале я хотел бы сделать что-то вроде следующего (простой пример псевдокода модели):
Код: Выделить всё
class Customer:
@udf(returnType=StringType())
def calculate_value(self) -> str:
# some conditions and logic using the properties and values defined in the inherited classes.
class CustomerA(Customer):
x_enabled: ...
start_date_x: ...
y_enabled: ...
start_date_y: ...
values: ...
Вопрос
Один из вариантов — передать все параметры, преобразованные с помощью функцииlit(), но я считаю это довольно некрасивым и не масштабируемым (на случай, если мои условия станут более сложными). Есть ли лучший подход?
Подробнее здесь: https://stackoverflow.com/questions/790 ... spark-udfs