Error when using Features_df.write.mode("overwrite").parquet(PATH_Result): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

Post by Anonymous »

When I use this command, the following error occurs:

Code:

features_df.write.mode("overwrite").parquet(PATH_Result)
Error:

Code:

Py4JJavaError: An error occurred while calling o153.parquet.
: org.apache.spark.SparkException: Job aborted.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
  at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
  at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:848)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 842, LAPTOP-4N2SL32T, executor driver):
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
  at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:536)
  at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:525)
  at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
  at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105)
  at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
  at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
  ... 9 more
Caused by: java.io.EOFException
  at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
  at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86)
  ... 20 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
  ... 33 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  ... 1 more
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
  at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:536)
  at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:525)
  at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
  at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105)
  at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
  at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
  ... 9 more
Caused by: java.io.EOFException
  at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
  at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86)
  ... 20 more
My context:
  • I am working in a Jupyter notebook
  • PySpark version: 3.0.1
  • Java: JDK 11
  • Python: 3.8
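For completeness, these versions can be confirmed directly from the notebook with something like the sketch below (the pyarrow / pandas / tensorflow checks assume those are the packages installed in the spark_env environment and used by the featurization code further down):

Code:

# Quick driver-side version check (a sketch; pyarrow/pandas/tensorflow are
# assumed to be the packages the featurization code below relies on).
import sys
import pyspark, pyarrow, pandas, tensorflow

print(sys.version)                # expecting 3.8.x
print(pyspark.__version__)        # expecting 3.0.1
print(pyarrow.__version__, pandas.__version__, tensorflow.__version__)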
I believe I am initializing the Spark session correctly:

Code:

import findspark
findspark.init()

import os
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-11"
os.environ["SPARK_HOME"] = r"C:\Users\xxxxx\anaconda3\envs\spark_env\Lib\site-packages\pyspark"

import sys
os.environ['PYSPARK_PYTHON'] = r"C:\Users\xxxxx\anaconda3\envs\spark_env\python"
os.environ['PYSPARK_DRIVER_PYTHON'] = r"C:\Users\xxxxx\anaconda3\envs\spark_env\python"

from pyspark.sql import SparkSession

# Spark initialisation for local processing.
spark = (SparkSession
         .builder
         .appName('app')
         .master('local')  # note: 'local' runs on a single core; 'local[*]' would use all available cores
         .config("spark.sql.parquet.writeLegacyFormat", 'true')
         .config("spark.python.worker.timeout", "3000")  # test
         .config("spark.python.worker.reuse", "true")  # test
         .config("spark.sql.execution.arrow.enabled", "true")  # test (legacy key; spark.sql.execution.arrow.pyspark.enabled is the Spark 3.x name)
         .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
         .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
         .config("spark.executor.memory", "8g")
         .config("spark.executor.cores", 2)
         .config("spark.driver.memory", "8g")
         .config("spark.driver.cores", 2)
         .config("spark.worker.memory", "1g")
         .config("spark.network.timeout", "800s")
         .config("spark.executor.heartbeatInterval", "60s")
         .getOrCreate()
         )
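To make sure these options actually reach the running session (getOrCreate() may return an already-existing session, in which case settings such as spark.driver.memory are not re-applied) and that the executors launch the interpreter set in PYSPARK_PYTHON, I could add a quick check along these lines (a sketch, not part of my original notebook):

Code:

# Sketch: confirm the session carries the configuration above and see which
# Python interpreter the workers will actually launch.
print(spark.conf.get("spark.sql.execution.arrow.enabled"))
print(spark.conf.get("spark.driver.memory"))
print(spark.sparkContext.pythonExec)   # interpreter used for the Python workers

# Run one trivial task on a worker to report its interpreter and key library versions.
def worker_info(_):
    import sys
    import pandas, pyarrow
    return [(sys.executable, sys.version, pandas.__version__, pyarrow.__version__)]

print(spark.sparkContext.parallelize([0], 1).mapPartitions(worker_info).collect())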
And here is my code:

Code:

# Imports used below (the exact tensorflow.keras import paths are my assumption;
# the rest comes from pyspark.sql.functions, PIL and the standard library).
import io
import numpy as np
import pandas as pd
from PIL import Image
from pyspark.sql.functions import col, split, element_at, pandas_udf, PandasUDFType
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import Model

# Path definitions.
PATH = os.getcwd()
PATH_Data = PATH+'/data/Test1'
PATH_Result = PATH+'/data/Results'
print('PATH:        ' + PATH +
      '\nPATH_Data:   ' + PATH_Data +
      '\nPATH_Result: ' + PATH_Result)

sc = spark.sparkContext

# Load the local images as binary files.
images = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load(PATH_Data)

# Extract the image label from its path.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# Load and prepare MobileNetV2 for transfer learning.
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

# Broadcast the weights.
broadcast_weights = sc.broadcast(new_model.get_weights())

# Returns a MobileNetV2 model with the last layer removed and the broadcast weights.
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model

# Preprocess the images for the prediction model.
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

# Extract a feature vector for each image in the series.
def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

# Pandas UDF to featurize images in batches with Spark.
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
    is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

# Turn the DataFrame of images into a DataFrame holding the features extracted from each image.
features_df = images.repartition(5).select(col("path"),
                                           col("label"),
                                           featurize_udf("content").alias("features"))

# Write to Parquet.
features_df.write.mode("overwrite").parquet(PATH_Result)
Thank you very much for your help :) !
I have tried adjusting the Spark session parameters (driver memory, worker memory, ...), and I have checked that I can write to PATH_Result, etc., but I don't understand where the problem is.
When I run the command "features_df.write.mode("overwrite").parquet(PATH_Result)", I can see a temporary directory being created under PATH_Result, but it is deleted as soon as the error appears.
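To narrow down whether the failure comes from the Parquet write itself or from the pandas UDF, I am thinking of testing the two halves separately, roughly like this (a sketch; the "_smoke_test" suffix is just an arbitrary throwaway location):

Code:

# 1) Plain Parquet write with no UDF involved: if this succeeds, the write
#    path and the permissions on the Results folder should be fine.
spark.range(10).write.mode("overwrite").parquet(PATH_Result + "_smoke_test")

# 2) Force the pandas UDF to run on a tiny sample without writing anything:
#    if this alone crashes the Python worker, the problem is in the UDF
#    (TensorFlow / Arrow / memory in the worker), not in the Parquet writer.
rows = images.limit(2).select(featurize_udf("content").alias("features")).collect()
print(len(rows), len(rows[0]["features"]))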
I have been searching for a week already...
Many thanks :)

More details here: https://stackoverflow.com/questions/791 ... ult-org-ap