動態

詳情 返回 返回

併發設計模式實戰:Future/Promise - 動態 詳情

今天為大家帶來的是併發設計模式實戰系列,第十五章Future/Promise,廢話不多説直接開始~

一、核心原理深度拆解

1. 異步計算雙階段模型

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Task      │───>│   Future    │───>│   Callback  │
│ Submission  │<───│  (Promise)  │<───│  Execution  │
└─────────────┘    └─────────────┘    └─────────────┘
  • 提交階段:主線程提交任務後立即返回Future佔位符
  • 計算階段:工作線程異步執行計算,通過Promise設置結果
  • 回調階段:結果就緒後觸發回調(觀察者模式)

2. 狀態機流轉

public interface Future<V> {
    boolean isDone();      // 完成狀態(成功/失敗/取消)
    V get() throws...;     // 阻塞獲取結果
    void addCallback(...); // 回調註冊
}

二、生活化類比:快遞櫃取件

系統組件 現實類比 核心行為
Future 快遞櫃取件碼 憑碼查詢包裹是否到達
Promise 快遞員存件操作 實際將包裹放入櫃中並更新狀態
Callback 短信通知服務 包裹入櫃後自動發送取件提醒
  • 異步流程:下單→獲得取件碼(Future)→快遞員送貨(異步計算)→短信通知(Callback)

三、Java代碼實現(生產級Demo)

1. 完整可運行代碼

import java.util.concurrent.*;
import java.util.function.Consumer;

public class FuturePromiseDemo {

    // 1. 自定義Promise實現
    static class MyPromise<V> implements Future<V>, Runnable {
        private volatile V result;
        private volatile Throwable error;
        private volatile boolean isDone;
        private final CountDownLatch latch = new CountDownLatch(1);
        private final List<Consumer<V>> callbacks = new CopyOnWriteArrayList<>();

        // 提交任務時執行的方法
        @Override
        public void run() {
            try {
                // 模擬耗時計算
                Thread.sleep(1000);
                setResult((V) "計算結果"); // 實際業務邏輯替換此處
            } catch (Exception e) {
                setError(e);
            }
        }

        // Promise核心方法:設置結果
        public void setResult(V result) {
            this.result = result;
            this.isDone = true;
            latch.countDown();
            notifyCallbacks();
        }

        // Promise核心方法:設置異常
        public void setError(Throwable error) {
            this.error = error;
            this.isDone = true;
            latch.countDown();
        }

        private void notifyCallbacks() {
            callbacks.forEach(cb -> cb.accept(result));
        }

        // Future實現方法
        @Override
        public V get() throws InterruptedException, ExecutionException {
            latch.await();
            if (error != null) throw new ExecutionException(error);
            return result;
        }

        @Override
        public boolean isDone() {
            return isDone;
        }

        // 註冊回調(非JUC標準方法)
        public void addCallback(Consumer<V> callback) {
            if (isDone) {
                callback.accept(result);
            } else {
                callbacks.add(callback);
            }
        }
    }

    // 2. 使用示例
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        
        // 創建Promise並提交任務
        MyPromise<String> promise = new MyPromise<>();
        executor.submit(promise);

        // 註冊回調
        promise.addCallback(result -> 
            System.out.println("[回調] 異步結果: " + result));

        // 同步阻塞獲取
        System.out.println("[主線程] 立即返回,繼續其他工作...");
        System.out.println("最終結果: " + promise.get());

        executor.shutdown();
    }
}

2. 關鍵機制説明

// 1. 狀態同步控制
private volatile boolean isDone;  // 保證可見性
private final CountDownLatch latch; // 實現阻塞等待

// 2. 線程安全回調列表
private final List<Consumer<V>> callbacks = new CopyOnWriteArrayList<>();

// 3. 異常處理流程
public void setError(Throwable error) {
    this.error = error;
    this.isDone = true;
    latch.countDown(); // 喚醒所有等待線程
}

四、橫向對比表格

1. 異步模式對比

模式 核心特點 適用場景 Java實現類
Future 阻塞式獲取結果 簡單異步任務 FutureTask
CompletableFuture 鏈式調用+組合操作 複雜異步流水線 CompletableFuture
Promise 可寫的結果容器 跨線程結果傳遞 需自行實現
Callback 事件驅動無阻塞 高併發IO Netty的ChannelFuture

2. 回調註冊方式對比

方法 觸發時機 線程安全性 鏈式支持
addCallback 結果就緒後立即執行 需自行保證 不支持
thenApply 前序階段完成後觸發 內置線程池控制 支持
whenComplete 無論成功失敗都執行 可能在不同線程執行 支持

五、高級應用技巧

1. 組合多個異步任務

CompletableFuture<String> query1 = queryDatabase("sql1");
CompletableFuture<String> query2 = queryDatabase("sql2");

// 並行執行後合併結果
CompletableFuture<String> merged = query1.thenCombineAsync(query2, 
    (r1, r2) -> r1 + "|" + r2,
    ForkJoinPool.commonPool());

2. 超時控制

Future<String> future = executor.submit(task);
try {
    String result = future.get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(true); // 中斷任務執行
}

3. 回調線程控制

promise.addCallback(result -> {
    // 指定回調執行線程池
    ForkJoinPool.commonPool().execute(() -> processResult(result));
});

通過這種 原理+實現+對比 的立體解析,可以掌握:

  1. Future/Promise的雙階段異步本質
  2. 如何實現生產級的Promise容器
  3. 不同異步模式的適用場景選擇
  4. 複雜場景下的組合使用技巧

六、源碼級實現剖析(接五)

1. JDK FutureTask 核心邏輯

// 狀態機定義(OpenJDK 17)
private volatile int state;
static final int NEW          = 0; // 初始化狀態
static final int COMPLETING   = 1; // 臨時狀態
static final int NORMAL       = 2; // 正常完成
static final int EXCEPTIONAL  = 3; // 異常完成
static final int CANCELLED    = 4; // 已取消
static final int INTERRUPTING = 5; // 中斷中
static final int INTERRUPTED  = 6; // 已中斷

// 結果存儲設計
private Object outcome; // 非volatile,依賴狀態可見性保證

2. CompletableFuture 回調鏈實現

// 回調節點結構(簡化版)
static final class UniCompletion<T,V> extends Completion {
    Executor executor;         // 執行線程池
    CompletableFuture<V> dep;   // 依賴的前序Future
    BiFunction<? super T,? super Throwable,? extends V> fn; // 回調函數

    void tryFire(int mode) {    // 觸發回調執行
        if (dep != null && 
            compareAndSetState(0, 1)) { // CAS保證線程安全
            fn.apply(src, ex);  // 實際執行用户回調
        }
    }
}

七、生產環境最佳實踐

1. 異常處理模板

CompletableFuture.supplyAsync(() -> {
        // 業務邏輯
        return doSomething();
    })
    .exceptionally(ex -> {      // 捕獲所有異常
        log.error("任務失敗", ex);
        return defaultValue;    // 提供降級值
    })
    .thenAccept(result -> {     // 只處理成功情況
        updateUI(result); 
    });

2. 資源清理策略

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
try {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        // 使用try-with-resources確保資源釋放
        try (Connection conn = getConnection()) {
            process(conn);
        }
    }, executor);
    
    future.whenComplete((r, ex) -> {
        if (ex != null) {
            cleanupTempFiles(); // 失敗時清理臨時文件
        }
    });
} finally {
    executor.shutdown(); // 確保線程池關閉
}

3. 性能監控指標

// 監控Future完成時長
Timer.Sample sample = Timer.start();
future.whenComplete((r, ex) -> {
    sample.stop(registry.timer("async.task.time"));
});

// 監控隊列積壓
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
metrics.gauge("task.queue.size", pool.getQueue()::size);

八、與其他模式的協作

1. 結合發佈-訂閲模式

EventBus bus = new EventBus();
CompletableFuture.supplyAsync(() -> fetchData())
    .thenAccept(data -> {
        bus.post(new DataReadyEvent(data)); // 異步事件通知
    });

// 訂閲方處理
@Subscribe
void handleDataReady(DataReadyEvent event) {
    // 處理已完成的數據
}

2. 與反應式編程整合

// CompletableFuture -> Mono
Mono.fromFuture(() -> {
    return CompletableFuture.supplyAsync(() -> {
        return reactiveDao.query();
    });
}).subscribeOn(Schedulers.boundedElastic())
  .subscribe(System.out::println);

// Mono -> CompletableFuture
reactorMono.toFuture().thenApply(...);

九、各語言實現對比

語言 核心實現類 特色功能 典型使用場景
Java CompletableFuture 鏈式組合、CompletionStage 服務端異步編排
C# Task async/await語法糖 UI線程非阻塞調用
JavaScript Promise then/catch鏈式調用 前端API請求
Python asyncio.Future 協程集成 爬蟲/高併發IO
Go chan 通道原生支持 高併發微服務

十、常見陷阱與解決方案

1. 回調地獄問題

反模式:

future.thenApply(r1 -> {
    future2.thenApply(r2 -> {
        future3.thenApply(r3 -> {  // 嵌套層次過深
            return r1 + r2 + r3;
        });
    });
});

解決方案:

// 使用組合式編程
CompletableFuture.allOf(future1, future2, future3)
    .thenApply(v -> {
        return future1.join() + 
               future2.join() + 
               future3.join();
    });

2. 線程泄漏場景

問題代碼:

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture.runAsync(() -> {
    while (true) {  // 無限循環任務
        process();
    }
}, executor);  // 線程永遠無法回收

正確做法:

// 使用守護線程或超時控制
ExecutorService executor = Executors.newFixedThreadPool(5, r -> {
    Thread t = new Thread(r);
    t.setDaemon(true);  // 設置為守護線程
    return t;
});

3. 上下文丟失問題

問題現象:

SecurityContext ctx = getContext();
CompletableFuture.runAsync(() -> {
    // 此處無法獲取原始上下文
    doPrivilegedAction(); 
}, executor);

解決方案:

// 使用ContextPropagator
ExecutorService wrappedExecutor = ContextPropagator.wrap(executor);
CompletableFuture.runAsync(() -> {
    // 可以獲取原始上下文
    doPrivilegedAction();
}, wrappedExecutor);

Add a new 評論

Some HTML is okay.