When I run this command:

[code]features_df.write.mode("overwrite").parquet(PATH_Result)[/code]

I get the following error:

[code]
Py4JJavaError: An error occurred while calling o153.parquet. : org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121) at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288) at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:848) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 842, LAPTOP-4N2SL32T, executor driver): at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446) at
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:536) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:525) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281) ... 9 more Caused by: java.io.EOFException at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86) ... 20 more
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195) ... 33 more Caused by: org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 
1 more Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:536) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:525) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281) ... 9 more Caused by: java.io.EOFException at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86) ... 20 more
[/code]

My context:
- I am using a Jupyter notebook
- PySpark version: 3.0.1
- Java: JDK 11
- Python: 3.8

I initialize the Spark session like this:

[code]
import findspark
findspark.init()

import os
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-11"
os.environ["SPARK_HOME"] = r"C:\Users\xxxxx\anaconda3\envs\spark_env\Lib\site-packages\pyspark"

from pyspark.sql import SparkSession

# Initialize Spark for local processing.
# NB: 'local' runs a single worker thread; 'local[*]' would use all cores.
spark = (SparkSession
         .builder
         .appName('app')
         .master('local')
         .config("spark.sql.parquet.writeLegacyFormat", 'true')
         .config("spark.python.worker.timeout", "3000")        # Test
         .config("spark.python.worker.reuse", "true")          # Test
         .config("spark.sql.execution.arrow.enabled", "true")  # Test
         .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
         .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
         .config("spark.executor.memory", "8g")
         .config("spark.executor.cores", 2)
         .config("spark.driver.memory", "8g")
         .config("spark.driver.cores", 2)
         .config("spark.worker.memory", "1g")
         .config("spark.network.timeout", "800s")
         .config("spark.executor.heartbeatInterval", "60s")
         .getOrCreate())
[/code]

And my code:
[code]
import io
import os

import numpy as np
import pandas as pd
from PIL import Image

from pyspark.sql.functions import col, element_at, pandas_udf, split, PandasUDFType
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import img_to_array

# Define the paths.
PATH = os.getcwd()
PATH_Data = PATH + '/data/Test1'
PATH_Result = PATH + '/data/Results'
print('PATH: ' + PATH +
      '\nPATH_Data: ' + PATH_Data +
      '\nPATH_Result: ' + PATH_Result)
sc = spark.sparkContext

# Load the local images as binary files.
images = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load(PATH_Data)
[/code]
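To rule out the loading step, the DataFrame can be inspected on its own; the binaryFile source should expose path, modificationTime, length and content columns:

[code]
images.printSchema()   # path, modificationTime, length, content
print(images.count())  # number of *.jpg files found under PATH_Data
[/code]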
[code]
# Extract the image label from its path (the parent directory name,
# e.g. '.../Test1/<label>/<file>.jpg').
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# Load and prepare MobileNetV2 for transfer learning.
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

# Broadcast the pretrained weights to the workers.
broadcast_weights = sc.broadcast(new_model.get_weights())

# Returns a MobileNetV2 model with the last layer removed and the
# broadcast weights applied.
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model
[/code]
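For reference, the featurizer can be checked outside Spark. With include_top=True, model.layers[-2] should be MobileNetV2's global average pooling layer, so each feature vector should have 1280 entries (a minimal sanity check, not part of the job itself):

[code]
# Build the model on the driver and featurize a dummy batch.
m = model_fn()
dummy = np.zeros((1, 224, 224, 3), dtype=np.float32)
print(m.predict(dummy).shape)  # expected: (1, 1280)
[/code]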
[code]
# Preprocess the images for the prediction model.
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # .convert('RGB') guards against grayscale or RGBA images, whose
    # arrays would otherwise fail to stack into a single batch.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)
[/code]
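preprocess can likewise be exercised on a single file before involving Spark (a quick check using the first image found under PATH_Data):

[code]
import glob

# Preprocess one image from disk; the result should be (224, 224, 3).
sample_file = glob.glob(PATH_Data + '/**/*.jpg', recursive=True)[0]
with open(sample_file, 'rb') as f:
    print(preprocess(f.read()).shape)
[/code]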
[code]
# Extract a feature vector for every image in the series.
def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in
    # Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)
[/code]
[code]
# Pandas UDF to featurize images in batches with Spark.
# The return type must be 'array<float>'; a bare 'array' is not a valid
# Spark SQL type string.
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization
    function. The decorator specifies that this returns a Spark DataFrame
    column of type ArrayType(FloatType).

    :param content_series_iter: an iterator over batches of data, where
        each batch is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then
    # re-use it for multiple data batches. This amortizes the overhead of
    # loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
[/code]
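Since the stack trace points at the Python worker, the whole featurization path can also be tested in plain pandas, completely outside Spark (a sketch using the first two images found under PATH_Data):

[code]
import glob

# Featurize two images directly, bypassing Spark and Arrow entirely.
files = sorted(glob.glob(PATH_Data + '/**/*.jpg', recursive=True))[:2]
series = pd.Series([open(f, 'rb').read() for f in files])
feats = featurize_series(model_fn(), series)
print(feats.iloc[0].shape)  # expected: (1280,)
[/code]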
[code]
# Turn the image DataFrame into a DataFrame holding the extracted features.
features_df = images.repartition(5).select(col("path"),
                                           col("label"),
                                           featurize_udf("content").alias("features"))

# Write to Parquet.
features_df.write.mode("overwrite").parquet(PATH_Result)
[/code]
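Once the write goes through, the result can be verified by reading the Parquet files back:

[code]
# Read the features back to check the schema and row count.
df = spark.read.parquet(PATH_Result)
df.printSchema()
print(df.count())
[/code]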
Thanks a lot for your help!

I have tried tuning the Spark session parameters (driver memory, worker memory, ...), and I have checked that I can write to PATH_Result, etc., but I can't figure out what the problem is. When I run features_df.write.mode("overwrite").parquet(PATH_Result), I can see a temporary directory being created under PATH_Result, but it is deleted as soon as the error appears.

I have been searching for a week now...
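The only further isolation step I can think of is to trigger the UDF without the Parquet write; if the sketch below also crashes the Python worker, the problem would be in the featurization itself rather than in the write:

[code]
# Force the featurization on a tiny sample without writing anything.
rows = features_df.select("features").limit(2).collect()
print(len(rows), len(rows[0]["features"]))
[/code]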
Many thanks!