知識庫 / Reactive RSS 訂閱

R2DBC – 反應式關係數據庫連接

Persistence,Reactive,Spring
HongKong
7
01:20 PM · Dec 06 ,2025

1. 概述

本教程將演示如何使用 R2DBC 執行反應式數據庫操作。

為了探索 R2DBC,我們將創建一個簡單的 Spring WebFlux REST 應用,該應用實現對單個實體的 CRUD 操作,僅使用異步操作來實現這一目標。

2. 什麼是 >?

反應式開發日益普及,每天都有新的框架涌現,而現有框架的採用率也在不斷提高。然而,反應式開發面臨的主要問題是,。這源於 JDBC 的設計方式,導致了這兩種根本上不同的方法之間的醜陋的“黑客”解決方案。

為了解決 Java 世界中異步數據庫訪問的需求,兩個標準已經出現。第一個標準,ADBC(異步數據庫訪問 API),由 Oracle 贊助,但截至目前,似乎停滯不前,沒有明確的時間表。

第二個標準,我們將在此進行介紹,是 R2DBC(反應式關係數據庫連接),這是一個由 Pivotal 和其他公司團隊主導的社區驅動項目。這個項目仍在 beta 版本中,但已經展示出活力,並提供了 Postgres、H2 和 MSSQL 數據庫的驅動程序。

3. 項目設置

使用 R2DBC 在項目中需要添加對核心 API 和合適的驅動程序的依賴。在我們的示例中,我們將使用 H2,這意味着只需要一個依賴項:

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <version>1.0.0.RELEASE</version>
</dependency>

r2dbc-spi 將作為 r2dbc-h2 的一個瞬態依賴項。

4. 連接工廠設置

首先,要使用 R2DBC 訪問數據庫,我們需要創建一個 連接工廠對象,它在功能上與 JDBC 的 DataSource 類似。 創建 ConnectionFactory 的最簡單方法是通過 ConnectionFactories 類。

該類提供靜態方法,它接受一個 ConnectionFactoryOptions 對象並返回一個 ConnectionFactory。</nbsp;由於我們只需要一個 ConnectionFactory 的實例,讓我們創建一個 @Bean,以便我們可以在需要時通過注入使用它:

@Bean
public ConnectionFactory connectionFactory(R2DBCConfigurationProperties properties) {
    ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(properties.getUrl());
    Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
    if (!StringUtil.isNullOrEmpty(properties.getUser())) {
        ob = ob.option(USER, properties.getUser());
    }
    if (!StringUtil.isNullOrEmpty(properties.getPassword())) {
        ob = ob.option(PASSWORD, properties.getPassword());
    }        
    return ConnectionFactories.get(ob.build());    
}

在這裏,我們接收來自用帶有 @ConfigurationProperties 註解裝飾的輔助類接收到的選項,並填充我們的 ConnectionFactoryOptions 實例。為了填充該實例,R2DBC 實現了單方法構建器模式,該方法接收一個 Option 和一個值。

R2DBC 定義了許多已知選項,例如 USERNAMEPASSWORD,我們在上面使用了這些選項。 另一種設置這些選項的方法是將連接字符串傳遞給 ConnectionFactoryOptions 類中的 parse() 方法。

以下是一個典型的 R2DBC 連接 URL 示例:

r2dbc:h2:mem://./testdb

讓我們將這個字符串分解成其組成部分:

  • r2dbc: R2DBC URL 的固定方案標識符——另一個有效方案是 rd2bcs,用於 SSL 加密連接
  • h2: 用於定位適當連接工廠的驅動程序標識符
  • mem: 驅動程序特定的協議——在本例中,它對應於內存數據庫
  • //./testdb: 驅動程序特定的字符串,通常包含主機、數據庫和任何其他選項。

在準備好選項集後,我們將它傳遞給 get() 靜態工廠方法以創建我們的 ConnectionFactory  Bean。

5. 執行語句

類似於 JDBC,使用 R2DBC 主要涉及向數據庫發送 SQL 語句並處理結果集。**然而,由於 R2DBC 是一個反應式 API,因此它高度依賴於反應式流類型,例如 <em >Publisher&nbsp;</em >和Subscriber》。

使用這些類型直接操作比較繁瑣,因此我們將使用項目 Reactor 的類型,例如 <em >Mono&nbsp;</em >和Flux>,以幫助我們編寫更簡潔、更清晰的代碼。

在下一部分,我們將看到如何通過創建反應式 DAO 類來實現與數據庫相關的任務,該類針對一個簡單的 `Account> 類。該類僅包含三個屬性,並且在我們的數據庫中有一個對應的表:

public class Account {
    private Long id;
    private String iban;
    private BigDecimal balance;
    // ... getters and setters omitted
}

5.1. 獲取連接

在我們可以將語句發送到數據庫之前,我們需要一個 Connection 實例。我們已經知道如何創建 ConnectionFactory,因此也不奇怪我們會使用它來獲取 Connection。重要的是要記住,現在我們獲得的不是一個普通的 Connection,而是 Publisher 一個單一的 Connection

我們的 ReactiveAccountDao,這是一個標準的 Spring @Component,通過構造函數注入獲取 ConnectionFactory,因此它在處理方法中可以直接使用。

讓我們看看 findById() 方法的幾行代碼,以瞭解如何檢索和使用 Connection

public Mono<Account>> findById(Long id) {         
    return Mono.from(connectionFactory.create())
      .flatMap(c ->
          // use the connection
      )
      // ... downstream processing omitted
}

在這裏,我們正在將從我們的 ConnectionFactory 返回的 Publisher 轉換為一個 Mono,該 Mono 是我們事件流的初始源。

5.1. 準備和提交語句

現在我們已經建立了一個 連接,我們可以利用它來創建一條 語句並將其與參數綁定:

.flatMap( c -> 
    Mono.from(c.createStatement("select id,iban,balance from Account where id = $1")
      .bind("$1", id)
      .execute())
      .doFinally((st) -> close(c))
 )

Connection 對象的 createStatement 方法接受一個 SQL 查詢字符串,該字符串可以可選地包含佔位符——在規範中稱為“標記” (見規範)

這裏有幾個需要注意的點:首先,createStatement 是一個同步操作,這允許我們使用流式風格將值綁定到返回的 Statement 對象;其次,而且非常重要,佔位符/標記語法是特定於供應商的!

在此示例中,我們使用 H2 的特定語法,它使用 $n 來標記參數。其他供應商可能使用不同的語法,例如 :param@Pn 或其他約定。這對於將遺留代碼遷移到此新 API 時,是一個必須注意的重要方面。

綁定過程本身相當簡單,這是由於流式 API 模式和簡化類型系統:只有一個重載的 bind() 方法來處理所有類型轉換——當然,受數據庫規則約束。

bind() 方法接受的第一個參數可以是零索引的序數,該序數對應於標記在語句中的位置,也可以是包含實際標記的字符串。

在將所有參數的值設置為後,我們調用 execute(),它返回一個 PublisherResult 對象,我們再次將其包裝在 Mono 中以進行進一步處理。我們附加一個 doFinally() 處理程序到此 Mono,以確保我們無論流式處理是否正常完成,都會關閉我們的連接。

5.2. 處理結果

接下來的流程步驟負責處理 Result 對象並生成一系列 ResponseEntity<Account> 實例。

由於我們知道給定 id 只有一個實例,因此我們將實際返回一個 Mono 流。 實際轉換髮生在傳遞給接收到的 Resultmap() 方法的函數內部:

.map(result -> result.map((row, meta) -> 
    new Account(row.get("id", Long.class),
      row.get("iban", String.class),
      row.get("balance", BigDecimal.class))))
.flatMap(p -> Mono.from(p));

結果的 map() 方法期望接收一個接受兩個參數的函數。第一個參數是一個 Row 對象,用於收集每列的值並填充一個 Account 實例。第二個參數是 meta,它是一個 RowMetadata 對象,包含有關當前行的信息,例如列名和類型。

之前的 map() 調用在我們的流水線中解析為 Mono<Producer<Account>>">,但我們需要從該方法返回一個 Mono<Account>">。為了解決這個問題,我們添加了一個最終的 flatMap() 步驟,它將 Producer 適配為一個 Mono

5.3. 批量語句

R2DBC 還支持創建和執行語句批次,這允許在單個 execute() 調用中執行多個 SQL 語句。與常規語句不同,批量語句不支持綁定,主要用於 ETL 任務等性能優化場景。

我們的示例項目使用批量語句創建 Account 表並向其中插入一些測試數據:

@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
    return (args) ->
      Flux.from(cf.create())
        .flatMap(c -> 
            Flux.from(c.createBatch()
              .add("drop table if exists Account")
              .add("create table Account(" +
                "id IDENTITY(1,1)," +
                "iban varchar(80) not null," +
                "balance DECIMAL(18,2) not null)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120980198201982',100.00)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120998729871000',250.00)")
              .execute())
            .doFinally((st) -> c.close())
          )
        .log()
        .blockLast();
}

在這裏,我們使用來自<em>createBatch()</em>的<em>Batch</em>對象,並添加一些 SQL 語句。然後,我們使用相同的<em>execute()</em>方法,通過該方法執行這些語句,該方法在<em>Statement</em>接口中可用。

在本例中,我們不關心任何結果——只關心所有語句都能正常執行。如果我們需要任何產生的結果,我們只需要在流中添加一個下游步驟來處理髮出的<em>Result</em>對象。

6. 事務

我們將在本教程中涵蓋的最後一個主題是事務。正如我們已經預料到的,我們以與 JDBC 類似的方式管理事務,即通過使用在 Connection 對象中提供的方法。

正如之前所述,主要區別在於現在 所有與事務相關的類方法都是異步的,返回一個 Publisher ,我們需要在適當的時間將其添加到我們的流中。

我們的示例項目在其 createAccount() 方法中使用了事務。

public Mono<Account> createAccount(Account account) {
    return Mono.from(connectionFactory.create())
      .flatMap(c -> Mono.from(c.beginTransaction())
        .then(Mono.from(c.createStatement("insert into Account(iban,balance) values($1,$2)")
          .bind("$1", account.getIban())
          .bind("$2", account.getBalance())
          .returnGeneratedValues("id")
          .execute()))
        .map(result -> result.map((row, meta) -> 
            new Account(row.get("id", Long.class),
              account.getIban(),
              account.getBalance())))
        .flatMap(pub -> Mono.from(pub))
        .delayUntil(r -> c.commitTransaction())
        .doFinally((st) -> c.close()));   
}

在這裏,我們添加了與交易相關的調用,分為兩點。首先,在從數據庫獲取新的連接後,我們調用beginTransactionMethod()方法。一旦我們確認交易已成功啓動,我們準備並執行insert語句。

這次,我們還使用了returnGeneratedValues()方法,指示數據庫返回為新Account生成的身份值。R2DBC以Result形式返回這些值,該Result包含一行數據,其中包含所有生成的身份值,我們使用它來創建Account實例。

再次強調,我們需要將傳入的Mono<Publisher<Account>>轉換為Mono<Account>,因此我們添加了一個flatMap()方法來解決這個問題。

接下來,我們使用delayUntil()步驟提交事務。我們需要這樣做,以確保返回的Account實例已經提交到數據庫。

最後,我們附加了一個doFinally步驟到該流水線,在所有從返回的Mono中消費的事件完成後關閉Connection

7. 樣車DAO使用示例

現在我們已經擁有一個反應式DAO,讓我們使用它來創建一個簡單的 Spring WebFlux 應用程序,以展示如何在典型應用程序中使用它。由於該框架已經支持反應式構造,因此這成為一項輕而易舉的任務。例如,讓我們來看看 GET 方法的實現:

@RestController
public class AccountResource {
    private final ReactiveAccountDao accountDao;

    public AccountResource(ReactiveAccountDao accountDao) {
        this.accountDao = accountDao;
    }

    @GetMapping("/accounts/{id}")
    public Mono<ResponseEntity<Account>> getAccount(@PathVariable("id") Long id) {
        return accountDao.findById(id)
          .map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
          .switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
    }
    // ... other methods omitted
}

在這裏,我們使用 DAO 返回的 Mono 構建一個 ResponseEntity,並設置適當的狀態碼。我們這樣做只是為了在沒有 Account 對應指定 ID 的情況下,返回 NOT_FOUND (404) 狀態碼。

8. 結論

在本文中,我們介紹了使用 R2DBC 進行反應式數據庫訪問的基本知識。雖然該項目仍處於早期階段,但正迅速發展,預計將在 2020 年初發布。

與 ADBA 相比,後者肯定不會成為 Java 12 的一部分,R2DBC 似乎更具前景,並且已經為一些流行的數據庫提供了驅動程序——Oracle 數據庫在此處是一個重要的例外。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.