如何使用图API¶
本指南演示了LangGraph的图API的基础知识。它将介绍状态,以及如何组合常见的图结构,如序列、分支和循环。它还涵盖了LangGraph的控制功能,包括用于map-reduce工作流的Send API和用于结合状态更新与节点间“跳转”的Command API。
设置¶
安装 langgraph
设置 LangSmith 以便更好地调试
注册LangSmith,以便快速发现问题并提高LangGraph项目的性能。LangSmith允许您使用跟踪数据来调试、测试和监控您使用LangGraph构建的LLM应用程序——请阅读文档以了解更多入门信息。
定义和更新状态¶
在这里,我们展示了如何在LangGraph中定义和更新状态。我们将演示
定义状态¶
LangGraph中的状态可以是TypedDict
、Pydantic
模型或dataclass。下面我们将使用TypedDict
。有关使用Pydantic的详细信息,请参阅本节。
默认情况下,图将具有相同的输入和输出模式,并且状态决定了该模式。有关如何定义不同的输入和输出模式,请参阅本节。
让我们考虑一个使用消息的简单示例。这代表了许多LLM应用程序的一种通用状态表示形式。有关更多详细信息,请参阅我们的概念页面。
API 参考: AnyMessage
from langchain_core.messages import AnyMessage
from typing_extensions import TypedDict
class State(TypedDict):
messages: list[AnyMessage]
extra_field: int
此状态跟踪一个消息对象列表,以及一个额外的整数字段。
更新状态¶
让我们构建一个包含单个节点的示例图。我们的节点只是一个Python函数,它读取我们图的状态并对其进行更新。该函数的第一个参数始终是状态。
API 参考: AIMessage
from langchain_core.messages import AIMessage
def node(state: State):
messages = state["messages"]
new_message = AIMessage("Hello!")
return {"messages": messages + [new_message], "extra_field": 10}
此节点只是将一条消息附加到我们的消息列表,并填充一个额外字段。
重要
节点应直接返回对状态的更新,而不是修改状态。
接下来,我们定义一个包含此节点的简单图。我们使用StateGraph来定义一个在此状态上操作的图。然后我们使用add_node来填充我们的图。
API 参考: StateGraph
from langgraph.graph import StateGraph
builder = StateGraph(State)
builder.add_node(node)
builder.set_entry_point("node")
graph = builder.compile()
LangGraph提供了内置的实用程序来可视化您的图。让我们检查一下我们的图。有关可视化的详细信息,请参阅本节。
在这种情况下,我们的图只执行一个节点。让我们继续进行简单的调用。
API 参考: HumanMessage
from langchain_core.messages import HumanMessage
result = graph.invoke({"messages": [HumanMessage("Hi")]})
result
{'messages': [HumanMessage(content='Hi', additional_kwargs={}, response_metadata={}),
AIMessage(content='Hello!', additional_kwargs={}, response_metadata={})],
'extra_field': 10}
请注意,
- 我们通过更新状态的一个键来启动调用。
- 我们在调用结果中接收到整个状态。
为了方便起见,我们经常通过漂亮打印检查消息对象的内容
================================ Human Message =================================
Hi
================================== Ai Message ==================================
Hello!
使用Reducer处理状态更新¶
状态中的每个键都可以有自己的独立Reducer函数,它控制如何应用来自节点的更新。如果没有明确指定Reducer函数,则假定对键的所有更新都应该覆盖它。
对于TypedDict
状态模式,我们可以通过使用Reducer函数注释状态的相应字段来定义Reducer。
在前面的示例中,我们的节点通过向其附加消息来更新状态中的"messages"
键。下面,我们向此键添加一个Reducer,以便自动附加更新。
from typing_extensions import Annotated
def add(left, right):
"""Can also import `add` from the `operator` built-in."""
return left + right
class State(TypedDict):
messages: Annotated[list[AnyMessage], add]
extra_field: int
现在我们的节点可以简化了
def node(state: State):
new_message = AIMessage("Hello!")
return {"messages": [new_message], "extra_field": 10}
API 参考: START
from langgraph.graph import START
graph = StateGraph(State).add_node(node).add_edge(START, "node").compile()
result = graph.invoke({"messages": [HumanMessage("Hi")]})
for message in result["messages"]:
message.pretty_print()
================================ Human Message =================================
Hi
================================== Ai Message ==================================
Hello!
MessagesState¶
实际上,更新消息列表还有其他注意事项
LangGraph包含一个内置的Reducer add_messages
,用于处理这些注意事项。
API 参考: add_messages
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
extra_field: int
def node(state: State):
new_message = AIMessage("Hello!")
return {"messages": [new_message], "extra_field": 10}
graph = StateGraph(State).add_node(node).set_entry_point("node").compile()
input_message = {"role": "user", "content": "Hi"}
result = graph.invoke({"messages": [input_message]})
for message in result["messages"]:
message.pretty_print()
================================ Human Message =================================
Hi
================================== Ai Message ==================================
Hello!
MessagesState
,因此我们可以拥有
定义输入和输出Schema¶
默认情况下,StateGraph
使用单个Schema运行,并且所有节点都应使用该Schema进行通信。但是,也可以为图定义不同的输入和输出Schema。
当指定不同Schema时,内部Schema仍将用于节点之间的通信。输入Schema确保提供的输入与预期结构匹配,而输出Schema则过滤内部数据,以根据定义的输出Schema仅返回相关信息。
下面,我们将看到如何定义不同的输入和输出Schema。
API 参考: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
# Define the schema for the input
class InputState(TypedDict):
question: str
# Define the schema for the output
class OutputState(TypedDict):
answer: str
# Define the overall schema, combining both input and output
class OverallState(InputState, OutputState):
pass
# Define the node that processes the input and generates an answer
def answer_node(state: InputState):
# Example answer and an extra key
return {"answer": "bye", "question": state["question"]}
# Build the graph with input and output schemas specified
builder = StateGraph(OverallState, input=InputState, output=OutputState)
builder.add_node(answer_node) # Add the answer node
builder.add_edge(START, "answer_node") # Define the starting edge
builder.add_edge("answer_node", END) # Define the ending edge
graph = builder.compile() # Compile the graph
# Invoke the graph with an input and print the result
print(graph.invoke({"question": "hi"}))
在节点之间传递私有状态¶
在某些情况下,您可能希望节点交换对中间逻辑至关重要但不需要成为图主Schema一部分的信息。这些私有数据与图的整体输入/输出不相关,应仅在某些节点之间共享。
下面,我们将创建一个由三个节点(node_1、node_2和node_3)组成的简单顺序图示例,其中私有数据在最初两个步骤(node_1和node_2)之间传递,而第三个步骤(node_3)只能访问公共整体状态。
API 参考: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
# The overall state of the graph (this is the public state shared across nodes)
class OverallState(TypedDict):
a: str
# Output from node_1 contains private data that is not part of the overall state
class Node1Output(TypedDict):
private_data: str
# The private data is only shared between node_1 and node_2
def node_1(state: OverallState) -> Node1Output:
output = {"private_data": "set by node_1"}
print(f"Entered node `node_1`:\n\tInput: {state}.\n\tReturned: {output}")
return output
# Node 2 input only requests the private data available after node_1
class Node2Input(TypedDict):
private_data: str
def node_2(state: Node2Input) -> OverallState:
output = {"a": "set by node_2"}
print(f"Entered node `node_2`:\n\tInput: {state}.\n\tReturned: {output}")
return output
# Node 3 only has access to the overall state (no access to private data from node_1)
def node_3(state: OverallState) -> OverallState:
output = {"a": "set by node_3"}
print(f"Entered node `node_3`:\n\tInput: {state}.\n\tReturned: {output}")
return output
# Connect nodes in a sequence
# node_2 accepts private data from node_1, whereas
# node_3 does not see the private data.
builder = StateGraph(OverallState).add_sequence([node_1, node_2, node_3])
builder.add_edge(START, "node_1")
graph = builder.compile()
# Invoke the graph with the initial state
response = graph.invoke(
{
"a": "set at start",
}
)
print()
print(f"Output of graph invocation: {response}")
Entered node `node_1`:
Input: {'a': 'set at start'}.
Returned: {'private_data': 'set by node_1'}
Entered node `node_2`:
Input: {'private_data': 'set by node_1'}.
Returned: {'a': 'set by node_2'}
Entered node `node_3`:
Input: {'a': 'set by node_2'}.
Returned: {'a': 'set by node_3'}
Output of graph invocation: {'a': 'set by node_3'}
为图状态使用Pydantic模型¶
一个StateGraph在初始化时接受一个state_schema
参数,用于指定图中节点可以访问和更新的状态的“形状”。
在我们的示例中,我们通常使用 Python 原生的 TypedDict
作为 state_schema
,但 state_schema
可以是任何类型。
在这里,我们将看到如何使用Pydantic BaseModel作为state_schema
,以添加对输入的运行时验证。
已知限制
- 目前,图的输出将不是Pydantic模型的一个实例。
- 运行时验证只发生在节点的输入上,而不是输出上。
- Pydantic的验证错误跟踪不会显示错误发生在哪一个节点。
API 参考: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from pydantic import BaseModel
# The overall state of the graph (this is the public state shared across nodes)
class OverallState(BaseModel):
a: str
def node(state: OverallState):
return {"a": "goodbye"}
# Build the state graph
builder = StateGraph(OverallState)
builder.add_node(node) # node_1 is the first node
builder.add_edge(START, "node") # Start the graph with node_1
builder.add_edge("node", END) # End the graph after node_1
graph = builder.compile()
# Test the graph with a valid input
graph.invoke({"a": "hello"})
使用无效输入调用图
try:
graph.invoke({"a": 123}) # Should be a string
except Exception as e:
print("An exception was raised because `a` is an integer rather than a string.")
print(e)
An exception was raised because `a` is an integer rather than a string.
1 validation error for OverallState
a
Input should be a valid string [type=string_type, input_value=123, input_type=int]
For further information visit https://errors.pydantic.dev/2.9/v/string_type
序列化行为
当使用 Pydantic 模型作为状态 Schema 时,理解序列化如何工作非常重要,尤其是在以下情况:- 将 Pydantic 对象作为输入传递
- 接收图的输出
- 使用嵌套 Pydantic 模型
API 参考: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
class NestedModel(BaseModel):
value: str
class ComplexState(BaseModel):
text: str
count: int
nested: NestedModel
def process_node(state: ComplexState):
# Node receives a validated Pydantic object
print(f"Input state type: {type(state)}")
print(f"Nested type: {type(state.nested)}")
# Return a dictionary update
return {"text": state.text + " processed", "count": state.count + 1}
# Build the graph
builder = StateGraph(ComplexState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()
# Create a Pydantic instance for input
input_state = ComplexState(text="hello", count=0, nested=NestedModel(value="test"))
print(f"Input object type: {type(input_state)}")
# Invoke graph with a Pydantic instance
result = graph.invoke(input_state)
print(f"Output type: {type(result)}")
print(f"Output content: {result}")
# Convert back to Pydantic model if needed
output_model = ComplexState(**result)
print(f"Converted back to Pydantic: {type(output_model)}")
运行时类型强制转换
Pydantic 对某些数据类型执行运行时类型强制转换。这可能很有帮助,但如果您不了解它,也可能导致意外行为。API 参考: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
class CoercionExample(BaseModel):
# Pydantic will coerce string numbers to integers
number: int
# Pydantic will parse string booleans to bool
flag: bool
def inspect_node(state: CoercionExample):
print(f"number: {state.number} (type: {type(state.number)})")
print(f"flag: {state.flag} (type: {type(state.flag)})")
return {}
builder = StateGraph(CoercionExample)
builder.add_node("inspect", inspect_node)
builder.add_edge(START, "inspect")
builder.add_edge("inspect", END)
graph = builder.compile()
# Demonstrate coercion with string inputs that will be converted
result = graph.invoke({"number": "42", "flag": "true"})
# This would fail with a validation error
try:
graph.invoke({"number": "not-a-number", "flag": "true"})
except Exception as e:
print(f"\nExpected validation error: {e}")
使用消息模型
在状态模式中使用 LangChain 消息类型时,序列化有一些重要注意事项。在使用消息对象进行网络传输时,您应该使用AnyMessage
(而不是 BaseMessage
)以实现正确的序列化/反序列化。API 参考: StateGraph | START | END | HumanMessage | AIMessage | AnyMessage
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
from langchain_core.messages import HumanMessage, AIMessage, AnyMessage
from typing import List
class ChatState(BaseModel):
messages: List[AnyMessage]
context: str
def add_message(state: ChatState):
return {"messages": state.messages + [AIMessage(content="Hello there!")]}
builder = StateGraph(ChatState)
builder.add_node("add_message", add_message)
builder.add_edge(START, "add_message")
builder.add_edge("add_message", END)
graph = builder.compile()
# Create input with a message
initial_state = ChatState(
messages=[HumanMessage(content="Hi")], context="Customer support chat"
)
result = graph.invoke(initial_state)
print(f"Output: {result}")
# Convert back to Pydantic model to see message types
output_model = ChatState(**result)
for i, msg in enumerate(output_model.messages):
print(f"Message {i}: {type(msg).__name__} - {msg.content}")
添加运行时配置¶
有时您希望在调用图时能够对其进行配置。例如,您可能希望在运行时指定使用哪个LLM或系统提示,而无需用这些参数污染图状态。
要添加运行时配置
- 指定配置的Schema
- 将配置添加到节点或条件边的函数签名中
- 将配置传递给图。
请参阅下面的简单示例
API 参考: RunnableConfig | END | StateGraph | START
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, StateGraph, START
from typing_extensions import TypedDict
# 1. Specify config schema
class ConfigSchema(TypedDict):
my_runtime_value: str
# 2. Define a graph that accesses the config in a node
class State(TypedDict):
my_state_value: str
def node(state: State, config: RunnableConfig):
if config["configurable"]["my_runtime_value"] == "a":
return {"my_state_value": 1}
elif config["configurable"]["my_runtime_value"] == "b":
return {"my_state_value": 2}
else:
raise ValueError("Unknown values.")
builder = StateGraph(State, config_schema=ConfigSchema)
builder.add_node(node)
builder.add_edge(START, "node")
builder.add_edge("node", END)
graph = builder.compile()
# 3. Pass in configuration at runtime:
print(graph.invoke({}, {"configurable": {"my_runtime_value": "a"}}))
print(graph.invoke({}, {"configurable": {"my_runtime_value": "b"}}))
扩展示例:在运行时指定LLM
下面我们演示一个实际的例子,其中我们在运行时配置使用哪个LLM。我们将同时使用OpenAI和Anthropic模型。import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")
from langchain.chat_models import init_chat_model
from langchain_core.runnables import RunnableConfig
from langgraph.graph import MessagesState
from langgraph.graph import END, StateGraph, START
from typing_extensions import TypedDict
class ConfigSchema(TypedDict):
model: str
MODELS = {
"anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),
"openai": init_chat_model("openai:gpt-4.1-mini"),
}
def call_model(state: MessagesState, config: RunnableConfig):
model = config["configurable"].get("model", "anthropic")
model = MODELS[model]
response = model.invoke(state["messages"])
return {"messages": [response]}
builder = StateGraph(MessagesState, config_schema=ConfigSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)
graph = builder.compile()
# Usage
input_message = {"role": "user", "content": "hi"}
# With no configuration, uses default (Anthropic)
response_1 = graph.invoke({"messages": [input_message]})["messages"][-1]
# Or, can set OpenAI
config = {"configurable": {"model": "openai"}}
response_2 = graph.invoke({"messages": [input_message]}, config=config)["messages"][-1]
print(response_1.response_metadata["model_name"])
print(response_2.response_metadata["model_name"])
扩展示例:在运行时指定模型和系统消息
下面我们演示一个实际的例子,其中我们在运行时配置两个参数:LLM和系统消息。import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")
from typing import Optional
from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, MessagesState, StateGraph, START
from typing_extensions import TypedDict
class ConfigSchema(TypedDict):
model: Optional[str]
system_message: Optional[str]
MODELS = {
"anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),
"openai": init_chat_model("openai:gpt-4.1-mini"),
}
def call_model(state: MessagesState, config: RunnableConfig):
model = config["configurable"].get("model", "anthropic")
model = MODELS[model]
messages = state["messages"]
if system_message := config["configurable"].get("system_message"):
messages = [SystemMessage(system_message)] + messages
response = model.invoke(messages)
return {"messages": [response]}
builder = StateGraph(MessagesState, config_schema=ConfigSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)
graph = builder.compile()
# Usage
input_message = {"role": "user", "content": "hi"}
config = {"configurable": {"model": "openai", "system_message": "Respond in Italian."}}
response = graph.invoke({"messages": [input_message]}, config)
for message in response["messages"]:
message.pretty_print()
添加重试策略¶
在许多用例中,您可能希望节点具有自定义重试策略,例如,如果您正在调用API、查询数据库或调用LLM等。LangGraph允许您向节点添加重试策略。
要配置重试策略,请将retry
参数传递给add_node。retry
参数接受一个RetryPolicy
命名元组对象。下面我们使用默认参数实例化一个RetryPolicy
对象并将其与一个节点关联
from langgraph.pregel import RetryPolicy
builder.add_node(
"node_name",
node_function,
retry=RetryPolicy(),
)
默认情况下,retry_on
参数使用default_retry_on
函数,它会重试除了以下异常之外的任何异常:
ValueError
TypeError
ArithmeticError
ImportError
LookupError
NameError
SyntaxError
RuntimeError
ReferenceError
StopIteration
StopAsyncIteration
OSError
此外,对于来自流行http请求库(如requests
和httpx
)的异常,它只在5xx状态码时重试。
扩展示例:自定义重试策略
考虑一个我们正在从SQL数据库读取数据的示例。下面我们向节点传递两个不同的重试策略API 参考: init_chat_model | END | StateGraph | START | SQLDatabase | AIMessage
import sqlite3
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.pregel import RetryPolicy
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage
db = SQLDatabase.from_uri("sqlite:///:memory:")
model = init_chat_model("anthropic:claude-3-5-haiku-latest")
def query_database(state: MessagesState):
query_result = db.run("SELECT * FROM Artist LIMIT 10;")
return {"messages": [AIMessage(content=query_result)]}
def call_model(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
# Define a new graph
builder = StateGraph(MessagesState)
builder.add_node(
"query_database",
query_database,
retry=RetryPolicy(retry_on=sqlite3.OperationalError),
)
builder.add_node("model", call_model, retry=RetryPolicy(max_attempts=5))
builder.add_edge(START, "model")
builder.add_edge("model", "query_database")
builder.add_edge("query_database", END)
graph = builder.compile()
添加节点缓存¶
节点缓存对于避免重复操作非常有用,例如执行耗时或耗费成本的操作时。LangGraph 允许您为图中的节点添加个性化缓存策略。
要配置缓存策略,请将 cache_policy
参数传递给 add_node 函数。在以下示例中,实例化了一个 CachePolicy
对象,其生存时间为 120 秒,并使用默认的 key_func
生成器。然后将其与一个节点关联。
from langgraph.types import CachePolicy
builder.add_node(
"node_name",
node_function,
cache_policy=CachePolicy(ttl=120),
)
然后,要为图启用节点级缓存,请在编译图时设置cache
参数。以下示例使用InMemoryCache
设置一个带有内存缓存的图,但也可以使用SqliteCache
。
创建步骤序列¶
先决条件
本指南假定您熟悉上面关于状态的部分。
这里我们演示如何构建一个简单的步骤序列。我们将展示
- 如何构建顺序图
- 构建类似图的内置简写。
要添加节点序列,我们使用图的.add_node
和.add_edge
方法:API 参考: START | StateGraph
from langgraph.graph import START, StateGraph
builder = StateGraph(State)
# Add nodes
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)
# Add edges
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
我们也可以使用内置的简写 .add_sequence
builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")
为什么使用LangGraph将应用步骤拆分为序列?
LangGraph使得为您的应用程序添加底层持久化层变得容易。这允许在节点执行之间检查点状态,因此您的LangGraph节点控制它们还决定了执行步骤如何流式传输,以及如何使用LangGraph Studio可视化和调试您的应用程序。让我们演示一个端到端示例。我们将创建三个步骤的序列
- 填充状态中某个键的值
- 更新相同的值
- 填充不同的值
首先定义我们的状态。这决定了图的Schema,也可以指定如何应用更新。有关更多详细信息,请参阅本节。
在我们的例子中,我们将只跟踪两个值
我们的节点只是Python函数,它们读取我们图的状态并对其进行更新。该函数的第一个参数始终是状态
def step_1(state: State):
return {"value_1": "a"}
def step_2(state: State):
current_value_1 = state["value_1"]
return {"value_1": f"{current_value_1} b"}
def step_3(state: State):
return {"value_2": 10}
注意
请注意,在向状态发出更新时,每个节点只需指定它希望更新的键的值。
默认情况下,这将**覆盖**相应键的值。您还可以使用Reducer来控制如何处理更新——例如,您可以将连续更新附加到键而不是覆盖。有关更多详细信息,请参阅本节。
最后,我们定义图。我们使用StateGraph来定义一个在此状态上操作的图。
然后我们将使用add_node和add_edge来填充我们的图并定义其控制流。
API 参考: START | StateGraph
from langgraph.graph import START, StateGraph
builder = StateGraph(State)
# Add nodes
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)
# Add edges
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
请注意,
.add_edge
接受节点的名称,对于函数而言,默认为node.__name__
。- 我们必须指定图的入口点。为此,我们添加一个带有START节点的边。
- 当没有更多节点可执行时,图将停止。
接下来,我们编译我们的图。这提供了对图结构的一些基本检查(例如,识别孤立节点)。如果我们要通过检查点为应用程序添加持久性,它也会在这里传递。
LangGraph提供了内置的实用程序来可视化您的图。让我们检查一下我们的序列。有关可视化的详细信息,请参阅本指南。
让我们继续进行简单的调用
请注意,
- 我们通过为单个状态键提供值来启动调用。我们必须始终至少为一个键提供值。
- 我们传入的值被第一个节点覆盖了。
- 第二个节点更新了值。
- 第三个节点填充了不同的值。
内置简写
langgraph>=0.2.46
包含一个内置的简写 add_sequence
用于添加节点序列。您可以如下编译相同的图:
创建分支¶
节点的并行执行对于加快整体图操作至关重要。LangGraph提供对节点并行执行的原生支持,这可以显著提高基于图的工作流的性能。这种并行化通过扇出(fan-out)和扇入(fan-in)机制实现,同时利用标准边和条件边。下面是一些示例,展示了如何创建适合您的分支数据流。
并行运行图节点¶
在此示例中,我们从Node A
扇出到B和C
,然后扇入到D
。通过我们的状态,我们指定Reducer的添加操作。这将组合或累积状态中特定键的值,而不是简单地覆盖现有值。对于列表,这意味着将新列表与现有列表连接起来。有关使用Reducer更新状态的更多详细信息,请参阅上面关于状态Reducer的部分。
API 参考: StateGraph | START | END
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
print(f'Adding "A" to {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Adding "B" to {state["aggregate"]}')
return {"aggregate": ["B"]}
def c(state: State):
print(f'Adding "C" to {state["aggregate"]}')
return {"aggregate": ["C"]}
def d(state: State):
print(f'Adding "D" to {state["aggregate"]}')
return {"aggregate": ["D"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
使用Reducer,您可以看到每个节点中添加的值都被累积起来。
注意
在上面的示例中,节点 "b"
和 "c"
在同一超步中并发执行。因为它们在同一超步中,节点 "d"
在 "b"
和 "c"
都完成后才执行。
重要的是,来自并行超步的更新可能无法保持一致的顺序。如果您需要并行超步中更新的一致、预定顺序,您应该将输出写入状态中的单独字段,并附带用于排序的值。
异常处理?
LangGraph 在“超步”中执行节点,这意味着虽然并行分支并行执行,但整个超步是事务性的。如果任何分支引发异常,没有更新会应用到状态(整个超步都会出错)。
重要的是,当使用检查点时,超步内成功节点的运行结果会被保存,并在恢复时不会重复。
如果您有容易出错(可能希望处理不稳定的 API 调用)的代码,LangGraph 提供了两种方式来解决这个问题:- 您可以在节点中编写常规 Python 代码来捕获和处理异常。
- 您可以设置一个重试策略,指示图重试引发某些类型异常的节点。只有失败的分支会被重试,因此您不必担心执行重复的工作。
延迟节点执行¶
当您希望延迟节点的执行直到所有其他待处理任务都完成时,延迟节点执行非常有用。这在分支长度不同时尤其重要,这在诸如Map-Reduce流等工作流中很常见。
上面的示例展示了当每个路径只有一个步骤时如何扇出和扇入。但是,如果一个分支有多个步骤呢?让我们在"b"
分支中添加一个节点"b_2"
API 参考: StateGraph | START | END
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
print(f'Adding "A" to {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Adding "B" to {state["aggregate"]}')
return {"aggregate": ["B"]}
def b_2(state: State):
print(f'Adding "B_2" to {state["aggregate"]}')
return {"aggregate": ["B_2"]}
def c(state: State):
print(f'Adding "C" to {state["aggregate"]}')
return {"aggregate": ["C"]}
def d(state: State):
print(f'Adding "D" to {state["aggregate"]}')
return {"aggregate": ["D"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
builder.add_node(d, defer=True)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "B_2" to ['A', 'B', 'C']
Adding "D" to ['A', 'B', 'C', 'B_2']
在上述示例中,节点"b"
和"c"
在同一超步中并发执行。我们将节点d
的defer
设置为True
,这样它将不会执行,直到所有待处理任务都完成。在这种情况下,这意味着"d"
会等待,直到整个"b"
分支完成执行。
条件分支¶
如果您的扇出需要根据运行时状态而变化,您可以使用add_conditional_edges来根据图状态选择一条或多条路径。请参阅下面的示例,其中节点a
生成一个状态更新,该更新决定了下一个节点。
API 参考: StateGraph | START | END
import operator
from typing import Annotated, Literal, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
aggregate: Annotated[list, operator.add]
# Add a key to the state. We will set this key to determine
# how we branch.
which: str
def a(state: State):
print(f'Adding "A" to {state["aggregate"]}')
return {"aggregate": ["A"], "which": "c"}
def b(state: State):
print(f'Adding "B" to {state["aggregate"]}')
return {"aggregate": ["B"]}
def c(state: State):
print(f'Adding "C" to {state["aggregate"]}')
return {"aggregate": ["C"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_edge(START, "a")
builder.add_edge("b", END)
builder.add_edge("c", END)
def conditional_edge(state: State) -> Literal["b", "c"]:
# Fill in arbitrary logic here that uses the state
# to determine the next node
return state["which"]
builder.add_conditional_edges("a", conditional_edge)
graph = builder.compile()
提示
您的条件边可以路由到多个目标节点。例如
Map-reduce 和 Send
API¶
默认情况下,Nodes
和Edges
是预先定义的,并在相同的共享状态上操作。然而,在某些情况下,确切的边可能无法预先知道,并且/或者您可能希望同时存在不同版本状态。一个常见的例子是Map-reduce设计模式。在这种设计模式中,第一个节点可能生成一个对象列表,您可能希望将某个其他节点应用于所有这些对象。对象的数量可能无法预先知道(这意味着边的数量可能无法知道),并且下游Node
的输入状态应该不同(每个生成的对象一个)。
为了支持这种设计模式,LangGraph支持从条件边返回Send对象。Send
接受两个参数:第一个是节点的名称,第二个是传递给该节点的状态。
def continue_to_jokes(state: OverallState):
return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
graph.add_conditional_edges("node_a", continue_to_jokes)
下面我们实现一个简单的例子,模拟使用LLM (1) 生成主题列表(长度未知),(2) 并行生成笑话,以及 (3) 选择“最佳”笑话。重要的是,扇出节点的输入状态与图的整体状态不同。
API 参考: Send | END | StateGraph | START
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.types import Send
from langgraph.graph import END, StateGraph, START
# This will be the overall state of the main graph.
# It will contain a topic (which we expect the user to provide)
# and then will generate a list of subjects, and then a joke for
# each subject
class OverallState(TypedDict):
topic: str
subjects: list
# Notice here we use the operator.add
# This is because we want combine all the jokes we generate
# from individual nodes back into one list - this is essentially
# the "reduce" part
jokes: Annotated[list, operator.add]
best_selected_joke: str
# This will be the state of the node that we will "map" all
# subjects to in order to generate a joke
class JokeState(TypedDict):
subject: str
# This is the function we will use to generate the subjects of the jokes.
# In general the length of the list generated by this node could vary each run.
def generate_topics(state: OverallState):
# Simulate a LLM.
return {"subjects": ["lions", "elephants", "penguins"]}
# Here we generate a joke, given a subject
def generate_joke(state: JokeState):
# Simulate a LLM.
joke_map = {
"lions": "Why don't lions like fast food? Because they can't catch it!",
"elephants": "Why don't elephants use computers? They're afraid of the mouse!",
"penguins": (
"Why don’t penguins like talking to strangers at parties? "
"Because they find it hard to break the ice."
),
}
return {"jokes": [joke_map[state["subject"]]]}
# Here we define the logic to map out over the generated subjects
# We will use this as an edge in the graph
def continue_to_jokes(state: OverallState):
# We will return a list of `Send` objects
# Each `Send` object consists of the name of a node in the graph
# as well as the state to send to that node
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
# Here we will judge the best joke
def best_joke(state: OverallState):
return {"best_selected_joke": "penguins"}
# Construct the graph: here we put everything together to construct our graph
builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)
graph = builder.compile()
# Call the graph: here we call it to generate a list of jokes
for step in graph.stream({"topic": "animals"}):
print(step)
{'generate_topics': {'subjects': ['lions', 'elephants', 'penguins']}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ['Why don’t penguins like talking to strangers at parties? Because they find it hard to break the ice.']}}
{'best_joke': {'best_selected_joke': 'penguins'}}
创建和控制循环¶
当创建带循环的图时,我们需要一个终止执行的机制。这最常用通过添加条件边来实现,该边在达到某个终止条件时路由到END节点。
您还可以在调用或流式传输图时设置图的递归限制。递归限制设置了图在引发错误之前允许执行的超步数量。在此处阅读有关递归限制概念的更多信息这里。
让我们考虑一个带有循环的简单图,以便更好地理解这些机制的工作原理。
提示
要返回状态的最后一个值而不是收到递归限制错误,请参阅下一节。
创建循环时,可以包含一个指定终止条件的条件边
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
def route(state: State) -> Literal["b", END]:
if termination_condition(state):
return END
else:
return "b"
builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
要控制递归限制,请在配置中指定 "recursion_limit"
。这将引发一个 GraphRecursionError
,您可以捕获并处理它。
from langgraph.errors import GraphRecursionError
try:
graph.invoke(inputs, {"recursion_limit": 3})
except GraphRecursionError:
print("Recursion Error")
让我们定义一个带有简单循环的图。请注意,我们使用条件边来实现终止条件。
API 参考: StateGraph | START | END
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
print(f'Node A sees {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Node B sees {state["aggregate"]}')
return {"aggregate": ["B"]}
# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
# Define edges
def route(state: State) -> Literal["b", END]:
if len(state["aggregate"]) < 7:
return "b"
else:
return END
builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
这种架构类似于ReAct 智能体,其中节点"a"
是一个工具调用模型,节点"b"
代表工具。
在我们的route
条件边中,我们指定当状态中的"aggregate"
列表达到阈值长度后,应该结束。
调用图后,我们看到在达到终止条件之前,我们在节点"a"
和"b"
之间交替执行。
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']
施加递归限制¶
在某些应用中,我们可能无法保证能够达到给定的终止条件。在这些情况下,我们可以设置图的递归限制。这将在给定数量的超步后引发GraphRecursionError
。然后我们可以捕获并处理此异常。
from langgraph.errors import GraphRecursionError
try:
graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
print("Recursion Error")
扩展示例:达到递归限制时返回状态
我们可以引入一个新的键到状态中,用于跟踪距离达到递归限制剩余的步数,而不是抛出GraphRecursionError
。然后我们可以使用这个键来判断是否应该结束运行。LangGraph实现了一个特殊的RemainingSteps
注解。在底层,它创建了一个ManagedValue
通道——一个将在图运行期间存在且不再存在的状态通道。API 参考: StateGraph | START | END
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import RemainingSteps
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
remaining_steps: RemainingSteps
def a(state: State):
print(f'Node A sees {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Node B sees {state["aggregate"]}')
return {"aggregate": ["B"]}
# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
# Define edges
def route(state: State) -> Literal["b", END]:
if state["remaining_steps"] <= 2:
return END
else:
return "b"
builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
# Test it out
result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
print(result)
扩展示例:带分支的循环
为了更好地理解递归限制的工作原理,让我们考虑一个更复杂的示例。下面我们实现一个循环,但其中一个步骤会扇出成两个节点API 参考: StateGraph | START | END
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
aggregate: Annotated[list, operator.add]
def a(state: State):
print(f'Node A sees {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Node B sees {state["aggregate"]}')
return {"aggregate": ["B"]}
def c(state: State):
print(f'Node C sees {state["aggregate"]}')
return {"aggregate": ["C"]}
def d(state: State):
print(f'Node D sees {state["aggregate"]}')
return {"aggregate": ["D"]}
# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
# Define edges
def route(state: State) -> Literal["b", END]:
if len(state["aggregate"]) < 7:
return "b"
else:
return END
builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "c")
builder.add_edge("b", "d")
builder.add_edge(["c", "d"], "a")
graph = builder.compile()
Node A sees []
Node B sees ['A']
Node D sees ['A', 'B']
Node C sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Node B sees ['A', 'B', 'C', 'D', 'A']
Node D sees ['A', 'B', 'C', 'D', 'A', 'B']
Node C sees ['A', 'B', 'C', 'D', 'A', 'B']
Node A sees ['A', 'B', 'C', 'D', 'A', 'B', 'C', 'D']
异步¶
使用异步编程范式可以在并发运行I/O 密集型代码(例如,向聊天模型提供商发出并发 API 请求)时显著提高性能。
要将图的同步
实现转换为异步
实现,您需要
- 更新
nodes
以使用async def
而不是def
。 - 更新内部代码以适当使用
await
。 - 根据需要使用
.ainvoke
或.astream
调用图。
由于许多LangChain对象都实现了Runnable协议,该协议具有所有同步
方法的异步
变体,因此将同步
图升级到异步
图通常非常快。
请看下面的示例。为了演示底层LLM的异步调用,我们将包含一个聊天模型。
import os
from langchain.chat_models import init_chat_model
os.environ["AZURE_OPENAI_API_KEY"] = "..."
os.environ["AZURE_OPENAI_ENDPOINT"] = "..."
os.environ["OPENAI_API_VERSION"] = "2025-03-01-preview"
llm = init_chat_model(
"azure_openai:gpt-4.1",
azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
)
API 参考: init_chat_model | StateGraph
from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, StateGraph
async def node(state: MessagesState): # (1)!
new_message = await llm.ainvoke(state["messages"]) # (2)!
return {"messages": [new_message]}
builder = StateGraph(MessagesState).add_node(node).set_entry_point("node")
graph = builder.compile()
input_message = {"role": "user", "content": "Hello"}
result = await graph.ainvoke({"messages": [input_message]}) # (3)!
- 将节点声明为异步函数。
- 在节点内部使用异步调用(如果可用)。
- 对图对象本身使用异步调用。
异步流式传输
有关异步流式传输的示例,请参阅流式传输指南。
将控制流和状态更新与Command
结合¶
将控制流(边)和状态更新(节点)结合起来会很有用。例如,您可能希望在同一个节点中既执行状态更新又决定下一步要转向哪个节点。LangGraph提供了一种通过从节点函数返回Command对象来实现此目的的方法。
def my_node(state: State) -> Command[Literal["my_other_node"]]:
return Command(
# state update
update={"foo": "bar"},
# control flow
goto="my_other_node"
)
下面我们展示一个端到端的例子。我们创建一个包含3个节点:A、B和C的简单图。我们将首先执行节点A,然后根据节点A的输出决定下一步是转到节点B还是节点C。
API 参考: StateGraph | START | Command
import random
from typing_extensions import TypedDict, Literal
from langgraph.graph import StateGraph, START
from langgraph.types import Command
# Define graph state
class State(TypedDict):
foo: str
# Define the nodes
def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
print("Called A")
value = random.choice(["a", "b"])
# this is a replacement for a conditional edge function
if value == "a":
goto = "node_b"
else:
goto = "node_c"
# note how Command allows you to BOTH update the graph state AND route to the next node
return Command(
# this is the state update
update={"foo": value},
# this is a replacement for an edge
goto=goto,
)
def node_b(state: State):
print("Called B")
return {"foo": state["foo"] + "b"}
def node_c(state: State):
print("Called C")
return {"foo": state["foo"] + "c"}
现在我们可以使用上述节点创建StateGraph
。请注意,图没有用于路由的条件边!这是因为控制流是在node_a
内部使用Command
定义的。
builder = StateGraph(State)
builder.add_edge(START, "node_a")
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(node_c)
# NOTE: there are no edges between nodes A, B and C!
graph = builder.compile()
重要
您可能已经注意到我们使用了 Command
作为返回类型注解,例如 Command[Literal["node_b", "node_c"]]
。这对于图的渲染是必要的,并且告诉 LangGraph node_a
可以导航到 node_b
和 node_c
。
如果我们多次运行该图,我们会看到它根据节点 A 中的随机选择采取不同的路径(A -> B 或 A -> C)。
导航到父图中的节点¶
如果您正在使用子图,您可能希望从子图中的一个节点导航到另一个子图(即父图中的另一个节点)。为此,您可以在Command
中指定graph=Command.PARENT
def my_node(state: State) -> Command[Literal["my_other_node"]]:
return Command(
update={"foo": "bar"},
goto="other_subgraph", # where `other_subgraph` is a node in the parent graph
graph=Command.PARENT
)
我们用上面的例子来演示一下。我们将通过将上面例子中的node_a
改为一个单节点图,然后将其作为子图添加到我们的父图中。
import operator
from typing_extensions import Annotated
class State(TypedDict):
# NOTE: we define a reducer here
foo: Annotated[str, operator.add]
def node_a(state: State):
print("Called A")
value = random.choice(["a", "b"])
# this is a replacement for a conditional edge function
if value == "a":
goto = "node_b"
else:
goto = "node_c"
# note how Command allows you to BOTH update the graph state AND route to the next node
return Command(
update={"foo": value},
goto=goto,
# this tells LangGraph to navigate to node_b or node_c in the parent graph
# NOTE: this will navigate to the closest parent graph relative to the subgraph
graph=Command.PARENT,
)
subgraph = StateGraph(State).add_node(node_a).add_edge(START, "node_a").compile()
def node_b(state: State):
print("Called B")
# NOTE: since we've defined a reducer, we don't need to manually append
# new characters to existing 'foo' value. instead, reducer will append these
# automatically (via operator.add)
return {"foo": "b"}
def node_c(state: State):
print("Called C")
return {"foo": "c"}
builder = StateGraph(State)
builder.add_edge(START, "subgraph")
builder.add_node("subgraph", subgraph)
builder.add_node(node_b)
builder.add_node(node_c)
graph = builder.compile()
在工具内使用¶
一个常见的用例是从工具内部更新图状态。例如,在客户支持应用程序中,您可能希望在对话开始时根据客户的帐号或ID查找客户信息。要从工具更新图状态,您可以从工具返回Command(update={"my_custom_key": "foo", "messages": [...]})
@tool
def lookup_user_info(tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig):
"""Use this to look up user information to better assist them with their questions."""
user_info = get_user_info(config.get("configurable", {}).get("user_id"))
return Command(
update={
# update the state keys
"user_info": user_info,
# update the message history
"messages": [ToolMessage("Successfully looked up user information", tool_call_id=tool_call_id)]
}
)
重要
当从工具返回Command
时,您必须在Command.update
中包含messages
(或用于消息历史记录的任何状态键),并且messages
中的消息列表必须包含ToolMessage
。这对于使生成的消息历史记录有效是必需的(LLM提供商要求包含工具调用的AI消息后跟工具结果消息)。
如果您使用通过Command
更新状态的工具,我们建议使用预构建的ToolNode
,它会自动处理返回Command
对象的工具,并将其传播到图状态。如果您正在编写调用工具的自定义节点,则需要手动将工具返回的Command
对象作为节点的更新进行传播。
可视化您的图¶
这里我们演示如何可视化您创建的图。
您可以可视化任何任意的图,包括状态图。让我们画一些分形来找点乐子吧 :)
API 参考: StateGraph | START | END | add_messages
import random
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list, add_messages]
class MyNode:
def __init__(self, name: str):
self.name = name
def __call__(self, state: State):
return {"messages": [("assistant", f"Called node {self.name}")]}
def route(state) -> Literal["entry_node", "__end__"]:
if len(state["messages"]) > 10:
return "__end__"
return "entry_node"
def add_fractal_nodes(builder, current_node, level, max_level):
if level > max_level:
return
# Number of nodes to create at this level
num_nodes = random.randint(1, 3) # Adjust randomness as needed
for i in range(num_nodes):
nm = ["A", "B", "C"][i]
node_name = f"node_{current_node}_{nm}"
builder.add_node(node_name, MyNode(node_name))
builder.add_edge(current_node, node_name)
# Recursively add more nodes
r = random.random()
if r > 0.2 and level + 1 < max_level:
add_fractal_nodes(builder, node_name, level + 1, max_level)
elif r > 0.05:
builder.add_conditional_edges(node_name, route, node_name)
else:
# End
builder.add_edge(node_name, "__end__")
def build_fractal_graph(max_level: int):
builder = StateGraph(State)
entry_point = "entry_node"
builder.add_node(entry_point, MyNode(entry_point))
builder.add_edge(START, entry_point)
add_fractal_nodes(builder, entry_point, 1, max_level)
# Optional: set a finish point if required
builder.add_edge(entry_point, END) # or any specific node
return builder.compile()
app = build_fractal_graph(3)
Mermaid¶
我们也可以将一个图类转换为 Mermaid 语法。
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
__start__([<p>__start__</p>]):::first
entry_node(entry_node)
node_entry_node_A(node_entry_node_A)
node_entry_node_B(node_entry_node_B)
node_node_entry_node_B_A(node_node_entry_node_B_A)
node_node_entry_node_B_B(node_node_entry_node_B_B)
node_node_entry_node_B_C(node_node_entry_node_B_C)
__end__([<p>__end__</p>]):::last
__start__ --> entry_node;
entry_node --> __end__;
entry_node --> node_entry_node_A;
entry_node --> node_entry_node_B;
node_entry_node_B --> node_node_entry_node_B_A;
node_entry_node_B --> node_node_entry_node_B_B;
node_entry_node_B --> node_node_entry_node_B_C;
node_entry_node_A -.-> entry_node;
node_entry_node_A -.-> __end__;
node_node_entry_node_B_A -.-> entry_node;
node_node_entry_node_B_A -.-> __end__;
node_node_entry_node_B_B -.-> entry_node;
node_node_entry_node_B_B -.-> __end__;
node_node_entry_node_B_C -.-> entry_node;
node_node_entry_node_B_C -.-> __end__;
classDef default fill:#f2f0ff,line-height:1.2
classDef first fill-opacity:0
classDef last fill:#bfb6fc
PNG¶
如果愿意,我们可以将图渲染成.png
格式。这里有三种选择:
- 使用 Mermaid.ink API(无需额外包)
- 使用 Mermaid + Pyppeteer(需要
pip install pyppeteer
) - 使用 graphviz(需要
pip install graphviz
)
使用 Mermaid.Ink
默认情况下,draw_mermaid_png()
使用Mermaid.Ink的API生成图表。
API 参考: CurveStyle | MermaidDrawMethod | NodeStyles
from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles
display(Image(app.get_graph().draw_mermaid_png()))
使用 Mermaid + Pyppeteer
import nest_asyncio
nest_asyncio.apply() # Required for Jupyter Notebook to run async functions
display(
Image(
app.get_graph().draw_mermaid_png(
curve_style=CurveStyle.LINEAR,
node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"),
wrap_label_n_words=9,
output_file_path=None,
draw_method=MermaidDrawMethod.PYPPETEER,
background_color="white",
padding=10,
)
)
)
使用 Graphviz
try:
display(Image(app.get_graph().draw_png()))
except ImportError:
print(
"You likely need to install dependencies for pygraphviz, see more here https://github.com/pygraphviz/pygraphviz/blob/main/INSTALL.txt"
)