知識庫 / Spring / Spring Cloud RSS 訂閱

使用 Spring Cloud App Starter

Spring Cloud
HongKong
6
02:07 PM · Dec 06 ,2025

1. 引言

在本文中,我們將演示如何使用 Spring Cloud App 啓動器——這些啓動器提供配置完成且可以直接使用的應用程序,可作為未來開發的基礎起點。

簡單來説,任務啓動器專門用於諸如數據庫遷移和分佈式測試等用例,而流應用啓動器則提供與外部系統集成的功能。

總而言之,共有超過 55 個啓動器;請查閲官方文檔 這裏這裏 以獲取有關這兩種啓動器的更多信息。

接下來,我們將構建一個小型分佈式 Twitter 應用程序,該應用程序將 Twitter 帖子流式傳輸到 Hadoop 分佈式文件系統。

2. 搭建環境

我們將使用 `consumer-keyaccess-token 創建一個簡單的 Twitter 應用。

然後,我們將配置 Hadoop,以便持久保存我們的 Twitter 流,用於未來的大數據應用。

最後,我們有選擇地使用提供的 Spring GitHub 倉庫,通過 Maven 編譯和組裝 sourcesprocessors-sinks 架構模式的獨立組件,或者通過它們的 Spring Stream 綁定接口組合 sourcesprocessorssinks

我們將探討這兩種方法。

值得注意的是,以前,所有 Stream App Starter 都彙集在一個大型倉庫中:github.com/spring-cloud/spring-cloud-stream-app-starters。 每個 Starter 都已被簡化並隔離。

3. Twitter 開發者憑證

首先,我們來設置我們的 Twitter 開發者憑證。要獲取 Twitter 開發者憑證,請按照設置應用程序和創建訪問令牌的步驟 參考官方 Twitter 開發者文檔

具體來説,我們需要:

  1. 消費者密鑰 (Consumer Key)
  2. 消費者密鑰秘鑰 (Consumer Key Secret)
  3. 訪問令牌秘鑰 (Access Token Secret)
  4. 訪問令牌 (Access Token)

請務必保持該窗口打開或記錄下來,因為我們將會在下面使用它們!

4. 安裝 Hadoop

接下來,讓我們安裝 Hadoop!我們可以按照 官方文檔 的指示進行操作,或者直接使用 Docker:

$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1

5. 編譯我們的 App 啓動器

為了使用獨立的、完全獨立的組件,我們可以從它們的 GitHub 倉庫中單獨下載並編譯所需的 Spring Cloud Stream App 啓動器。

5.1. Twitter Spring Cloud Stream App Starter

讓我們將 Twitter Spring Cloud Stream App Starter (org.springframework.cloud.stream.app.twitterstream.source) 添加到我們的項目中:

git clone https://github.com/spring-cloud-stream-app-starters/twitter.git

然後,我們運行 Maven:

./mvnw clean install -PgenerateApps

生成的編譯後的 Starter App 將位於本地項目根目錄的 ‘/target’ 目錄下。

然後我們可以運行這個編譯後的 .jar 文件,並傳入相關的應用程序屬性,例如如下所示:

java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
    --accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>

我們還可以使用熟悉的 Spring application.properties 文件傳遞我們的憑據。

twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...

5.2. HDFS Spring Cloud Stream 應用啓動器

現在(Hadoop 已經設置好之後),我們將 HDFS Spring Cloud Stream 應用啓動器 (org.springframework.cloud.stream.app.hdfs.sink) 依賴項添加到我們的項目中。

首先,克隆相關的倉庫:

git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git

然後,運行 Maven 任務:

./mvnw clean install -PgenerateApps

編譯後的 Starter App 將位於本地項目根目錄的 ‘/target’ 目錄下。然後,我們可以運行該編譯後的 .jar 文件並傳入相關的應用程序屬性:

java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/

hdfs://127.0.0.1:50010/‘ 是 Hadoop 的默認值,但您的 HDFS 端口可能因您配置實例的方式而異。

我們可以通過 ‘http://0.0.0.0:50070‘ 查看數據節點的列表(及其當前端口)。

我們還可以使用熟悉的 Spring application.properties 在編譯之前傳遞憑據,這樣我們就無需始終通過 CLI 傳遞它們。

讓我們配置我們的 application.properties 以使用默認的 Hadoop 端口:

hdfs.fs-uri=hdfs://127.0.0.1:50010/

6. 使用 AggregateApplicationBuilder

或者,我們可以通過將我們的 Spring Stream Source Sink 整合到簡單的 Spring Boot 應用中,利用 org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder

首先,我們將兩個 Stream App Starter 添加到我們的 pom.xml中:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
</dependencies>

然後,我們將開始將兩個 Stream App Starter 依賴項組合到它們的相應子應用程序中。

6.1. 構建我們的應用程序組件

我們的 SourceApp 指定了需要轉換或被消費的 Source

@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
    @InboundChannelAdapter(Source.OUTPUT)
    public String timerMessageSource() {
        return new SimpleDateFormat().format(new Date());
    }
}

請注意,我們將 SourceApp 綁定到 org.springframework.cloud.stream.messaging.Source,並注入適當的配置類以從我們的環境屬性中獲取所需設置。

接下來,我們設置一個簡單的 org.springframework.cloud.stream.messaging.Processor 綁定:

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        log.info("Payload received!");
        return payload;
    }
}

然後,我們創建我們的消費者(Sink</em lang="en">):

@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
    @ServiceActivator(inputChannel= Sink.INPUT)
    public void loggerSink(Object payload) {
        log.info("Received: " + payload);
    }
}

在這裏,我們將 SinkApp 綁定到 org.springframework.cloud.stream.messaging.Sink,並再次注入正確的配置類以使用我們指定的 Hadoop 設置。

最後,我們使用 AggregateApplicationBuilderAggregateApp 的主方法中組合我們的 SourceApp, ProcessorAppSinkApp

@SpringBootApplication
public class AggregateApp {
    public static void main(String[] args) {
        new AggregateApplicationBuilder()
          .from(SourceApp.class).args("--fixedDelay=5000")
          .via(ProcessorApp.class)
          .to(SinkApp.class).args("--debug=true")
          .run(args);
    }
}

與任何 Spring Boot 應用程序一樣,我們可以通過 application.properties 或程序化方式注入指定的設置作為環境屬性。

由於我們使用 Spring Stream 框架,還可以將我們的參數傳遞到 AggregateApplicationBuilder 構造函數中。

6.2. 運行完成應用

我們可以使用以下命令行指令編譯並運行我們的應用程序:

    $ mvn install
    $ java -jar twitterhdfs.jar

請務必將每個 @SpringBootApplication 類放在單獨的包中(否則,將會拋出多個不同的綁定異常)!有關如何使用 AggregateApplicationBuilder 的更多信息,請參閲 官方文檔

在編譯並運行我們的應用程序後,我們應該在控制枱中看到類似以下的輸出(當然,具體內容會因 Tweet 的不同而異):

2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
c.b.twitterhdfs.processor.ProcessorApp   : Payload received!
2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
com.baeldung.twitterhdfs.sink.SinkApp    : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...

這演示了我們的 處理器接收器 在接收來自 的數據時的正確運行方式! 在此示例中,我們未配置我們的 HDFS 接收器進行過多操作,它只會打印消息“Payload received!”

7. 結論

在本教程中,我們學習瞭如何將兩個強大的 Spring Stream App Starter 組合成一個令人興奮的 Spring Boot 示例!

以下是一些關於 Spring Boot Starter 和如何創建自定義 Starter 的其他優秀官方文章。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.