FastAPI on Azure VM gets `botocore.exceptions.ReadTimeoutError`


Post by Anonymous »

I'm running a FastAPI app on an Azure VM. The app uses langchain-aws (ChatBedrockConverse) to call the AWS Bedrock service. After the app has been idle for a while, the first Bedrock call fails with a ReadTimeoutError; subsequent requests succeed immediately, until the next idle period.

FastAPI vs. standalone scripts: on the same Azure VM, calling Bedrock works fine from standalone scripts. The timeout only occurs when the Bedrock call is made from the long-running FastAPI/uvicorn process.
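This failure pattern (only the first call after idling fails, everything afterwards succeeds) is what a stale pooled HTTP connection typically looks like: urllib3 keeps the connection to bedrock-runtime alive between requests, something on the path silently drops it during the idle period (Azure's SNAT/load-balancer idle timeout is about 4 minutes by default), and the next call writes into a dead socket and waits out the full read_timeout. A minimal sketch of one way to test that hypothesis follows; `tcp_keepalive` is a real botocore Config option, but whether it resolves this particular case is an assumption:

from botocore.config import Config

# Hypothesis test (assumption): OS-level TCP keepalives may stop intermediate
# devices from silently dropping the pooled connection during idle periods.
config = Config(
    region_name="us-east-1",
    read_timeout=60,
    connect_timeout=10,
    retries={"max_attempts": 5, "mode": "adaptive"},
    tcp_keepalive=True,  # requires botocore >= 1.27.84
)
# Pass this config to ChatBedrockConverse exactly as the script below already does.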



Traceback:

File "/app/.venv/lib/python3.13/site-packages/botocore/endpoint.py", line 383, in _send
return self.http_session.send(request)
~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^
File "/app/.venv/lib/python3.13/site-packages/botocore/httpsession.py", line 501, in send
raise ReadTimeoutError(endpoint_url=request.url, error=e)
botocore.exceptions.ReadTimeoutError: Read timeout on endpoint URL: "https://bedrock-runtime.us-east-1.amazo ... 0/converse"
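
The traceback only shows that the read timed out on the converse endpoint after botocore's retries were exhausted; it does not say why. A hypothetical isolation script (not from the original post) can mimic the long-lived uvicorn process by reusing one client across an idle gap; if the second call also times out here, FastAPI itself is not the variable:

import time
import boto3
from botocore.config import Config

# One client, reused across the idle gap, like the long-running app does.
client = boto3.client(
    "bedrock-runtime",
    region_name="us-east-1",
    config=Config(read_timeout=60, retries={"max_attempts": 1}),
)

def converse(text: str):
    return client.converse(
        modelId="us.anthropic.claude-3-5-sonnet-20241022-v2:0",
        messages=[{"role": "user", "content": [{"text": text}]}],
    )

converse("ping")        # first call: fresh connection, expected to succeed
time.sleep(10 * 60)     # idle long enough to exceed any suspected SNAT/keep-alive limit
converse("ping again")  # does this call time out the way the FastAPI call does?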

Script:
import json
import logging
import os
from typing import Dict, List, Optional, Any

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END

from botocore.config import Config
from botocore.exceptions import ReadTimeoutError
from fastapi import FastAPI
from pydantic import BaseModel as PydanticBaseModel
import uvicorn
import requests

from langchain_aws import ChatBedrockConverse
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_core.pydantic_v1 import BaseModel, Field

os.environ["LANGCHAIN_TRACING"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "YOUR_LANGCHAIN_API_KEY" # Replace with your actual key
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_PROJECT"] = "generic-agent-tracing" # Optional: Name your project

def setup_logging(enable_boto_debug=True):
    """Setup logging configuration."""
    if enable_boto_debug:
        logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        logging.getLogger('boto3').setLevel(logging.DEBUG)
        logging.getLogger('botocore').setLevel(logging.DEBUG)
        logging.getLogger('urllib3.connectionpool').setLevel(logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    return logging.getLogger(__name__)

logger = setup_logging(enable_boto_debug=True)

class RouterDecisionInput(BaseModel):
    """Input schema for router decision tool."""
    intent: str = Field(description="Primary intent of the user", enum=["task_a", "task_b", "general_query"])

@tool("RouterDecision", args_schema=RouterDecisionInput)
def router_decision(intent: str) -> Dict[str, Any]:
    """The output of the main intelligent router."""
    return {"intent": intent}

class LangChainBedrockClient:
    def __init__(self, region='us-east-1', timeout=60):
        self.region = region
        self.timeout = timeout
        self.model_id = 'us.anthropic.claude-3-5-sonnet-20241022-v2:0'
        self.config = Config(region_name=region, read_timeout=timeout, connect_timeout=timeout, retries={'max_attempts': 5, 'mode': 'adaptive'})
        self.llm = ChatBedrockConverse(model=self.model_id, region_name=region, config=self.config, temperature=0.1, max_tokens=4096, top_p=0.9)
        logger.info(f"Initialized LangChain BedrockConverse client with model: {self.model_id}")

    def send_message(self, message: str, system_prompt: Optional[str] = None) -> Dict[str, Any]:
        try:
            messages = [SystemMessage(content=system_prompt), HumanMessage(content=message)] if system_prompt else [HumanMessage(content=message)]
            response = self.llm.invoke(messages)
            return {'success': True, 'response': response.content}
        except Exception as e:
            logger.error(f"Error in send_message: {e}", exc_info=True)
            return {'success': False, 'error': str(e)}

    def send_message_with_tools(self, message: str, tools: Optional[List] = None, system_prompt: Optional[str] = None) -> Dict[str, Any]:
        try:
            if tools is None:
                tools = [router_decision]
            llm_with_tools = self.llm.bind_tools(tools)
            messages = [SystemMessage(content=system_prompt), HumanMessage(content=message)] if system_prompt else [HumanMessage(content=message)]
            response = llm_with_tools.invoke(messages)
            if hasattr(response, 'tool_calls') and response.tool_calls:
                tool_call = response.tool_calls[0]
                return {'success': True, 'tool_use': {'name': tool_call['name'], 'input': tool_call['args']}}
            else:
                return {'success': True, 'response': response.content}
        except Exception as e:
            logger.error(f"Error in send_message_with_tools: {e}", exc_info=True)
            return {'success': False, 'error': str(e)}

class GraphState(TypedDict):
    """Represents the state of our graph."""
    message: str
    intent: str
    response: str

bedrock_client = LangChainBedrockClient(region='us-east-1', timeout=20)

def router_node(state: GraphState) -> Dict[str, Any]:
    logger.info("---EXECUTING ROUTER NODE---")
    message = state["message"]
    router_system_prompt = "You are a routing agent. Based on the user's message, determine their primary intent. Possible intents are: `task_a`, `task_b`, `general_query`."
    result = bedrock_client.send_message_with_tools(message=message, system_prompt=router_system_prompt)

    intent = "general_query"
    if result.get('success') and 'tool_use' in result:
        intent = result['tool_use']['input'].get('intent', 'general_query')
        logger.info(f"Router determined intent: {intent}")
    else:
        logger.warning("Router could not determine intent, defaulting to 'general_query'")
    return {"intent": intent}

def handle_task_a_node(state: GraphState) -> Dict[str, str]:
    logger.info("---EXECUTING TASK A NODE---")
    message = state["message"]
    system_prompt = "You are an AI assistant. The user wants to perform Task A. Please provide a helpful and relevant response."
    result = bedrock_client.send_message(message=message, system_prompt=system_prompt)
    response = result.get('response', "I'm sorry, there was an error processing Task A.")
    return {"response": response}

def handle_task_b_node(state: GraphState) -> Dict[str, str]:
    logger.info("---EXECUTING TASK B NODE---")
    message = state["message"]
    system_prompt = "You are an AI assistant. The user wants to perform Task B. Please provide a helpful and relevant response."
    result = bedrock_client.send_message(message=message, system_prompt=system_prompt)
    response = result.get('response', "I'm sorry, there was an error processing Task B.")
    return {"response": response}

def handle_general_query_node(state: GraphState) -> Dict[str, str]:
    logger.info("---EXECUTING GENERAL QUERY NODE---")
    message = state["message"]
    system_prompt = "You are an AI assistant. The user has a general question. Please provide a clear and concise answer."
    result = bedrock_client.send_message(message=message, system_prompt=system_prompt)
    response = result.get('response', "I'm sorry, I couldn't process that question at the moment.")
    return {"response": response}

def decide_next_node(state: GraphState) -> str:
    logger.info(f"---DECIDING NEXT NODE BASED ON INTENT: {state['intent']}---")
    if state["intent"] == "task_a":
        return "task_a_node"
    elif state["intent"] == "task_b":
        return "task_b_node"
    else:
        return "general_query_node"

workflow = StateGraph(GraphState)
workflow.add_node("router", router_node)
workflow.add_node("task_a_node", handle_task_a_node)
workflow.add_node("task_b_node", handle_task_b_node)
workflow.add_node("general_query_node", handle_general_query_node)

workflow.set_entry_point("router")
workflow.add_conditional_edges("router", decide_next_node, {
    "task_a_node": "task_a_node",
    "task_b_node": "task_b_node",
    "general_query_node": "general_query_node",
})
workflow.add_edge("task_a_node", END)
workflow.add_edge("task_b_node", END)
workflow.add_edge("general_query_node", END)
graph_app = workflow.compile()

app = FastAPI(title="Generic LangGraph Bedrock API", description="An API that uses LangGraph to route requests to different Bedrock-powered nodes.")

class GraphRequest(PydanticBaseModel):
    message: str

@app.post("/invoke-graph/", response_model=GraphState)
async def invoke_graph(request: GraphRequest):
    initial_state = {"message": request.message, "intent": "", "response": ""}
    final_state = await graph_app.ainvoke(initial_state)
    return final_state

@app.get("/warmup")
def warmup():
    """A simple endpoint to keep the server instance warm and prevent cold starts."""
    logger.info("Warm-up endpoint called. The server is active.")
    return {"status": "ok", "message": "Server is warm."}

def simulate_requests():
    base_url = "http://127.0.0.1:8004"

    def make_request(description: str, payload: dict):
        print(f"--- {description} ---")
        try:
            response = requests.post(f"{base_url}/invoke-graph/", json=payload)
            response.raise_for_status()
            print("Response from server:")
            print(json.dumps(response.json(), indent=2))
        except requests.exceptions.RequestException as e:
            print(f"Error making request: {e}")
        print("\n" + "="*50 + "\n")

    make_request("Simulating Task A Request", {"message": "Please perform task A for me."})
    make_request("Simulating General Query Request", {"message": "What is the capital of France?"})
    make_request("Simulating Task B Request", {"message": "I need to start task B."})

if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == 'client':
        simulate_requests()
    else:
        print("To run the FastAPI server, use the command:\nuvicorn main:app --reload --port 8004")
        print("\nTo run the client simulation in a separate terminal, use the command:\npython main.py client")


More details here: https://stackoverflow.com/questions/796 ... ws-bedrock