spark = SparkSession\
.builder\
.appName("App")\
.config("spark.executor.memory","10g")\
.config("spark.executor.cores","4")\
.config("spark.executor.instances","6")\
.config("spark.sql.adaptive.enabled","true")\
.config("spark.dynamicAllocation.enabled","false")\
.enableHiveSupport()\
.getOrCreate()
df = spark.read.parquet("/data")
df = df.filter(col("colA").isNotNull() & col("colB").isNotNull())
df = df.withColumn("colK_udf",udf_function("colK"))
df_1 = df.withColumn("newCol", when((col("colA.field") == 1) & (col("colB.field1") == 2), col("colA.field1")).otherwise("colB.field1")))\
...
df_1 = df1.select(...)
df_agg = df_1.groupby("colA","colB","colC","colD").agg(count(*).alias("numRecords"),
sort_array(collected_set("colE")).alias("colE"),
sum("colF").alias("colF"),
sum("colG").alias("colG"),
sum("colH").alias("colH"),
sum("colL").alias("colL"),
min("colI").alias("colI"),
max("colJ").alias("colJ"),
countDistinct("colE").alias("colE"),
sort_array(collected_set("colP")).alias("colP"),
sort_array(collected_set("colQ")).alias("colQ"),
max("colR").alias("colR"),
max("colS").alias("colS")
)
df_agg.count()
< /code>
Я попробовал код на меньшем данных о данных с строками всего 100 м, и он работал. Однако, когда я запустил его на DataFrame с 3B строками, я получаю ошибку ниже при выполнении последнего df_agg.count ()
ERROR org.apache.spark.scheduler.TaskSchedulerImpl - Lost executor 1 on 2.2.2.2:
...
The API gave the following message: Pod ephemeral local storage usage exceeds the total limit of containers 50Gi.
< /code>
Я уже увеличил локальное использование POD с 30 ГБ до 50 ГБ, но я не могу увеличить его на неопределенный срок. Я буду получать это сообщение для других исполнителей, и количество неудачных задач просто продолжает расти. больше, чем другие группы.
Я попытался кэш
df_1 после выбора , но это не решило проблему.
Что еще я могу попробовать?
У меня огромный флажок данных с 3B строк. Я запускаю код pyspark ниже с конфигурацией Spark. < /P> [code]spark = SparkSession\ .builder\ .appName("App")\ .config("spark.executor.memory","10g")\ .config("spark.executor.cores","4")\ .config("spark.executor.instances","6")\ .config("spark.sql.adaptive.enabled","true")\ .config("spark.dynamicAllocation.enabled","false")\ .enableHiveSupport()\ .getOrCreate()
df_agg = df_1.groupby("colA","colB","colC","colD").agg(count(*).alias("numRecords"), sort_array(collected_set("colE")).alias("colE"), sum("colF").alias("colF"), sum("colG").alias("colG"), sum("colH").alias("colH"), sum("colL").alias("colL"), min("colI").alias("colI"), max("colJ").alias("colJ"), countDistinct("colE").alias("colE"), sort_array(collected_set("colP")).alias("colP"), sort_array(collected_set("colQ")).alias("colQ"), max("colR").alias("colR"), max("colS").alias("colS") ) df_agg.count() < /code> Я попробовал код на меньшем данных о данных с строками всего 100 м, и он работал. Однако, когда я запустил его на DataFrame с 3B строками, я получаю ошибку ниже при выполнении последнего df_agg.count () [/code]. [code]ERROR org.apache.spark.scheduler.TaskSchedulerImpl - Lost executor 1 on 2.2.2.2: ... The API gave the following message: Pod ephemeral local storage usage exceeds the total limit of containers 50Gi. < /code> Я уже увеличил локальное использование POD с 30 ГБ до 50 ГБ, но я не могу увеличить его на неопределенный срок. Я буду получать это сообщение для других исполнителей, и количество неудачных задач просто продолжает расти. больше, чем другие группы. Я попытался кэш [/code] df_1 после выбора , но это не решило проблему. Что еще я могу попробовать?
У меня огромный флажок данных с 3B строк. Я запускаю код pyspark ниже с конфигурацией Spark.
spark = SparkSession\
.builder\
.appName( App )\
.config( spark.executor.memory , 10g )\
.config( spark.executor.cores , 4 )\
.config(...
У меня огромный флажок данных с 3B строк. Я запускаю код pyspark ниже с конфигурацией Spark.
spark = SparkSession\
.builder\
.appName( App )\
.config( spark.executor.memory , 10g )\
.config( spark.executor.cores , 4 )\
.config(...
Я хочу понять последствия производительности элементных преобразований на агрегации с холмистом окна. Рассмотрим следующие две версии агрегации прокатки (из плавающих значений):
i)
X = frame.rolling(index_column= date , group_by= group , period=...
У меня есть 106M DataFrame с вложенными столбцами, то есть у меня есть несколько столбцов, где значения составляют { , , 1, 2, 3} . Я пытаюсь добавить еще несколько столбцов, используя, когда , а затем сделаю агрегацию на DataFrame.
df_1 =...
В Elasticsearch возникла проблема в запросе агрегирования. Проблема в следующем:
Я запрашиваю два разных запроса в одном запросе. Первый вариант — «покажи мне количество документов для subject.label для этих конкретных значений», а второй — «покажи...