1. 簡介
Kinesis 是一款用於實時收集、處理和分析數據流的工具,由亞馬遜開發。其主要優勢在於幫助開發基於事件驅動的應用。
在本教程中,我們將探索一些庫,這些庫使我們的 Spring 應用能夠向 Kinesis 流中生產和消費記錄。代碼示例將展示基本功能,但不代表生產級別的代碼。
2. 先決條件
在繼續之前,我們需要完成兩項任務。
第一項是創建 Spring 項目,因為我們的目標是使用 Spring 項目與 Kinesis 交互。
第二項是創建 Kinesis 數據流。我們可以從 AWS 賬户中的 Web 瀏覽器中完成此操作。 對於 AWS CLI 愛好者來説,另一種選擇是使用命令行:參考命令。 由於我們將從代碼中與其交互,因此我們還需要準備好 AWS IAM 憑證,包括訪問密鑰和密鑰。 此外,還需要指定區域。
所有生產者將創建模擬 IP 地址記錄,而消費者將讀取這些值並在應用程序控制台中進行列出。
3. Java 版本的 AWS SDK
我們將首先使用 Java 版本的 AWS SDK。它的優勢在於它允許我們管理與 Kinesis Data Streams 交互的許多方面。我們可以讀取數據、生產數據、創建數據流以及重塑數據流。 缺點是,為了編寫生產級別的代碼,我們需要處理諸如重塑、錯誤處理或保持消費者存活的守護進程等方面。
3.1. Maven 依賴
<a href="https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-client">amazon-kinesis-client</a> Maven 依賴將提供我們所需的全部內容,用於構建可運行的示例。 我們現在將它添加到我們的 <em >pom.xml</em> 文件中:
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.12.380</version>
</dependency>3.2. Spring 配置
讓我們重用 AmazonKinesis 對象,以便與我們的 Kinesis Stream 進行交互。我們將它創建為一個 @Bean 並在我們的 @SpringBootApplication 類中:
@Bean
public AmazonKinesis buildAmazonKinesis() {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
return AmazonKinesisClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.withRegion(Regions.EU_CENTRAL_1)
.build();
}接下來,讓我們在 application.properties 中定義 aws.access.key 和 aws.secret.key,這些用於本地機器的密鑰:
aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here我們將會使用 @Value 註解來讀取它們:
@Value("${aws.access.key}")
private String accessKey;
@Value("${aws.secret.key}")
private String secretKey;為了簡化操作,我們將依賴 @Scheduled 方法來創建和消費記錄。
3.3. 消費者
AWS SDK Kinesis 消費者採用拉式模型,這意味着我們的代碼將從 Kinesis 數據流的片段(shards)中拉取記錄:
GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);
GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
recordsResult.getRecords().stream()
.map(record -> new String(record.getData().array()))
.forEach(System.out::println);
recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
recordsResult = kinesis.getRecords(recordsRequest);
}GetRecordsRequest 對象用於構建流數據的請求。 在我們的示例中,我們定義了每請求 25 條記錄的限制,並一直讀取直到沒有更多記錄。
我們還可以注意到,對於我們的迭代,我們使用了 GetShardIteratorResult 對象。 我們在 @PostConstruc 方法中創建了這個對象,以便我們能夠立即跟蹤記錄。
private GetShardIteratorResult shardIterator;
@PostConstruct
private void buildShardIterator() {
GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
readShardsRequest.setStreamName(IPS_STREAM);
readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
readShardsRequest.setShardId(IPS_SHARD_ID);
this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}3.4. 生產者 (Producer)
現在,讓我們看看如何處理我們 Kinesis 數據流中記錄的創建。
我們使用 PutRecordsRequest 對象來插入數據。對於這個新對象,我們添加一個包含多個 PutRecordsRequestEntry 對象的列表:
List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
entry.setPartitionKey(IPS_PARTITION_KEY);
return entry;
}).collect(Collectors.toList());
PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName(IPS_STREAM);
createRecordsRequest.setRecords(entries);
kinesis.putRecords(createRecordsRequest);我們已經創建了一個基本的消費者和一個模擬 IP 記錄的生產者。現在剩下的就是運行我們的 Spring 項目,並在應用程序控制台中查看 IP 地址列表。
4. KCL 和 KPL
Kinesis 客户端庫 (KCL) 是一套簡化消費記錄的庫。它還為 AWS SDK Java 的 Kinesis 數據流 API 提供了一個抽象層。在後台,該庫處理了跨多個實例的負載均衡、響應實例故障、記錄檢查點以及響應重新分區。
Kinesis 生產者庫 (KPL) 是一套用於寫入 Kinesis 數據流的庫。它還提供了一個位於 AWS SDK Java 的 Kinesis 數據流 API 之上的抽象層。為了獲得更好的性能,該庫會自動處理批量、多線程和重試邏輯。
KCL 和 KPL 都有一個主要優勢,即易於使用,以便我們可以專注於生產和消費記錄。
4.1. Maven 依賴
如果需要,可以將這兩個庫單獨引入到我們的項目中。要將 KPL 和 KCL 引入到我們的 Maven 項目中,我們需要更新我們的 pom.xml 文件:
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.13.1</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.14.9</version>
</dependency>4.2. Spring 配置
我們需要做的只有確保我們擁有 IAM 憑證。<em aws.access.key</em> 和 <em aws.secret.key</em> 的值已在我們的 <em application.properties</em> 文件中定義,因此我們可以使用 @Value 讀取它們。
4.3. 消費者
首先,我們將創建一個實現 IRecordProcessor 接口的類,並定義如何處理 Kinesis 數據流記錄的邏輯,即在控制枱中將其打印出來:
public class IpProcessor implements IRecordProcessor {
@Override
public void initialize(InitializationInput initializationInput) { }
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
processRecordsInput.getRecords()
.forEach(record -> System.out.println(new String(record.getData().array())));
}
@Override
public void shutdown(ShutdownInput shutdownInput) { }
}下一步是定義一個實現 IRecordProcessorFactory 接口的工廠類,並返回一個先前創建的 IpProcessor 對象:
public class IpProcessorFactory implements IRecordProcessorFactory {
@Override
public IRecordProcessor createProcessor() {
return new IpProcessor();
}
}現在我們進入最終步驟,我們將使用一個 Worker對象來定義我們的消費者管道。
我們需要一個 KinesisClientLibConfiguration對象,該對象將定義所需的IAM憑據和AWS區域(如果需要)。
我們將傳遞 KinesisClientLibConfiguration對象和我們的 IpProcessorFactory對象到 Worker對象,然後在一個單獨的線程中啓動它。我們使用 Worker類來始終保持記錄消費的邏輯,因此我們現在持續讀取新的記錄:
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
"KinesisKCLConsumer",
IPS_STREAM,
"",
"",
DEFAULT_INITIAL_POSITION_IN_STREAM,
new AWSStaticCredentialsProvider(awsCredentials),
new AWSStaticCredentialsProvider(awsCredentials),
new AWSStaticCredentialsProvider(awsCredentials),
DEFAULT_FAILOVER_TIME_MILLIS,
"KinesisKCLConsumer",
DEFAULT_MAX_RECORDS,
DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS,
DEFAULT_METRICS_BUFFER_TIME_MILLIS,
DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
Regions.EU_CENTRAL_1.getName(),
DEFAULT_SHUTDOWN_GRACE_MILLIS,
DEFAULT_DDB_BILLING_MODE,
null,
0,
0,
0
);
final Worker worker = new Worker.Builder()
.recordProcessorFactory(new IpProcessorFactory())
.config(consumerConfig)
.build();
CompletableFuture.runAsync(worker.run());4.4. 生產者
現在,我們定義 KinesisProducerConfiguration 對象,添加 IAM 憑證和 AWS 區域:
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
.setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
.setVerifyCertificate(false)
.setRegion(Regions.EU_CENTRAL_1.getName());
this.kinesisProducer = new KinesisProducer(producerConfig);我們將在一個 @Scheduled 任務中包含之前創建的 kinesisProducer 對象,並持續地為我們的 Kinesis 數據流生產記錄:
IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
.forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));5. Spring Cloud Stream Binder Kinesis
我們已經見過了兩個庫,它們都出自非 Spring 生態系統。現在我們將看看 Spring Cloud Stream Binder Kinesis 如何進一步簡化我們的工作,同時建立在 Spring Cloud Stream 之上。
5.1. Maven 依賴
我們需要在應用程序中定義的 Maven 依賴是:Spring Cloud Stream Binder Kinesis。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>2.2.0</version>
</dependency>5.2. Spring 部署配置
當在 EC2 上運行時,所需的 AWS 屬性會自動發現,因此無需手動定義。由於我們正在本地機器上運行示例,因此需要為我們的 AWS 賬户定義 IAM 訪問密鑰、密鑰和區域。我們還已禁用應用程序的自動 CloudFormation 堆棧名稱檢測:
cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=falseSpring Cloud Stream 包含三個接口,我們可以用於流綁定:
- Sink 用於數據攝取
- Source 用於發佈記錄
- Processor 是這兩種的結合
我們也可以在需要時定義自己的接口。
5.3. 消費者
定義消費者是一個兩步的過程。首先,我們在 application.properties 中定義將要消費的數據流:
spring.cloud.stream.bindings.input-in-0.destination=live-ips
spring.cloud.stream.bindings.input-in-0.group=live-ips-group
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.function.definition = input接下來,讓我們定義一個 Spring @Configuration 類,使用 @Bean 及其 Supplier,該 Supplier 能夠從 Kinesis 流中讀取數據:
@Configuration public class ConsumerBinder {
@Bean Consumer < String > input() {
return str - >{
System.out.println(str);
};
}
}5.4. 生產者
生產者還可以被分割成兩部分。首先,我們需要在 application.properties 中定義我們的流屬性:
spring.cloud.stream.bindings.output-out-0.destination=myStream
spring.cloud.stream.bindings.output-out-0.content-type=text/plain
spring.cloud.stream.poller.fixed-delay = 3000然後,我們會在 Spring 的 @Configuration 中添加 @Bean,並傳入一個 Supplier,從而每隔幾秒鐘創建新的測試消息:
@Configuration class ProducerBinder {
@Bean public Supplier output() {
return () -> IntStream.range(1, 200)
.mapToObj(ipSuffix - >"192.168.0." + ipSuffix)
.map(entry - >MessageBuilder.withPayload(entry)
.build());
}
}這就是 Spring Cloud Stream Binder Kinesis 工作所需的全部內容。我們現在可以簡單地啓動應用程序。
6. 結論
在本文中,我們學習瞭如何將我們的 Spring 項目與兩個 AWS 庫集成,以便與 Kinesis Data Stream 進行交互。我們還學習瞭如何使用 Spring Cloud Stream Binder Kinesis 庫,以簡化實現過程。