Почему моя функция получения_обработанного_видео не получает полный объем данныхPython

Программы на Python
Ответить
Anonymous
 Почему моя функция получения_обработанного_видео не получает полный объем данных

Сообщение Anonymous »

Я пытаюсь создать кластерную систему обработки с клиентом, брокером и узлами. При выполнении функции получения_обработанного_видео она перестает получать данные через случайное время. Есть ли что-то, что я упускаю?
КОД БРОКЕРА

Код: Выделить всё

import socket
import threading
import os

# Configuraciones del broker
BROKER_HOST = 'localhost'
BROKER_PORT = 5000
NODE_PORT = 6000

# Guardar la lista de nodos
NODES = []
NODES_LOCK = threading.Lock()

def print_nodes():
"""Función para imprimir la lista actual de nodos conectados."""
with NODES_LOCK:
print("Nodos conectados:")
for i, node_socket in enumerate(NODES, start=1):
print(f"  Nodo {i}: {node_socket.getpeername()}")
if not NODES:
print("  No hay nodos conectados.")

def handle_node_registration(node_socket):
with NODES_LOCK:
NODES.append(node_socket)
print("Nodo registrado.")
print_nodes()

try:
while True:
node_socket.recv(1024)  # Mantener la conexión abierta
except:
with NODES_LOCK:
NODES.remove(node_socket)
print("Nodo desconectado.")
print_nodes()

def node_listener():
broker_node_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
broker_node_socket.bind((BROKER_HOST, NODE_PORT))
broker_node_socket.listen(5)
print(f"Broker de nodos corriendo en {BROKER_HOST}:{NODE_PORT}")

while True:
node_socket, _ = broker_node_socket.accept()
threading.Thread(target=handle_node_registration, args=(node_socket,)).start()

def distribute_video_to_nodes(video_data):
with NODES_LOCK:
if len(NODES) == 0:
print("No hay nodos disponibles.")
return

chunk_size = len(video_data) // len(NODES)
for i, node_socket in enumerate(NODES):
start_byte = i * chunk_size
end_byte = (i + 1) * chunk_size if i != len(NODES) - 1 else len(video_data)
video_chunk = video_data[start_byte:end_byte]
print(f"Enviando bytes del video a Nodo {i + 1}...")

# Enviar tamaño de los bytes y luego los datos del video
data_length = str(len(video_chunk)).encode()
node_socket.sendall(data_length.ljust(16))  # Enviar el tamaño ajustado a 16 bytes
node_socket.sendall(video_chunk)
print(f"Bytes enviados al Nodo {i + 1}.")

def receive_processed_video():
processed_video_parts = []
expected_length = 5297096  # Tamaño esperado del video (ajústalo según tu video)

with NODES_LOCK:
for i, node_socket in enumerate(NODES):
print(f"Recibiendo datos procesados del Nodo {i + 1}...")

# Recibir datos procesados en bloques
data = b""
retries = 5  # Número máximo de reintentos
while len(data) < expected_length and retries > 0:
try:
packet = node_socket.recv(4096)  # 128 KB buffer size
if not packet:
break  # Salir cuando el nodo cierra la conexión
data += packet
print(f"Recibido: {len(data)} bytes hasta ahora del Nodo {i + 1}")
except socket.timeout:
print("Tiempo de espera agotado.  Intentando continuar...")
retries -= 1
if retries == 0:
print(f"Error: No se pudieron recibir todos los datos del Nodo {i + 1}")
break

if len(data) >= expected_length:
processed_video_parts.append(data)
print(f"Datos procesados recibidos completamente del Nodo {i + 1}.")
else:
print(f"Error: Datos incompletos recibidos del Nodo {i + 1}.")
print(f"Bytes esperados: {expected_length}, bytes recibidos: {len(data)}")

complete_video_data = b''.join(processed_video_parts)
print(f"Total de bytes recibidos después de la concatenación: {len(complete_video_data)}")

if len(complete_video_data) >= expected_length:
output_file = process_video_data(complete_video_data)
return complete_video_data
else:
print("Error: Los datos recibidos no coinciden con el tamaño esperado.")
return None

def process_video_data(data):
with open("video_guardado.mp4", "wb") as video_file:
video_file.write(data)
print("Video guardado como video_guardado.mp4")
return data

def broker_server():
broker_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
broker_socket.bind((BROKER_HOST, BROKER_PORT))
broker_socket.listen(5)
print(f"Broker de clientes corriendo en {BROKER_HOST}:{BROKER_PORT}")

while True:
client_socket, _ = broker_socket.accept()
print("Cliente conectado.")

# Recibir el video del cliente
print("Recibiendo video del cliente...")
video_length = int(client_socket.recv(1024).decode())
video_data = b""
while len(video_data) <  video_length:
print(f"Recibido: {len(video_data)} / {video_length}")
video_data += client_socket.recv(4096)

print("Video completo recibido.")

# Enviar el video a los nodos
distribute_video_to_nodes(video_data)

# Recibir el video procesado de los nodos
processed_video = receive_processed_video()

if processed_video:
with open("final_video.mp4", "wb") as video_file:
video_file.write(processed_video)
print("Video procesado guardado como 'final_video.mp4'.")

# Enviar el video procesado al cliente
client_socket.sendall(processed_video)
print("Video procesado enviado al cliente.")
else:
print("Error: No se recibieron datos procesados.")

client_socket.close()

if __name__ == "__main__":
threading.Thread(target=node_listener, daemon=True).start()
broker_server()

КОД УЗЛА

Код: Выделить всё

import socket
import cv2

# Configuraciones del nodo
BROKER_HOST = '127.0.0.1'
BROKER_PORT = 6000  # Puerto donde escucha el broker

def process_video_data(video_data):
# Guardar el video recibido en un archivo llamado video_recibido.mp4
with open("video_recibido.mp4", "wb") as video_file:
video_file.write(video_data)
print("Video guardado como video_recibido.mp4")

# Abrir el archivo guardado localmente usando OpenCV
cap = cv2.VideoCapture("video_recibido.mp4")

if not cap.isOpened():
raise Exception("Error al abrir el archivo de video.")

# Inicialización del sustractor de fondo
fgbg = cv2.createBackgroundSubtractorKNN()

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

output_file = "video_grises.mp4"
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height), isColor=False)

while True:
ret, frame = cap.read()
if not ret:
break  # Fin del video

# Aplicamos el sustractor de fondo y convertimos a escala de grises
img_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
out.write(img_gray)

cap.release()
out.release()

print("Video procesado y guardado como:", output_file)
return output_file

def node_server():
node_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
node_socket.connect((BROKER_HOST, BROKER_PORT))
print(f"Nodo conectado al broker en {BROKER_HOST}:{BROKER_PORT}")

try:
while True:
# Recibir el tamaño de los datos
data_length = node_socket.recv(16).strip()
if not data_length:
continue
data_length = int(data_length.decode())
# Recibir el video en bytes
video_data = b""
while len(video_data) <  data_length:
packet = node_socket.recv(65536)  # Incrementa el tamaño del buffer de recepción a 64 KB
if not packet:
break
video_data += packet
print("Video recibido")

# Procesar el video recibido
output_file = process_video_data(video_data)

# Enviar el video procesado al broker usando el mismo socket
send_video_to_broker(node_socket, output_file)

except Exception as e:
print(f"Error: {e}")

finally:
node_socket.close()  # Cerrar el socket solo al finalizar toda la operación
print("Conexión con el broker cerrada.")

def send_video_to_broker(node_socket, output_file):
# Leer el video procesado
chunk_size = 1024 * 4 # 64 KB
with open(output_file, "rb") as video_file:
while True:
video_chunk = video_file.read(chunk_size)
if not video_chunk:
break
node_socket.sendall(video_chunk)
print(f"Enviado chunk de {len(video_chunk)} bytes al broker")
print("Video procesado enviado al broker")

if __name__ == "__main__":
node_server()

Я пытался увеличить размер буфера, чтобы изменить способ отправки видео узлом. Ничто из этого не сработало, оно останавливается после получения случайного количества байт

Подробнее здесь: https://stackoverflow.com/questions/792 ... nt-of-data
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»