知識庫 / Spring RSS 訂閱

與 AWS Kinesis 集成 Spring

Cloud,Data,Spring
HongKong
4
01:09 PM · Dec 06 ,2025

1. 簡介

Kinesis 是一款用於實時收集、處理和分析數據流的工具,由亞馬遜開發。其主要優勢在於幫助開發基於事件驅動的應用。

在本教程中,我們將探索一些庫,這些庫使我們的 Spring 應用能夠向 Kinesis 流中生產和消費記錄。代碼示例將展示基本功能,但不代表生產級別的代碼。

2. 先決條件

在繼續之前,我們需要完成兩項任務。

第一項是創建 Spring 項目,因為我們的目標是使用 Spring 項目與 Kinesis 交互。

第二項是創建 Kinesis 數據流。我們可以從 AWS 賬户中的 Web 瀏覽器中完成此操作。 對於 AWS CLI 愛好者來説,另一種選擇是使用命令行:參考命令。 由於我們將從代碼中與其交互,因此我們還需要準備好 AWS IAM 憑證,包括訪問密鑰和密鑰。 此外,還需要指定區域。

所有生產者將創建模擬 IP 地址記錄,而消費者將讀取這些值並在應用程序控制台中進行列出。

3. Java 版本的 AWS SDK

我們將首先使用 Java 版本的 AWS SDK。它的優勢在於它允許我們管理與 Kinesis Data Streams 交互的許多方面。我們可以讀取數據、生產數據、創建數據流以及重塑數據流。 缺點是,為了編寫生產級別的代碼,我們需要處理諸如重塑、錯誤處理或保持消費者存活的守護進程等方面。

3.1. Maven 依賴

<a href="https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-client">amazon-kinesis-client</a> Maven 依賴將提供我們所需的全部內容,用於構建可運行的示例。 我們現在將它添加到我們的 <em >pom.xml</em> 文件中:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-kinesis</artifactId>
    <version>1.12.380</version>
</dependency>

3.2. Spring 配置

讓我們重用 AmazonKinesis 對象,以便與我們的 Kinesis Stream 進行交互。我們將它創建為一個 @Bean 並在我們的 @SpringBootApplication 類中:

@Bean
public AmazonKinesis buildAmazonKinesis() {
    BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
    return AmazonKinesisClientBuilder.standard()
      .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
      .withRegion(Regions.EU_CENTRAL_1)
      .build();
}

接下來,讓我們在 application.properties 中定義 aws.access.keyaws.secret.key,這些用於本地機器的密鑰:

aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here

我們將會使用 @Value 註解來讀取它們:

@Value("${aws.access.key}")
private String accessKey;

@Value("${aws.secret.key}")
private String secretKey;

為了簡化操作,我們將依賴 @Scheduled 方法來創建和消費記錄。

3.3. 消費者

AWS SDK Kinesis 消費者採用拉式模型,這意味着我們的代碼將從 Kinesis 數據流的片段(shards)中拉取記錄:

GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);

GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
    recordsResult.getRecords().stream()
      .map(record -> new String(record.getData().array()))
      .forEach(System.out::println);

    recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
    recordsResult = kinesis.getRecords(recordsRequest);
}

GetRecordsRequest 對象用於構建流數據的請求。 在我們的示例中,我們定義了每請求 25 條記錄的限制,並一直讀取直到沒有更多記錄。

我們還可以注意到,對於我們的迭代,我們使用了 GetShardIteratorResult 對象。 我們在 @PostConstruc 方法中創建了這個對象,以便我們能夠立即跟蹤記錄。

private GetShardIteratorResult shardIterator;

@PostConstruct
private void buildShardIterator() {
    GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
    readShardsRequest.setStreamName(IPS_STREAM);
    readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
    readShardsRequest.setShardId(IPS_SHARD_ID);

    this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}

3.4. 生產者 (Producer)

現在,讓我們看看如何處理我們 Kinesis 數據流中記錄的創建。

我們使用 PutRecordsRequest 對象來插入數據。對於這個新對象,我們添加一個包含多個 PutRecordsRequestEntry 對象的列表:

List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
    PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
    entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
    entry.setPartitionKey(IPS_PARTITION_KEY);
    return entry;
}).collect(Collectors.toList());

PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName(IPS_STREAM);
createRecordsRequest.setRecords(entries);

kinesis.putRecords(createRecordsRequest);

我們已經創建了一個基本的消費者和一個模擬 IP 記錄的生產者。現在剩下的就是運行我們的 Spring 項目,並在應用程序控制台中查看 IP 地址列表。

4. KCL 和 KPL

Kinesis 客户端庫 (KCL) 是一套簡化消費記錄的庫。它還為 AWS SDK Java 的 Kinesis 數據流 API 提供了一個抽象層。在後台,該庫處理了跨多個實例的負載均衡、響應實例故障、記錄檢查點以及響應重新分區。

Kinesis 生產者庫 (KPL) 是一套用於寫入 Kinesis 數據流的庫。它還提供了一個位於 AWS SDK Java 的 Kinesis 數據流 API 之上的抽象層。為了獲得更好的性能,該庫會自動處理批量、多線程和重試邏輯。

KCL 和 KPL 都有一個主要優勢,即易於使用,以便我們可以專注於生產和消費記錄。

4.1. Maven 依賴

如果需要,可以將這兩個庫單獨引入到我們的項目中。要將 KPLKCL 引入到我們的 Maven 項目中,我們需要更新我們的 pom.xml 文件:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.13.1</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.14.9</version>
</dependency>

4.2. Spring 配置

我們需要做的只有確保我們擁有 IAM 憑證。<em aws.access.key</em><em aws.secret.key</em> 的值已在我們的 <em application.properties</em> 文件中定義,因此我們可以使用 @Value 讀取它們。

4.3. 消費者

首先,我們將創建一個實現 IRecordProcessor 接口的類,並定義如何處理 Kinesis 數據流記錄的邏輯,即在控制枱中將其打印出來:

public class IpProcessor implements IRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) { }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.getRecords()
          .forEach(record -> System.out.println(new String(record.getData().array())));
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) { }
}

下一步是定義一個實現 IRecordProcessorFactory 接口的工廠類,並返回一個先前創建的 IpProcessor 對象:

public class IpProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new IpProcessor();
    }
}

現在我們進入最終步驟,我們將使用一個 Worker對象來定義我們的消費者管道

我們需要一個 KinesisClientLibConfiguration對象,該對象將定義所需的IAM憑據和AWS區域(如果需要)。

我們將傳遞 KinesisClientLibConfiguration對象和我們的 IpProcessorFactory對象到 Worker對象,然後在一個單獨的線程中啓動它。我們使用 Worker類來始終保持記錄消費的邏輯,因此我們現在持續讀取新的記錄:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
  "KinesisKCLConsumer",
  IPS_STREAM,
  "",
  "",
  DEFAULT_INITIAL_POSITION_IN_STREAM,
  new AWSStaticCredentialsProvider(awsCredentials),
  new AWSStaticCredentialsProvider(awsCredentials),
  new AWSStaticCredentialsProvider(awsCredentials),
  DEFAULT_FAILOVER_TIME_MILLIS,
  "KinesisKCLConsumer",
  DEFAULT_MAX_RECORDS,
  DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
  DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
  DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
  DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
  DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
  new ClientConfiguration(),
  new ClientConfiguration(),
  new ClientConfiguration(),
  DEFAULT_TASK_BACKOFF_TIME_MILLIS,
  DEFAULT_METRICS_BUFFER_TIME_MILLIS,
  DEFAULT_METRICS_MAX_QUEUE_SIZE,
  DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
  Regions.EU_CENTRAL_1.getName(),
  DEFAULT_SHUTDOWN_GRACE_MILLIS,
  DEFAULT_DDB_BILLING_MODE,
  null,
  0,
  0, 
  0
);
final Worker worker = new Worker.Builder()
 .recordProcessorFactory(new IpProcessorFactory())
 .config(consumerConfig)
 .build();
CompletableFuture.runAsync(worker.run());

4.4. 生產者

現在,我們定義 KinesisProducerConfiguration 對象,添加 IAM 憑證和 AWS 區域:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
  .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
  .setVerifyCertificate(false)
  .setRegion(Regions.EU_CENTRAL_1.getName());

this.kinesisProducer = new KinesisProducer(producerConfig);

我們將在一個 @Scheduled 任務中包含之前創建的 kinesisProducer 對象,並持續地為我們的 Kinesis 數據流生產記錄:

IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
  .forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));

5. Spring Cloud Stream Binder Kinesis

我們已經見過了兩個庫,它們都出自非 Spring 生態系統。現在我們將看看 Spring Cloud Stream Binder Kinesis 如何進一步簡化我們的工作,同時建立在 Spring Cloud Stream 之上。

5.1. Maven 依賴

我們需要在應用程序中定義的 Maven 依賴是:Spring Cloud Stream Binder Kinesis

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
    <version>2.2.0</version>
</dependency>

5.2. Spring 部署配置

當在 EC2 上運行時,所需的 AWS 屬性會自動發現,因此無需手動定義。由於我們正在本地機器上運行示例,因此需要為我們的 AWS 賬户定義 IAM 訪問密鑰、密鑰和區域。我們還已禁用應用程序的自動 CloudFormation 堆棧名稱檢測:

cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false

Spring Cloud Stream 包含三個接口,我們可以用於流綁定:

  • Sink 用於數據攝取
  • Source 用於發佈記錄
  • Processor 是這兩種的結合

我們也可以在需要時定義自己的接口。

5.3. 消費者

定義消費者是一個兩步的過程。首先,我們在 application.properties 中定義將要消費的數據流:

spring.cloud.stream.bindings.input-in-0.destination=live-ips
spring.cloud.stream.bindings.input-in-0.group=live-ips-group
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.function.definition = input

接下來,讓我們定義一個 Spring @Configuration 類,使用 @Bean 及其 Supplier,該 Supplier 能夠從 Kinesis 流中讀取數據:

@Configuration public class ConsumerBinder {
  @Bean Consumer < String > input() {
    return str - >{
      System.out.println(str);
    };
  }
}

5.4. 生產者

生產者還可以被分割成兩部分。首先,我們需要在 application.properties 中定義我們的流屬性:

spring.cloud.stream.bindings.output-out-0.destination=myStream
spring.cloud.stream.bindings.output-out-0.content-type=text/plain
spring.cloud.stream.poller.fixed-delay = 3000

然後,我們會在 Spring 的 @Configuration 中添加 @Bean,並傳入一個 Supplier,從而每隔幾秒鐘創建新的測試消息:

@Configuration class ProducerBinder {
  @Bean public Supplier output() {
    return () -> IntStream.range(1, 200)
                 .mapToObj(ipSuffix - >"192.168.0." + ipSuffix)
                 .map(entry - >MessageBuilder.withPayload(entry)
                 .build());
  }
}

這就是 Spring Cloud Stream Binder Kinesis 工作所需的全部內容。我們現在可以簡單地啓動應用程序。

6. 結論

在本文中,我們學習瞭如何將我們的 Spring 項目與兩個 AWS 庫集成,以便與 Kinesis Data Stream 進行交互。我們還學習瞭如何使用 Spring Cloud Stream Binder Kinesis 庫,以簡化實現過程。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.