Как использовать функцию Windows ntile() или аналогичную для сотен столбцов в AWS DatabricksPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как использовать функцию Windows ntile() или аналогичную для сотен столбцов в AWS Databricks

Сообщение Anonymous »

У меня есть таблица с 20 миллионами строк и более чем 400 столбцами. Помимо первого столбца, мне нужно изменить все остальные столбцы на равномерно распределенные децили, независимо от других столбцов. Мои данные находятся в AWS Databricks. Я запускаю блокнот Python как задание, используя кластер заданий со следующей конфигурацией:

Код: Выделить всё

Multi-Node with Enable Autoscaling checked
Driver: i3en.3xlarge · Workers: i3en.3xlarge · 2-10 workers · DBR: 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)]
Итак, пример данных и желаемый результат (т. е. желаемый результат — последние два столбца) могут выглядеть следующим образом:



person_id
col2
col400
col2_decile
col400_decile




a
0,01
0,10
1
1


b
0,15
0,25
2
2


c
0,20
0,30
3
3


d0,25
0,30
3
3


e
0,30
0,40
4
4


f
0,35
0,45
4
5


g
0,400,50
5
6


ч
0,45
0,55
6
6


i
0,50
0,60
6
8


j
0,550,62
7
8


к
0,55
0,33
8
4


l
0,56
0,64
8
9


n
0.050,59
1
7


о
0,19
0,22
2
1


p
0,42
0,49
5
5


q
0,510.23
7
2


r
0,75
0,63
10
9


с
0,82
1,02
10
10


t
0,590,76
9
10


у
0,57
0,57
9
7



Попытка 1: Моя первая попытка использовала Pandas qcut функция, но неудивительно, что при преобразовании в pandas Dataframe.

Код: Выделить всё

import pandas as pd
from pyspark.sql import SparkSession

#import data
df = spark.sql("select * from my_space.table1")

# Convert to a pandas DataFrame
pandas_df = df.toPandas()

# List of columns to calculate deciles
columns = pandas_df.columns[1:]

# Loop through each column and calculate deciles (1 to 10)
for col in columns:
# Use qcut to assign a decile rank (1 to 10)
pandas_df[col + '_decile'] = pd.qcut(pandas_df[col], 10, labels=False, duplicates='drop') + 1  # Deciles 1-10

# Convert the DataFrame to a Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)

# Select the 'person_id' column along with columns that end with '_decile'
decile_columns = ['person_id'] + [col for col in spark_df.columns if col.endswith('_decile')]

# Create a new DataFrame with only 'person_id' and decile columns
decile_df = spark_df.select(decile_columns)

#some minor stuff here where I saved a new df as df_renamed and renamed some columns

# Write it to a table
df_renamed.write.mode("overwrite").saveAsTable("my_space.my_new_table")
Это ошибка при преобразовании в Dataframe pandas. Насколько я понимаю, увеличение этого размера может даже не помочь, потому что pandas не очень эффективно обрабатывает большие данные.

Код: Выделить всё

SparkException:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 164 tasks (23.4 GiB) is bigger than local result size limit 23.3 GiB, to address it, set spark.driver.maxResultSize bigger than your dataset result size.
Попытка 2: использование функции ntile Pyspark, но задание все еще выполняется через 28 часов.

Код: Выделить всё

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# import data
df = spark.sql("select * from my_space.table1")

# List of columns to calculate deciles
columns = df.columns[1:]

# define number of deciles
num_deciles = 10

# List to hold the transformations for each column
decile_columns = []

for col in columns:
window_spec = Window.orderBy(col)

# Apply the ntile function to create deciles based on the column's values
decile_columns.append(F.ntile(num_deciles).over(window_spec).alias(f'{col}_decile'))

# Apply the transformations to the DataFrame
df_with_deciles = df.select('*', *decile_columns)

# Select the 'person_id' column along with columns that end with '_decile'
decile_columns2 = ['person_id'] + [col for col in df_with_deciles.columns if col.endswith('_decile')]

# Create a new DataFrame with only 'person_id' and decile columns along with their values
decile_df = df_with_deciles.select(decile_columns2)

#some minor stuff here where I saved a new df as df_renamed and renamed some columns

# Write it to a table
df_renamed.write.mode("overwrite").saveAsTable("my_space.my_new_table")
Вопрос. Стоит ли попробовать что-нибудь еще, помимо увеличения размера кластера или разделения столбцов для вычисления децилей и последующего объединения вместе?

Подробнее здесь: https://stackoverflow.com/questions/792 ... in-aws-dat
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»