Pyarrow - определить записанные фрагменты или фильтры, используемые при записи набора данных паркета?Python

Программы на Python
Ответить
Anonymous
 Pyarrow - определить записанные фрагменты или фильтры, используемые при записи набора данных паркета?

Сообщение Anonymous »

Мой вариант использования заключается в том, что я хочу передать пути к файлам или фильтры задаче в Airflow как xcom, чтобы моя следующая задача могла прочитать только что обработанные данные.
Задача A записывает таблицу в секционированный набор данных, и создается несколько фрагментов файла Parquet --> Задача B считывает эти фрагменты позже как набор данных. Однако мне нужно читать только соответствующие данные, а не весь набор данных, который может состоять из многих миллионов строк.
Я протестировал два подхода:
  • Список измененных файлов сразу после завершения записи в набор данных. Это предоставит мне список путей, которые я могу вызвать ds.dataset(paths) во время выполнения следующей задачи. Я могу использовать partitioning.parse() для этих путей или проверить фрагменты, чтобы получить список используемых фильтров (

    Код: Выделить всё

    frag.partition_expression
    )
Недостаток этого подхода в том, что я могу записывать файлы параллельно с одним и тем же набором данных.
  • Я могу сгенерировать фильтры, используемые при записи набора данных, превратив таблицу в фрейм данных pandas, выполнив группировку, а затем создав фильтры. Я не уверен, что есть более простой подход к этому. Затем я могу использовать pq._filters_to_expression() для результатов, чтобы создать полезный фильтр.
Это не идеально, так как мне нужно исправить определенные типы данных, которые не сохраняются должным образом как Airflow xcom (без травления, поэтому все должно быть в формате json). Кроме того, если я хочу выполнить секционирование по столбцу словаря, мне, возможно, придется настроить эту функцию.

Код: Выделить всё

    def create_filter_list(df, partition_columns):
"""Creates a list of pyarrow filters to be sent through an xcom and evaluated as an expression. Xcom disables pickling, so we need to save timestamp and date values as strings and convert downstream"""
filter_list = []
value_list = []
partition_keys = [df[col] for col in partition_columns]
for keys, _ in df[partition_columns].groupby(partition_keys):
if len(partition_columns) == 1:
if is_jsonable(keys):
value_list.append(keys)
elif keys is not None:
value_list.append(str(keys))
else:
if not isinstance(keys, tuple):
keys = (keys,)
read_filter = []
for name, val in zip(partition_columns, keys):
if type(val) == np.int_:
read_filter.append((name, "==", int(val)))
elif val is not None:
read_filter.append((name, "==", str(val)))
filter_list.append(read_filter)
if len(partition_columns) == 1:
if len(value_list) > 0:
filter_list = [(name, "in", value_list) for name in partition_columns]
return filter_list
Есть какие-нибудь предложения по поводу того, какой подход мне следует использовать, или есть ли лучший способ достичь моей цели?

Подробнее здесь: https://stackoverflow.com/questions/662 ... -a-parquet
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»