Уменьшите задержку с помощью программного переключателя rtspPython

Программы на Python
Ответить
Anonymous
 Уменьшите задержку с помощью программного переключателя rtsp

Сообщение Anonymous »

Я работал над сценарием, который использует gstreamer для создания конвейера данных для переключения между камерами, с умеренным успехом, потому что программное обеспечение делает то, что должно, но имеет большую задержку ввода, задержку до 3 секунд от камера на экран. Есть какие-нибудь советы, как это исправить?

Код: Выделить всё

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GstRtspServer, GLib
import os
import threading
import time
from urllib.parse import urlparse
import socket

os.environ['GST_DEBUG'] = '2'
os.environ['GST_BUFFER_SIZE'] = '1024'

class RTSPMultiStreamServer:
def __init__(self, config_file="rtsp_sources.txt", port="8555"):
Gst.init(None)

self.config_file = config_file
self.sources = []
self.active_sources = []
self.current_source = 0
self.running = True
self.pipeline = None
self.last_switch_time = 0  # Time of last switch
self.switch_cooldown = 1.0  # Minimum time between switches (seconds)
self.switching = False  # Flag to prevent simultaneous switches

self.verify_sources()

if not self.active_sources:
print("No active RTSP sources available")
return

self.server = GstRtspServer.RTSPServer()
self.server.set_service(port)

self.factory = GstRtspServer.RTSPMediaFactory()
self.factory.set_shared(True)
self.factory.set_latency(0)

self.factory.connect('media-configure', self.on_media_configure)

self.setup_pipeline()
mounts = self.server.get_mount_points()
if mounts is None:
print("Error: Could not get mount points")
return

try:
mounts.remove_factory("/test")
except Exception:
pass  # Ignore if doesn't exist

mounts.add_factory("/test", self.factory)
print(f"Mount point added: /test on port {port}")

def verify_rtsp_url(self, url, timeout=2):
"""Verifies if an RTSP stream is active"""
try:
# Create a simpler test pipeline
test_pipeline_str = (
f'rtspsrc location="{url}" latency=0 protocols=tcp ! '
'fakesink sync=false'
)

test_pipeline = Gst.parse_launch(test_pipeline_str)

# Try to play
ret = test_pipeline.set_state(Gst.State.PLAYING)
#time.sleep(1)

# Check state
state = test_pipeline.get_state(timeout * Gst.SECOND)

# Cleanup
test_pipeline.set_state(Gst.State.NULL)

return state[0] == Gst.StateChangeReturn.SUCCESS

except Exception as e:
print(f"Error verifying {url}: {str(e)}")
return False

def verify_sources(self):
"""Reads and verifies all RTSP sources"""
if not os.path.exists(self.config_file):
with open(self.config_file, "w") as f:
f.write("rtsp://example1.com/stream1\nrtsp://example2.com/stream2")
print(f"File {self.config_file} created.  Please add valid RTSP URLs.")
return

with open(self.config_file, "r") as f:
self.sources = [line.strip() for line in f if line.strip()]

print("\nVerifying RTSP sources...")
for url in self.sources:
print(f"\nVerifying {url}...", end=" ", flush=True)
if self.verify_rtsp_url(url):
self.active_sources.append(url)
print("✓ ACTIVE")
else:
print("✗ INACTIVE")

print(f"\nSources summary:")
print(f"Total sources: {len(self.sources)}")
print(f"Active sources: {len(self.active_sources)}")
print(f"Inactive sources: {len(self.sources) - len(self.active_sources)}")

if self.active_sources:
print("\nActive sources:")
for i, url in enumerate(self.active_sources, 1):
print(f"{i}. {url}")

if len(self.sources) - len(self.active_sources) > 0:
print("\nInactive sources:")
inactive = set(self.sources) - set(self.active_sources)
for i, url in enumerate(inactive, 1):
print(f"{i}. {url}")
print()

def setup_pipeline(self):
"""Sets up the complete pipeline with active sources"""
if not self.active_sources:
print("No active sources to configure pipeline")
return

# Create pipelines for each source
source_pipelines = []
for i, url in enumerate(self.active_sources):
source_pipeline = f"""
rtspsrc location="{url}" !
decodebin ! videoconvert !
videoscale ! video/x-raw,width=854,height=480 !
queue max-size-buffers=1 ! selector.sink_{i}
"""
source_pipelines.append(source_pipeline)

# Selector and encoder pipeline
output_pipeline = """
input-selector name=selector sync-mode=0 !
x264enc tune=zerolatency bitrate=1000 speed-preset=ultrafast
key-int-max=30 vbv-buf-capacity=1000 !
rtph264pay name=pay0 pt=96 config-interval=1
"""

# Join all pipelines
full_pipeline = " ".join(source_pipelines) + output_pipeline

try:
self.factory.set_launch(full_pipeline)
self.factory.set_shared(True)
self.factory.set_latency(0)

# Connect signal to get pipeline
self.factory.connect('media-configure', self.on_media_configure)

print("Pipeline configured successfully")
except Exception as e:
print(f"Error configuring pipeline: {str(e)}")

def on_media_configure(self, factory, media):
"""Callback when media is configured"""
self.pipeline = media.get_element()

# Configure initial selector
selector = self.pipeline.get_by_name("selector")
if selector:
selector.set_property("active-pad",
selector.get_static_pad("sink_0"))
return True

def switch_source(self):
"""Switches to next source with rapid change protection"""
if not self.active_sources or not self.pipeline:
return False

# Check if switch is in progress
if self.switching:
print("\nWait...  switch in progress")
return False

# Check cooldown
current_time = time.time()
time_since_last_switch = current_time - self.last_switch_time

if time_since_last_switch < self.switch_cooldown:
remaining = round(self.switch_cooldown - time_since_last_switch, 1)
print(f"\nWait {remaining}s to switch again")
return False

try:
self.switching = True

# Calculate next source
next_source = (self.current_source + 1) % len(self.active_sources)
url = self.active_sources[next_source]

# Switch to next source
selector = self.pipeline.get_by_name("selector")
if selector:
# Prepare switch
next_pad = selector.get_static_pad(f"sink_{next_source}")
if not next_pad:
print("\nError: Pad not found")
return False

# Perform switch
selector.set_property("active-pad", next_pad)
self.current_source = next_source
self.last_switch_time = current_time

print(f"\nSwitching to source {self.current_source + 1}/{len(self.active_sources)}: "
f"{self.active_sources[self.current_source]}")

return True

except Exception as e:
print(f"\nError switching source: {str(e)}")
return False

finally:
self.switching = False

def source_switcher(self):
while self.running:
#time.sleep(10)
if not self.running:
break
self.switch_source()

def handle_commands(self):
"""Handles user input commands"""
while self.running:
try:
command = input().strip().lower()
if command == 'n' or command == 'next':
self.switch_source()
elif command == 'l' or command == 'list':
self.list_sources()
elif command == 'c' or command == 'cooldown':
self.show_cooldown()
elif command == 'h' or command == 'help':
self.print_help()
elif command == 'q' or command == 'quit':
self.running = False
print("\nStopping server...")
break
except EOFError:
continue
except KeyboardInterrupt:
self.running = False
print("\nStopping server...")
break

def list_sources(self):
"""Shows list of active sources"""
print("\nActive sources:")
for i, url in enumerate(self.active_sources, 1):
if i - 1 == self.current_source:
print(f"→ {i}. {url} (CURRENT)")
else:
print(f"  {i}.  {url}")

def print_help(self):
"""Shows available command help"""
print("\nAvailable commands:")
print("  n, next     - Switch to next stream")
print("  l, list     - List active streams")
print("  c, cooldown - Show remaining time to switch")
print("  h, help     - Show this help")
print("  q, quit     - Exit program")

def start(self):
if not self.active_sources:
print("Error: No active RTSP sources")
return False

try:
if not self.server.attach(None):
print("Error: Could not attach server to context")
return False

print(f"\nRTSP Server started:")
print(f"URL: rtsp://localhost:{self.server.get_service()}/test")
print(f"Active sources in use: {len(self.active_sources)}")
self.list_sources()
self.print_help()

# Start thread for GLib main loop
loop = GLib.MainLoop()
loop_thread = threading.Thread(target=loop.run)
loop_thread.daemon = True
loop_thread.start()

# Handle commands in main thread
self.handle_commands()

# Cleanup
loop.quit()

except Exception as e:
print(f"Error starting server: {str(e)}")
return False

return True

def show_cooldown(self):
"""Shows remaining cooldown time"""
time_since_last = time.time() - self.last_switch_time
if time_since_last < self.switch_cooldown:
remaining = round(self.switch_cooldown - time_since_last, 1)
print(f"\nRemaining time to switch: {remaining}s")
else:
print("\nYou can switch sources now")

def main():
try:
server = RTSPMultiStreamServer(port="8555")
server.start()
except KeyboardInterrupt:
print("\nProgram terminated by user")
except Exception as e:
print(f"Error: {str(e)}")

if __name__ == "__main__":
main()
Идея кода заключается в том, чтобы иметь возможность переключаться между несколькими потоками на одном выходе, но из-за такой большой задержки сложно применить это в реальном приложении.


Подробнее здесь: https://stackoverflow.com/questions/793 ... tsp-switch
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»