I'm learning to use LangChain and I want to build a RAG agent with memory. At the moment I have the following code:

### Imports
import uuid
from typing import Annotated
from typing_extensions import TypedDict
from langchain_ollama import ChatOllama
from langchain_community.document_loaders import PyPDFDirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import SKLearnVectorStore
from langchain_nomic.embeddings import NomicEmbeddings
from langchain.tools.retriever import create_retriever_tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver

### LLM
local_llm = 'llama3.2:3b-instruct-fp16'
llm = ChatOllama(model=local_llm, temperature=0)
# load PDF files from a directory
loader = PyPDFDirectoryLoader("/home/carlos/Documents/LLMs/llama_agents/phone_data/")
data = loader.load()
# print the loaded data, which is a list of Document objects (page content plus metadata such as the source file)
print(data)
# split the extracted documents into chunks of roughly 300 characters with a 40-character overlap
text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=40)
text_chunks = text_splitter.split_documents(data)
# print the number of chunks obtained
print("Chunk length: {0}".format(len(text_chunks)))
# Add to VectorDB
vectorstore = SKLearnVectorStore.from_documents(
documents=text_chunks,
embedding=NomicEmbeddings(model='nomic-embed-text-v1.5', inference_mode='local')
)
# Create retriever
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# Create a retriever tool, so that an agent can call the retriever
retriever_tool = create_retriever_tool(retriever, name="retriever", description="A tool for retrieving documents from a file")
#formatted_docs = retriever_tool.invoke("battery")
# Equip the LLM with the ability to call the tool
tools = [retriever_tool]
llm_with_tools = llm.bind_tools(tools)
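# A quick standalone check of the tool binding looks like this (the probe question is just an example):
probe = llm_with_tools.invoke("Which phone has the best battery?")
print(probe.tool_calls)  # non-empty when the model decides to call the retriever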
#Create a class to store the state of the graph
class State(TypedDict):
messages: Annotated[list, add_messages]
#Prompt template
rag_prompt = ChatPromptTemplate.from_messages(
[
("system","You are helpful sales assistant at an online cell phone store."),
MessagesPlaceholder(variable_name="messages"),
]
)
#Chain that passes the prompt to the model with tools
rag_chain = (rag_prompt | llm_with_tools)
#Function that runs the chain
def rag_bot(state: State):
return {"mesages": [rag_chain.invoke(state["messages"])]}
# This function checks whether there is a message. If there is, it checks
# whether that message has tool_calls. If it does, the function returns
# "tools"; otherwise it returns END.
def route_tools(state: State):
if isinstance(state, list):
ai_message = state[-1]
elif messages := state.get("messages", []):
ai_message = messages[-1]
else:
raise ValueError(f"No messages found in input state to tool_edge: {state}")
if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
return "tools"
return END
## Define a new graph
graph_builder = StateGraph(state_schema=State)
#Nodes----------------------------------
#Chatbot node
graph_builder.add_node("chatbot",rag_bot)
#Tool node that executes the model's tool calls (the prebuilt ToolNode)
tool_node = ToolNode(tools)
graph_builder.add_node("tools", tool_node)
#Edges----------------------------------
#Initial edge
graph_builder.add_edge(START, "chatbot")
#Edge directing the tools output to chatbot
graph_builder.add_edge("tools","chatbot")
#Conditional edge connecting chatbot to either tools or END,
# using the route_tools function
graph_builder.add_conditional_edges("chatbot", route_tools, {"tools": "tools", END: END})
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# Test
tid1 = uuid.uuid4()
config = {"configurable": {"thread_id": tid1}}
query = "Hi, I'm Bob. Which phone has the best battery?"
input_messages = [query]
for event in graph.stream({"messages": input_messages}, config, stream_mode="values"):
print(event)
When I run the graph, I get the following error:
InvalidUpdateError Traceback (most recent call last)
Cell In[23], line 7
4 query = "Hi, I'm Bob. Which phone has the best battery?"
6 input_messages = [query]
----> 7 for event in graph.stream({"messages": input_messages}, config, stream_mode="values"):
8 print(event)
9 #output["messages"][-1].pretty_print()
File ~/Documents/LLMs/llmenv/lib/python3.10/site-packages/langgraph/pregel/__init__.py:1315, in Pregel.stream(self, input, config, stream_mode, output_keys, interrupt_before, interrupt_after, debug, subgraphs)
1304 # Similarly to Bulk Synchronous Parallel / Pregel model
1305 # computation proceeds in steps, while there are channel updates
1306 # channel updates from step N are only visible in step N+1
1307 # channels are guaranteed to be immutable for the duration of the step,
1308 # with channel updates applied only at the transition between steps
1309 while loop.tick(
1310 input_keys=self.input_channels,
1311 interrupt_before=interrupt_before_,
1312 interrupt_after=interrupt_after_,
1313 manager=run_manager,
1314 ):
-> 1315 for _ in runner.tick(
1316 loop.tasks.values(),
1317 timeout=self.step_timeout,
1318 retry_policy=self.retry_policy,
1319 get_waiter=get_waiter,
1320 ):
1321 # emit output
1322 yield from output()
1323 # emit output
File ~/Documents/LLMs/llmenv/lib/python3.10/site-packages/langgraph/pregel/runner.py:56, in PregelRunner.tick(self, tasks, reraise, timeout, retry_policy, get_waiter)
54 t = tasks[0]
55 try:
---> 56 run_with_retry(t, retry_policy)
57 self.commit(t, None)
58 except Exception as exc:
File ~/Documents/LLMs/llmenv/lib/python3.10/site-packages/langgraph/pregel/retry.py:29, in run_with_retry(task, retry_policy)
27 task.writes.clear()
28 # run the task
---> 29 task.proc.invoke(task.input, config)
30 # if successful, end
31 break
File ~/Documents/LLMs/llmenv/lib/python3.10/site-packages/langgraph/utils/runnable.py:412, in RunnableSeq.invoke(self, input, config, **kwargs)
410 input = context.run(step.invoke, input, config, **kwargs)
411 else:
--> 412 input = context.run(step.invoke, input, config)
413 # finish the root run
414 except BaseException as e:
File ~/Documents/LLMs/llmenv/lib/python3.10/site-packages/langgraph/utils/runnable.py:176, in RunnableCallable.invoke(self, input, config, **kwargs)
174 context = copy_context()
175 context.run(_set_config_context, child_config)
--> 176 ret = context.run(self.func, input, **kwargs)
177 except BaseException as e:
178 run_manager.on_chain_error(e)
File ~/Documents/LLMs/llmenv/lib/python3.10/site-packages/langgraph/pregel/write.py:85, in ChannelWrite._write(self, input, config)
78 def _write(self, input: Any, config: RunnableConfig) -> None:
79 writes = [
80 ChannelWriteEntry(write.channel, input, write.skip_none, write.mapper)
81 if isinstance(write, ChannelWriteEntry) and write.value is PASSTHROUGH
82 else write
83 for write in self.writes
84 ]
---> 85 self.do_write(
86 config,
87 writes,
88 self.require_at_least_one_of if input is not None else None,
89 )
90 return input
File ~/Documents/LLMs/llmenv/lib/python3.10/site-packages/langgraph/pregel/write.py:138, in ChannelWrite.do_write(config, writes, require_at_least_one_of)
136 if require_at_least_one_of is not None:
137 if not {chan for chan, _ in filtered} & set(require_at_least_one_of):
--> 138 raise InvalidUpdateError(
139 f"Must write to at least one of {require_at_least_one_of}"
140 )
141 write: TYPE_SEND = config[CONF][CONFIG_KEY_SEND]
142 write(sends + filtered)
InvalidUpdateError: Must write to at least one of ['messages']
I saw that Runnable objects receive Union[dict[str, Any], Any]. I tested the nodes individually, taking the output of one and using it as the input to the next, and they did not raise this error.
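The node-by-node test looked roughly like this (the test query is only a placeholder):
# Invoke the chatbot node directly with a hand-built state
test_state = {"messages": [("user", "Which phone has the best battery?")]}
print(rag_bot(test_state))
# Run the routing function on the same state; it returns "tools" or END
print(route_tools(test_state))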