Объединение потоков и asyncio для обработки аудиопотока через соединение WebSocket.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Объединение потоков и asyncio для обработки аудиопотока через соединение WebSocket.

Сообщение Anonymous »

Обзор
У меня есть сервер, который имеет открытое соединение WebSocket с одним клиентским приложением. Это клиентское приложение (приложение для Android) может отправлять аудиоданные с микрофона в реальном времени. Что сервер должен сделать в ответ на получение этих данных, так это ответить частичной транскрипцией, чтобы пользователь мог видеть, что он говорит, расшифрованным в реальном времени. Для этого я использую Google Speech-to-text API.
Мне также хорошо известно, что в Android есть встроенный распознаватель речи, который обеспечивает именно это.
Сервер запускается с использованием asyncio.run, и входящие данные передаются обработчикам, которые используют асинхронные методы. Это методы, на которые возложена ответственность за обработку приема аудиокадра:

Код: Выделить всё

elif action == util.ActionMessages.AUDIO_FRAME:
audio_id, audio = content["id"], content["audio"]
await self._audio_handler.receive_audio(audio, audio_id)

# Audio handler method
class AudioHandler:
def __init__(self, client_handler: ClientHandler):
self._client_handler = client_handler

self._audio_finished = dict()

self._is_streaming = False
self._audio_queue = queue.Queue()
self._languages = "en-US"

self._speech_client = speech.SpeechClient()
config = speech.RecognitionConfig(...)
self._streaming_config = speech.StreamingRecognitionConfig(...)

self._executor = ThreadPoolExecutor(max_workers=1)
self._request_built = False

async def receive_audio(self, content: str | None, audio_id: str):

is_audio_complete = self._audio_finished.setdefault(audio_id, False)
if content and not is_audio_complete:
self._is_streaming = True
content = base64.b64decode(content)
self._audio_queue.put(content)

if not self._request_built:
future = self._executor.submit(self._build_requests)
future.add_done_callback(lambda f: self._on_audio_processing_complete(f, audio_id))
self._request_built = True

elif is_audio_complete:
# TODO: Implement audio processing complete like clean up dictionary
pass

else:
self._request_built = False
self._is_streaming = False
self._audio_queue.put(None)

def _on_audio_processing_complete(self, future, audio_id):
self._audio_finished[audio_id] = True
self._request_built = False

def _read_audio(self):
while self._is_streaming:
chunk = self._audio_queue.get()
if chunk is None:
return
data = [chunk]

while True:
try:
chunk = self._audio_queue.get_nowait()
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break

yield b"".join(data)

def _listen_print_loop(self, responses):
num_chars_printed = 0
for response in responses:
if not response.results:
continue

result = response.results[0]
if not result.alternatives:
continue

transcript = result.alternatives[0].transcript

overwrite_chars = " " * (num_chars_printed - len(transcript))

# Send transcript clients
print(transcript + overwrite_chars)

if not result.is_final:
num_chars_printed = len(transcript)

else:
self._is_streaming = False
return

def _build_requests(self):
audio_generator = self._read_audio()
requests = (
speech.StreamingRecognizeRequest(audio_content=content)
for content in audio_generator
)

responses = self._speech_client.streaming_recognize(self._streaming_config, requests)
self._listen_print_loop(responses)
Когда аудио поступает с идентификатором высказывания (идентификатором аудио), оно заполняет очередь. При первом поступлении запускается новый поток, который создает экземпляр генератора, который считывает аудиосэмплы из очереди и преобразует их в соответствующий тип. Клиент Google Speech использует этот генератор для выполнения транскрипции. Этот речевой клиент возвращает генератор ответов, который используется методом _listen_print_loop для (на данный момент) печати ответов/транскрипций.
Логика использования API речи Google: во многом основано на их документах.

Проблема

Как вы понимаете, печать на стороне сервера транскрипции — это не то, что мне нужно. Я хотел бы отправить эти частичные транскрипции в мое клиентское приложение. Однако метод, который я использую для отправки сообщений через сокет, является асинхронным, и поэтому в этой реализации его нельзя отправить из метода _listen_print_loop, поскольку он сам по себе не является асинхронным. Вот что я имею в виду:

Код: Выделить всё

def _listen_print_loop(self, responses):
num_chars_printed = 0
for response in responses:
if not response.results:
continue

result = response.results[0]
if not result.alternatives:
continue

transcript = result.alternatives[0].transcript

overwrite_chars = " " * (num_chars_printed - len(transcript))

# Cannot do this!
await send_to_client(transcript + overwrite_chars)

if not result.is_final:
num_chars_printed = len(transcript)

else:
self._is_streaming = False
return
Мне хотелось бы знать, какое решение лучше всего подойдет для этой цели. Это переход от использования потоков к использованию только asyncio? Если да, то не будет ли это означать, что мне придется реализовать функцию асинхронного генератора? Не вызовет ли это проблем с речевым клиентом?
Я относительно новичок в asyncio, буду очень признателен за любые указатели!
Изменить: использование asyncio .run_coroutine_threadsafe()
Я безрезультатно пробовал следующее:

Код: Выделить всё

def _listen_print_loop(self, responses):
num_chars_printed = 0
for response in responses:
if not response.results:
continue

result = response.results[0]
if not result.alternatives:
continue

transcript = result.alternatives[0].transcript

overwrite_chars = " " * (num_chars_printed - len(transcript))

# Send on current even loop passed to client handler
print(transcript + overwrite_chars)
asyncio.run_coroutine_threadsafe(
send_to_clients(transcript + overwrite_chars),
self._client_handler.loop,
)

if not result.is_final:
num_chars_printed = len(transcript)

else:
self._is_streaming = False
return

В этом изменении я использую asyncio.run_coroutine_threadsafe() asyncio для запуска сопрограммы send_to_clients. Переменная цикла устанавливается при запуске следующим образом:

Код: Выделить всё

async def launch_server():
# For threads
client_handler.loop = asyncio.get_running_loop()

ip_address = "0.0.0.0"
port = int(os.getenv("PORT"))
server = await websockets.serve(
websocket_server,
ip_address,
port,
process_request=health_check
)

await asyncio.shield(server.wait_closed())

if __name__ == "__main__":
asyncio.run(launch_server())
Это решение не работает, поскольку при достижении линии для отправки расшифровки WebSocket закрывается со следующим исключением:
Соединение WebSocket закрыто: закрывающий кадр не получен и не отправлен.
Изменить: использование voice.SpeechAsyncClient
Я обнаружил, что речевой модуль предлагает речь.SpeechAsyncClient (ссылка), которую я использовал следующим образом:

Код: Выделить всё

async def receive_audio(self, content: str | None, audio_id: str):
is_audio_complete = self._audio_finished.setdefault(audio_id, False)
if content and not is_audio_complete:
self._is_streaming = True
content = base64.b64decode(content)
await self._audio_queue.put(content)

if not self._request_built:
self._request_built = True
await self._build_requests()

elif is_audio_complete:
pass

else:
self._request_built = False
self._is_streaming = False
await self._audio_queue.put(None)

async def _read_audio(self):
print("Reading audio")

config_request = speech.StreamingRecognizeRequest()
config_request.streaming_config = self._streaming_config
yield config_request

while self._is_streaming:
chunk = await self._audio_queue.get()
if chunk is None:
return
data = [chunk]

while True:
try:
chunk = await self._audio_queue.get_nowait()
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break

request = speech.StreamingRecognizeRequest()
request.audio_content = b"".join(data)
yield request

async def _build_requests(self):
audio_generator = self._read_audio()
responses = await self._speech_client.streaming_recognize(
requests=audio_generator,
)
print("Listening for audio")
await self._listen_print_loop(responses)
Это не вызывает никаких ошибок, однако по какой-то причине программа зависает при ожидании методаstreaming_recnigne(...). Точнее, генератор _read_audio() никогда не вызывается, что означает, что звук никогда не обрабатывается.
Изменить 2.1: забыл упомянуть, что asyncio.Queue используется здесь
Редактировать 2.2: я реализовал эту функцию в рабочем файле, используя этот метод (и непосредственно мой микрофон), и она работает. Проблема здесь по-прежнему в том, что генератор никогда не вызывается (print("Чтение аудио") никогда не достигается. Это наводит меня на мысль, что именно так я обрабатываю asyncio.

Подробнее здесь: https://stackoverflow.com/questions/789 ... ocket-conn
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Asyncio Async Funcitons вешает с Asyncio.gather. (Код работает без Asyncio.gather)
    Anonymous » » в форуме Python
    0 Ответы
    27 Просмотры
    Последнее сообщение Anonymous
  • Почему подключение к WebSocket в Python возвращает отклоненное соединение WebSocket?
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Удалить звук системного динамика из аудиопотока
    Anonymous » » в форуме JAVA
    0 Ответы
    32 Просмотры
    Последнее сообщение Anonymous
  • Blazor воспроизводит добавление аудиопотока в ServerApp [закрыто]
    Anonymous » » в форуме C#
    0 Ответы
    20 Просмотры
    Последнее сообщение Anonymous
  • Воспроизведение аудиопотока в клиентском браузере
    Anonymous » » в форуме C#
    0 Ответы
    13 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»