Я считываю около 1 миллиона строк, хранящихся в S3 в виде файлов паркета, в кадр данных (данные размером 900 МБ в корзине). Фильтрация фрейма данных на основе значений и последующее преобразование в фрейм данных Pandas. Здесь задействованы две пользовательские функции (classify и TransformDate). Я получаю эту ошибку при запуске этого фрагмента кода. Что не так с этим кодом? Мне не хватает какой-то настройки искры или это неправильное использование UDF?
Фрагмент кода ниже:
#Import headers skipped for simplicity
findspark.init()
#Simple Function to classify values
def classify(value):
if float(value) < 0.0 or float(value) >= 10.0:
return -1
return int(float(value) * 2) + 1
#Simple Function to transform a string to date object
def transformDate(dateStr):
date_format = '%d-%b-%Y:%H:%M:%S %Z'
datetime_obj = datetime.datetime.strptime("{} {}".format(dateStr, 'UTC'), date_format)
return datetime_obj
def read_from_s3():
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4')
conf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
conf.set('spark.hadoop.fs.s3a.connection.maximum', 100)
conf.set('fs.s3a.threads.max', 50)
conf.set('spark.default.parallelism', 2048)
conf.set('spark.sql.shuffle.partitions',4096)
conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
conf.set('spark.sql.streaming.schemaInference','true')
conf.set('spark.rpc.message.maxSize', '1024')
conf.set('spark.executor.memory', '32g')
conf.set('spark.shuffle.file.buffer', '64k')
conf.set('spark.eventLog.buffer.kb', '200k')
conf.set('spark.executor.cores', '8')
conf.set('spark.cores.max', '8')
conf.set('spark.driver.memory', '32g')
conf.set('spark.driver.maxResultSize', '21G')
conf.set('spark.worker.cleanup.enabled', True)
conf.set('spark.executor.heartbeatInterval', '43200s')
conf.set('spark.network.timeout', '3000000s')
conf.set('spark.hadoop.fs.s3a.access.key', '')
conf.set('spark.hadoop.fs.s3a.secret.key', '')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel('DEBUG')
classify_udf = udf(lambda x:classify(x),IntegerType())
transform_date_udf = udf(lambda x:transformDate(x),TimestampType())
paths = 's3a:///'
df = spark.read.parquet(paths,
inferSchema=True)
#Encode a col -colA which is in string format and create an integer representation for it
df = df.withColumn('colA_encoded',classify_udf('colA'))
#Encode a date string -str_date which is in string format and create timestamp with timezone
df = df.withColumn('date_transformed',transform_date_udf('str_date'))
return df
#Filter rows which have colA > 3.0 and retain all rows where colZ is present in this filtered data set (colA > 3.0)
#Basically keep all rows whose colZ has atleast one row with colA > 3.0
def do_filter(df: DataFrame):
df = df.fillna({'colA':'0'})
df= df.withColumn('colA_float',df['colA'].cast("float").alias('colA_float'))
df2 = (df.filter(col('colA_float') > 3.0).select('colZ').distinct())
df=df.join(df2, 'colZ')
return df
#Function to group the dataframe by colZ and create index over colA
def do_proper_group(df: DataFrame):
window = Window.partitionBy('colZ').orderBy('colA')
df =df.select('*', dense_rank().over(window).alias('myIdx'))
return df
def plot_data_frame(df: DataFrame =None) :
fig, ax = plt.subplots()
print(df.count())
pandas_df = df.toPandas()
print(pandas_df.columns)
pandas_df.set_index('myIdx', inplace=True)
pandas_df.groupby('colZ')['colA_float'].plot(legend=True,x='myIdx', xlabel="My Group Number", ylabel="Value")
plt.show()
if __name__ == '__main__':
start = time.time()
print(start)
plot_data_frame(do_proper_group(do_filter(read_from_s3())))
end = time.time()
print(end - start)
24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1225, in main
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
24/07/24 06:47:31 DEBUG Invoker: Starting: read
24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1225, in main
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
24/07/24 06:47:31 DEBUG Invoker: Starting: lazySeek
Откуда возникает эта ошибка EOF? Всего в корзине S3 760 файлов общим размером 900 МБ. Все файлы паркета.
Я не уверен, связана ли проблема с конфигурацией Spark или с тем, как я фильтрую dataframe и присоединитесь к нему.
Примечание. Он работает в течение 5 часов, прежде чем наконец выдаст эту ошибку EOF. Он отлично работает с небольшими наборами данных. Но 900 МБ тоже мало.
Любая помощь приветствуется.
Я считываю около 1 миллиона строк, хранящихся в S3 в виде файлов паркета, в кадр данных (данные размером 900 МБ в корзине). Фильтрация фрейма данных на основе значений и последующее преобразование в фрейм данных Pandas. Здесь задействованы две пользовательские функции (classify и TransformDate). Я получаю эту ошибку при запуске этого фрагмента кода. Что не так с этим кодом? Мне не хватает какой-то настройки искры или это неправильное использование UDF? Фрагмент кода ниже: [code]#Import headers skipped for simplicity
findspark.init()
#Simple Function to classify values def classify(value): if float(value) < 0.0 or float(value) >= 10.0: return -1 return int(float(value) * 2) + 1
#Simple Function to transform a string to date object def transformDate(dateStr): date_format = '%d-%b-%Y:%H:%M:%S %Z' datetime_obj = datetime.datetime.strptime("{} {}".format(dateStr, 'UTC'), date_format) return datetime_obj
conf.set('spark.hadoop.fs.s3a.access.key', '') conf.set('spark.hadoop.fs.s3a.secret.key', '') spark = SparkSession.builder.config(conf=conf).getOrCreate() spark.sparkContext.setLogLevel('DEBUG') classify_udf = udf(lambda x:classify(x),IntegerType()) transform_date_udf = udf(lambda x:transformDate(x),TimestampType()) paths = 's3a:///' df = spark.read.parquet(paths, inferSchema=True) #Encode a col -colA which is in string format and create an integer representation for it df = df.withColumn('colA_encoded',classify_udf('colA')) #Encode a date string -str_date which is in string format and create timestamp with timezone df = df.withColumn('date_transformed',transform_date_udf('str_date')) return df
#Filter rows which have colA > 3.0 and retain all rows where colZ is present in this filtered data set (colA > 3.0) #Basically keep all rows whose colZ has atleast one row with colA > 3.0 def do_filter(df: DataFrame): df = df.fillna({'colA':'0'}) df= df.withColumn('colA_float',df['colA'].cast("float").alias('colA_float')) df2 = (df.filter(col('colA_float') > 3.0).select('colZ').distinct()) df=df.join(df2, 'colZ') return df
#Function to group the dataframe by colZ and create index over colA def do_proper_group(df: DataFrame): window = Window.partitionBy('colZ').orderBy('colA') df =df.select('*', dense_rank().over(window).alias('myIdx')) return df
if __name__ == '__main__': start = time.time() print(start) plot_data_frame(do_proper_group(do_filter(read_from_s3()))) end = time.time() print(end - start) [/code] Журнал ошибок ниже: [code]24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1225, in main File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 596, in read_int raise EOFError EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572) at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94) at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166) at org.apache.spark.scheduler.Task.run(Task.scala:141) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833) 24/07/24 06:47:31 DEBUG Invoker: Starting: read 24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1225, in main File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 596, in read_int raise EOFError EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572) at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94) at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166) at org.apache.spark.scheduler.Task.run(Task.scala:141) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833) 24/07/24 06:47:31 DEBUG Invoker: Starting: lazySeek
[/code] Откуда возникает эта ошибка EOF? Всего в корзине S3 760 файлов общим размером 900 МБ. Все файлы паркета. Я не уверен, связана ли проблема с конфигурацией Spark или с тем, как я фильтрую dataframe и присоединитесь к нему. Примечание. Он работает в течение 5 часов, прежде чем наконец выдаст эту ошибку EOF. Он отлично работает с небольшими наборами данных. Но 900 МБ тоже мало. Любая помощь приветствуется.
Я только что нашел комментарий в этом ответе, в котором говорится, что использование iostream::eof в условии цикла «почти наверняка неправильно». Обычно я использую что-то вроде while(cin>>n), которое, я думаю, неявно проверяет наличие EOF.
Я использую Parquet.Net (4.23.5) для записи файла паркета. Я обнаружил, что когда я хочу записать нулевое значение в столбце данных, сгенерированный файл паркета становится нечитаемым.
Итак, что я делаю неправильно
Это это простой код для его...
Я создал файл паркета, используя метод .write_parquet Python Polars. Python может без проблем прочитать его обратно, и MATLAB также может без проблем прочитать информацию о файле с помощью parquetinfo.
Однако, когда я запускаю parquetread в MATLAB...
Я использую Parquet.Net (4.23.5) для записи файла паркета. Я обнаружил, что когда я хочу записать нулевое значение в столбце данных, сгенерированный файл паркета становится нечитаемым.
Итак, что я делаю неправильно
Это это простой код для его...
Я экспортировал файл паркета с помощью parquet.net, который включает столбец продолжительности, содержащий значения, превышающие 24 часа. Я открыл этот инструмент с помощью инструмента для пола, который включен в состав parquet.net, и столбец имеет...