Я пытаюсь погрузиться в большой паркетный файл с полярными. Это должно быть легко достичь в (1) памяти: < /p>
import os ; os.environ['POLARS_MAX_THREADS'] = '4'
import polars as pl
import time, random
import numpy as np
random.seed(42)
N_TICKS = 100_000
N_TICKERS = 10_000
T0 = 1755634720560000000
def generate_fake_example_data():
tmp = []
for t in range(N_TICKERS):
print(t,end=',')
tmp.append(pl.DataFrame({
"ticker": [f"ticker{t}"] * N_TICKS,
"epoch_nanos": T0 + np.cumsum(np.random.randint(1e7, 1e10, size=N_TICKS)),
"price": np.round(np.random.uniform(100, 400, size=N_TICKS), 2),
}))
data = pl.concat(tmp)
print(f"{len(data)=}")
data.write_parquet("example_input.parquet")
generate_fake_example_data()
!ls -lah example_input.parquet
print(pl.__version__)
DOWNSAMPLE_NANOS = int(1e11)
# RAM usage spikes by 60GiB
d = pl.scan_parquet("example_input.parquet")
d = d.with_columns((pl.col('epoch_nanos') // DOWNSAMPLE_NANOS).alias('ts_bucket'))
d = d.filter(
(pl.col('ticker') != pl.col('ticker').shift(-1).fill_null('EOF'))
|(pl.col('ts_bucket') != pl.col('ts_bucket').shift(-1))
).drop('ts_bucket')
print(d.explain(engine='streaming'))
d.sink_parquet("example_output.parquet",engine='streaming')
< /code>
output: < /p>
-rw-rw-r--. 1 ec2-user ec2-user 9.4G Aug 19 20:42 example_input.parquet
1.32.3
simple π 3/3 ["ticker", "epoch_nanos", ... 1 other column]
FILTER [([(col("ticker")) != (col("ticker").shift([dyn int: -1]).fill_null(["EOF"]))]) | ([(col("ts_bucket")) != (col("ts_bucket").shift([dyn int: -1]))])]
FROM
WITH_COLUMNS:
[[(col("epoch_nanos")) floor_div (1000000000)].alias("ts_bucket")]
Parquet SCAN [example_input.parquet]
PROJECT 3/3 COLUMNS
Подробнее здесь: https://stackoverflow.com/questions/797 ... on-shift-1
Потоковая потоковая передача Polars: Parquet Parquet на основе Shift (-1) ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Проблема с производительностью при записи Polars.DataFrame в файл .parquet
Anonymous » » в форуме Python - 0 Ответы
- 18 Просмотры
-
Последнее сообщение Anonymous
-