Как оптимизировать наборы данных Delta Lake в Polars (сортировка, уплотнение, очистка)?Python

Программы на Python
Ответить
Anonymous
 Как оптимизировать наборы данных Delta Lake в Polars (сортировка, уплотнение, очистка)?

Сообщение Anonymous »

Я планирую использовать Polars с Delta Lake для управления большими изменяемыми наборами данных на своем ноутбуке. Я столкнулся с двумя проблемами:
  • Набор данных не сортируется после слияния:
    Когда я использую write_delta() в режиме «слияния», результирующий набор данных не сортируется.
    Мой текущий обходной путь — вручную отсортировать и перезаписать таблицу Delta, но это явно неэффективно для больших наборов данных.
    Есть ли способ обеспечить сортировку непосредственно во время операции слияния в Polars?
  • Много неиспользуемых файлов:
    Когда я перезаписываю набор данных Delta Lake, старые и неиспользуемые файлы не удаляются автоматически. Для очистки требуется запустить «VACUUM» вручную.
    Есть ли способ автоматизировать этот процесс очистки во время операций write_delta()?
Будем очень признательны за любые советы или обходные пути!
import polars as pl

# Define the Delta Lake path
delta_path = "/PathToDataset/dataset.delta"

df_old = pl.from_repr(
"""
┌───────┬─────┬───────┐
│ group ┆ id ┆ val_1 │
│ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ f64 │
╞═══════╪═════╪═══════╡
│ A ┆ 0 ┆ 1.0 │
│ A ┆ 1 ┆ 2.0 │
│ A ┆ 2 ┆ 3.0 │
│ B ┆ 0 ┆ 11.0 │
│ B ┆ 1 ┆ 12.0 │
│ B ┆ 2 ┆ 13.0 │
│ C ┆ 0 ┆ 21.0 │
│ C ┆ 1 ┆ 22.0 │
│ C ┆ 2 ┆ 23.0 │
└───────┴─────┴───────┘
""")

df_new = pl.from_repr(
"""
┌───────┬─────┬───────┐
│ group ┆ id ┆ val_1 │
│ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ f64 │
╞═══════╪═════╪═══════╡
│ A ┆ 1 ┆ 99.0 │
│ X ┆ 0 ┆ 99.0 │
└───────┴─────┴───────┘
""")

print("Saving the initial dataset to Delta Lake...")
df_old.write_delta(delta_path, mode="overwrite")

print("Updating the Delta Lake dataset with new data...")
df_new.write_delta(
delta_path,
mode="merge", # Merge mode for upserts
delta_merge_options={
"predicate": "s.group = t.group AND s.id = t.id", # Match on 'group' and 'id'
"source_alias": "s",
"target_alias": "t",
}).when_matched_update_all().when_not_matched_insert_all().execute()
print("Update complete.")

df = pl.scan_delta(delta_path).collect()
print(df)
# ┌───────┬─────┬───────┐
# │ group ┆ id ┆ val_1 │
# │ --- ┆ --- ┆ --- │
# │ str ┆ i64 ┆ f64 │
# ╞═══════╪═════╪═══════╡
# │ X ┆ 0 ┆ 99.0 │
# │ A ┆ 1 ┆ 99.0 │
# │ A ┆ 0 ┆ 1.0 │
# │ A ┆ 2 ┆ 3.0 │
# │ B ┆ 0 ┆ 11.0 │
# │ B ┆ 1 ┆ 12.0 │
# │ B ┆ 2 ┆ 13.0 │
# │ C ┆ 0 ┆ 21.0 │
# │ C ┆ 1 ┆ 22.0 │
# │ C ┆ 2 ┆ 23.0 │
# └───────┴─────┴───────┘

Изменить: вот мой текущий обходной путь.
  • запись таблицы с использованием Polars.
  • сортировка таблицы с использованием deltalake.
  • очистка с использованием deltalake.
Эта функция предназначена для наборов данных с соотношением размера и памяти > 20 % на одном компьютере.
def update_delta(
delta_path: str,
new_data: pl.DataFrame,
index: list[str],
mode: str = "merge",
sort: bool = True,
vacuum: bool = True,
vacuum_retention_hours: int = 0,
verbose: bool = False) -> None:
"""
Upsert or overwrite a dataset into a Delta table using polars and deltalake.

The function is made for datasets of size/memory ratio >20% on a single machine.

Parameters:
delta_path (str): Path to the Delta Lake dataset.
new_data (pl.DataFrame): New dataset to upsert into the Delta table.
index (list[str]): List of column names used as the primary key for matching rows during the merge.
mode (str, optional): Operation mode, either "merge" or "overwrite". Defaults to "merge".
sort (bool, optional): Whether to sort the Delta table by the index after the operation. Defaults to True.
vacuum (bool, optional): Whether to perform a VACUUM operation to clean up unused files. Defaults to True.
vacuum_retention_hours (int, optional): Retention period for the VACUUM operation. Defaults to 0 hour.
verbose (bool, optional): Whether to print logs. Defaults to False.

Returns:
None.
"""
try:
# Check if the Delta table already exists
table_exists = DeltaTable.is_deltatable(delta_path)

if not table_exists:
if verbose:
print(f"Creating a new Delta table at {delta_path}.")
new_data.write_delta(delta_path, mode="overwrite")
else:
if mode == "merge":
if verbose:
print(f"Performing upsert into Delta table at {delta_path}...")
# Generate the merge predicate dynamically from the index
merge_predicate = " AND ".join([f"s.{col} = t.{col}" for col in index])
# Perform the merge operation
new_data.write_delta(
delta_path,
mode="merge",
delta_merge_options={
"predicate": merge_predicate,
"source_alias": "s",
"target_alias": "t",
},
).when_matched_update_all().when_not_matched_insert_all().execute()
elif mode == "overwrite":
if verbose:
print(f"Overwriting Delta table at {delta_path}...")
new_data.write_delta(delta_path, mode="overwrite")
else:
raise ValueError("Invalid mode. Supported modes are 'merge' and 'overwrite'.")

# Reorder the data using a Z-order curve to improve data skipping
if sort:
if verbose:
print("Performing reordering data using a Z-order curve....")
delta_table = DeltaTable(delta_path)
delta_table.optimize.z_order(index)

# Perform the VACUUM operation to clean up unused files
if vacuum:
if verbose:
print("Performing VACUUM to clean up older unused files...")
delta_table = DeltaTable(delta_path)
delta_table.vacuum(
retention_hours=vacuum_retention_hours,
dry_run=False,
enforce_retention_duration=False,
)
if verbose:
print(f"{mode.capitalize()} operation completed successfully.")
return None

except Exception as e:
print(f"An error occurred: {e}")
raise


Подробнее здесь: https://stackoverflow.com/questions/793 ... on-cleanup
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»