Storm在集羣上運行一個Topology時,主要通過以下3個實體來完成Topology的執行工作:

1. Worker Process(工作進程)——Spout/Bolt中運行具體處理邏輯的進程

2. Executor(線程、執行器)——物理線程

3. Task(任務)——具體的處理邏輯對象


下圖簡要描述了這3者之間的關係:

storm work個數_storm work個數

  storm集羣的一個節點可能有一個或者多個工作進程(worker)運行在一個多個拓撲上,一個工作進程執行拓撲的一個子集。工作進程(worker)屬於一個特定的拓撲,並可能為這個拓撲的一個或者多個組件(spout/bolt)運行一個或多個執行器(executor線程)。一個運行中的拓撲包括多個運行在storm集羣內多個節點的進程。

  1個worker進程執行的是1個topology的子集(注:不會出現1個worker為多個topology服務)。1個worker進程會啓動1個或多個executor線程來執行1個topology的component(spout或bolt)。因此,1個運行中的topology就是由集羣中多台物理機上的多個worker進程組成的。

  executor是1個被worker進程啓動的單獨線程。每個executor只會運行1個topology的1個component(spout或bolt)的task(注:task可以是1個或多個,storm默認是1個component只生成1個task,executor線程裏會在每次循環裏順序調用所有task實例)。

  task是最終運行spout或bolt中代碼的單元(注:1個task即為spout或bolt的1個實例,executor線程在執行期間會調用該task的nextTuple或execute方法)。topology啓動後,1個component(spout或bolt)的task數目是固定不變的,但該component使用的executor線程數可以動態調整(例如:1個executor線程可以執行該component的1個或多個task實例)。這意味着,對於1個component存在這樣的條件:#threads<=#tasks(即:線程數小於等於task數目)。默認情況下task的數目等於executor線程數目,即1個executor線程只運行1個task。

 

配置拓撲的並行度:

1.工作進程的數量

工作進程的數量表示集羣中不同節點的拓撲可以創建多少個工作進程。

配置參數是:TOPOLOGY_WORKERS

也可以通過java API進行設置:

Config#setNumWorkers

2.執行器(線程)的數量

執行器的數量指的是每個組件產生多少個線程。

這個參數暫時只能通過java API進行配置:

TopologyBuilder#setSpout()
TopologyBuilder#setBolt()

3.任務的數量

任務的數量表示的是每個組件創建多少個任務。

配置選項:TOPOLOGY_TASKS

也可以通過java API進行配置:

ComponentConfigurationDeclarer#setNumTasks()
T setNumTasks(java.lang.Number val)

 

拓撲示例

下面我們定義一個名為mytopology的拓撲,由一個Spout組件(BlueSpout)、兩個Bolt組件(GreenBolt和YellowBolt)共三個組件構成,代碼如下:

1 Config conf = new Config();
 2 conf.setNumWorkers(2); 
 3 
 4 topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); 
 5 
 6 topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) 
 7                .setNumTasks(4) 
 8 .shuffleGrouping("blue-spout");
 9 
10 topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
11                .shuffleGrouping("green-bolt");
12 
13 StormSubmitter.submitTopology(
14         "mytopology",
15         conf,
16     topologyBuilder.createTopology()
17     );

mytopology拓撲的描述如下:

1、拓撲將使用兩個工作進程(Worker)。

2、Spout是id為“blue-spout”、並行度為2的BlueSpout實例(產生兩個執行器和兩個任務)。

3、第一個Bolt的id為"green-bolt"、並行度為2、任務數為4、使用隨機分組方式接收"blue-spout"所發射元組的GreenBolt實例(產生兩個執行器和4個任務)。

4、第二個Bolt是id為"yellow-bolt"、並行度為6、使用隨機分組方式接收"green-bolt"所發射元組的YellowBolt實例(產生6個執行器和6個任務)。

  綜上所述,該拓撲一共有兩個工作進程(Worker),2+2+6=10個執行器(Executor),2+4+6=12個任務。因此,每個工作進程可以分配到10/2=5個執行器,12/2=6個任務。默認情況下,一個執行器執行一個任務,但是如果指定了任務的數目,則任務會平均分配到執行器中,因此,GreenBolt的實例"green-bolt"的一個執行器將會分配到4/2個任務。

mytopology的拓撲及其對應的資源分配如下圖所示:


storm work個數_storm work個數_02

 

 

動態設置拓撲的併發度 

 Storm支持在不重啓topology的情況下,動態的改變(增減)worker process的數目和executor的數目,稱為rebalancing。有兩種方式可以實現拓撲的再平衡:

1、使用Storm Web UI

2、使用Storm rebalance命令(推薦使用)

使用命令行的方式如下: 

1 # 重新配置拓撲
2 # "mytopology" 拓撲使用5個Worker進程
3 # "blue-spout" Spout使用3個Executor
4 # "blue-spout" Bolt使用10個Executor
5 
6 # storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

注:"mytopology"是拓撲的名稱,"blue-spout"和"yellow-bolt"是組件的名稱。