При записи файла в хранилище BLOB-объектов Azure из синапса выдается TASK_WRITE_FAILED.Python

Программы на Python
Ответить
Anonymous
 При записи файла в хранилище BLOB-объектов Azure из синапса выдается TASK_WRITE_FAILED.

Сообщение Anonymous »

Я создаю несколько файлов паркета, и мне нужно сохранить их в хранилище BLOB-объектов Azure. Для этого вопроса я воссоздал ошибку при попытке записать CSV-файл из фрейма данных.
Я работаю над этим сценарием в записной книжке Azure Synapse. Чтение и обновление файлов работает нормально. Чтение аналогично записи.
В какой-то момент я использовал pandas, и все работало нормально, однако у меня возникли проблемы с возможным изменением схемы файла паркета, поэтому я вернулся к PySpark. .
Очевидно, проблема может быть в разрешениях, поэтому я включаю соответствующую часть токена sas.
Написание кода
сильный> питон
from pyspark.sql import SparkSession

# Azure Storage configurations
storage_account_name = ""
container_name = ""
sas_token = "ss=bfqt&srt=sco&sp=rwdlacupiytfx"

# Input and output paths
input_folder = "input"
output_folder = "output"

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Spark configuration for Azure Blob Storage
spark.conf.set(f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net", sas_token)

# Example DataFrame
test_df = spark.createDataFrame([(1, "test"), (2, "data")], ["id", "value"])
print("SCRIPT: data frame created")

# Write DataFrame to Azure Blob Storage
output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{output_folder}/test.csv"
print(f"SCRIPT: output path created: {output_path}")

print("SCRIPT: Writing data frame to the output path...")
# test_df.write.parquet(output_path).mode("overwrite")
test_df.write.mode("overwrite").csv(output_path)

print(f"SCRIPT: Data written successfully to {output_path}")

Код аварийно завершает работу на test_df.write.mode("overwrite").csv(output_path)
Traceback< /strong>
> --------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call
> last) Cell In[23], line 31
> 29 print("SCRIPT: Writing data frame to the output path...")
> 30 # test_df.write.parquet(output_path).mode("overwrite")
> ---> 31 test_df.write.mode("overwrite").csv(output_path)
> 33 print(f"SCRIPT: Data written successfully to {output_path}")
>
> File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:1799,
> in DataFrameWriter.csv(self, path, mode, compression, sep, quote,
> escape, header, nullValue, escapeQuotes, quoteAll, dateFormat,
> timestampFormat, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace,
> charToEscapeQuoteEscaping, encoding, emptyValue, lineSep) 1780
> self.mode(mode) 1781 self._set_opts( 1782
> compression=compression, 1783 sep=sep, (...) 1797
> lineSep=lineSep, 1798 )
> -> 1799 self._jwrite.csv(path)
>
> File
> ~/cluster-env/env/lib/python3.10/site-packages/py4j/java_gateway.py:1322,
> in JavaMember.__call__(self, *args) 1316 command =
> proto.CALL_COMMAND_NAME +\ 1317 self.command_header +\ 1318
> args_command +\ 1319 proto.END_COMMAND_PART 1321 answer =
> self.gateway_client.send_command(command)
> -> 1322 return_value = get_return_value( 1323 answer, self.gateway_client, self.target_id, self.name) 1325 for temp_arg
> in temp_args: 1326 if hasattr(temp_arg, "_detach"):
>
> File
> /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:169,
> in capture_sql_exception..deco(*a, **kw)
> 167 def deco(*a: Any, **kw: Any) -> Any:
> 168 try:
> --> 169 return f(*a, **kw)
> 170 except Py4JJavaError as e:
> 171 converted = convert_exception(e.java_exception)
>
> File
> ~/cluster-env/env/lib/python3.10/site-packages/py4j/protocol.py:326,
> in get_return_value(answer, gateway_client, target_id, name)
> 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
> 325 if answer[1] == REFERENCE_TYPE:
> --> 326 raise Py4JJavaError(
> 327 "An error occurred while calling {0}{1}{2}.\n".
> 328 format(target_id, ".", name), value)
> 329 else:
> 330 raise Py4JError(
> 331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
> 332 format(target_id, ".", name, value))
>
> Py4JJavaError: An error occurred while calling o6862.csv.

Исключение

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 33) (vm-8c653250 executor 1): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to wasbs://datawarehouse@devdwhdatalakepoc.blob.core.windows.net/tuum-ricker-dev-sandbox/output/test.csv/.spark-staging-198d0508-1991-420e-be6e-3efb60313af2.
at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:788)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:472)
at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:120)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:574)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: Error closing the output.
at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:1000)
at org.apache.spark.sql.catalyst.csv.UnivocityGenerator.close(UnivocityGenerator.scala:124)
at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CsvOutputWriter.scala:48)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseCurrentWriter(FileFormatDataWriter.scala:73)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:87)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:143)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:456)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:462)
... 15 more
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: This request is not authorized to perform this operation.
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2898)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2764)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:1245)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:1111)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at java.base/sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:341)
at java.base/sun.nio.cs.StreamEncoder.close(StreamEncoder.java:161)
at java.base/java.io.OutputStreamWriter.close(OutputStreamWriter.java:255)
at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:996)
... 23 more
Caused by: com.microsoft.azure.storage.StorageException: This request is not authorized to perform this operation.
at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:315)
at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:185)
at com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:735)
at com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:691)
at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:434)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2837)
... 32 more


Подробнее здесь: https://stackoverflow.com/questions/793 ... ite-failed
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»