Я должен отметить, что мне удалось загрузить файлы (паркет) в это озеро данных, а также я могу написать дельта-формат в моя локальная машина, но когда я пытаюсь записать дельта-формат в S3, я получаю ошибку.
Итак, мой код следующий
Код: Выделить всё
import findspark
from pyspark.sql import SparkSession
import pandas as pd
findspark.find()
findspark.init()
import boto3
# Initialize a session using the AWS SDK for Python (boto3)
session = boto3.Session(profile_name='default')
# Get the AWS credentials
credentials = session.get_credentials()
spark = SparkSession.builder.appName('Session2').config('spark.master', 'local[4]') \
.config("spark.hadoop.fs.s3a.access.key", credentials.access_key) \
.config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6,org.apache.hadoop:hadoop-common:3.3.6,io.delta:delta-core_2.12:2.4.0") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.sql.legacy.parquet.int96RebaseModeInWrite","CORRECTED") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
.getOrCreate()
df = ... # some dataframe
df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").option("mergeSchema", "true").save('s3a://bucket/dts/deltatable/')
Код: Выделить всё
"name": "Py4JJavaError",
"message": "An error occurred while calling o116.save.\n: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 172) (host.docker.internal executor driver): java.lang.NoSuchMethodError: 'java.lang.Object org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(org.apache.hadoop.fs.statistics.DurationTracker, org.apache.hadoop.util.functional.CallableRaisingIOE)'\r\n\tat org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration(Invoker.java:147)\r\n\tat org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:282)\r\n\tat org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:435)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:284)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:408)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:404).....
Но если я изменю свою конфигурацию в искре на эту, у меня получится написать паркет в S3:
Код: Выделить всё
spark = SparkSession.builder.appName('Session2').config('spark.master', 'local[4]') \
.config("spark.hadoop.fs.s3a.access.key", credentials.access_key) \
.config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key) \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6,org.apache.hadoop:hadoop-common:3.3.6") \
.getOrCreate()
Код: Выделить всё
Py4JJavaError: An error occurred while calling o130.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 177) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)

Я понимаю, что моя конфигурация неправильная, но не знаю почему.
Кроме того, я использую Spark 3.4.1, Scala 2.12, Delta-Spark 2.4.0
Подробнее здесь: https://stackoverflow.com/questions/771 ... -in-aws-s3