Оптимизация агрегаций PysparkPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Оптимизация агрегаций Pyspark

Сообщение Anonymous »

У меня огромный флажок данных с 3B строк. Я запускаю код pyspark ниже с конфигурацией Spark. < /P>

Код: Выделить всё

spark = SparkSession\
.builder\
.appName("App")\
.config("spark.executor.memory","10g")\
.config("spark.executor.cores","4")\
.config("spark.executor.instances","6")\
.config("spark.sql.adaptive.enabled","true")\
.config("spark.dynamicAllocation.enabled","false")\
.enableHiveSupport()\
.getOrCreate()

df = spark.read.parquet("/data")
df = df.filter(col("colA").isNotNull() & col("colB").isNotNull())
df = df.withColumn("colK_udf",udf_function("colK"))

df_1 = df.withColumn("newCol", when((col("colA.field") == 1) & (col("colB.field1") == 2), col("colA.field1")).otherwise("colB.field1")))\
...
df_1 = df1.select(...)

df_agg = df_1.groupby("colA","colB","colC","colD").agg(count(*).alias("numRecords"),
sort_array(collected_set("colE")).alias("colE"),
sum("colF").alias("colF"),
sum("colG").alias("colG"),
sum("colH").alias("colH"),
sum("colL").alias("colL"),
min("colI").alias("colI"),
max("colJ").alias("colJ"),
countDistinct("colE").alias("colE"),
sort_array(collected_set("colP")).alias("colP"),
sort_array(collected_set("colQ")).alias("colQ"),
max("colR").alias("colR"),
max("colS").alias("colS")
)
df_agg.count()
< /code>
Я попробовал код на меньшем данных о данных с строками всего 100 м, и он работал. Однако, когда я запустил его на DataFrame с 3B строками, я получаю ошибку ниже при выполнении последнего df_agg.count () 
.

Код: Выделить всё

ERROR org.apache.spark.scheduler.TaskSchedulerImpl - Lost executor 1 on 2.2.2.2:
...
The API gave the following message: Pod ephemeral local storage usage exceeds the total limit of containers 50Gi.
< /code>
Я уже увеличил локальное использование POD с 30 ГБ до 50 ГБ, но я не могу увеличить его на неопределенный срок. Я буду продолжать получать это сообщение для других исполнителей, и количество неудачных задач просто продолжает расти. Когда я просто позволяю программе запускать часы, вход 
поднимался до 350 ГБ, а Shuffle write до 480 ГБ, прежде чем я его убил. Выберите , но это не решило проблему.
Что еще я могу попробовать?

Подробнее здесь: https://stackoverflow.com/questions/796 ... timization
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Оптимизация агрегаций Pyspark
    Anonymous » » в форуме Python
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous
  • Оптимизация агрегаций Pyspark
    Anonymous » » в форуме Python
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous
  • Оптимизация агрегаций окон: выталкивание выражений на элемент из окна агрегации
    Anonymous » » в форуме Python
    0 Ответы
    65 Просмотры
    Последнее сообщение Anonymous
  • Pyspark с провалами агрегаций
    Anonymous » » в форуме Python
    0 Ответы
    4 Просмотры
    Последнее сообщение Anonymous
  • Разное количество агрегаций для одного и того же значения
    Anonymous » » в форуме Elasticsearch aggregation
    0 Ответы
    944 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»