У меня есть сервер, который имеет открытое соединение WebSocket с одним клиентским приложением. Это клиентское приложение (приложение для Android) может отправлять аудиоданные с микрофона в реальном времени. Что сервер должен сделать в ответ на получение этих данных, так это ответить частичной транскрипцией, чтобы пользователь мог видеть, что он говорит, расшифрованным в реальном времени. Для этого я использую Google Speech-to-text API.
Мне также хорошо известно, что в Android есть встроенный распознаватель речи, который обеспечивает именно это.
Сервер запускается с использованием asyncio.run, и входящие данные передаются обработчикам, которые используют асинхронные методы. Это методы, на которые возложена ответственность за обработку приема аудиокадра:
Код: Выделить всё
elif action == util.ActionMessages.AUDIO_FRAME:
audio_id, audio = content["id"], content["audio"]
await self._audio_handler.receive_audio(audio, audio_id)
# Audio handler method
class AudioHandler:
def __init__(self, client_handler: ClientHandler):
self._client_handler = client_handler
self._audio_finished = dict()
self._is_streaming = False
self._audio_queue = queue.Queue()
self._languages = "en-US"
self._speech_client = speech.SpeechClient()
config = speech.RecognitionConfig(...)
self._streaming_config = speech.StreamingRecognitionConfig(...)
self._executor = ThreadPoolExecutor(max_workers=1)
self._request_built = False
async def receive_audio(self, content: str | None, audio_id: str):
is_audio_complete = self._audio_finished.setdefault(audio_id, False)
if content and not is_audio_complete:
self._is_streaming = True
content = base64.b64decode(content)
self._audio_queue.put(content)
if not self._request_built:
future = self._executor.submit(self._build_requests)
future.add_done_callback(lambda f: self._on_audio_processing_complete(f, audio_id))
self._request_built = True
elif is_audio_complete:
# TODO: Implement audio processing complete like clean up dictionary
pass
else:
self._request_built = False
self._is_streaming = False
self._audio_queue.put(None)
def _on_audio_processing_complete(self, future, audio_id):
self._audio_finished[audio_id] = True
self._request_built = False
def _read_audio(self):
while self._is_streaming:
chunk = self._audio_queue.get()
if chunk is None:
return
data = [chunk]
while True:
try:
chunk = self._audio_queue.get_nowait()
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
yield b"".join(data)
def _listen_print_loop(self, responses):
num_chars_printed = 0
for response in responses:
if not response.results:
continue
result = response.results[0]
if not result.alternatives:
continue
transcript = result.alternatives[0].transcript
overwrite_chars = " " * (num_chars_printed - len(transcript))
# Send transcript clients
print(transcript + overwrite_chars)
if not result.is_final:
num_chars_printed = len(transcript)
else:
self._is_streaming = False
return
def _build_requests(self):
audio_generator = self._read_audio()
requests = (
speech.StreamingRecognizeRequest(audio_content=content)
for content in audio_generator
)
responses = self._speech_client.streaming_recognize(self._streaming_config, requests)
self._listen_print_loop(responses)
Логика использования API речи Google: во многом основано на их документах.
Проблема
Как вы понимаете, печать на стороне сервера транскрипции — это не то, что мне нужно. Я хотел бы отправить эти частичные транскрипции в мое клиентское приложение. Однако метод, который я использую для отправки сообщений через сокет, является асинхронным, и поэтому в этой реализации его нельзя отправить из метода _listen_print_loop, поскольку он сам по себе не является асинхронным. Вот что я имею в виду:Код: Выделить всё
def _listen_print_loop(self, responses):
num_chars_printed = 0
for response in responses:
if not response.results:
continue
result = response.results[0]
if not result.alternatives:
continue
transcript = result.alternatives[0].transcript
overwrite_chars = " " * (num_chars_printed - len(transcript))
# Cannot do this!
await send_to_client(transcript + overwrite_chars)
if not result.is_final:
num_chars_printed = len(transcript)
else:
self._is_streaming = False
return
Я относительно новичок в asyncio, буду очень признателен за любые указатели!
Изменить: использование asyncio .run_coroutine_threadsafe()
Я безрезультатно пробовал следующее:
Код: Выделить всё
def _listen_print_loop(self, responses):
num_chars_printed = 0
for response in responses:
if not response.results:
continue
result = response.results[0]
if not result.alternatives:
continue
transcript = result.alternatives[0].transcript
overwrite_chars = " " * (num_chars_printed - len(transcript))
# Send on current even loop passed to client handler
print(transcript + overwrite_chars)
asyncio.run_coroutine_threadsafe(
send_to_clients(transcript + overwrite_chars),
self._client_handler.loop,
)
if not result.is_final:
num_chars_printed = len(transcript)
else:
self._is_streaming = False
return
Код: Выделить всё
async def launch_server():
# For threads
client_handler.loop = asyncio.get_running_loop()
ip_address = "0.0.0.0"
port = int(os.getenv("PORT"))
server = await websockets.serve(
websocket_server,
ip_address,
port,
process_request=health_check
)
await asyncio.shield(server.wait_closed())
if __name__ == "__main__":
asyncio.run(launch_server())
Соединение WebSocket закрыто: закрывающий кадр не получен и не отправлен.
Изменить: использование voice.SpeechAsyncClient
Я обнаружил, что речевой модуль предлагает речь.SpeechAsyncClient (ссылка), которую я использовал следующим образом:
Код: Выделить всё
async def receive_audio(self, content: str | None, audio_id: str):
is_audio_complete = self._audio_finished.setdefault(audio_id, False)
if content and not is_audio_complete:
self._is_streaming = True
content = base64.b64decode(content)
await self._audio_queue.put(content)
if not self._request_built:
self._request_built = True
await self._build_requests()
elif is_audio_complete:
pass
else:
self._request_built = False
self._is_streaming = False
await self._audio_queue.put(None)
async def _read_audio(self):
print("Reading audio")
config_request = speech.StreamingRecognizeRequest()
config_request.streaming_config = self._streaming_config
yield config_request
while self._is_streaming:
chunk = await self._audio_queue.get()
if chunk is None:
return
data = [chunk]
while True:
try:
chunk = await self._audio_queue.get_nowait()
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
request = speech.StreamingRecognizeRequest()
request.audio_content = b"".join(data)
yield request
async def _build_requests(self):
audio_generator = self._read_audio()
responses = await self._speech_client.streaming_recognize(
requests=audio_generator,
)
print("Listening for audio")
await self._listen_print_loop(responses)
Изменить 2.1: забыл упомянуть, что asyncio.Queue используется здесь
Редактировать 2.2: я реализовал эту функцию в рабочем файле, используя этот метод (и непосредственно мой микрофон), и она работает. Проблема здесь по-прежнему в том, что генератор никогда не вызывается (print("Чтение аудио") никогда не достигается. Это наводит меня на мысль, что именно так я обрабатываю asyncio.
Подробнее здесь: https://stackoverflow.com/questions/789 ... ocket-conn