2. The ResourceManager side

The client submits the Application to the ResourceManager through YARNRunner.submitJob().

The client and the ResourceManager communicate over the ClientRMProtocol protocol, which is implemented by ClientRMService.

1) ClientRMService.java

Every interaction between the client and the ResourceManager is ultimately carried out by a method of ClientRMService. Take submitApplication() as an example:

 

public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnRemoteException {
  // Get the application's information: submission context, id and user
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();
  String user = submissionContext.getUser();
  try {
    // The authenticated caller replaces the user recorded in the context
    user = UserGroupInformation.getCurrentUser().getShortUserName();
    // Reject an application id that the ResourceManager already knows
    if (rmContext.getRMApps().get(applicationId) != null) {
      throw new IOException("Application with id " + applicationId
          + " is already present! Cannot add a duplicate!");
    }
    submissionContext.setUser(user);
    // Wrap the context in an RMAppManagerSubmitEvent and pass it to RMAppManager.handle()
    rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
        .currentTimeMillis()));
    LOG.info("Application with id " + applicationId.getId() +
        " submitted by user " + user + " with " + submissionContext);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId);
  } catch (IOException ie) {
    ...
  }
  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
  return response;
}

 

2) RMAppManager.java

RMAppManager implements the EventHandler interface, which means the class is a handler for a particular kind of event:

 

public void handle(RMAppManagerEvent event) {
  ApplicationId applicationId = event.getApplicationId();
  LOG.debug("RMAppManager processing event for "
      + applicationId + " of type " + event.getType());
  // The switch on event.getType() shows that this class handles application submission and completion events
  switch(event.getType()) {
    case APP_COMPLETED:
    {
      finishApplication(applicationId);
      ApplicationSummary.logAppSummary(
          rmContext.getRMApps().get(applicationId));
      checkAppNumCompletedLimit();
    }
    break;
    case APP_SUBMIT:
    {
      ApplicationSubmissionContext submissionContext =
          ((RMAppManagerSubmitEvent)event).getSubmissionContext();
      long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
      // Call submitApplication() to submit the job to the ResourceManager
      submitApplication(submissionContext, submitTime);
    }
    break;
    default:
      LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
  }
}

protected synchronized void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime) {
  ApplicationId applicationId = submissionContext.getApplicationId();
  RMApp application = null;
  try {
    // Read the application's parameters from the passed-in context, filling in defaults for any that are unset
    String clientTokenStr = null;
    ...
    ...
    // Store the application's information so it can be recovered if the application fails or crashes
    ApplicationStore appStore = rmContext.getApplicationsStore()
        .createApplicationStore(submissionContext.getApplicationId(),
        submissionContext);
    // Create the RMAppImpl object that the ResourceManager uses to represent the application
    application = new RMAppImpl(applicationId, rmContext,
        this.conf, submissionContext.getApplicationName(), user,
        submissionContext.getQueue(), submissionContext, clientTokenStr,
        appStore, this.scheduler,
        this.masterService, submitTime);
    // Reject a duplicate submission of the same application id (putIfAbsent is atomic)
    if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
        null) {
      String message = "Application with id " + applicationId
          + " is already present! Cannot add a duplicate!";
      LOG.info(message);
      throw RPCUtil.getRemoteException(message);
    }

    // Register the application's ACLs with the ApplicationACLsManager
    this.applicationACLsManager.addApplication(applicationId,
        submissionContext.getAMContainerSpec().getApplicationACLs());

    // Security: hand the application's delegation tokens to the DelegationTokenRenewer
    if (UserGroupInformation.isSecurityEnabled()) {
      this.rmContext.getDelegationTokenRenewer().addApplication(
          applicationId, parseCredentials(submissionContext),
          submissionContext.getCancelTokensWhenComplete()
          );
    }
    // Send RMAppEventType.START to the AsyncDispatcher; ApplicationEventDispatcher
    // receives the event from AsyncDispatcher and hands it to RMAppImpl
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMAppEvent(applicationId, RMAppEventType.START));
  } catch (IOException ie) {
    LOG.info("RMAppManager submit application exception", ie);
    if (application != null) {
      // Send an RMAppRejectedEvent
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppRejectedEvent(applicationId, ie.getMessage()));
    }
  }
}
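To make the pattern concrete outside Hadoop, here is a minimal sketch of an event handler that switches on the event type and, for a submission, rejects a duplicate id with an atomic putIfAbsent before forwarding a START event. Every type in it (EventHandler, ManagerEventType, ManagerEvent, SimpleAppManager) is a hypothetical, simplified stand-in rather than the YARN API: the string map plays the role of rmContext.getRMApps(), and the startEvents list stands in for the dispatcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for YARN's EventHandler interface.
interface EventHandler<T> {
    void handle(T event);
}

// Hypothetical stand-in for RMAppManagerEventType.
enum ManagerEventType { APP_SUBMIT, APP_COMPLETED }

// Hypothetical stand-in for RMAppManagerEvent / RMAppManagerSubmitEvent.
final class ManagerEvent {
    final ManagerEventType type;
    final String appId;

    ManagerEvent(ManagerEventType type, String appId) {
        this.type = type;
        this.appId = appId;
    }
}

final class SimpleAppManager implements EventHandler<ManagerEvent> {
    // Plays the role of rmContext.getRMApps(): application id -> application record.
    final ConcurrentMap<String, String> apps = new ConcurrentHashMap<>();
    // Plays the role of the dispatcher: ids for which a START event was sent.
    final List<String> startEvents = new ArrayList<>();
    final List<String> rejected = new ArrayList<>();
    final List<String> completed = new ArrayList<>();

    @Override
    public void handle(ManagerEvent event) {
        switch (event.type) {
            case APP_SUBMIT:
                submitApplication(event.appId);
                break;
            case APP_COMPLETED:
                // The real code also logs a summary and trims old completed apps.
                completed.add(event.appId);
                break;
            default:
                System.err.println("Invalid event type " + event.type + ". Ignoring!");
        }
    }

    // synchronized mirrors the synchronized submitApplication() in the excerpt.
    private synchronized void submitApplication(String appId) {
        // putIfAbsent is atomic: only the first submission of a given id succeeds.
        if (apps.putIfAbsent(appId, "RMApp(" + appId + ")") != null) {
            rejected.add(appId);
            return;
        }
        // Corresponds to dispatching new RMAppEvent(applicationId, RMAppEventType.START).
        startEvents.add(appId);
    }
}
```

Submitting "app_1" twice and then "app_2" leaves startEvents as [app_1, app_2] and rejected as [app_1]. The get() != null check in ClientRMService is only a pre-check: two concurrent submissions of the same id could both pass it, but only one of them can win putIfAbsent.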

 

3) EventHandler

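From here on, AsyncDispatcher takes over all further event dispatching: every event goes through AsyncDispatcher, which hands it to the matching EventDispatcher. The EventDispatcher looks up the object responsible for the event and passes the event to it. For example, an RMAppEventType.START event is dispatched to ApplicationEventDispatcher, which looks up the application's RMApp implementation, RMAppImpl, and lets it process the event.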

 

public static final class ApplicationEventDispatcher implements
    EventHandler<RMAppEvent> {
  private final RMContext rmContext;

  public ApplicationEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }

  @Override
  public void handle(RMAppEvent event) {
    ApplicationId appID = event.getApplicationId();
    // Look up the RMApp (an RMAppImpl) that this event belongs to
    RMApp rmApp = this.rmContext.getRMApps().get(appID);
    if (rmApp != null) {
      try {
        // Hand the event to that RMApp
        rmApp.handle(event);
      } catch (Throwable t) {
        LOG.error("Error in handling event type " + event.getType()
            + " for application " + appID, t);
      }
    }
  }
}
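The dispatching described above can be sketched in a few dozen lines. This is a hypothetical MiniAsyncDispatcher, not Hadoop's AsyncDispatcher: callers only enqueue events, a single worker thread takes them off a BlockingQueue, and each event is routed to the handler registered for the enum class that declares its type (via Enum.getDeclaringClass()). Event, EventHandler, SimpleEvent and the two enums are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins for YARN's Event and EventHandler interfaces.
interface Event<T extends Enum<T>> {
    T getType();
}

interface EventHandler<E> {
    void handle(E event);
}

enum AppEventType { START, APP_ACCEPTED }
enum AttemptEventType { START }

final class SimpleEvent<T extends Enum<T>> implements Event<T> {
    private final T type;
    private final String target;

    SimpleEvent(T type, String target) {
        this.type = type;
        this.target = target;
    }

    @Override
    public T getType() { return type; }

    @Override
    public String toString() {
        return type.getDeclaringClass().getSimpleName() + "." + type + "(" + target + ")";
    }
}

final class MiniAsyncDispatcher {
    private static final Object STOP = new Object();  // poison pill that ends the loop
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, EventHandler<Event<?>>> handlers = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::runLoop, "dispatcher");

    // Every event whose type is a constant of typeClass goes to handler.
    <T extends Enum<T>> void register(Class<T> typeClass, EventHandler<Event<?>> handler) {
        handlers.put(typeClass, handler);
    }

    void start() { worker.start(); }

    // Callers only enqueue; the worker thread does the handling.
    void dispatch(Event<?> event) { queue.add(event); }

    // Let the worker finish everything queued so far, then stop it.
    void stop() {
        queue.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void runLoop() {
        try {
            while (true) {
                Object next = queue.take();
                if (next == STOP) {
                    return;
                }
                Event<?> event = (Event<?>) next;
                // Route by the enum class that declares the event's type.
                EventHandler<Event<?>> handler =
                    handlers.get(event.getType().getDeclaringClass());
                if (handler != null) {
                    handler.handle(event);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Keying the handler map on the enum class is what lets one queue carry application events, attempt events, scheduler events and so on, while each kind still reaches its own EventDispatcher.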

 

RMAppImpl.java

 

public void handle(RMAppEvent event) {
  // Take the write lock before updating the state machine
  this.writeLock.lock();
  try {
    ApplicationId appID = event.getApplicationId();
    LOG.debug("Processing event for " + appID + " of type "
        + event.getType());
    final RMAppState oldState = getState();
    try {
      // Let the state machine process the event
      this.stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
      ...
    }
  } finally {
    // Release the lock
    this.writeLock.unlock();
  }
}

 

The state machine works as follows, using the one in RMAppImpl as an example:

 

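/* Generic type parameters, from left to right: the class whose state is
   being transitioned, the state type, the event-type enum, and the event class.
   The constructor argument is the state machine's initial state. */
private static final StateMachineFactory<RMAppImpl,
                                         RMAppState,
                                         RMAppEventType,
                                         RMAppEvent> stateMachineFactory
    = new StateMachineFactory<RMAppImpl,
                              RMAppState,
                              RMAppEventType,
                              RMAppEvent>(RMAppState.NEW)
        /* Register a transition between two states, together with the class
           that wraps the action to run during it. The call below says that
           RMAppState.NEW -> RMAppState.SUBMITTED is triggered by an
           RMAppEventType.START event, and that the action to run is wrapped
           in StartAppAttemptTransition. */
        .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
            RMAppEventType.START, new StartAppAttemptTransition())
        ...
        ...
        // Build the state machine
        .installTopology();

// A class that wraps the action for a transition must implement the
// SingleArcTransition interface; take StartAppAttemptTransition as an example.
// Each state machine has a corresponding class implementing SingleArcTransition,
// here RMAppTransition. StartAppAttemptTransition extends RMAppTransition and
// overrides transition() with the logic for this particular state change.
private static final class StartAppAttemptTransition extends RMAppTransition {
  @Override
  public void transition(RMAppImpl app, RMAppEvent event) {
    app.createNewAttempt();
  }
}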
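The same idea in plain Java, as a minimal sketch: a table keyed by (current state, event type) yields the next state and the action to run, and the operand's handle() takes a write lock around doTransition(), like RMAppImpl.handle() above. MiniStateMachine, Transition, MiniApp and the enums are hypothetical. Unlike Hadoop's StateMachineFactory, which is built once as a static field and then used to make a separate state machine for each operand, this sketch keeps the table and the current state in one object, and it lets the exception for an invalid transition propagate instead of logging it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical analogue of SingleArcTransition<OPERAND, EVENT>.
interface Transition<O, E> {
    void transition(O operand, E event);
}

// Table-driven state machine: (current state, event type) -> (next state, action).
final class MiniStateMachine<O, S extends Enum<S>, T extends Enum<T>, E> {
    private final class Arc {
        final S post;
        final Transition<O, E> action;

        Arc(S post, Transition<O, E> action) {
            this.post = post;
            this.action = action;
        }
    }

    private final Map<S, Map<T, Arc>> table = new HashMap<>();
    private final O operand;
    private S current;

    MiniStateMachine(O operand, S initial) {
        this.operand = operand;
        this.current = initial;
    }

    // Register pre --eventType--> post, running action on the way.
    MiniStateMachine<O, S, T, E> addTransition(S pre, S post, T eventType, Transition<O, E> action) {
        table.computeIfAbsent(pre, k -> new HashMap<>()).put(eventType, new Arc(post, action));
        return this;
    }

    S getCurrentState() {
        return current;
    }

    // Find the arc for (current, eventType), run its action, then move to its post state.
    S doTransition(T eventType, E event) {
        Map<T, Arc> arcs = table.get(current);
        Arc arc = (arcs == null) ? null : arcs.get(eventType);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + eventType + " in state " + current);
        }
        arc.action.transition(operand, event);
        current = arc.post;
        return current;
    }
}

enum AppState { NEW, SUBMITTED }
enum AppEventType { START }

// Plays the role of RMAppImpl: owns a state machine and guards it with a read/write lock.
final class MiniApp {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    int attemptsCreated = 0;
    private final MiniStateMachine<MiniApp, AppState, AppEventType, String> stateMachine =
        new MiniStateMachine<MiniApp, AppState, AppEventType, String>(this, AppState.NEW)
            .addTransition(AppState.NEW, AppState.SUBMITTED, AppEventType.START,
                (app, event) -> app.attemptsCreated++);  // stands in for createNewAttempt()

    void handle(AppEventType type, String event) {
        lock.writeLock().lock();
        try {
            stateMachine.doTransition(type, event);
        } finally {
            lock.writeLock().unlock();
        }
    }

    AppState getState() {
        lock.readLock().lock();
        try {
            return stateMachine.getCurrentState();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Sending START to a fresh MiniApp moves it from NEW to SUBMITTED and runs the action once; sending START again is rejected because no arc is registered for (SUBMITTED, START).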

 

This completes the walk-through of the classes involved when events are dispatched and processed by a state machine. To summarize:

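(1) AsyncDispatcher dispatches each event it receives to the EventDispatcher registered for that event's type class.

(2) The EventDispatcher looks up the object A that handles this kind of event and passes the event to A.

(3) A calls the state machine's doTransition() method, which uses the event type to determine the state change and the transition class wrapping the action to run.

(4) That transition class, which implements the SingleArcTransition interface, has its transition() method called to carry out the state change.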

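Because the state-machine handling code is largely the same for every event, the rest of this section is organized by event, describing for each one the state change and the classes and operations involved.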

4) RMAppEventType.START

EventDispatcher: ApplicationEventDispatcher

Event handler class: RMAppImpl

State transition: RMAppState.NEW -> RMAppState.SUBMITTED

Actions: create an RMAppAttemptImpl object with its state initialized to RMAppAttemptState.NEW

Triggers the RMAppAttemptEventType.START event

5) RMAppAttemptEventType.START

EventDispatcher: ApplicationAttemptEventDispatcher

Event handler class: RMAppAttemptImpl

State transition: RMAppAttemptState.NEW -> RMAppAttemptState.SUBMITTED

Actions: register this AppAttempt with ApplicationMasterService

Triggers an AppAddedSchedulerEvent

6) AppAddedSchedulerEvent

EventDispatcher: SchedulerEventDispatcher

Event handler class: FifoScheduler/CapacityScheduler

Actions: create a SchedulerApp object

Triggers the RMAppAttemptEventType.APP_ACCEPTED event

7) RMAppAttemptEventType.APP_ACCEPTED

EventDispatcher: ApplicationAttemptEventDispatcher

Event handler class: RMAppAttemptImpl

State transition: RMAppAttemptState.SUBMITTED -> RMAppAttemptState.SCHEDULED

Actions: call the ResourceScheduler's allocate() to request from the ResourceManager the Container needed to run the ApplicationMaster

Triggers the RMAppEventType.APP_ACCEPTED event

8) RMAppEventType.APP_ACCEPTED

EventDispatcher: ApplicationEventDispatcher

Event handler class: RMAppImpl

State transition: RMAppState.SUBMITTED -> RMAppState.ACCEPTED
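Steps 4) to 8) form a chain in which each handler reacts to one event and emits the next. The following is a minimal, single-threaded sketch of that chain, not YARN code: the event names and the SubmitFlow class are hypothetical, the queue is drained on the calling thread rather than by AsyncDispatcher's worker thread, and the scheduler's work is reduced to a flag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

enum AppState { NEW, SUBMITTED, ACCEPTED }
enum AttemptState { NEW, SUBMITTED, SCHEDULED }

// Hypothetical, single-threaded replay of steps 4) to 8).
final class SubmitFlow {
    // One hypothetical constant per event in steps 4) to 8).
    enum Ev { APP_START, ATTEMPT_START, APP_ADDED, ATTEMPT_APP_ACCEPTED, APP_APP_ACCEPTED }

    private final Deque<Ev> queue = new ArrayDeque<>();
    final List<String> trace = new ArrayList<>();
    AppState app = AppState.NEW;
    AttemptState attempt = null;       // no attempt exists before step 4)
    boolean schedulerKnowsApp = false;

    void run() {
        queue.add(Ev.APP_START);       // what RMAppManager sends at the end of submitApplication()
        while (!queue.isEmpty()) {
            Ev ev = queue.poll();
            trace.add(ev.name());
            switch (ev) {
                case APP_START:             // 4) RMAppImpl: NEW -> SUBMITTED, creates the attempt
                    app = AppState.SUBMITTED;
                    attempt = AttemptState.NEW;
                    queue.add(Ev.ATTEMPT_START);
                    break;
                case ATTEMPT_START:         // 5) RMAppAttemptImpl: NEW -> SUBMITTED
                    attempt = AttemptState.SUBMITTED;
                    queue.add(Ev.APP_ADDED);
                    break;
                case APP_ADDED:             // 6) the scheduler records the application
                    schedulerKnowsApp = true;
                    queue.add(Ev.ATTEMPT_APP_ACCEPTED);
                    break;
                case ATTEMPT_APP_ACCEPTED:  // 7) RMAppAttemptImpl: SUBMITTED -> SCHEDULED
                    attempt = AttemptState.SCHEDULED;
                    queue.add(Ev.APP_APP_ACCEPTED);
                    break;
                case APP_APP_ACCEPTED:      // 8) RMAppImpl: SUBMITTED -> ACCEPTED
                    app = AppState.ACCEPTED;
                    break;
                default:
                    throw new IllegalStateException("Unexpected event " + ev);
            }
        }
    }
}
```

After run(), the application is ACCEPTED and its attempt SCHEDULED, which is where the walkthrough leaves them until a NodeManager heartbeat arrives.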

9) A NodeManager sends a heartbeat to the ResourceManager

10) On receiving the heartbeat, the ResourceManager's ResourceTrackerService fires an RMNodeStatusEvent wrapping RMNodeEventType.STATUS_UPDATE

11) RMNodeEventType.STATUS_UPDATE

EventDispatcher: NodeEventDispatcher

Event handler class: RMNodeImpl

State transition: RMNodeState.RUNNING -> RMNodeState.RUNNING

Actions: update the node's health status

Triggers a NodeUpdateSchedulerEvent

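12) NodeUpdateSchedulerEvent

EventDispatcher: SchedulerEventDispatcher

Event handler class: FifoScheduler/CapacityScheduler

Actions: create a SchedulerApp object and call assignContainers() to assign a container to this application; at this point the container has not actually been allocated yet

Triggers the RMContainerEventType.START event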
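The scheduler side of steps 6) and 12) can be pictured with a heavily simplified scheduler: applications are remembered in submission order, and each node heartbeat hands out containers on that node while its free memory lasts. MiniFifoScheduler, its memory-only model, and the choice to skip an application that does not fit (rather than block the ones behind it) are all assumptions of this sketch; the real FifoScheduler and CapacityScheduler track far more state, such as resource requests, locality and queues.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, memory-only scheduler used to illustrate steps 6) and 12).
final class MiniFifoScheduler {
    // A LinkedHashMap iterates in insertion order, i.e. in submission order.
    private final Map<String, Integer> pendingAmMb = new LinkedHashMap<>();

    // Step 6): the scheduler learns about an application and its AM container size.
    void addApp(String appId, int amContainerMb) {
        pendingAmMb.put(appId, amContainerMb);
    }

    // Step 12): a node reports freeMb on its heartbeat; walk applications in
    // submission order and assign each one that still fits on this node.
    List<String> nodeUpdate(String nodeId, int freeMb) {
        List<String> assigned = new ArrayList<>();
        int remaining = freeMb;
        Iterator<Map.Entry<String, Integer>> it = pendingAmMb.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            if (entry.getValue() > remaining) {
                continue;  // does not fit on this node; wait for another heartbeat
            }
            remaining -= entry.getValue();
            assigned.add(entry.getKey() + "@" + nodeId);
            it.remove();
        }
        return assigned;
    }
}
```

With applications needing 1024, 2048 and 512 MB and a heartbeat reporting 1600 MB free, the first and third fit and the second waits for a larger node. Each returned entry corresponds to a container that, in the walkthrough, would next receive RMContainerEventType.START.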

13) RMContainerEventType.START

EventDispatcher: NodeEventDispatcher

Event handler class: RMContainerImpl

State transition: RMContainerState.NEW -> RMContainerState.ALLOCATED

Actions: trigger an RMAppAttemptContainerAllocatedEvent (RMAppAttemptEventType.CONTAINER_ALLOCATED)

14) RMAppAttemptEventType.CONTAINER_ALLOCATED

EventDispatcher: ApplicationAttemptEventDispatcher

Event handler class: RMAppAttemptImpl

State transition: RMAppAttemptState.SCHEDULED -> RMAppAttemptState.ALLOCATED

Actions: call the Scheduler's allocate() to obtain a container

Triggers the AMLauncherEventType.LAUNCH event

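15) AMLauncherEventType.LAUNCH

Event handler class: ApplicationMasterLauncher

State transition: none (ApplicationMasterLauncher has no state machine; the attempt stays in RMAppAttemptState.ALLOCATED)

Actions: create an AMLauncher object and add it to the masterEvents queue;

LauncherThread keeps taking entries from masterEvents and processes them, calling AMLauncher.launch();

AMLauncher.launch() calls ContainerManager.startContainer() to create the container;

it also triggers the RMAppAttemptEventType.LAUNCHED event.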
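The masterEvents hand-off in step 15) is a producer-consumer queue: handling the LAUNCH event only enqueues a Runnable, and LauncherThread takes queued launches off the queue, so a slow startContainer() call to a NodeManager does not hold up the central dispatcher. Below is a hypothetical sketch; the small worker pool that actually runs each launch, and every name other than masterEvents and LauncherThread, are assumptions of the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the masterEvents / LauncherThread hand-off in step 15).
final class MiniLauncher {
    private static final Runnable STOP = () -> { };  // poison pill
    private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
    // Assumption: launches run on a small pool so one slow launch does not delay the rest.
    private final ExecutorService launcherPool = Executors.newFixedThreadPool(2);
    private final Thread launcherThread = new Thread(this::loop, "LauncherThread");
    final List<String> launched = Collections.synchronizedList(new ArrayList<>());

    void start() { launcherThread.start(); }

    // Handling AMLauncherEventType.LAUNCH: wrap the work in a Runnable and enqueue it.
    void handleLaunch(String attemptId) {
        masterEvents.add(() -> {
            // Stand-in for AMLauncher.launch() -> ContainerManager.startContainer(),
            // followed by firing RMAppAttemptEventType.LAUNCHED.
            launched.add(attemptId);
        });
    }

    // Drain the queued launches, then shut everything down.
    void stop() {
        masterEvents.add(STOP);
        try {
            launcherThread.join();
            launcherPool.shutdown();
            launcherPool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // LauncherThread: take each queued launch and hand it to the pool.
    private void loop() {
        try {
            while (true) {
                Runnable next = masterEvents.take();
                if (next == STOP) {
                    return;
                }
                launcherPool.execute(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because launches may run in parallel on the pool, they can complete in any order; only the set of launched attempts is predictable.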

16) RMAppAttemptEventType.LAUNCHED

EventDispatcher: ApplicationAttemptEventDispatcher

Event handler class: RMAppAttemptImpl

State transition: RMAppAttemptState.ALLOCATED -> RMAppAttemptState.LAUNCHED

Actions: register with AMLivelinessMonitor, which from then on watches for heartbeats from this Application's ApplicationMaster
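A liveness monitor of this kind follows a common pattern: record a timestamp at registration, refresh it on every heartbeat, and periodically expire entries that have been silent for longer than an expiry interval. The following deterministic sketch uses an injected clock; MiniLivelinessMonitor, its method names and the interval are hypothetical. A real monitor would run checkExpired() from a background thread and notify the attempt's state machine for each expired entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical liveness monitor in the spirit of AMLivelinessMonitor.
final class MiniLivelinessMonitor {
    private final Map<String, Long> lastPing = new HashMap<>();
    private final long expireIntervalMs;
    private final LongSupplier clock;  // injected so the example is deterministic

    MiniLivelinessMonitor(long expireIntervalMs, LongSupplier clock) {
        this.expireIntervalMs = expireIntervalMs;
        this.clock = clock;
    }

    // Step 16): start tracking an attempt once it has been launched.
    synchronized void register(String attemptId) {
        lastPing.put(attemptId, clock.getAsLong());
    }

    // Every heartbeat from the ApplicationMaster refreshes its timestamp.
    synchronized void receivedPing(String attemptId) {
        if (lastPing.containsKey(attemptId)) {
            lastPing.put(attemptId, clock.getAsLong());
        }
    }

    // Stop tracking, e.g. when the attempt finishes normally.
    synchronized void unregister(String attemptId) {
        lastPing.remove(attemptId);
    }

    // One pass of the periodic check: drop and return attempts silent for too long.
    synchronized List<String> checkExpired() {
        long now = clock.getAsLong();
        List<String> expired = new ArrayList<>();
        lastPing.entrySet().removeIf(entry -> {
            boolean silentTooLong = now - entry.getValue() > expireIntervalMs;
            if (silentTooLong) {
                expired.add(entry.getKey());
            }
            return silentTooLong;
        });
        return expired;
    }
}
```

With a 600,000 ms interval, an attempt that pinged at t = 400,000 survives a check at t = 700,000, while one that has been silent since t = 0 is expired.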

17) The ApplicationMaster started in the container on the NodeManager that sent the heartbeat in step 9) calls AMRMProtocol.registerApplicationMaster() to register with ApplicationMasterService

18) ApplicationMasterService.registerApplicationMaster()

Gets the ApplicationAttempt's information from the request

Triggers the RMAppAttemptEventType.REGISTERED event

Returns a response to the caller

19) RMAppAttemptEventType.REGISTERED

EventDispatcher: ApplicationAttemptEventDispatcher

Event handler class: RMAppAttemptImpl

State transition: RMAppAttemptState.LAUNCHED -> RMAppAttemptState.RUNNING

Actions: record the information the Application reports when it registers, such as the host it runs on, its port and its tracking URL

Triggers the RMAppEventType.ATTEMPT_REGISTERED event

20) RMAppEventType.ATTEMPT_REGISTERED

EventDispatcher: ApplicationEventDispatcher

Event handler class: RMAppImpl

State transition: RMAppState.ACCEPTED -> RMAppState.RUNNING

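At this point the ApplicationMaster (MRAppMaster) has been created. From here on the ApplicationMaster (MRAppMaster) takes over running the Application: it requests from the ResourceManager the resources needed to run the sub-tasks, monitors how those sub-tasks are running, and reports the Application's status back to the ResourceManager.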