Проблема с высоким использованием памяти файла Polars read_ndjson DynamoDBPython

Программы на Python
Ответить
Anonymous
 Проблема с высоким использованием памяти файла Polars read_ndjson DynamoDB

Сообщение Anonymous »

Я пытаюсь свести файл JSON DynamoDB с помощью Polars.
Мой файл JSON весит 1 ГБ, 100 месяцев после сжатия в gz, но в памяти, когда я его читаю, он использует 16 ГБ.
Поскольку данные поступают из DynamoDB, мои столбцы могут иметь несколько типов. Например, 1 год назад это была строка, сегодня это логическое значение.
Итак, я провел 4 эксперимента только с pl.read_ndjson и write_parquet:
  • Прочитал json-файл размером ~150 МБ с 1 вложенным уровнем, все строки имеют одинаковую схему -> ~390 МБ использованной памяти
  • Прочитал тот же файл, сжатый с помощью gz -> ~750 МБ использованной памяти
  • Чтение файла JSON ~150 МБ с несколькими уровнями вложенности и различными схемами -> Использовано 7,7 ГБ памяти
  • Чтение того же файла, сжатого с помощью gz -> ~7,7 ГБ использованной памяти
Почему требуется так много памяти, если схема во вложенном файле непостоянна?
Вот мой скрипт для простых данных:
import json
import random
from uuid import uuid4
from datetime import datetime

DYNAMO_TYPES = ["S", "N", "BOOL"]

def random_dynamodb_value(i, depth=0):
if depth > 0:
return {"S": str(uuid4())}

value_type = DYNAMO_TYPES[i%3]

if value_type == "S":
return {"S": f"val-{uuid4().hex[:8]}"}
elif value_type == "N":
return {"N": str(random.randint(0, 1000))}
elif value_type == "BOOL":
return {"BOOL": random.choice([True, False])}

def generate_extra_easy_fields(n=50):
extra = {}
for i in range(n):
field_name = f"field_{i}"
extra[field_name] = random_dynamodb_value(i)
return extra

def generate_easy_entry():
base = {
"id": {"S": str(uuid4())},
"timestamp": {"S": datetime.now().isoformat()},
}

base.update(generate_extra_easy_fields(50))
return base

def generate_simple_jsonl_file(filename: str, n: int = 20):
with open(filename, "w") as f:
for i in range(n):
entry = json.dumps(generate_easy_entry())
f.write(entry + "\n")

Вот мой скрипт для сложных данных:
import json
import random
from uuid import uuid4
from datetime import datetime

DYNAMO_TYPES = ["S", "N", "BOOL", "M", "L"]

def random_dynamodb_value(i, depth=0):
if depth > 2:
return {"S": str(uuid4())}

value_type = DYNAMO_TYPES[i%5]

if value_type == "S":
return {"S": f"val-{uuid4().hex[:8]}"}
elif value_type == "N":
return {"N": str(random.randint(0, 1000))}
elif value_type == "BOOL":
return {"BOOL": random.choice([True, False])}
elif value_type == "M":
return {
"M": {
f"key_{j}": random_dynamodb_value(random.randint(0, 4), depth + 1)
for j in range(random.randint(1, 3))
}
}
elif value_type == "L":
field = random.randint(0, 4)
return {
"L": [random_dynamodb_value(field, depth + 1) for _ in range(random.randint(2, 4))]
}

def generate_multitype_field():
return random.choice([
{"S": "text"},
{"BOOL": random.choice([True, False])},
{"N": str(random.randint(0, 100))},
])

def generate_extra_fields(n=50):
extra = {}
for i in range(n):
field_name = f"field_{i}"
extra[field_name] = random_dynamodb_value(i)
return extra

def generate_entry():
base = {
"id": {"S": str(uuid4())},
"timestamp": {"S": datetime.now().isoformat()},
"meta": {"M": {"level1": {"M": {"level2": {"M": {"level3": {"L": [random_dynamodb_value(3) for _ in range(2)]}}}}}}},
"listOfStructs": {
"L": [
{
"M": {
"subId": {"N": str(i)},
"flag": {"BOOL": random.choice([True, False])}
}
} for i in range(random.randint(2, 4))
]
},
"structOfStructOfStruct": {"M": {"a": {"M": {"b": {"M": {"c": random_dynamodb_value(2)}}}}}},
"structOfStructOfList": {"M": {"outer": {"M": {"innerList": {"L": [random_dynamodb_value(2) for _ in range(3)]}}}}},
"multiTypeField": generate_multitype_field()
}

base.update(generate_extra_fields(50))
return base

def generate_complexe_jsonl_file(filename: str, n: int = 20):
with open(filename, "w") as f:
for i in range(n):
entry = json.dumps(generate_entry())
f.write(entry + "\n")

Код для проведения экспериментов (выберите сценарий):
import os

import polars as pl
import psutil

from generate_complexe_data import generate_complexe_jsonl_file
from generate_simple_data import generate_simple_jsonl_file

def print_memory():
process = psutil.Process(os.getpid())
rss = process.memory_info().rss
print(f"Mémoire utilisée : {rss / 1024 / 1024:.2f} Mo")

def main() -> None:
"""Process DynamoDB export from manifest to partitioned parquet."""
# generate_complexe_jsonl_file("dynamodb_complexe.json", n=25000)
# generate_simple_jsonl_file("dynamodb_simple.json", n=100000)

# print("read simple")
# print_memory()
# df_simple = pl.read_ndjson("dynamodb_simple.json")
# print_memory()
# print("write simple")
# df_simple.write_parquet("output_simple.parquet")
# print_memory()
#
# print("read simple gz")
# print_memory()
# df_simple_gz = pl.read_ndjson("dynamodb_simple.json.gz")
# print_memory()
# print("write simple gz")
# df_simple_gz.write_parquet("output_simple_gz.parquet")
# print_memory()
#
# print("read complexe")
# print_memory()
# df_complexe = pl.read_ndjson("dynamodb_complexe.json")
# print_memory()
# print("write complexe")
# df_complexe.write_parquet("output_complexe.parquet")
# print_memory()

print("read complexe gz")
print_memory()
df_complexe_gz = pl.read_ndjson("dynamodb_complexe.json.gz")
print_memory()
print("write complexe gz")
df_complexe_gz.write_parquet("output_complexe_gz.parquet")
print_memory()

if __name__== "__main__":
main()


Подробнее здесь: https://stackoverflow.com/questions/796 ... sage-issue
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»