- Набор данных не сортируется после слияния:
Когда я использую write_delta() в режиме «слияния», результирующий набор данных не сортируется.
Мой текущий обходной путь — вручную отсортировать и перезаписать таблицу Delta, но это явно неэффективно для больших наборов данных.
Есть ли способ обеспечить сортировку непосредственно во время операции слияния в Polars? - Много неиспользуемых файлов:
Когда я перезаписываю набор данных Delta Lake, старые и неиспользуемые файлы не удаляются автоматически. Для очистки требуется запустить «VACUUM» вручную.
Есть ли способ автоматизировать этот процесс очистки во время операций write_delta()?
import polars as pl
# Define the Delta Lake path
delta_path = "/PathToDataset/dataset.delta"
df_old = pl.from_repr(
"""
┌───────┬─────┬───────┐
│ group ┆ id ┆ val_1 │
│ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ f64 │
╞═══════╪═════╪═══════╡
│ A ┆ 0 ┆ 1.0 │
│ A ┆ 1 ┆ 2.0 │
│ A ┆ 2 ┆ 3.0 │
│ B ┆ 0 ┆ 11.0 │
│ B ┆ 1 ┆ 12.0 │
│ B ┆ 2 ┆ 13.0 │
│ C ┆ 0 ┆ 21.0 │
│ C ┆ 1 ┆ 22.0 │
│ C ┆ 2 ┆ 23.0 │
└───────┴─────┴───────┘
""")
df_new = pl.from_repr(
"""
┌───────┬─────┬───────┐
│ group ┆ id ┆ val_1 │
│ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ f64 │
╞═══════╪═════╪═══════╡
│ A ┆ 1 ┆ 99.0 │
│ X ┆ 0 ┆ 99.0 │
└───────┴─────┴───────┘
""")
print("Saving the initial dataset to Delta Lake...")
df_old.write_delta(delta_path, mode="overwrite")
print("Updating the Delta Lake dataset with new data...")
df_new.write_delta(
delta_path,
mode="merge", # Merge mode for upserts
delta_merge_options={
"predicate": "s.group = t.group AND s.id = t.id", # Match on 'group' and 'id'
"source_alias": "s",
"target_alias": "t",
}).when_matched_update_all().when_not_matched_insert_all().execute()
print("Update complete.")
df = pl.scan_delta(delta_path).collect()
print(df)
# ┌───────┬─────┬───────┐
# │ group ┆ id ┆ val_1 │
# │ --- ┆ --- ┆ --- │
# │ str ┆ i64 ┆ f64 │
# ╞═══════╪═════╪═══════╡
# │ X ┆ 0 ┆ 99.0 │
# │ A ┆ 1 ┆ 99.0 │
# │ A ┆ 0 ┆ 1.0 │
# │ A ┆ 2 ┆ 3.0 │
# │ B ┆ 0 ┆ 11.0 │
# │ B ┆ 1 ┆ 12.0 │
# │ B ┆ 2 ┆ 13.0 │
# │ C ┆ 0 ┆ 21.0 │
# │ C ┆ 1 ┆ 22.0 │
# │ C ┆ 2 ┆ 23.0 │
# └───────┴─────┴───────┘
Изменить: вот мой текущий обходной путь.
- запись таблицы с использованием Polars.
- сортировка таблицы с использованием deltalake.
- очистка с использованием deltalake.
def update_delta(
delta_path: str,
new_data: pl.DataFrame,
index: list[str],
mode: str = "merge",
sort: bool = True,
vacuum: bool = True,
vacuum_retention_hours: int = 0,
verbose: bool = False) -> None:
"""
Upsert or overwrite a dataset into a Delta table using polars and deltalake.
The function is made for datasets of size/memory ratio >20% on a single machine.
Parameters:
delta_path (str): Path to the Delta Lake dataset.
new_data (pl.DataFrame): New dataset to upsert into the Delta table.
index (list[str]): List of column names used as the primary key for matching rows during the merge.
mode (str, optional): Operation mode, either "merge" or "overwrite". Defaults to "merge".
sort (bool, optional): Whether to sort the Delta table by the index after the operation. Defaults to True.
vacuum (bool, optional): Whether to perform a VACUUM operation to clean up unused files. Defaults to True.
vacuum_retention_hours (int, optional): Retention period for the VACUUM operation. Defaults to 0 hour.
verbose (bool, optional): Whether to print logs. Defaults to False.
Returns:
None.
"""
try:
# Check if the Delta table already exists
table_exists = DeltaTable.is_deltatable(delta_path)
if not table_exists:
if verbose:
print(f"Creating a new Delta table at {delta_path}.")
new_data.write_delta(delta_path, mode="overwrite")
else:
if mode == "merge":
if verbose:
print(f"Performing upsert into Delta table at {delta_path}...")
# Generate the merge predicate dynamically from the index
merge_predicate = " AND ".join([f"s.{col} = t.{col}" for col in index])
# Perform the merge operation
new_data.write_delta(
delta_path,
mode="merge",
delta_merge_options={
"predicate": merge_predicate,
"source_alias": "s",
"target_alias": "t",
},
).when_matched_update_all().when_not_matched_insert_all().execute()
elif mode == "overwrite":
if verbose:
print(f"Overwriting Delta table at {delta_path}...")
new_data.write_delta(delta_path, mode="overwrite")
else:
raise ValueError("Invalid mode. Supported modes are 'merge' and 'overwrite'.")
# Reorder the data using a Z-order curve to improve data skipping
if sort:
if verbose:
print("Performing reordering data using a Z-order curve....")
delta_table = DeltaTable(delta_path)
delta_table.optimize.z_order(index)
# Perform the VACUUM operation to clean up unused files
if vacuum:
if verbose:
print("Performing VACUUM to clean up older unused files...")
delta_table = DeltaTable(delta_path)
delta_table.vacuum(
retention_hours=vacuum_retention_hours,
dry_run=False,
enforce_retention_duration=False,
)
if verbose:
print(f"{mode.capitalize()} operation completed successfully.")
return None
except Exception as e:
print(f"An error occurred: {e}")
raise
Подробнее здесь: https://stackoverflow.com/questions/793 ... on-cleanup
Мобильная версия