1、背景
在我們的項目中有這麼一個場景,需要消費kafka中的消息,並生成對應的工單數據。早些時候程序運行的好好的,但是有一天,我們升級了容器的配置,結果導致部分消息無法消費。而消費者的代碼是使用CompletableFuture.runAsync(() -> {while (true){ ..... }}) 來實現的。
即:
- 需要消費Kafka topic的個數: 7個,每個線程消費一個topic
- 消費方式:使用線程池異步消費
- 消費池:默認的
ForkJoin線程池???,並且沒有做任何配置 - 是否會釋放線程池中的核心線程: 不會釋放
- 沒出問題時容器配置:
2核4G - 出問題時容器配置:
4核8G,影響的結果:只有3個topic的數據可以消費。
2、容器2核4G可以正常消費
即:此時程序會啓動7個線程來進行消費。
3、容器4核8G只有部分可以消費
即:此時程序會啓動3個線程來進行消費。
4、問題原因分析
1、通過上面的背景我們可以知道,是因為升級了容器的配置,才導致我們消費kafka中的消息失敗了。
2、針對kafka中的每個topic,我們都會使用一個單獨的線程來消費,並且不會釋放這個線程。\
3、而線程的啓動方式是通過CompletableFuture.runAsync()方法來啓動的,那麼通過這種方式啓動的線程,是每個任務一個啓動一個線程,還是隻啓動固定的線程呢?.
通過以上分析,那麼問題肯定是出現在線程池身上,那麼我們默認使用的是什麼線程池呢?查看CompletableFuture.runAsync()的源碼可知,有一定的機率是ForkJoinPool。那麼我們一起看下源碼。
5、源碼分析
1、確認使用什麼線程池
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
通過上述源碼可知,我們可能使用的ForkJoin線程池,也可能使用的是ThreadPerTaskExecutor線程池。
ThreadPerTaskExecutor這個是每個任務,一個線程。ForkJoinPool那麼就需要確定啓動了多少個線程。
2、確認是否使用 ForkJoin 線程池
需要確定 useCommonPool 字段是如何賦值的。
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
通過上面代碼可知,是否使用ForkJoin線程池,是由 ForkJoinPool.getCommonPoolParallelism()的值確定的。(即並行度是否大於1,大於則使用ForkJoin線程池)
public static int getCommonPoolParallelism() {
return commonParallelism;
}
3、commonParallelism 的賦值
1、從上圖中可知parallelism的設置有2種方式
- 通過Jvm的啓動參數
java.util.concurrent.ForkJoinPool.common.parallelism進行設置,且這個值最大為MAX_CAP即32727。 - 若沒有通過Jvm的參數配置,則有
2種情況,若cpu的核數<=1,則返回1,否則返回cpu的核數-1
2、commonParallelism的取值
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
SMASK 的值是 65535。
common.config 的值就是 (parallelism & SMASK) | 0的值,即最大為65535,若parallelism的值為0,則返回0。\
int par = common.config & SMASK ,即最大為 65535
commonParallelism = par > 0 ? par : 1 的值就為 parallelism的值或1
6、結論
結論:\
由上面的知識點,我們可以得出,當我們的容器是2核4G時,程序選擇的線程池是ThreadPerTaskExecutor,當我們的容器是4核8G時,程序選擇的線程池是ForkJoinPool。