Я написал пример сценария с помощью пакета python-a2a и получил следующую ошибку. Я не уверен, связана ли проблема с конкретной версией пакета.
A2ACalcClient initialized for URL: http://10.188.106.132:5001/a2a
Sending calculation request: add(a=5, b=3)
Unexpected response type from A2A agent: error
Full response content: ErrorContent(message='Failed to communicate with agent at http://10.188.106.132:5001/a2a/a2a. Tried multiple endpoint variations.', type=)
A2ACalcClient initialized for URL: http://10.188.106.132:5001/
Sending calculation request: add(a=5, b=3)
Unexpected response type from A2A agent: error
Full response content: ErrorContent(message='Failed to communicate with agent at http://10.188.106.132:5001/tasks/send. Tried multiple endpoint variations.', type=)
< /code>
Код сервера:
import math
from python_a2a import (
A2AServer, Message, TextContent, FunctionCallContent,
FunctionResponseContent, FunctionParameter, MessageRole, run_server,
Task
)
class CalculatorAgent(A2AServer):
    """A2A agent that exposes ``calculate`` and ``sqrt`` as callable functions."""

    def handle_message(self, message):
        """Build the agent's reply to a single incoming message.

        Text messages are answered with a short usage description.
        Function-call messages are dispatched on ``message.content.name``:

        * ``calculate`` -- parameters ``operation`` (``add``, ``subtract``,
          ``multiply`` or ``divide``; defaults to ``add``), ``a`` and ``b``
          (both default to 0 and are converted with ``float``).
        * ``sqrt`` -- parameter ``value`` (defaults to 0, must not be negative).

        Args:
            message: The incoming message; its ``content.type`` selects the
                branch taken.

        Returns:
            A ``Message`` with role ``AGENT`` linked to the incoming message
            through ``parent_message_id`` and ``conversation_id``. Successful
            calls carry ``{"result": <float>}``; failed calls (bad operand,
            division by zero, unknown operation or unknown function) carry
            ``{"error": <text>}``. Unsupported content types get a text reply
            instead of an implicit ``None``.
        """

        def reply(content):
            # Every answer shares the same envelope; only the content differs.
            return Message(
                content=content,
                role=MessageRole.AGENT,
                parent_message_id=message.message_id,
                conversation_id=message.conversation_id
            )

        content_type = message.content.type
        if content_type == "text":
            return reply(
                TextContent(
                    text="I'm a [Python A2A](python-a2a.html) calculator agent. You can call my functions:\n"
                    "- calculate: Basic arithmetic (operation, a, b)\n"
                    "- sqrt: Square root (value)"
                )
            )
        if content_type != "function_call":
            # Previously this fell off the end of the method and returned None.
            return reply(
                TextContent(text=f"Unsupported content type: {content_type}")
            )

        function_name = message.content.name
        params = {p.name: p.value for p in message.content.parameters}
        try:
            if function_name == "calculate":
                operation = params.get("operation", "add")
                a = float(params.get("a", 0))
                b = float(params.get("b", 0))
                if operation == "add":
                    result = a + b
                elif operation == "subtract":
                    result = a - b
                elif operation == "multiply":
                    result = a * b
                elif operation == "divide":
                    if b == 0:
                        raise ValueError("Cannot divide by zero")
                    result = a / b
                else:
                    raise ValueError(f"Unknown operation: {operation}")
            elif function_name == "sqrt":
                value = float(params.get("value", 0))
                if value < 0:
                    raise ValueError("Cannot calculate square root of negative number")
                result = math.sqrt(value)
            else:
                # Previously an unknown function name produced no reply at all.
                raise ValueError(f"Unknown function: {function_name}")
        except Exception as e:
            # Deliberately broad: any failure while evaluating the call is
            # reported back to the caller rather than crashing the handler.
            return reply(
                FunctionResponseContent(
                    name=function_name,
                    response={"error": str(e)}
                )
            )
        return reply(
            FunctionResponseContent(
                name=function_name,
                response={"result": result}
            )
        )
if __name__ == "__main__":
    # Serve the calculator agent on port 5001, listening on all interfaces.
    run_server(CalculatorAgent(), host="0.0.0.0", port=5001)
< /code>
Клиентский код:
import python_a2a
from python_a2a import (
A2AClient, Message, FunctionCallContent,
FunctionParameter, MessageRole
)
# Card, Message, Task, SendMessageSuccessResponse
#
class A2ACalcClient:
    """Encapsulate communication with the Python A2A calculator agent."""

    def __init__(self, a2a_url: str):
        """Create the underlying ``A2AClient`` for the given agent URL.

        Args:
            a2a_url: The URL of the Python A2A agent
                (e.g., "http://127.0.0.1:5001").
                NOTE(review): the logged errors suggest the library appends
                its own endpoint paths to this URL -- confirm whether only
                the base URL should be passed.
        """
        self.client = A2AClient(a2a_url)
        print(f"A2ACalcClient initialized for URL: {a2a_url}")

    def send_calculation_request(
        self, operation: str, a: float, b: float
    ) -> float | None:
        """Ask the agent to run ``calculate`` and return the numeric result.

        Args:
            operation: The mathematical operation (e.g., "add", "subtract",
                "multiply", "divide").
            a: The first operand.
            b: The second operand.

        Returns:
            The value stored under ``"result"`` in the agent's function
            response, or ``None`` when the agent reports an error, answers
            with an unexpected content type, or the exchange raises.
        """
        print(f"Sending calculation request: {operation}(a={a}, b={b})")
        function_call_content = FunctionCallContent(
            name="calculate",
            parameters=[
                FunctionParameter(name="operation", value=operation),
                FunctionParameter(name="a", value=a),
                FunctionParameter(name="b", value=b)
            ]
        )
        function_call_message = Message(
            content=function_call_content,
            role=MessageRole.USER
        )
        try:
            response = self.client.send_message(function_call_message)
            if response.content.type != "function_response":
                print(f"Unexpected response type from A2A agent: {response.content.type}")
                print(f"Full response content: {response.content}")
                return None

            payload = response.content.response
            result = payload.get("result")
            if result is not None:
                print(f"Received result from A2A agent: {result}")
                return result

            # The agent reports failed calls as {"error": "<text>"}; show that
            # text instead of only saying that 'result' is absent.
            error = payload.get("error")
            if error is not None:
                print(f"A2A agent reported an error: {error}")
                return None

            print("Function response received, but 'result' key is missing.")
            return None
        except Exception as e:
            # Best-effort boundary: any transport or parsing failure is
            # reported and turned into a None result.
            print(f"An error occurred while communicating with the A2A agent: {e}")
            return None
# --- Example Usage ---
if __name__ == "__main__":
    # Address of the running agent; "http://127.0.0.1:5001/a2a" was tried as well.
    a2a_url = "http://127.0.0.1:5001"
    calcAgentClient = A2ACalcClient(a2a_url)

    # Request 5 + 3 from the agent; None signals that the request failed.
    sum_result = calcAgentClient.send_calculation_request(
        operation="add", a=5, b=3
    )
    if sum_result is not None:
        print(f"Addition Result: {sum_result}")  # Expected: 8
    print("-" * 30)
Подробнее здесь: https://stackoverflow.com/questions/797 ... ple-endpoi
'Не удалось связаться с агентом по адресу http://127.0.0.1:5001. Испробовано несколько вариантов конечной точки.', type=<conte ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение