`
импортировать awswrangler как wr
импортировать boto3
импортировать pandas как pd
s3 = boto3.client('s3')
defprocess_object(bucket_name, object_prefix, flow_name, columns_to_keep, chunk_size=100_000):
rows = []
Код: Выделить всё
# Lire le fichier Parquet en morceaux avec awswrangler
dfs = wr.s3.read_parquet(
path=f"s3://{bucket_name}/{object_prefix}",
columns=columns_to_keep,
chunked=True # Lecture par morceaux pour économiser la mémoire
)
for df in dfs: # Itérer sur chaque DataFrame généré
# Limiter le nombre de lignes par morceaux si nécessaire
if chunk_size:
df = df.head(chunk_size)
# Supprimer les doublons en fonction des colonnes spécifiques
if flow_name == "R50":
df = df.drop_duplicates(subset=["nom_archive", "nom_fichier"])
else:
df = df.drop_duplicates(subset=["nom_archive", "nom_fichier", "publication_date"])
# Trier par la colonne "nom_archive" en ordre décroissant et garder la première ligne
df = df.sort_values("nom_archive", ascending=False).head(1)
# Optimiser les types de données
df["nom_archive"] = df["nom_archive"].astype("category")
df["nom_fichier"] = df["nom_fichier"].astype("category")
# Ajouter les données traitées à la liste des lignes
for index, row in df.iterrows():
rows.append({
"grid_operator": "grid_operator", # Adapter avec ta variable réelle
"file_name": row["nom_fichier"],
"file_size": None,
"archive_name": row["nom_archive"],
"parent_path": "parent_path", # Adapter avec ta variable réelle
"event_time": "event_time", # Adapter avec ta variable réelle
"upload_day": "upload_day", # Adapter avec ta variable réelle
"upload_month": "upload_month", # Adapter avec ta variable réelle
"extraction_date": row.get("publication_date"),
"flow_name": flow_name
})
return rows`
L'erreur ArrowInvalid: Схема с индексом 0 была другой.
Я пытался привести все поле к str. Но это не работает.
Помогите, пожалуйста?
Подробнее здесь: https://stackoverflow.com/questions/791 ... nto-lambda
Мобильная версия