博客 / 詳情

返回

頁面查詢多項數據組合的線程池設計 | 京東雲技術團隊

背景

我們應對併發場景時一般會採用下面方式去預估線程池的線程數量,比如QPS需求是1000,平均每個任務需要執行的時間是t秒,那麼我們需要的線程數是t * 1000。

但是在一些情況下,這個t是不好估算的,即便是估算出來了,在實際的線程環境上也需要進行驗證和微調。比如在本文所闡述分頁查詢的數據項組合場景中。

1、數據組合依賴不同的上游接接口, 它們的響應時間參差不齊,甚至差距還非常大。有些接口支持批量查詢而另一些則不支持批量查詢。有些接口因為性能問題還需要考慮降級和平滑方案。

2、為了提升用户體驗,這裏的查詢設計了動態列,因此每一次訪問所需要組合的數據項和數量也是不同的。

因此這裏如果需要估算出一個合理的t是不太現實的。

方案

一種可動態調節的策略,根據監控的反饋對線程池進行微調。整體設計分為裝配邏輯線程池封裝設計。

1、裝配邏輯

查詢結果,拆分分片(水平拆分),並行裝配(垂直拆分),獲得裝配項列表(動態列), 並行裝配每一項。

2、線程池封裝

可調節的核心線程數、最大線程數、線程保持時間,隊列大小,提交任務重試等待時間,提交任務重試次數。 固定異常拒絕策略。

調節參數:

字段 名稱 説明
corePoolSize 核心線程數 參考線程池定義
maximumPoolSize 最大線程數 參考線程池定義
keepAliveTime 線程存活時間 參考線程池定義
queueSize 隊列長度 參考線程池定義
resubmitSleepMillis 提交任務重試等待時間 添加任務被拒絕後重試時的等待時間
resubmitTimes 提交任務重試次數 添加任務被拒絕後重試添加的最大次數
    @Data
    private static class PoolPolicy {

        /** 核心線程數 */
        private Integer corePoolSize;

        /** 最大線程數 */
        private Integer maximumPoolSize;

        /** 線程存活時間 */
        private Integer keepAliveTime;

        /** 隊列容量 */
        private Integer queueSize;

        /** 重試等待時間 */
        private Long resubmitSleepMillis;

        /** 重試次數 */
        private Integer resubmitTimes;
    }

創建線程池:

線程池的創建考慮了動態的需求,滿足根據壓測結果進行微調的要求。首先緩存舊的線程池後再創建新的線程,當新的線程池創建成功後再去關閉舊的線程池。保證在這個替換過程中不影響正在執行的業務。線程池使用了中斷策略,用户可以及時感知到系統繁忙併保證了系統資源佔用的安全。

public void reloadThreadPool(PoolPolicy poolPolicy) {
    if (poolPolicy == null) {
        throw new RuntimeException("The thread pool policy cannot be empty.");
    }
    if (poolPolicy.getCorePoolSize() == null) {
        poolPolicy.setCorePoolSize(0);
    }
    if (poolPolicy.getMaximumPoolSize() == null) {
        poolPolicy.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() + 1);
    }
    if (poolPolicy.getKeepAliveTime() == null) {
        poolPolicy.setKeepAliveTime(60);
    }
    if (poolPolicy.getQueueSize() == null) {
        poolPolicy.setQueueSize(Runtime.getRuntime().availableProcessors() + 1);
    }
    if (poolPolicy.getResubmitSleepMillis() == null) {
        poolPolicy.setResubmitSleepMillis(200L);
    }
    if (poolPolicy.getResubmitTimes() == null) {
        poolPolicy.setResubmitTimes(5);
    }
    // - 線程池策略沒有變化直接返回已有線程池。
    ExecutorService original = this.executorService;
    this.executorService = new ThreadPoolExecutor(
            poolPolicy.getCorePoolSize(),
            poolPolicy.getMaximumPoolSize(),
            poolPolicy.getKeepAliveTime(), TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(poolPolicy.getQueueSize()),
            new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build(),
            new ThreadPoolExecutor.AbortPolicy());
    this.poolPolicy = poolPolicy;
    if (original != null) {
        original.shutdownNow();
    }
}

任務提交:

線程池封裝對象中使用的線程池拒絕策略是AbortPolicy,因此在線程數和阻塞隊列到達上限後會觸發異常。另外在這裏為了保證提交的成功率利用重試策略實現了一定程度的延遲處理,具體場景中可以結合業務特點進行適當的調節和配置。

public <T> Future<T> submit(Callable<T> task) {
    RejectedExecutionException exception = null;
    Future<T> future = null;
    for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
        try {
            // - 添加任務
            future = this.executorService.submit(task);
            exception = null;
            break;
        } catch (RejectedExecutionException e) {
            exception = e;
            this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
        }
    }
    if (exception != null) {
        throw exception;
    }
    return future;
}

監控:

1、submit提交的監控

見代碼中的「監控點①」,在submit方法中添加監控點,監控key的需要添線程池封裝對象的線程名稱前綴,用於區分具體的線程池對象。

「監控點①」用於監控添加任務的動作是否正常,以便對線程池對象及策略參數進行微調。

public <T> Future<T> submit(Callable<T> task) {
    // - 監控點①
    CallerInfo callerInfo = Profiler.registerInfo(UmpConstant.THREAD_POOL_WAP + threadNamePrefix,
                UmpConstant.APP_NAME,
                UmpConstant.UMP_DISABLE_HEART,
                UmpConstant.UMP_ENABLE_TP);
    RejectedExecutionException exception = null;
    Future<T> future = null;
    for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
        try {
            // - 添加任務
            future = this.executorService.submit(task);
            exception = null;
            break;
        } catch (RejectedExecutionException e) {
            exception = e;
            this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
        }
    }
    if (exception != null) {
        // - 監控點①
        Profiler.functionError(callerInfo);
        throw exception;
    }
    // - 監控點①
    Profiler.registerInfoEnd(callerInfo);
    return future;
}

2、線程池並行任務

見代碼的「監控點②」,分別在添加任務和任務完成後。

「監控點②」實時統計在線程中執行的總任務數量,用於評估線程池的任務的數量的滿載水平。

/** 任務並行數量統計 */
private AtomicInteger parallelTaskCount = new AtomicInteger(0);

public <T> Future<T> submit(Callable<T> task) {
    RejectedExecutionException exception = null;
    Future<T> future = null;
    for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
        try {
            // - 添加任務
            future = this.executorService.submit(()-> {
                T rst = task.call();
                // - 監控點②
                log.info("{} - Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.decrementAndGet());
                return rst;
            });
            // - 監控點②
            log.info("{} + Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.incrementAndGet());
            exception = null;
            break;
        } catch (RejectedExecutionException e) {
            exception = e;
            this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
        }
    }
    if (exception != null) {
        throw exception;
    }
    return future;
}

3、調節

線程池封裝對象策略的調節時機

1)上線前基於流量預估的壓測階段;

2)上線後跟進監控數據和線程池中任務的滿載水平進行人工微調,也可以通過JOB在指定的時間自動調整;

3)大促前依據往期大促峯值來調高相關參數。

線程池封裝對象策略的調節經驗

1)訪問時長要求較低時,我們可以考慮調小線程數和阻塞隊列,適當調大提交任務重試等待時間和次數,以便降低資源佔用。

2)訪問時長要求較高時,就需要調大線程數並保證相對較小的阻塞隊列,調小提交任務的重試等待時間和次數甚至分別調成0和1(即關閉重試提交邏輯)。

作者:京東零售 王文明

來源:京東雲開發者社區 轉載請註明來源

user avatar xuezhongyu01 頭像
1 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.