Код: Выделить всё
df_1 = df.withColumn("newCol", when((col("colA.field") == 1) & (col("colB.field1") == 2), col("colA.field1")).otherwise("colB.field1")))\
...
df_agg = df_1.groupby("colA","colB","colC","colD").agg(count(*).alias("numRecords"),
sort_array(collected_set("colE")).alias("colE"),
sum("colE").alias("colE"),
sum("colF").alias("colF"),
sum("colG").alias("colG"),
sum("colH").alias("colH"),
min("colI").alias("colI"),
max("colJ").alias("colJ"),
countDistinct("colK").alias("colK"),
first("colL").alias("colL"),
first("colM").alias("colM"),
first("colN").alias("colN"),
first("colO").alias("colO"),
sort_array(collected_set("colP")).alias("colP"),
sort_array(collected_set("colQ")).alias("colQ"),
max("colR").alias("colR"),
max("colS").alias("colS")
)
< /code>
colL
Я пробовал следующее (отдельно), чтобы просто сделать df_agg.show (10, druncate = false) , но всегда получили ошибку.
Код: Выделить всё
Job aborted due to stage failure: ShuffleMapStage 4 (showString at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason:
org.apache.spark.shuffle.FetchFailedException
at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException
at org.apache.spark.ShuffleBlockFetcherIterator.next
at org.apache.spark.ShuffleBlockFetcherIterator.next
at org.apache.spark.util.CompletionIterator.next
at scala.collection.Iterator$$anon$11.nextCur
at scala.collection.Iterator$$anon$11.nextNext
at scala.collection.Iterator$$anon$10.nextNext
...
at org.apache.spark.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1
at org.apache.spark.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted
...
Caused by: org.apache.spark.ExecutorDeadException: The relative remote executor(Id: 253), which maintains the block data to fetch is dead.
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:136)
...
< /code>
[list]
[*] Запустите код pyspark на исходном раме данных, как это происходит с вложенными столбцами (паркетные файлы при 11,4 ГБ) < /li>
Уменьшите количество записей с 106 м до 99,8 м, используя df.sample (0,943) < /code>. Ранее я успешно запустил один и тот же код на другом подобном DataFrame с 99,9 -метровыми строками (но без вложенных столбцов, а паркетные файлы были на 5,8 ГБ).
[*] Удивление схемы и только выбранные соответствующие столбцы df_flat = df.select (col ("cola.field1"). /> Напишите вышеуказанный DataFrame df_flat
[/list]
I also ran df.groupBy("colA", "colB", "colC", "colD").count().orderBy(desc("count")).show(), and the largest group has 68K records, followed by 37K, 27K, 21K, 13 groups with >10K records, and many more with ~ 9K или меньше. Я думаю, что мои данные искажены? Это всего лишь небольшой тест, и мне в конечном итоге нужно будет запустить это на гораздо большем диапазоне данных, в порядке миллиардов строк.
Подробнее здесь: https://stackoverflow.com/questions/796 ... ns-failing