if num_videos == 200:
    all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
    output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
    save_all_data_to_csv(all_data, output_csv_path)
Inside process_videos_in_batches I added the following debug lines:
print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")
And inside save_all_data_to_csv:
final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
print("Debug: writing CSV to", output_csv_path)
For some of the output CSV files I do see the debug prints, and the file contains many rows as expected. For other files, however, the CSV is created but turns out to be empty. Even stranger, the debug prints for these "empty" files never appear, so it looks as if the code never fully runs this section, and yet the CSV file still shows up. Moreover, it is very unlikely that the processing inside process_videos_in_batches would produce a CSV with 0 rows, but some output CSVs really do have 0 rows (the full code of the main functions and part of the output are given at the end). I confirmed that the files for which the lines

print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")

were actually printed do contain data, whereas the files for which these lines were not printed are empty.
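For reference, this is roughly the throwaway check I run over the output directory to tell the two groups apart (the directory path is just my local setup, not part of the pipeline):

import os
import pandas as pd

output_dir = '/home/foo/mnt/vt/related_videos/data_info'  # my output folder
for name in sorted(os.listdir(output_dir)):
    if not name.endswith('.csv'):
        continue
    path = os.path.join(output_dir, name)
    try:
        n_rows = len(pd.read_csv(path))
    except pd.errors.EmptyDataError:
        # the file exists but contains no parsable data at all
        n_rows = 0
    print(f'{name}: {n_rows} rows')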
- I confirmed that every input CSV file really does contain 200 rows (so the if check should pass).
- I use unique output file names, so file overwriting is not the issue.
- I checked with htop; the system has plenty of free memory and CPU headroom.
- Python: 3.10.15
- Conda: 24.9.2
- VSCode: 1.96
- OS: Ubuntu 22.04.5 LTS

Any advice or ideas would be greatly appreciated. Thanks in advance!
--- full code of the main functions ---
process_videos_in_batches
def process_videos_in_batches(video_id_list, batch_size, API_KEY):
    all_data = []
    all_video_ids = video_id_list[:]
    while all_video_ids:
        batch_video_ids = all_video_ids[:batch_size]
        all_video_ids = all_video_ids[batch_size:]
        videos_info = get_video_data(batch_video_ids, API_KEY)
        ### Debug ###
        print(f'Debug: {videos_info}, {batch_video_ids}')
        #############
        # If videos_info is None, move on to the next batch
        if videos_info is None:
            continue
        for video_id in batch_video_ids:
            video_data = videos_info.get(video_id) if videos_info else None
            if video_data:
                comments = get_all_video_comments(video_id, API_KEY)
                for comment in comments:
                    # Flatten the parent comment information into the entry
                    video_entry = video_data.copy()
                    video_entry.update({
                        'comment': comment.get('comment', pd.NA),
                        'comment_like_count': comment.get('comment_like_count', pd.NA),
                        'comment_author_channel_id': comment.get('comment_author_channel_id', pd.NA),
                        'comment_published_at': comment.get('comment_published_at', pd.NA),
                        'updated_at': comment.get('updated_at', pd.NA),
                        'has_reply': comment.get('has_reply', 'no'),
                        'reply_comment': comment.get('reply_comment', pd.NA),
                        'reply_like_count': comment.get('reply_like_count', pd.NA),
                        'reply_author_channel_id': comment.get('reply_author_channel_id', pd.NA),
                        'reply_published_at': comment.get('reply_published_at', pd.NA),
                        'reply_updated_at': comment.get('reply_updated_at', pd.NA)
                    })
                    all_data.append(video_entry)
    ### Debug ###
    print(f'Debug: {len(all_data)}')
    print(f'Debug: {all_data[:1]}')
    #############
    return all_data
save_all_data_to_csv
def save_all_data_to_csv(all_data, output_csv_path):
    # Convert to a DataFrame
    final_df = pd.DataFrame(all_data)
    # Save to CSV
    final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
    ### Debug ###
    print("Debug: writing CSV to", output_csv_path)
    #############
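To be clear about what "empty" means here: even if all_data came back as an empty list, save_all_data_to_csv would still create a file, just with no data rows in it. A standalone check of that behaviour (not part of the pipeline):

import pandas as pd

# An empty list becomes a DataFrame with no rows and no columns;
# to_csv still creates the target file, it just contains no data rows.
pd.DataFrame([]).to_csv('/tmp/empty_example.csv', index=False)
print(pd.DataFrame([]).shape)  # (0, 0)

But for the problematic files the debug print inside save_all_data_to_csv never shows up at all, so the function does not even seem to be reached.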
process_single_file
def process_single_file(file_path, output_dir, batch_size, API_KEY):
    """
    Process a single CSV file and, when applicable, write the result to a CSV.
    """
    df = pd.read_csv(file_path, encoding='utf-8')
    video_ids = list(df['related_video_id'])
    num_video = len(video_ids)
    if num_video == 200:
        all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
        output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
        save_all_data_to_csv(all_data, output_csv_path)
    # Return the file path when done (used as a simple log)
    return file_path
Main process:
input_files = os.listdir(root_dir)
file_paths = [os.path.join(root_dir, f) for f in input_files if os.path.isfile(os.path.join(root_dir, f))]

# Run the files in parallel
num_workers = 4
futures = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
    for file_path in file_paths:
        # Run the process_single_file function in parallel
        future = executor.submit(process_single_file, file_path, output_dir, batch_size, API_KEY)
        futures.append(future)
    # Retrieve the results in the order they finish
    # To track how many tasks have finished with tqdm, iterate the progress bar over as_completed
    for f in tqdm(as_completed(futures)):
        try:
            finished_file = f.result()
        except Exception as e:
            print("Error happened in worker:", e)
Output:
Debug: {'lAtasG8EVEg': {'video_id': 'lAtasG8EVEg'...
...
Debug: 22058
Debug: [{'video_id': '8BtA6fO93_w'...
Debug: writing CSV to /home/foo/mnt/vt/related_videos/data_info/C_dsxOR9JJw.csv
Minimal reproducible example
import os
import time
from concurrent.futures import ProcessPoolExecutor


def example_task(x):
    time.sleep(1)  # Simulate some work
    return x ** 2


def process_files(files, num_workers):
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(example_task, files))
    return results


if __name__ == "__main__":
    # Input data to simulate files
    files = [1, 2, 3, 4, 5]
    # Test with multiprocessing
    num_workers = 4  # Adjust the number of workers
    try:
        output = process_files(files, num_workers)
        print("Output:", output)
    except Exception as e:
        print("Error:", e)
Full code
import os
import requests
import pandas as pd
import time
from tqdm.notebook import tqdm
from datetime import datetime, timedelta, timezone
from concurrent.futures import ProcessPoolExecutor, as_completed

# Retrieve API key and paths from environment variables
API_KEY = os.getenv('YOUTUBE_API_KEY')
batch_size = 50
mnt_path = os.getenv('MNT_PATH')
root_dir = os.path.join(mnt_path, 'related_videos')

# Create directories for thumbnails and output data
thumbnail_dir = os.path.join(root_dir, 'thumbnails')
os.makedirs(thumbnail_dir, exist_ok=True)
output_dir = os.path.join(root_dir, 'data_info')
os.makedirs(output_dir, exist_ok=True)

# Check if the output directory exists
if not os.path.exists(output_dir):
    print(f"Failed to create directory: {output_dir}")

# Print the current time in JST (Japan Standard Time)
def show_now_time():
    jst = timezone(timedelta(hours=9))
    jst_time = datetime.now(jst)
    print(jst_time)

# Convert UTC time string to JST time string
def convert_to_jst(utc_time_str):
    utc_time = datetime.strptime(utc_time_str, "%Y-%m-%dT%H:%M:%SZ")
    jst_time = utc_time + timedelta(hours=9)
    return jst_time.strftime("%Y-%m-%d %H:%M:%S")
# Download a video thumbnail and save it locally
def download_thumbnail(thumbnail_url, thumbnail_dir, video_id):
    try:
        response = requests.get(thumbnail_url)
        response.raise_for_status()  # Raise an exception for HTTP errors
    except requests.exceptions.RequestException as e:
        print(f"Error occurred while downloading thumbnail: {e}")
        return None
    if response.ok:
        # Extract the file extension (e.g., jpg, png)
        file_extension = thumbnail_url.split('.')[-1]
        if len(file_extension) > 5:  # Handle overly long extensions (e.g., URL parameters)
            file_extension = 'jpg'  # Default to jpg
        # Construct the file path for saving
        file_path = os.path.join(thumbnail_dir, f"{video_id}.{file_extension}")
        try:
            # Save the image data to the file
            with open(file_path, 'wb') as file:
                file.write(response.content)
            return file_path
        except IOError as io_error:
            print(f"Error occurred while saving the file: {io_error}")
            return None
    else:
        print(f"Unexpected status code received: {response.status_code}")
        return None
# Retrieve all comments from a specific video
def get_all_video_comments(video_id, API_KEY):
    comment_url = 'https://www.googleapis.com/youtube/v3/commentThreads'
    comments = []
    page_token = None
    max_retries = 3

    # Default value when no comments are available
    empty_comment = {
        'comment': '0',
        'comment_like_count': 0,
        'comment_author_channel_id': '0',
        'comment_published_at': pd.NA,
        'updated_at': pd.NA,
        'is_comment_available': 'yes'
    }
    # Default value for disabled comment sections
    disabled_comment = {
        'comment': pd.NA,
        'comment_like_count': pd.NA,
        'comment_author_channel_id': pd.NA,
        'comment_published_at': pd.NA,
        'updated_at': pd.NA,
        'is_comment_available': 'no'
    }
    # Default value for inaccessible comments
    delete_comment = {
        'comment': pd.NA,
        'comment_like_count': pd.NA,
        'comment_author_channel_id': pd.NA,
        'comment_published_at': pd.NA,
        'updated_at': pd.NA,
        'is_comment_available': pd.NA
    }

    # Handle API errors and decide on appropriate actions
    def handle_api_error(response):
        try:
            error_response = response.json()
            error_info = error_response.get('error', {})
            error_reason = ''
            if 'errors' in error_info and error_info['errors']:
                error_reason = error_info['errors'][0].get('reason', '')
        except ValueError:
            # Retry if JSON decoding fails
            return 'retry'
        status = response.status_code
        # Handle specific error codes and reasons
        if status == 403:
            if error_reason == 'commentsDisabled':
                return 'disabled'
            elif error_reason in ['quotaExceeded', 'dailyLimitExceeded']:
                print(f"API quota exceeded. Waiting for 24 hours...")
                show_now_time()
                time.sleep(86400)
                return 'retry'
            elif error_reason in ['rateLimitExceeded', 'userRateLimitExceeded']:
                print(f"Rate limit exceeded. Waiting for 2 minutes...")
                show_now_time()
                time.sleep(120)
                return 'retry'
            else:
                return 'abort'
        elif status == 404:
            return 'not_found'
        elif status == 429:
            print("Too many requests. Waiting for 70 seconds...")
            show_now_time()
            time.sleep(70)
            return 'retry'
        else:
            print(f"Unexpected API error: {status}, reason: {error_reason}")
            time.sleep(70)
            return 'retry'

    while True:
        comment_params = {
            'part': 'snippet,replies',
            'videoId': video_id,
            'key': API_KEY,
            'maxResults': 100,
            'pageToken': page_token
        }
        for attempt in range(max_retries):
            comment_response = requests.get(comment_url, params=comment_params)
            if comment_response.status_code == 200:
                # Parse the JSON response
                comment_data = comment_response.json()
                break
            else:
                # Handle the error based on its type
                error_action = handle_api_error(comment_response)
                if error_action == 'retry':
                    continue
                elif error_action == 'disabled':
                    return [disabled_comment]
                elif error_action == 'not_found':
                    return [delete_comment]
                elif error_action == 'abort':
                    return [delete_comment]
        else:
            # If retries fail, return a default response
            print(f"Maximum retries reached.")
            return [delete_comment]

        # Extract comments and their replies
        if 'items' in comment_data and comment_data['items']:
            for item in comment_data['items']:
                parent_comment = item['snippet']['topLevelComment']['snippet']
                comment_entry = {
                    'comment': parent_comment['textDisplay'],
                    'comment_like_count': parent_comment['likeCount'],
                    'comment_author_channel_id': parent_comment.get('authorChannelId', {}).get('value'),
                    'comment_published_at': parent_comment['publishedAt'],
                    'updated_at': parent_comment.get('updatedAt', None),
                    'is_comment_available': 'yes',
                    'has_reply': 'no',
                    'reply_comment': pd.NA,
                    'reply_like_count': pd.NA,
                    'reply_author_channel_id': pd.NA,
                    'reply_published_at': pd.NA,
                    'reply_updated_at': pd.NA
                }
                # Process replies if available
                if 'replies' in item:
                    for reply in item['replies']['comments']:
                        reply_snippet = reply['snippet']
                        comment_entry.update({
                            'reply_comment': reply_snippet['textDisplay'],
                            'reply_like_count': reply_snippet['likeCount'],
                            'reply_author_channel_id': reply_snippet.get('authorChannelId', {}).get('value'),
                            'reply_published_at': reply_snippet['publishedAt'],
                            'reply_updated_at': reply_snippet.get('updatedAt', None),
                            'has_reply': 'yes'
                        })
                        comments.append(comment_entry.copy())
                else:
                    comments.append(comment_entry)
        else:
            # Return a default response if no comments are retrieved
            return [empty_comment]

        # Fetch the next page of comments if available
        page_token = comment_data.get('nextPageToken')
        if not page_token:
            break
        time.sleep(3)

    # Return a default value if no comments were retrieved
    if not comments:
        return [empty_comment]
    return comments
# Retrieve metadata for a batch of video IDs
def get_video_data(video_ids, API_KEY):
    video_url = 'https://www.googleapis.com/youtube/v3/videos'
    max_retries = 3
    videos_info = {}
    # Process video IDs in batches of 50
    for start in range(0, len(video_ids), 50):
        batch_video_ids = video_ids[start:start+50]
        video_params = {
            'part': 'snippet,statistics',
            'id': ','.join(batch_video_ids),
            'key': API_KEY
        }
        for attempt in range(max_retries):
            try:
                video_response = requests.get(video_url, params=video_params)
                if video_response.status_code == 200:
                    video_data = video_response.json()
                    fetched_ids = set()
                    if 'items' in video_data:
                        for item in video_data['items']:
                            video_id = item.get('id', None)
                            fetched_ids.add(video_id)
                            # Safely retrieve snippet and statistics
                            snippet = item.get('snippet', {})
                            statistics = item.get('statistics', {})
                            title = snippet.get('title', '')
                            description = snippet.get('description', '')
                            like_count = int(statistics.get('likeCount', 0))
                            view_count = int(statistics.get('viewCount', 0))
                            comment_count = int(statistics.get('commentCount', 0))
                            published_at = convert_to_jst(snippet.get('publishedAt', ''))
                            channel_id = snippet.get('channelId', '')
                            channel_title = snippet.get('channelTitle', '')
                            thumbnail_url = snippet.get('thumbnails', {}).get('default', {}).get('url', pd.NA)
                            # Download the thumbnail if available
                            if pd.notna(thumbnail_url):
                                thumbnail_path = download_thumbnail(thumbnail_url, thumbnail_dir, video_id)
                            else:
                                thumbnail_path = None
                            # Store video information
                            videos_info[video_id] = {
                                'video_id': video_id,
                                'title': title,
                                'description': description,
                                'like_count': like_count,
                                'view_count': view_count,
                                'comment_count': comment_count,
                                'published_at': published_at,
                                'channel_id': channel_id,
                                'channel_title': channel_title,
                                'thumbnail_url': thumbnail_url,
                                'thumbnail_path': thumbnail_path,
                            }
                    # Handle missing video IDs
                    missing_ids = set(batch_video_ids) - fetched_ids
                    for mid in missing_ids:
                        videos_info[mid] = None
                    break
                else:
                    # Parse error response and handle specific cases
                    error_response = video_response.json()
                    error_info = error_response.get('error', {})
                    error_reason = ''
                    if 'errors' in error_info and error_info['errors']:
                        error_reason = error_info['errors'][0].get('reason', '')
                    if video_response.status_code == 403 and error_reason in ['quotaExceeded', 'dailyLimitExceeded']:
                        print("API quota exceeded. Waiting for 24 hours...")
                        show_now_time()
                        time.sleep(86400)
                        continue
                    elif video_response.status_code == 403 and error_reason in ['rateLimitExceeded', 'userRateLimitExceeded']:
                        print("Rate limit exceeded. Waiting for 2 minutes...")
                        time.sleep(120)
                        continue
                    elif video_response.status_code == 404:
                        print(f"Video not found or invalid resource: batch={batch_video_ids}")
                        break
                    else:
                        print(f"Unexpected API error: {video_response.status_code}, reason: {error_reason}")
                        time.sleep(120)
                        continue
            except ValueError:
                print("Failed to decode API response JSON. Retrying...")
                time.sleep(70)
                continue
    return videos_info if videos_info else None
# Process videos in batches and collect all data
def process_videos_in_batches(video_id_list, batch_size, API_KEY):
    all_data = []
    all_video_ids = video_id_list[:]
    while all_video_ids:
        batch_video_ids = all_video_ids[:batch_size]
        all_video_ids = all_video_ids[batch_size:]
        videos_info = get_video_data(batch_video_ids, API_KEY)
        if videos_info is None:
            continue
        for video_id in batch_video_ids:
            video_data = videos_info.get(video_id) if videos_info else None
            if video_data:
                comments = get_all_video_comments(video_id, API_KEY)
                for comment in comments:
                    video_entry = video_data.copy()
                    video_entry.update({
                        'comment': comment.get('comment', pd.NA),
                        'comment_like_count': comment.get('comment_like_count', pd.NA),
                        'comment_author_channel_id': comment.get('comment_author_channel_id', pd.NA),
                        'comment_published_at': comment.get('comment_published_at', pd.NA),
                        'updated_at': comment.get('updated_at', pd.NA),
                        'has_reply': comment.get('has_reply', 'no'),
                        'reply_comment': comment.get('reply_comment', pd.NA),
                        'reply_like_count': comment.get('reply_like_count', pd.NA),
                        'reply_author_channel_id': comment.get('reply_author_channel_id', pd.NA),
                        'reply_published_at': comment.get('reply_published_at', pd.NA),
                        'reply_updated_at': comment.get('reply_updated_at', pd.NA)
                    })
                    all_data.append(video_entry)
    return all_data
# Save the collected data to a CSV file
def save_all_data_to_csv(all_data, output_csv_path):
    final_df = pd.DataFrame(all_data)
    final_df.to_csv(output_csv_path, index=False, encoding='utf-8')

# Process a single CSV file and save the results
def process_single_file(file_path, output_dir, batch_size, API_KEY):
    df = pd.read_csv(file_path, encoding='utf-8')
    video_ids = list(df['related_video_id'])
    num_video = len(video_ids)
    if num_video == 200:
        all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
        output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
        save_all_data_to_csv(all_data, output_csv_path)
    return file_path
# List input files and prepare for parallel processing
input_files = os.listdir(root_dir)
file_paths = [os.path.join(root_dir, f) for f in input_files if os.path.isfile(os.path.join(root_dir, f))]

# Execute tasks in parallel
num_workers = 4
futures = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
    for file_path in file_paths:
        future = executor.submit(process_single_file, file_path, output_dir, batch_size, API_KEY)
        futures.append(future)
    for f in tqdm(as_completed(futures)):
        try:
            finished_file = f.result()
        except Exception as e:
            print("Error happened in worker:", e)