1. 概述
Anthropic 最近發佈了一份關於如何構建有效 AI 代理的出版物,內容可參考 《構建有效 AI 代理》。 在本文檔中,他們提出了軟件開發人員可以遵循的代理模式作為最佳實踐。 他們還聲稱,我們可以將這些模式作為替代方案,用於替代複雜的框架。 然而,在 Java 生態系統中,我們確實廣泛使用大型框架,例如 Spring。
儘管 Spring 是一種大型且複雜的框架,但它提供了簡化使用 Spring AI 構建有效代理的工具。
在本文中,我們將回顧出版物中提出的模式,以及使用的關鍵定義,以確保清晰度。 然後,我們將使用 Spring AI 實現這些模式。 由於重點在於實現這些模式,因此我們將不關注與實際模型主機集成。
2. 使用 Spring AI 構建高效的代理
在與數十家各行業利用 LLM 構建代理的團隊合作後,Anthropic 提出了一些簡單且可組合的模式。但首先,我們來澄清他們出版物中使用的兩個概念:
- 代理是指 LLM 動態地指導自身流程和工具使用,從而控制如何完成任務的系統。
- 工作流程是指通過預定義的代碼路徑對 LLM 和工具進行編排的系統。
考慮到這一點,所提出的模式如下:
- Prompt 鏈式工作流程將複雜的任務分解為一系列步驟,其中每個 LLM 提示的輸出作為後續 LLM 提示的輸入。
- 並行化工作流程允許對多個 LLM 操作進行併發處理,並通過程序化方式聚合其輸出。
- 路由工作流程通過基於內容分類,將輸入智能地路由到專用處理程序。
- 編排-工人工作流程,其中一箇中心 LLM 將分解任務、將其委託給工人 LLM,並將最終響應組合為最終響應。
- 評估器-優化器工作流程,其中一個 LLM 生成結果,而另一個 LLM 提供評估和反饋,形成一個循環。
3. 依賴項
為了保持簡潔,我們將僅使用所需的最小依賴項。這意味着我們不會包含任何實現,例如 spring-ai-ollama-spring-boot-starter。我們的解決方案將基於接口,而不是使用任何 Model 實現:
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-model</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-client-chat</artifactId>
<version>1.0.2</version>
</dependency>這兩個組件足以構建使用 Spring AI 的代理,因為我們可以利用 Model 接口及其擴展,ChatModel。此外,ChatClient 在現代應用中得到廣泛使用,正如稍後示例中看到的。
4. 使用 Spring AI 構建鏈式工作流代理
鏈式工作流在任務可以分解為一系列子任務的情況下非常適用。每個子任務的結果將傳遞到下一個任務。我們也有機會在中間添加一些編碼,用於決策或更改。
一個很好的例子是我們在 CI/CD 中使用的構建流水線。我們可以將構建流水線分解為具體的、順序的步驟:
- 從 VCS 中檢出
- 構建代碼並打包
- 將代碼容器化並推送到 Docker 倉庫
- 將 Docker 鏡像部署到測試環境
- 運行集成測試
讓我們假設我們有一個 OpsClient 接口,它 擴展了 ChatClient,並且還有一個與 DevOps 模型交互的實現。有效的 Spring AI 代理將如下所示:
public String opsPipeline(String userInput) {
String response = userInput;
for (String prompt : OpsClientPrompts.DEV_PIPELINE_STEPS) {
String request = String.format("{%s}\n {%s}", prompt, response);
ChatClient.ChatClientRequestSpec requestSpec = opsClient.prompt(request);
ChatClient.CallResponseSpec responseSpec = requestSpec.call();
response = responseSpec.content();
if (response.startsWith("ERROR:")) {
break;
}
}
return response;
}首先,我們使用 OpsClientPrompts.DEV_PIPELINE_STEPS 在一個循環中。這些步驟比之前提到的步驟更具描述性。例如,對於從 VCS 中檢出代碼,它將類似於“從給定的 URL 檢出代碼。返回檢出的代碼路徑,或者如果發生錯誤”。
在鏈式模式中,每個響應都會輸入到後續步驟的輸入中。因此,request 字段正是這樣做的。給定下一個步驟提示和之前的響應,創建一個作為 prompt() 方法參數的 request,該方法將使用 call() 方法執行。結果存儲在 response 中,以便在下一步中使用。初始 response 值將是用户輸入。最後,在步驟之間,我們添加一個 if 子句,以在發生錯誤時中斷循環。鏈式工作流程應如下所示:
5. 使用 Spring AI 進行並行化工作流
並行化工作流適用於將任務分解為獨立的子任務,並在並行執行。結果通過程序化聚合到一個單一結果中。
一個實際的例子,遵循相同的 DevOps 模型概念,是將新版本的代碼部署到多個環境(例如測試、開發、集成等)的任務。由於這涉及到 AI 代理,我們期望在幕後發生更多事情,例如檢查部門指南以驗證該鏡像是否可以部署到每個環境等。藉助 Spring AI 代理,我們將利用 ExecutorService 和 CompletableFuture:
public List<String> opsDeployments(String containerLink, List<String> environments, int maxConcurentWorkers) {
try (ExecutorService executor = Executors.newFixedThreadPool(maxConcurentWorkers)) {
List<CompletableFuture<String>> futures = environments.stream()
.map(env -> CompletableFuture.supplyAsync(() -> {
try {
String request = OpsClientPrompts.NON_PROD_DEPLOYMENT_PROMPT + "\n Image:" + containerLink + " to environment: " + env;
ChatClient.ChatClientRequestSpec requestSpec = opsClient.prompt(request);
ChatClient.CallResponseSpec responseSpec = requestSpec.call();
return responseSpec.content();
} catch (Exception e) { ... }
}, executor))
.toList();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
allFutures.join();
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
}opsDeployments() 方法接受部署鏡像的容器鏈接、目標環境以及使用的最大併發工節點數量。然後,我們使用流將任務分解為每個給定環境的一個部署任務。
每個request 字段是部署的提示,在本例中是OpsClientPrompts.NON_PROD_DEPLOYMENT_PROMPT,containerLink以及特定的environment。結果只是每個結果的數組。並行化工作流程應如下所示:
6. 使用 Spring AI 路由工作流代理
路由工作流非常適合在需要將任務分配給更專業的 AI 模型時使用。一個很好的例子是作為客户服務支持的 LLM,接收客户輸入,然後將任務重定向到技術支持模型、賬户服務模型等。客户無需知道有多個模型。相反,使用通用模型,該模型作為路由工作流。
延續我們 DevOps 模型示例,我們可以有一個通用的 DevOps 代理,該代理接受任何構建拉取請求 (PR) 管道或將 PR 部署到某些環境的請求。然後,路由工作流將用於將請求引導到先前文章中實現的有效 Spring AI 代理。
讓我們從使用 opsRoutingClient 接口開始,該接口擴展了 ChatClient。我們向客户端提供路由選項和用户輸入。然後,我們要求它將請求引導到 ChainWorkflow 或 ParallelizationWorkflow,正如先前文章中實現的:
public class RoutingWorkflow {
private final OpsRouterClient opsRouterClient;
private final ChainWorkflow chainWorkflow;
private final ParallelizationWorkflow parallelizationWorkflow;
// constructor omitted
public String route(String input) {
String[] route = determineRoute(input, OPS_ROUTING_OPTIONS);
String opsOperation = route[0];
List<String> requestValues = route[1].lines()
.toList();
return switch (opsOperation) {
case "pipeline" -> chainWorkflow.opsPipeline(requestValues.getFirst());
case "deployment" -> executeDeployment(requestValues);
default -> throw new IllegalStateException("Unexpected value: " + opsOperation);
};
}
private String[] determineRoute(String input, Map<String, String> availableRoutes) {
String request = String.format("""
Given this map that provides the ops operation as key and the description for you to build the operation value, as value: %s.
Analyze the input and select the most appropriate operation.
Return an array of two strings. First string is the operations decided and second is the value you built based on the operation.
Input: %s""", availableRoutes, input);
ChatClient.ChatClientRequestSpec requestSpec = opsRouterClient.prompt(request);
ChatClient.CallResponseSpec responseSpec = requestSpec.call();
String[] routingResponse = responseSpec.entity(String[].class);
System.out.printf("Routing Decision: Operation is: %s\n, Operation value: %s%n", routingResponse[0], routingResponse[1]);
return routingResponse;
}
private String executeDeployment(List<String> requestValues) {
String containerLink = requestValues.getFirst();
List<String> environments = Arrays.asList(requestValues.get(1)
.split(","));
int maxWorkers = Integer.parseInt(requestValues.getLast());
List<String> results = parallelizationWorkflow.opsDeployments(containerLink, environments, maxWorkers);
return String.join(", ", results);
}
}route() 方法首先確定要使用的具體任務,即pipeline 或deployment。route 數組將包含opsRouterClient 返回的動作和提示。如果動作是pipeline,則直接將請求發送到chainWorkflow。如果是deployment,則executeDeployment() 方法準備請求並將其發送到parallelizationWorkflow 代理。
以下是使用 Spring AI 實現 Routing Workflow 模式的圖形化表示:
7. 使用 Spring AI 的編排器工作者代理
編排器工作者工作流最適合於我們有需要分解為更簡單子任務的複雜任務,但無法在前期預測這些子任務是什麼。
編排器代理會將這些子任務委派給工作者代理。最後,它收集結果併為初始任務組合一個結果。
在我們的 DevOps 模型示例中,我們可以有一個編排器,它接受一個 PR URL,分析更改,並確定哪些環境需要測試。例如,簡單的輸入驗證更新只需在我們的測試環境中進行測試。相反,影響外部系統集成的更改也需要在一個集成的環境中進行測試。
假設我們有一個名為 OpsOrchestratorClient 的類,該類繼承自 ChatClient。我們可以提供一個描述這種情況和 PR 鏈接的提示,並要求它返回我們需要運行測試的環境。然後將該結果提供給我們的 OpsClient 以執行部署和運行測試:
public String remoteTestingExecution(String userInput) {
String orchestratorRequest = REMOTE_TESTING_ORCHESTRATION_PROMPT + userInput;
ChatClient.ChatClientRequestSpec orchestratorRequestSpec = opsOrchestratorClient.prompt(orchestratorRequest);
ChatClient.CallResponseSpec orchestratorResponseSpec = orchestratorRequestSpec.call();
String[] orchestratorResponse = orchestratorResponseSpec.entity(String[].class);
String prLink = orchestratorResponse[0];
StringBuilder response = new StringBuilder();
for (int i = 1; i < orchestratorResponse.length; i++) {
String testExecutionChainInput = prLink + " on " + orchestratorResponse[i];
for (String prompt : OpsClientPrompts.EXECUTE_TEST_ON_DEPLOYED_ENV_STEPS) {
String testExecutionChainRequest =
String.format("%s\n PR: [%s] environment", prompt, testExecutionChainInput);
ChatClient.ChatClientRequestSpec requestSpec = opsClient.prompt(testExecutionChainRequest);
ChatClient.CallResponseSpec responseSpec = requestSpec.call();
testExecutionChainInput = responseSpec.content();
System.out.printf("OUTCOME: %s\n", testExecutionChainInput);
}
response.append(testExecutionChainInput).append("\n");
}
return response.toString();
}orchestratorRequest 字符串將包含對 PR 變更的分析提示,並返回環境列表。我們從orchestratorResponse數組中獲取環境列表。對於每個環境,我們以兩步的方式執行子任務:首先是部署,然後是測試執行。
希望您注意到我們使用了第二個for循環中的 Chain Workflow。我們從部署提示開始,並將 PR 鏈接和當前環境作為輸入。響應包含一個輸入句子。然後將此句子輸入到第二個提示中,該提示根據 PR 鏈接中的信息,執行與當前環境相關的測試。
輸出包含每個環境的測試執行結果。
請注意,為了簡化示例,我們避免了子任務的並行或異步執行。然而,使用此類技術將是實現 Orchestrator- Workers Workflow 模式的最有效方法。該模式的視覺序列應如下所示:
8. 使用 Spring AI 的評估優化代理
Anthropic 提出的最終模式是評估優化代理。正如模式名稱所暗示的,評估-優化工作流程非常適合涉及生成建議的任務。優化器會提出一個初始解決方案或改進方案。評估器會提供對建議的反饋,並可能再次調用優化器以完善結果或產生更準確的成果。
我們擴展了我們的 DevOps 示例,並提供了一個關於 PR 的額外功能:我們將使用評估-優化工作流程提供 PR 評論。假設有一個 <em >CodeReviewClient</em> 接口,該接口 <em >extends ChatClient</em>,我們將使用 Spring AI 創建代理,並提供一個簡單的 <em >evaluate()</em> 方法:
public class EvaluatorOptimizerWorkflow {
private final CodeReviewClient codeReviewClient;
static final ParameterizedTypeReference<Map<String, String>> mapClass = new ParameterizedTypeReference<>() {};
// constructor omitted
public Map<String, String> evaluate(String task) {
return loop(task, new HashMap<>(), "");
}
private Map<String, String> loop(String task, Map<String, String> latestSuggestions, String evaluation) {
latestSuggestions = generate(task, latestSuggestions, evaluation);
Map<String, String> evaluationResponse = evaluate(latestSuggestions, task);
String outcome = evaluationResponse.keySet().iterator().next();
evaluation = evaluationResponse.values().iterator().next();
if ("PASS".equals(outcome)) {
return latestSuggestions;
}
return loop(task, latestSuggestions, evaluation);
}
// we'll see the generate() and evaluate() methods later
}評估優化模式從接受任務的 evaluate() 方法開始。在我們的示例中,輸入只是 PR 鏈接。我們將此任務發送到 loop() 方法。該方法接受三個參數:任務、先前建議和評估。在第一次循環中,我們僅提供任務。
loop() 方法調用 generate(),我們稍後會看到。結果是最新建議,我們將其傳遞給 evaluate() 方法,以及任務。接下來,我們讀取來自 evaluationResponse 的結果,如果它是“PASS”,則返回最新建議。如果不是,則再次調用 loop() 方法,使用新的 latestSuggestions 和 evaluation。
我們使用 CodeReviewClient 代理來執行 generate() 和 evaluate() 方法。
private Map<String, String> generate(String task, Map<String, String> previousSuggestions, String evaluation) {
String request = CODE_REVIEW_PROMPT +
"\n PR: " + task +
"\n previous suggestions: " + previousSuggestions +
"\n evaluation on previous suggestions: " + evaluation;
ChatClient.ChatClientRequestSpec requestSpec = codeReviewClient.prompt(request);
ChatClient.CallResponseSpec responseSpec = requestSpec.call();
Map<String, String> response = responseSpec.entity(mapClass);
return response;
}
private Map<String, String> evaluate(Map<String, String> latestSuggestions, String task) {
String request = EVALUATE_PROPOSED_IMPROVEMENTS_PROMPT +
"\n PR: " + task +
"\n proposed suggestions: " + latestSuggestions;
ChatClient.ChatClientRequestSpec requestSpec = codeReviewClient.prompt(request);
ChatClient.CallResponseSpec responseSpec = requestSpec.call();
Map<String, String> response = responseSpec.entity(mapClass);
return response;
}在 generate() 中,我們向 codeReviewClient 代理提供提示(prompt)、任務(task)、先前建議(previousSuggestions)和評估(evaluation)。提示是基於最新的建議和最新的評估,對 PR 進行代碼審查。建議應遵循給定的規則並遵循特定的格式。
在 evaluation() 中,提示是“評估建議的代碼改進在正確性、時間複雜度以及最佳實踐方面”,並提供給定的任務和最新的建議。然後返回評估結果,“PASS”, “NEEDS_IMPROVEMENT”, “FAIL”,以及反饋。
優化器-評估器工作流的序列圖如下:
9. 結論
在本教程中,我們回顧了 Antropic 關於有效 AI 代理的出版物。我們介紹了所有模式,提供了簡要的定義,並提供了實際應用案例的示例。然後,我們使用 Spring AI 代理實施了每個模式。最後,我們為每個模式都包含了序列圖,以幫助您可視化該模式在行動中的工作原理。
正如往常一樣,演示中所使用的所有源代碼可以在 GitHub 上找到。