博客 / 詳情

返回

Dubbo 路由及負載均衡性能優化

作者:vivo 互聯網中間件團隊- Wang Xiaochuang

本文主要介紹在vivo內部針對Dubbo路由模塊及負載均衡的一些優化手段,主要是異步化+緩存,可減少在RPC調用過程中路由及負載均衡的CPU消耗,極大提升調用效率。

一、概要

vivo內部Java技術棧業務使用的是Apache Dubbo框架,基於開源社區2.7.x版本定製化開發。在海量微服務集羣的業務實踐中,我們發現Dubbo有一些性能瓶頸的問題會極大影響業務邏輯的執行效率,尤其是在集羣規模數量較大時(提供方數量>100)時,路由及負載均衡方面有着較大的CPU消耗,從採集的火焰圖分析高達30%。為此我們針對vivo內部常用路由策略及負載均衡進行相關優化,並取得了較好的效果。接下來主要跟大家分析一下相關問題產生的根源,以及我們採用怎樣的方式來解決這些問題。當前vivo內部使用的Dubbo的主流版本是基於2.7.x進行相關定製化開發。

二、背景知識

2.1 Dubbo客户端調用流程

1. 相關術語介紹

圖片

2. 主要流程

客户端通過本地代理Proxy調用ClusterInvoker,ClusterInvoker從服務目錄Directory獲取服務列表後經過路由鏈獲取新的服務列表、負載均衡從路由後的服務列表中根據不同的負載均衡策略選取一個遠端Invoker後再發起遠程RPC調用。

圖片

2.2 Dubbo路由機制

Dubbo的路由機制實際是基於簡單的責任鏈模式實現,同時Router繼承了Comparable接口,自定義的路由可以設置不同的優先級進而定製化責任鏈上Router的順序。基於責任鏈模式可以支持多種路由策略串行執行如就近路由+標籤路由,或條件路由+就近路由等,且路由的配置支持基於接口級的配置也支持基於應用級的配置。常見的路由方式主要有:就近路由,條件路由,標籤路由等。具體的執行過程如下圖所示:

圖片

1. 核心類

Dubbo路由的核心類主要有:RouterChain、RouterFactory 與 Router 。

(1)RouterChain

RouterChain是路由鏈的入口,其核心字段有

  • invokers(List<invoker> 類型)

初始服務列表由服務目錄Directory設置,當前RouterChain要過濾的Invoker集合

  • builtinRouters(List類型)

當前RouterChain包含的自動激活的Router集合

  • routers(List類型)

包括所有要使用的路由由builtinRouters加上通過addRouters()方法添加的Router對象

RouterChain核心邏輯:

public class RouterChain<T> {

    // 註冊中心最後一次推送的服務列表
    private List<Invoker<T>> invokers = Collections.emptyList();

    // 所有路由,包括原生Dubbo基於註冊中心的路由規則如“route://” urls .
    private volatile List<Router> routers = Collections.emptyList();

    // 初始化自動激活的路由
    private List<Router> builtinRouters = Collections.emptyList();
  
   private RouterChain(URL url) {
    //通過ExtensionLoader加載可自動激活的RouterFactory
        List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
                .getActivateExtension(url, ROUTER_KEY);
    // 由工廠類生成自動激活的路由策略
        List<Router> routers = extensionFactories.stream()
                .map(factory -> factory.getRouter(url))
                .collect(Collectors.toList());

        initWithRouters(routers);
    }
  
    // 添加額外路由
    public void addRouters(List<Router> routers) {
        List<Router> newRouters = new ArrayList<>();
        newRouters.addAll(builtinRouters);
        newRouters.addAll(routers);
        Collections.sort(newRouters, comparator);
        this.routers = newRouters;
    }
  
   public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> finalInvokers = invokers;
    // 遍歷全部的Router對象,執行路由規則
        for (Router router : routers) {
            finalInvokers = router.route(finalInvokers, url, invocation);
        }
        return finalInvokers;
    }
}

(2)RouterFactory為Router的工廠類

RouterFactory接口定義:

@SPI
public interface RouterFactory {

    @Adaptive("protocol")
    Router getRouter(URL url);
}

(3)Router

Router是真正的路由實現策略,由RouterChain進行調用,同時Router繼承了Compareable接口,可以根據業務邏輯設置不同的優先級。

Router主要接口定義:

public interface Router extends Comparable<Router> {

   
    /**
     *
     * @param invokers   帶過濾實例列表
     * @param url        消費方url
     * @param invocation 會話信息
     * @return routed invokers
     * @throws RpcException
     */
    <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;


    /**
     * 當註冊中心的服務列表發現變化,或有動態配置變更會觸發實例信息的變化
     * 當時2.7.x的Dubbo並沒有真正使用這個方法,可基於此方法進行路由緩存
     * @param invokers invoker list
     * @param <T>      invoker's type
     */
    default <T> void notify(List<Invoker<T>> invokers) {

    }

}

2. 同機房優先路由的實現

為方便大家瞭解路由的實現,給大家展示一下就近路由的核心代碼邏輯:

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL consumerUrl, Invocation invocation) throws RpcException {
        if (!this.enabled) {
            return invokers;
        }

      
    // 獲取本地機房信息
        String local = getSystemProperty(LOC);
        if (invokers == null || invokers.size() == 0) {
            return invokers;
        }
        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
        for (Invoker invoker: invokers) {
      // 獲取與本地機房一致的invoker並加入列表中
            String invokerLoc = getProperty(invoker, invocation, LOC);
            if (local.equals(invokerLoc)) {
                result.add(invoker);
            }
        }

        if (result.size() > 0) {
            if (fallback){
                // 開啓服務降級,available.ratio = 當前機房可用服務節點數量 / 集羣可用服務節點數量
                int curAvailableRatio = (int) Math.floor(result.size() * 100.0d / invokers.size());
                if (curAvailableRatio <= availableRatio) {
                    return invokers;
                }
            }

            return result;
        } else if (force) {
            return result;
        } else {
            return invokers;
        }

    }

2.3 Dubbo負載均衡

Dubbo的負載均衡實現比較簡單基本都是繼承抽象類進行實現,主要作用就是根據具體的策略在路由之後的服務列表中篩選一個實例進行遠程RPC調用,默認的負載均衡策略是隨機。

整體類圖如下所示:

圖片

LoadBalance接口定義:

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

    /**
     * 從服務列表中篩選一個.
     *
     * @param invokers   invokers.
     * @param url        refer url
     * @param invocation invocation.
     * @return selected invoker.
     */
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

隨機負載均衡核心代碼解析:

 // 預熱過程權重計算
   static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) (uptime / ((float) warmup / weight));
        return ww < 1 ? 1 : (Math.min(ww, weight));
    }

int getWeight(Invoker<?> invoker, Invocation invocation) {
       int weight;
       URL url = invoker.getUrl();
       // 多註冊中心場景下的,註冊中心權重獲取
       if (UrlUtils.isRegistryService(url)) {
           weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
       } else {
           weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
           if (weight > 0) {
               // 獲取實例啓動時間
               long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
               if (timestamp > 0L) {
                   long uptime = System.currentTimeMillis() - timestamp;
                   if (uptime < 0) {
                       return 1;
                   }
                   // 獲取預熱時間
                   int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
                   if (uptime > 0 && uptime < warmup) {
                       weight = calculateWarmupWeight((int)uptime, warmup, weight);
                   }
               }
           }
       }
       return Math.max(weight, 0);
   }

@Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // Every invoker has the same weight?
        boolean sameWeight = true;
        // the weight of every invokers
        int[] weights = new int[length];
        // the first invoker's weight
        int firstWeight = getWeight(invokers.get(0), invocation);
        weights[0] = firstWeight;
        // The sum of weights
        int totalWeight = firstWeight;
        for (int i = 1; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // save for later use
            weights[i] = weight;
            // Sum
            totalWeight += weight;
            if (sameWeight && weight != firstWeight) {
                sameWeight = false;
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }

預熱解釋

預熱是為了讓剛啓動的實例流量緩慢增加,因為實例剛啓動時各種資源可能還沒建立連接,相關代碼可能還是處於解釋執行,仍未變為JIT執行,此時業務邏輯較慢,不應該加載過大的流量,否則有可能造成較多的超時。Dubbo默認預熱時間為10分鐘,新部署的實例的流量會在預熱時間段內層線性增長,最終與其他實例保持一致。Dubbo預熱機制的實現就是通過控制權重來實現。如默認權重100,預熱時間10分鐘,則第一分鐘權重為10,第二分鐘為20,以此類推。

具體預熱效果圖如下:

圖片

三、問題分析

使用Dubbo的業務方反饋,他們通過火焰圖分析發現Dubbo的負載均衡模塊+路由模塊佔用CPU超過了30%,框架層面的使用率嚴重影響了業務邏輯的執行效率急需進行優化。通過火焰圖分析,具體佔比如下圖,其中該機器在業務忙時的CPU使用率在60%左右,閒時在30%左右。

圖片

通過火焰圖分析,負載均衡主要的消耗是在 getWeight方法。

圖片

路由的主要消耗是在route方法:

  • 同機房優先路由

圖片

  • 接口級標籤路由+應用級標籤路由

圖片

這些方法都有一個特點,那就是遍歷執行。如負載均衡,針對每一個invoker都需要通過getWeight方法進行權重的計算;就近路由的router方法對於每一個invoker都需要通過url獲取及機房信息進行匹配計算。

我們分析一下getWeight及router時間複雜度,發現是O(n)的時間複雜度,而且路由是由路由鏈組成的,每次每個 Router的route方法調用邏輯都會遍歷實例列表,那麼當實例列表數量過大時,每次匹配的計算的邏輯過大,那麼就會造成大量的計算成本,導致佔用大量cpu,同時也導致路由負載均衡效率低下。

綜上所述,罪惡的的根源就是遍歷導致的,當服務提供方數量越多,影響越大。

四、優化方案

知道了問題所在,我們來分析一下是否有優化空間。

4.1 路由優化

1. 優化一:關閉無效路由

通過火焰圖分析,我們發現有部分業務即使完全不使用應用級的標籤路由,原生的TagRouter也存在遍歷邏輯,原因是為了支持靜態的標籤路由,其實這部分的開銷也不少,那對於根本不會使用應用級標籤路由的可以手動進行關閉。關閉方式如下:

  • 客户端統一關閉
dubbo.consumer.router=-tag
  • 服務級別關閉
  • 註解方式:
@DubboReference(parameters = {"router","-tag"})
  • xml方式:
<dubbo:reference id="demoService" check="false" interface="com.dubbo.study.n.api.DemoService" router="-tag" />

2. 優化二:提前計算路由結果並進行緩存

每次路由目前都是進行實時計算,但是在大多數情況下,我們的實例列表是穩定不變的,只有在發佈窗口或配置變更窗口內實例列表才會發生變更,那我們是否可以考慮緩存呢。如就近路由,可以以機房為key進行機房實例的全量緩存。針對接口級標籤路由可以緩存不同標籤值指定的實例信息。

我們知道路由的執行過程是責任鏈模式,每一個Router的實例列表入參實際上是一個Router的結果,可參考公式:target = rn(…r3(r2(r1(src))))。那麼所有的路由可以基於註冊中心推送的原始服務列表進行路由計算並緩存,然後不同的路由結果相互取交集就能得到最終的結果,當實例信息發生變更時,緩存失效並重新計算。

3. 緩存更新時機

當註冊中心或者動態配置有變更時,相關通知會給到服務目錄Directory,Directory收到通知後會重新創建服務列表,並把服務列表同步到路由鏈RouterChain,RouterChain再按順序通知其鏈上的Router,各個Router再進行緩存清除並重新進行路由結果的計算及進行緩存。相關時序圖如下所示:

圖片

4. 具體路由流程

進入具體路由方法時,先判斷是否存在緩存的路由值,且緩存值的epoch必須與上一個路由的epoch需一致,此時緩存才生效,然後緩存值與上個Router的結果取交集。

如果不存在緩存或epoch不一致則重新進行實時的路由計算。

圖片

引入epoch的原因主要是保證各個路由策略緩存信息的一致性,保證所有的緩存計算都是基於同一份原始數據。當實例信息發生變更時,epoch會自動進行更新。

5. BitMap引入

上文我們説到,不同的路由策略之間的結果是取交集的,然後最終的結果才送入負載均衡流程。那如何在緩存的同時,加快交集的計算呢。答案就是基於位圖:BitMap。

BitMap的基本原理就是用一個bit位來存放某種狀態,適用於大規模數據的查找及位運算操作。如在路由場景,先基於全量的推送數據進行計算緩存。如果某個實例被路由選中,則其值為1,若兩個路由的結果要取交集,那直接對BitMap進行"&"運行即可。

全量緩存示意圖:

圖片

路由交集計算示步驟:

按照路由鏈依次計算,tagRouter->vivoTag->vivoNearestRouter

(1)tagRouter計算邏輯:

  1. 按照Invocation計算出目標的Tag,假設是tag1
  2. 然後從緩存Cache根據key:tag1,取出對應的targetAddrPool
  3. 將原始傳入的addrPool與targetAddrPool,得到結果resultAddrPool
  4. 將resultAddrPool傳入vivoTagRouter

(2)vivoTag計算邏輯:

  1. 按照Invocation計算出目標的Tag,假設是tabB
  2. 然後從緩存Cache根據key:tag1,取出對應的targetAddrPool
  3. 將上一次傳入的addrPool與targetAddrPool,得到結果resultAddrPooll
  4. 將resultAddrPool傳入vivoNearestRouter

(3)vivoNearestRouter計算邏輯

  1. 從環境變量取出當前機房,假設是bj01
  2. 然後從緩存Cache根據key:bj01,取出對應的targetAddrPool
  3. 將上一次傳入的addrPool與targetAddrPool,取出resultAddrPool
  4. 將上一次傳入的addrPool與targetAddrPool,得到結果resultAddrPool
  5. 將resultAddrPool為最終路由結果,傳遞給LoadBalance

圖片

6. 基於緩存的同機房優先路由源碼解析

緩存刷新:

/**
     * Notify router chain of the initial addresses from registry at the first time.
     * Notify whenever addresses in registry change.
     */
    public void setInvokers(List<Invoker<T>> invokers) {
    // 創建帶epoch的BitList
        this.invokers = new BitList<Invoker<T>>(invokers == null ? Collections.emptyList() : invokers,createBitListEpoch());
        routers.forEach(router -> router.notify(this.invokers));
    }

同機房優先路由源碼解讀:

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL consumerUrl, Invocation invocation) throws RpcException {
        …………//省略非核心代碼
        BitList<Invoker<T>> bitList = (BitList<Invoker<T>>) invokers;
    //獲取路由結果
        BitList<Invoker<T>> result = getNearestInvokersWithCache(bitList);
        if (result.size() > 0) {
            if (fallback) {
                // 開啓服務降級,available.ratio = 當前機房可用服務節點數量 / 集羣可用服務節點數量
                int curAvailableRatio = (int) Math.floor(result.size() * 100.0d / invokers.size());
                if (curAvailableRatio <= availableRatio) {
                    return invokers;
                }
            }

            return result;
        } else if (force) {
            return result;
        } else {
            return invokers;
        }

    }   
   /**
     * 獲取緩存列表
     * @param invokers
     * @param <T>
     * @return
     */
    private <T> BitList<Invoker<T>> getNearestInvokersWithCache(BitList<Invoker<T>> invokers) {
        ValueWrapper valueWrapper = getCache(getSystemProperty(LOC));
        // 是否存在緩存
        if (valueWrapper != null) {
            BitList<Invoker<T>> invokerBitList = (BitList<Invoker<T>>) valueWrapper.get();
            // 緩存的epoch與源列表是否一致
            if (invokers.isSameEpoch(invokerBitList)) {
                BitList<Invoker<T>> tmp = invokers.clone();
                // 結果取交集
                return tmp.and(invokerBitList);
            }
        }
        // 緩存不存在 實時計算放回
        return getNearestInvokers(invokers);
    }
  
  /**
     * 新服務列表通知
     * @param invokers
     * @param <T>
     */
 @Override
    public <T> void notify(List<Invoker<T>> invokers) {
        clear();
        if (invokers != null && invokers instanceof BitList) {
            BitList<Invoker<T>> bitList = (BitList<Invoker<T>>) invokers;
      // 設置最後一次更新的服務列表
            lastNotify = bitList.clone();
            if (!CollectionUtils.isEmpty(invokers) && this.enabled) {
        // 獲取機房相同的服務列表並進行緩存
                setCache(getSystemProperty(LOC), getNearestInvokers(lastNotify));
            }
        }
    }

4.2 負載均衡優化

1. 優化一

針對getWeight方法,我們發現有部分業務邏輯較為消耗cpu,但是在大多數場景下業務方並不會使用到,於是進行優化。

getWeight方法優化:

優化前:
//這裏主要要用多註冊中心場景下,註冊中心權重的獲取,絕大多數情況下並不會有這個邏輯
 if (UrlUtils.isRegistryService(url)) {
           weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
       }  
優化後:
 if (invoker instanceof ClusterInvoker && UrlUtils.isRegistryService(url)) {
            weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
   }

2. 優化二

遍歷是罪惡的源泉,而實例的數量決定這罪惡的深淺,我們有什麼辦法減少負載均衡過程中的遍歷呢。一是根據group及version劃分不同的集羣,但是這需要涉及到業務方代碼或配置層面的改動,會帶來額外的成本。所以我們放棄了。

二是沒有什麼是加一層解決不了的問題,為了儘量減少進入負載均衡的節點數量,考慮新增一個墊底的路由策略,在走完所有的路由策略後,若節點數量>自定義數量後,進行虛擬分組,虛擬分組的策略也可進行自定義,然後隨機篩選一組進入負載均衡。此時進入負載均衡的實例數量就會有倍數的下降。

需要注意的是分組路由必須保證是在路由鏈的最後一環,否則會導致其他路由計算錯誤。

圖片

分組路由示意:

/**
     * 
     * @param invokers 待分組實例列表
     * @param groupNum 分組數量
     * @param <T>
     * @return
     */
    public <T> List<Invoker<T>> doGroup(List<Invoker<T>> invokers, int groupNum) {
        int listLength = invokers.size() / groupNum;
        List<Invoker<T>> result = new ArrayList<>(listLength);
        int random = ThreadLocalRandom.current().nextInt(groupNum);
        for (int i = random; i < invokers.size(); i = i + groupNum) {
            result.add(invokers.get(i));
        }
        return result;
    }

五、優化效果

針對優化前和優化後,我們編寫Demo工程分別壓測了不配置路由/配置就近+標籤路由場景。Provider節點梯度設置100/500/1000/2000/5000,TPS在1000左右,記錄了主機的cpu等性能指標,並打印火焰圖。發現,配置路由後,採用相同併發,優化後的版本tps明顯高於優化前版本,且新版本相較於沒有配置路由時tps顯著提高,下游節點數大於2000時,tps提升達到100%以上,下游節點數越多,AvgCpu優化效果越明顯,並且路由及負載均衡CPU佔比明顯更低,詳細數據可見下表:

圖片

圖片

備註:-tag,表示顯示禁用原生Dubbo應用級標籤路由。該路由默認開啓。

六、總結

經過我們關閉不必要的路由邏輯、對路由緩存+異步化計算、新增分組路由等優化後,Dubbo在負載均衡及路由模塊整體的性能有了顯著的提升,為業務方節省了不少CPU資源。在正常業務場景下當提供方數量達到2000及以上時,tps提升可達100%以上,消費方平均CPU使用率下降約27%,且提供方數量越多優化效果越明顯。但是我們也發現當前的隨機負載均衡依然還是會消耗一定的CPU資源,且只能保證流量是均衡的。當前我們的應用基本部署在虛擬機及容器上。這兩者均存在超賣的狀況,且同等配置的宿主機性能存在較大差異等問題。最終會導致部分請求超時、無法最大化利用提供方的資源。我們下一步將會引入Dubbo 3.2的自適應負載均衡並進行調優減少其CPU使用率波動較大的問題,其次我們自身也擴展了基於CPU負載均衡的單一因子算法,最終實現不同性能的機器CPU負載趨於均衡,最大程度發揮集羣整體的性能。

參考資料:

  1. Dubbo 負載均衡
  2. Dubbo 流量管控
  3. Dubbo 3 StateRouter:下一代微服務高效流量路由
user avatar shaochuancs 頭像 codingdgsun 頭像 chazhoudeqingchun 頭像 turing_interview 頭像 best_6455a509a2177 頭像 cyl173 頭像
6 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.