Ошибка Spark EOF (чтение паркета из S3) — преобразование Spark в PandasPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Ошибка Spark EOF (чтение паркета из S3) — преобразование Spark в Pandas

Сообщение Anonymous »

Я считываю около 1 миллиона строк, хранящихся в S3 в виде файлов паркета, в кадр данных (данные размером 900 МБ в корзине). Фильтрация фрейма данных на основе значений и последующее преобразование в фрейм данных Pandas. Здесь задействованы две пользовательские функции (classify и TransformDate). Я получаю эту ошибку при запуске этого фрагмента кода. Что не так с этим кодом? Мне не хватает какой-то настройки искры или это неправильное использование UDF?
Фрагмент кода ниже:

Код: Выделить всё

#Import headers skipped for simplicity

findspark.init()

#Simple Function to classify values
def classify(value):
if float(value) < 0.0 or float(value) >= 10.0:
return -1
return int(float(value) * 2) + 1

#Simple Function to transform a string to date object
def transformDate(dateStr):
date_format = '%d-%b-%Y:%H:%M:%S %Z'
datetime_obj = datetime.datetime.strptime("{} {}".format(dateStr, 'UTC'), date_format)
return datetime_obj

def read_from_s3():
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4')
conf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
conf.set('spark.hadoop.fs.s3a.connection.maximum', 100)
conf.set('fs.s3a.threads.max', 50)
conf.set('spark.default.parallelism', 2048)
conf.set('spark.sql.shuffle.partitions',4096)
conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
conf.set('spark.sql.streaming.schemaInference','true')

conf.set('spark.rpc.message.maxSize', '1024')
conf.set('spark.executor.memory', '32g')
conf.set('spark.shuffle.file.buffer', '64k')
conf.set('spark.eventLog.buffer.kb', '200k')
conf.set('spark.executor.cores', '8')
conf.set('spark.cores.max', '8')
conf.set('spark.driver.memory', '32g')
conf.set('spark.driver.maxResultSize', '21G')
conf.set('spark.worker.cleanup.enabled', True)
conf.set('spark.executor.heartbeatInterval', '43200s')
conf.set('spark.network.timeout', '3000000s')

conf.set('spark.hadoop.fs.s3a.access.key', '')
conf.set('spark.hadoop.fs.s3a.secret.key', '')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel('DEBUG')
classify_udf = udf(lambda x:classify(x),IntegerType())
transform_date_udf = udf(lambda x:transformDate(x),TimestampType())
paths = 's3a:///'
df = spark.read.parquet(paths,
inferSchema=True)
#Encode a col -colA which is in string format and create an integer representation for it
df = df.withColumn('colA_encoded',classify_udf('colA'))
#Encode a date string -str_date which is in string format and create timestamp with timezone
df = df.withColumn('date_transformed',transform_date_udf('str_date'))
return df

#Filter rows which have colA > 3.0 and retain all rows where colZ is present in this filtered data set (colA > 3.0)
#Basically keep all rows whose colZ has atleast one row with colA  > 3.0
def do_filter(df: DataFrame):
df = df.fillna({'colA':'0'})
df= df.withColumn('colA_float',df['colA'].cast("float").alias('colA_float'))
df2 = (df.filter(col('colA_float') >  3.0).select('colZ').distinct())
df=df.join(df2, 'colZ')
return df

#Function to group the dataframe by colZ and create index over colA
def do_proper_group(df: DataFrame):
window = Window.partitionBy('colZ').orderBy('colA')
df =df.select('*', dense_rank().over(window).alias('myIdx'))
return df

def plot_data_frame(df: DataFrame =None) :
fig, ax = plt.subplots()
print(df.count())
pandas_df = df.toPandas()
print(pandas_df.columns)
pandas_df.set_index('myIdx', inplace=True)
pandas_df.groupby('colZ')['colA_float'].plot(legend=True,x='myIdx', xlabel="My Group Number", ylabel="Value")
plt.show()

if __name__ == '__main__':
start = time.time()
print(start)
plot_data_frame(do_proper_group(do_filter(read_from_s3())))
end = time.time()
print(end - start)
Журнал ошибок ниже:

Код: Выделить всё

24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1225, in main
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 596, in read_int
raise EOFError
EOFError

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
24/07/24 06:47:31 DEBUG Invoker: Starting: read
24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption
org.apache.spark.api.python.PythonException:  Traceback (most recent call last):
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1225, in main
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 596, in read_int
raise EOFError
EOFError

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
24/07/24 06:47:31 DEBUG Invoker: Starting: lazySeek

Откуда возникает эта ошибка EOF? Всего в корзине S3 760 файлов общим размером 900 МБ. Все файлы паркета.
Я не уверен, связана ли проблема с конфигурацией Spark или с тем, как я фильтрую dataframe и присоединитесь к нему.
Примечание. Он работает в течение 5 часов, прежде чем наконец выдаст эту ошибку EOF. Он отлично работает с небольшими наборами данных. Но 900 МБ тоже мало.
Любая помощь приветствуется.

Подробнее здесь: https://stackoverflow.com/questions/787 ... conversion
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»