今天為大家帶來的是併發設計模式實戰系列,第十五章Future/Promise,廢話不多説直接開始~
一、核心原理深度拆解
1. 異步計算雙階段模型
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Task │───>│ Future │───>│ Callback │
│ Submission │<───│ (Promise) │<───│ Execution │
└─────────────┘ └─────────────┘ └─────────────┘
- 提交階段:主線程提交任務後立即返回Future佔位符
- 計算階段:工作線程異步執行計算,通過Promise設置結果
- 回調階段:結果就緒後觸發回調(觀察者模式)
2. 狀態機流轉
public interface Future<V> {
boolean isDone(); // 完成狀態(成功/失敗/取消)
V get() throws...; // 阻塞獲取結果
void addCallback(...); // 回調註冊
}
二、生活化類比:快遞櫃取件
| 系統組件 | 現實類比 | 核心行為 |
|---|---|---|
| Future | 快遞櫃取件碼 | 憑碼查詢包裹是否到達 |
| Promise | 快遞員存件操作 | 實際將包裹放入櫃中並更新狀態 |
| Callback | 短信通知服務 | 包裹入櫃後自動發送取件提醒 |
- 異步流程:下單→獲得取件碼(Future)→快遞員送貨(異步計算)→短信通知(Callback)
三、Java代碼實現(生產級Demo)
1. 完整可運行代碼
import java.util.concurrent.*;
import java.util.function.Consumer;
public class FuturePromiseDemo {
// 1. 自定義Promise實現
static class MyPromise<V> implements Future<V>, Runnable {
private volatile V result;
private volatile Throwable error;
private volatile boolean isDone;
private final CountDownLatch latch = new CountDownLatch(1);
private final List<Consumer<V>> callbacks = new CopyOnWriteArrayList<>();
// 提交任務時執行的方法
@Override
public void run() {
try {
// 模擬耗時計算
Thread.sleep(1000);
setResult((V) "計算結果"); // 實際業務邏輯替換此處
} catch (Exception e) {
setError(e);
}
}
// Promise核心方法:設置結果
public void setResult(V result) {
this.result = result;
this.isDone = true;
latch.countDown();
notifyCallbacks();
}
// Promise核心方法:設置異常
public void setError(Throwable error) {
this.error = error;
this.isDone = true;
latch.countDown();
}
private void notifyCallbacks() {
callbacks.forEach(cb -> cb.accept(result));
}
// Future實現方法
@Override
public V get() throws InterruptedException, ExecutionException {
latch.await();
if (error != null) throw new ExecutionException(error);
return result;
}
@Override
public boolean isDone() {
return isDone;
}
// 註冊回調(非JUC標準方法)
public void addCallback(Consumer<V> callback) {
if (isDone) {
callback.accept(result);
} else {
callbacks.add(callback);
}
}
}
// 2. 使用示例
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
// 創建Promise並提交任務
MyPromise<String> promise = new MyPromise<>();
executor.submit(promise);
// 註冊回調
promise.addCallback(result ->
System.out.println("[回調] 異步結果: " + result));
// 同步阻塞獲取
System.out.println("[主線程] 立即返回,繼續其他工作...");
System.out.println("最終結果: " + promise.get());
executor.shutdown();
}
}
2. 關鍵機制説明
// 1. 狀態同步控制
private volatile boolean isDone; // 保證可見性
private final CountDownLatch latch; // 實現阻塞等待
// 2. 線程安全回調列表
private final List<Consumer<V>> callbacks = new CopyOnWriteArrayList<>();
// 3. 異常處理流程
public void setError(Throwable error) {
this.error = error;
this.isDone = true;
latch.countDown(); // 喚醒所有等待線程
}
四、橫向對比表格
1. 異步模式對比
| 模式 | 核心特點 | 適用場景 | Java實現類 |
|---|---|---|---|
| Future | 阻塞式獲取結果 | 簡單異步任務 | FutureTask |
| CompletableFuture | 鏈式調用+組合操作 | 複雜異步流水線 | CompletableFuture |
| Promise | 可寫的結果容器 | 跨線程結果傳遞 | 需自行實現 |
| Callback | 事件驅動無阻塞 | 高併發IO | Netty的ChannelFuture |
2. 回調註冊方式對比
| 方法 | 觸發時機 | 線程安全性 | 鏈式支持 |
|---|---|---|---|
| addCallback | 結果就緒後立即執行 | 需自行保證 | 不支持 |
| thenApply | 前序階段完成後觸發 | 內置線程池控制 | 支持 |
| whenComplete | 無論成功失敗都執行 | 可能在不同線程執行 | 支持 |
五、高級應用技巧
1. 組合多個異步任務
CompletableFuture<String> query1 = queryDatabase("sql1");
CompletableFuture<String> query2 = queryDatabase("sql2");
// 並行執行後合併結果
CompletableFuture<String> merged = query1.thenCombineAsync(query2,
(r1, r2) -> r1 + "|" + r2,
ForkJoinPool.commonPool());
2. 超時控制
Future<String> future = executor.submit(task);
try {
String result = future.get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true); // 中斷任務執行
}
3. 回調線程控制
promise.addCallback(result -> {
// 指定回調執行線程池
ForkJoinPool.commonPool().execute(() -> processResult(result));
});
通過這種 原理+實現+對比 的立體解析,可以掌握:
- Future/Promise的雙階段異步本質
- 如何實現生產級的Promise容器
- 不同異步模式的適用場景選擇
- 複雜場景下的組合使用技巧
六、源碼級實現剖析(接五)
1. JDK FutureTask 核心邏輯
// 狀態機定義(OpenJDK 17)
private volatile int state;
static final int NEW = 0; // 初始化狀態
static final int COMPLETING = 1; // 臨時狀態
static final int NORMAL = 2; // 正常完成
static final int EXCEPTIONAL = 3; // 異常完成
static final int CANCELLED = 4; // 已取消
static final int INTERRUPTING = 5; // 中斷中
static final int INTERRUPTED = 6; // 已中斷
// 結果存儲設計
private Object outcome; // 非volatile,依賴狀態可見性保證
2. CompletableFuture 回調鏈實現
// 回調節點結構(簡化版)
static final class UniCompletion<T,V> extends Completion {
Executor executor; // 執行線程池
CompletableFuture<V> dep; // 依賴的前序Future
BiFunction<? super T,? super Throwable,? extends V> fn; // 回調函數
void tryFire(int mode) { // 觸發回調執行
if (dep != null &&
compareAndSetState(0, 1)) { // CAS保證線程安全
fn.apply(src, ex); // 實際執行用户回調
}
}
}
七、生產環境最佳實踐
1. 異常處理模板
CompletableFuture.supplyAsync(() -> {
// 業務邏輯
return doSomething();
})
.exceptionally(ex -> { // 捕獲所有異常
log.error("任務失敗", ex);
return defaultValue; // 提供降級值
})
.thenAccept(result -> { // 只處理成功情況
updateUI(result);
});
2. 資源清理策略
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
try {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 使用try-with-resources確保資源釋放
try (Connection conn = getConnection()) {
process(conn);
}
}, executor);
future.whenComplete((r, ex) -> {
if (ex != null) {
cleanupTempFiles(); // 失敗時清理臨時文件
}
});
} finally {
executor.shutdown(); // 確保線程池關閉
}
3. 性能監控指標
// 監控Future完成時長
Timer.Sample sample = Timer.start();
future.whenComplete((r, ex) -> {
sample.stop(registry.timer("async.task.time"));
});
// 監控隊列積壓
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
metrics.gauge("task.queue.size", pool.getQueue()::size);
八、與其他模式的協作
1. 結合發佈-訂閲模式
EventBus bus = new EventBus();
CompletableFuture.supplyAsync(() -> fetchData())
.thenAccept(data -> {
bus.post(new DataReadyEvent(data)); // 異步事件通知
});
// 訂閲方處理
@Subscribe
void handleDataReady(DataReadyEvent event) {
// 處理已完成的數據
}
2. 與反應式編程整合
// CompletableFuture -> Mono
Mono.fromFuture(() -> {
return CompletableFuture.supplyAsync(() -> {
return reactiveDao.query();
});
}).subscribeOn(Schedulers.boundedElastic())
.subscribe(System.out::println);
// Mono -> CompletableFuture
reactorMono.toFuture().thenApply(...);
九、各語言實現對比
| 語言 | 核心實現類 | 特色功能 | 典型使用場景 |
|---|---|---|---|
| Java | CompletableFuture | 鏈式組合、CompletionStage | 服務端異步編排 |
| C# | Task | async/await語法糖 | UI線程非阻塞調用 |
| JavaScript | Promise | then/catch鏈式調用 | 前端API請求 |
| Python | asyncio.Future | 協程集成 | 爬蟲/高併發IO |
| Go | chan | 通道原生支持 | 高併發微服務 |
十、常見陷阱與解決方案
1. 回調地獄問題
反模式:
future.thenApply(r1 -> {
future2.thenApply(r2 -> {
future3.thenApply(r3 -> { // 嵌套層次過深
return r1 + r2 + r3;
});
});
});
解決方案:
// 使用組合式編程
CompletableFuture.allOf(future1, future2, future3)
.thenApply(v -> {
return future1.join() +
future2.join() +
future3.join();
});
2. 線程泄漏場景
問題代碼:
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture.runAsync(() -> {
while (true) { // 無限循環任務
process();
}
}, executor); // 線程永遠無法回收
正確做法:
// 使用守護線程或超時控制
ExecutorService executor = Executors.newFixedThreadPool(5, r -> {
Thread t = new Thread(r);
t.setDaemon(true); // 設置為守護線程
return t;
});
3. 上下文丟失問題
問題現象:
SecurityContext ctx = getContext();
CompletableFuture.runAsync(() -> {
// 此處無法獲取原始上下文
doPrivilegedAction();
}, executor);
解決方案:
// 使用ContextPropagator
ExecutorService wrappedExecutor = ContextPropagator.wrap(executor);
CompletableFuture.runAsync(() -> {
// 可以獲取原始上下文
doPrivilegedAction();
}, wrappedExecutor);