Использование кусков для загрузки огромного файла в лямбдуPython

Программы на Python
Ответить
Anonymous
 Использование кусков для загрузки огромного файла в лямбду

Сообщение Anonymous »

Я пытаюсь загрузить огромный файл паркета из s3, используя лямбда-функцию в aws. В целях оптимизации, поскольку в лямбде есть проблемы с памятью, я попытался разделить полученные данные на фрагменты. Это код функции:
`
импортировать awswrangler как wr
импортировать boto3
импортировать pandas как pd
s3 = boto3.client('s3')
defprocess_object(bucket_name, object_prefix, flow_name, columns_to_keep, chunk_size=100_000):
rows = []

Код: Выделить всё

# Lire le fichier Parquet en morceaux avec awswrangler
dfs = wr.s3.read_parquet(
path=f"s3://{bucket_name}/{object_prefix}",
columns=columns_to_keep,
chunked=True  # Lecture par morceaux pour économiser la mémoire
)

for df in dfs:  # Itérer sur chaque DataFrame généré
# Limiter le nombre de lignes par morceaux si nécessaire
if chunk_size:
df = df.head(chunk_size)

# Supprimer les doublons en fonction des colonnes spécifiques
if flow_name == "R50":
df = df.drop_duplicates(subset=["nom_archive", "nom_fichier"])
else:
df = df.drop_duplicates(subset=["nom_archive", "nom_fichier", "publication_date"])

# Trier par la colonne "nom_archive" en ordre décroissant et garder la première ligne
df = df.sort_values("nom_archive", ascending=False).head(1)

# Optimiser les types de données
df["nom_archive"] = df["nom_archive"].astype("category")
df["nom_fichier"] = df["nom_fichier"].astype("category")

# Ajouter les données traitées à la liste des lignes
for index, row in df.iterrows():
rows.append({
"grid_operator": "grid_operator",  # Adapter avec ta variable réelle
"file_name": row["nom_fichier"],
"file_size": None,
"archive_name": row["nom_archive"],
"parent_path": "parent_path",  # Adapter avec ta variable réelle
"event_time": "event_time",  # Adapter avec ta variable réelle
"upload_day": "upload_day",  # Adapter avec ta variable réelle
"upload_month": "upload_month",  # Adapter avec ta variable réelle
"extraction_date": row.get("publication_date"),
"flow_name": flow_name
})

return rows`
Но он возвращает эту ошибку:
L'erreur ArrowInvalid: Схема с индексом 0 была другой.
Я пытался привести все поле к str. Но это не работает.
Помогите, пожалуйста?

Подробнее здесь: https://stackoverflow.com/questions/791 ... nto-lambda
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»