博客 / 詳情

返回

[拆解LangChain執行引擎] __pregel_tasks通道——成就“PUSH任務”的功臣

除了我們顯式聲明的用於存儲業務數據或驅動信號的Channel之外,Pregel自身也會維護一些系統Channel,其中最重要的莫過於一個名為__pregel_tasks的Channel。通過前面針對BSP的介紹,我們知道當Superstep進入同步屏障並應用所有更新後,引擎會根據Node針對Channel的訂閲情況和Channel自身的更新狀態生成下一步待執行的任務,其實待執行的任務不限於此。

1. 兩種任務創建方式

我們將根據Node針對Channel的訂閲來驅動任務執行的模式稱為 “Pull模式”,與之相對的則是解決“__pregel_tasks”這個Channel實現的“Push模式”。具體來説,這是一個關閉“累積模式”的Topic類型的Channel,它存儲的“Topic”體現為具有如下定義的Send對象。當某個Node執行之後,可以像這個Channel中寫入一個Send來驅動某個Node在下一Superstep中執行。除了利用Send對象的node字段指定待執行的Node名稱外,還可以利用arg字段提供輸入參數。

class Send:   
    node: str
    arg: Any
    def __init__(self, /, node: str, arg: Any) -> None

由於關閉了“累積模式”,在Topic類型Channel中寫入的內容只會在下一個Superstep中生效,並且“閲後即焚”。對於執行引擎來説,這個名為“__pregel_tasks”的Channel存儲的就是下一Superstep以“Push模式”驅動執行的任務列表,兩者完美契合。

2. 確認__pregel_tasks通道的存在

__pregel_tasks通道的存在可以通過如下的演示實例來驗證。如代碼片段所示,在採用常規方式將Pregel對象創建出來後,我們根據Channel名稱從它的channels字段中將此Channel提取出來。斷言揭示了該Channel自身的類型、存儲的數據類型和“累積模式”開關。

from langgraph.channels import LastValue, Topic
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.types import Send, Sequence

node = (
    NodeBuilder()
    .subscribe_only("input_channel")
    .do(lambda args: args)
    .write_to("output_channel")
)
app = Pregel(
    nodes={"node": node},
    channels={"input_channel": LastValue(str), "output_channel": LastValue(str)},
    input_channels=["input_channel"],
    output_channels=["output_channel"],
)

tasks: Topic[Send] = app.channels["__pregel_tasks"]
assert isinstance(tasks, Topic)
assert tasks.ValueType == Sequence[Send]
assert tasks.accumulate == False

3. 被保護起來的通道

雖然“__pregel_tasks”就是一個普通的Topic類型的Channel,但是它並未開發對外部使用,Pregel把它“保護”的非常好。我們不能聲明一個與之同名的Channel,否則就會像如下的方式一樣拋出一個ValueError,並提示“Channel '__pregel_tasks' is reserved and cannot be used in the graph.”。

from langgraph.channels import Topic
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.types import Send, Sequence

try:
    app = Pregel(
        nodes={"node": NodeBuilder().subscribe_only("__pregel_tasks")},
        channels={"__pregel_tasks": Topic[Sequence[Send]]},
        input_channels=["input_channel"],
        output_channels=["output_channel"],
    )
    assert False, "Expected an error due to reserved channel name"
except Exception as e:
    assert isinstance(e, ValueError)
    assert str(e) == "Channel '__pregel_tasks' is reserved and cannot be used in the graph."

我們也不能採用常規的方式將向其發送Send對象。比如在如下的演示程序中,節點foo試圖向此Channel發送一個驅節點bar執行的Send對象,最終拋出一個InvalidUpdateError異常,並提示“Cannot write to the reserved channel TASKS”。除此之外,由於Pregel在利用它將基於“PUSH模式”的任務創建出來後就會將其清空,所以我們也無法讀取其中的任務。

from langgraph.channels import LastValue
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.types import Send
from langgraph.errors import InvalidUpdateError

foo = (NodeBuilder()
    .subscribe_to("start",read= False)
    .do(lambda _: Send(node="bar", arg="foobar"))
    .write_to("__pregel_tasks"))
bar = (NodeBuilder()
    .do(lambda args:args)
    .write_to("output"))

app = Pregel(
    nodes={ "foo": foo, "bar": bar},
    channels={
        "start": LastValue(str),
        "output": LastValue(str),
    },
    input_channels=["start"],
    output_channels=["output"])
try: 
    app.invoke({"start": None})
    assert False, "Should have raised InvalidUpdateError"
except Exception as e:
    assert isinstance(e, InvalidUpdateError)
    assert str(e) == "Cannot write to the reserved channel TASKS"

4. 唯一的解決方案

我們能夠想到的常規方法針對此Channel的寫入基本都繞不開引擎針對它的保護機制。我們將在下部分介紹Pregel另一個核心組成部分Node,Node會利用ChannelWriter對象實現針對Channel的寫入,我們可以將針對Channel的寫入意圖封裝成ChannelWriteTupleEntry,並以此來創建ChannelWriter,這應該是唯一能夠“欺騙”引擎寫入驗證的唯一手段。如代碼片段所示,率先執行的節點foo會返回一個驅動節點bar指定的Send對象,為了將它寫入“__pregel_tasks”,我們創建了一個ChannelWriter,針對該Channel的寫入定義在ChannelWriteTupleEntry對象中,具體體現在調用構造函數指定的mapper參數上,它提供一個映射將Node的執行結果轉成成Channel名稱和值的映射關係。

from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue
from langgraph.pregel._read import PregelNode
from langgraph.pregel._write import ChannelWrite, ChannelWriteTupleEntry
from langgraph.types import Send

foo: PregelNode = (NodeBuilder()
    .subscribe_to("foo")
    .do(lambda _: Send(node="bar", arg="foo"))
).build()
entry = ChannelWriteTupleEntry(mapper= lambda args: [("__pregel_tasks", args)])
foo.writers.append(ChannelWrite(writes=[entry]))

bar = (NodeBuilder()
    .do(lambda args: f"bar is triggered by {args}.")
    .write_to("output"))

app = Pregel(
    nodes={"foo": foo, "bar": bar},
    channels={
        "foo": LastValue(None),
        "output": LastValue(str),
    },
    input_channels=["foo"],
    output_channels=["output"],
)

result = app.invoke(input={"foo": None})
assert result == {"output": "bar is triggered by foo."}
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.