ETL в сверхсложной структуре данных с пандами и pysparkPython

Программы на Python
Ответить
Anonymous
 ETL в сверхсложной структуре данных с пандами и pyspark

Сообщение Anonymous »

Вопрос: достаточно ли эффективны предложенные методы для выравнивания данного фрейма данных или их можно усовершенствовать?
Пример столбцов входного фрейма данных pandas (это заданное значение, не может быть изменено):
  • id1 (строка)
  • id2 (строка)
  • date_create (datetime[ms])
  • json_data (объект)
Пример pd.DataFrame(...).to_dict(orient="records"):

Код: Выделить всё

[
{
"id1": "11111111-1111-1111-1111-111111111111",
"id2": "00000000-0000-0000-0000-000000000001",
"date_create": "2026-02-01 13:21:46.281000 +0000",
"json_data": [
{
"data": [
{
"Jul": [
{
"amount": "1317255",
"count": "10",
"type": "type1"
},
{
"amount": "1317526.98",
"count": "2",
"type": "type2"
}
],
"Aug": [
{
"amount": "275.98",
"count": "1",
"type": "type1"
}
],
"Nov": [
{
"amount": "301000",
"count": "2",
"type": "type1"
},
{
"amount": "301000",
"count": "2",
"type": "type3"
}
]
}
],
"uuid": "22222222-2222-2222-2222-222222222222",
"token": "xyz"
}
]
}
]
Самая большая обработка Pandasque, которую я мог написать:

Код: Выделить всё

month_mapping: dict[str, int] = {
"Jan": 1,
"Feb": 2,
"Mar": 3,
"Apr": 4,
"May": 5,
"Jun": 6,
"Jul": 7,
"Aug": 8,
"Sep": 9,
"Oct": 10,
"Nov": 11,
"Dec": 12,
}

exploded = df.explode("json_data")
# not using .apply(pd.Series), because unsure if all three keys present at all times
exploded["uuid"] = exploded["json_data"].apply(
lambda x: x.get("uuid", ""),
)
exploded["token"] = exploded["json_data"].apply(
lambda x: x.get("token", ""),
)
exploded["json_data"] = exploded["json_data"].apply(
lambda x:  x.get("data", []),
)

indexes = ["id1", "id2", "date_create", "uuid", "token"]
temp = exploded.explode("json_data").reset_index(drop=True)
widened = pd.concat(
[temp[indexes], pd.json_normalize(temp["json_data"])],
axis=1,
).reindex(columns=[*indexes, *month_mapping.keys()])

lengthened = (
widened.melt(
id_vars=indexes,
var_name="month",
value_vars=list(month_mapping.keys()),
value_name="tx",
)
.explode("tx")
.reset_index(drop=True)
)

final = pd.concat(
[lengthened.drop(columns=["tx"]), pd.json_normalize(lengthened["tx"])],
axis=1,
)

final["date_create"] = pd.to_datetime(final["date_create"])

result = final.reindex(
columns=[
*indexes,
"type",
"count",
"amount",
"month",
],
)
Предварительная обработка с использованием циклов For:

Код: Выделить всё

final_rows = []

for _, df_row in df.iterrows():

for json_data in df_row["json_data"]:
uuid = json_data.get("uuid", "")
token = json_data.get("token", "")

for data_item in json_data.get("data", []):

for month_col, month_txs in data_item.items():

for tx in month_txs:
final_row = {
"id1": df_row["id1"],
"id2": df_row["id2"],
"date_create": df_row["date_create"],
"uuid": uuid,
"token": token,
"month": month_col,
"type": tx.get("type", None),
"count": tx.get("count", 0),
"amount": tx.get("amount", 0),
}

final_rows.append(final_row)

result = pd.DataFrame(
final_rows,
columns=[
"id1",
"id2",
"date_create",
"uuid",
"token",
"month",
"type",
"count",
"amount",
],
)
Похоже, что циклы for на самом деле дают примерно те же результаты производительности, что и код pandas, на нескольких сотнях тысяч строк данного кадра данных. Часть, которую я лично не могу понять, — это код для выравнивания чуть менее вложенных структур данных (на 2-4 уровня выше самой нижней), но в целом то же самое, и с Pandas, и с циклами for почти в 20 и 6 раз быстрее соответственно, чем то, что я предоставил.
Это только часть кода, который выполняется; другой — агрегация данных, но это единственная часть, которая не сильно меняется между кадрами данных с разными структурами данных.
Важные соображения относительно среды, в которой выполняется код: это движок pyspark с параллельной пакетной обработкой, 1000 строк на пакет (кажется, довольно небольшим для использования возможностей панд, не могу сказать наверняка). Каждый пакет выполняется на отдельном узле, каждый из них извлекает данные из базы данных с отдельным соединением.
Если код pandas настолько эффективен, насколько это возможно, может ли переход на pyspark.sql или поляры кардинально изменить ситуацию?

Подробнее здесь: https://stackoverflow.com/questions/798 ... nd-pyspark
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»