從不足到精進:H5 即開並行加載方案的演進之路

新聞
HongKong
3
03:41 PM · Dec 04 ,2025

作者: vivo 互聯網客户端團隊- Chen Long

並行加載是 H5 即開 SDK 的加速技術,通過 native 層在用户打開頁面時並行請求關鍵資源(如 index.html 和 CSR 模式 API),利用 webview 初始化時間窗口提前發起請求,減少加載耗時。其核心挑戰是解決 webview 與並行任務間的資源交接問題。

 

1分鐘看圖掌握核心觀點👇

一、並行加載能力核心解析

1.1 什麼是並行加載

並行加載是H5即開SDK提供的一項加速能力,核心邏輯是:在用户打開頁面時,通過native層並行請求關鍵資源,減少頁面加載耗時。其本質是利用webview及native頁面初始化的時間窗口,提前發起資源請求,實現"時間複用"。

 

1.2 核心加載資源

並行加載的關鍵資源包含兩類:

  • H5頁面首幀渲染依賴的index.html(首幀資源,加載時機極早)。

  • CSR模式下頁面依賴的API接口(通常在首幀渲染後調用,加載時機較晚)。

 

1.3 工作流程示意圖

 

即開SDK在流程中主要完成兩件事:

  1. 用户打開URL時,並行請求H5依賴的API接口或index.html文件。

  2. 攔截webview請求,將並行加載的緩存數據導流給webview,實現加速。

 

二、並行加載的核心挑戰:資源交接場景

並行加載的核心問題是如何在webview需要資源時,將並行請求的資源無縫交接。根據資源狀態可分為三類場景

 

下面我們針對三個場景分別看處理方案

2.1 場景一:網絡數據尚未響應,webview開始需要資源

解決方案一

忽略網絡數據,webview自己加載資源。

矛盾點:

如果是因為服務器壓力大,網絡環境差導致的響應慢,webview自己去加載也會遇到同樣的問題,而且直接放棄已並行加載的任務,等於是浪費了已經處於建聯中,可能已經完成了一部分等待的時間。

 

解決方案二

webview等待網絡資源加載成功後,再使用加載成功的資源。

矛盾點:

讓webview的資源獲取線程等待並行任務響應並返回,那麼等多久,如果時間太久怎麼辦,需要設置超時時間,如何等待,並且在等待的過程中還要監控並行任務是否已經返回。

 

2.2 場景二:網絡數據已響應,webview開始需要資源

場景二需要考慮兩種情況

情況一

在webvie需要數據的時候,網絡數據流剛好完成建聯,webview可以直接使用網絡數據流加載。

 

情況二

網絡數據流建聯的時候,webview還未開始需要使用數據,並行任務有時間將網絡數據流讀取到緩衝區中,webview在需要數據的時候,可以讀取緩衝區的數據。

矛盾點:

大家知道網絡數據讀取是有受限於網絡環境影響的,預先進行網絡的數據讀取,再交接給webview,肯定能更大程度上降低讀取耗時,但是問題又來了,如果並行任務在讀取的過程中,還沒有讀完,webview就來要數據怎麼辦,讓webview等待嗎?如果等待,等待多久?如果不等待,又如何將已讀取在緩衝區中的數據和未讀取的網絡流數據一起交給webview呢。

 

2.3 場景三:網絡數據返回錯誤,webview開始需要資源

解決方案

並行任務已失敗,直接廢棄並行任務,讓webview自己加載資源。

 

三、早期方案設計與侷限

在最開始,我們希望並行任務設計的足夠簡單,基於以上所有場景下的理解,權衡開發難度,我們設計了第一個方案。

 

首先我們希望避開場景二中,網絡數據流已建聯,網絡線程正在讀取網絡流到緩衝區,讀到一半webview來取數據的場景,因為我們覺得這種場景較為複雜,如果返回混合流,可能會出現控制不好的情況,而且整個過程中,兩條線程參與的生產者和消費者,存在一箇中間態,也就是生產者生產到一半時,消費者過來要消費數據,生產者要立刻停止生產,並把未成品交給消費者,這顯然比常規的生產者消費者模式更加複雜,於是我們決定用更簡單的方法來處理,方案就是把這個較為複雜的生產者消費者,變回簡單的典型生產者消費者,消費者不能打斷生產者的生產過程,而是等待生產完成,避免中間態下的複雜處理,雖然做了妥協,但是我們依然希望有較好的性能,所以我們將index.html任務和CSR模式下的API任務分為兩個不同的方式來進行處理。

 

3.1 index.html首幀資源處理

index.html,這是webview完成初始化後第一個要加載的資源,俗稱首幀資源。因此index.html的使用時機可以説是非常的靠前,在並行加載任務中,它的可用並行加載時間也大致在100ms左右,我們認為在這個時間內,並行任務大概率可以完成建聯,但是可能沒有時間再完成數據流的讀取。

 

因此,我們使用了stream對象來保存網絡數據流。

 

用户點擊H5入口按鈕,並行index任務發起,訪問網絡開始建聯,一但完成有效建聯,則保存網絡stream對象,當webview需要使用該數據流的時候,判定stream對象是否存在,如果存在,則直接使用該數據流,如果stream對象不存在,則判定網絡訪問是否返回,如果是因為網絡訪問失敗導致stream為空,則認為是並行任務失敗,否則會進入等待模式,等待模式下,線程建立一個循環體,每隔5ms,探測一次stream是否存在,網絡是否已完成建聯,一但探測到結果,則退出循環體,即使多次沒有探測到結果,1500ms後依然退出循環體,此時放棄並行任務數據,改為webview自主加載資源。

 

我們來看下這個方案

循環體的等待機制實現了消費者等待生產者完成生產的過程,5ms的檢測時機,實現了生產者生產完成後,消費者知曉生產者生產狀態的能力,1500ms的的超時退出,解決了生產者出現問題時,消費者困在循環等待中的情況。整個方案,只在保存的stream對象上添加volatile關鍵字,實現輕量的對象可見性線程同步。

 

整體方案邏輯如下:

 

3.1.1 index並行任務發起

flowchart TD
    A[用户點擊H5入口按鈕] --> B[並行發起index任務]
    B --> C[native訪問網絡開始建聯]
    C --> D{建聯成功?}
    D -->|是| E[保存網絡Stream對象]
    E -->F[記錄結束狀態]
    D -->|否| F[記錄結束狀態]

 

3.1.2 webview使用index資源

flowchart TD
    A[用户點擊H5入口按鈕] --> B[webview初始化]
    B --> C{需要使用index數據流}
    C -->|是| D[查找stream對象是否存在]
    D -->|存在| E[直接使用數據流]
    D -->|不存在| F{網絡訪問已結束?}
    F -->|是且stream為空| G[判定並行任務失敗]
    F -->|否| H[進入等待模式]
    H --> I[建立探測循環體]
    I --> J[每隔5ms探測]
    J --> K{stream存在?}
    K -->|是| L[使用數據流並退出循環]
    K -->|否| M{並行任務已結束?}
    M -->|是| N[放棄並行任務並退出循環]
    M -->|否| O{達到1500ms?}
    O -->|是| P[超時退出循環]
    O -->|否| J
    P --> Q[webview自主加載資源]
    N --> Q
    L --> R[流程結束]
    G --> Q
    Q --> R

 

 

3.2 API 接口資源處理

API接口通常在頁面完成第一次渲染後,才開始調用,這個時候頁面可能會展示一個loading狀態,或者是骨架屏,這個時機相對靠後,並行任務有充足的時間來完成網絡的建聯任務,甚至有充足的時間將網絡流讀取到緩衝區中。

 

針對這種情況,我們保存了一個byte數組,將網絡流數據讀取到這個byte數組中,webview需要使用數據時,將byte數組包裝成ByteArrayInputStream返回。

 

邏輯圖如下:

3.2.1 API並行任務發起

flowchart TD
    A[用户點擊H5入口按鈕] --> B[並行發起index任務]
    B --> C[native訪問網絡開始建聯]
    C --> D{建聯成功?}
    D -->|是| E[讀取網絡stream]
    E --> F[保存到本地byte數組]
    D -->|否| G[記錄結束狀態]
    F --> H[流程完成]
    G --> H

 

 

3.2.2 webview使用API資源

flowchart TD
    A[用户點擊H5入口按鈕] --> B[webview初始化]
    B --> C{需要使用index數據流}
    C -->|是| D[查找byte數組是否存在]
    D -->|存在| E[封裝為ByteArrayInputStream返回給webview]
    E --> X[流程結束]
    
    D -->|不存在| G{網絡訪問已結束?}
    G -->|是且byte數組為空| H[判定並行任務失敗]
    G -->|否| I[進入等待模式]
    
    I --> J[建立探測循環體]
    J --> K[每隔5ms探測]
    K --> L{byte數組存在?}
    L -->|是| M[封裝使用並退出循環]
    L -->|否| N{並行任務已結束?}
    N -->|是| O[放棄並行任務並退出]
    N -->|否| P{達到1500ms?}
    P -->|是| Q[超時退出]
    P -->|否| K
    
    M --> E
    O --> R[webview自主加載]
    Q --> R
    H --> R
    R --> X

 

 

從上述的邏輯圖中可以看到,一但網絡建聯的時間不足,數據響應不及時,webview就會處於等待狀態。

 

有兩種場景會導致webview建立線程循環體等待資源:

場景一:並行發起的網絡請求,因為網絡速度慢,尚未建聯返回有效的網絡stream。

 

場景二:在並行API的場景下,雖然已經建聯,但是網絡stream數據讀取中,尚未完全讀取到緩衝區,webview會等待緩衝區緩衝完成。

 

這兩種場景,都會造成時間的浪費

場景一的等待雖然不可避免,但是webview的循環體檢測機制,每隔5ms才會檢測一次,最差的情況下,無效等待時間可以最長達到5ms(上一次循環檢測剛結束,網絡即完成建聯,需要下一個5ms之後才會發起檢測)。

 

場景二的無效等待時間就更長了,即使已經完成建聯,webview也要等待緩存全部完成,待緩存完成之後,webview又再次從緩衝區讀取一次數據,全量緩存不就浪費了時間,還浪費了內存。

 

3.3 早期方案的核心侷限

  • 時間浪費:循環探測(每 5ms 一次)存在無效等待

  • 內存浪費:API 接口全量緩存佔用額外內存

  • 資源利用率低:放棄部分加載的資源,未充分利用並行請求的時間窗口

 

四、方案演進:優化時間與內存消耗

在反思早期方案的弊端中,我們提出了內存的消耗和時間上的浪費,那麼新方案的優化側重點就是優化內存的消耗和循環等待時間上的消耗,優化耗時的優化大刀是直接幹掉循環等待,優化內存消耗的優化大刀是幹掉全量的緩存建立,但是做加法簡單,做減法就沒那麼容易。

 

4.1 優化思路

新方案的核心是解決中間態交接問題,通過線程同步和流橋接技術實現優化。

整體的構思如下:

  • 幹掉循環等待:用線程同步機制替代輪詢

  • 幹掉全量緩存:採用半緩衝模式,僅緩存部分數據

  • 支持中間態交接:允許 webview 打斷並行任務,合併已緩衝與未緩衝數據

 

4.2 技術實現:生產者 - 消費者模型

4.2.1 核心邏輯

我們把網絡建聯+網絡數據讀取到緩衝區的整個過程,看作是生產過程, 用一個生產者消費者模型來描述這個過程

  • 生產者接到生產產品任務

  • 消費者隨時過來消費產品

  • 如果生產者還未開始生產,消費者可以等待一段時間,但是如果超時就放棄等待

  • 如果生產者正在生產,消費者可以隨時可以打斷生產,拿走生產了一半的產品,消費者完成剩下產品的生產任務

  • 如果生產者已經完成生產,消費者就可以拿走全部產品

 

這裏涉及到兩個技術點

  • 生產者在生產過程中隨時被打斷

  • 生產者生產了一半的產品可以被用來使用

 

我們嘗試使用線程間同步機制和橋接流技術來實現這些需求:

 

4.3 代碼核心實現

import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Thread.sleep;


public class Main2 {

       //模擬響應體
       public static class ResponseBody {
            InputStream stream;

             public ResponseBody(InputStream stream){
                 this.stream = stream;
            }

            public InputStream byteStream(){
                return stream;
            }
        }


        public static class SyncLoadResponseBean {

            private static final String TAG = "SyncLoadResponseBean";

            // 狀態常量定義
            public static final int INIT = 1;  // 初始狀態
            public static final int READY = 2; // 數據準備就緒
            public static final int OFFER = 3; // 數據已提供
            public static final int DROP = 4;  // 數據已丟棄

            private final String mRequestUrl;
            private final ConcurrentHashMap<String, SyncLoadResponseBean> mSyncLoadCache;
            private final AtomicInteger mStatus = new AtomicInteger(INIT); // 狀態機

            // 數據存儲相關
            private Map<String, String> mResponseHeader;
            private ByteArrayOutputStream mBufferStream;
            private InputStream mNetworkStream;
            private long mResponseTime;


            public SyncLoadResponseBean(String requestUrl, ConcurrentHashMap<String, SyncLoadResponseBean> syncLoadCache){
                mRequestUrl = requestUrl;
                mSyncLoadCache = syncLoadCache;
                mStatus.set(INIT);
            }

            public boolean before(int status){
               return mStatus.get() < status;
            }

            public boolean during(int status){
                return mStatus.get() == status;
            }

            public boolean after(int status){
                return mStatus.get() >= status;
            }

            /**
             * 喚醒所有等待線程
             * 使用 tryLock 避免長時間阻塞
             */
            public void signalAll(){
                synchronized (SyncLoadResponseBean.this) {
                    this.notifyAll();
                }
            }

            /**
             * 保存響應數據並預處理
             * 網絡線程會在得到響應後調用該方法,保存數據
             */
            public void saveResponse(ResponseBody responseBody, Map<String, String> responseHeader){
                streamReady(responseBody, responseHeader);
                preReadStream();
            }

             /**
             * 準備數據流
             */
             private voids treamReady(ResponseBody responseBody, Map<String, String> responseHeader){
                synchronized (SyncLoadResponseBean.this) {
                    TLog.d(TAG, "並行加載 響應保存");
                    mResponseTime = System.currentTimeMillis();
                    mResponseHeader = responseHeader;
                    mBufferStream = new ByteArrayOutputStream();
                    if (responseBody != null) {
                        mNetworkStream = responseBody.byteStream();
                        // 根據流是否有效設置狀態
                        if (mNetworkStream != null) {
                            mStatus.set(READY);
                        } else {
                            drop();
                        }
                    } else {
                        drop();
                    }
                    TLog.d(TAG, "並行加載 保存完成 通知消費者");
                    signalAll();
                }
            }


            private void preReadStream(){
                TLog.d(TAG, "並行加載 預讀緩存 開始");
                byte[] buffer = new byte[4096];
                int num = 0;
                try {
                    while (during(READY)) {
                        synchronized (SyncLoadResponseBean.this) {
                            try {
                                //雙重校驗鎖
                                if (during(READY)) {
                                   // 讀取網絡流數據
                                   int bytesRead = mNetworkStream.read(buffer);
                                   if (bytesRead == -1) {
                                        TLog.d(TAG, "並行加載 預讀緩存 完成 " + bytesRead);
                                        closeStream(mNetworkStream);
                                        mNetworkStream = null;
                                        return;
                                    }
                                    TLog.d(TAG, "並行加載 預讀緩存 " + bytesRead);
                                    mBufferStream.write(buffer, 0, bytesRead);
                                }
                            } finally {
                                num++;
                                TLog.d(TAG, "並行加載 預讀緩存 第" + num + "次通知消費者");
                                signalAll();
                            }
                        }
                    }
                    //已經提供了數據,則打印個日誌看下
                    if (after(OFFER)) {
                        TLog.d(TAG, "並行加載 數據流已提供 預讀緩存 關閉");
                    }
                } catch (IOException e) {
                    TLog.e(TAG, "並行加載 預讀緩存 異常 關閉", e);
                    synchronized (SyncLoadResponseBean.this) {
                        //在讀取的過程中出現了異常,但是這個時候還沒有提供數據,就直接drop調
                        if (!after(OFFER)) {
                            drop();
                        }
                    }
                }
            }

            /**
             * 獲取橋接流
             * 瀏覽器線程調用該方法獲取數據流
             */
             public InputStream getBridgedStream(){
                TLog.d(TAG, "並行加載 查找流數據");
                synchronized (SyncLoadResponseBean.this) {
                  try {
                       if (before(READY)) {
                            TLog.d(TAG, "並行加載 查找流數據 進入等待狀態");
                            this.wait(5000);
                            TLog.d(TAG, "並行加載 查找流數據 被喚醒");
                        }
                        // 等待結束,再確認一次狀態是否可用
                        if (before(READY)) {
                            TLog.d(TAG, "並行加載 查找流數據 依舊沒有可用數據 返回空流");
                            drop();
                            return null;
                        } elseif (after(OFFER)) {
                            TLog.d(TAG, "並行加載 查找流數據 數據已被廢棄或者被他人被使用 返回空流");
                            return null;
                        } elseif (isTimeOut()) {
                            TLog.d(TAG, "並行加載 查找流數據 數據超時 返回空流");
                            drop();
                            return null;
                        } else {
                            if (mNetworkStream != null && mBufferStream != null) {
                                mStatus.set(OFFER);
                                // 創建新的橋接流實例,包含已緩存數據和剩餘網絡流
                                ByteArrayInputStream cachedStream = new ByteArrayInputStream(mBufferStream.toByteArray());
                                TLog.d(TAG, "並行加載 查找流數據 返回橋接流");
                                return new SequenceInputStream(cachedStream, mNetworkStream);
                            } elseif (mNetworkStream != null) {
                                mStatus.set(OFFER);
                                TLog.d(TAG, "並行加載 查找流數據 返回網絡流");
                                return mNetworkStream;
                            } elseif (mBufferStream != null) {
                                mStatus.set(OFFER);
                                // 創建新的橋接流實例,包含已緩存數據和剩餘網絡流
                                TLog.d(TAG, "並行加載 查找流數據 返回緩存流");
                                return new ByteArrayInputStream(mBufferStream.toByteArray());
                            } else {
                                drop();
                                TLog.d(TAG, "並行加載 查找流數據 返回空流");
                                 return null;
                            }
                        }
                    } catch (Exception e) {
                        TLog.e("TAG", "Create bridged stream failed", e);
                        drop();
                        return null;
                    }
                }
            }

            /**
             * 獲取響應頭
             */
             public Map<String, String> getResponseHeader(){
                //如果請求裏面不帶跨域標識,則帶上跨域標識
                if (mResponseHeader != null && !mResponseHeader.containsKey("Access-Control-Allow-Origin")) {
                    mResponseHeader.put("Access-Control-Allow-Origin", "*");
                }
                return mResponseHeader;
            }


            /**
             * 統一關閉流資源操作
             */
             private void closeStream(Closeable stream){
if (stream != null) {
try {
                        stream.close();
                    } catch (Exception e) {
                        TLog.e(TAG, "關閉流失敗", e);
                    }
                }
            }


            /**
             * 判斷數據有沒有超時
             */
             private boolean isTimeOut(){
                 return Math.abs(mResponseTime - System.currentTimeMillis()) > 5000;
            }

            /**
             * 丟棄數據
             */
             public void drop(){
                synchronized (SyncLoadResponseBean.this) {
                    mStatus.set(DROP);
                    mResponseHeader = null;
                    mResponseTime = 0;
                    closeStream(mBufferStream);
                    closeStream(mNetworkStream);
                    mBufferStream = null;
                    mNetworkStream = null;
                    mSyncLoadCache.remove(mRequestUrl);
                    TLog.d(TAG, "並行加速 緩存數據丟棄");
                }
            }
        }


        /**
         * 主方法,測試程序入口
         */
          public static void main(String[] args){

            SyncLoadResponseBean syncLoadResponseBean = new SyncLoadResponseBean("https://www.baidu.com", new ConcurrentHashMap<>());

            //模擬生產者線程
            new Thread(() -> {
                 try {
                    System.out.println("生產者啓動");
                    //模擬網絡建聯
                    sleep(200);
                    //模擬網絡資源返回
                    File file = new File("./xxxx.txt");
                    ResponseBody responseBody = new ResponseBody(new FileInputStream(file));
                     //模擬響應
                    syncLoadResponseBean.saveResponse(responseBody, new HashMap<>());
                } catch (FileNotFoundException e) {
                    throw new RuntimeException(e);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    System.out.println("生產線程工作完成,喚醒等待中的消費者");
                    syncLoadResponseBean.signalAll();
                }
            }).start();

              //模擬消費者線程1
              new Thread(() -> {

                System.out.println("消費者啓動");
                //模擬讀取
                InputStream stream = syncLoadResponseBean.getBridgedStream();
                System.out.println("消費者 " + stream);
            }).start();

        }

    }

 

4.4 代碼解讀

 

代碼的工作流程是這樣的:

  • 生產者網絡線程發起數據請求。

  • 建聯後會第一次發起notify,嘗試喚醒等待中的消費者webview線程,如果這個時候剛好有消費者webview在等待,還未輪到生產者預讀數據,消費者就會拿走數據。

  • 如果沒有消費者在等待,就開始預讀,預讀是一個循環讀取的過程,每次循環讀取都會有一個加鎖->讀取->釋放鎖的過程,保證消費者可以隨時打斷並拿走讀取了一半的數據。

  • 使用synchronized(SyncLoadResponse-

    Bean.this)來實現生產者和消費者線程間協同工作。

  • 使用AtomicInteger來實現狀態機的線程間同步。

  • 使用了wait(long timeoutMillis)來實現了等待和鎖的超時釋放。

  • preReadStream()方法在while循環體內實現了讀取時的同步塊。

  • 使用SequenceInputStream實現了已讀緩衝流與未讀網絡流的橋接。

  • 在消費者webview等待資源的過程中,生產者會有多個時機讓出同步鎖,並且發起notify,通知消費者可以消費數據:時機①:生產者完成網絡建聯,保存了響應體,但還沒有進行數據預讀的時候。時機②:生產者在網絡數據的預讀過程中,每次讀取4096之後都會通知消費者。

 

在代碼的最下面,我們提供了標準Java的main函數,有興趣的同學可以嘗試將代碼拷貝到本地,並且運行他,看下效果,我們這邊也直接把運行效果貼出來,看下是否符合預期。

    生產者啓動
    消費者啓動
    並行加載 查找流數據
    並行加載 查找流數據 進入等待狀態
    並行加載 響應保存
    並行加載 保存完成 通知消費者
    並行加載 預讀緩存 開始
    並行加載 預讀緩存 4096
    並行加載 預讀緩存 第1次通知消費者
    並行加載 預讀緩存 4096
    並行加載 預讀緩存 第2次通知消費者
    並行加載 預讀緩存 4096
    並行加載 預讀緩存 第3次通知消費者
    並行加載 預讀緩存 4096
    並行加載 預讀緩存 第4次通知消費者
    並行加載 預讀緩存 4096
    並行加載 預讀緩存 第5次通知消費者
    .......
    .......
    .......
    並行加載 預讀緩存 4096
    並行加載 預讀緩存 第360次通知消費者
    並行加載 預讀緩存 4096
    並行加載 預讀緩存 第361次通知消費者
    並行加載 預讀緩存 4096
    並行加載 預讀緩存 第362次通知消費者
    並行加載 查找流數據 被喚醒
    並行加載 查找流數據 返回橋接流
    並行加載 預讀緩存 第363次通知消費者
    並行加載 數據流已提供 預讀緩存 關閉
    
    消費者 java.io.SequenceInputStream@696759a5

 

4.5 運行結果解讀

這次的test代碼運行,工作流程如下:

消費者webview需要數據的時候,生產者的生產任務還沒有返回,此時生產者正在進行虛擬的網絡建聯,因此消費者進入了等待狀態,網絡建聯後,生產者首次嘗試喚醒消費者,但正在等待任務的消費者並沒有被喚醒,這裏已經開始不符合預期了。於是生產者繼續進行數據預讀,在數據預讀的循環體內,每次一個緩衝讀完,生產者都會嘗試喚醒一次消費者,但是消費者直到第363次notify的時候,才被喚醒,拿到了數據。

 

我們經過多次嘗試,發現消費者被喚醒的時機不確定,有時候是在首次喚醒的時候就能夠喚醒,有時候要在喚醒第xxx次的時候才能被喚醒,而且次數還是隨機的,顯然這樣延長了消費者等待的時間,不符合我們既定的想法,還需要進一步的優化。

 

五、方案演進:優化同步策略

從上述代碼實際運行的現象上看,生產者釋放鎖,並且喚醒消費者的時候,線程鎖並沒有交接給消費者,反而又被生產者的預讀任務給搶了過來。

 

5.1 鎖的公平性

從運行的結果來看,鎖的釋放和獲取並沒有符合預期,我們有理由懷疑在線程同步的過程中,有一些線程爭搶資源和鎖的情況發生,通過查閲java多線程相關資料,我們瞭解到一個概念:線程鎖的公平性。

線程鎖的公平性是指多個線程競爭鎖時,鎖的獲取是否按照請求順序進行分配,同步鎖的類型簡單來講可以分為如下兩類:

 

非公平鎖

  • 特點:允許線程"插隊",新請求鎖的線程可能比等待隊列中的線程先獲取鎖

  • 優點:更高的吞吐量,減少線程切換開銷

  • 缺點:可能導致某些線程長時間等待(飢餓)

 

公平鎖

  • 特點:嚴格按照FIFO(先進先出)順序分配鎖

  • 優點:避免線程飢餓,行為更可預測

  • 缺點:性能較低,因為需要維護隊列順序

Java內置的synchronized鎖就是是非公平鎖,wait和notify 也是基於synchronized來實現的。

在上述的示例代碼中,我們使用的就是非公平鎖,導致線程之間出現資源搶佔,發生了不符合預期的情況。

 

適合使用公平鎖的場景

  • 嚴格的順序要求:當線程執行順序對業務邏輯至關重要時

  • 避免飢餓:當需要確保所有線程都有機會執行時

雖然公平鎖會犧牲同步性能,但是在當前業務中,我們是希望消費者能夠儘快的獲得數據的,所以我們應該選擇使用公平鎖來實現同步,在java中要實現公平鎖,就必須使用ReentrantLock,如果要實現公平鎖的等待,就要使用Condition,我們使用ReentrantLock和Condition來修改代碼,對代碼中與同步鎖相關的邏輯進行重構。

 

公平鎖的使用方法

使用公平鎖時,無法像使用synchronized關鍵字一樣直接加在方法頭上,而是需要手動獲得鎖和釋放鎖,示例代碼如下:

private final ReentrantLock mLock = new ReentrantLock(true); // 公平鎖

public void test(){
    //業務代碼執行前獲得鎖
    mLock.lock();  
    try {
        //實際執行的業務代碼
        TLog.d(TAG, "此處執行代碼");  
    } finally {  
        //業務代碼執行完成後釋放鎖
        mLock.unlock();  
    }  
}

 

如果需要手動使線程處於等待狀態,則觸發等待和喚醒的示例代碼如下:

private final ReentrantLock mLock = new ReentrantLock(true); // 公平鎖
private final Condition mCondition \= mLock.newCondition(); // 條件變量

/**  
 * 喚醒示例
 * 
 */
public void signal(){  
    if (mLock.tryLock()) {  
        try {  
            mCondition.signal();  
        } finally {  
            mLock.unlock();  
        }  
    }  
}

/**  
 * 等待示例  
 * 等待五秒鐘後主動釋放
 */  
public void test(){  
    mLock.lock();  
    try {  
        TLog.d(TAG, "實際業務代碼執行");  
        mCondition.await(5, TimeUnit.SECONDS);  
        TLog.d(TAG, "實際業務代碼執行");  
    } finally {  
        mLock.unlock();  
    }  
}

 

這裏的疑問,為什麼lock並且await的時候,其他線程依然可以獲得鎖併發起signal?

是因為condition.await() 的原子操作,當線程調用 condition.await() 時,會自動釋放鎖,然後進入等待狀態,這是原子性操作,保證在進入等待狀態前一定會釋放鎖。

其他線程可以獲取鎖,是因為原線程已經在 condition.await() 時釋放了鎖,其他線程在調用 condition.signal() 時必須持有鎖,當等待中的線程被 condition.signal() 喚醒後,會重新嘗試獲取鎖,獲取鎖成功後才會從 condition.await() 方法返回。

 

5.2 公平鎖代碼編寫

代碼如下

    import java.io.*;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    import static java.lang.Thread.sleep;
    
    
    public class Main {
    
        public static class ResponseBody {
            InputStream stream;
    
            public ResponseBody(InputStream stream){
                this.stream = stream;
            }
    
            public InputStream byteStream(){
                return stream;
            }
        }
    
    
        public static class SyncLoadResponseBean {
    
            private static final String TAG = "SyncLoadResponseBean";
    
            // 狀態常量定義
            public static final int INIT = 1;  // 初始狀態
            public static final int READY = 2; // 數據準備就緒
            public static final int OFFER = 3; // 數據已提供
            public static finaint DROP = 4;  // 數據已丟棄
    
            private final String mRequestUrl;
            private final ConcurrentHashMap<String, SyncLoadResponseBean> mSyncLoadCache;
            private final ReentrantLock mLock = new ReentrantLock(true); // 公平鎖
            private final Condition mCondition = mLock.newCondition();   // 條件變量
            private final AtomicInteger mStatus = new AtomicInteger(INIT); // 狀態機
    
            // 數據存儲相關
            private Map<String, String> mResponseHeader;
            private ByteArrayOutputStream mBufferStream;
            private InputStream mNetworkStream;
            private long mResponseTime;
    
    
            public SyncLoadResponseBean(String requestUrl, ConcurrentHashMap<String, SyncLoadResponseBean> syncLoadCache){
                mRequestUrl = requestUrl;
                mSyncLoadCache = syncLoadCache;
                mStatus.set(INIT);
            }
    
            public boolean before(int status){
                return mStatus.get() < status;
            }
    
            public boolean during(int status){
                return mStatus.get() == status;
            }
    
            public boolean after(int status){
                return mStatus.get() >= status;
            }
    
            /**
             * 喚醒所有等待線程
             * 使用 tryLock 避免長時間阻塞
             */
            public void signalAll(){
                if (mLock.tryLock()) {
                    try {
                        mCondition.signalAll();
                    } finally {
                        mLock.unlock();
                    }
                }
            }
    
            /**
             * 保存響應數據並預處理
             * 網絡線程會在得到響應後調用該方法,保存數據
             */
            public void saveResponse(ResponseBody responseBody, Map<String, String> responseHeader){
                streamReady(responseBody, responseHeader);
                preReadStream();
            }
    
            /**
             * 準備數據流
             */
            private void streamReady(ResponseBody responseBody, Map<String, String> responseHeader){
                mLock.lock();
                try {
                    TLog.d(TAG, "並行加載 響應保存");
                    mResponseTime = System.currentTimeMillis();
                    mResponseHeader = responseHeader;
                    mBufferStream = new ByteArrayOutputStream();
                    if (responseBody != null) {
                        mNetworkStream = responseBody.byteStream();
                        // 根據流是否有效設置狀態
                        if (mNetworkStream != null) {
                            mStatus.set(READY);
                        } else {
                            drop();
                        }
                    } else {
                        drop();
                    }
                } finally {
                    TLog.d(TAG, "並行加載 保存完成 通知消費者");
                    mCondition.signalAll();
                    mLock.unlock();
                }
            }
    
    
            private void preReadStream(){
                byte[] buffer = new byte[4096];
                int num = 0;
                try {
                    while (during(READY)) {
                        mLock.lock();
                        try {
                            //雙重校驗鎖
                            if (during(READY)) {
                                // 讀取網絡流數據
                                int bytesRead = mNetworkStream.read(buffer);
                                if (bytesRead == -1) {
                                    TLog.d(TAG, "並行加載 預讀緩存 完成 " + bytesRead);
                                    closeStream(mNetworkStream);
                                    mNetworkStream = null;
                                    break;
                                }
                                TLog.d(TAG, "並行加載 預讀緩存 " + bytesRead);
                                mBufferStream.write(buffer, 0, bytesRead);
                            }
                        } finally {
                            num++;
                            TLog.d(TAG, "並行加載 預讀緩存 第" + num + "次通知消費者");
                            mCondition.signalAll();
                            mLock.unlock();
                        }
                    }
                    //已經提供了數據,則打印個日誌看下
                    if (after(OFFER)) {
                        TLog.d(TAG, "並行加載 數據流已提供 預讀緩存 關閉");
                    }
                } catch (IOException e) {
                    TLog.e(TAG, "並行加載 預讀緩存 異常 關閉", e);
                    mLock.lock();
                    try {
                        //在讀取的過程中出現了異常,但是這個時候還沒有提供數據,就直接drop調
                        if (!after(OFFER)) {
                            drop();
                        }
                    } finally {
                        mLock.unlock();
                    }
                }
            }
    
            /**
             * 獲取橋接流
             * 瀏覽器線程調用該方法獲取數據流
             */
            public InputStream getBridgedStream(){
                mLock.lock();
                TLog.d(TAG, "並行加載 查找流數據");
                try {
                    if (before(READY)) {
                        long time1 = System.currentTimeMillis();
                        TLog.d(TAG, "並行加載 查找流數據 進入等待狀態");
                        mCondition.await(5, TimeUnit.SECONDS);
                        long time2 = System.currentTimeMillis();
                        TLog.d(TAG, "並行加載 查找流數據 被喚醒 等待時長:" + (time2 - time1));
                    }
                    // 等待結束,再確認一次狀態是否可用
                    if (before(READY)) {
                        TLog.d(TAG, "並行加載 查找流數據 依舊沒有可用數據 返回空流");
                        drop();
                        return null;
                    } elseif (after(OFFER)) {
                        TLog.d(TAG, "並行加載 查找流數據 數據已被廢棄或者被他人被使用 返回空流");
                        return null;
                    } elseif (isTimeOut()) {
                        TLog.d(TAG, "並行加載 查找流數據 數據超時 返回空流");
                        drop();
                        return null;
                    } else {
                        if (mNetworkStream != null && mBufferStream != null) {
                            mStatus.set(OFFER);
                            // 創建新的橋接流實例,包含已緩存數據和剩餘網絡流
                            ByteArrayInputStream cachedStream = new ByteArrayInputStream(mBufferStream.toByteArray());
                            TLog.d(TAG, "並行加載 查找流數據 返回橋接流");
                            returnnew SequenceInputStream(cachedStream, mNetworkStream);
                        } elseif (mNetworkStream != null) {
                            mStatus.set(OFFER);
                            TLog.d(TAG, "並行加載 查找流數據 返回網絡流");
                            return mNetworkStream;
                        } elseif (mBufferStream != null) {
                            mStatus.set(OFFER);
                            // 創建新的橋接流實例,包含已緩存數據和剩餘網絡流
                            TLog.d(TAG, "並行加載 查找流數據 返回緩存流");
                            returnnew ByteArrayInputStream(mBufferStream.toByteArray());
                        } else {
                            drop();
                            TLog.d(TAG, "並行加載 查找流數據 返回空流");
                            return null;
                        }
                    }
                } catch (Exception e) {
                    TLog.e("TAG", "Create bridged stream failed", e);
                    drop();
                    return null;
                } finally {
                    mLock.unlock();
                }
            }
    
            /**
             * 獲取響應頭(線程安全)
             */
            public Map<String, String> getResponseHeader(){
                mLock.lock();
                try {
                    //如果請求裏面不帶跨域標識,則帶上跨域標識
                    if (mResponseHeader != null && !mResponseHeader.containsKey("Access-Control-Allow-Origin")) {
                        mResponseHeader.put("Access-Control-Allow-Origin", "*");
                    }
                    return mResponseHeader;
                } finally {
                    mLock.unlock();
                }
            }
    
    
            /**
             * 統一關閉流資源操作
             */
            private void closeStream(Closeable stream){
                if (stream != null) {
                    try {
                        stream.close();
                    } catch (Exception e) {
                        TLog.e(TAG, "關閉流失敗", e);
                    }
                }
            }
    
    
            /**
             * 判斷數據有沒有超時
             */
            private boolean isTimeOut(){
                return Math.abs(mResponseTime - System.currentTimeMillis()) > 5000;
            }
    
            /**
             * 丟棄數據
             */
            public void drop(){
                mLock.lock();
                try {
                    mStatus.set(DROP);
                    mResponseHeader = null;
                    mResponseTime = 0;
                    closeStream(mBufferStream);
                    closeStream(mNetworkStream);
                    mBufferStream = null;
                    mNetworkStream = null;
                    mSyncLoadCache.remove(mRequestUrl);
                    TLog.d(TAG, "並行加速 緩存數據丟棄");
                } finally {
                    mLock.unlock();
                }
            }
    
        }
    
    
        /**
         * 主方法,程序入口
         */
        public static void main(String[] args){
    
            SyncLoadResponseBean syncLoadResponseBean = new SyncLoadResponseBean("https://www.baidu.com", new ConcurrentHashMap<>());
    
            //模擬生產者線程
            new Thread(() -> {
                try {
                    System.out.println("生產者啓動");
                    //模擬網絡建聯
                    sleep(200);
                    //模擬網絡資源返回
                    File file = new File("./xxxx.txt");
                    ResponseBody responseBody = new ResponseBody(new FileInputStream(file));
                    //模擬響應
                    syncLoadResponseBean.saveResponse(responseBody, new HashMap<>());
                } catch (FileNotFoundException e) {
                    thrownew RuntimeException(e);
                } catch (InterruptedException e) {
                    thrownew RuntimeException(e);
                } finally {
                    System.out.println("生產線程工作完成,喚醒等待中的消費者");
                    syncLoadResponseBean.signalAll();
                }
            }).start();
    
            //模擬消費者線程1
            new Thread(() -> {
                System.out.println("消費者啓動");
                //模擬讀取
                InputStream stream = syncLoadResponseBean.getBridgedStream();
                System.out.println("消費者 " + stream);
            }).start();
    
        }
    
    }

 

5.3 公平鎖代碼解讀

  • 使用ReentrantLock實現公平鎖

  • 使用Condition實現線程等待

    生產者啓動
    消費者啓動
    並行加載 查找流數據
    並行加載 查找流數據 進入等待狀態
    並行加載 響應保存
    並行加載 保存完成 通知消費者
    並行加載 查找流數據 被喚醒 等待時長:203
    並行加載 查找流數據 返回橋接流
    消費者 java.io.SequenceInputStream@4e8de630
    並行加載 預讀緩存 第1次通知消費者
    並行加載 數據流已提供 預讀緩存 關閉
    生產線程工作完成,喚醒等待中的消費者
    進程已結束,退出代碼為 0

 

可以看到在建聯完成,保存響應的時候,首次通知消費者,消費者就能夠準確的被喚醒。

再通過調整測試代碼sleep的時間模擬數據正在被預讀中,消費者打斷預讀的場景,消費者依然能夠被準確的喚醒。

    生產者啓動
    消費者啓動
    並行加載 響應保存
    並行加載 保存完成 通知消費者
    並行加載 預讀緩存 4096
    並行加載 預讀緩存 第1次通知消費者
    並行加載 查找流數據
    並行加載 查找流數據 返回橋接流
    並行加載 預讀緩存 第2次通知消費者
    並行加載 數據流已提供 預讀緩存 關閉
    生產線程工作完成,喚醒等待中的消費者
    消費者 java.io.SequenceInputStream@5a40ed62
    進程已結束,退出代碼為 0

 

整體代碼運行符合預期。

 

5.4 橋接流代碼解讀

在上述代碼中,我們多次提到橋接流的概念,橋接流顧名思義,就是將多個數據流接起來,可以讓讀流的程序按照順序先讀第一個流,讀完第一個再讀第二個流,以此類推,這裏我們直接使用了java官方的一個類來實現流的橋接。

SequenceInputStream是 Java I/O 類庫中的一個輸入流類,它允許你將多個輸入流按順序連接起來,形成一個邏輯上的連續輸入流。

SequenceInputStream的主要特點

  • 順序讀取:它會按順序讀取多個輸入流,先讀取第一個流的所有內容,然後自動切換到第二個流,依此類推。

  • 流合併:將多個獨立的輸入流合併為一個連續的輸入流。

  • 自動切換:當一個流讀取完畢時,會自動切換到下一個流。

  • 自動關閉:SequenceInputStream 關閉時會自動關閉所有包含的輸入流

 

SequenceInputStream的使用場景

當有多個文件需要按順序讀取,但希望像讀取單個文件一樣處理時,合併多個來源的數據流,需要將多個輸入源串聯起來處理,我們可以很方便的把一個或者多個流串聯起來按順序讀取,通過這個特性,我們就可以實現緩存流和網絡流的無縫橋接。

 

六、方案對比與總結

 

公平鎖替代非公平鎖

  • 問題:synchronized(非公平鎖)導致生產者可能重新搶佔鎖。

  • 解決方案:用ReentrantLock(公平鎖)+Condition實現順序獲取。

 

橋接流技術

  • 用SequenceInputStream合併已緩衝流(ByteArrayInputStream)與網絡流。

  • 實現無縫銜接,無需等待全量數據。

 

半緩衝機制

  • 並行任務每次讀取 4096 字節後釋放鎖,允許消費者打斷。

  • 可以平衡預讀效率與內存佔用。

 

七、寫在最後

回顧上述方案,採用同步鎖實現緩衝過程的打斷,使用SequenceInputStream實現橋接流,預讀過程的打斷是通過流讀取循環體內的公平鎖來實現的,相比最初的循環等待,數據超時廢棄的模式,新方案實現了網絡流和緩存流的無縫切換,整合了並行請求的資源,充分利用了頁面啓動時間。

 

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.