if num_videos == 200:
all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
save_all_data_to_csv(all_data, output_csv_path)
Внутриprocess_videos_in_batches я добавляю строки отладки:
print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")
И внутри save_all_data_to_csv:
final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
print("Debug: writing CSV to", output_csv_path)
Для некоторых выходных CSV-файлов я вижу отладочные отпечатки (и файл, как и ожидалось, содержит много строк). Однако для других файлов CSV создается, но оказывается пустым. Еще более странно то, что отладочные отпечатки для этих «пустых» файлов никогда не появляются — поэтому похоже, что код не полностью запускает этот раздел, но CSV-файл все равно появляется. Более того, маловероятно, что процесс внутриprocess_videos_in_batches приведет к созданию CSV с 0 строками, но на самом деле некоторые выходные файлы CSV имеют 0 строк (полный код основной функции и части выходных данных представлены по адресу конец). Я подтвердил, что файлы, в которых есть строки
print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")
правильно напечатаны, содержат содержимое, тогда как файлы, в которых эти строки не напечатаны, пусты.
- Я подтвердил каждый входной CSV-файл действительно содержит 200 строк (поэтому проверка if должна пройти)
- Я использую уникальные имена выходных файлов, поэтому проблема с перезаписью файла не возникает.
Я проверил htop, в системе достаточно памяти или процессор.
- Python: 3.10.15< /li>
Conda: 24.9.2 - VSCode: 1.96
ОС: Ubuntu 22.04.5 LTS
Будем очень признательны за любые советы или идеи. Заранее спасибо!
--- полный код основной функции ---
process_videos_in_batches
def process_videos_in_batches(video_id_list, batch_size, API_KEY):
all_data = []
all_video_ids = video_id_list[:]
while all_video_ids:
batch_video_ids = all_video_ids[:batch_size]
all_video_ids = all_video_ids[batch_size:]
videos_info = get_video_data(batch_video_ids, API_KEY)
### Debug ###
print(f'Debug: {videos_info}, {batch_video_ids}')
#############
# videos_infoがNoneの場合は次のバッチへ
if videos_info is None:
continue
for video_id in batch_video_ids:
video_data = videos_info.get(video_id) if videos_info else None
if video_data:
comments = get_all_video_comments(video_id, API_KEY)
for comment in comments:
# 親コメントの情報を展開
video_entry = video_data.copy()
video_entry.update({
'comment': comment.get('comment', pd.NA),
'comment_like_count': comment.get('comment_like_count', pd.NA),
'comment_author_channel_id': comment.get('comment_author_channel_id', pd.NA),
'comment_published_at': comment.get('comment_published_at', pd.NA),
'updated_at': comment.get('updated_at', pd.NA),
'has_reply': comment.get('has_reply', 'no'),
'reply_comment': comment.get('reply_comment', pd.NA),
'reply_like_count': comment.get('reply_like_count', pd.NA),
'reply_author_channel_id': comment.get('reply_author_channel_id', pd.NA),
'reply_published_at': comment.get('reply_published_at', pd.NA),
'reply_updated_at': comment.get('reply_updated_at', pd.NA)
})
all_data.append(video_entry)
### Debug ###
print(f'Debug: {len(all_data)}')
print(f'Debug: {all_data[:1]}')
#############
return all_data
save_all_data_to_csv
def save_all_data_to_csv(all_data, output_csv_path):
# DataFrame に変換
final_df = pd.DataFrame(all_data)
# CSV に保存
final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
### Debug ###
print("Debug: writing CSV to", output_csv_path)
#############
process_single_file
def process_single_file(file_path, output_dir, batch_size, API_KEY):
"""
1つのCSVファイルを処理し、必要に応じて結果をCSVで出力する。
"""
df = pd.read_csv(file_path, encoding='utf-8')
video_ids = list(df['related_video_id'])
num_video = len(video_ids)
if num_video == 200:
all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
save_all_data_to_csv(all_data, output_csv_path)
# 終わったらファイルパスを返す(ログ代わり)
return file_path
Основной процесс:
input_files = os.listdir(root_dir)
file_paths = [os.path.join(root_dir, f) for f in input_files if os.path.isfile(os.path.join(root_dir, f))]
# 並列実行する
num_workers = 4
futures = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
for file_path in file_paths:
# process_single_file関数を並列に実行
future = executor.submit(process_single_file, file_path, output_dir, batch_size, API_KEY)
futures.append(future)
# 処理が終わったら順に結果を取得
# tqdmを使って「終わった数」を確認するときは、as_completedに対して進捗バーを回します
for f in tqdm(as_completed(futures)):
try:
finished_file = f.result()
except Exception as e:
print("Error happened in worker:", e)
Выходы:
Debug: {'lAtasG8EVEg': {'video_id': 'lAtasG8EVEg'...
...
Debug: 22058
Debug: [{'video_id': '8BtA6fO93_w'...
Debug: writing CSV to /home/foo/mnt/vt/related_videos/data_info/C_dsxOR9JJw.csv
Подробнее здесь: https://stackoverflow.com/questions/793 ... the-code-p