1. 概述
本教程將演示如何使用 R2DBC 執行反應式數據庫操作。
為了探索 R2DBC,我們將創建一個簡單的 Spring WebFlux REST 應用,該應用實現對單個實體的 CRUD 操作,僅使用異步操作來實現這一目標。
2. 什麼是 >?
反應式開發日益普及,每天都有新的框架涌現,而現有框架的採用率也在不斷提高。然而,反應式開發面臨的主要問題是,。這源於 JDBC 的設計方式,導致了這兩種根本上不同的方法之間的醜陋的“黑客”解決方案。
為了解決 Java 世界中異步數據庫訪問的需求,兩個標準已經出現。第一個標準,ADBC(異步數據庫訪問 API),由 Oracle 贊助,但截至目前,似乎停滯不前,沒有明確的時間表。
第二個標準,我們將在此進行介紹,是 R2DBC(反應式關係數據庫連接),這是一個由 Pivotal 和其他公司團隊主導的社區驅動項目。這個項目仍在 beta 版本中,但已經展示出活力,並提供了 Postgres、H2 和 MSSQL 數據庫的驅動程序。
3. 項目設置
使用 R2DBC 在項目中需要添加對核心 API 和合適的驅動程序的依賴。在我們的示例中,我們將使用 H2,這意味着只需要一個依賴項:
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>r2dbc-spi 將作為 r2dbc-h2 的一個瞬態依賴項。
4. 連接工廠設置
首先,要使用 R2DBC 訪問數據庫,我們需要創建一個 連接工廠對象,它在功能上與 JDBC 的 DataSource 類似。 創建 ConnectionFactory 的最簡單方法是通過 ConnectionFactories 類。
該類提供靜態方法,它接受一個 ConnectionFactoryOptions 對象並返回一個 ConnectionFactory。</nbsp;由於我們只需要一個 ConnectionFactory 的實例,讓我們創建一個 @Bean,以便我們可以在需要時通過注入使用它:
@Bean
public ConnectionFactory connectionFactory(R2DBCConfigurationProperties properties) {
ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(properties.getUrl());
Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
if (!StringUtil.isNullOrEmpty(properties.getUser())) {
ob = ob.option(USER, properties.getUser());
}
if (!StringUtil.isNullOrEmpty(properties.getPassword())) {
ob = ob.option(PASSWORD, properties.getPassword());
}
return ConnectionFactories.get(ob.build());
}
在這裏,我們接收來自用帶有 @ConfigurationProperties 註解裝飾的輔助類接收到的選項,並填充我們的 ConnectionFactoryOptions 實例。為了填充該實例,R2DBC 實現了單方法構建器模式,該方法接收一個 Option 和一個值。
R2DBC 定義了許多已知選項,例如 USERNAME 和 PASSWORD,我們在上面使用了這些選項。 另一種設置這些選項的方法是將連接字符串傳遞給 ConnectionFactoryOptions 類中的 parse() 方法。
以下是一個典型的 R2DBC 連接 URL 示例:
r2dbc:h2:mem://./testdb讓我們將這個字符串分解成其組成部分:
- r2dbc: R2DBC URL 的固定方案標識符——另一個有效方案是 rd2bcs,用於 SSL 加密連接
- h2: 用於定位適當連接工廠的驅動程序標識符
- mem: 驅動程序特定的協議——在本例中,它對應於內存數據庫
- //./testdb: 驅動程序特定的字符串,通常包含主機、數據庫和任何其他選項。
在準備好選項集後,我們將它傳遞給 get() 靜態工廠方法以創建我們的 ConnectionFactory Bean。
5. 執行語句
類似於 JDBC,使用 R2DBC 主要涉及向數據庫發送 SQL 語句並處理結果集。**然而,由於 R2DBC 是一個反應式 API,因此它高度依賴於反應式流類型,例如 <em >Publisher </em >和Subscriber》。
使用這些類型直接操作比較繁瑣,因此我們將使用項目 Reactor 的類型,例如 <em >Mono </em >和Flux>,以幫助我們編寫更簡潔、更清晰的代碼。
在下一部分,我們將看到如何通過創建反應式 DAO 類來實現與數據庫相關的任務,該類針對一個簡單的 `Account> 類。該類僅包含三個屬性,並且在我們的數據庫中有一個對應的表:
public class Account {
private Long id;
private String iban;
private BigDecimal balance;
// ... getters and setters omitted
}5.1. 獲取連接
在我們可以將語句發送到數據庫之前,我們需要一個 Connection 實例。我們已經知道如何創建 ConnectionFactory,因此也不奇怪我們會使用它來獲取 Connection。重要的是要記住,現在我們獲得的不是一個普通的 Connection,而是 Publisher 一個單一的 Connection。
我們的 ReactiveAccountDao,這是一個標準的 Spring @Component,通過構造函數注入獲取 ConnectionFactory,因此它在處理方法中可以直接使用。
讓我們看看 findById() 方法的幾行代碼,以瞭解如何檢索和使用 Connection:
public Mono<Account>> findById(Long id) {
return Mono.from(connectionFactory.create())
.flatMap(c ->
// use the connection
)
// ... downstream processing omitted
}在這裏,我們正在將從我們的 ConnectionFactory 返回的 Publisher 轉換為一個 Mono,該 Mono 是我們事件流的初始源。
5.1. 準備和提交語句
現在我們已經建立了一個 連接,我們可以利用它來創建一條 語句並將其與參數綁定:
.flatMap( c ->
Mono.from(c.createStatement("select id,iban,balance from Account where id = $1")
.bind("$1", id)
.execute())
.doFinally((st) -> close(c))
)
Connection 對象的 createStatement 方法接受一個 SQL 查詢字符串,該字符串可以可選地包含佔位符——在規範中稱為“標記” (見規範)。
這裏有幾個需要注意的點:首先,createStatement 是一個同步操作,這允許我們使用流式風格將值綁定到返回的 Statement 對象;其次,而且非常重要,佔位符/標記語法是特定於供應商的!
在此示例中,我們使用 H2 的特定語法,它使用 $n 來標記參數。其他供應商可能使用不同的語法,例如 :param、@Pn 或其他約定。這對於將遺留代碼遷移到此新 API 時,是一個必須注意的重要方面。
綁定過程本身相當簡單,這是由於流式 API 模式和簡化類型系統:只有一個重載的 bind() 方法來處理所有類型轉換——當然,受數據庫規則約束。
bind() 方法接受的第一個參數可以是零索引的序數,該序數對應於標記在語句中的位置,也可以是包含實際標記的字符串。
在將所有參數的值設置為後,我們調用 execute(),它返回一個 Publisher 的 Result 對象,我們再次將其包裝在 Mono 中以進行進一步處理。我們附加一個 doFinally() 處理程序到此 Mono,以確保我們無論流式處理是否正常完成,都會關閉我們的連接。
5.2. 處理結果
接下來的流程步驟負責處理 Result 對象並生成一系列 ResponseEntity<Account> 實例。
由於我們知道給定 id 只有一個實例,因此我們將實際返回一個 Mono 流。 實際轉換髮生在傳遞給接收到的 Result 的 map() 方法的函數內部:
.map(result -> result.map((row, meta) ->
new Account(row.get("id", Long.class),
row.get("iban", String.class),
row.get("balance", BigDecimal.class))))
.flatMap(p -> Mono.from(p));
結果的 map() 方法期望接收一個接受兩個參數的函數。第一個參數是一個 Row 對象,用於收集每列的值並填充一個 Account 實例。第二個參數是 meta,它是一個 RowMetadata 對象,包含有關當前行的信息,例如列名和類型。
之前的 map() 調用在我們的流水線中解析為 Mono<Producer<Account>>">,但我們需要從該方法返回一個 Mono<Account>">。為了解決這個問題,我們添加了一個最終的 flatMap() 步驟,它將 Producer 適配為一個 Mono。
5.3. 批量語句
R2DBC 還支持創建和執行語句批次,這允許在單個 execute() 調用中執行多個 SQL 語句。與常規語句不同,批量語句不支持綁定,主要用於 ETL 任務等性能優化場景。
我們的示例項目使用批量語句創建 Account 表並向其中插入一些測試數據:
@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
return (args) ->
Flux.from(cf.create())
.flatMap(c ->
Flux.from(c.createBatch()
.add("drop table if exists Account")
.add("create table Account(" +
"id IDENTITY(1,1)," +
"iban varchar(80) not null," +
"balance DECIMAL(18,2) not null)")
.add("insert into Account(iban,balance)" +
"values('BR430120980198201982',100.00)")
.add("insert into Account(iban,balance)" +
"values('BR430120998729871000',250.00)")
.execute())
.doFinally((st) -> c.close())
)
.log()
.blockLast();
}在這裏,我們使用來自<em>createBatch()</em>的<em>Batch</em>對象,並添加一些 SQL 語句。然後,我們使用相同的<em>execute()</em>方法,通過該方法執行這些語句,該方法在<em>Statement</em>接口中可用。
在本例中,我們不關心任何結果——只關心所有語句都能正常執行。如果我們需要任何產生的結果,我們只需要在流中添加一個下游步驟來處理髮出的<em>Result</em>對象。
6. 事務
我們將在本教程中涵蓋的最後一個主題是事務。正如我們已經預料到的,我們以與 JDBC 類似的方式管理事務,即通過使用在 Connection 對象中提供的方法。
正如之前所述,主要區別在於現在 所有與事務相關的類方法都是異步的,返回一個 Publisher ,我們需要在適當的時間將其添加到我們的流中。
我們的示例項目在其 createAccount() 方法中使用了事務。
public Mono<Account> createAccount(Account account) {
return Mono.from(connectionFactory.create())
.flatMap(c -> Mono.from(c.beginTransaction())
.then(Mono.from(c.createStatement("insert into Account(iban,balance) values($1,$2)")
.bind("$1", account.getIban())
.bind("$2", account.getBalance())
.returnGeneratedValues("id")
.execute()))
.map(result -> result.map((row, meta) ->
new Account(row.get("id", Long.class),
account.getIban(),
account.getBalance())))
.flatMap(pub -> Mono.from(pub))
.delayUntil(r -> c.commitTransaction())
.doFinally((st) -> c.close()));
}在這裏,我們添加了與交易相關的調用,分為兩點。首先,在從數據庫獲取新的連接後,我們調用beginTransactionMethod()方法。一旦我們確認交易已成功啓動,我們準備並執行insert語句。
這次,我們還使用了returnGeneratedValues()方法,指示數據庫返回為新Account生成的身份值。R2DBC以Result形式返回這些值,該Result包含一行數據,其中包含所有生成的身份值,我們使用它來創建Account實例。
再次強調,我們需要將傳入的Mono<Publisher<Account>>轉換為Mono<Account>,因此我們添加了一個flatMap()方法來解決這個問題。
接下來,我們使用delayUntil()步驟提交事務。我們需要這樣做,以確保返回的Account實例已經提交到數據庫。
最後,我們附加了一個doFinally步驟到該流水線,在所有從返回的Mono中消費的事件完成後關閉Connection。
7. 樣車DAO使用示例
現在我們已經擁有一個反應式DAO,讓我們使用它來創建一個簡單的 Spring WebFlux 應用程序,以展示如何在典型應用程序中使用它。由於該框架已經支持反應式構造,因此這成為一項輕而易舉的任務。例如,讓我們來看看 GET 方法的實現:
@RestController
public class AccountResource {
private final ReactiveAccountDao accountDao;
public AccountResource(ReactiveAccountDao accountDao) {
this.accountDao = accountDao;
}
@GetMapping("/accounts/{id}")
public Mono<ResponseEntity<Account>> getAccount(@PathVariable("id") Long id) {
return accountDao.findById(id)
.map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
.switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
}
// ... other methods omitted
}在這裏,我們使用 DAO 返回的 Mono 構建一個 ResponseEntity,並設置適當的狀態碼。我們這樣做只是為了在沒有 Account 對應指定 ID 的情況下,返回 NOT_FOUND (404) 狀態碼。
8. 結論
在本文中,我們介紹了使用 R2DBC 進行反應式數據庫訪問的基本知識。雖然該項目仍處於早期階段,但正迅速發展,預計將在 2020 年初發布。
與 ADBA 相比,後者肯定不會成為 Java 12 的一部分,R2DBC 似乎更具前景,並且已經為一些流行的數據庫提供了驅動程序——Oracle 數據庫在此處是一個重要的例外。