1. 簡介
在本教程中,我們將學習如何使用 Spring Integration Java DSL 創建應用程序集成。
我們將使用 Java DSL 來構建我們在《Spring Integration 簡介》中構建的文件移動集成。
2. 依賴項
Spring Integration Java DSL 是 Spring Integration Core 的一部分。
因此,我們可以添加該依賴項:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>6.0.0</version>
</dependency>為了開發我們的文件移動應用程序,我們還需要使用Spring Integration File:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>6.0.0</version>
</dependency>3. Spring Integration Java DSL
在 Java DSL 之前,用户使用 XML 配置 Spring Integration 組件。
DSL 引入了一些流暢的構建器,我們可以僅使用 Java 創建完整的 Spring Integration 管道。
例如,如果我們想要創建一個將任何通過管道傳入的數據轉換為大寫的通道,
在過去,我們可能會這樣做:
<int:channel id="input"/>
<int:transformer input-channel="input" expression="payload.toUpperCase()" />現在我們可以直接這樣做:
@Bean
public IntegrationFlow upcaseFlow() {
return IntegrationFlow.from("input")
.transform(String::toUpperCase)
.get();
}4. The File-Moving App
為了開始我們的文件移動集成,我們需要一些簡單的構建模塊。
4.1. 集成流程
我們需要的第一塊構建模塊是集成流程,我們可以從 IntegrationFlows 構建器獲得它:
IntegrationFlows.from(...)from 可以採用多種類型,但在此教程中,我們將僅關注三種類型:
- MessageSource
- MessageChannel,以及
- String
我們稍後將討論這三種類型。
在調用from之後,現在有一些自定義方法可用:
IntegrationFlow flow = IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.handle(targetDirectory())
// add more components
.get();<p>最終,<em >IntegrationFlow</em> 將始終產生一個 <em >IntegrationFlow</em> 實例,它是任何 Spring Integration 應用的最終產物。</p>
<p><strong >這種將輸入、執行適當的轉換以及發出結果的模式是所有 Spring Integration 應用的基礎。</strong></p>
4.2. 描述輸入源
首先,為了移動文件,我們需要指示集成流程應該在哪裏查找它們。為此,我們需要一個 消息源。
@Bean
public MessageSource<File> sourceDirectory() {
// .. create a message source
}簡單來説,一個MessageSource 是消息產生地,這些消息是外部的,不屬於應用程序本身。
更具體地説,我們需要一個能夠適應這些外部源並將其轉換為 Spring 消息表示形式的東西。由於這種適應側重於輸入,因此它們通常被稱為輸入通道適配器。
spring-integration-file 依賴項為我們提供了一個適用於我們用例的輸入通道適配器:FileReadingMessageSource:
@Bean
public MessageSource<File> sourceDirectory() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File(INPUT_DIR));
return messageSource;
}在這裏,我們的FileReadingMessageSource將讀取由INPUT_DIR指定的目錄,並從中創建一條MessageSource。
讓我們將其指定為在IntegrationFlow.from調用中作為源:
IntegrationFlow.from(sourceDirectory());4.3. 配置輸入源
如果我們將此視為一個長期運行的應用,我們可能希望能夠注意到新文件在進來時,而不僅僅是移動啓動時已經存在的那些文件。
為了實現這一點,可以也能夠接受額外的配置器,以進一步自定義輸入源:
IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));在這種情況下,我們可以通過告知 Spring Integration 每 10 秒鐘輪詢該源–在本例中是我們的文件系統–來使輸入源更具彈性。
當然,這不適用於我們僅有的文件輸入源,我們也可以將此輪詢器添加到任何 MessageSource。
4.4. 從輸入源過濾消息
接下來,假設我們希望文件移動應用程序僅移動特定文件,例如具有 jpg 擴展名的圖像文件。
為此,我們可以使用 GenericSelector :
@Bean
public GenericSelector<File> onlyJpgs() {
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
return source.getName().endsWith(".jpg");
}
};
}那麼,我們再更新一下集成流程:
IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs());或者,由於此過濾器非常簡單,我們可以使用 lambda 函數來定義它:
IntegrationFlow.from(sourceDirectory())
.filter(source -> ((File) source).getName().endsWith(".jpg"));4.5. 處理帶有服務激活器(Service Activator)的消息
現在我們已經擁有了過濾後的文件列表,需要將它們寫入新的位置。
服務激活器(Service Activator)s 是我們在 Spring Integration 中思考輸出時所使用的機制。
讓我們使用來自 spring-integration-file 的 FileWritingMessageHandler 服務激活器:
@Bean
public MessageHandler targetDirectory() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}在這裏,我們的FileWritingMessageHandler將寫入它接收到的每個Message負載到OUTPUT_DIR。
再次,讓我們更新:
IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.handle(targetDirectory());另外,請注意,此處使用了 setExpectReply 。 由於集成流可以是雙向的,因此,此調用表示該管道僅為單向。
4.6. 激活我們的集成流程
當我們添加了所有組件後,需要將我們的 IntegrationFlow 註冊為 Bean,才能激活它:
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlow.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
.filter(onlyJpgs())
.handle(targetDirectory())
.get();
}get方法提取一個需要註冊為Spring Bean的IntegrationFlow實例。
在我們的應用程序上下文加載後,包含在IntegrationFlow中的所有組件都會被激活。
現在,應用程序將開始將文件從源目錄移動到目標目錄。
5. 附加組件
在我們的基於 DSL 的文件移動應用程序中,我們創建了入站渠道適配器、消息過濾器和服務激活器。
讓我們看看一些常見的 Spring Integration 組件以及我們如何使用它們。
5.1. 消息通道
正如前面提到的,一個 消息通道 也是初始化流程的一種方式:
IntegrationFlow.from("anyChannel")我們可以將其理解為“請創建一個名為 anyChannel 的渠道 Bean。然後,從其他流程中讀取任何輸入到 anyChannel 的數據。”
但實際上,它比這更通用。
簡單來説,一個渠道抽象了生產者和消費者的關係,我們可以將其視為一個 Java 的 Queue。Channel 可以插入到流程的任何點。
例如,假設我們想要在文件從一個目錄移動到下一個目錄時,優先處理這些文件:
@Bean
public PriorityChannel alphabetically() {
return new PriorityChannel(1000, (left, right) ->
((File)left.getPayload()).getName().compareTo(
((File)right.getPayload()).getName()));
}然後,我們可以插入一個對<em>channel</em>的調用到我們的流程中間:
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.channel("alphabetically")
.handle(targetDirectory())
.get();
}有數十個頻道可供選擇,其中一些更實用的是用於併發、審計或中間持久化(例如 Kafka 或 JMS 緩衝區)。
此外,當與橋接器結合使用時,頻道也具有強大的功能。
5.2. 橋樑 (Bridge)
當我們需要組合兩個通道時,我們會使用橋樑。
設想一下,我們不直接將文件寫入輸出目錄,而是讓我們的文件移動應用程序寫入另一個通道:
@Bean
public IntegrationFlow fileReader() {
return IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.channel("holdingTank")
.get();
}現在,由於我們將其寫入一個通道,我們可以從那裏建立連接到其他流程。
讓我們創建一個從我們的存儲罐中輪詢消息並將其寫入目標位置的連接。
@Bean
public IntegrationFlow fileWriter() {
return IntegrationFlow.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(Duration.of(1, TimeUnit.SECONDS.toChronoUnit()), Duration.of(20, TimeUnit.SECONDS.toChronoUnit()))))
.handle(targetDirectory())
.get();
}再次強調,由於我們寫入了中間通道,現在我們可以添加另一個流程它將這些相同的文件以不同的速率寫入:
@Bean
public IntegrationFlow anotherFileWriter() {
return IntegrationFlow.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(Duration.of(2, TimeUnit.SECONDS.toChronoUnit()), Duration.of(10, TimeUnit.SECONDS.toChronoUnit()))))
.handle(anotherTargetDirectory())
.get();
}如我們所見,單個橋樑可以控制不同處理程序的輪詢配置。
一旦我們的應用程序上下文加載完成,我們現在擁有一個更復雜的應用程序,它將從源目錄移動文件到兩個目標目錄。
6. 結論
在本文中,我們探討了如何使用 Spring Integration Java DSL 構建不同的集成管道。
基本上,我們成功地重構了之前教程中的文件移動應用程序,這次完全使用 Java 實現。
此外,我們還研究了通道和橋樑等其他組件。