Ошибка при использовании Features_df.write.mode("overwrite").parquet(PATH_Result): org.apache.spark.SparkException: не уPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Ошибка при использовании Features_df.write.mode("overwrite").parquet(PATH_Result): org.apache.spark.SparkException: не у

Сообщение Anonymous »

При использовании этой команды возникает ошибка:

Код: Выделить всё

features_df.write.mode("overwrite").parquet(PATH_Result)
Ошибка:

Код: Выделить всё

Py4JJavaError: An error occurred while calling o153.parquet. : org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 842, LAPTOP-4N2SL32T, executor driver):
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
...
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:536)
...
Caused by: java.io.EOFException
at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
...
----------------------------------------------------------------------
Driver stacktrace:
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
...
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:536)
...
Caused by: java.io.EOFException
at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86) ...  20 more

Мой контекст:
  • Я использую блокнот Jupyter
  • PySpark версия: 3.0.1
  • Java: JDK11
  • Python: 3.8
Я правильно инициализирую сеанс Spark:

Код: Выделить всё

import findspark
findspark.init()

import os
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-11"
os.environ["SPARK_HOME"] = r"C:\Users\xxxxx\anaconda3\envs\spark_env\Lib\site-packages\pyspark"

import sys
os.environ['PYSPARK_PYTHON'] = r"C:\Users\xxxxx\anaconda3\envs\spark_env\python"
os.environ['PYSPARK_DRIVER_PYTHON'] = r"C:\Users\xxxxx\anaconda3\envs\spark_env\python"

# Initialisation de Spark pour le traitement local.
spark = (SparkSession
.builder
.appName('app')
.master('local') # Utilise tous les cœurs disponibles
.config("spark.sql.parquet.writeLegacyFormat", 'true')
.config("spark.python.worker.timeout", "3000") # Test
.config("spark.python.worker.reuse", "true") # Test
.config("spark.sql.execution.arrow.enabled", "true") # Test
.config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
.config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
.config("spark.executor.memory", "8g")
.config("spark.executor.cores", 2)
.config("spark.driver.memory", "8g")
.config("spark.driver.cores", 2)
.config("spark.worker.memory", "1g")
.config("spark.network.timeout", "800s")
.config("spark.executor.heartbeatInterval", "60s")
.getOrCreate()
)
И мой код:

Код: Выделить всё

# Définition des chemins.
PATH = os.getcwd()
PATH_Data = PATH+'/data/Test1'
PATH_Result = PATH+'/data/Results'
print('PATH:        '+\
PATH+'\nPATH_Data:   '+\
PATH_Data+'\nPATH_Result: '+PATH_Result)

sc = spark.sparkContext
# Chargement des images en local en tant que fichiers binaires.
images = spark.read.format("binaryFile") \
.option("pathGlobFilter", "*.jpg") \
.option("recursiveFileLookup", "true") \
.load(PATH_Data)

# Extraction du label de l'image à partir du chemin.
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
# Chargement et préparation de MobileNetV2 pour le transfert d'apprentissage.
model = MobileNetV2(weights='imagenet',
include_top=True,
input_shape=(224, 224, 3))
new_model = Model(inputs=model.input,
outputs=model.layers[-2].output)

# Diffusion des poids.
brodcast_weights = sc.broadcast(new_model.get_weights())

# Renvoie un modèle MobileNetV2 avec la dernière couche supprimée et les poids diffusés.
def model_fn():
"""
Returns a MobileNetV2 model with top layer removed
and broadcasted pretrained weights.
"""
model = MobileNetV2(weights='imagenet',
include_top=True,
input_shape=(224, 224, 3))
for layer in model.layers:
layer.trainable = False
new_model = Model(inputs=model.input,
outputs=model.layers[-2].output)
new_model.set_weights(brodcast_weights.value)
return new_model

# Prétraite les images pour le modèle de prédiction.
def preprocess(content):
"""
Preprocesses raw image bytes for prediction.
"""
img = Image.open(io.BytesIO(content)).resize([224, 224])
arr = img_to_array(img)
return preprocess_input(arr)

# Extrait un vecteur de caractéristiques pour chaque image de la série.
def featurize_series(model, content_series):
"""
Featurize a pd.Series of raw images using the input model.
:return:  a pd.Series of image features
"""
input = np.stack(content_series.map(preprocess))
preds = model.predict(input)
# For some layers, output features will be multi-dimensional tensors.
# We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
output = [p.flatten() for p in preds]
return pd.Series(output)

# Pandas UDF pour featuriser des images en lot avec Spark.
@pandas_udf('array', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
'''
This method is a Scalar Iterator pandas UDF wrapping our featurization function.
The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

:param content_series_iter: This argument is an iterator over batches of data, where each batch
is a pandas Series of image data.
'''
# With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
# for multiple data batches.  This amortizes the overhead of loading big models.
model = model_fn()
for content_series in content_series_iter:
yield featurize_series(model, content_series)

# Transformation d'un DataFrame d'images en un DataFrame qui contient les caractéristiques (features) extraites des images.
features_df = images.repartition(5).select(col("path"),
col("label"),
featurize_udf("content").alias("features")
)
# Ecriture en Parquet.
features_df.write.mode("overwrite").parquet(PATH_Result)
Большое спасибо за помощь :) !
Я пытаюсь настроить параметры сеанса Spark (память драйвера, рабочая память, ...), я проверяю, что могу писать в PATH_Results и т. д... но не понимаю, в чем проблема.
Когда я запускаю команду "features_df.write.mode("overwrite").parquet(PATH_Result)" , я вижу, что это временное хранилище создается в PATH_Results, но удаляется при появлении ошибки.
Ищу уже 1 неделю...
Большое спасибо :)

Подробнее здесь: https://stackoverflow.com/questions/791 ... ult-org-ap
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»