Python Reactivex и OpenAI – поток зависает?Python

Программы на Python
Ответить
Anonymous
 Python Reactivex и OpenAI – поток зависает?

Сообщение Anonymous »

Пожалуйста, посмотрите мой блок кода ниже. Чтобы запустить его, я делаю следующее. Когда я запускаю следующий код, все печатается, как и ожидалось, и процесс завершается, как и ожидалось.
stream = await client.create_completion(...)
stream.subscribe(print) # works perfectly

Все команды печати выполняются должным образом. Все выглядит великолепно, пока я действую реактивно.
Однако, когда я делаю следующее, оно зависает на неопределенный срок:
stream = await client.create_completion(...)
stream.pipe(ops.to_iterable()).run()` # hangs :(

Почему это? Что я делаю не так?
Заранее спасибо за помощь.
from openai import AsyncOpenAI, AsyncStream
from openai.types.chat import ChatCompletionChunk
from openai.types.chat.chat_completion_chunk import Choice
import reactivex as rx

from app.ai.bridge.chat.chat_completion_types import ChatRequest, ToolConfig

from app.ai.bridge.chat.drivers.openai_chat_driver_reactive_mappers import (
map_message_to_openai,
map_toolconfig_to_openai,
)

import asyncio

class OpenaiClientReactive:
def __init__(self, openai_client: AsyncOpenAI) -> None:
self.api = openai_client

async def create_completion(
self, chat_request: ChatRequest, tool_config: ToolConfig | None = None
) -> rx.Observable[Choice]:
stream: rx.subject.ReplaySubject[Choice] = rx.subject.ReplaySubject()

async def do_stream() -> None:
async_stream: AsyncStream[ChatCompletionChunk] = await self._stream_openai(
chat_request, tool_config
)

max_variants_expected = chat_request.options.num_variants
num_indexes_completed = 0

async for chunk in async_stream:
for choice in chunk.choices:
if choice.finish_reason:
num_indexes_completed += 1
stream.on_next(choice)

if max_variants_expected == num_indexes_completed:
# If all indexes are complete, we can complete the stream
print("All indexes complete")
break

await async_stream.close()
stream.on_completed()

asyncio.create_task(do_stream())

return stream

async def _stream_openai(
self,
chat_request: ChatRequest,
tool_config: ToolConfig | None = None,
) -> AsyncStream[ChatCompletionChunk]:
mapped_messages = mapped_messages = [
map_message_to_openai(message) for message in chat_request.context.messages
]
if tool_config:
return await self.api.chat.completions.create(
# TODO: Move this to database driven configuration, since it's an LLM.
model="gpt-3.5-turbo",
messages=mapped_messages,
stream=True,
n=chat_request.options.num_variants,
tools=map_toolconfig_to_openai(tool_config),
)
else:
return await self.api.chat.completions.create(
# TODO: Move this to database driven configuration, since it's an LLM.
model="gpt-3.5-turbo",
messages=mapped_messages,
stream=True,
n=chat_request.options.num_variants,
)



Подробнее здесь: https://stackoverflow.com/questions/795 ... ream-hangs
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»