Некоторые выходные данные CSV пусты при параллельной обработке Python, даже если путь кода должен быть запущен.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Некоторые выходные данные CSV пусты при параллельной обработке Python, даже если путь кода должен быть запущен.

Сообщение Anonymous »

У меня есть скрипт Python, который параллельно обрабатывает пакет файлов CSV. Каждый входной CSV-файл содержит ровно 200 строк, поэтому я использую флажок if num_videos == 200: для запуска функции типа:
if num_videos == 200:
all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
save_all_data_to_csv(all_data, output_csv_path)

Внутриprocess_videos_in_batches я добавляю строки отладки:
print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")

И внутри save_all_data_to_csv:
final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
print("Debug: writing CSV to", output_csv_path)

Для некоторых выходных CSV-файлов я вижу отладочные отпечатки (и файл, как и ожидалось, содержит много строк). Однако для других файлов CSV создается, но оказывается пустым. Еще более странно то, что отладочные отпечатки для этих «пустых» файлов никогда не появляются — поэтому похоже, что код не полностью запускает этот раздел, но CSV-файл все равно появляется. Более того, маловероятно, что процесс внутриprocess_videos_in_batches приведет к созданию CSV с 0 строками, но на самом деле некоторые выходные файлы CSV имеют 0 строк (полный код основной функции и части выходных данных представлены по адресу конец). Я подтвердил, что файлы, в которых есть строки
print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")

правильно напечатаны, содержат содержимое, тогда как файлы, в которых эти строки не напечатаны, пусты.
  • Я подтвердил каждый входной CSV-файл действительно содержит 200 строк (поэтому проверка if должна пройти)
  • Я использую уникальные имена выходных файлов, поэтому проблема с перезаписью файла не возникает.
    Я проверил htop, в системе достаточно памяти или процессор.
Моя среда:
  • Python: 3.10.15< /li>
    Conda: 24.9.2
  • VSCode: 1.96
    ОС: Ubuntu 22.04.5 LTS
Почему могут ли некоторые файлы запускать полный путь кода (с отладочными отпечатками и данными в CSV), в то время как другие, очевидно, пропускают отладочные отпечатки и создают пустой CSV - несмотря на то, что условие if выполняется для всех файлов? Может ли что-то в моем параллельном выполнении быть причиной досрочного завершения функции или молчаливого сбоя для определенных файлов, даже если CSV все еще создается?
Будем очень признательны за любые советы или идеи. Заранее спасибо!
--- полный код основной функции ---
process_videos_in_batches
def process_videos_in_batches(video_id_list, batch_size, API_KEY):

all_data = []

all_video_ids = video_id_list[:]

while all_video_ids:
batch_video_ids = all_video_ids[:batch_size]
all_video_ids = all_video_ids[batch_size:]

videos_info = get_video_data(batch_video_ids, API_KEY)

### Debug ###
print(f'Debug: {videos_info}, {batch_video_ids}')
#############

# videos_infoがNoneの場合は次のバッチへ
if videos_info is None:
continue

for video_id in batch_video_ids:
video_data = videos_info.get(video_id) if videos_info else None
if video_data:
comments = get_all_video_comments(video_id, API_KEY)
for comment in comments:
# 親コメントの情報を展開
video_entry = video_data.copy()
video_entry.update({
'comment': comment.get('comment', pd.NA),
'comment_like_count': comment.get('comment_like_count', pd.NA),
'comment_author_channel_id': comment.get('comment_author_channel_id', pd.NA),
'comment_published_at': comment.get('comment_published_at', pd.NA),
'updated_at': comment.get('updated_at', pd.NA),
'has_reply': comment.get('has_reply', 'no'),
'reply_comment': comment.get('reply_comment', pd.NA),
'reply_like_count': comment.get('reply_like_count', pd.NA),
'reply_author_channel_id': comment.get('reply_author_channel_id', pd.NA),
'reply_published_at': comment.get('reply_published_at', pd.NA),
'reply_updated_at': comment.get('reply_updated_at', pd.NA)
})
all_data.append(video_entry)
### Debug ###
print(f'Debug: {len(all_data)}')
print(f'Debug: {all_data[:1]}')
#############
return all_data

save_all_data_to_csv
def save_all_data_to_csv(all_data, output_csv_path):
# DataFrame に変換
final_df = pd.DataFrame(all_data)

# CSV に保存
final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
### Debug ###
print("Debug: writing CSV to", output_csv_path)
#############

process_single_file
def process_single_file(file_path, output_dir, batch_size, API_KEY):
"""
1つのCSVファイルを処理し、必要に応じて結果をCSVで出力する。
"""
df = pd.read_csv(file_path, encoding='utf-8')
video_ids = list(df['related_video_id'])
num_video = len(video_ids)
if num_video == 200:
all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
save_all_data_to_csv(all_data, output_csv_path)

# 終わったらファイルパスを返す(ログ代わり)
return file_path

Основной процесс:
input_files = os.listdir(root_dir)
file_paths = [os.path.join(root_dir, f) for f in input_files if os.path.isfile(os.path.join(root_dir, f))]

# 並列実行する
num_workers = 4

futures = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
for file_path in file_paths:
# process_single_file関数を並列に実行
future = executor.submit(process_single_file, file_path, output_dir, batch_size, API_KEY)
futures.append(future)

# 処理が終わったら順に結果を取得
# tqdmを使って「終わった数」を確認するときは、as_completedに対して進捗バーを回します
for f in tqdm(as_completed(futures)):
try:
finished_file = f.result()
except Exception as e:
print("Error happened in worker:", e)

Выходы:
Debug: {'lAtasG8EVEg': {'video_id': 'lAtasG8EVEg'...
...
Debug: 22058
Debug: [{'video_id': '8BtA6fO93_w'...
Debug: writing CSV to /home/foo/mnt/vt/related_videos/data_info/C_dsxOR9JJw.csv


Подробнее здесь: https://stackoverflow.com/questions/793 ... the-code-p
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»