Я работал над сценарием, который использует gstreamer для создания конвейера данных для переключения между камерами, с умеренным успехом, потому что программное обеспечение делает то, что должно, но имеет большую задержку ввода, задержку до 3 секунд от камера на экран. Есть какие-нибудь советы, как это исправить?
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GstRtspServer, GLib
import os
import threading
import time
from urllib.parse import urlparse
import socket
os.environ['GST_DEBUG'] = '2'
os.environ['GST_BUFFER_SIZE'] = '1024'
class RTSPMultiStreamServer:
def __init__(self, config_file="rtsp_sources.txt", port="8555"):
Gst.init(None)
self.config_file = config_file
self.sources = []
self.active_sources = []
self.current_source = 0
self.running = True
self.pipeline = None
self.last_switch_time = 0 # Time of last switch
self.switch_cooldown = 1.0 # Minimum time between switches (seconds)
self.switching = False # Flag to prevent simultaneous switches
self.verify_sources()
if not self.active_sources:
print("No active RTSP sources available")
return
self.server = GstRtspServer.RTSPServer()
self.server.set_service(port)
self.factory = GstRtspServer.RTSPMediaFactory()
self.factory.set_shared(True)
self.factory.set_latency(0)
self.factory.connect('media-configure', self.on_media_configure)
self.setup_pipeline()
mounts = self.server.get_mount_points()
if mounts is None:
print("Error: Could not get mount points")
return
try:
mounts.remove_factory("/test")
except Exception:
pass # Ignore if doesn't exist
mounts.add_factory("/test", self.factory)
print(f"Mount point added: /test on port {port}")
def verify_rtsp_url(self, url, timeout=2):
"""Verifies if an RTSP stream is active"""
try:
# Create a simpler test pipeline
test_pipeline_str = (
f'rtspsrc location="{url}" latency=0 protocols=tcp ! '
'fakesink sync=false'
)
test_pipeline = Gst.parse_launch(test_pipeline_str)
# Try to play
ret = test_pipeline.set_state(Gst.State.PLAYING)
#time.sleep(1)
# Check state
state = test_pipeline.get_state(timeout * Gst.SECOND)
# Cleanup
test_pipeline.set_state(Gst.State.NULL)
return state[0] == Gst.StateChangeReturn.SUCCESS
except Exception as e:
print(f"Error verifying {url}: {str(e)}")
return False
def verify_sources(self):
"""Reads and verifies all RTSP sources"""
if not os.path.exists(self.config_file):
with open(self.config_file, "w") as f:
f.write("rtsp://example1.com/stream1\nrtsp://example2.com/stream2")
print(f"File {self.config_file} created. Please add valid RTSP URLs.")
return
with open(self.config_file, "r") as f:
self.sources = [line.strip() for line in f if line.strip()]
print("\nVerifying RTSP sources...")
for url in self.sources:
print(f"\nVerifying {url}...", end=" ", flush=True)
if self.verify_rtsp_url(url):
self.active_sources.append(url)
print("✓ ACTIVE")
else:
print("✗ INACTIVE")
print(f"\nSources summary:")
print(f"Total sources: {len(self.sources)}")
print(f"Active sources: {len(self.active_sources)}")
print(f"Inactive sources: {len(self.sources) - len(self.active_sources)}")
if self.active_sources:
print("\nActive sources:")
for i, url in enumerate(self.active_sources, 1):
print(f"{i}. {url}")
if len(self.sources) - len(self.active_sources) > 0:
print("\nInactive sources:")
inactive = set(self.sources) - set(self.active_sources)
for i, url in enumerate(inactive, 1):
print(f"{i}. {url}")
print()
def setup_pipeline(self):
"""Sets up the complete pipeline with active sources"""
if not self.active_sources:
print("No active sources to configure pipeline")
return
# Create pipelines for each source
source_pipelines = []
for i, url in enumerate(self.active_sources):
source_pipeline = f"""
rtspsrc location="{url}" !
decodebin ! videoconvert !
videoscale ! video/x-raw,width=854,height=480 !
queue max-size-buffers=1 ! selector.sink_{i}
"""
source_pipelines.append(source_pipeline)
# Selector and encoder pipeline
output_pipeline = """
input-selector name=selector sync-mode=0 !
x264enc tune=zerolatency bitrate=1000 speed-preset=ultrafast
key-int-max=30 vbv-buf-capacity=1000 !
rtph264pay name=pay0 pt=96 config-interval=1
"""
# Join all pipelines
full_pipeline = " ".join(source_pipelines) + output_pipeline
try:
self.factory.set_launch(full_pipeline)
self.factory.set_shared(True)
self.factory.set_latency(0)
# Connect signal to get pipeline
self.factory.connect('media-configure', self.on_media_configure)
print("Pipeline configured successfully")
except Exception as e:
print(f"Error configuring pipeline: {str(e)}")
def on_media_configure(self, factory, media):
"""Callback when media is configured"""
self.pipeline = media.get_element()
# Configure initial selector
selector = self.pipeline.get_by_name("selector")
if selector:
selector.set_property("active-pad",
selector.get_static_pad("sink_0"))
return True
def switch_source(self):
"""Switches to next source with rapid change protection"""
if not self.active_sources or not self.pipeline:
return False
# Check if switch is in progress
if self.switching:
print("\nWait... switch in progress")
return False
# Check cooldown
current_time = time.time()
time_since_last_switch = current_time - self.last_switch_time
if time_since_last_switch < self.switch_cooldown:
remaining = round(self.switch_cooldown - time_since_last_switch, 1)
print(f"\nWait {remaining}s to switch again")
return False
try:
self.switching = True
# Calculate next source
next_source = (self.current_source + 1) % len(self.active_sources)
url = self.active_sources[next_source]
# Switch to next source
selector = self.pipeline.get_by_name("selector")
if selector:
# Prepare switch
next_pad = selector.get_static_pad(f"sink_{next_source}")
if not next_pad:
print("\nError: Pad not found")
return False
# Perform switch
selector.set_property("active-pad", next_pad)
self.current_source = next_source
self.last_switch_time = current_time
print(f"\nSwitching to source {self.current_source + 1}/{len(self.active_sources)}: "
f"{self.active_sources[self.current_source]}")
return True
except Exception as e:
print(f"\nError switching source: {str(e)}")
return False
finally:
self.switching = False
def source_switcher(self):
while self.running:
#time.sleep(10)
if not self.running:
break
self.switch_source()
def handle_commands(self):
"""Handles user input commands"""
while self.running:
try:
command = input().strip().lower()
if command == 'n' or command == 'next':
self.switch_source()
elif command == 'l' or command == 'list':
self.list_sources()
elif command == 'c' or command == 'cooldown':
self.show_cooldown()
elif command == 'h' or command == 'help':
self.print_help()
elif command == 'q' or command == 'quit':
self.running = False
print("\nStopping server...")
break
except EOFError:
continue
except KeyboardInterrupt:
self.running = False
print("\nStopping server...")
break
def list_sources(self):
"""Shows list of active sources"""
print("\nActive sources:")
for i, url in enumerate(self.active_sources, 1):
if i - 1 == self.current_source:
print(f"→ {i}. {url} (CURRENT)")
else:
print(f" {i}. {url}")
def print_help(self):
"""Shows available command help"""
print("\nAvailable commands:")
print(" n, next - Switch to next stream")
print(" l, list - List active streams")
print(" c, cooldown - Show remaining time to switch")
print(" h, help - Show this help")
print(" q, quit - Exit program")
def start(self):
if not self.active_sources:
print("Error: No active RTSP sources")
return False
try:
if not self.server.attach(None):
print("Error: Could not attach server to context")
return False
print(f"\nRTSP Server started:")
print(f"URL: rtsp://localhost:{self.server.get_service()}/test")
print(f"Active sources in use: {len(self.active_sources)}")
self.list_sources()
self.print_help()
# Start thread for GLib main loop
loop = GLib.MainLoop()
loop_thread = threading.Thread(target=loop.run)
loop_thread.daemon = True
loop_thread.start()
# Handle commands in main thread
self.handle_commands()
# Cleanup
loop.quit()
except Exception as e:
print(f"Error starting server: {str(e)}")
return False
return True
def show_cooldown(self):
"""Shows remaining cooldown time"""
time_since_last = time.time() - self.last_switch_time
if time_since_last < self.switch_cooldown:
remaining = round(self.switch_cooldown - time_since_last, 1)
print(f"\nRemaining time to switch: {remaining}s")
else:
print("\nYou can switch sources now")
def main():
try:
server = RTSPMultiStreamServer(port="8555")
server.start()
except KeyboardInterrupt:
print("\nProgram terminated by user")
except Exception as e:
print(f"Error: {str(e)}")
if __name__ == "__main__":
main()
Идея кода заключается в том, чтобы иметь возможность переключаться между несколькими потоками на одном выходе, но из-за такой большой задержки сложно применить это в реальном приложении.
Я работал над сценарием, который использует gstreamer для создания конвейера данных для переключения между камерами, с умеренным успехом, потому что программное обеспечение делает то, что должно, но имеет большую задержку ввода, задержку до 3 секунд от камера на экран. Есть какие-нибудь советы, как это исправить? [code] import gi gi.require_version('Gst', '1.0') gi.require_version('GstRtspServer', '1.0') from gi.repository import Gst, GstRtspServer, GLib import os import threading import time from urllib.parse import urlparse import socket
# Try to play ret = test_pipeline.set_state(Gst.State.PLAYING) #time.sleep(1)
# Check state state = test_pipeline.get_state(timeout * Gst.SECOND)
# Cleanup test_pipeline.set_state(Gst.State.NULL)
return state[0] == Gst.StateChangeReturn.SUCCESS
except Exception as e: print(f"Error verifying {url}: {str(e)}") return False
def verify_sources(self): """Reads and verifies all RTSP sources""" if not os.path.exists(self.config_file): with open(self.config_file, "w") as f: f.write("rtsp://example1.com/stream1\nrtsp://example2.com/stream2") print(f"File {self.config_file} created. Please add valid RTSP URLs.") return
with open(self.config_file, "r") as f: self.sources = [line.strip() for line in f if line.strip()]
print("\nVerifying RTSP sources...") for url in self.sources: print(f"\nVerifying {url}...", end=" ", flush=True) if self.verify_rtsp_url(url): self.active_sources.append(url) print("✓ ACTIVE") else: print("✗ INACTIVE")
if self.active_sources: print("\nActive sources:") for i, url in enumerate(self.active_sources, 1): print(f"{i}. {url}")
if len(self.sources) - len(self.active_sources) > 0: print("\nInactive sources:") inactive = set(self.sources) - set(self.active_sources) for i, url in enumerate(inactive, 1): print(f"{i}. {url}") print()
def setup_pipeline(self): """Sets up the complete pipeline with active sources""" if not self.active_sources: print("No active sources to configure pipeline") return
# Create pipelines for each source source_pipelines = [] for i, url in enumerate(self.active_sources): source_pipeline = f""" rtspsrc location="{url}" ! decodebin ! videoconvert ! videoscale ! video/x-raw,width=854,height=480 ! queue max-size-buffers=1 ! selector.sink_{i} """ source_pipelines.append(source_pipeline)
# Switch to next source selector = self.pipeline.get_by_name("selector") if selector: # Prepare switch next_pad = selector.get_static_pad(f"sink_{next_source}") if not next_pad: print("\nError: Pad not found") return False
print(f"\nSwitching to source {self.current_source + 1}/{len(self.active_sources)}: " f"{self.active_sources[self.current_source]}")
return True
except Exception as e: print(f"\nError switching source: {str(e)}") return False
finally: self.switching = False
def source_switcher(self): while self.running: #time.sleep(10) if not self.running: break self.switch_source()
def handle_commands(self): """Handles user input commands""" while self.running: try: command = input().strip().lower() if command == 'n' or command == 'next': self.switch_source() elif command == 'l' or command == 'list': self.list_sources() elif command == 'c' or command == 'cooldown': self.show_cooldown() elif command == 'h' or command == 'help': self.print_help() elif command == 'q' or command == 'quit': self.running = False print("\nStopping server...") break except EOFError: continue except KeyboardInterrupt: self.running = False print("\nStopping server...") break
def list_sources(self): """Shows list of active sources""" print("\nActive sources:") for i, url in enumerate(self.active_sources, 1): if i - 1 == self.current_source: print(f"→ {i}. {url} (CURRENT)") else: print(f" {i}. {url}")
def print_help(self): """Shows available command help""" print("\nAvailable commands:") print(" n, next - Switch to next stream") print(" l, list - List active streams") print(" c, cooldown - Show remaining time to switch") print(" h, help - Show this help") print(" q, quit - Exit program")
def start(self): if not self.active_sources: print("Error: No active RTSP sources") return False
try: if not self.server.attach(None): print("Error: Could not attach server to context") return False
print(f"\nRTSP Server started:") print(f"URL: rtsp://localhost:{self.server.get_service()}/test") print(f"Active sources in use: {len(self.active_sources)}") self.list_sources() self.print_help()
# Start thread for GLib main loop loop = GLib.MainLoop() loop_thread = threading.Thread(target=loop.run) loop_thread.daemon = True loop_thread.start()
# Handle commands in main thread self.handle_commands()
# Cleanup loop.quit()
except Exception as e: print(f"Error starting server: {str(e)}") return False
return True
def show_cooldown(self): """Shows remaining cooldown time""" time_since_last = time.time() - self.last_switch_time if time_since_last < self.switch_cooldown: remaining = round(self.switch_cooldown - time_since_last, 1) print(f"\nRemaining time to switch: {remaining}s") else: print("\nYou can switch sources now")
def main(): try: server = RTSPMultiStreamServer(port="8555") server.start() except KeyboardInterrupt: print("\nProgram terminated by user") except Exception as e: print(f"Error: {str(e)}")
if __name__ == "__main__": main() [/code] Идея кода заключается в том, чтобы иметь возможность переключаться между несколькими потоками на одном выходе, но из-за такой большой задержки сложно применить это в реальном приложении.