Apache Nifi (ExecuteStreamCommand): исполняемая команда python3 завершилась с ошибкой:Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Apache Nifi (ExecuteStreamCommand): исполняемая команда python3 завершилась с ошибкой:

Сообщение Anonymous »

Я пытаюсь запустить следующий скрипт в процессореexecutestreamcommand, который будет считывать данные из процессора listfile и fetchfile, а затем я пытаюсь объединить содержимое файлов Excel в формате паркета в следующем скрипте, но передача python3 завершилась ошибкой

Код: Выделить всё

import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import sys
import io

def merge_and_split_parquet(data, output_dir, max_size=1 * 1024**3):
"""
Merges and splits data into Parquet files based on size.

Args:
data (pd.DataFrame): Data to process.
output_dir (str): Directory to save Parquet files.
max_size (int): Maximum size in bytes for each Parquet file.
"""
merged_data = pd.DataFrame()
parquet_index = 1

# Process the data row by row
for _, row in data.iterrows():
merged_data = pd.concat([merged_data, pd.DataFrame([row])])

# Check the size of the DataFrame
table = pa.Table.from_pandas(merged_data)
buffer = pa.BufferOutputStream()
pq.write_table(table, buffer)
if buffer.size() >= max_size:
# Write to a new Parquet file
output_file = os.path.join(output_dir, f"merged_{parquet_index}.parquet")
pq.write_table(table, output_file)
print(f"Generated {output_file}")
merged_data = pd.DataFrame()  # Reset the DataFrame
parquet_index += 1

# Write any remaining data
if not merged_data.empty:
output_file = os.path.join(output_dir, f"merged_{parquet_index}.parquet")
pq.write_table(pa.Table.from_pandas(merged_data), output_file)
print(f"Generated {output_file}")

if __name__ == "__main__":
# Ensure proper arguments are passed
if len(sys.argv) < 2:
print("Usage: python script.py ")
sys.exit(1)

output_dir = sys.argv[1]

# Ensure the output directory exists
os.makedirs(output_dir, exist_ok=True)

# Read data from STDIN
try:
input_stream = sys.stdin.read()
excel_data = pd.read_excel(io.BytesIO(input_stream.encode()))
except Exception as e:
print(f"Error reading input data: {e}")
sys.exit(1)

# Process and write Parquet files
merge_and_split_parquet(excel_data, output_dir)
Когда я запускаю процессор, он выдает следующую ошибку:

Код: Выделить всё

2024-11-25 07:46:07,540 ERROR [Timer-Driven Process Thread-5] o.a.n.p.standard.ExecuteStreamCommand ExecuteStreamCommand[id=4e7bd5aa-0193-1000-6154-675d13b6c0e5] Transferring StandardFlowFileRecord[uuid=c534f453-59d0-4958-819e-01295652d375,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1732502757533-52, container=default, section=52], offset=1680, length=112],offset=0,name=3GNANKANA.xlsx,size=112] to nonzero status. Executable command python3 ended in an error:
ниже показан мой рабочий процесс
[img]https://i.sstatic.net /EDi5bixZ.png[/img]

Моя конфигурация выполнения команды следующая:
Изображение

пожалуйста, помогите мне избавиться от этого. Спасибо

Подробнее здесь: https://stackoverflow.com/questions/792 ... n-an-error
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»