Проблемы со стоимостью публикации и подписки для потоковой передачи видео H.264 через RabbitMQPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Проблемы со стоимостью публикации и подписки для потоковой передачи видео H.264 через RabbitMQ

Сообщение Anonymous »

Я работаю над проектом по потоковой передаче видеофайла H.264 с использованием RabbitMQ (протокол AMQP) и его отображению в веб-приложении. Настройка включает захват видеокадров, их кодирование, отправку в RabbitMQ, а затем их использование и декодирование на стороне веб-приложения с помощью Flask и Flask-SocketIO.
Однако я сталкиваюсь с проблемой производительности. проблемы со скоростью публикации и подписки в RabbitMQ. Кажется, я не могу достичь более 10 сообщений в секунду. Этого недостаточно для плавной потоковой передачи видео.
Мне нужна помощь в диагностике и устранении узких мест в производительности.
Вот мой код:
  • Скрипт захвата и публикации видео:

Код: Выделить всё

# RabbitMQ setup
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
KEY = f'DRONE_{CAM_LOCATION}'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

# Path to the H.264 video file
VIDEO_FILE_PATH = 'videos/FPV.h264'

# Configure logging
logging.basicConfig(level=logging.INFO)

@contextmanager
def rabbitmq_channel(host):
"""Context manager to handle RabbitMQ channel setup and teardown."""
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
try:
yield channel
finally:
connection.close()

def initialize_rabbitmq(channel):
"""Initialize RabbitMQ exchange and queue, and bind them together."""
channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
channel.queue_declare(queue=QUEUE_NAME)
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=KEY)

def send_frame(channel, frame):
"""Encode the video frame using FFmpeg and send it to RabbitMQ."""
ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
cmd = [
ffmpeg_path,
'-f', 'rawvideo',
'-pix_fmt', 'rgb24',
'-s', '{}x{}'.format(frame.shape[1], frame.shape[0]),
'-i', 'pipe:0',
'-f', 'h264',
'-vcodec', 'libx264',
'-pix_fmt', 'yuv420p',
'-preset', 'ultrafast',
'pipe:1'
]

start_time = time.time()
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = process.communicate(input=frame.tobytes())
encoding_time = time.time() - start_time

if process.returncode != 0:
logging.error("ffmpeg error: %s", err.decode())
raise RuntimeError("ffmpeg error")

frame_size = len(out)
logging.info("Sending frame with shape: %s, size: %d bytes", frame.shape, frame_size)
timestamp = time.time()
formatted_timestamp = datetime.fromtimestamp(timestamp).strftime('%H:%M:%S.%f')
logging.info(f"Timestamp: {timestamp}")
logging.info(f"Formatted Timestamp: {formatted_timestamp[:-3]}")
timestamp_bytes = struct.pack('d', timestamp)
message_body = timestamp_bytes + out
channel.basic_publish(exchange=EXCHANGE, routing_key=KEY, body=message_body)
logging.info(f"Encoding time: {encoding_time:.4f} seconds")

def capture_video(channel):
"""Read video from the file, encode frames, and send them to RabbitMQ."""
if not os.path.exists(VIDEO_FILE_PATH):
logging.error("Error: Video file does not exist.")
return
cap = cv2.VideoCapture(VIDEO_FILE_PATH)
if not cap.isOpened():
logging.error("Error: Could not open video file.")
return
try:
while True:
start_time = time.time()
ret, frame = cap.read()
read_time = time.time() - start_time
if not ret:
break
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_rgb = np.ascontiguousarray(frame_rgb) # Ensure the frame is contiguous
send_frame(channel, frame_rgb)
cv2.imshow('Video', frame)
if cv2.waitKey(1) &  0xFF == ord('q'):
break
logging.info(f"Read time: {read_time:.4f} seconds")
finally:
cap.release()
cv2.destroyAllWindows()
  • бэкэнд (flask):

Код: Выделить всё

app = Flask(__name__)
CORS(app)
socketio = SocketIO(app, cors_allowed_origins="*")

RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

def initialize_rabbitmq():
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
channel.queue_declare(queue=QUEUE_NAME)
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=f'DRONE_{CAM_LOCATION}')
return connection, channel

def decode_frame(frame_data):
# FFmpeg command to decode H.264 frame data
ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
cmd = [
ffmpeg_path,
'-f', 'h264',
'-i', 'pipe:0',
'-pix_fmt', 'bgr24',
'-vcodec', 'rawvideo',
'-an', '-sn',
'-f', 'rawvideo',
'pipe:1'
]
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
start_time = time.time()  # Start timing the decoding process
out, err = process.communicate(input=frame_data)
decoding_time = time.time() - start_time  # Calculate decoding time

if process.returncode != 0:
print("ffmpeg error: ", err.decode())
return None
frame_size = (960, 1280, 3)  # frame dimensions expected by the frontend
frame = np.frombuffer(out, np.uint8).reshape(frame_size)
print(f"Decoding time: {decoding_time:.4f} seconds")
return frame

def format_timestamp(ts):
dt = datetime.fromtimestamp(ts)
return dt.strftime('%H:%M:%S.%f')[:-3]

def rabbitmq_consumer():
connection, channel = initialize_rabbitmq()
for method_frame, properties, body in channel.consume(QUEUE_NAME):
message_receive_time = time.time()  # Time when the message is received

# Extract the timestamp from the message body
timestamp_bytes = body[:8]
frame_data = body[8:]
publish_timestamp = struct.unpack('d', timestamp_bytes)[0]

print(f"Message Receive Time: {message_receive_time:.4f} ({format_timestamp(message_receive_time)})")
print(f"Publish Time: {publish_timestamp:.4f} ({format_timestamp(publish_timestamp)})")

frame = decode_frame(frame_data)
decode_time = time.time() - message_receive_time  # Calculate decode time

if frame is not None:
_, buffer = cv2.imencode('.jpg', frame)
frame_data = buffer.tobytes()
socketio.emit('video_frame', {'frame': frame_data, 'timestamp': publish_timestamp}, namespace='/')
emit_time = time.time()  # Time after emitting the frame

# Log the time taken to emit the frame and its size
rtt = emit_time - publish_timestamp  # Calculate RTT from publish to emit
print(f"Current Time: {emit_time:.4f} ({format_timestamp(emit_time)})")
print(f"RTT: {rtt:.4f} seconds")
print(f"Emit time: {emit_time - message_receive_time:.4f} seconds, Frame size: {len(frame_data)} bytes")
channel.basic_ack(method_frame.delivery_tag)

@app.route('/')
def index():
return render_template('index.html')

@socketio.on('connect')
def handle_connect():
print('Client connected')

@socketio.on('disconnect')
def handle_disconnect():
print('Client disconnected')

if __name__ == '__main__':
consumer_thread = threading.Thread(target=rabbitmq_consumer)
consumer_thread.daemon = True
consumer_thread.start()
socketio.run(app, host='0.0.0.0', port=5000)

Как оптимизировать скорость публикации и подписки, чтобы обрабатывать большее количество сообщений в секунду?
Будем очень благодарны за любую помощь или предложения!
Я пытался использовать многопоточность и многопроцессорную обработку для одновременной обработки нескольких кадров, а также пытался оптимизировать функцию декодирования кадров, чтобы сделать ее быстрее, но безуспешно.>

Подробнее здесь: https://stackoverflow.com/questions/785 ... -over-rabb
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»