Я новичок в программировании, и мне нужны рекомендации. Надеюсь найти здесь помощь. У меня есть групповой чат Autogen, который я пытаюсь подключиться к интерфейсу реагирования через FastAPI и Websockets. Я могу заставить его работать и возвращать ответы во внешний интерфейс, используя только User_proxy и одного помощника. Вот так:
import autogen
from autogen import Agent, ConversableAgent
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
try:
from termcolor import colored
except ImportError:
def colored(x, *args, **kwargs):
return x
class UserProxyWebAgent(autogen.UserProxyAgent):
def __init__(self, *args, **kwargs):
super(UserProxyWebAgent, self).__init__(*args, **kwargs)
self._reply_func_list = []
self.register_reply([Agent, None], ConversableAgent.generate_oai_reply)
self.register_reply([Agent, None], ConversableAgent.generate_code_execution_reply)
self.register_reply([Agent, None], ConversableAgent.generate_function_call_reply)
self.register_reply([Agent, None], UserProxyWebAgent.a_check_termination_and_human_reply)
async def a_check_termination_and_human_reply(
self,
messages: Optional[List[Dict]] = None,
sender: Optional[Agent] = None,
config: Optional[Any] = None,
) -> Tuple[bool, Union[str, Dict, None]]:
"""Check if the conversation should be terminated, and if human reply is provided."""
if config is None:
config = self
if messages is None:
messages = self._oai_messages[sender]
message = messages[-1]
reply = ""
no_human_input_msg = ""
if self.human_input_mode == "ALWAYS":
reply = await self.a_get_human_input(
f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: "
)
no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
# if the human input is empty, and the message is a termination message, then we will terminate the conversation
reply = reply if reply or not self._is_termination_msg(message) else "exit"
else:
if self._consecutive_auto_reply_counter[sender] >= self._max_consecutive_auto_reply_dict[sender]:
if self.human_input_mode == "NEVER":
reply = "exit"
else:
# self.human_input_mode == "TERMINATE":
terminate = self._is_termination_msg(message)
reply = await self.a_get_human_input(
f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
if terminate
else f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: "
)
no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
# if the human input is empty, and the message is a termination message, then we will terminate the conversation
reply = reply if reply or not terminate else "exit"
elif self._is_termination_msg(message):
if self.human_input_mode == "NEVER":
reply = "exit"
else:
# self.human_input_mode == "TERMINATE":
reply = await self.a_get_human_input(
f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
)
no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
# if the human input is empty, and the message is a termination message, then we will terminate the conversation
reply = reply or "exit"
# print the no_human_input_msg
if no_human_input_msg:
print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)
# stop the conversation
if reply == "exit":
# reset the consecutive_auto_reply_counter
self._consecutive_auto_reply_counter[sender] = 0
return True, None
# send the human reply
if reply or self._max_consecutive_auto_reply_dict[sender] == 0:
# reset the consecutive_auto_reply_counter
self._consecutive_auto_reply_counter[sender] = 0
return True, reply
# increment the consecutive_auto_reply_counter
self._consecutive_auto_reply_counter[sender] += 1
if self.human_input_mode != "NEVER":
print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)
return False, None
def set_queues(self, client_sent_queue, client_receive_queue):
self.client_sent_queue = client_sent_queue
self.client_receive_queue = client_receive_queue
async def a_get_human_input(self, prompt: str) -> str:
last_message = self.last_message()
if last_message["content"]:
await self.client_receive_queue.put(last_message["content"])
reply = await self.client_sent_queue.get()
if reply and reply == "DO_FINISH":
return "exit"
return reply
else:
return
from fastapi import FastAPI, WebSocket, Request
from fastapi.responses import HTMLResponse
import uuid
from autogen_group_chat import AutogenChat
import asyncio
import uvicorn
from dotenv import load_dotenv, find_dotenv
import openai
import os
_ = load_dotenv(find_dotenv()) # read local .env file
openai.api_key = os.environ['OPENAI_API_KEY']
# openai.log='debug'
app = FastAPI()
app.autogen_chat = {}
class ConnectionManager:
def __init__(self):
self.active_connections: list[AutogenChat] = []
async def connect(self, autogen_chat: AutogenChat):
await autogen_chat.websocket.accept()
self.active_connections.append(autogen_chat)
async def disconnect(self, autogen_chat: AutogenChat):
autogen_chat.client_receive_queue.put_nowait("DO_FINISH")
print(f"autogen_chat {autogen_chat.chat_id} disconnected")
self.active_connections.remove(autogen_chat)
manager = ConnectionManager()
async def send_to_client(autogen_chat: AutogenChat):
while True:
reply = await autogen_chat.client_receive_queue.get()
if reply and reply == "DO_FINISH":
autogen_chat.client_receive_queue.task_done()
break
await autogen_chat.websocket.send_text(reply)
autogen_chat.client_receive_queue.task_done()
await asyncio.sleep(0.05)
async def receive_from_client(autogen_chat: AutogenChat):
while True:
data = await autogen_chat.websocket.receive_text()
if data and data == "DO_FINISH":
await autogen_chat.client_receive_queue.put("DO_FINISH")
await autogen_chat.client_sent_queue.put("DO_FINISH")
break
await autogen_chat.client_sent_queue.put(data)
await asyncio.sleep(0.05)
@app.websocket("/ws/{chat_id}")
async def websocket_endpoint(websocket: WebSocket, chat_id: str):
try:
autogen_chat = AutogenChat(chat_id=chat_id, websocket=websocket)
await manager.connect(autogen_chat)
data = await autogen_chat.websocket.receive_text()
future_calls = asyncio.gather(send_to_client(autogen_chat), receive_from_client(autogen_chat))
await autogen_chat.start(data)
print("DO_FINISHED")
except Exception as e:
print("ERROR", str(e))
finally:
try:
await manager.disconnect(autogen_chat)
except:
pass
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Но когда я пытаюсь добавить групповой чат, интерфейс отключается. Я хочу, чтобы он возвращал каждый ответ всех помощников по одному, предоставляя всем им возможность задавать вопросы пользователю. Я использую те же файлы, что и выше, и просто заменяю autogen_chat.py на autogen_groupchat в файле main.py.
autogen_groupchat.py:
Это лучший подход для достижения этой цели? Если да, может ли кто-нибудь дать мне некоторые рекомендации о том, как заставить это работать правильно. Спасибо!
Я новичок в программировании, и мне нужны рекомендации. Надеюсь найти здесь помощь. У меня есть групповой чат Autogen, который я пытаюсь подключиться к интерфейсу реагирования через FastAPI и Websockets. Я могу заставить его работать и возвращать ответы во внешний интерфейс, используя только User_proxy и одного помощника. Вот так: [code]autogen_chat.py:
import autogen from user_proxy_webagent import UserProxyWebAgent import asyncio
async def a_check_termination_and_human_reply( self, messages: Optional[List[Dict]] = None, sender: Optional[Agent] = None, config: Optional[Any] = None, ) -> Tuple[bool, Union[str, Dict, None]]: """Check if the conversation should be terminated, and if human reply is provided.""" if config is None: config = self if messages is None: messages = self._oai_messages[sender] message = messages[-1] reply = "" no_human_input_msg = "" if self.human_input_mode == "ALWAYS": reply = await self.a_get_human_input( f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: " ) no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else "" # if the human input is empty, and the message is a termination message, then we will terminate the conversation reply = reply if reply or not self._is_termination_msg(message) else "exit" else: if self._consecutive_auto_reply_counter[sender] >= self._max_consecutive_auto_reply_dict[sender]: if self.human_input_mode == "NEVER": reply = "exit" else: # self.human_input_mode == "TERMINATE": terminate = self._is_termination_msg(message) reply = await self.a_get_human_input( f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: " if terminate else f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: " ) no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else "" # if the human input is empty, and the message is a termination message, then we will terminate the conversation reply = reply if reply or not terminate else "exit" elif self._is_termination_msg(message): if self.human_input_mode == "NEVER": reply = "exit" else: # self.human_input_mode == "TERMINATE": reply = await self.a_get_human_input( f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: " ) no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else "" # if the human input is empty, and the message is a termination message, then we will terminate the conversation reply = reply or "exit"
# print the no_human_input_msg if no_human_input_msg: print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)
# stop the conversation if reply == "exit": # reset the consecutive_auto_reply_counter self._consecutive_auto_reply_counter[sender] = 0 return True, None
# send the human reply if reply or self._max_consecutive_auto_reply_dict[sender] == 0: # reset the consecutive_auto_reply_counter self._consecutive_auto_reply_counter[sender] = 0 return True, reply
# increment the consecutive_auto_reply_counter self._consecutive_auto_reply_counter[sender] += 1 if self.human_input_mode != "NEVER": print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)
async def send_to_client(autogen_chat: AutogenChat): while True: reply = await autogen_chat.client_receive_queue.get() if reply and reply == "DO_FINISH": autogen_chat.client_receive_queue.task_done() break await autogen_chat.websocket.send_text(reply) autogen_chat.client_receive_queue.task_done() await asyncio.sleep(0.05)
async def receive_from_client(autogen_chat: AutogenChat): while True: data = await autogen_chat.websocket.receive_text() if data and data == "DO_FINISH": await autogen_chat.client_receive_queue.put("DO_FINISH") await autogen_chat.client_sent_queue.put("DO_FINISH") break await autogen_chat.client_sent_queue.put(data) await asyncio.sleep(0.05)
if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) [/code] Но когда я пытаюсь добавить групповой чат, интерфейс отключается. Я хочу, чтобы он возвращал каждый ответ всех помощников по одному, предоставляя всем им возможность задавать вопросы пользователю. Я использую те же файлы, что и выше, и просто заменяю autogen_chat.py на autogen_groupchat в файле main.py. autogen_groupchat.py: [code]import autogen from user_proxy_webagent import UserProxyWebAgent import asyncio
[/code] Это лучший подход для достижения этой цели? Если да, может ли кто-нибудь дать мне некоторые рекомендации о том, как заставить это работать правильно. Спасибо!