博客 / 詳情

返回

DolphinScheduler的高可靠性高擴展性設計實現

Apache DolphinScheduler介紹

Apache DolphinScheduler 是一個分佈式易擴展的可視化DAG工作流任務調度開源系統。適用於企業級場景,提供了一個可視化操作任務、工作流和全生命週期數據處理過程的解決方案。
要適應於企業級場景,兩個特色必不可少

High Reliability

高可靠性: 去中心化設計,確保穩定性。 原生 HA 任務隊列支持,提供過載容錯能力。 DolphinScheduler 能提供高度穩健的環境。

High Scalability

高擴展性: 支持多租户和在線資源管理。支持每天10萬個數據任務的穩定運行。

中心化和去中心化

中心化思想

中心化的設計理念比較簡單,分佈式集羣中的節點按照角色分工,大體上分為兩種角色

Master

Master的角色主要負責任務分發並監督Worker的健康狀態,可以動態的將任務均衡到Worker上,以致Worker節點不至於“忙死”或”閒死”的狀態。

Worker

Worker的角色主要負責任務的執行工作並維護和Master的心跳,以便Master可以分配任務給Worker。
image.png

中心化思想設計存在的問題:
一旦Master出現了問題,則羣龍無首,整個集羣就會崩潰。為了解決這個問題,大多數Master/Slave架構模式都採用了主備Master的設計方案,可以是熱備或者冷備,也可以是自動切換或手動切換,而且越來越多的新系統都開始具備自動選舉切換Master的能力,以提升系統的可用性。
另外一個問題是如果Scheduler在Master上,雖然可以支持一個DAG中不同的任務運行在不同的機器上,但是會產生Master的過負載。如果Scheduler在Slave上,則一個DAG中所有的任務都只能在某一台機器上進行作業提交,則並行任務比較多的時候,Slave的壓力可能會比較大。

去中心化

image.png

  1. 在去中心化設計裏,通常沒有Master/Slave的概念,所有的角色都是一樣的,地位是平等的,全球互聯網就是一個典型的去中心化的分佈式系統,聯網的任意節點設備down機,都只會影響很小範圍的功能。
  2. 去中心化設計的核心設計在於整個分佈式系統中不存在一個區別於其他節點的”管理者”,因此不存在單點故障問題。但由於不存在” 管理者”節點所以每個節點都需要跟其他節點通信才得到必須要的機器信息,而分佈式系統通信的不可靠性,則大大增加了上述功能的實現難度。
  3. 實際上,真正去中心化的分佈式系統並不多見。反而動態中心化分佈式系統正在不斷涌出。在這種架構下,集羣中的管理者是被動態選擇出來的,而不是預置的,並且集羣在發生故障的時候,集羣的節點會自發的舉行"會議"來選舉新的"管理者"去主持工作。最典型的案例就是ZooKeeper及Go語言實現的Etcd。
  4. DolphinScheduler的去中心化是Master/Worker註冊心跳到Zookeeper中,Master基於slot處理各自的Command,通過selector分發任務給worker,實現Master集羣和Worker集羣無中心。

Master Server去中心化

去中心化的設計理念是Token Ring,一組節點提供服務,每個節點被分配一個slot,所有的節點組成一個收尾相顧的圓環,對於一個待分配的任務,對任務id進行hash取模,模因子為slot num,模值就是要分配的任務節點索引

在dolphin scheduler 提供了3中動態發現的Registry機制,分別是

  1. EtcdRegistry
  2. JdbcRegistry
  3. ZookeeperRegistry

具體使用哪個,可以根據不同的配置激活

zookeeper
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper")

etcd
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "etcd")

jdbc
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc")

master節點的slot獲取,需要三個核心的service

MasterRegistryClient

提供了心跳機制,心跳信息中包含了機器的狀態

  1. busy
  2. normal
    當心跳發生變更時,會過濾掉busy狀態的節點,這樣對過載的機器起到了保護的作用
    判斷機器是否過載的標準的閾值為0.7
    private double maxCpuUsagePercentageThresholds = 0.7;

    private double maxJVMMemoryUsagePercentageThresholds = 0.7;

    private double maxSystemMemoryUsagePercentageThresholds = 0.7;

    private double maxDiskUsagePercentageThresholds = 0.7;

同時監聽master和worker的狀態,當master或者worker server down掉後,執行failover後續處理

ServerNodeManager

ServerNodeManager維護了兩個重要的listener list, workerInfoChangeListeners和masterInfoChangeListeners,所有對master和worker節點狀態的信息都可以使用這個接口。

ServerNodeManager是一個工具性service,不參與具體的業務邏輯。

ServerNodeManager同時也會對master和worker的狀態變化進行重要監控,可以修改
全局報警組
default admin warning group
global alert group
這樣後續的master或者worker節點down掉,就能夠及時發現報警

看最新版只對機器down掉進行通知,可以改一下源代碼,增加對master和worker的恢復進行通知

MasterSlotManager

MasterSlotManager是維護slot的核心service
這包含兩個核心變量,這兩個變量是維護去中心化設計實現的核心抓手

    private volatile int currentSlot = 0;
    private volatile int totalSlot = 0;

這兩個變量的獲取也是非常簡單的,拿到所有的master節點,過濾掉busy狀態的節點
設置totalSlot為狀態為normal的 master節點數量
設置currentSlot為當前機器所在的索引
源代碼

private void syncMasterNodes(List<Server> masterNodes) {
            slotLock.lock();
            try {
                this.masterPriorityQueue.clear();
                this.masterPriorityQueue.putAll(masterNodes);
                int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
                int tempTotalSlot = masterNodes.size();
                if (tempCurrentSlot < 0) {
                    totalSlot = 0;
                    currentSlot = 0;
                    log.warn("Current master is not in active master list");
                } else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {
                    totalSlot = tempTotalSlot;
                    currentSlot = tempCurrentSlot;
                    log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
                }
            } finally {
                slotLock.unlock();
            }
        }

slot管理流程如下

image.png

這個流程有一個小小的bug

        // remove before persist
        registryClient.remove(masterRegistryPath);
        registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat()));

        while (!registryClient.checkNodeExists(NetUtils.getHost(), RegistryNodeType.MASTER)) {
            log.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
        }

如果persistEphemeral失敗,會一直sleep在這裏,不過這種情況貌似是不是很少出現,要不官方也會解決這個問題。

master輪訓執行任務

DolphinScheduler使用mysql表t_ds_command作為分佈式任務隊列,每個master節點輪訓請求t_ds_command獲取待執行的任務

poll from the queue 語句為

<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
        select *
        from t_ds_command
        where id % #{masterCount} = #{thisMasterSlot}
        order by process_instance_priority, id asc
            limit #{limit}
    </select>

看起來,是優先執行優先級高的任務,sql語句中thisMasterSlot是本機在list<master>中的索引
masterCount為正常的master數量

t_ds_command建表語句為

CREATE TABLE `t_ds_command` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
  `command_type` tinyint(4) DEFAULT NULL COMMENT 'Command type: 0 start workflow, 1 start execution from current node, 2 resume fault-tolerant workflow, 3 resume pause process, 4 start execution from failed node, 5 complement, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 resume waiting thread',
  `process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
  `process_definition_version` int(11) DEFAULT '0' COMMENT 'process definition version',
  `process_instance_id` int(11) DEFAULT '0' COMMENT 'process instance id',
  `command_param` text COLLATE utf8_bin COMMENT 'json command parameters',
  `task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'Node dependency type: 0 current node, 1 forward, 2 backward',
  `failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'Failed policy: 0 end, 1 continue',
  `warning_type` tinyint(4) DEFAULT '0' COMMENT 'Alarm type: 0 is not sent, 1 process is sent successfully, 2 process is sent failed, 3 process is sent successfully and all failures are sent',
  `warning_group_id` int(11) DEFAULT NULL COMMENT 'warning group',
  `schedule_time` datetime DEFAULT NULL COMMENT 'schedule time',
  `start_time` datetime DEFAULT NULL COMMENT 'start time',
  `executor_id` int(11) DEFAULT NULL COMMENT 'executor id',
  `update_time` datetime DEFAULT NULL COMMENT 'update time',
  `process_instance_priority` int(11) DEFAULT '2' COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
  `worker_group` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT 'worker group',
  `tenant_code` varchar(64) COLLATE utf8_bin DEFAULT 'default' COMMENT 'tenant code',
  `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
  `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag?0 normal, 1 dry run',
  `test_flag` tinyint(4) DEFAULT NULL COMMENT 'test flag?0 normal, 1 test run',
  PRIMARY KEY (`id`),
  KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8 COLLATE=utf8_bin 

command裏面也有一個資源控制字段worker_group,

任務組主要用於控制任務實例併發,旨在控制其他資源的壓力(也可以控制 Hadoop 集羣壓力,不過集羣會有隊列管控)。您可在新建任務定義時,可配置對應的任務組,並配置任務在任務組內運行的優先級。用户僅能查看有權限的項目對應的任務組,且僅能創建或修改具有寫權限的項目對應的任務組。

注意:任務組的對資源的限制是在項目級別的,和租户沒有關係

關於任務組的配置,您需要做的只需要配置紅色框內的部分,其中:

【任務組名稱】:任務組配置頁面顯示的任務組名稱,這裏只能看到該項目有權限的任務組(新建任務組時選擇了該項目),或作用在全局的任務組(新建任務組時沒有選擇項目)

【組內優先級】:在出現等待資源時,優先級高的任務會最先被 master 分發給 worker 執行,該部分數值越大,優先級越高。

任務組的實現邏輯
獲取任務組資源:
Master 在分發任務時判斷該任務是否配置了任務組,如果任務沒有配置,則正常拋給 worker 運行;如果配置了任務組,在拋給 worker 執行之前檢查任務組資源池剩餘大小是否滿足當前任務運行,如果滿足資源池 -1,繼續運行;如果不滿足則退出任務分發,等待其他任務結束喚醒。

釋放與喚醒:
當獲取到任務組資源的任務結束運行後,會釋放任務組資源,釋放後會檢查當前任務組是否有任務等待,如果有則標記優先級最好的任務可以運行,並新建一個可以執行的event。該event中存儲着被標記可以獲取資源的任務id,隨後在獲取任務組資源然後運行。

任務組執行流程
image.png

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.