
Below is the code I use to merge several CSV files into a single Parquet file.
Code:
import sys
import io

import pandas as pd
import pyarrow as pa
from pyarrow import parquet as pq

# Read CSV data from standard input (the incoming flow file content)
input_data = sys.stdin.read().strip()

# Abort if no data arrived; report on stderr so stdout stays clean for the Parquet bytes
if not input_data:
    print("Error: No data received from stdin", file=sys.stderr)
    sys.exit(1)

# Wrap the raw text in StringIO so pandas can parse it like a file
csv_content = io.StringIO(input_data)

# DataFrame that accumulates the CSV data
merged_df = pd.DataFrame()

try:
    # Parse the CSV into a DataFrame
    df = pd.read_csv(csv_content)

    # On the first chunk, adopt its columns directly; otherwise append,
    # aligning columns so schema inconsistencies are tolerated
    if merged_df.empty:
        merged_df = df
    else:
        merged_df = pd.concat([merged_df, df], ignore_index=True, sort=False)
except pd.errors.EmptyDataError:
    print("Error: No columns to parse from CSV data.", file=sys.stderr)
    sys.exit(1)

# Convert the merged DataFrame to an Arrow table
table = pa.Table.from_pandas(merged_df)

# Write the Parquet table to stdout (which NiFi will capture as the new flow file content)
pq.write_table(table, sys.stdout.buffer, compression='snappy')  # adjust compression if needed
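
Note that a single invocation of this script only ever sees one stdin stream, so merging several CSV files has to happen upstream (e.g. by combining flow files in NiFi first). If you want to test the CSV-to-Parquet merge logic locally across several files at once, a minimal standalone sketch could look like the following; the script name, output path, and input paths are placeholders, not anything from the original setup:

Code:
import sys

import pandas as pd
import pyarrow as pa
from pyarrow import parquet as pq

# Merge every CSV path given on the command line into one Parquet file.
# Usage (hypothetical paths): python merge_csvs.py out.parquet a.csv b.csv
if len(sys.argv) < 3:
    print("Usage: merge_csvs.py OUTPUT.parquet INPUT1.csv [INPUT2.csv ...]", file=sys.stderr)
    sys.exit(1)

output_path, input_paths = sys.argv[1], sys.argv[2:]

# Read each input and concatenate, aligning columns across differing schemas;
# columns missing from a file are filled with NaN
frames = [pd.read_csv(path) for path in input_paths]
merged_df = pd.concat(frames, ignore_index=True, sort=False)

# Write the combined table out as Snappy-compressed Parquet
pq.write_table(pa.Table.from_pandas(merged_df), output_path, compression='snappy')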
More details here: https://stackoverflow.com/questions/792 ... using-exec