1. 簡介
本教程的後續內容將探討在 Spring 應用中使用 Couchbase 的異步特性,並展示如何利用 Couchbase SDK 的異步功能,通過批量執行持久化操作,從而使我們的應用程序能夠充分利用 Couchbase 資源。
1.1. CrudService 接口
首先,我們增強了通用的 CrudService 接口,以包含批量操作:
public interface CrudService<T> {
...
List<T> readBulk(Iterable<String> ids);
void createBulk(Iterable<T> items);
void updateBulk(Iterable<T> items);
void deleteBulk(Iterable<String> ids);
boolean exists(String id);
}1.2. CouchbaseEntity 接口
我們定義了一個接口,用於持久化我們想要持久化的實體:
public interface CouchbaseEntity {
String getId();
void setId(String id);
}1.3. AbstractCrudService 類
然後,我們將實現這些方法,並在一個通用的抽象類中實現它們。該類繼承自我們在上一教程中使用的 PersonCrudService 類,其內容如下:
public abstract class AbstractCrudService<T extends CouchbaseEntity> implements CrudService<T> {
private BucketService bucketService;
private Bucket bucket;
private JsonDocumentConverter<T> converter;
public AbstractCrudService(BucketService bucketService, JsonDocumentConverter<T> converter) {
this.bucketService = bucketService;
this.converter = converter;
}
protected void loadBucket() {
bucket = bucketService.getBucket();
}
...
}2. 異步 Bucket 接口
Couchbase SDK 提供 AsyncBucket 接口,用於執行異步操作。給定一個 Bucket 實例,您可以通過 async() 方法獲取其異步版本:
AsyncBucket asyncBucket = bucket.async();3. 批量操作
要使用 <em >AsyncBucket</em> 接口執行批量操作,我們使用 <em >RxJava</em> 庫。
3.1. 批量讀取
我們實現了 <em >readBulk</em> 方法。首先,我們使用 RxJava 中的 <em >AsyncBucket</em> 和 <em >flatMap</em> 機制異步地將文檔檢索到 <em >Observable<JsonDocument></em >> 中,然後我們使用 RxJava 中的 <em >toBlocking</em> 機制將這些轉換為實體列表:
@Override
public List<T> readBulk(Iterable<String> ids) {
AsyncBucket asyncBucket = bucket.async();
Observable<JsonDocument> asyncOperation = Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
public Observable<JsonDocument> call(String key) {
return asyncBucket.get(key);
}
});
List<T> items = new ArrayList<T>();
try {
asyncOperation.toBlocking()
.forEach(new Action1<JsonDocument>() {
public void call(JsonDocument doc) {
T item = converter.fromDocument(doc);
items.add(item);
}
});
} catch (Exception e) {
logger.error("Error during bulk get", e);
}
return items;
}3.2. 大批量插入
我們再次使用 RxJava 的 flatMap 構造函數來實現 createBulk 方法。
由於大批量修改請求的產生速度超過了響應生成速度,有時會導致過載情況,因此在遇到 BackpressureException 時,我們會採用帶有指數級延遲的重試機制。
@Override
public void createBulk(Iterable<T> items) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(items)
.flatMap(new Func1<T, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(final T t) {
if(t.getId() == null) {
t.setId(UUID.randomUUID().toString());
}
JsonDocument doc = converter.toDocument(t);
return asyncBucket.insert(doc)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}3.3. 大批量更新
我們使用類似的機制在 updateBulk 方法中:
@Override
public void updateBulk(Iterable<T> items) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(items)
.flatMap(new Func1<T, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(final T t) {
JsonDocument doc = converter.toDocument(t);
return asyncBucket.upsert(doc)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}3.4. 批量刪除
我們編寫了 deleteBulk 方法如下:
@Override
public void deleteBulk(Iterable<String> ids) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(String key) {
return asyncBucket.remove(key)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}4. PersonCrudService
最後,我們編寫了一個 Spring 服務,PersonCrudService,它擴展了我們的 AbstractCrudService 用於 Person 實體。
由於所有 Couchbase 交互都已在抽象類中實現,因此對於實體類的實現非常簡單,我們只需要確保所有依賴項已注入,並加載我們的 bucket:
@Service
public class PersonCrudService extends AbstractCrudService<Person> {
@Autowired
public PersonCrudService(
@Qualifier("TutorialBucketService") BucketService bucketService,
PersonDocumentConverter converter) {
super(bucketService, converter);
}
@PostConstruct
private void init() {
loadBucket();
}
}5. 結論
您可以在官方 Couchbase 開發者文檔站點 中瞭解更多有關 Couchbase Java SDK 的信息。