Некоторые выходные данные CSV пусты при параллельной обработке Python, даже если путь кода должен быть запущен.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Некоторые выходные данные CSV пусты при параллельной обработке Python, даже если путь кода должен быть запущен.

Сообщение Anonymous »

У меня есть скрипт Python, который параллельно обрабатывает пакет файлов CSV. Каждый входной CSV-файл содержит ровно 200 строк, поэтому я использую флажок if num_videos == 200: для запуска функции типа:
if num_videos == 200:
all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
save_all_data_to_csv(all_data, output_csv_path)

Внутриprocess_videos_in_batches я добавляю строки отладки:
print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")

И внутри save_all_data_to_csv:
final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
print("Debug: writing CSV to", output_csv_path)

Для некоторых выходных CSV-файлов я вижу отладочные отпечатки (и файл, как и ожидалось, содержит много строк). Однако для других файлов CSV создается, но оказывается пустым. Еще более странно то, что отладочные отпечатки для этих «пустых» файлов никогда не появляются — поэтому похоже, что код не полностью запускает этот раздел, но CSV-файл все равно появляется. Более того, маловероятно, что процесс внутриprocess_videos_in_batches приведет к созданию CSV с 0 строками, но на самом деле некоторые выходные файлы CSV имеют 0 строк (полный код основной функции и части выходных данных представлены по адресу конец). Я подтвердил, что файлы, в которых есть строки
print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")

правильно напечатаны, содержат содержимое, тогда как файлы, в которых эти строки не напечатаны, пусты.
  • Я подтвердил каждый входной CSV-файл действительно содержит 200 строк (поэтому проверка if должна пройти)
  • Я использую уникальные имена выходных файлов, поэтому проблема с перезаписью файлов не возникает.
    Я проверил htop, в системе достаточно памяти или процессор.
Моя среда:
  • Python: 3.10.15< /li>
    Conda: 24.9.2
  • VSCode: 1.96
    ОС: Ubuntu 22.04.5 LTS
Почему могут ли некоторые файлы запускать полный путь кода (с отладочными отпечатками и данными в CSV), в то время как другие, очевидно, пропускают отладочные отпечатки и создают пустой CSV - несмотря на то, что условие if выполняется для всех файлов? Может ли что-то в моем параллельном выполнении быть причиной досрочного завершения функции или молчаливого сбоя для определенных файлов, даже если CSV все еще создается?
Будем очень признательны за любые советы или идеи. Заранее спасибо!
--- полный код основной функции ---
process_videos_in_batches
def process_videos_in_batches(video_id_list, batch_size, API_KEY):

all_data = []

all_video_ids = video_id_list[:]

while all_video_ids:
batch_video_ids = all_video_ids[:batch_size]
all_video_ids = all_video_ids[batch_size:]

videos_info = get_video_data(batch_video_ids, API_KEY)

### Debug ###
print(f'Debug: {videos_info}, {batch_video_ids}')
#############

# videos_infoがNoneの場合は次のバッチへ
if videos_info is None:
continue

for video_id in batch_video_ids:
video_data = videos_info.get(video_id) if videos_info else None
if video_data:
comments = get_all_video_comments(video_id, API_KEY)
for comment in comments:
# 親コメントの情報を展開
video_entry = video_data.copy()
video_entry.update({
'comment': comment.get('comment', pd.NA),
'comment_like_count': comment.get('comment_like_count', pd.NA),
'comment_author_channel_id': comment.get('comment_author_channel_id', pd.NA),
'comment_published_at': comment.get('comment_published_at', pd.NA),
'updated_at': comment.get('updated_at', pd.NA),
'has_reply': comment.get('has_reply', 'no'),
'reply_comment': comment.get('reply_comment', pd.NA),
'reply_like_count': comment.get('reply_like_count', pd.NA),
'reply_author_channel_id': comment.get('reply_author_channel_id', pd.NA),
'reply_published_at': comment.get('reply_published_at', pd.NA),
'reply_updated_at': comment.get('reply_updated_at', pd.NA)
})
all_data.append(video_entry)
### Debug ###
print(f'Debug: {len(all_data)}')
print(f'Debug: {all_data[:1]}')
#############
return all_data

save_all_data_to_csv
def save_all_data_to_csv(all_data, output_csv_path):
# DataFrame に変換
final_df = pd.DataFrame(all_data)

# CSV に保存
final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
### Debug ###
print("Debug: writing CSV to", output_csv_path)
#############

process_single_file
def process_single_file(file_path, output_dir, batch_size, API_KEY):
"""
1つのCSVファイルを処理し、必要に応じて結果をCSVで出力する。
"""
df = pd.read_csv(file_path, encoding='utf-8')
video_ids = list(df['related_video_id'])
num_video = len(video_ids)
if num_video == 200:
all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
save_all_data_to_csv(all_data, output_csv_path)

# 終わったらファイルパスを返す(ログ代わり)
return file_path

Основной процесс:
input_files = os.listdir(root_dir)
file_paths = [os.path.join(root_dir, f) for f in input_files if os.path.isfile(os.path.join(root_dir, f))]

# 並列実行する
num_workers = 4

futures = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
for file_path in file_paths:
# process_single_file関数を並列に実行
future = executor.submit(process_single_file, file_path, output_dir, batch_size, API_KEY)
futures.append(future)

# 処理が終わったら順に結果を取得
# tqdmを使って「終わった数」を確認するときは、as_completedに対して進捗バーを回します
for f in tqdm(as_completed(futures)):
try:
finished_file = f.result()
except Exception as e:
print("Error happened in worker:", e)

Выходы:
Debug: {'lAtasG8EVEg': {'video_id': 'lAtasG8EVEg'...
...
Debug: 22058
Debug: [{'video_id': '8BtA6fO93_w'...
Debug: writing CSV to /home/foo/mnt/vt/related_videos/data_info/C_dsxOR9JJw.csv

Минимально воспроизводимый пример
import os
import time
from concurrent.futures import ProcessPoolExecutor

def example_task(x):
time.sleep(1) # Simulate some work
return x ** 2

def process_files(files, num_workers):
with ProcessPoolExecutor(max_workers=num_workers) as executor:
results = list(executor.map(example_task, files))
return results

if __name__ == "__main__":
# Input data to simulate files
files = [1, 2, 3, 4, 5]

# Test with multiprocessing
num_workers = 4 # Adjust the number of workers
try:
output = process_files(files, num_workers)
print("Output:", output)
except Exception as e:
print("Error:", e)

Полный код
import os
import requests
import pandas as pd
import time
from tqdm.notebook import tqdm
from datetime import datetime, timedelta, timezone
from concurrent.futures import ProcessPoolExecutor, as_completed

# Retrieve API key and paths from environment variables
API_KEY = os.getenv('YOUTUBE_API_KEY')
batch_size = 50
mnt_path = os.getenv('MNT_PATH')
root_dir = os.path.join(mnt_path, 'related_videos')

# Create directories for thumbnails and output data
thumbnail_dir = os.path.join(root_dir, 'thumbnails')
os.makedirs(thumbnail_dir, exist_ok=True)
output_dir = os.path.join(root_dir, 'data_info')
os.makedirs(output_dir, exist_ok=True)

# Check if the output directory exists
if not os.path.exists(output_dir):
print(f"Failed to create directory: {output_dir}")

# Print the current time in JST (Japan Standard Time)
def show_now_time():
jst = timezone(timedelta(hours=9))
jst_time = datetime.now(jst)
print(jst_time)

# Convert UTC time string to JST time string
def convert_to_jst(utc_time_str):
utc_time = datetime.strptime(utc_time_str, "%Y-%m-%dT%H:%M:%SZ")
jst_time = utc_time + timedelta(hours=9)
return jst_time.strftime("%Y-%m-%d %H:%M:%S")

# Download a video thumbnail and save it locally
def download_thumbnail(thumbnail_url, thumbnail_dir, video_id):
try:
response = requests.get(thumbnail_url)
response.raise_for_status() # Raise an exception for HTTP errors
except requests.exceptions.RequestException as e:
print(f"Error occurred while downloading thumbnail: {e}")
return None

if response.ok:
# Extract the file extension (e.g., jpg, png)
file_extension = thumbnail_url.split('.')[-1]
if len(file_extension) > 5: # Handle overly long extensions (e.g., URL parameters)
file_extension = 'jpg' # Default to jpg

# Construct the file path for saving
file_path = os.path.join(thumbnail_dir, f"{video_id}.{file_extension}")
try:
# Save the image data to the file
with open(file_path, 'wb') as file:
file.write(response.content)
return file_path
except IOError as io_error:
print(f"Error occurred while saving the file: {io_error}")
return None
else:
print(f"Unexpected status code received: {response.status_code}")
return None

# Retrieve all comments from a specific video
def get_all_video_comments(video_id, API_KEY):
comment_url = 'https://www.googleapis.com/youtube/v3/commentThreads'
comments = []
page_token = None
max_retries = 3

# Default value when no comments are available
empty_comment = {
'comment': '0',
'comment_like_count': 0,
'comment_author_channel_id': '0',
'comment_published_at': pd.NA,
'updated_at': pd.NA,
'is_comment_available': 'yes'
}

# Default value for disabled comment sections
disabled_comment = {
'comment': pd.NA,
'comment_like_count': pd.NA,
'comment_author_channel_id': pd.NA,
'comment_published_at': pd.NA,
'updated_at': pd.NA,
'is_comment_available': 'no'
}

# Default value for inaccessible comments
delete_comment = {
'comment': pd.NA,
'comment_like_count': pd.NA,
'comment_author_channel_id': pd.NA,
'comment_published_at': pd.NA,
'updated_at': pd.NA,
'is_comment_available': pd.NA
}

# Handle API errors and decide on appropriate actions
def handle_api_error(response):
try:
error_response = response.json()
error_info = error_response.get('error', {})
error_reason = ''
if 'errors' in error_info and error_info['errors']:
error_reason = error_info['errors'][0].get('reason', '')
except ValueError:
# Retry if JSON decoding fails
return 'retry'

status = response.status_code

# Handle specific error codes and reasons
if status == 403:
if error_reason == 'commentsDisabled':
return 'disabled'
elif error_reason in ['quotaExceeded', 'dailyLimitExceeded']:
print(f"API quota exceeded. Waiting for 24 hours...")
show_now_time()
time.sleep(86400)
return 'retry'
elif error_reason in ['rateLimitExceeded', 'userRateLimitExceeded']:
print(f"Rate limit exceeded. Waiting for 2 minutes...")
show_now_time()
time.sleep(120)
return 'retry'
else:
return 'abort'
elif status == 404:
return 'not_found'
elif status == 429:
print("Too many requests. Waiting for 70 seconds...")
show_now_time()
time.sleep(70)
return 'retry'
else:
print(f"Unexpected API error: {status}, reason: {error_reason}")
time.sleep(70)
return 'retry'

while True:
comment_params = {
'part': 'snippet,replies',
'videoId': video_id,
'key': API_KEY,
'maxResults': 100,
'pageToken': page_token
}

for attempt in range(max_retries):
comment_response = requests.get(comment_url, params=comment_params)

if comment_response.status_code == 200:
# Parse the JSON response
comment_data = comment_response.json()
break
else:
# Handle the error based on its type
error_action = handle_api_error(comment_response)
if error_action == 'retry':
continue
elif error_action == 'disabled':
return [disabled_comment]
elif error_action == 'not_found':
return [delete_comment]
elif error_action == 'abort':
return [delete_comment]

else:
# If retries fail, return a default response
print(f"Maximum retries reached.")
return [delete_comment]

# Extract comments and their replies
if 'items' in comment_data and comment_data['items']:
for item in comment_data['items']:
parent_comment = item['snippet']['topLevelComment']['snippet']
comment_entry = {
'comment': parent_comment['textDisplay'],
'comment_like_count': parent_comment['likeCount'],
'comment_author_channel_id': parent_comment.get('authorChannelId', {}).get('value'),
'comment_published_at': parent_comment['publishedAt'],
'updated_at': parent_comment.get('updatedAt', None),
'is_comment_available': 'yes',
'has_reply': 'no',
'reply_comment': pd.NA,
'reply_like_count': pd.NA,
'reply_author_channel_id': pd.NA,
'reply_published_at': pd.NA,
'reply_updated_at': pd.NA
}

# Process replies if available
if 'replies' in item:
for reply in item['replies']['comments']:
reply_snippet = reply['snippet']
comment_entry.update({
'reply_comment': reply_snippet['textDisplay'],
'reply_like_count': reply_snippet['likeCount'],
'reply_author_channel_id': reply_snippet.get('authorChannelId', {}).get('value'),
'reply_published_at': reply_snippet['publishedAt'],
'reply_updated_at': reply_snippet.get('updatedAt', None),
'has_reply': 'yes'
})
comments.append(comment_entry.copy())
else:
comments.append(comment_entry)

else:
# Return a default response if no comments are retrieved
return [empty_comment]

# Fetch the next page of comments if available
page_token = comment_data.get('nextPageToken')
if not page_token:
break

time.sleep(3)

# Return a default value if no comments were retrieved
if not comments:
return [empty_comment]

return comments

# Retrieve metadata for a batch of video IDs
def get_video_data(video_ids, API_KEY):
video_url = 'https://www.googleapis.com/youtube/v3/videos'
max_retries = 3
videos_info = {}

# Process video IDs in batches of 50
for start in range(0, len(video_ids), 50):
batch_video_ids = video_ids[start:start+50]
video_params = {
'part': 'snippet,statistics',
'id': ','.join(batch_video_ids),
'key': API_KEY
}

for attempt in range(max_retries):
try:
video_response = requests.get(video_url, params=video_params)

if video_response.status_code == 200:
video_data = video_response.json()
fetched_ids = set()
if 'items' in video_data:
for item in video_data['items']:
video_id = item.get('id', None)
fetched_ids.add(video_id)

# Safely retrieve snippet and statistics
snippet = item.get('snippet', {})
statistics = item.get('statistics', {})

title = snippet.get('title', '')
description = snippet.get('description', '')
like_count = int(statistics.get('likeCount', 0))
view_count = int(statistics.get('viewCount', 0))
comment_count = int(statistics.get('commentCount', 0))
published_at = convert_to_jst(snippet.get('publishedAt', ''))
channel_id = snippet.get('channelId', '')
channel_title = snippet.get('channelTitle', '')
thumbnail_url = snippet.get('thumbnails', {}).get('default', {}).get('url', pd.NA)

# Download the thumbnail if available
if pd.notna(thumbnail_url):
thumbnail_path = download_thumbnail(thumbnail_url, thumbnail_dir, video_id)
else:
thumbnail_path = None

# Store video information
videos_info[video_id] = {
'video_id': video_id,
'title': title,
'description': description,
'like_count': like_count,
'view_count': view_count,
'comment_count': comment_count,
'published_at': published_at,
'channel_id': channel_id,
'channel_title': channel_title,
'thumbnail_url': thumbnail_url,
'thumbnail_path': thumbnail_path,
}

# Handle missing video IDs
missing_ids = set(batch_video_ids) - fetched_ids
for mid in missing_ids:
videos_info[mid] = None

break
else:
# Parse error response and handle specific cases
error_response = video_response.json()
error_info = error_response.get('error', {})
error_reason = ''
if 'errors' in error_info and error_info['errors']:
error_reason = error_info['errors'][0].get('reason', '')

if video_response.status_code == 403 and error_reason in ['quotaExceeded', 'dailyLimitExceeded']:
print("API quota exceeded. Waiting for 24 hours...")
show_now_time()
time.sleep(86400)
continue
elif video_response.status_code == 403 and error_reason in ['rateLimitExceeded', 'userRateLimitExceeded']:
print("Rate limit exceeded. Waiting for 2 minutes...")
time.sleep(120)
continue
elif video_response.status_code == 404:
print(f"Video not found or invalid resource: batch={batch_video_ids}")
break
else:
print(f"Unexpected API error: {video_response.status_code}, reason: {error_reason}")
time.sleep(120)
continue

except ValueError:
print("Failed to decode API response JSON. Retrying...")
time.sleep(70)
continue

return videos_info if videos_info else None

# Process videos in batches and collect all data
def process_videos_in_batches(video_id_list, batch_size, API_KEY):
all_data = []
all_video_ids = video_id_list[:]

while all_video_ids:
batch_video_ids = all_video_ids[:batch_size]
all_video_ids = all_video_ids[batch_size:]

videos_info = get_video_data(batch_video_ids, API_KEY)

if videos_info is None:
continue

for video_id in batch_video_ids:
video_data = videos_info.get(video_id) if videos_info else None
if video_data:
comments = get_all_video_comments(video_id, API_KEY)
for comment in comments:
video_entry = video_data.copy()
video_entry.update({
'comment': comment.get('comment', pd.NA),
'comment_like_count': comment.get('comment_like_count', pd.NA),
'comment_author_channel_id': comment.get('comment_author_channel_id', pd.NA),
'comment_published_at': comment.get('comment_published_at', pd.NA),
'updated_at': comment.get('updated_at', pd.NA),
'has_reply': comment.get('has_reply', 'no'),
'reply_comment': comment.get('reply_comment', pd.NA),
'reply_like_count': comment.get('reply_like_count', pd.NA),
'reply_author_channel_id': comment.get('reply_author_channel_id', pd.NA),
'reply_published_at': comment.get('reply_published_at', pd.NA),
'reply_updated_at': comment.get('reply_updated_at', pd.NA)
})
all_data.append(video_entry)

return all_data

# Save the collected data to a CSV file
def save_all_data_to_csv(all_data, output_csv_path):
final_df = pd.DataFrame(all_data)
final_df.to_csv(output_csv_path, index=False, encoding='utf-8')

# Process a single CSV file and save the results
def process_single_file(file_path, output_dir, batch_size, API_KEY):
df = pd.read_csv(file_path, encoding='utf-8')
video_ids = list(df['related_video_id'])
num_video = len(video_ids)
if num_video == 200:
all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
save_all_data_to_csv(all_data, output_csv_path)
return file_path

# List input files and prepare for parallel processing
input_files = os.listdir(root_dir)
file_paths = [os.path.join(root_dir, f) for f in input_files if os.path.isfile(os.path.join(root_dir, f))]

# Execute tasks in parallel
num_workers = 4
futures = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
for file_path in file_paths:
future = executor.submit(process_single_file, file_path, output_dir, batch_size, API_KEY)
futures.append(future)

for f in tqdm(as_completed(futures)):
try:
finished_file = f.result()
except Exception as e:
print("Error happened in worker:", e)


Подробнее здесь: https://stackoverflow.com/questions/793 ... the-code-p
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»