1. 引言
在本文中,我們將演示如何使用 Spring Cloud App 啓動器——這些啓動器提供配置完成且可以直接使用的應用程序,可作為未來開發的基礎起點。
簡單來説,任務啓動器專門用於諸如數據庫遷移和分佈式測試等用例,而流應用啓動器則提供與外部系統集成的功能。
總而言之,共有超過 55 個啓動器;請查閲官方文檔 這裏 和 這裏 以獲取有關這兩種啓動器的更多信息。
接下來,我們將構建一個小型分佈式 Twitter 應用程序,該應用程序將 Twitter 帖子流式傳輸到 Hadoop 分佈式文件系統。
2. 搭建環境
我們將使用 `consumer-key 和 access-token 創建一個簡單的 Twitter 應用。
然後,我們將配置 Hadoop,以便持久保存我們的 Twitter 流,用於未來的大數據應用。
最後,我們有選擇地使用提供的 Spring GitHub 倉庫,通過 Maven 編譯和組裝 sources –processors-sinks 架構模式的獨立組件,或者通過它們的 Spring Stream 綁定接口組合 sources、processors 和 sinks。
我們將探討這兩種方法。
值得注意的是,以前,所有 Stream App Starter 都彙集在一個大型倉庫中:github.com/spring-cloud/spring-cloud-stream-app-starters。 每個 Starter 都已被簡化並隔離。
3. Twitter 開發者憑證
首先,我們來設置我們的 Twitter 開發者憑證。要獲取 Twitter 開發者憑證,請按照設置應用程序和創建訪問令牌的步驟 參考官方 Twitter 開發者文檔。
具體來説,我們需要:
- 消費者密鑰 (Consumer Key)
- 消費者密鑰秘鑰 (Consumer Key Secret)
- 訪問令牌秘鑰 (Access Token Secret)
- 訪問令牌 (Access Token)
請務必保持該窗口打開或記錄下來,因為我們將會在下面使用它們!
4. 安裝 Hadoop
接下來,讓我們安裝 Hadoop!我們可以按照 官方文檔 的指示進行操作,或者直接使用 Docker:
$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.15. 編譯我們的 App 啓動器
為了使用獨立的、完全獨立的組件,我們可以從它們的 GitHub 倉庫中單獨下載並編譯所需的 Spring Cloud Stream App 啓動器。
5.1. Twitter Spring Cloud Stream App Starter
讓我們將 Twitter Spring Cloud Stream App Starter (org.springframework.cloud.stream.app.twitterstream.source) 添加到我們的項目中:
git clone https://github.com/spring-cloud-stream-app-starters/twitter.git然後,我們運行 Maven:
./mvnw clean install -PgenerateApps生成的編譯後的 Starter App 將位於本地項目根目錄的 ‘/target’ 目錄下。
然後我們可以運行這個編譯後的 .jar 文件,並傳入相關的應用程序屬性,例如如下所示:
java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
--accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>我們還可以使用熟悉的 Spring application.properties 文件傳遞我們的憑據。
twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...5.2. HDFS Spring Cloud Stream 應用啓動器
現在(Hadoop 已經設置好之後),我們將 HDFS Spring Cloud Stream 應用啓動器 (org.springframework.cloud.stream.app.hdfs.sink) 依賴項添加到我們的項目中。
首先,克隆相關的倉庫:
git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git然後,運行 Maven 任務:
./mvnw clean install -PgenerateApps編譯後的 Starter App 將位於本地項目根目錄的 ‘/target’ 目錄下。然後,我們可以運行該編譯後的 .jar 文件並傳入相關的應用程序屬性:
java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/‘hdfs://127.0.0.1:50010/‘ 是 Hadoop 的默認值,但您的 HDFS 端口可能因您配置實例的方式而異。
我們可以通過 ‘http://0.0.0.0:50070‘ 查看數據節點的列表(及其當前端口)。
我們還可以使用熟悉的 Spring application.properties 在編譯之前傳遞憑據,這樣我們就無需始終通過 CLI 傳遞它們。
讓我們配置我們的 application.properties 以使用默認的 Hadoop 端口:
hdfs.fs-uri=hdfs://127.0.0.1:50010/6. 使用 AggregateApplicationBuilder
或者,我們可以通過將我們的 Spring Stream Source 和Sink 整合到簡單的 Spring Boot 應用中,利用 org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder。
首先,我們將兩個 Stream App Starter 添加到我們的 pom.xml中:
<dependencies>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
</dependencies>然後,我們將開始將兩個 Stream App Starter 依賴項組合到它們的相應子應用程序中。
6.1. 構建我們的應用程序組件
我們的 SourceApp 指定了需要轉換或被消費的 Source。
@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
@InboundChannelAdapter(Source.OUTPUT)
public String timerMessageSource() {
return new SimpleDateFormat().format(new Date());
}
}請注意,我們將 SourceApp 綁定到 org.springframework.cloud.stream.messaging.Source,並注入適當的配置類以從我們的環境屬性中獲取所需設置。
接下來,我們設置一個簡單的 org.springframework.cloud.stream.messaging.Processor 綁定:
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String processMessage(String payload) {
log.info("Payload received!");
return payload;
}
}然後,我們創建我們的消費者(Sink</em lang="en">):
@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
@ServiceActivator(inputChannel= Sink.INPUT)
public void loggerSink(Object payload) {
log.info("Received: " + payload);
}
}在這裏,我們將 SinkApp 綁定到 org.springframework.cloud.stream.messaging.Sink,並再次注入正確的配置類以使用我們指定的 Hadoop 設置。
最後,我們使用 AggregateApplicationBuilder 在 AggregateApp 的主方法中組合我們的 SourceApp, ProcessorApp 和 SinkApp。
@SpringBootApplication
public class AggregateApp {
public static void main(String[] args) {
new AggregateApplicationBuilder()
.from(SourceApp.class).args("--fixedDelay=5000")
.via(ProcessorApp.class)
.to(SinkApp.class).args("--debug=true")
.run(args);
}
}與任何 Spring Boot 應用程序一樣,我們可以通過 application.properties 或程序化方式注入指定的設置作為環境屬性。
由於我們使用 Spring Stream 框架,還可以將我們的參數傳遞到 AggregateApplicationBuilder 構造函數中。
6.2. 運行完成應用
我們可以使用以下命令行指令編譯並運行我們的應用程序:
$ mvn install
$ java -jar twitterhdfs.jar請務必將每個 @SpringBootApplication 類放在單獨的包中(否則,將會拋出多個不同的綁定異常)!有關如何使用 AggregateApplicationBuilder 的更多信息,請參閲 官方文檔。
在編譯並運行我們的應用程序後,我們應該在控制枱中看到類似以下的輸出(當然,具體內容會因 Tweet 的不同而異):
2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]
c.b.twitterhdfs.processor.ProcessorApp : Payload received!
2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]
com.baeldung.twitterhdfs.sink.SinkApp : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...這演示了我們的 處理器 和 接收器 在接收來自 源 的數據時的正確運行方式! 在此示例中,我們未配置我們的 HDFS 接收器進行過多操作,它只會打印消息“Payload received!”
7. 結論
在本教程中,我們學習瞭如何將兩個強大的 Spring Stream App Starter 組合成一個令人興奮的 Spring Boot 示例!
以下是一些關於 Spring Boot Starter 和如何創建自定義 Starter 的其他優秀官方文章。