Subflow can't be resumed by main flow #1144

Closed
5 tasks done
cjdxhjj opened this issue Jul 26, 2024 · 8 comments
Comments

cjdxhjj commented Jul 26, 2024

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangGraph/LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangGraph/LangChain rather than my code.
  • I am sure this is better as an issue rather than a GitHub discussion, since this is a LangGraph bug and not a design question.

Example Code

import random
from typing import TypedDict, Sequence, Annotated, Dict, Any
from langchain_core.messages import BaseMessage, AIMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages


class StateA(TypedDict):
    name: str
    messages: Annotated[Sequence[BaseMessage], add_messages]

class StateB(TypedDict):
    name: str
    age: int
    messages: Annotated[Sequence[BaseMessage], add_messages]

def a_llm(state: StateA) -> Dict[str, Any]:
    print('a_llm')
    print(state)
    return {'messages': [AIMessage(content='a_llm')]}

def b_llm(state: StateB) -> Dict[str, Any]:
    print('b_llm')
    print(state)
    return {'messages': [AIMessage(content='b_llm')]}

def a_a(state: StateA) -> Dict[str, Any]:
    print('a_a')
    print(state)
    return {'messages': [AIMessage(content='a_a')]}

def b_a(state: StateB) -> Dict[str, Any]:
    print('b_a')
    print(state)
    return {'messages': [AIMessage(content='b_a')]}

def a_b(state: StateB) -> Dict[str, Any]:
    print('a_b')
    print(state)
    return {'messages': [AIMessage(content='a_b')]}

def human_b(state: StateB) -> Dict[str, Any]:
    print('human_b, continue ? A: continue B stop')
    print(state)
    return {'messages': [AIMessage(content='continue ? A: continue B stop')]}

def judge(state: StateB) -> str:
    i = random.randint(1, 10)
    print(f'judge {i}')
    print(state)
    if i > 6:
        return 'c_b'
    else:
        return END

def h_j(state: StateB) -> str:
    msg = state.get('messages')[-1].content
    if msg == 'A':
        return 'b_b'
    else:
        return END

def b_b(state: StateB) -> Dict[str, Any]:
    print('b_b')
    print(state)
    return {'messages': [AIMessage(content='b_b')]}

def c_b(state: StateB) -> Dict[str, Any]:
    print('c_b')
    print(state)
    return {'messages': [AIMessage(content='c_b')]}

# build the subgraph
sf = StateGraph(StateB)
sf.add_node('b_llm', b_llm)
sf.add_node('a_b', a_b)
sf.add_node('human_b', human_b)
sf.add_node('b_b', b_b)
sf.add_node('c_b', c_b)
sf.add_edge('b_llm', 'a_b')
sf.add_edge('a_b', 'human_b')
sf.add_conditional_edges('human_b', h_j)
sf.add_conditional_edges('b_b', judge)
sf.add_edge('c_b', END)
sf.set_entry_point('b_llm')

memory = MemorySaver()

# build the main graph
graph = StateGraph(StateA)
graph.add_node('a_llm', a_llm)
graph.add_node('a_a', a_a)
graph.add_node('b_s', sf.compile(checkpointer=memory, interrupt_before=['human_b']))
graph.add_node('b_a', b_a)
graph.add_edge('a_llm', 'a_a')
graph.add_edge('a_a', 'b_s')
graph.add_edge('b_s', 'b_a')
graph.set_entry_point('a_llm')


app = graph.compile(checkpointer=memory)
cfg = {"configurable": {"thread_id": "thread-1"}}
res = app.invoke(StateA(name='xxx', messages=[]), cfg)
print(res)
snapshot = app.get_state(cfg)
app.update_state(cfg, {'messages': [HumanMessage(content='A')]})
r = app.invoke(None, config=cfg, stream_mode='values')
print(r)

Error Message and Stack Trace (if applicable)

a_llm
{'name': 'xxx', 'messages': []}
a_a
{'name': 'xxx', 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e')]}
b_llm
{'name': 'xxx', 'age': None, 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e'), AIMessage(content='a_a', id='59c026c4-dd08-4ca4-8187-038f31e89854')]}
a_b
{'name': 'xxx', 'age': None, 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e'), AIMessage(content='a_a', id='59c026c4-dd08-4ca4-8187-038f31e89854'), AIMessage(content='b_llm', id='47006267-1337-4b21-b4ed-c04ec28bcaa0')]}
{'name': 'xxx', 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e'), AIMessage(content='a_a', id='59c026c4-dd08-4ca4-8187-038f31e89854')]}
b_llm
{'name': 'xxx', 'age': None, 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e'), AIMessage(content='a_a', id='59c026c4-dd08-4ca4-8187-038f31e89854'), HumanMessage(content='A', id='fb15b763-a436-4b7a-afda-b503e7127d26')]}
a_b
{'name': 'xxx', 'age': None, 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e'), AIMessage(content='a_a', id='59c026c4-dd08-4ca4-8187-038f31e89854'), HumanMessage(content='A', id='fb15b763-a436-4b7a-afda-b503e7127d26'), AIMessage(content='b_llm', id='6d5919fe-9730-4442-b5a9-7cc6e0752ae8')]}
None

Description

My subflow has a human node to collect manual feedback. After I add the human message to the main flow's state, the subflow can't be resumed.
I also can't invoke the subflow directly to resume it, because I don't know which node the graph is paused at. If I have many subflows, I would have to
record which node each one interrupted at and keep every subflow in a global variable.

System Info

Python 3.12, langgraph 0.1.14

@vbarda
Collaborator

vbarda commented Jul 26, 2024

@cjdxhjj we’re working on it, stay tuned!

@cjdxhjj
Author

cjdxhjj commented Jul 26, 2024

thanks @vbarda

@cjdxhjj
Author

cjdxhjj commented Jul 26, 2024

Another question: if the subflow raises an error, how can I stop the whole graph (i.e. jump to the main workflow's end)?

@hwchase17
Contributor

> Another question: if the subflow raises an error, how can I stop the whole graph (i.e. jump to the main workflow's end)?

this should happen by default!

@cjdxhjj
Author

cjdxhjj commented Jul 26, 2024

When the subflow jumps to END, it only reaches the end of the subflow. Is there any way to jump to the main flow's end node? For example, when a remote interface returns an error code rather than Python raising an exception. @hwchase17

@avinashkz

Blocked by the same issue. Is there a workaround to handle interrupts + state updates in subgraphs?

@avinashkz

I also tried compiling the subgraph separately so that I can update its state. I was able to update the subgraph's state without any issues, but doing so updated the state of the parent graph and set `next` to `()`.

@vbarda
Collaborator

vbarda commented Sep 4, 2024

Hi all -- we've added support for subgraph interrupts, resuming subgraphs from interrupts, and updating subgraph state. Please check out this how-to guide for more information: https://langchain-ai.github.io/langgraph/how-tos/subgraphs-manage-state/

@vbarda vbarda closed this as completed Sep 4, 2024
4 participants