Максимальная пропускная способность в канале данных Python webrtc ограничена ~ 110 мб/с для большой передачи файловPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Максимальная пропускная способность в канале данных Python webrtc ограничена ~ 110 мб/с для большой передачи файлов

Сообщение Anonymous »

Я строю инструмент передачи одноранговых файлов в Python, используя каналы данных AIORTC и WEBRTC. Он предназначен для эффективного обработки больших файлов путем чтения блоков 64 МБ от диска и отправки их в размере 256 КБ над каналом данных. Моя настройка: < /p>
Размер чтения диска: 64 МБ < /p>
Размер сетевой куски: 256 КБ < /p>
Мас. куски. < /p>
Проблема:
Даже когда оба сверстника работают на одной машине (Localhost), максимальная пропускная способность I достигает ~ 110 мб /с. BufferedAmount в канале данных всегда, по -видимому, находится на пределе MAX_BUFFERED_AMOUNT. Петля отправки постоянно ждет буферного пространства, которое, кажется, заливает пропускную способность. < /P>
Основная часть приложения: < /p>
class FileSender:
def __init__(self, file_path):
self._file_path = file_path
self._file_channel = None
self._control_channel = None
self._done = asyncio.Future()
self._buffer_event = asyncio.Event()
self._buffer_event.set()
self._chunk_queue = asyncio.Queue(maxsize=128) # enough to pipeline many chunks

def set_channel(self, channel_type, channel):
if channel_type == "file":
self._file_channel = channel
self._file_channel.on("open", self._on_both_channels_open)
self._file_channel.bufferedAmountLowThreshold = MAX_BUFFERED_AMOUNT
self._file_channel.on("bufferedamountlow", self._on_buffered_amount_low)
elif channel_type == "control":
self._control_channel = channel
self._control_channel.on("open", self._on_both_channels_open)
self._control_channel.on("message", lambda message: asyncio.create_task(self._on_control_message(message)))

def _on_both_channels_open(self):
if self._file_channel.readyState == "open" and self._control_channel.readyState == "open":
asyncio.create_task(self._start_file_transfer())

def _on_buffered_amount_low(self):
self._buffer_event.set()

async def wait_until_done(self):
await self._done

async def _file_reader(self):
"""Read large blocks from disk and push slices into the queue."""
async with aiofiles.open(self._file_path, "rb") as f:
while True:
block = await f.read(DISK_READ_SIZE)
if not block:
break
# Slice into 256 KB chunks but keep them in memory until queued
for i in range(0, len(block), NETWORK_CHUNK_SIZE):
await self._chunk_queue.put(block[i:i + NETWORK_CHUNK_SIZE])
await self._chunk_queue.put(None) # EOF

async def _start_file_transfer(self):
metadata = self._construct_metadata(self._file_path)
self._control_channel.send(ControlMessage.create_json("metadata", json.dumps(metadata)))
progress = Progress(metadata["file_size"])

reader_task = asyncio.create_task(self._file_reader())

while True:
chunk = await self._chunk_queue.get()
if chunk is None:
break

# Wait for channel buffer only if needed
while self._file_channel.bufferedAmount > MAX_BUFFERED_AMOUNT:
self._buffer_event.clear()
await self._buffer_event.wait()

self._file_channel.send(chunk)
progress.update(chunk)

await reader_task
self._control_channel.send(ControlMessage.create_json("eof", metadata["file_name"]))

class FileReceiver:
def __init__(self, path):
self._file_channel = None
self._control_channel = None
self._metadata = None
self._location = None
self._progress = None
self._path = path
self._done = asyncio.Future()
self._chunk_queue = asyncio.Queue(maxsize=256) # pipeline more chunks
self._writer_task = None

async def wait_until_done(self):
await self._done

def set_channel(self, channel_type, channel):
if channel_type == "file":
self._file_channel = channel
self._file_channel.on("message", lambda msg: self._chunk_queue.put_nowait(msg))
elif channel_type == "control":
self._control_channel = channel
self._control_channel.on("message", lambda msg: asyncio.create_task(self._on_control_message(msg)))

async def _process_file(self):
self._location = os.path.join(self._path, self._metadata["file_name"])
self._progress = Progress(self._metadata["file_size"])

async with aiofiles.open(self._location, "wb") as f:
while True:
chunk = await self._chunk_queue.get()
if chunk is None:
break
await f.write(chunk)
self._progress.update(chunk)

# checksum validation
if compute_hash(self._location) != self._metadata["hash"]:
print("[ERROR] File corrupted")
else:
print("File received successfully")

self._control_channel.send(ControlMessage.create_json("transfer_complete", self._metadata["file_name"]))
self._done.set_result(None)
self._file_channel.close()
self._control_channel.close()

async def _on_control_message(self, message):
control_message = ControlMessage.from_json(message)
if control_message.msg_type == "metadata":
self._metadata = control_message.data
print(f"Receiving file: {self._metadata['file_name']} ({self._metadata['file_size']} bytes)")
self._writer_task = asyncio.create_task(self._process_file())
elif control_message.msg_type == "eof":
await self._chunk_queue.put(None)
< /code>
Вопросы: < /p>
является ли ограничение пропускной способности, присущее одному каналу данных Python webrtc? Нужен? Если да, есть ли у вас какие -либо рекомендации? (Он должен быть на основе UDP, чтобы иметь возможность для удара отверстия.)>

Подробнее здесь: https://stackoverflow.com/questions/797 ... large-file
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»