Моя настройка< /p>
Код: Выделить всё
input = pathlib.Path("input.csv") # 300k lines
output = pathlib.Path("output.csv")
def mapper(row_id):
# expensive computation and can fail sometimes
pass
any_value_column_is_null = ... # polars expression
schema_as_dict = ... # polars schema
id_col_name = "id"
def process_unprocessed_rows_in_batch(df: pl.DataFrame) -> pl.DataFrame:
additional_data = (
df.filter(any_value_column_is_null)
.with_columns(
pl.col(id_col_name)
.map_elements(
mapper,
pl.Struct(schema_as_dict),
)
.alias(generated_data_col_name)
)
.with_columns(pl.col(generated_data_col_name).struct.unnest())
.drop(generated_data_col_name)
)
return df.update(additional_data, on=id_col_name, how="left")
df = pl.scan_csv(input, schema=schema_as_dict).map_batches(
process_unprocessed_rows_in_batch, streamable=True
)
df.sink_csv(output, maintain_order=False)
Код: Выделить всё
STREAMING:
OPAQUE_PYTHON
Csv SCAN [snippet-dataset.csv]
PROJECT */4 COLUMNS
Я предполагал, что потоковая передача выполняется пакетно, поэтому уже обработанные пакеты сохраняются в выходных данных, и в случае сбоя я потеряю только текущий пакет
Но, похоже, это не тот случай - если обработка не удалась, промежуточный вывод будет пустым
I попробовал настроить раковина_csv(batch_size) и pl.Config.set_streaming_chunk_size — не имеет эффекта
Подробнее здесь: https://stackoverflow.com/questions/793 ... processing
Мобильная версия