Я пытаюсь импортировать очень большие файлы CSV в файлы паркета с помощью поляров. Я передаю данные, использую ленивые фреймы данных и приемники. Никаких проблем, пока...
... не отсортирую фрейм данных по столбцу и не удалю дубликаты. Требование, которое нельзя пропустить, заключается в том, что данные, записываемые в паркет, должны быть уникальными по столбцу datetime и отсортированы по одному и тому же столбцу. Узким местом является сортировка и удаление дубликатов. Насколько я понимаю, данные должны полностью находиться в памяти, чтобы удалять дубликаты и сортировать. Нет никакой гарантии, что исходные данные отсортированы или не имеют дубликатов.
Запись фрейма данных в паркет без сортировки и без проверки дубликатов не является проблемой и приводит к созданию файлов паркета размером около 3-4 ГБ. Но их чтение, сортировка и применение unique() приводит к увеличению потребления памяти до 128 ГБ, что является пределом памяти моего хоста (я запускаю код в Ubuntu в WSL2). Я уже выделил максимальный объем памяти для WSL2 и подтвердил, что у него есть доступ ко всему объему памяти. В какой-то момент сортировка и удаление дубликатов приводит к сбою виртуальной машины WSL. Кажется, я не могу эффективно сортировать и удалять дубликаты.
Можете ли вы предложить лучший подход, чем тот, который я сейчас использую:
def import_csv(self, symbol_id: str, data_source: DataSource, data_type: DataType, source_files: List[str], column_schema: List[ColumnSchema]) -> None:
#ensure 1 or 2 source files are provided
if len(source_files) != 1 and len(source_files) != 2:
raise ValueError(f"Can only process 1 or 2 source files for symbol {symbol_id}")
#obtain new df
new_df = self._csv_to_dataframe(source_files, column_schema)
#filter out duplicates and sort by datetime
new_df = new_df.unique(subset="datetime")
new_df = new_df.sort("datetime")
#merge with existing data if it exists
path_filename = self.base_directory / f"{symbol_id}_{data_source.value}_{data_type.value}.parquet"
if path_filename.exists():
old_df = pl.scan_parquet(path_filename, glob=False)
df = pl.concat([old_df, new_df], how="vertical")
else:
df = new_df
#write to parquet
df.sink_parquet(path_filename, engine="streaming")
#update metadata
# self._update_metadata(symbol_id, data_source, data_type, len(df), df["datetime"].first(), df["datetime"].last())
#logging
# self.logger.info(f"Imported {len(df)} rows for {symbol_id} from {df["datetime"].first()} to {df["datetime"].last()}")
def _csv_to_dataframe(self, source_files: list[str], column_schema: List[ColumnSchema]) -> pl.LazyFrame:
# Generate Polars expressions for column transformations
expressions = self._generate_polars_expressions(column_schema)
dfs = []
for source_file in source_files:
df = pl.scan_csv(source_file, has_header=True, glob=False).select(expressions)
dfs.append(df)
if len(dfs) == 1:
df = dfs[0]
else:
df = pl.concat(dfs, how="vertical")
df = df.group_by("datetime").mean()
return df
def _generate_polars_expressions(self, schema: list[ColumnSchema]) -> list[pl.Expr]:
expressions = []
for col_schema in schema:
# Create a base expression from the source column name
expr = pl.col(col_schema.source_column_name)
# Handle special cases based on the target data type
if col_schema.dtype == pl.Datetime:
# Ensure datetime format is provided
if col_schema.datetime_format is None:
raise ValueError(
f"Datetime format is required for column '{col_schema.source_column_name}'"
)
# For datetime, we first parse the string with the specified format
expr = expr.str.to_datetime(format=col_schema.datetime_format, time_unit=self.time_unit, time_zone=col_schema.from_timezone)
#always convert to default timezone
expr = expr.dt.convert_time_zone(self.data_timezone)
else:
# For other dtypes, a simple cast is sufficient
expr = expr.cast(col_schema.dtype)
# Alias the expression with the target column name
final_expr = expr.alias(col_schema.target_column_name)
# Add the final expression to the list
expressions.append(final_expr)
return expressions
Подробнее здесь: https://stackoverflow.com/questions/797 ... dataframes
Эффективная сортировка/удаление дубликатов кадров данных Polars. ⇐ Python
Программы на Python
-
Anonymous
1761950491
Anonymous
Я пытаюсь импортировать очень большие файлы CSV в файлы паркета с помощью поляров. Я передаю данные, использую ленивые фреймы данных и приемники. Никаких проблем, пока...
... не отсортирую фрейм данных по столбцу и не удалю дубликаты. Требование, которое нельзя пропустить, заключается в том, что данные, записываемые в паркет, должны быть уникальными по столбцу datetime и отсортированы по одному и тому же столбцу. Узким местом является сортировка и удаление дубликатов. Насколько я понимаю, данные должны полностью находиться в памяти, чтобы удалять дубликаты и сортировать. Нет никакой гарантии, что исходные данные отсортированы или не имеют дубликатов.
Запись фрейма данных в паркет без сортировки и без проверки дубликатов не является проблемой и приводит к созданию файлов паркета размером около 3-4 ГБ. Но их чтение, сортировка и применение unique() приводит к увеличению потребления памяти до 128 ГБ, что является пределом памяти моего хоста (я запускаю код в Ubuntu в WSL2). Я уже выделил максимальный объем памяти для WSL2 и подтвердил, что у него есть доступ ко всему объему памяти. В какой-то момент сортировка и удаление дубликатов приводит к сбою виртуальной машины WSL. Кажется, я не могу эффективно сортировать и удалять дубликаты.
Можете ли вы предложить лучший подход, чем тот, который я сейчас использую:
def import_csv(self, symbol_id: str, data_source: DataSource, data_type: DataType, source_files: List[str], column_schema: List[ColumnSchema]) -> None:
#ensure 1 or 2 source files are provided
if len(source_files) != 1 and len(source_files) != 2:
raise ValueError(f"Can only process 1 or 2 source files for symbol {symbol_id}")
#obtain new df
new_df = self._csv_to_dataframe(source_files, column_schema)
#filter out duplicates and sort by datetime
new_df = new_df.unique(subset="datetime")
new_df = new_df.sort("datetime")
#merge with existing data if it exists
path_filename = self.base_directory / f"{symbol_id}_{data_source.value}_{data_type.value}.parquet"
if path_filename.exists():
old_df = pl.scan_parquet(path_filename, glob=False)
df = pl.concat([old_df, new_df], how="vertical")
else:
df = new_df
#write to parquet
df.sink_parquet(path_filename, engine="streaming")
#update metadata
# self._update_metadata(symbol_id, data_source, data_type, len(df), df["datetime"].first(), df["datetime"].last())
#logging
# self.logger.info(f"Imported {len(df)} rows for {symbol_id} from {df["datetime"].first()} to {df["datetime"].last()}")
def _csv_to_dataframe(self, source_files: list[str], column_schema: List[ColumnSchema]) -> pl.LazyFrame:
# Generate Polars expressions for column transformations
expressions = self._generate_polars_expressions(column_schema)
dfs = []
for source_file in source_files:
df = pl.scan_csv(source_file, has_header=True, glob=False).select(expressions)
dfs.append(df)
if len(dfs) == 1:
df = dfs[0]
else:
df = pl.concat(dfs, how="vertical")
df = df.group_by("datetime").mean()
return df
def _generate_polars_expressions(self, schema: list[ColumnSchema]) -> list[pl.Expr]:
expressions = []
for col_schema in schema:
# Create a base expression from the source column name
expr = pl.col(col_schema.source_column_name)
# Handle special cases based on the target data type
if col_schema.dtype == pl.Datetime:
# Ensure datetime format is provided
if col_schema.datetime_format is None:
raise ValueError(
f"Datetime format is required for column '{col_schema.source_column_name}'"
)
# For datetime, we first parse the string with the specified format
expr = expr.str.to_datetime(format=col_schema.datetime_format, time_unit=self.time_unit, time_zone=col_schema.from_timezone)
#always convert to default timezone
expr = expr.dt.convert_time_zone(self.data_timezone)
else:
# For other dtypes, a simple cast is sufficient
expr = expr.cast(col_schema.dtype)
# Alias the expression with the target column name
final_expr = expr.alias(col_schema.target_column_name)
# Add the final expression to the list
expressions.append(final_expr)
return expressions
Подробнее здесь: [url]https://stackoverflow.com/questions/79760899/memory-efficient-sorting-removing-duplicates-of-polars-dataframes[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия