Apache Nifi: невозможно объединить несколько CSV-файлов в один файл PARQUET с помощью процессора ExecuteStreamCommandPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Apache Nifi: невозможно объединить несколько CSV-файлов в один файл PARQUET с помощью процессора ExecuteStreamCommand

Сообщение Anonymous »

Я пытаюсь объединить несколько CSV-файлов, поступающих из восходящего потока в виде файлов потока аналогичного типа (одна и та же схема), в один формат файла PARQUET. Ниже приведен поток моей группы процессоров. Где в верхнем ExecuteStreamCommand я переименовываю имена столбцов, чтобы гарантировать отсутствие специальных символов в именах столбцов, в то время как в нисходящем процессоре ExecuteStreamCommand фактически пытается объединиться в один формат паркета, но он не объединяется в один и выходит такое же количество CSV-файлов (которые находятся в отдельных файлах паркета).
Изображение

Ниже приведен код, который я использую для объединения нескольких CSV-файлов в один файл паркета.

Код: Выделить всё

import sys
import pandas as pd
import io
from pyarrow import parquet as pq
import pyarrow as pa

# Initialize an empty DataFrame to hold all CSV data
merged_df = pd.DataFrame()

# Read CSV data from standard input (incoming flow file content)
input_data = sys.stdin.read().strip()

# Check if the input data is empty
if not input_data:
print("Error: No data received from stdin")
sys.exit(1)

# Use StringIO to read the CSV from stdin
csv_content = io.StringIO(input_data)

# Read and append CSV content to merged_df
try:
# Read CSV into DataFrame
df = pd.read_csv(csv_content)

# If merged_df is empty, initialize it with the same columns as df
if merged_df.empty:
merged_df = df
else:
# Align columns before concatenating (this handles schema inconsistencies)
merged_df = pd.concat([merged_df, df], ignore_index=True, sort=False)

except pd.errors.EmptyDataError:
print("Error: No columns to parse from CSV data.")
sys.exit(1)

# After reading all CSV files, convert the merged DataFrame to a Parquet table
table = pa.Table.from_pandas(merged_df)

# Write the Parquet table to stdout (which NiFi will handle)
pq.write_table(table, sys.stdout.buffer, compression='snappy')  # Adjust compression if needed
Кто-нибудь может предсказать, где я делаю это неправильно? Почему он не может объединиться в один паркет, а в несколько файлов паркета. Он также не меняет расширение выходных файлов, таких как .parquet.

Подробнее здесь: https://stackoverflow.com/questions/792 ... using-exec
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»