Запись формата Delta в Data Lake в AWS S3Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Запись формата Delta в Data Lake в AWS S3

Сообщение Anonymous »

Моя цель — загрузить данные в дельта-формате в озеро данных AWS S3.
Я должен отметить, что мне удалось загрузить файлы (паркет) в это озеро данных, а также я могу написать дельта-формат в моя локальная машина, но когда я пытаюсь записать дельта-формат в S3, я получаю ошибку.
Итак, мой код следующий

Код: Выделить всё

import findspark
from pyspark.sql import SparkSession
import pandas as pd

findspark.find()
findspark.init()

import boto3

# Initialize a session using the AWS SDK for Python (boto3)
session = boto3.Session(profile_name='default')

# Get the AWS credentials
credentials = session.get_credentials()

spark = SparkSession.builder.appName('Session2').config('spark.master', 'local[4]') \
.config("spark.hadoop.fs.s3a.access.key", credentials.access_key) \
.config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6,org.apache.hadoop:hadoop-common:3.3.6,io.delta:delta-core_2.12:2.4.0") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.sql.legacy.parquet.int96RebaseModeInWrite","CORRECTED") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
.getOrCreate()

df = ...  # some dataframe

df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").option("mergeSchema", "true").save('s3a://bucket/dts/deltatable/')
У меня такая ошибка:

Код: Выделить всё

"name": "Py4JJavaError",
"message": "An error occurred while calling o116.save.\n: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 172) (host.docker.internal executor driver): java.lang.NoSuchMethodError: 'java.lang.Object org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(org.apache.hadoop.fs.statistics.DurationTracker, org.apache.hadoop.util.functional.CallableRaisingIOE)'\r\n\tat org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration(Invoker.java:147)\r\n\tat org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:282)\r\n\tat org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:435)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:284)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:408)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:404).....
Кроме того, я заметил, что с этой конфигурацией я больше не могу записывать файлы паркета в S3 (вопреки тому, что я говорил раньше), но подтверждаю, что могу писать дельта-формат в свой локальная машина.
Но если я изменю свою конфигурацию в искре на эту, у меня получится написать паркет в S3:

Код: Выделить всё

spark = SparkSession.builder.appName('Session2').config('spark.master', 'local[4]') \
.config("spark.hadoop.fs.s3a.access.key", credentials.access_key) \
.config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key) \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6,org.apache.hadoop:hadoop-common:3.3.6") \
.getOrCreate()
Кстати, ошибка, связанная с этой попыткой паркета с дельта-конфигурацией, заключается в следующем:

Код: Выделить всё

Py4JJavaError: An error occurred while calling o130.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 177) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
Еще одна вещь, которая может помочь, это то, что если я проверю S3, я нашел папку паркета, которую пытались загрузить, но внутри нет никакого файла паркета из-за упомянутой мной ошибки. ранее
Изображение

Я понимаю, что моя конфигурация неправильная, но не знаю почему.
Кроме того, я использую Spark 3.4.1, Scala 2.12, Delta-Spark 2.4.0

Подробнее здесь: https://stackoverflow.com/questions/771 ... -in-aws-s3
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Подключение и аутентификация к Delta Lake в Azure Data Lake Storage Gen 2 с использованием API Python delta-rs
    Anonymous » » в форуме Python
    0 Ответы
    36 Просмотры
    Последнее сообщение Anonymous
  • Azure Data Lake Gen 2 и Python копируют файлы в папках Data Lake
    Anonymous » » в форуме Python
    0 Ответы
    77 Просмотры
    Последнее сообщение Anonymous
  • Delta Lake для настройки ноутбуков AWS Glue
    Anonymous » » в форуме Python
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • Как создать дельта-таблицу с помощью автономного озера Delta Lake и записать данные
    Anonymous » » в форуме JAVA
    0 Ответы
    20 Просмотры
    Последнее сообщение Anonymous
  • Как упаковать сценарий Pyspark + Delta Lake в Exe с Pyinstaller
    Anonymous » » в форуме Python
    0 Ответы
    0 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»