Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receiving multiple inputs on a single node not working in a vanilla state #492

Open
4 tasks done
minki-j opened this issue May 19, 2024 · 5 comments
Open
4 tasks done

Comments

@minki-j
Copy link

minki-j commented May 19, 2024

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.

Example Code

from langgraph.graph import END, StateGraph
from langchain_core.runnables import RunnablePassthrough

def path1(state: dict):
    documents = state["documents"]
    documents.append("path1")

    return {"documents": documents}


def path2(state: dict):
    documents = state["documents"]
    documents.append("path2")

    return {"documents": documents}


def middle_router(state: dict):
    return ["path1", "path2"]


def rendezvous(state: dict):
    print("==>>rendezvous")
    documents = state["documents"]
    print("documents: ", documents)
    return {"documents": documents}
    

graph = StateGraph({"documents": []})

graph.add_node("start", RunnablePassthrough())
graph.add_conditional_edges(
    "start",
    middle_router,
    {
        "path1": "path1",
        "path2": "path2",
    },
    then="rendezvous",
)

graph.add_node("path1", path1)
graph.add_edge("path1", "rendezvous")

graph.add_node("path2", path2)
graph.add_edge("path2", "rendezvous")

graph.add_node("rendezvous", rendezvous)
graph.add_edge("rendezvous", END)

graph.set_entry_point("start")
langgraph_app = graph.compile()

langgraph_app.invoke({"documents": []})

Error Message and Stack Trace (if applicable)

InvalidUpdateError Traceback (most recent call last)
Cell In[13], line 26
23 graph.set_entry_point("start")
24 langgraph_app = graph.compile()
---> 26 langgraph_app.invoke({"documents": []})

File ~/anaconda3/envs/survey_buddy/lib/python3.12/site-packages/langgraph/pregel/init.py:1177, in Pregel.invoke(self, input, config, stream_mode, output_keys, input_keys, interrupt_before, interrupt_after, debug, **kwargs)
1175 chunks = []
1176 print("LangGraph Input: ", input)
-> 1177 for chunk in self.stream(
1178 input,
1179 config,
1180 stream_mode=stream_mode,
1181 output_keys=output_keys,
1182 input_keys=input_keys,
1183 interrupt_before=interrupt_before,
1184 interrupt_after=interrupt_after,
1185 debug=debug,
1186 **kwargs,
1187 ):
1188 if stream_mode == "values":
1189 latest = chunk

File ~/anaconda3/envs/survey_buddy/lib/python3.12/site-packages/langgraph/pregel/init.py:786, in Pregel.stream(self, input, config, stream_mode, output_keys, input_keys, interrupt_before, interrupt_after, debug)
781 print_step_writes(
782 step, pending_writes, self.stream_channels_list
783 )
785 # apply writes to channels
--> 786 _apply_writes(checkpoint, channels, pending_writes)
788 if debug:
789 print_step_checkpoint(step, channels, self.stream_channels_list)

File ~/anaconda3/envs/survey_buddy/lib/python3.12/site-packages/langgraph/pregel/init.py:1344, in _apply_writes(checkpoint, channels, pending_writes)
1342 channels[chan].update(vals)
1343 except InvalidUpdateError as e:
-> 1344 raise InvalidUpdateError(
1345 f"Invalid update for channel {chan}: {e}"
1346 ) from e
1347 checkpoint["channel_versions"][chan] = max_version + 1
1348 updated_channels.add(chan)

InvalidUpdateError: Invalid update for channel root: LastValue can only receive one value per step.

Description

I'm using a basic dictionary {"document": []} for my state and updating it in parallel on path1 and path2. However, this has led to an error related to LastValue. Using a TypeDict state with the Annotated function, I didn't encounter this issue, which suggests that the problem may lie with the state.

Additionally, could you provide a definition of LastValue? I've attempted to understand it, but the Pregel code was quite complex. I would appreciate any documentation on the Pregel source code. Thank you.

System Info

langchain==0.1.17
langchain-anthropic==0.1.11
langchain-community==0.0.37
langchain-core==0.1.52
langchain-openai==0.1.6
langchain-text-splitters==0.0.1
langchainhub==0.1.15

@hinthornw
Copy link
Contributor

hinthornw commented May 20, 2024

Hi @minki-j - great observation. If StateGraph is initialized with an instance of a dictionary, rather than a **type**, the resultinggraph has no knowledge of the documents field in your dictionary and only can only "reduce" on the __root__ level.

As noted, the same code works with a typeddict. It also works if you annotated how you want to reduce the primary state:

graph = StateGraph(Annotated[dict, lambda x, y: {**x, **y}])

Another issue is that the code above has then specified while also explicitly adding edges; it would be better to only have the then argument

@hinthornw
Copy link
Contributor

hinthornw commented May 20, 2024

Gonna work with Nuno to make the errors more helpful here, and plan to deprecate then so there's only one way of doing things

@minki-j
Copy link
Author

minki-j commented May 21, 2024

Thanks @hinthornw. I turned the state into a typeddict with the annoated reduce logic, and the error is gone.

Thanks for pointing out the duplicated edges. I mistakenly add the edges when I was cleaning the code for this post.

Can you consider not deprecating then? I love it because it makes the codes cleaner. I think it's a great option to keep having. What about throwing an error when there is a duplicated edges? If you like this idea, I'd love to work on adding that feature!

example>

from varname import nameof

g = StateGraph(StateType)
g.set_entry_point("entry")

g.add_node("entry", RunnablePassthrough())

# With `then`, I can see the flow at a glance
g.add_conditional_edges(
    "entry",
    conditional_func,
    to_path_map(
        [nameof(gather_user_info),
        nameof(gather_vendor_info),]
    ),
    then="rendezvous",
)

g.add_node(nameof(gather_user_info), gather_user_info)
g.add_node(nameof(gather_vendor_info), gather_vendor_info)

# Without using 'then', I have to add the following edges, which is less clear than 'then' in my opinion.
# g.add_edge(nameof(gather_user_info), "rendezvous")
# g.add_edge(nameof(gather_vendor_info), "rendezvous")

g.add_node("rendezvous", RunnablePassthrough())
g.add_edge("rendezvous", END)
# functions used above
def to_path_map(node_names):
    dict = {}
    for name in node_names:
        dict[name] = name
    return dict

def conditional_func(state: dict[str, Documents]):
    if state["documents"].user_info is None:
        return nameof(gather_user_info)
    else:
        return nameof(gather_vendor_info)

@Jeomon
Copy link

Jeomon commented May 26, 2024

could you plz tell about, what does the then argument in add_conditional_edges(....,then=...) I checked the docs but didn't understood it. Let me put it in an example manner, if I have an agent then I will make a conditional edge from the agent to the tool later a regular edge goes back to the agent from the tool. it works in an iterative manner until arrives to a solution. In this were does the then argument does the role.

@Eknathabhiram
Copy link

I am facing similar problem, Please help @hinthornw @Jeomon

This is my state
class State(TypedDict):
node : Node
keywords: Optional[KeywordExtractor]
context_scoring: Optional[ContextScoring]
seed_question: Optional[List[SeedQuestion]]
reasoning_question: Optional[ReasoningQuestion]
conditional_question: Optional[ConditionalQuestion]
multi_context_question: Optional[MultiContextQuestion]
compressed_question: Optional[CompressQuestion]
relevant_concept: Optional[RelevantConcept]
doc_store: Any

One of my routing function is
def seed_question_router(state: State):
if state['seed_question']:
return ['reasoning_question_gen','conditional_question_gen']
else:
return "get_random_node"

I am updating different state parameter in both nodes, then I am receiving below error
Traceback (most recent call last):
File "/home/x/anaconda3/envs/rag-env/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/home/x/anaconda3/envs/rag-env/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/home/x/Documents/RAG Pipeline/evalution/workflow_nodes.py", line 148, in
for event in events:
File "/home/x/anaconda3/envs/rag-env/lib/python3.9/site-packages/langgraph/pregel/init.py", line 1218, in stream
while loop.tick(
File "/home/x/anaconda3/envs/rag-env/lib/python3.9/site-packages/langgraph/pregel/loop.py", line 285, in tick
mv_writes = apply_writes(
File "/home/x/anaconda3/envs/rag-env/lib/python3.9/site-packages/langgraph/pregel/algo.py", line 223, in apply_writes
if channels[chan].update(vals) and get_next_version is not None:
File "/home/x/anaconda3/envs/rag-env/lib/python3.9/site-packages/langgraph/channels/last_value.py", line 38, in update
raise InvalidUpdateError(
langgraph.errors.InvalidUpdateError: At key 'node': Can receive only one value per step. Use an Annotated key to handle multiple values.

graph_builder = StateGraph(State)

graph_builder.add_node('get_random_node',get_random_node)
graph_builder.add_node('keyword_extractor',keyword_extractor)
graph_builder.add_node('context_scoring_node',context_scoring_node)
graph_builder.add_node('seed_question_gen',seed_question_gen)
graph_builder.add_node('reasoning_question_gen',reasoning_question_gen)
graph_builder.add_node('conditional_question_gen',conditional_question_gen)

graph_builder.add_edge(START, "get_random_node")
graph_builder.add_edge("get_random_node", "keyword_extractor")
graph_builder.add_edge('keyword_extractor','context_scoring_node')

graph_builder.add_edge('seed_question_gen','seed_subgraph')

graph_builder.add_conditional_edges('context_scoring_node',context_filter)
graph_builder.add_conditional_edges('seed_question_gen',seed_question_router)
graph_builder.add_conditional_edges('reasoning_question_gen',reasoning_question_router)
graph_builder.add_conditional_edges('conditional_question_gen',conditional_question_router)

What should I change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants