Я разрабатываю простой голосовой бот в реальном времени, используя API Openai в реальном времени, в частности, интегрируясь с семантическим ядром. Код написан асинхронным образом, и изначально он работает хорошо. Тем не менее, я сталкиваюсь с проблемами с синхронизацией задач и циклом событий, не позволяя мне прерывать воспроизведение бота или аудио, пока он не закончится полностью. Но после этого первоначального взаимодействия я могу задать другой вопрос только после того, как бот закончил ответ. Однако, как только бот начинает реагировать, записанный звук становится нарезанным и неполным, возобновляя полную запись, только после того, как бот заканчивает воспроизведение. Когда служба OpenAI отправляет события, генератор приема обрабатывает их, но генератор ввода перестает работать, не снимая данные, в то время как другой генератор откидывает ответ. Иногда цикл событий кратко возвращается в генератор микрофона, но он только фиксирует неполный звук. Я подозреваю, что многопоточности может потребоваться, но глобальная блокировка интерпретатора Python (GIL) усложняет этот подход. Существуют ли конкретные методы или библиотеки, которые могут помочь эффективно управлять параллельностью в этом сценарии?
import asyncio
import numpy as np
from semantic_kernel.contents.realtime_events import RealtimeAudioEvent, RealtimeEvents
from acev_realtime_voice_bot.service.ports import (
AIStreamingServicePort,
AudioInputPort,
AudioOutputPort,
)
from tests.events import CallInterruptedEvent
class VoiceBotService:
def __init__(
self,
audio_in: AudioInputPort,
audio_out: AudioOutputPort,
ai_service: AIStreamingServicePort,
) -> None:
self.audio_in = audio_in
self.audio_out = audio_out
self.ai_service = ai_service
async def run(self):
async def receive_task():
async for event in self.ai_service.receive():
await asyncio.sleep(0.01)
if (
isinstance(event, RealtimeAudioEvent)
or isinstance(event, np.ndarray)
# Can convert this into a response done of OpenAI and eliminate this
# Gonna have coupling with openAI SDK, but that's ok for the moment.
):
await self.audio_out.send_audio_output(event)
if isinstance(event, CallInterruptedEvent):
break
else:
await self.event_handler(event)
receive_task_future = asyncio.create_task(receive_task())
# the problem is definitely with the async generator in sending the audio. That's the only difference i found between my code and the others.
# By saving the audio frames i get as input, it looks like the input microphone gets blocked in collecting audio! The first time or when the bot is done answeringf
# It saves all the audio and sends it, but when the bot is answering, the audio is weirdly chopped and missing!
# Some locking of the audio interface? Sync problems?
async for audio_frame in self.audio_in.get_input_audio_frames():
print("Sending audio")
await self.ai_service.send(audio_frame)
await receive_task_future
async def event_handler(self, event: RealtimeEvents):
print(event.service_type)
< /code>
localaudiorecorder (используется для записи микрофона): < /p>
class LocalAudioRecorder(AudioInputPort):
def __init__(self, device, sample_rate, channels, dtype, frame_size) -> None:
super().__init__()
self.device = device
self.sample_rate = sample_rate
self.channels = channels
self.dtype = dtype
self.frame_size = frame_size
async def get_input_audio_frames(self) -> AsyncGenerator[np.ndarray, None]:
"""Generator function to yield audio data chunks and save them to a WAV file."""
try:
with InputStream(
samplerate=self.sample_rate,
channels=self.channels,
device=self.device,
dtype=np.int16,
) as stream:
# Open a WAV file to write the audio data
with wave.open('recorded_audio.wav', 'wb') as wav_file:
wav_file.setnchannels(self.channels)
wav_file.setsampwidth(np.dtype(np.int16).itemsize)
wav_file.setframerate(self.sample_rate)
while True:
if self._is_key_pressed():
input() # Clear the input buffer
print("Stopping recording...")
break
if stream.read_available < self.frame_size:
await asyncio.sleep(0)
continue
audio_chunk, _ = stream.read(self.frame_size)
print("Read audio chunk.")
print(f"Content of audio: {audio_chunk}")
# Write the audio chunk to the WAV file
wav_file.writeframes(audio_chunk.tobytes())
await asyncio.sleep(0)
yield audio_chunk
# Check for Enter keypress to stop recording
except Exception as e:
print(f"An error occurred: {e}")
def _is_key_pressed(self):
return select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], [])
< /code>
LocalAudioplayer (используется для воспроизведения аудио): < /p>
class LocalAudioPlayer(AudioOutputPort):
def __init__(self, channels, sample_rate) -> None:
self.channels = channels
self.sample_rate = sample_rate
self.stream = None
async def send_audio_output(
self, audio_frame: ndarray | RealtimeAudioEvent
) -> None:
if isinstance(audio_frame, RealtimeAudioEvent):
audio_frame = np.frombuffer(audio_frame.audio.data, dtype=np.int16)
if self.stream is None:
# Initialize the stream
self.stream = OutputStream(
channels=self.channels, samplerate=self.sample_rate, dtype="float32"
)
self.stream.start()
# Convert int16 audio chunk to float32 and normalize to [-1.0, 1.0]
audio_chunk = audio_frame.astype(np.float32) / np.iinfo(np.int16).max
self.stream.write(audio_chunk)
< /code>
И это адаптер для подключения OpenAI в реальном времени: < /p>
import base64
from collections.abc import AsyncGenerator, Callable, Coroutine
from typing import Any, ClassVar, Final, cast
import numpy as np
from numpy import ndarray
from semantic_kernel.connectors.ai import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.open_ai import (
AzureRealtimeExecutionSettings,
AzureRealtimeWebsocket,
)
from semantic_kernel.contents.audio_content import AudioContent
from semantic_kernel.contents.realtime_events import RealtimeAudioEvent, RealtimeEvents
from acev_realtime_voice_bot.service.ports import AIStreamingServicePort
class OpenAIRealtimeAdapter(AIStreamingServicePort):
def __init__(
self,
system_prompt: str,
endpoint: str | None = None,
api_version: str | None = None,
deployment_name: str | None = None,
) -> None:
super().__init__()
self.client = AzureRealtimeWebsocket(
endpoint=endpoint, api_version=api_version, deployment_name=deployment_name
)
self.settings = AzureRealtimeExecutionSettings(
instructions=system_prompt,
turn_detection={"type": "server_vad"},
voice="shimmer",
input_audio_format="pcm16",
output_audio_format="pcm16",
input_audio_transcription={"model": "whisper-1"},
function_choice_behavior=FunctionChoiceBehavior.Auto(),
)
async def create_session(self):
await self.client.create_session(
settings=self.settings
) # TODO: Add chathistory if needed.
async def close_session(self):
await self.client.close_session()
async def send(self, event: RealtimeEvents | np.ndarray) -> None:
if isinstance(event, np.ndarray):
event = self._cast_input_audio_to_event(event)
print("Sending event to openAI")
await self.client.send(event=event)
async def receive(
self,
audio_output_callback: Callable[[ndarray], Coroutine[Any, Any, None]]
| None = None,
) -> AsyncGenerator[RealtimeEvents, None]:
async for event in self.client.receive(
audio_output_callback=audio_output_callback
):
yield event
# TODO: Dont know if this is a smell of bad code, probably. I should unify somehow the interfaces and the data types.
def _cast_input_audio_to_event(self, audio_frame) -> RealtimeAudioEvent:
return RealtimeAudioEvent(
audio=AudioContent(
data=base64.b64encode(cast(Any, audio_frame)).decode("utf-8")
)
)
(я хотел бы добавить звуковой образец, но я не могу понять, как это сделать на Stackoverflow, если вы знаете, как, пожалуйста, дайте мне знать, и я также добавлю аудиорезку>
Я разрабатываю простой голосовой бот в реальном времени, используя API Openai в реальном времени, в частности, интегрируясь с семантическим ядром. Код написан асинхронным образом, и изначально он работает хорошо. Тем не менее, я сталкиваюсь с проблемами с синхронизацией задач и циклом событий, не позволяя мне прерывать воспроизведение бота или аудио, пока он не закончится полностью. Но после этого первоначального взаимодействия я могу задать другой вопрос только после того, как бот закончил ответ. Однако, как только бот начинает реагировать, записанный звук становится нарезанным и неполным, возобновляя полную запись, только после того, как бот заканчивает воспроизведение. Когда служба OpenAI отправляет события, генератор приема обрабатывает их, но генератор ввода перестает работать, не снимая данные, в то время как другой генератор откидывает ответ. Иногда цикл событий кратко возвращается в генератор микрофона, но он только фиксирует неполный звук. Я подозреваю, что многопоточности может потребоваться, но глобальная блокировка интерпретатора Python (GIL) усложняет этот подход. Существуют ли конкретные методы или библиотеки, которые могут помочь эффективно управлять параллельностью в этом сценарии?[code]import asyncio
import numpy as np from semantic_kernel.contents.realtime_events import RealtimeAudioEvent, RealtimeEvents
from acev_realtime_voice_bot.service.ports import ( AIStreamingServicePort, AudioInputPort, AudioOutputPort, ) from tests.events import CallInterruptedEvent
async def run(self): async def receive_task(): async for event in self.ai_service.receive(): await asyncio.sleep(0.01) if ( isinstance(event, RealtimeAudioEvent) or isinstance(event, np.ndarray) # Can convert this into a response done of OpenAI and eliminate this # Gonna have coupling with openAI SDK, but that's ok for the moment. ): await self.audio_out.send_audio_output(event)
if isinstance(event, CallInterruptedEvent): break else: await self.event_handler(event)
# the problem is definitely with the async generator in sending the audio. That's the only difference i found between my code and the others. # By saving the audio frames i get as input, it looks like the input microphone gets blocked in collecting audio! The first time or when the bot is done answeringf # It saves all the audio and sends it, but when the bot is answering, the audio is weirdly chopped and missing! # Some locking of the audio interface? Sync problems? async for audio_frame in self.audio_in.get_input_audio_frames(): print("Sending audio") await self.ai_service.send(audio_frame)
async def get_input_audio_frames(self) -> AsyncGenerator[np.ndarray, None]: """Generator function to yield audio data chunks and save them to a WAV file.""" try: with InputStream( samplerate=self.sample_rate, channels=self.channels, device=self.device, dtype=np.int16, ) as stream: # Open a WAV file to write the audio data with wave.open('recorded_audio.wav', 'wb') as wav_file: wav_file.setnchannels(self.channels) wav_file.setsampwidth(np.dtype(np.int16).itemsize) wav_file.setframerate(self.sample_rate)
while True: if self._is_key_pressed(): input() # Clear the input buffer print("Stopping recording...") break if stream.read_available < self.frame_size: await asyncio.sleep(0) continue audio_chunk, _ = stream.read(self.frame_size) print("Read audio chunk.") print(f"Content of audio: {audio_chunk}")
# Write the audio chunk to the WAV file wav_file.writeframes(audio_chunk.tobytes())
await asyncio.sleep(0) yield audio_chunk # Check for Enter keypress to stop recording except Exception as e: print(f"An error occurred: {e}")
if self.stream is None: # Initialize the stream self.stream = OutputStream( channels=self.channels, samplerate=self.sample_rate, dtype="float32" ) self.stream.start()
# Convert int16 audio chunk to float32 and normalize to [-1.0, 1.0] audio_chunk = audio_frame.astype(np.float32) / np.iinfo(np.int16).max self.stream.write(audio_chunk) < /code> И это адаптер для подключения OpenAI в реальном времени: < /p> import base64 from collections.abc import AsyncGenerator, Callable, Coroutine from typing import Any, ClassVar, Final, cast
import numpy as np from numpy import ndarray from semantic_kernel.connectors.ai import FunctionChoiceBehavior from semantic_kernel.connectors.ai.open_ai import ( AzureRealtimeExecutionSettings, AzureRealtimeWebsocket, ) from semantic_kernel.contents.audio_content import AudioContent from semantic_kernel.contents.realtime_events import RealtimeAudioEvent, RealtimeEvents
from acev_realtime_voice_bot.service.ports import AIStreamingServicePort
# TODO: Dont know if this is a smell of bad code, probably. I should unify somehow the interfaces and the data types. def _cast_input_audio_to_event(self, audio_frame) -> RealtimeAudioEvent: return RealtimeAudioEvent( audio=AudioContent( data=base64.b64encode(cast(Any, audio_frame)).decode("utf-8") ) ) [/code] (я хотел бы добавить звуковой образец, но я не могу понять, как это сделать на Stackoverflow, если вы знаете, как, пожалуйста, дайте мне знать, и я также добавлю аудиорезку>
У меня возникла проблема, при которой воспроизведение звука при прохождении через сеть (он же мой VPS) имеет заикание/дрожание, но не происходит при запуске того же самого, но локально.
Я предполагаю, что это как-то связано с воспроизведением...
Любой из вас, умных разработчиков, знает, почему я не могу воспроизводить звук на своем мобильном телефоне Galaxy с помощью приложений Flutter. Отлично работает на эмуляторе
Приложение, которое я создал, работает нормально. Это игра, в которой,...
Я пытаюсь играть в аудио через QT6. Но я получил странную ошибку, с которой я не знаю, с какими средствами или как справиться. Я сделал минимальный пример:
// main.cpp
#include mainwindow.h
У меня есть очень простой аудиоплеер, который воспроизводит mp3-файлы по URL-адресу.
func start() {
if let url = URL(string: cloudinaryUrl) {
self.avPlayerItem = AVPlayerItem(url: url)
self.avPlayer = AVPlayer(playerItem: avPlayerItem)...