У меня есть таблица с 20 миллионами строк и более чем 400 столбцами. Помимо первого столбца, мне нужно изменить все остальные столбцы на равномерно распределенные децили, независимо от других столбцов. Мои данные находятся в AWS Databricks. Я запускаю блокнот Python как задание, используя кластер заданий со следующей конфигурацией:
import pandas as pd
from pyspark.sql import SparkSession
#import data
df = spark.sql("select * from my_space.table1")
# Convert to a pandas DataFrame
pandas_df = df.toPandas()
# List of columns to calculate deciles
columns = pandas_df.columns[1:]
# Loop through each column and calculate deciles (1 to 10)
for col in columns:
# Use qcut to assign a decile rank (1 to 10)
pandas_df[col + '_decile'] = pd.qcut(pandas_df[col], 10, labels=False, duplicates='drop') + 1 # Deciles 1-10
# Convert the DataFrame to a Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)
# Select the 'person_id' column along with columns that end with '_decile'
decile_columns = ['person_id'] + [col for col in spark_df.columns if col.endswith('_decile')]
# Create a new DataFrame with only 'person_id' and decile columns
decile_df = spark_df.select(decile_columns)
#some minor stuff here where I saved a new df as df_renamed and renamed some columns
# Write it to a table
df_renamed.write.mode("overwrite").saveAsTable("my_space.my_new_table")
Это ошибка при преобразовании в Dataframe pandas. Насколько я понимаю, увеличение этого размера может даже не помочь, потому что pandas не очень эффективно обрабатывает большие данные.
SparkException:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 164 tasks (23.4 GiB) is bigger than local result size limit 23.3 GiB, to address it, set spark.driver.maxResultSize bigger than your dataset result size.
Попытка 2: использование функции ntile Pyspark, но задание все еще выполняется через 28 часов.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# import data
df = spark.sql("select * from my_space.table1")
# List of columns to calculate deciles
columns = df.columns[1:]
# define number of deciles
num_deciles = 10
# List to hold the transformations for each column
decile_columns = []
for col in columns:
window_spec = Window.orderBy(col)
# Apply the ntile function to create deciles based on the column's values
decile_columns.append(F.ntile(num_deciles).over(window_spec).alias(f'{col}_decile'))
# Apply the transformations to the DataFrame
df_with_deciles = df.select('*', *decile_columns)
# Select the 'person_id' column along with columns that end with '_decile'
decile_columns2 = ['person_id'] + [col for col in df_with_deciles.columns if col.endswith('_decile')]
# Create a new DataFrame with only 'person_id' and decile columns along with their values
decile_df = df_with_deciles.select(decile_columns2)
#some minor stuff here where I saved a new df as df_renamed and renamed some columns
# Write it to a table
df_renamed.write.mode("overwrite").saveAsTable("my_space.my_new_table")
Вопрос. Стоит ли попробовать что-нибудь еще, помимо увеличения размера кластера или разделения столбцов для вычисления децилей и последующего объединения вместе?
У меня есть таблица с 20 миллионами строк и более чем 400 столбцами. Помимо первого столбца, мне нужно изменить все остальные столбцы на равномерно распределенные децили, независимо от других столбцов. Мои данные находятся в AWS Databricks. Я запускаю блокнот Python как задание, используя кластер заданий со следующей конфигурацией: [code]Multi-Node with Enable Autoscaling checked Driver: i3en.3xlarge · Workers: i3en.3xlarge · 2-10 workers · DBR: 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)] [/code] Итак, пример данных и желаемый результат (т. е. желаемый результат — последние два столбца) могут выглядеть следующим образом:
person_id col2 col400 col2_decile col400_decile
a 0,01 0,10 1 1
b 0,15 0,25 2 2
c 0,20 0,30 3 3
d0,25 0,30 3 3
e 0,30 0,40 4 4
f 0,35 0,45 4 5
g 0,400,50 5 6
ч 0,45 0,55 6 6
i 0,50 0,60 6 8
j 0,550,62 7 8
к 0,55 0,33 8 4
l 0,56 0,64 8 9
n 0.050,59 1 7
о 0,19 0,22 2 1
p 0,42 0,49 5 5
q 0,510.23 7 2
r 0,75 0,63 10 9
с 0,82 1,02 10 10
t 0,590,76 9 10
у 0,57 0,57 9 7
[b]Попытка 1[/b]: Моя первая попытка использовала Pandas qcut функция, но неудивительно, что при преобразовании в pandas Dataframe. [code]import pandas as pd from pyspark.sql import SparkSession
#import data df = spark.sql("select * from my_space.table1")
# Convert to a pandas DataFrame pandas_df = df.toPandas()
# List of columns to calculate deciles columns = pandas_df.columns[1:]
# Loop through each column and calculate deciles (1 to 10) for col in columns: # Use qcut to assign a decile rank (1 to 10) pandas_df[col + '_decile'] = pd.qcut(pandas_df[col], 10, labels=False, duplicates='drop') + 1 # Deciles 1-10
# Convert the DataFrame to a Spark DataFrame spark_df = spark.createDataFrame(pandas_df)
# Select the 'person_id' column along with columns that end with '_decile' decile_columns = ['person_id'] + [col for col in spark_df.columns if col.endswith('_decile')]
# Create a new DataFrame with only 'person_id' and decile columns decile_df = spark_df.select(decile_columns)
#some minor stuff here where I saved a new df as df_renamed and renamed some columns
# Write it to a table df_renamed.write.mode("overwrite").saveAsTable("my_space.my_new_table") [/code] Это ошибка при преобразовании в Dataframe pandas. Насколько я понимаю, увеличение этого размера может даже не помочь, потому что pandas не очень эффективно обрабатывает большие данные. [code]SparkException: Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 164 tasks (23.4 GiB) is bigger than local result size limit 23.3 GiB, to address it, set spark.driver.maxResultSize bigger than your dataset result size. [/code] [b]Попытка 2[/b]: использование функции ntile Pyspark, но задание все еще выполняется через 28 часов. [code]from pyspark.sql import functions as F from pyspark.sql.window import Window
# import data df = spark.sql("select * from my_space.table1")
# List of columns to calculate deciles columns = df.columns[1:]
# define number of deciles num_deciles = 10
# List to hold the transformations for each column decile_columns = []
for col in columns: window_spec = Window.orderBy(col)
# Apply the ntile function to create deciles based on the column's values decile_columns.append(F.ntile(num_deciles).over(window_spec).alias(f'{col}_decile'))
# Apply the transformations to the DataFrame df_with_deciles = df.select('*', *decile_columns)
# Select the 'person_id' column along with columns that end with '_decile' decile_columns2 = ['person_id'] + [col for col in df_with_deciles.columns if col.endswith('_decile')]
# Create a new DataFrame with only 'person_id' and decile columns along with their values decile_df = df_with_deciles.select(decile_columns2)
#some minor stuff here where I saved a new df as df_renamed and renamed some columns
# Write it to a table df_renamed.write.mode("overwrite").saveAsTable("my_space.my_new_table") [/code] [b]Вопрос[/b]. Стоит ли попробовать что-нибудь еще, помимо увеличения размера кластера или разделения столбцов для вычисления децилей и последующего объединения вместе?
Я хочу написать скрипт для массового импорта csv-файлов в PostgreSQL. Я хочу импортировать два разных набора данных. Одна из них — это папка, содержащая финансовые показатели сотен компаний, помещенная в разные файлы CSV, разделенные тикером. Все...
Я изучаю Spark, поэтому в качестве задачи нам нужно было создать колесо локально, а затем установить его в Databricks (я использую Azure Databricks) и протестировать его, запустив из блокнота Databrick. Эта программа предполагает чтение файла CSV...
Я запустил приведенный ниже код для экспорта модели ML в mlflow на основе Azure Databricks , но, похоже, получаю эту ошибку: Хост или токен MLflow настроены неправильно .
Я не могу понять, в чем проблема. URL-адрес рабочей области и токен PAT...
Я выполнил приведенный ниже код для экспорта модели машинного обучения в mlflow на основе Azure Databricks , но, похоже, получаю эту ошибку
MLflow host or token is not configured correctly
Я не могу понять, в чем проблема. URL-адрес рабочей области...