前言
CompletableFuture是jdk8的新特性。CompletableFuture的實現與使用上,處處體現出了函數式異步編程的味道。一個CompletableFuture對象可以被一個環節接一個環節的處理、也可以對兩個或者多個CompletableFuture進行組合處理或者等待結果完成。通過對CompletableFuture各種方法的合理使用與組合搭配,可以在很多的場景都可以應付自如。
CompletableFuture實現了CompletionStage接口和Future接口,前者是對後者的一個擴展,增加了異步會點、流式處理、多個Future組合處理的能力,使Java在處理多任務的協同工作時更加順暢便利。
假設現在需求如下:
從網上查詢某個產品的最低價格,例如可以從淘寶、京東、拼多多去獲取某個商品的價格、優惠金額,並計算出實際的付款金額,最終返回價格最低的價格信息。
這裏假設每個平台獲取原價格與優惠券的接口已經實現、且都是需要調用HTTP接口查詢的耗時操作,接口每個耗時1s左右。
根據需求理解,可以很自然的寫出對應實現代碼:
public int getCheapestPlatAndPrice(String product){
int taoBaoPrice = computeRealPrice(HttpRequestMock.getTaoBaoPrice(product), HttpRequestMock.getTaoBaoDiscounts(product));
int jingDongPrice = computeRealPrice(HttpRequestMock.getJingDongPrice(product), HttpRequestMock.getJingDongDiscounts(product));
int pinDuoDuoPrice = computeRealPrice(HttpRequestMock.getPinDuoDuoPrice(product), HttpRequestMock.getPinDuoDuoDiscounts(product));
// 計算並選出實際價格最低的平台
return Stream.of(taoBaoPrice, jingDongPrice, pinDuoDuoPrice).min(Comparator.comparingInt(p - > p)).get();
}
運行測試下:
14:58:32.330228700[main]獲取淘寶上iphone16的價格完成: 5199
14:58:33.351948100[main]獲取淘寶上iphone16的折扣價格完成: 200
14:58:33.352933400[main]計算實際價格完成: 4999
14:58:34.364138900[main]獲取京東上iphone16的價格完成: 5299
14:58:35.377258800[main]獲取京東上iphone16的折扣價格完成: 150
14:58:35.378257300[main]計算實際價格完成: 5149
14:58:36.392813800[main]獲取拼多多上iphone16的價格完成: 5399
14:58:37.405863200[main]獲取拼多多上iphone16的折扣價格完成: 99
14:58:37.406712600[main]計算實際價格完成: 5300
4999
耗時:6142ms
結果符合預期,功能正常,但是耗時較長。試想一下,假如你在某個APP操作需要等待6s才返回最終計算結果,那不得直接摔手機?
梳理下代碼的實現思路:
可以知道所有的環節都是串行實現的的,由於每個查詢接口的耗時都是1s,因此每個環節耗時加到一起,接口總耗時超過6s。
但實際上,每個平台之間的操作是互不干擾的,那其實就可以通過多線程的方式,同時去分別執行各個平台的邏輯處理,最後將各個平台的結果彙總到一起比對得到最低價格。
所以整個執行過程會變成如下的效果:
因此為了提升性能,可以採用線程池來負責多線程的處理操作,因為需要得到各個子線程處理的結果,所以需要使用 Future來實現:
public Integer getCheapestPlatAndPrice2(String product) {
Future <Integer> taoBaoFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getTaoBaoPrice(product), HttpRequestMock.getTaoBaoDiscounts(product)));
Future <Integer> jingDongFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getJingDongPrice(product), HttpRequestMock.getJingDongDiscounts(product)));
Future <Integer> pinDuoDuoFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getPinDuoDuoPrice(product), HttpRequestMock.getPinDuoDuoDiscounts(product)));
// 等待所有線程結果都處理完成,然後從結果中計算出最低價
return Stream.of(taoBaoFuture, jingDongFuture, pinDuoDuoFuture)
.map(price - > {
try {
return price.get();
} catch (Exception e) {
return null;
}
})
.min(Comparator.comparingInt(p - > p))
.get();
}
上述代碼中,將三個不同平台對應的Callable函數邏輯放入到ThreadPool中去執行,返回Future對象,然後再逐個通過Future.get()接口阻塞獲取各自平台的結果,最後經比較處理後返回最低價信息。
執行代碼,可以看到執行結果與過程如下:
15:19:25.793891500[pool-1-thread-3]獲取拼多多上iphone16的價格完成: 5399
15:19:25.793891500[pool-1-thread-2]獲取京東上iphone16的價格完成: 5299
15:19:25.794891500[pool-1-thread-1]獲取淘寶上iphone16的價格完成: 5199
15:19:26.816140300[pool-1-thread-2]獲取京東上iphone16的折扣價格完成: 150
15:19:26.816140300[pool-1-thread-3]獲取拼多多上iphone16的折扣價格完成: 99
15:19:26.816923600[pool-1-thread-3]計算實際價格完成: 5300
15:19:26.816923600[pool-1-thread-2]計算實際價格完成: 5149
15:19:26.817921500[pool-1-thread-1]獲取淘寶上iphone16的折扣價格完成: 200
15:19:26.820923400[pool-1-thread-1]計算實際價格完成: 4999
4999
耗時:2085ms
接口總耗時從6s下降到了2s,效果還是很顯著的。但是,是否還能再壓縮一些呢?
基於上面按照平台拆分並行處理的思路繼續推進,我們可以看出每個平台內的處理邏輯其實可以分為3個主要步驟:
- 獲取原始價格(耗時操作)
- 獲取折扣優惠(耗時操作)
- 得到原始價格和折扣優惠之後,計算實付價格
這3個步驟中,其實第1、2兩個耗時操作也是相對獨立的,如果也能並行處理的話,響應時長上應該也能繼續縮短,即如下的處理流程:
這裏當然也可以繼續使用上面提到的線程池+Future的方式,但Future在應對並行結果組合以及後續處理等方面顯得力不從心,弊端明顯:
代碼寫起來會非常拖沓:先封裝
Callable函數放到線程池中去執行查詢操作,然後分三組阻塞等待結果並計算出各自結果,最後再阻塞等待價格計算完成後彙總得到最終結果。
説到這裏呢,就需要CompletableFuture登場了,CompletableFuture可以很輕鬆的來完成任務的並行處理,以及各個並行任務結果之間的組合再處理等操作。使用CompletableFuture編寫實現代碼如下:
public Integer getCheapestPlatAndPrice3(String product) {
CompletableFuture <Integer> taoBao = CompletableFuture.supplyAsync(() -> HttpRequestMock.getTaoBaoPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getTaoBaoDiscounts(product)), this::computeRealPrice);
CompletableFuture <Integer> jingDong = CompletableFuture.supplyAsync(() -> HttpRequestMock.getJingDongPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getJingDongDiscounts(product)), this::computeRealPrice);
CompletableFuture <Integer> pinDuoDuo = CompletableFuture.supplyAsync(() -> HttpRequestMock.getPinDuoDuoPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getPinDuoDuoDiscounts(product)), this::computeRealPrice);
// 排序並獲取最低價格
return Stream.of(taoBao, jingDong, pinDuoDuo)
.map(CompletableFuture::join)
.min(Comparator.comparingInt(p - > p))
.get();
}
看下執行結果符合預期,而接口耗時則降到了1s(因為依賴的每一個查詢實際操作的接口耗時都是模擬的1s,所以這個結果已經算是此複合接口能達到的極限值了)。
15:29:04.911516600[ForkJoinPool.commonPool-worker-1]獲取淘寶上iphone16的價格完成: 5199
15:29:04.911516600[ForkJoinPool.commonPool-worker-4]獲取京東上iphone16的折扣價格完成: 150
15:29:04.911516600[ForkJoinPool.commonPool-worker-2]獲取淘寶上iphone16的折扣價格完成: 200
15:29:04.911516600[ForkJoinPool.commonPool-worker-3]獲取京東上iphone16的價格完成: 5299
15:29:04.911516600[ForkJoinPool.commonPool-worker-5]獲取拼多多上iphone16的價格完成: 5399
15:29:04.911516600[ForkJoinPool.commonPool-worker-6]獲取拼多多上iphone16的折扣價格完成: 99
15:29:04.924568[ForkJoinPool.commonPool-worker-2]計算實際價格完成: 4999
15:29:04.924568[ForkJoinPool.commonPool-worker-3]計算實際價格完成: 5149
15:29:04.924568[ForkJoinPool.commonPool-worker-6]計算實際價格完成: 5300
4999
耗時:1071ms
這裏CompletableFuture執行時所使用的默認線程池是ForkJoinPool。
Future與CompletableFuture
首先,先來理一下Future與CompletableFuture之間的關係。
Future
如果接觸過多線程相關的概念,那Future應該不會陌生,早在Java5中就已經存在了。
該如何理解Future呢?舉個生活中的例子:
你去咖啡店點了一杯咖啡,然後服務員會給你一個訂單小票。 當服務員在後台製作咖啡的時候,你並沒有在店裏等待,而是出門到隔壁甜品店又買了個麪包。 當面包買好之後,你回到咖啡店,拿着訂單小票去取咖啡。 取到咖啡後,你邊喝咖啡邊把麪包吃了……嗝~
是不是很熟悉的生活場景? 對比到我們多線程異步編程的場景中,咖啡店的訂單小票其實就是Future,通過Future可以讓稍後適當的時候可以獲取到對應的異步執行線程中的執行結果。
上面的場景,我們翻譯為代碼實現邏輯:
public void buyCoffeeAndOthers() throws ExecutionException, InterruptedException {
goShopping();
// 子線程中去處理做咖啡這件事,返回future對象
Future<Coffee> coffeeTicket = threadPool.submit(this::makeCoffee);
// 主線程同步去做其他的事情
Bread bread = buySomeBread();
// 主線程其他事情並行處理完成,阻塞等待獲取子線程執行結果
Coffee coffee = coffeeTicket.get();
// 子線程結果獲取完成,主線程繼續執行
eatAndDrink(bread, coffee);
}
Future相關的瞭解可以看這篇文章:FutureTask是Future的基礎實現
CompletableFuture
Future在應對一些簡單且相互獨立的異步執行場景很便捷,但是在一些複雜的場景,比如同時需要多個有依賴關係的異步獨立處理的時候,或者是一些類似流水線的異步處理場景時,就顯得力不從心了。比如:
- 同時執行多個並行任務,等待最快的一個完成之後就可以繼續往後處理
- 多個異步任務,每個異步任務都需要依賴前一個異步任務執行的結果再去執行下一個異步任務,最後只需要一個最終的結果
- 獲取計算結果的
get()方法為阻塞調用
Java 8 才被引入CompletableFuture 類可以解決Future 的這些缺陷。CompletableFuture 除了提供了更為好用和強大的 Future 特性之外,還提供了函數式編程、異步任務編排組合(可以將多個異步任務串聯起來,組成一個完整的鏈式調用)等能力。
可以看到,CompletableFuture 同時實現了 Future 和 CompletionStage 接口。
CompletableFuture使用方式
創建CompletableFuture並執行
當需要進行異步處理的時候,可以通過CompletableFuture.supplyAsync方法,傳入一個具體的要執行的處理邏輯函數,這樣就輕鬆的完成了CompletableFuture的創建與觸發執行。
| 方法名稱 | 作用描述 |
|---|---|
| supplyAsync | 靜態方法,用於構建一個CompletableFuture<T>對象,並異步執行傳入的函數,允許執行函數有返回值T。 |
| runAsync | 靜態方法,用於構建一個CompletableFuture<Void>對象,並異步執行傳入函數,與supplyAsync的區別在於此方法傳入的是Callable類型,僅執行,沒有返回值。 |
使用示例:
public void testCreateFuture(String product) {
// supplyAsync, 執行邏輯有返回值Integer
CompletableFuture<Integer> supplyAsyncResult =
CompletableFuture.supplyAsync(() -> HttpRequestMock.getTaoBaoPrice(product));
// runAsync, 執行邏輯沒有返回值
CompletableFuture<Void> runAsyncResult =
CompletableFuture.runAsync(() -> System.out.println(product));
}
特別補充:
supplyAsync或者runAsync創建後便會立即執行,無需手動調用觸發。
線程串行化方法
使用方法
在流水線處理場景中,往往都是一個任務環節處理完成後,下一個任務環節接着上一環節處理結果繼續處理。CompletableFuture用於這種流水線環節驅動類的方法有很多,相互之間主要是在返回值或者給到下一環節的入參上有些許差異,使用時需要注意區分:
具體的方法的描述歸納如下:
| 方法名稱 | 作用描述 |
|---|---|
| thenApply | 對CompletableFuture的執行後的具體結果進行追加處理,並將當前的CompletableFuture泛型對象更改為處理後新的對象類型,返回當前CompletableFuture對象。 |
| thenCompose | 與thenApply類似。區別點在於:此方法的入參函數是一個CompletableFuture類型對象,適用於回調函數需要啓動另一個異步計算,並且想要一個扁平化的結果CompletableFuture,而不是嵌套的CompletableFuture<CompletableFuture<U>> |
| thenAccept | 與thenApply方法類似,區別點在於thenAccept返回void類型,沒有具體結果輸出,適合無需返回值的場景。 |
| thenRun | 與thenAccept類似,區別點在於thenAccept可以將前面CompletableFuture執行的實際結果作為入參進行傳入並使用,但是thenRun方法沒有任何入參,只能執行一個Runnable函數,並且返回void類型。 |
因為上述thenApply、thenCompose方法的輸出仍然都是一個CompletableFuture對象,所以各個方法是可以一環接一環的進行調用,形成流水線式的處理邏輯:
thenApply
上面任務執行完執行 + 能獲取上步返回值 + 自己有返回值
@Test
public void thenApplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<String> thenApplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("thenApplyAsync當前線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("thenApplyAsync運行結果:" + i);
return i;
}, executor).thenApply(result -> {
System.out.println("thenApplyAsync任務2啓動了。。。。。上步結果:" + result);
return "hello" + result * 2;
});
System.out.println("main.................end....." + thenApplyAsync.get());
}
結果:
thenApplyAsync當前線程:33
thenApplyAsync運行結果:5
thenApplyAsync任務2啓動了。。。。。上步結果:5
main.................end.....hello10
thenAccept
上面任務執行完執行 + 能獲取上步返回值
@Test
public void thenAcceptAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Void> thenAcceptAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("thenAcceptAsync當前線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("thenAcceptAsync運行結果:" + i);
return i;
}, executor).thenAccept(result -> {
System.out.println("thenAcceptAsync任務2啓動了。。。。。上步結果:" + result);
});
}
結果:
thenAcceptAsync當前線程:33
thenAcceptAsync運行結果:5
thenAcceptAsync任務2啓動了。。。。。上步結果:5
thenRun
上面任務執行完執行
@Test
public void thenRunAsync() throws ExecutionException, InterruptedException {
System.out.println("main.................start.....");
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("當前線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運行結果:" + i);
return i;
}, executor).thenRun(() -> {
System.out.println("任務2啓動了。。。。。");
});
}
結果:
main.................start.....
當前線程:33
運行結果:5
任務2啓動了。。。。。
thenCompose
接收返回值並生成新的任務
@Test
public void thenCompose() {
CompletableFuture cf = CompletableFuture.completedFuture("hello")
.thenCompose(str -> CompletableFuture.supplyAsync(() -> {
return str + ": thenCompose";
},executor));
System.out.println(cf.join());
}
- thenApply():轉換的是泛型中的類型,相當於將CompletableFuture 轉換生成新的CompletableFuture
- thenCompose():用來連接兩個CompletableFuture,是生成一個新的CompletableFuture。
串聯示例
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
System.out.println("supplyAsync first");
return "first";
}, fixedThreadPool).thenApply(s -> {
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
System.out.println("supplyAsync second");
return "second " + s;
}).whenComplete((s, t) -> {//s,是上面的返回值,t是上面可能會拋出的Throwable對象
if (t == null) {
System.out.println("whenComplete succeed:" + s);
} else {
System.out.println("whenComplete error");
}
});
System.out.println(future.get());
//結果:
supplyAsync first
supplyAsync second
whenComplete succeed:secondfirst
second first
線程並聯方法
很多時候為了提升並行效率,一些沒有依賴的環節我們會讓他們同時去執行,然後在某些環節需要依賴的時候,進行結果的依賴合併處理,類似如下圖的效果。
CompletableFuture相比於Future的一大優勢,就是可以方便的實現多個並行環節的合併處理。相關涉及方法介紹歸納如下:
| 方法名稱 | 作用描述 |
|---|---|
| thenCombine | 將兩個CompletableFuture對象組合起來進行下一步處理,可以拿到兩個執行結果,並傳給自己的執行函數進行下一步處理,最後返回一個新的CompletableFuture對象。 |
| thenAcceptBoth | 與thenCombine類似,區別點在於thenAcceptBoth傳入的執行函數沒有返回值,即thenAcceptBoth返回值為CompletableFuture<Void>。 |
| runAfterBoth | 等待兩個CompletableFuture都執行完成後再執行某個Runnable對象,再執行下一個的邏輯,類似thenRun。 |
| applyToEither | 兩個CompletableFuture中任意一個完成的時候,繼續執行後面給定的新的函數處理。再執行後面給定函數的邏輯,類似thenApply。 |
| acceptEither | 兩個CompletableFuture中任意一個完成的時候,繼續執行後面給定的新的函數處理。再執行後面給定函數的邏輯,類似thenAccept。 |
| runAfterEither | 等待兩個CompletableFuture中任意一個執行完成後再執行某個Runnable對象,可以理解為thenRun的升級版,注意與runAfterBoth對比理解。 |
| allOf | 靜態方法,阻塞等待所有給定的CompletableFuture執行結束後,返回一個CompletableFuture<Void>結果。 |
| anyOf | 靜態方法,阻塞等待任意一個給定的CompletableFuture對象執行結束後,返回一個CompletableFuture<Void>結果。 |
使用方法
thenCombine
消費兩個結果 + 返回結果
@Test
public void thenCombine() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務1線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任務1運行結果:" + i);
return i;
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務2線程:" + Thread.currentThread().getId());
System.out.println("任務2運行結果:");
return "hello";
}, executor);
CompletableFuture<String> thenCombineAsync = future1.thenCombine(future2, (result1, result2) -> {
System.out.println("任務5啓動。。。結果1:" + result1 + "。。。結果2:" + result2);
return result2 + "-->" + result1;
});
System.out.println("任務5結果" + thenCombineAsync.get());
}
結果:
任務1線程:33
任務1運行結果:5
任務2線程:34
任務2運行結果:
任務5啓動。。。結果1:5。。。結果2:hello
任務5結果hello-->5
thenAcceptBoth
消費兩個結果 + 無返回
@Test
public void thenAcceptBothAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務1線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任務1運行結果:" + i);
return i;
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務2線程:" + Thread.currentThread().getId());
System.out.println("任務2運行結果:");
return "hello";
}, executor);
CompletableFuture<Void> thenAcceptBothAsync = future1.thenAcceptBoth(future2, (result1, result2) -> {
System.out.println("任務4啓動。。。結果1:" + result1 + "。。。結果2:" + result2);
});
}
結果
任務1線程:33
任務1運行結果:5
任務2線程:34
任務2運行結果:
任務4啓動。。。結果1:5。。。結果2:hello
runAfterBoth
兩個任務都完成後,再接着運行
@Test
public void runAfterBothAsync() {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務1線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任務1運行結果:" + i);
return i;
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務2線程:" + Thread.currentThread().getId());
System.out.println("任務2運行結果:");
return "hello";
}, executor);
CompletableFuture<Void> runAfterBothAsync = future1.runAfterBoth(future2, () -> {
System.out.println("任務3啓動。。。");
});
}
結果
任務1線程:33
任務1運行結果:5
任務2線程:34
任務2運行結果:
任務3啓動。。。
applyToEither
只要有一個執行完就執行 + 獲取返回值 + 有返回值
@Test
public void applyToEither() throws ExecutionException, InterruptedException {
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務1線程:" + Thread.currentThread().getId());
int i = 10 / 2;
try {
Thread.sleep(3000);
System.out.println("任務1運行結果:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務2線程:" + Thread.currentThread().getId());
System.out.println("任務2運行結果:");
return "hello";
}, executor);
CompletableFuture<String> applyToEitherAsync = future1.applyToEither(future2, result -> {
System.out.println("任務5開始執行。。。結果:" + result);
return result.toString() + " world";
});
System.out.println("任務5結果:" + applyToEitherAsync.get());
}
結果
任務1線程:33
任務2線程:34
任務2運行結果:
任務5開始執行。。。結果:hello
任務5結果:hello world
acceptEither
只要有一個執行完就執行 + 獲取返回值
@Test
public void acceptEither() {
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務1線程:" + Thread.currentThread().getId());
int i = 10 / 2;
try {
Thread.sleep(3000);
System.out.println("任務1運行結果:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務2線程:" + Thread.currentThread().getId());
System.out.println("任務2運行結果:");
return "hello";
}, executor);
CompletableFuture<Void> acceptEitherAsync = future1.acceptEither(future2, result -> {
System.out.println("任務4開始執行。。。結果:" + result);
});
}
結果
任務1線程:33
任務2線程:34
任務2運行結果:
任務4開始執行。。。結果:hello
runAfterEither
只要有一個執行完就執行
@Test
public void runAfterEither() {
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務1線程:" + Thread.currentThread().getId());
int i = 10 / 2;
try {
Thread.sleep(3000);
System.out.println("任務1運行結果:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務2線程:" + Thread.currentThread().getId());
System.out.println("任務2運行結果:");
return "hello";
}, executor);
CompletableFuture<Void> runAfterEitherAsync = future1.runAfterEither(future2, () -> {
System.out.println("任務3開始執行。。。");
});
}
結果
任務1線程:33
任務2線程:34
任務2運行結果:
任務3開始執行。。。
allOf
等待全部完成後才執行
@Test
public void allOf() throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務1");
return "任務1";
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("任務2");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任務2";
}, executor);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務3");
return "任務3";
}, executor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
//等待所有任務完成
//allOf.get();
allOf.join();
System.out.println("allOf" + future1.get() + "-------" + future2.get() + "-------" + future3.get());
}
結果
任務1
任務3
任務2
allOf任務1-------任務2-------任務3
anyOf
等待其中之一完成後就執行
@Test
public void anyOf() throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務1");
return "任務1";
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("任務2");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任務2";
}, executor);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務3");
return "任務3";
}, executor);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
System.out.println("anyOf--最先完成的是" + anyOf.get());
//等待future2打印
System.out.println("等等任務2");
Thread.sleep(3000);
}
結果
任務1
anyOf--最先完成的是任務1
任務3
等等任務2
任務2
並聯示例
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
CompletableFuture<String> firstfuture = CompletableFuture.supplyAsync(() -> {
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
System.out.println("supplyAsync first");
return "first";
}, fixedThreadPool);
CompletableFuture<String> secondfuture = CompletableFuture.supplyAsync(() -> {
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
System.out.println("supplyAsync second");
return "second";
}, fixedThreadPool);
CompletableFuture<String> thirdfuture = CompletableFuture.supplyAsync(() -> {
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
System.out.println("supplyAsync third");
return "third";
}, fixedThreadPool);
CompletableFuture.allOf(firstfuture, secondfuture, thirdfuture)
.whenComplete((aVoid, t) -> {
try {
System.out.println("whenComplete succeed:" + firstfuture.get() + secondfuture.get() + thirdfuture.get());
} catch (Exception e) {
System.out.println("error");
}
});
結果等待與獲取
在執行線程中將任務放到工作線程中進行處理的時候,執行線程與工作線程之間是異步執行的模式,如果執行線程需要獲取到共工作線程的執行結果,則可以通過get或者join方法,阻塞等待並從CompletableFuture中獲取對應的值。
對get和join的方法功能含義説明歸納如下:
| 方法名稱 | 作用描述 |
|---|---|
| get() | 等待CompletableFuture執行完成並獲取其具體執行結果,可能會拋出異常,需要代碼調用的地方手動try...catch進行處理。 |
| get(long, TimeUnit) | 與get()相同,只是允許設定阻塞等待超時時間,如果等待超過設定時間,則會拋出異常終止阻塞等待。 |
| join() | 等待CompletableFuture執行完成並獲取其具體執行結果,可能會拋出運行時異常,無需代碼調用的地方手動try...catch進行處理。 |
從介紹上可以看出,兩者的區別就在於是否需要調用方顯式的進行try...catch處理邏輯,使用代碼示例如下:
public void testGetAndJoin(String product) {
// join無需顯式try...catch...
PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
.join();
try {
// get顯式try...catch...
PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
.get(5L, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
異常處理
在編排流水線的時候,如果某一個環節執行拋出異常了,會導致整個流水線後續的環節就沒法再繼續下去了,比如下面的例子:
public void testExceptionHandle() {
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("supplyAsync excetion occurred...");
}).thenApply(obj -> {
System.out.println("thenApply executed...");
return obj;
}).join();
}
執行之後會發現,supplyAsync拋出異常後,後面的thenApply並沒有被執行。
那如果想要讓流水線的每個環節處理失敗之後都能讓流水線繼續往下面環節處理,讓後續環節可以拿到前面環節的結果或者是拋出的異常並進行對應的應對處理,就需要用到handle和whenCompletable方法了。
先看下兩個方法的作用描述:
| 方法名稱 | 作用描述 |
|---|---|
| handle | 與thenApply類似,區別點在於handle執行函數的入參有兩個,一個是CompletableFuture執行的實際結果,一個是Throwable對象,這樣如果前面執行出現異常的時候,可以通過handle獲取到異常並進行處理。 |
| whenComplete | 與handle類似,區別點在於whenComplete執行後無返回值。 |
| exceptionally | 捕獲異常並返回指定值 |
handle
入參為 結果 或者 異常,返回新結果
@Test
public void handle() throws ExecutionException, InterruptedException {
System.out.println("main.................start.....");
final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("當前線程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("運行結果:" + i);
return i;
}, executor).handleAsync((in, throwable) -> {
if (throwable != null) {
return "報錯返回";
}
return "正確了";
});
System.out.println("main.................end....." + completableFuture.get());
}
結果
main.................start.....
當前線程:33
main.................end.....報錯返回
whenComplete
whenComplete雖然得到異常信息,但是不能修改返回信息
@Test
public void whenComplete() {
System.out.println("main.................start.....");
final CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("當前線程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("運行結果:" + i);
return i;
}, executor).whenComplete((result, throwable) -> {
//whenComplete雖然得到異常信息,但是不能修改返回信息
System.out.println("異步完成。。。。結果是:" + result + "...異常是:" + throwable);
});
try {
System.out.println("main.................end..T..." + completableFuture.get());
} catch (InterruptedException e) {
System.out.println("報錯了1");
} catch (ExecutionException e) {
System.out.println("報錯了2");
}
}
結果
main.................start.....
當前線程:33
異步完成。。。。結果是:null...異常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: 除以零
報錯了2
exceptionally
@Test
public void exceptionally() throws ExecutionException, InterruptedException {
System.out.println("main.................start.....");
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("當前線程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("運行結果:" + i);
return i;
}, executor).exceptionally(throwable -> {
//R apply(T t);
//exceptionally可以感知錯誤並返回指定值
System.out.println("執行了exceptionally");
return 0;
});
System.out.println("main.................end....." + completableFuture.get());
}
結果
main.................start.....
當前線程:33
執行了exceptionally
main.................end.....0
實現超時
由於網絡波動或者連接節點下線等種種問題,對於大多數網絡異步任務的執行常常會進行超時限制,在異步開發中可以看成是一個常見的問題。
在 Java 9 中,CompletableFuture 引入了支持超時和延遲執行的改進,這兩個功能對於控制異步操作行為至關重要。
orTimeout()
允許為 CompletableFuture 設置一個超時時間。如果在指定的超時時間內未完成,CompletableFuture 將以 TimeoutException 完成
- 示例
@Test
public void orTimeTest() {
try {
CompletableFuture completableFuture = CompletableFuture.runAsync(() - > {
System.out.println("異步任務開始執行....");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).orTimeout(2, TimeUnit.SECONDS);
completableFuture.join();
} catch (Exception e) {
System.out.println(e);
}
}
completeOnTimeout()
允許在指定的超時時間內如果未完成,則用一個默認值來完成 CompletableFuture。該方法提供了一種優雅的回退機制,確保即使在超時的情況下也能保持異步流的連續性和完整性。
- 示例
@Test
public void completeOnTimeoutTest() {
CompletableFuture <String> completableFuture = CompletableFuture.supplyAsync(() - > {
System.out.println("異步任務開始執行....");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "死磕 Java 新特性";
}).completeOnTimeout("死磕 Java", 2, TimeUnit.SECONDS);
System.out.println("執行結果為:" + completableFuture.join());
}
延遲執行
CompletableFuture 提供了delayedExecutor() 來支持延遲執行,該方法創建一個延遲執行的 Executor,可以將任務的執行推遲到未來某個時間點。能夠讓我們更加精確地控制異步任務的執行時機,特別是在需要根據時間安排任務執行的場景中。
- 示例
@Test
public void completeOnTimeoutTest() {
// 創建一個延遲執行的Executor
Executor delayedExecutor = CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS);
// 使用延遲的Executor執行一個簡單任務
CompletableFuture <Void> future = CompletableFuture.runAsync(() - > {
System.out.println("任務延遲後執行...");
}, delayedExecutor);
// 等待異步任務完成
future.join();
}
CompletableFuture的Async版本
在使用CompletableFuture的時候會發現,有很多的方法,都會同時有兩個以Async命名結尾的方法版本。以thenCombine方法為例:
- thenCombine(CompletionStage, BiFunction)
- thenCombineAsync(CompletionStage, BiFunction)
- thenCombineAsync(CompletionStage, BiFunction, Executor)
從參數上看,區別並不大,僅第三個方法入參中多了線程池Executor對象。看下三個方法的源碼實現,會發現其整體實現邏輯都是一致的,僅僅是使用線程池這個地方的邏輯有一點點的差異:
有興趣的可以去翻一下此部分的源碼實現,這裏概括下三者的區別:
- thenCombine方法,沿用上一個執行任務所使用的線程池進行處理
- thenCombineAsync兩個入參的方法,使用默認的ForkJoinPool線程池中的工作線程進行處理
- themCombineAsync三個入參的方法,支持自定義線程池並指定使用自定義線程池中的線程作為工作線程去處理待執行任務。
為了更好的理解下上述的三個差異點,通過下面的代碼來演示下:
- **用法1: **其中thenCombineAsync指定使用自定義線程池,supplyAsync方法不指定線程池(使用默認線程池)
public PriceResult getCheapestPlatAndPrice4(String product) {
// 構造自定義線程池
ExecutorService executor = Executors.newFixedThreadPool(5);
return
CompletableFuture.supplyAsync(
() -> HttpRequestMock.getPinDuoDuoPrice(product)
).thenCombineAsync(
CompletableFuture.supplyAsync(() -> HttpRequestMock.getPinDuoDuoDiscounts(product)),
this::computeRealPrice,
executor
).join();
}
沒有指定自定義線程池的supplyAsync方法,其使用了默認的ForkJoinPool工作線程來運行,而指定了自定義線程池的方法,則使用了自定義線程池來執行。
17:23:50.683636700[ForkJoinPool.commonPool-worker-1]獲取拼多多上iphone16的價格完成: 5399
17:23:50.683636700[ForkJoinPool.commonPool-worker-2]獲取拼多多上iphone16的折扣價格完成: 99
17:23:50.696637100[pool-2-thread-1]計算實際價格完成: 5300
5300
耗時:1079ms
- 用法2: 不指定自定義線程池,使用默認線程池策略,使用thenCombine方法
public PriceResult getCheapestPlatAndPrice5(String product) {
return
CompletableFuture.supplyAsync(
() -> HttpRequestMock.getPinDuoDuoPrice(product)
).thenCombine(
CompletableFuture.supplyAsync(() -> HttpRequestMock.getPinDuoDuoDiscounts(product)),
this::computeRealPrice
).join();
}
執行結果如下,可以看到執行線程名稱與用法1示例相比發生了變化。因為沒有指定線程池,所以兩個supplyAsync方法都是用的默認的ForkJoinPool線程池,而thenCombine使用的是上一個任務所使用的線程池,所以也是用的ForkJoinPool。
17:24:53.840945700[ForkJoinPool.commonPool-worker-2]獲取拼多多上iphone16的折扣價格完成: 99
17:24:53.840945700[ForkJoinPool.commonPool-worker-1]獲取拼多多上iphone16的價格完成: 5399
17:24:53.850944100[ForkJoinPool.commonPool-worker-1]計算實際價格完成: 5300
5300
耗時:1083ms
現在,我們知道了方法名稱帶有Async和不帶Async的實現策略上的差異點就在於使用哪個線程池來執行而已。那麼,對我們實際的指導意義是啥呢?實際使用的時候,應該怎麼判斷自己應該使用帶Async結尾的方法、還是不帶Async結尾的方法呢?
上面是Async結尾方法默認使用的ForkJoinPool創建的邏輯,這裏可以看出,默認的線程池中的工作線程數是CPU核數 - 1,並且指定了默認的丟棄策略等,這就是一個主要關鍵點。所以説,符合以下幾個條件的時候,可以考慮使用帶有Async後綴的方法,指定自定義線程池:
- 默認線程池的線程數滿足不了實際訴求
- 默認線程池的類型不符合自己業務訴求
- 默認線程池的隊列滿處理策略不滿足自己訴求
使用注意點
與Stream結合
在涉及批量進行並行處理的時候,通過Stream與CompletableFuture結合使用,可以簡化很多編碼邏輯。但是在使用細節方面需要注意下,避免達不到使用CompletableFuture的預期效果。
需求場景: 在同一個平台內,傳入多個商品,查詢不同商品對應的價格與優惠信息,並選出實付價格最低的商品信息。
結合前面的介紹分析,我們應該知道最佳的方式,就是同時並行的方式去各自請求數據,最後合併處理即可。所以我們規劃按照如下的策略來實現:
先看第一種編碼實現:
public int comparePriceInOnePlat(List <String> products) {
return products.stream()
.map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getTaoBaoPrice(product))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getTaoBaoDiscounts(product)),
this::computeRealPrice))
.map(CompletableFuture::join)
.min(Comparator.comparingInt(p -> p))
.get();
}
對於List的處理場景,這裏採用了Stream方式來進行遍歷與結果的收集、排序與返回。看似正常,但是執行的時候會發現,並沒有達到我們預期的效果:
16:59:22.384338900[ForkJoinPool.commonPool-worker-2]獲取淘寶上iphone16的折扣價格完成: 200
16:59:22.384338900[ForkJoinPool.commonPool-worker-1]獲取淘寶上iphone16的價格完成: 5199
16:59:22.396881[ForkJoinPool.commonPool-worker-1]計算實際價格完成: 4999
16:59:23.404683800[ForkJoinPool.commonPool-worker-2]獲取淘寶上iphone17的折扣價格完成: 200
16:59:23.404683800[ForkJoinPool.commonPool-worker-1]獲取淘寶上iphone17的價格完成: 5199
16:59:23.404683800[ForkJoinPool.commonPool-worker-1]計算實際價格完成: 4999
16:59:24.416418500[ForkJoinPool.commonPool-worker-2]獲取淘寶上iphone18的折扣價格完成: 200
16:59:24.417266700[ForkJoinPool.commonPool-worker-1]獲取淘寶上iphone18的價格完成: 5199
16:59:24.417266700[ForkJoinPool.commonPool-worker-1]計算實際價格完成: 4999
4999
耗時:3116ms
從上述執行結果可以看出,其具體處理的時候,其實是按照下面的邏輯去處理了:
為什麼會出現這種實際與預期的差異呢?原因就在於使用的Stream上面!雖然Stream中使用兩個map方法,但Stream處理的時候並不會分別遍歷兩遍,其實寫法等同於下面這種寫到1個map中處理,改為下面這種寫法,其實也就更容易明白為啥會沒有達到我們預期的整體並行效果了:
public int comparePriceInOnePlat1(List < String > products) {
return products.stream()
.map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getTaoBaoPrice(product))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getTaoBaoDiscounts(product)), this::computeRealPrice).join())
.min(Comparator.comparingInt(p -> p))
.get();
}
既然如此,這種場景是不是就不能使用Stream了呢?也不是,其實拆開成兩個Stream分步操作下其實就可以了。
再看下面的第二種實現代碼:
public int comparePriceInOnePlat2(List < String > products) {
// 先觸發各自平台的並行處理
List <CompletableFuture <Integer>> completableFutures = products.stream()
.map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getTaoBaoPrice(product))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getTaoBaoDiscounts(product)), this::computeRealPrice))
.collect(Collectors.toList());
// 在獨立的流中,等待所有並行處理結束,做最終結果處理
return completableFutures.stream()
.map(CompletableFuture::join)
.min(Comparator.comparingInt(p -> p))
.get();
}
執行結果:
17:08:00.052684200[ForkJoinPool.commonPool-worker-2]獲取淘寶上iphone16的折扣價格完成: 200
17:08:00.051681700[ForkJoinPool.commonPool-worker-5]獲取淘寶上iphone18的價格完成: 5199
17:08:00.051681700[ForkJoinPool.commonPool-worker-6]獲取淘寶上iphone18的折扣價格完成: 200
17:08:00.052684200[ForkJoinPool.commonPool-worker-3]獲取淘寶上iphone17的價格完成: 5199
17:08:00.051681700[ForkJoinPool.commonPool-worker-1]獲取淘寶上iphone16的價格完成: 5199
17:08:00.051681700[ForkJoinPool.commonPool-worker-4]獲取淘寶上iphone17的折扣價格完成: 200
17:08:00.064680500[ForkJoinPool.commonPool-worker-4]計算實際價格完成: 4999
17:08:00.064680500[ForkJoinPool.commonPool-worker-1]計算實際價格完成: 4999
17:08:00.063680100[ForkJoinPool.commonPool-worker-6]計算實際價格完成: 4999
4999
耗時:1083ms
從執行結果可以看出,三個商品並行處理,整體處理耗時相比前面編碼方式有很大提升,達到了預期的效果。
歸納下:因為Stream的操作具有惰性執行的特點,且只有遇到終止操作(比如collect方法)的時候才會真正的執行。所以遇到這種需要並行處理且需要合併多個並行處理流程的情況下,需要將並行流程與合併邏輯放到兩個Stream中,這樣分別觸發完成各自的處理邏輯,就可以了。
使用自定義線程池
CompletableFuture 默認使用ForkJoinPool.commonPool() 作為執行器,這個線程池是全局共享的,可能會被其他任務佔用,導致性能下降或者飢餓。因此,建議使用自定義的線程池來執行 CompletableFuture 的異步任務,可以提高併發度和靈活性。
private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
CompletableFuture.runAsync(() -> {
//...
}, executor);
儘量避免使用get()
CompletableFuture的get()方法是阻塞的,儘量避免使用。如果必須要使用的話,需要添加超時時間,否則可能會導致主線程一直等待,無法執行其他任務。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, world!";
});
// 獲取異步任務的返回值,設置超時時間為 5 秒
try {
String result = future.get(5, TimeUnit.SECONDS);
System.out.println(result);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// 處理異常
e.printStackTrace();
}
}