Я работаю над проектом, связанным со сжатием видео с помощью OpenCV. Я пытаюсь уменьшить размер файлов и протестировать различные сценарии, чтобы найти вариант с максимально быстрым сжатием или с минимальным размером сжатого файла.
import time
import os
import numpy as np
import cv2
import dask
from dask import delayed, compute
import dask.bag as db
import math
import multiprocessing
from multiprocessing.pool import ThreadPool
import asyncio
import logging
import psutil
import matplotlib.pyplot as plt
# Send every record at INFO level or above to a log file; each line is
# prefixed with a timestamp and the record's severity.
logging.basicConfig(
    filename='video_compression.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Check if a video file is valid
def validate_video_file(file_path):
if not os.path.exists(file_path):
logging.error(f"File {file_path} not found.")
return False
cap = cv2.VideoCapture(file_path)
if not cap.isOpened():
logging.error(f"Unable to open video file {file_path}")
cap.release()
return False
cap.release()
return True
# Display progress bar in the console
def display_progress_bar(progress, total, bar_length=50):
percent = int(progress / total * 100)
bar = '█' * int(bar_length * (progress / total)) + '-' * (bar_length - int(bar_length * (progress / total)))
print(f"\r\033[91m[{bar}] {percent}%\033[0m", end='')
# Async frame reading utility
async def read_frame(cap):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, cap.read)
# Process frame batches with optional GPU support and other operations
def process_frame_batch(batch, out, **kwargs):
for frame in batch:
# GPU and Grayscale processing
if kwargs.get('grayscale', False):
if cv2.cuda.getCudaEnabledDeviceCount() > 0:
frame = cv2.cuda.cvtColor(frame, cv2.COLOR_BGR2GRAY)
else:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Resize (if specified)
if 'resolution' in kwargs:
frame = cv2.resize(frame, kwargs['resolution'])
# FPS adjustment (if specified)
if 'fps' in kwargs:
out.set(cv2.CAP_PROP_FPS, kwargs['fps'])
out.write(frame)
# Sequential video compression with batch processing and memory-efficient loading
def compress_videos_sequential(input_videos, output_folder, test_name, **kwargs):
times = []
for video in input_videos:
if not validate_video_file(video):
logging.error(f"Skipping {video} due to validation failure.")
continue
start_time = time.time()
cap = cv2.VideoCapture(video)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Auto-adjust batch size based on available memory
available_memory = psutil.virtual_memory().available
batch_size = min(2000, int(available_memory // (total_frames * 480 * 640 * 3))) # Adjusted batch size
fourcc = cv2.VideoWriter_fourcc(*'XVID')
output_path = os.path.join(output_folder, f"compressed_{test_name}_{os.path.basename(video)}.avi")
out = cv2.VideoWriter(output_path, fourcc, kwargs.get('fps', 20.0), kwargs.get('resolution', (640, 480)))
frame_batch = []
frame_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_batch.append(frame)
frame_count += 1
# Process batch
if len(frame_batch) == batch_size:
process_frame_batch(frame_batch, out, **kwargs)
frame_batch = []
# Dynamically update progress based on memory usage
if frame_count % math.ceil(total_frames / 10) == 0:
memory_usage = psutil.virtual_memory().percent
cpu_usage = psutil.cpu_percent(interval=0.1)
disk_usage = psutil.disk_usage(output_folder).percent
logging.info(f"Memory Usage: {memory_usage}%, CPU Usage: {cpu_usage}%, Disk Usage: {disk_usage}%")
display_progress_bar(frame_count, total_frames)
# Process remaining frames in the last batch
if frame_batch:
process_frame_batch(frame_batch, out, **kwargs)
cap.release()
out.release()
times.append(time.time() - start_time)
# Log the time taken for this video
logging.info(f"Compression time for {video}: {times[-1]:.2f}s")
return times
# Parallel video compression using Dask with optimized processing
@delayed
def compress_video_dask(video, output_folder, test_name, batch_size=500, **kwargs):
if not validate_video_file(video):
logging.error(f"Skipping {video} due to validation failure.")
return 0
start_time = time.time()
cap = cv2.VideoCapture(video)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fourcc = cv2.VideoWriter_fourcc(*'XVID')
output_path = os.path.join(output_folder, f"compressed_{test_name}_{os.path.basename(video)}.avi")
out = cv2.VideoWriter(output_path, fourcc, kwargs.get('fps', 20.0), kwargs.get('resolution', (640, 480)))
frame_batch = []
frame_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_batch.append(frame)
frame_count += 1
# Process batch
if len(frame_batch) == batch_size:
process_frame_batch(frame_batch, out, **kwargs)
frame_batch = []
# Throttle progress updates (update every 10% of frames)
if frame_count % math.ceil(total_frames / 10) == 0:
display_progress_bar(frame_count, total_frames)
# Process remaining frames in the last batch
if frame_batch:
process_frame_batch(frame_batch, out, **kwargs)
cap.release()
out.release()
return time.time() - start_time
# Optimized Dask parallel compression with dynamic worker pool management
def compress_videos_parallel(input_videos, output_folder, scheduler='threads', **kwargs):
cpu_count = multiprocessing.cpu_count()
pool_size = min(len(input_videos), cpu_count) # Limit pool size to available CPUs or video count
# Configure Dask to use the specified scheduler
tasks = []
for video in input_videos:
task = compress_video_dask(video, output_folder, **kwargs)
tasks.append(task)
times = compute(*tasks, scheduler=scheduler) # Use specified scheduler
return times
# Enhanced test runner with benchmarking and logging
def run_tests(test_cases, scheduler='threads'):
sequential_times = []
dask_times = []
for i, test in enumerate(test_cases):
test_name = f"test_{i + 1}_{test['name']}" # Unique test name for each case
print(f"\nRunning Test Case {i + 1}: {test['name']}")
logging.info(f"Test Case {i + 1}: {test['name']} started.")
# Initialize timing variables for the test case
sequential_start_time = time.time()
# Sequential Compression
print("\nRunning sequential compression...")
seq_time = compress_videos_sequential(test['videos'], 'output_sequential', test_name, **test['params'])
total_seq_time = time.time() - sequential_start_time
sequential_times.append(total_seq_time)
print(f"Sequential Compression Time: {total_seq_time:.2f}s")
# Dask Compression
print("\nRunning Dask parallel compression...")
dask_start_time = time.time() # measure only Dask time
dask_time = compress_videos_parallel(test['videos'], 'output_dask', scheduler=scheduler, test_name = test_name, **test['params'])
total_dask_time = time.time() - dask_start_time
dask_times.append(total_dask_time)
print(f"Dask Parallel Compression Time: {total_dask_time:.2f}s")
# Log the total time for the test case
total_time_test = total_seq_time + total_dask_time
logging.info(f"Test Case {i + 1}: {test['name']} completed in {total_time_test:.2f}s.")
print(f"Total Test Time: {total_time_test:.2f}s")
# Plot side-by-side bar chart
plt.figure(figsize=(10, 6))
index = range(len(test_cases))
bar_width = 0.35
plt.bar(index, sequential_times, bar_width, label='Sequential')
plt.bar([i + bar_width for i in index], dask_times, bar_width, label='Dask Parallel')
plt.xlabel('Test Cases')
plt.ylabel('Time (seconds)')
plt.title('Compression Time Comparison: Sequential vs Dask Parallel (Optimized)')
plt.xticks([i + bar_width / 2 for i in index], [tc['name'] for tc in test_cases], rotation=45, ha='right')
plt.legend()
plt.tight_layout()
plt.savefig('compression_time_comparison.png')
plt.show()
# Example test cases
test_cases = [
{"name": "Default Compression", "videos": ["suns.mp4", "suns1.mp4", "suns2.mp4"], "params": {}}
]
if __name__ == "__main__":
run_tests(test_cases, scheduler='threads') # can change 'threads' to 'processes' or 'single-threaded' if needed
Что я пробовал:
Форматы файлов: я пробовал сохранять выходные видео в различных форматах (например, .mp4, .avi), но проблема остаётся.
Проблемы с кодеком: я убедился, что используемый кодек совместим с форматом, но он по-прежнему не работает. Например, я использовал cv2.VideoWriter_fourcc(*'XVID') для файлов .avi.
Установка OpenH264: я установил openh264-1.8.0-win64.dll в надежде решить проблемы с кодеком, но это тоже не помогло.
Несмотря на то что тестовый пример выполняется без ошибок, ни один медиаплеер не может открыть или воспроизвести три выходных файла.
Я работаю над проектом, который предполагает сжатие видео с помощью OpenCV. Я пытаюсь уменьшить размер файла и протестировать различные сценарии, которые обеспечат максимально быстрое сжатие или минимальный размер сжатия. [code]import time import os import numpy as np import cv2 import dask from dask import delayed, compute import dask.bag as db import math import multiprocessing from multiprocessing.pool import ThreadPool import asyncio import logging import psutil import matplotlib.pyplot as plt
# Set up logging logging.basicConfig(filename='video_compression.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Check if a video file is valid def validate_video_file(file_path): if not os.path.exists(file_path): logging.error(f"File {file_path} not found.") return False cap = cv2.VideoCapture(file_path) if not cap.isOpened(): logging.error(f"Unable to open video file {file_path}") cap.release() return False cap.release() return True
# Display progress bar in the console def display_progress_bar(progress, total, bar_length=50): percent = int(progress / total * 100) bar = '█' * int(bar_length * (progress / total)) + '-' * (bar_length - int(bar_length * (progress / total))) print(f"\r\033[91m[{bar}] {percent}%\033[0m", end='')
# Process frame batches with optional GPU support and other operations def process_frame_batch(batch, out, **kwargs): for frame in batch: # GPU and Grayscale processing if kwargs.get('grayscale', False): if cv2.cuda.getCudaEnabledDeviceCount() > 0: frame = cv2.cuda.cvtColor(frame, cv2.COLOR_BGR2GRAY) else: frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Resize (if specified) if 'resolution' in kwargs: frame = cv2.resize(frame, kwargs['resolution'])
# FPS adjustment (if specified) if 'fps' in kwargs: out.set(cv2.CAP_PROP_FPS, kwargs['fps'])
out.write(frame)
# Sequential video compression with batch processing and memory-efficient loading def compress_videos_sequential(input_videos, output_folder, test_name, **kwargs): times = []
for video in input_videos: if not validate_video_file(video): logging.error(f"Skipping {video} due to validation failure.") continue
start_time = time.time() cap = cv2.VideoCapture(video) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Auto-adjust batch size based on available memory available_memory = psutil.virtual_memory().available batch_size = min(2000, int(available_memory // (total_frames * 480 * 640 * 3))) # Adjusted batch size
# Log the time taken for this video logging.info(f"Compression time for {video}: {times[-1]:.2f}s")
return times
# Parallel video compression using Dask with optimized processing @delayed def compress_video_dask(video, output_folder, test_name, batch_size=500, **kwargs): if not validate_video_file(video): logging.error(f"Skipping {video} due to validation failure.") return 0
start_time = time.time() cap = cv2.VideoCapture(video) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Optimized Dask parallel compression with dynamic worker pool management def compress_videos_parallel(input_videos, output_folder, scheduler='threads', **kwargs): cpu_count = multiprocessing.cpu_count() pool_size = min(len(input_videos), cpu_count) # Limit pool size to available CPUs or video count
# Configure Dask to use the specified scheduler tasks = [] for video in input_videos: task = compress_video_dask(video, output_folder, **kwargs) tasks.append(task) times = compute(*tasks, scheduler=scheduler) # Use specified scheduler return times
# Enhanced test runner with benchmarking and logging def run_tests(test_cases, scheduler='threads'): sequential_times = [] dask_times = []
for i, test in enumerate(test_cases): test_name = f"test_{i + 1}_{test['name']}" # Unique test name for each case
print(f"\nRunning Test Case {i + 1}: {test['name']}") logging.info(f"Test Case {i + 1}: {test['name']} started.")
# Initialize timing variables for the test case sequential_start_time = time.time()
# Log the total time for the test case total_time_test = total_seq_time + total_dask_time logging.info(f"Test Case {i + 1}: {test['name']} completed in {total_time_test:.2f}s.") print(f"Total Test Time: {total_time_test:.2f}s")
# Plot side-by-side bar chart plt.figure(figsize=(10, 6)) index = range(len(test_cases)) bar_width = 0.35
plt.bar(index, sequential_times, bar_width, label='Sequential') plt.bar([i + bar_width for i in index], dask_times, bar_width, label='Dask Parallel')
plt.xlabel('Test Cases') plt.ylabel('Time (seconds)') plt.title('Compression Time Comparison: Sequential vs Dask Parallel (Optimized)') plt.xticks([i + bar_width / 2 for i in index], [tc['name'] for tc in test_cases], rotation=45, ha='right') plt.legend() plt.tight_layout() plt.savefig('compression_time_comparison.png') plt.show()
# Example test cases test_cases = [ {"name": "Default Compression", "videos": ["suns.mp4", "suns1.mp4", "suns2.mp4"], "params": {}} ]
if __name__ == "__main__": run_tests(test_cases, scheduler='threads') # can change 'threads' to 'processes' or 'single-threaded' if needed
[/code] Что я пробовал: [list] [*]Форматы файлов: я пробовал сохранять выходные видео в различных форматах (например, .mp4, .avi), но проблема остаётся. [*]Проблемы с кодеком: я убедился, что используемый кодек совместим с форматом, но он по-прежнему не работает. Например, я использовал cv2.VideoWriter_fourcc(*'XVID') для файлов .avi. [*]Установка OpenH264: я установил openh264-1.8.0-win64.dll в надежде решить проблемы с кодеком, но это тоже не помогло. [/list] Несмотря на то что тестовый пример выполняется без ошибок, ни один медиаплеер не может открыть или воспроизвести три выходных файла.