Перенести данные csv/parquet из одной корзины s3 в другую корзину s3 с помощью pysparkPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Перенести данные csv/parquet из одной корзины s3 в другую корзину s3 с помощью pyspark

Сообщение Anonymous »

Я использую кластер Spark, который состоит из компьютеров ec2, и теперь с помощью pyspark я хочу перенести данные из исходного сегмента s3 в целевой сегмент в формате паркета. Оба сегмента имеют разные роли IAM и политики сегмента. Я устанавливаю ключ доступа и секретный ключ spark aws на уровне Hadoop, используя этот код def set_s3_credentials(access_key, secret_key):

Код: Выделить всё

    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)  but i am getting this ERROR - Error occurred: An error occurred while calling o56.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 8 times, most recent failure: Lost task 0.7 in stage 1.0 (TID 8) (172.12.2.153 executor 0): java.nio.file.AccessDeniedException: s3a://wayfinder-doceree-s3-customer-data/export/mx_submits_2024_06/part-00000-tid-1074794359726202293-9f504b1d-da56-4031-963b-be9f22348eb4-141340-1.c000.snappy.parquet: getFileStatus on s3a://wayfinder-doceree-s3-customer-data/export/mx_submits_2024_06/part-00000-tid-1074794359726202293-9f504b1d-da56-4031-963b-be9f22348eb4-141340-1.c000.snappy.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 6C07ME6ZAV2B4XSA; S3 Extended Request ID: Dx8EtSGjnYMl0Ld6kwSs9L9CMk0sdrDkzzdCSsXaG2KXk1uhC6iAIkly0mBCmB6rehqSuat0RlR0WHjPQlFkQQ==; Proxy: null), S3 Extended Request ID: Dx8EtSGjnYMl0Ld6kwSs9L9CMk0sdrDkzzdCSsXaG2KXk1uhC6iAIkly0mBCmB6rehqSuat0RlR0WHjPQlFkQQ==:403 Forbidden
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255).some times i am getting 056.parquet in place of 056.showString. In my code first seting credentials for source bucket by using above funciton and then reading my data from source bucket using spark.read.parquet() and successfully able to read it in dataframe and able to use aggregate function like df.count() or able to see it using df.show(). but after that i am trying to change the credentials for destination bucket using the above function but after changing those credentails when i am trying to write or use df.show() the data it is showing me the exception.
.....

Код: Выделить всё

def copy_parquet_file():
try:
# Set and log source AWS credentials
set_s3_credentials(source_aws_access_key_id, source_aws_secret_access_key)
logging.info(f"Set source AWS credentials for bucket: '{source_bucket_name}'")
# update_spark_conf(source_s3_conf)

logging.info(f"Starting to copy file from bucket '{source_bucket_name}' key '{source_key}' to bucket '{destination_bucket_name}' key '{destination_key}'")
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
logging.info(f'Source bucket access_key: {hadoop_conf.get("fs.s3a.access.key")}')
logging.info(f'Source bucket secret_key: {hadoop_conf.get("fs.s3a.secret.key")}')

# Read parquet file from source bucket using PySpark
source_parquet_path = f"s3a://{source_bucket_name}/{source_key}"
df = spark.read.parquet(source_parquet_path, header=True, multiLine=True, quote="\"", escape="\"")
df = df.limit(100)
# df.printSchema()
logging.info("Read source parquet file successfully.")

# Set and log destination AWS credentials
set_s3_credentials(dest_aws_access_key_id, dest_aws_secret_access_key)
# update_spark_conf(destination_s3_conf)
logging.info(f"Set destination AWS credentials for bucket: '{destination_bucket_name}'")

logging.info(f'Destination bucket access_key: {hadoop_conf.get("fs.s3a.access.key")}')
logging.info(f'Destination bucket secret_key: {hadoop_conf.get("fs.s3a.secret.key")}')

# Write dataframe to destination bucket using PySpark
destination_parquet_path = f"s3a://{destination_bucket_name}/{destination_key}"
time.sleep(10)
df.show()
df.write.parquet(destination_parquet_path, mode='overwrite')
logging.info(f'Copied {source_key} to {destination_key} successfully.')

except Exception as e:
logging.error(f"Error occurred: {e}") here is my code. Please help me to resolve this issue
Я хочу успешно перенести свои данные без таких исключений.

Подробнее здесь: https://stackoverflow.com/questions/787 ... he-help-of
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»