知識庫 / Spring RSS 訂閱

異步批量操作在 Couchbase

NoSQL,Spring
HongKong
5
02:49 PM · Dec 06 ,2025

1. 簡介

本教程的後續內容將探討在 Spring 應用中使用 Couchbase 的異步特性,並展示如何利用 Couchbase SDK 的異步功能,通過批量執行持久化操作,從而使我們的應用程序能夠充分利用 Couchbase 資源。

1.1. CrudService 接口

首先,我們增強了通用的 CrudService 接口,以包含批量操作:

public interface CrudService<T> {
    ...
    
    List<T> readBulk(Iterable<String> ids);

    void createBulk(Iterable<T> items);

    void updateBulk(Iterable<T> items);

    void deleteBulk(Iterable<String> ids);

    boolean exists(String id);
}

1.2. CouchbaseEntity 接口

我們定義了一個接口,用於持久化我們想要持久化的實體:

public interface CouchbaseEntity {

    String getId();
    
    void setId(String id);
    
}

1.3. AbstractCrudService

然後,我們將實現這些方法,並在一個通用的抽象類中實現它們。該類繼承自我們在上一教程中使用的 PersonCrudService 類,其內容如下:

public abstract class AbstractCrudService<T extends CouchbaseEntity> implements CrudService<T> {
    private BucketService bucketService;
    private Bucket bucket;
    private JsonDocumentConverter<T> converter;

    public AbstractCrudService(BucketService bucketService, JsonDocumentConverter<T> converter) {
        this.bucketService = bucketService;
        this.converter = converter;
    }

    protected void loadBucket() {
        bucket = bucketService.getBucket();
    }
    
    ...
}

2. 異步 Bucket 接口

Couchbase SDK 提供 AsyncBucket 接口,用於執行異步操作。給定一個 Bucket 實例,您可以通過 async() 方法獲取其異步版本:

AsyncBucket asyncBucket = bucket.async();

3. 批量操作

要使用 <em >AsyncBucket</em> 接口執行批量操作,我們使用 <em >RxJava</em> 庫。

3.1. 批量讀取

我們實現了 <em >readBulk</em> 方法。首先,我們使用 RxJava 中的 <em >AsyncBucket</em><em >flatMap</em> 機制異步地將文檔檢索到 <em >Observable&lt;JsonDocument&gt;</em >> 中,然後我們使用 RxJava 中的 <em >toBlocking</em> 機制將這些轉換為實體列表:

@Override
public List<T> readBulk(Iterable<String> ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable<JsonDocument> asyncOperation = Observable
      .from(ids)
      .flatMap(new Func1<String, Observable<JsonDocument>>() {
          public Observable<JsonDocument> call(String key) {
              return asyncBucket.get(key);
          }
    });

    List<T> items = new ArrayList<T>();
    try {
        asyncOperation.toBlocking()
          .forEach(new Action1<JsonDocument>() {
              public void call(JsonDocument doc) {
                  T item = converter.fromDocument(doc);
                  items.add(item);
              }
        });
    } catch (Exception e) {
        logger.error("Error during bulk get", e);
    }

    return items;
}

3.2. 大批量插入

我們再次使用 RxJava 的 flatMap 構造函數來實現 createBulk 方法。

由於大批量修改請求的產生速度超過了響應生成速度,有時會導致過載情況,因此在遇到 BackpressureException 時,我們會採用帶有指數級延遲的重試機制。

@Override
public void createBulk(Iterable<T> items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1<T, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(final T t) {
              if(t.getId() == null) {
                  t.setId(UUID.randomUUID().toString());
              }
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.insert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

3.3. 大批量更新

我們使用類似的機制在 updateBulk 方法中:

@Override
public void updateBulk(Iterable<T> items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1<T, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(final T t) {
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.upsert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

3.4. 批量刪除

我們編寫了 deleteBulk 方法如下:

@Override
public void deleteBulk(Iterable<String> ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(ids)
      .flatMap(new Func1<String, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(String key) {
              return asyncBucket.remove(key)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

4. PersonCrudService

最後,我們編寫了一個 Spring 服務,PersonCrudService,它擴展了我們的 AbstractCrudService 用於 Person 實體。

由於所有 Couchbase 交互都已在抽象類中實現,因此對於實體類的實現非常簡單,我們只需要確保所有依賴項已注入,並加載我們的 bucket:

@Service
public class PersonCrudService extends AbstractCrudService<Person> {

    @Autowired
    public PersonCrudService(
      @Qualifier("TutorialBucketService") BucketService bucketService,
      PersonDocumentConverter converter) {
        super(bucketService, converter);
    }

    @PostConstruct
    private void init() {
        loadBucket();
    }
}

5. 結論

您可以在官方 Couchbase 開發者文檔站點 中瞭解更多有關 Couchbase Java SDK 的信息。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.