除了我們顯式聲明的用於存儲業務數據或驅動信號的Channel之外,Pregel自身也會維護一些系統Channel,其中最重要的莫過於一個名為__pregel_tasks的Channel。通過前面針對BSP的介紹,我們知道當Superstep進入同步屏障並應用所有更新後,引擎會根據Node針對Channel的訂閲情況和Channel自身的更新狀態生成下一步待執行的任務,其實待執行的任務不限於此。
1. 兩種任務創建方式
我們將根據Node針對Channel的訂閲來驅動任務執行的模式稱為 “Pull模式”,與之相對的則是解決“__pregel_tasks”這個Channel實現的“Push模式”。具體來説,這是一個關閉“累積模式”的Topic類型的Channel,它存儲的“Topic”體現為具有如下定義的Send對象。當某個Node執行之後,可以像這個Channel中寫入一個Send來驅動某個Node在下一Superstep中執行。除了利用Send對象的node字段指定待執行的Node名稱外,還可以利用arg字段提供輸入參數。
class Send:
node: str
arg: Any
def __init__(self, /, node: str, arg: Any) -> None
由於關閉了“累積模式”,在Topic類型Channel中寫入的內容只會在下一個Superstep中生效,並且“閲後即焚”。對於執行引擎來説,這個名為“__pregel_tasks”的Channel存儲的就是下一Superstep以“Push模式”驅動執行的任務列表,兩者完美契合。
2. 確認__pregel_tasks通道的存在
__pregel_tasks通道的存在可以通過如下的演示實例來驗證。如代碼片段所示,在採用常規方式將Pregel對象創建出來後,我們根據Channel名稱從它的channels字段中將此Channel提取出來。斷言揭示了該Channel自身的類型、存儲的數據類型和“累積模式”開關。
from langgraph.channels import LastValue, Topic
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.types import Send, Sequence
node = (
NodeBuilder()
.subscribe_only("input_channel")
.do(lambda args: args)
.write_to("output_channel")
)
app = Pregel(
nodes={"node": node},
channels={"input_channel": LastValue(str), "output_channel": LastValue(str)},
input_channels=["input_channel"],
output_channels=["output_channel"],
)
tasks: Topic[Send] = app.channels["__pregel_tasks"]
assert isinstance(tasks, Topic)
assert tasks.ValueType == Sequence[Send]
assert tasks.accumulate == False
3. 被保護起來的通道
雖然“__pregel_tasks”就是一個普通的Topic類型的Channel,但是它並未開發對外部使用,Pregel把它“保護”的非常好。我們不能聲明一個與之同名的Channel,否則就會像如下的方式一樣拋出一個ValueError,並提示“Channel '__pregel_tasks' is reserved and cannot be used in the graph.”。
from langgraph.channels import Topic
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.types import Send, Sequence
try:
app = Pregel(
nodes={"node": NodeBuilder().subscribe_only("__pregel_tasks")},
channels={"__pregel_tasks": Topic[Sequence[Send]]},
input_channels=["input_channel"],
output_channels=["output_channel"],
)
assert False, "Expected an error due to reserved channel name"
except Exception as e:
assert isinstance(e, ValueError)
assert str(e) == "Channel '__pregel_tasks' is reserved and cannot be used in the graph."
我們也不能採用常規的方式將向其發送Send對象。比如在如下的演示程序中,節點foo試圖向此Channel發送一個驅節點bar執行的Send對象,最終拋出一個InvalidUpdateError異常,並提示“Cannot write to the reserved channel TASKS”。除此之外,由於Pregel在利用它將基於“PUSH模式”的任務創建出來後就會將其清空,所以我們也無法讀取其中的任務。
from langgraph.channels import LastValue
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.types import Send
from langgraph.errors import InvalidUpdateError
foo = (NodeBuilder()
.subscribe_to("start",read= False)
.do(lambda _: Send(node="bar", arg="foobar"))
.write_to("__pregel_tasks"))
bar = (NodeBuilder()
.do(lambda args:args)
.write_to("output"))
app = Pregel(
nodes={ "foo": foo, "bar": bar},
channels={
"start": LastValue(str),
"output": LastValue(str),
},
input_channels=["start"],
output_channels=["output"])
try:
app.invoke({"start": None})
assert False, "Should have raised InvalidUpdateError"
except Exception as e:
assert isinstance(e, InvalidUpdateError)
assert str(e) == "Cannot write to the reserved channel TASKS"
4. 唯一的解決方案
我們能夠想到的常規方法針對此Channel的寫入基本都繞不開引擎針對它的保護機制。我們將在下部分介紹Pregel另一個核心組成部分Node,Node會利用ChannelWriter對象實現針對Channel的寫入,我們可以將針對Channel的寫入意圖封裝成ChannelWriteTupleEntry,並以此來創建ChannelWriter,這應該是唯一能夠“欺騙”引擎寫入驗證的唯一手段。如代碼片段所示,率先執行的節點foo會返回一個驅動節點bar指定的Send對象,為了將它寫入“__pregel_tasks”,我們創建了一個ChannelWriter,針對該Channel的寫入定義在ChannelWriteTupleEntry對象中,具體體現在調用構造函數指定的mapper參數上,它提供一個映射將Node的執行結果轉成成Channel名稱和值的映射關係。
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue
from langgraph.pregel._read import PregelNode
from langgraph.pregel._write import ChannelWrite, ChannelWriteTupleEntry
from langgraph.types import Send
foo: PregelNode = (NodeBuilder()
.subscribe_to("foo")
.do(lambda _: Send(node="bar", arg="foo"))
).build()
entry = ChannelWriteTupleEntry(mapper= lambda args: [("__pregel_tasks", args)])
foo.writers.append(ChannelWrite(writes=[entry]))
bar = (NodeBuilder()
.do(lambda args: f"bar is triggered by {args}.")
.write_to("output"))
app = Pregel(
nodes={"foo": foo, "bar": bar},
channels={
"foo": LastValue(None),
"output": LastValue(str),
},
input_channels=["foo"],
output_channels=["output"],
)
result = app.invoke(input={"foo": None})
assert result == {"output": "bar is triggered by foo."}