Эффективная сортировка/удаление дубликатов кадров данных Polars.Python

Программы на Python
Ответить
Anonymous
 Эффективная сортировка/удаление дубликатов кадров данных Polars.

Сообщение Anonymous »

Я пытаюсь импортировать очень большие файлы CSV в файлы паркета с помощью поляров. Я передаю данные, использую ленивые фреймы данных и приемники. Никаких проблем, пока...
... не отсортирую фрейм данных по столбцу и не удалю дубликаты. Требование, которое нельзя пропустить, заключается в том, что данные, записываемые в паркет, должны быть уникальными по столбцу datetime и отсортированы по одному и тому же столбцу. Узким местом является сортировка и удаление дубликатов. Насколько я понимаю, данные должны полностью находиться в памяти, чтобы удалять дубликаты и сортировать. Нет никакой гарантии, что исходные данные отсортированы или не имеют дубликатов.
Запись фрейма данных в паркет без сортировки и без проверки дубликатов не является проблемой и приводит к созданию файлов паркета размером около 3-4 ГБ. Но их чтение, сортировка и применение unique() приводит к увеличению потребления памяти до 128 ГБ, что является пределом памяти моего хоста (я запускаю код в Ubuntu в WSL2). Я уже выделил максимальный объем памяти для WSL2 и подтвердил, что у него есть доступ ко всему объему памяти. В какой-то момент сортировка и удаление дубликатов приводит к сбою виртуальной машины WSL. Кажется, я не могу эффективно сортировать и удалять дубликаты.
Можете ли вы предложить лучший подход, чем тот, который я сейчас использую:
def import_csv(self, symbol_id: str, data_source: DataSource, data_type: DataType, source_files: List[str], column_schema: List[ColumnSchema]) -> None:

#ensure 1 or 2 source files are provided
if len(source_files) != 1 and len(source_files) != 2:
raise ValueError(f"Can only process 1 or 2 source files for symbol {symbol_id}")

#obtain new df
new_df = self._csv_to_dataframe(source_files, column_schema)

#filter out duplicates and sort by datetime
new_df = new_df.unique(subset="datetime")
new_df = new_df.sort("datetime")

#merge with existing data if it exists
path_filename = self.base_directory / f"{symbol_id}_{data_source.value}_{data_type.value}.parquet"
if path_filename.exists():
old_df = pl.scan_parquet(path_filename, glob=False)
df = pl.concat([old_df, new_df], how="vertical")
else:
df = new_df

#write to parquet
df.sink_parquet(path_filename, engine="streaming")

#update metadata
# self._update_metadata(symbol_id, data_source, data_type, len(df), df["datetime"].first(), df["datetime"].last())

#logging
# self.logger.info(f"Imported {len(df)} rows for {symbol_id} from {df["datetime"].first()} to {df["datetime"].last()}")

def _csv_to_dataframe(self, source_files: list[str], column_schema: List[ColumnSchema]) -> pl.LazyFrame:

# Generate Polars expressions for column transformations
expressions = self._generate_polars_expressions(column_schema)

dfs = []
for source_file in source_files:
df = pl.scan_csv(source_file, has_header=True, glob=False).select(expressions)
dfs.append(df)

if len(dfs) == 1:
df = dfs[0]
else:
df = pl.concat(dfs, how="vertical")
df = df.group_by("datetime").mean()

return df

def _generate_polars_expressions(self, schema: list[ColumnSchema]) -> list[pl.Expr]:

expressions = []
for col_schema in schema:
# Create a base expression from the source column name
expr = pl.col(col_schema.source_column_name)

# Handle special cases based on the target data type
if col_schema.dtype == pl.Datetime:

# Ensure datetime format is provided
if col_schema.datetime_format is None:
raise ValueError(
f"Datetime format is required for column '{col_schema.source_column_name}'"
)

# For datetime, we first parse the string with the specified format
expr = expr.str.to_datetime(format=col_schema.datetime_format, time_unit=self.time_unit, time_zone=col_schema.from_timezone)

#always convert to default timezone
expr = expr.dt.convert_time_zone(self.data_timezone)
else:
# For other dtypes, a simple cast is sufficient
expr = expr.cast(col_schema.dtype)

# Alias the expression with the target column name
final_expr = expr.alias(col_schema.target_column_name)

# Add the final expression to the list
expressions.append(final_expr)

return expressions


Подробнее здесь: https://stackoverflow.com/questions/797 ... dataframes
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»