Код: Выделить всё
def email_router(state: TypedDict) -> None:
node_results, state = get_emails(state)
if node_results == "New Email":
state["internal_state"] = "topics_router"
else:
state["errors"] = "No New Mail"
state["internal_state"] = "return_final_status"
def check_internal_state(state: TypedDict) -> str:
logging.debug(f"Internal State: {state["internal_state"]}")
return state["internal_state"]
Код: Выделить всё
workflow.add_conditional_edges(
"email_router",
check_internal_state,
{
"topics_router": "topics_router",
"return_final_status": "return_final_status",
},
)
Код устанавливается в классах. единственная функция, компилирующая код и сохраняющая выходные данные как:
Код: Выделить всё
global mailman
mailman = workflow.compile()
Код: Выделить всё
workflow = StateGraph(GraphState)
workflow.add_node("email_router", email_router)
workflow.add_node("topics_router", topics_router)
workflow.add_node("status_router", status_router)
workflow.add_node("actions_router", actions_router)
workflow.add_node("return_final_status", return_final_status_node)
workflow.set_entry_point("email_router")
workflow.add_conditional_edges(
"email_router",
check_internal_state,
{
"topics_router": "topics_router",
"return_final_status": "return_final_status",
},
)
workflow.add_conditional_edges(
"topics_router",
check_internal_state,
{
"status_router": "status_router",
"return_final_status": "return_final_status",
},
)
workflow.add_conditional_edges(
"status_router",
check_internal_state,
{
"actions_router": "actions_router",
"return_final_status": "return_final_status",
},
)
workflow.add_edge("actions_router", "return_final_status")
workflow.add_edge("return_final_status", END)
Подробнее здесь: https://stackoverflow.com/questions/785 ... h-workflow
Мобильная версия