У меня большая потеря пакетов при использовании UDP в Python. Я знаю, что мне следует использовать TCP, если я не хочу потери пакетов, но у меня нет (полного) контроля над отправителем.
Это камера, которая отправляет 15 изображений в секунду с использованием многоадресной рассылки UDP.
Ниже вы видите код, который я написал сейчас.
Он использует многопроцессорность, чтобы позволить функциям производителя и потребителя работать параллельно. Producer function catches the packets, consumer function processes them and writes the images to .bmp files.
I've written a class PacketStream which writes the bytes from the packages to a .bmp file.
When the camera sends a new image, it first sends one packet, with first byte = 0x01. Содержит информацию об изображении.
Затем отправляется 612 пакетов с первым байтом = 0x02. Они содержат байты изображения (508 байт/пакет).
Поскольку в секунду отправляется 15 изображений, в секунду отправляется ~9000 пакетов. Хотя это происходит с большей скоростью в пакетах на изображение, примерно 22 пакета/мс.
Я могу отлично получать все пакеты, используя tcpdump или Wireshark.
Но, используя приведенный ниже код, пакеты пропускаются.
Конечно, мой компьютер с Windows 7 должен справиться с этим? Я также использую его на Raspberry Pi 3, и там пропускается примерно такое же количество пакетов. Поэтому я думаю, что это проблема с кодом.
Я пробовал много разных вещей, таких как многопоточность вместо многопроцессорной обработки, конвейер вместо очереди.
Я также пробовал увеличить буфер сокета с помощью
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000)
безрезультатно.
Это вообще возможно в Python?
Заранее спасибо,
import time
from multiprocessing import Process, Queue
import socket
import struct
from PIL import Image
class PacketStream:
def __init__(self, output_path):
self.output_path = output_path
self.data_buffer = ''
self.img_id = -1 # -1 = waiting for start of new image
def process(self, data):
message_id = data[0]
if message_id == '\x01':
self.wrap_up_last_image()
self.img_id = ord(data[3])
self.data_buffer = ''
if message_id == '\x02':
self.data_buffer += data[6:]
def wrap_up_last_image(self):
if self.img_id > 0:
n_bytes = len(self.data_buffer)
if n_bytes == 307200:
global i
write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp',
self.data_buffer)
i += 1
else:
print 'Image lost: %s bytes missing.' % (307200 - n_bytes)
def write_image(path, data):
im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
im.save(path)
print time.time(), path
def producer(q):
# setup socket
MCAST_GRP = '239.255.83.71'
MCAST_PORT = 2271
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', MCAST_PORT))
mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
while True:
q.put(sock.recv(512))
def consumer(q):
packet_stream = PacketStream('D:/bmpdump/')
while True:
data = q.get()
packet_stream.process(data)
i = 0
if __name__ == '__main__':
q = Queue()
t1 = Process(target=producer, args=(q,))
t1.daemon = True # so they stop when the main prog stops
t1.start()
t2 = Process(target=consumer, args=(q,))
t2.daemon = True
t2.start()
time.sleep(10.0)
print 'Program finished.'
РЕДАКТИРОВАТЬ
Спасибо за все предложения.
1) Я уже пробовал потоковую обработку + очередь, а также ''.join(), похоже, не имело большого значения. Я совершенно уверен, что проблема в том, что поток-производитель не получает достаточного приоритета. Я не могу найти, как увеличить это значение с помощью Python? Возможно ли это вообще?
2) Используя приведенный ниже код, мне удалось потерять всего около 10%. Процессор загружен примерно на 25 % (на Raspberry Pi). Ключевым моментом является использование данных, когда в потоке пакетов возникает пауза, т. е. когда прибывает последний пакет данных
import time
import socket
import struct
from PIL import Image
def write_image(path, data):
im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
im.save(path)
print time.time(), path
def consume(data_buffer):
img_id = ord(data_buffer[0][1])
real_data_buffer = [data[6:] for data in data_buffer]
data_string = ''.join(real_data_buffer)
global i
write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string)
i += 1
def producer(sock):
print 'Producer start'
data_buffer = []
while True:
data = sock.recvfrom(512)[0]
if data[0] == '\x01':
data_buffer = []
else:
data_buffer.append(data)
if len(data_buffer) == 612:
consume(data_buffer)
# image counter
i = 0
# setup socket
MCAST_GRP = '239.255.83.71'
MCAST_PORT = 2271
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((MCAST_GRP, MCAST_PORT))
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000)
producer(sock)
Подробнее здесь: https://stackoverflow.com/questions/418 ... acket-loss
Получение UDP-пакетов с помощью Python, потеря пакетов ⇐ Python
Программы на Python
-
Anonymous
1766426775
Anonymous
У меня большая потеря пакетов при использовании UDP в Python. Я знаю, что мне следует использовать TCP, если я не хочу потери пакетов, но у меня нет (полного) контроля над отправителем.
Это камера, которая отправляет 15 изображений в секунду с использованием многоадресной рассылки UDP.
Ниже вы видите код, который я написал сейчас.
Он использует многопроцессорность, чтобы позволить функциям производителя и потребителя работать параллельно. Producer function catches the packets, consumer function processes them and writes the images to .bmp files.
I've written a class PacketStream which writes the bytes from the packages to a .bmp file.
When the camera sends a new image, it first sends one packet, with first byte = 0x01. Содержит информацию об изображении.
Затем отправляется 612 пакетов с первым байтом = 0x02. Они содержат байты изображения (508 байт/пакет).
Поскольку в секунду отправляется 15 изображений, в секунду отправляется ~9000 пакетов. Хотя это происходит с большей скоростью в пакетах на изображение, примерно 22 пакета/мс.
Я могу отлично получать все пакеты, используя tcpdump или Wireshark.
Но, используя приведенный ниже код, пакеты пропускаются.
Конечно, мой компьютер с Windows 7 должен справиться с этим? Я также использую его на Raspberry Pi 3, и там пропускается примерно такое же количество пакетов. Поэтому я думаю, что это проблема с кодом.
Я пробовал много разных вещей, таких как многопоточность вместо многопроцессорной обработки, конвейер вместо очереди.
Я также пробовал увеличить буфер сокета с помощью
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000)
безрезультатно.
Это вообще возможно в Python?
Заранее спасибо,
import time
from multiprocessing import Process, Queue
import socket
import struct
from PIL import Image
class PacketStream:
def __init__(self, output_path):
self.output_path = output_path
self.data_buffer = ''
self.img_id = -1 # -1 = waiting for start of new image
def process(self, data):
message_id = data[0]
if message_id == '\x01':
self.wrap_up_last_image()
self.img_id = ord(data[3])
self.data_buffer = ''
if message_id == '\x02':
self.data_buffer += data[6:]
def wrap_up_last_image(self):
if self.img_id > 0:
n_bytes = len(self.data_buffer)
if n_bytes == 307200:
global i
write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp',
self.data_buffer)
i += 1
else:
print 'Image lost: %s bytes missing.' % (307200 - n_bytes)
def write_image(path, data):
im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
im.save(path)
print time.time(), path
def producer(q):
# setup socket
MCAST_GRP = '239.255.83.71'
MCAST_PORT = 2271
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', MCAST_PORT))
mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
while True:
q.put(sock.recv(512))
def consumer(q):
packet_stream = PacketStream('D:/bmpdump/')
while True:
data = q.get()
packet_stream.process(data)
i = 0
if __name__ == '__main__':
q = Queue()
t1 = Process(target=producer, args=(q,))
t1.daemon = True # so they stop when the main prog stops
t1.start()
t2 = Process(target=consumer, args=(q,))
t2.daemon = True
t2.start()
time.sleep(10.0)
print 'Program finished.'
РЕДАКТИРОВАТЬ
Спасибо за все предложения.
1) Я уже пробовал потоковую обработку + очередь, а также ''.join(), похоже, не имело большого значения. Я совершенно уверен, что проблема в том, что поток-производитель не получает достаточного приоритета. Я не могу найти, как увеличить это значение с помощью Python? Возможно ли это вообще?
2) Используя приведенный ниже код, мне удалось потерять всего около 10%. Процессор загружен примерно на 25 % (на Raspberry Pi). Ключевым моментом является использование данных, когда в потоке пакетов возникает пауза, т. е. когда прибывает последний пакет данных
import time
import socket
import struct
from PIL import Image
def write_image(path, data):
im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
im.save(path)
print time.time(), path
def consume(data_buffer):
img_id = ord(data_buffer[0][1])
real_data_buffer = [data[6:] for data in data_buffer]
data_string = ''.join(real_data_buffer)
global i
write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string)
i += 1
def producer(sock):
print 'Producer start'
data_buffer = []
while True:
data = sock.recvfrom(512)[0]
if data[0] == '\x01':
data_buffer = []
else:
data_buffer.append(data)
if len(data_buffer) == 612:
consume(data_buffer)
# image counter
i = 0
# setup socket
MCAST_GRP = '239.255.83.71'
MCAST_PORT = 2271
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((MCAST_GRP, MCAST_PORT))
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000)
producer(sock)
Подробнее здесь: [url]https://stackoverflow.com/questions/41897484/receive-udp-packets-with-python-packet-loss[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия