Код: Выделить всё
def set_s3_credentials(access_key, secret_key):
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)
Произошла ошибка: произошла ошибка при вызове o56.showString.
: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 1.0 завершилась сбоем 8 раз, последний сбой: потеря
задача 0,7 на этапе 1.0 (TID 8) (исполнитель 172.12.2.153) 0):
java.nio.file.AccessDeniedException:
s3a://wayfinder-doceree-s3-customer-data/export/mx_submits_2024_06/part-00000-tid-1074794359726202293-9f504b1d-da56-4031 -963b-be9f22348eb4-141340-1.c000.snappy.parquet:
getFileStatus на
s3a://wayfinder-doceree-s3-customer-data/export/mx_submits_2024_06/part-00000-tid-1074794359726202293 -9f504b1d-da56-4031-963b-be9f22348eb4-141340-1.c000.snappy.parquet:
com.amazonaws.services.s3.model.AmazonS3Exception: запрещено (Сервис:
Amazon S3; код состояния : 403; Код ошибки: 403 Запрещено; Идентификатор запроса:
6C07ME6ZAV2B4XSA; Расширенный идентификатор запроса S3:
Dx8EtSGjnYMl0Ld6kwSs9L9CMk0sdrDkzzdCSsXaG2KXk1uhC6iAIkly0mBCmB6rehqSuat0RlR0WHjPQ lFkQQ==;
Прокси: null), Идентификатор расширенного запроса S3:
Dx8EtSGjnYMl0Ld6kwSs9L9CMk0sdrDkzzdCSsXaG2KXk1uhC6iAikly0mBCmB6rehqSuat0RlR0WHjPQlFkQQ ==:403
Запрещено
в org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255).
Иногда я получаю 056.parquet вместо 056.showString. В моем коде я сначала устанавливаю учетные данные для исходного сегмента, используя вышеуказанную функцию, а затем читаю свои данные из исходного сегмента с помощью spark.read.parquet(), и я успешно могу прочитать их в кадре данных и использовать агрегатную функцию, например df.count() или просмотрите его с помощью df.show().
Но после этого я пытаюсь изменить учетные данные для целевого сегмента, используя приведенное выше функция, но после изменения этих учетных данных, когда я пытаюсь написать или использовать df.show() данные, которые он показывает мне исключение.
Вот мой код:
Вот мой код:
Код: Выделить всё
def copy_parquet_file():
try:
# Set and log source AWS credentials
set_s3_credentials(source_aws_access_key_id, source_aws_secret_access_key)
logging.info(f"Set source AWS credentials for bucket: '{source_bucket_name}'")
# update_spark_conf(source_s3_conf)
logging.info(f"Starting to copy file from bucket '{source_bucket_name}' key '{source_key}' to bucket '{destination_bucket_name}' key '{destination_key}'")
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
logging.info(f'Source bucket access_key: {hadoop_conf.get("fs.s3a.access.key")}')
logging.info(f'Source bucket secret_key: {hadoop_conf.get("fs.s3a.secret.key")}')
# Read parquet file from source bucket using PySpark
source_parquet_path = f"s3a://{source_bucket_name}/{source_key}"
df = spark.read.parquet(source_parquet_path, header=True, multiLine=True, quote="\"", escape="\"")
df = df.limit(100)
# df.printSchema()
logging.info("Read source parquet file successfully.")
# Set and log destination AWS credentials
set_s3_credentials(dest_aws_access_key_id, dest_aws_secret_access_key)
# update_spark_conf(destination_s3_conf)
logging.info(f"Set destination AWS credentials for bucket: '{destination_bucket_name}'")
logging.info(f'Destination bucket access_key: {hadoop_conf.get("fs.s3a.access.key")}')
logging.info(f'Destination bucket secret_key: {hadoop_conf.get("fs.s3a.secret.key")}')
# Write dataframe to destination bucket using PySpark
destination_parquet_path = f"s3a://{destination_bucket_name}/{destination_key}"
time.sleep(10)
df.show()
df.write.parquet(destination_parquet_path, mode='overwrite')
logging.info(f'Copied {source_key} to {destination_key} successfully.')
except Exception as e:
logging.error(f"Error occurred: {e}")
Подробнее здесь: https://stackoverflow.com/questions/787 ... he-help-of