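2. ResourceManager Side
On the Client side, YarnRunner.submitJob() submits the Application to the ResourceManager.
The protocol between the Client and the ResourceManager is ClientRMProtocol, and its implementation class is ClientRMService.
1) ClientRMService.java
Every operation the Client performs against the ResourceManager is ultimately implemented by a method of ClientRMService. Take submitApplication() as an example: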
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnRemoteException {
  // Get the Application's information: submission context, id and user
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();
  String user = submissionContext.getUser();
  try {
    // The authenticated RPC caller replaces the user named in the context
    user = UserGroupInformation.getCurrentUser().getShortUserName();
    if (rmContext.getRMApps().get(applicationId) != null) {
      throw new IOException("Application with id " + applicationId
          + " is already present! Cannot add a duplicate!");
    }
    submissionContext.setUser(user);
    // Build an RMAppManagerSubmitEvent from the context and pass it to RMAppManager.handle()
    rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
        .currentTimeMillis()));
    LOG.info("Application with id " + applicationId.getId() +
        " submitted by user " + user + " with " + submissionContext);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId);
  } catch (IOException ie) {
    ...
  }
  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
  return response;
}
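Stripped of Hadoop types, submitApplication() follows a simple shape: fail fast on a known id, overwrite the user with the authenticated caller, then hand a timestamped event to the manager and return. The following is a minimal sketch of that shape only; `SubmitEvent`, `MiniClientService` and `ClientServiceDemo` are hypothetical names, a plain `Map` stands in for `rmContext.getRMApps()`, and a `Consumer` stands in for `RMAppManager.handle()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for RMAppManagerSubmitEvent: app id, user, submit time.
final class SubmitEvent {
    final String appId;
    final String user;
    final long submitTime;

    SubmitEvent(String appId, String user, long submitTime) {
        this.appId = appId;
        this.user = user;
        this.submitTime = submitTime;
    }
}

// Hypothetical stand-in for ClientRMService; not the Hadoop API.
final class MiniClientService {
    private final Map<String, String> rmApps;       // plays the role of rmContext.getRMApps()
    private final Consumer<SubmitEvent> appManager; // plays the role of RMAppManager.handle()

    MiniClientService(Map<String, String> rmApps, Consumer<SubmitEvent> appManager) {
        this.rmApps = rmApps;
        this.appManager = appManager;
    }

    // Fail fast on a known id, replace the requested user with the authenticated
    // caller, then forward a timestamped event to the manager.
    void submit(String appId, String requestedUser, String authenticatedUser) {
        if (rmApps.get(appId) != null) {
            throw new IllegalStateException("Application with id " + appId
                + " is already present! Cannot add a duplicate!");
        }
        String user = authenticatedUser; // requestedUser is ignored, as in the original
        appManager.accept(new SubmitEvent(appId, user, System.currentTimeMillis()));
    }
}

final class ClientServiceDemo {
    public static void main(String[] args) {
        Map<String, String> rmApps = new ConcurrentHashMap<>();
        List<SubmitEvent> received = new ArrayList<>();
        // Toy "RMAppManager": records the event and registers the app.
        MiniClientService service = new MiniClientService(rmApps, e -> {
            received.add(e);
            rmApps.put(e.appId, e.user);
        });
        service.submit("app_1", "alice", "bob");
        System.out.println(received.get(0).user); // bob
        try {
            service.submit("app_1", "bob", "bob");
        } catch (IllegalStateException ex) {
            System.out.println("duplicate rejected");
        }
    }
}
```

The get() check here is only a fast path; in the real flow the registry is filled later by RMAppManager, so this check alone cannot rule out two concurrent submissions of the same id.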
2) RMAppManager.java
RMAppManager implements the EventHandler interface, which means it is a class that handles a particular kind of event:
public void handle(RMAppManagerEvent event) {
  ApplicationId applicationId = event.getApplicationId();
  LOG.debug("RMAppManager processing event for "
      + applicationId + " of type " + event.getType());
  // The cases of event.getType() show that this class handles Application submission and completion events
  switch(event.getType()) {
    case APP_COMPLETED:
    {
      finishApplication(applicationId);
      ApplicationSummary.logAppSummary(
          rmContext.getRMApps().get(applicationId));
      checkAppNumCompletedLimit();
    }
    break;
    case APP_SUBMIT:
    {
      ApplicationSubmissionContext submissionContext =
          ((RMAppManagerSubmitEvent)event).getSubmissionContext();
      long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
      // Call submitApplication() to submit the Job to the ResourceManager
      submitApplication(submissionContext, submitTime);
    }
    break;
    default:
      LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
  }
}
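The handler pattern above, a single handle() method that switches on an enum event type, can be reproduced in isolation. This is a hypothetical sketch: `AppManagerEventType`, `AppManagerEvent` and `MiniAppManager` are invented names, and the eviction rule in `checkAppNumCompletedLimit()` is an assumption (keep at most N completed apps, dropping the oldest first), not a copy of Hadoop's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical analogue of RMAppManagerEventType.
enum AppManagerEventType { APP_SUBMIT, APP_COMPLETED }

// Hypothetical analogue of RMAppManagerEvent.
final class AppManagerEvent {
    final AppManagerEventType type;
    final String appId;

    AppManagerEvent(AppManagerEventType type, String appId) {
        this.type = type;
        this.appId = appId;
    }
}

// Hypothetical mini RMAppManager: one handle() method switching on the event type.
final class MiniAppManager {
    final List<String> submitted = new ArrayList<>();
    final Deque<String> completed = new ArrayDeque<>();
    private final int maxCompletedApps;

    MiniAppManager(int maxCompletedApps) {
        this.maxCompletedApps = maxCompletedApps;
    }

    void handle(AppManagerEvent event) {
        switch (event.type) {
            case APP_COMPLETED:
                completed.addLast(event.appId);   // stands in for finishApplication()
                checkAppNumCompletedLimit();
                break;
            case APP_SUBMIT:
                submitted.add(event.appId);       // stands in for submitApplication()
                break;
            default:
                System.err.println("Invalid eventtype " + event.type + ". Ignoring!");
        }
    }

    // Assumed behaviour: keep at most maxCompletedApps completed apps, evicting the oldest.
    private void checkAppNumCompletedLimit() {
        while (completed.size() > maxCompletedApps) {
            completed.pollFirst();
        }
    }
}
```

With a limit of 2, completing a1, a2 and a3 in that order leaves only a2 and a3 in `completed`.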
protected synchronized void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime) {
  ApplicationId applicationId = submissionContext.getApplicationId();
  RMApp application = null;
  try {
    // Read the Application's parameters from the context and fill in defaults for unset ones
    String clientTokenStr = null;
    ...
    ...
    // Store the Application's information so it can be recovered if the Application fails or dies
    ApplicationStore appStore = rmContext.getApplicationsStore()
        .createApplicationStore(submissionContext.getApplicationId(),
        submissionContext);
    // Create the RMAppImpl object the ResourceManager uses to represent the Application
    application = new RMAppImpl(applicationId, rmContext,
        this.conf, submissionContext.getApplicationName(), user,
        submissionContext.getQueue(), submissionContext, clientTokenStr,
        appStore, this.scheduler,
        this.masterService, submitTime);
    // Reject a duplicate submission of the same Application id (putIfAbsent is atomic)
    if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
        null) {
      String message = "Application with id " + applicationId
          + " is already present! Cannot add a duplicate!";
      LOG.info(message);
      throw RPCUtil.getRemoteException(message);
    }
    // Notify the ACLsManager
    this.applicationACLsManager.addApplication(applicationId,
        submissionContext.getAMContainerSpec().getApplicationACLs());
    // Security tokens: with security enabled, hand the app's tokens to the DelegationTokenRenewer
    if (UserGroupInformation.isSecurityEnabled()) {
      this.rmContext.getDelegationTokenRenewer().addApplication(
          applicationId, parseCredentials(submissionContext),
          submissionContext.getCancelTokensWhenComplete()
          );
    }
    // Send an RMAppEventType.START event to the AsyncDispatcher; ApplicationEventDispatcher
    // receives it from the AsyncDispatcher and hands it to RMAppImpl for processing
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMAppEvent(applicationId, RMAppEventType.START));
  } catch (IOException ie) {
    LOG.info("RMAppManager submit application exception", ie);
    if (application != null) {
      // Send an RMAppRejectedEvent
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppRejectedEvent(applicationId, ie.getMessage()));
    }
  }
}
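ClientRMService's earlier get() check is not atomic with the insert, so the authoritative duplicate check is the putIfAbsent() call here. The sketch below shows, with hypothetical names (`MiniSubmitter`, `race`) and a plain `Object` in place of `RMAppImpl`, that when several threads race to submit the same id, exactly one insert wins and only that one would send START.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the putIfAbsent duplicate check; not the Hadoop code.
final class MiniSubmitter {
    final ConcurrentMap<String, Object> rmApps = new ConcurrentHashMap<>();
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger rejected = new AtomicInteger();

    void submitApplication(String appId) {
        Object app = new Object();  // stand-in for new RMAppImpl(...)
        // Atomic check-and-insert: exactly one caller wins for a given id.
        if (rmApps.putIfAbsent(appId, app) != null) {
            rejected.incrementAndGet();  // duplicate: no START is sent
            return;
        }
        started.incrementAndGet();       // stand-in for dispatching RMAppEventType.START
    }

    // Releases `threads` concurrent submissions of one id at once.
    static MiniSubmitter race(int threads) {
        MiniSubmitter s = new MiniSubmitter();
        CountDownLatch go = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    go.await();
                } catch (InterruptedException e) {
                    return;
                }
                s.submitApplication("app_1");
            });
            workers[i].start();
        }
        go.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return s;
    }

    public static void main(String[] args) {
        MiniSubmitter s = race(8);
        System.out.println(s.started.get() + " started, " + s.rejected.get() + " rejected");
    }
}
```

Running main() prints "1 started, 7 rejected". The sketch deliberately omits `synchronized`, which the original method has, to show that putIfAbsent() alone already guarantees a single winner.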
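3) EventHandler
From this point on, AsyncDispatcher takes over all event dispatching: every event is dispatched by AsyncDispatcher to the corresponding EventDispatcher, which obtains the object responsible for that event and hands the event to it. Take the RMAppEventType.START event as an example: it is dispatched to ApplicationEventDispatcher, which looks up the application's RMAppImpl (the implementation of RMApp) in the RMContext and lets it handle the event.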
public static final class ApplicationEventDispatcher implements
    EventHandler<RMAppEvent> {
  private final RMContext rmContext;
  public ApplicationEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }
  @Override
  public void handle(RMAppEvent event) {
    ApplicationId appID = event.getApplicationId();
    // Look up the RMApp object that handles events for this application
    RMApp rmApp = this.rmContext.getRMApps().get(appID);
    if (rmApp != null) {
      try {
        // Hand the event to that object
        rmApp.handle(event);
      } catch (Throwable t) {
        LOG.error("Error in handling event type " + event.getType()
            + " for application " + appID, t);
      }
    }
  }
}
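The routing described above can be sketched as a queue plus one worker thread. The sketch assumes, as in Hadoop's AsyncDispatcher, that handlers are registered per event-type enum class and found via `getType().getDeclaringClass()`; `MiniAsyncDispatcher`, `TypedEvent`, `AppEventType` and `AttemptEventType` are hypothetical names, and the real dispatcher does considerably more (service lifecycle, error handling).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical event-type enums standing in for RMAppEventType / RMAppAttemptEventType.
enum AppEventType { START, APP_ACCEPTED }
enum AttemptEventType { START, REGISTERED }

// Hypothetical minimal event: it only carries its enum type.
final class TypedEvent {
    final Enum<?> type;
    TypedEvent(Enum<?> type) { this.type = type; }
}

// Producers enqueue and return at once; a single worker thread routes each event
// to the handler registered for the enum class its type belongs to.
final class MiniAsyncDispatcher {
    private final BlockingQueue<TypedEvent> queue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, Consumer<TypedEvent>> handlers = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::loop, "mini-dispatcher");

    void register(Class<? extends Enum<?>> typeClass, Consumer<TypedEvent> handler) {
        handlers.put(typeClass, handler);
    }

    // Same role as getEventHandler().handle(event): enqueue only.
    void post(TypedEvent event) { queue.add(event); }

    void start() { worker.setDaemon(true); worker.start(); }

    void stop() { worker.interrupt(); }

    private void loop() {
        while (true) {
            TypedEvent event;
            try {
                event = queue.take();
            } catch (InterruptedException e) {
                return;  // stop() was called
            }
            Consumer<TypedEvent> handler = handlers.get(event.type.getDeclaringClass());
            if (handler != null) {
                handler.accept(event);
            } else {
                System.err.println("No handler registered for " + event.type);
            }
        }
    }

    // Posts three events and returns what the handlers saw, in order.
    static List<String> demo() {
        MiniAsyncDispatcher d = new MiniAsyncDispatcher();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        d.register(AppEventType.class, e -> { seen.add("app:" + e.type); done.countDown(); });
        d.register(AttemptEventType.class, e -> { seen.add("attempt:" + e.type); done.countDown(); });
        d.start();
        d.post(new TypedEvent(AppEventType.START));
        d.post(new TypedEvent(AttemptEventType.START));
        d.post(new TypedEvent(AppEventType.APP_ACCEPTED));
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        d.stop();
        return new ArrayList<>(seen);
    }
}
```

Because one thread drains one FIFO queue, events are handled strictly in posting order, and handlers never run concurrently with each other.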
RMAppImpl.java
public void handle(RMAppEvent event) {
  // Take the write lock before updating the state machine
  this.writeLock.lock();
  try {
    ApplicationId appID = event.getApplicationId();
    LOG.debug("Processing event for " + appID + " of type "
        + event.getType());
    final RMAppState oldState = getState();
    try {
      // Let the state machine process the event
      this.stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
      ...
    }
  } finally {
    // Release the lock
    this.writeLock.unlock();
  }
}
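handle() calls getState() while already holding the write lock. Assuming getState() takes the read lock of the same ReentrantReadWriteLock (as Hadoop's RMAppImpl appears to do), this cannot deadlock, because the write-lock holder may also acquire the read lock. The hypothetical `LockedApp` class below reproduces only that locking pattern, with a two-state enum in place of the real state machine.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical; mirrors only the locking pattern of RMAppImpl.handle().
final class LockedApp {
    enum State { NEW, SUBMITTED }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private State state = State.NEW;

    // Readers (e.g. RPC or web threads) take the read lock.
    State getState() {
        lock.readLock().lock();
        try {
            return state;
        } finally {
            lock.readLock().unlock();
        }
    }

    // The dispatcher thread holds the write lock for the whole transition.
    // getState() re-enters as a reader; ReentrantReadWriteLock allows the
    // write-lock holder to take the read lock, so this does not block.
    State handleStart() {
        lock.writeLock().lock();
        try {
            State oldState = getState();
            if (oldState == State.NEW) {
                state = State.SUBMITTED;
            }
            return oldState;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

The reverse order (holding the read lock, then asking for the write lock) would block forever, which is why the write lock is taken first.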
The state machine works as follows, using the one in RMAppImpl as an example:
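/* The type parameters, from left to right, are: the class whose state the
   machine changes, the class representing the state, the class representing
   the event type, and the class representing the event.
   The constructor argument is the machine's initial state. */
private static final StateMachineFactory<RMAppImpl,
                                         RMAppState,
                                         RMAppEventType,
                                         RMAppEvent> stateMachineFactory
    = new StateMachineFactory<RMAppImpl,
                              RMAppState,
                              RMAppEventType,
                              RMAppEvent>(RMAppState.NEW)
    /* Add a transition between two states together with the class that wraps
       the work to do during it. The following means: the transition
       RMAppState.NEW -> RMAppState.SUBMITTED is triggered by an
       RMAppEventType.START event, and the method to run is wrapped in the
       StartAppAttemptTransition class. */
    .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
        RMAppEventType.START, new StartAppAttemptTransition())
    ...
    ...
    // Build the transition table (topology) of the state machine
    .installTopology();
// A class that wraps the work done during a transition must implement the
// SingleArcTransition interface; take StartAppAttemptTransition as an example.
// Each state machine has a corresponding class implementing SingleArcTransition,
// here RMAppTransition. StartAppAttemptTransition extends RMAppTransition and
// implements transition(), which holds the logic for the state change.
private static final class StartAppAttemptTransition extends RMAppTransition {
  public void transition(RMAppImpl app, RMAppEvent event) {
    app.createNewAttempt();
  }
}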
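The core idea of StateMachineFactory is a table keyed by (current state, event type) whose entries hold the post-state and a transition hook. The sketch below is a hypothetical, simplified analogue: `Arc` plays the role of SingleArcTransition, and `MiniStateMachineFactory`, `DemoApp`, `AppState` and `AppEvtType` are invented names. Unlike the real factory, which as far as we can tell is immutable (each addTransition returns a new factory and installTopology() builds the table), this sketch mutates a single table in place.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical counterpart of SingleArcTransition<OPERAND, EVENT>.
interface Arc<O, E> {
    void transition(O operand, E event);
}

// Hypothetical, mutable simplification of StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>.
final class MiniStateMachineFactory<O, S, T, E> {
    private final S initialState;
    // pre-state -> (event type -> (post-state, hook))
    private final Map<S, Map<T, SimpleImmutableEntry<S, Arc<O, E>>>> table = new HashMap<>();

    MiniStateMachineFactory(S initialState) {
        this.initialState = initialState;
    }

    // "In state pre, on event type, run hook and move to post."
    MiniStateMachineFactory<O, S, T, E> addTransition(S pre, S post, T type, Arc<O, E> hook) {
        table.computeIfAbsent(pre, k -> new HashMap<>())
             .put(type, new SimpleImmutableEntry<>(post, hook));
        return this;
    }

    // One machine per operand; all machines share the factory's table.
    Machine make(O operand) {
        return new Machine(operand);
    }

    final class Machine {
        private final O operand;
        private S current = initialState;

        private Machine(O operand) {
            this.operand = operand;
        }

        S getCurrentState() {
            return current;
        }

        // Look up the arc for (current state, event type), run its hook, then move.
        S doTransition(T type, E event) {
            Map<T, SimpleImmutableEntry<S, Arc<O, E>>> arcs = table.get(current);
            SimpleImmutableEntry<S, Arc<O, E>> arc = (arcs == null) ? null : arcs.get(type);
            if (arc == null) {
                throw new IllegalStateException("Invalid event: " + type + " at " + current);
            }
            arc.getValue().transition(operand, event);
            current = arc.getKey();
            return current;
        }
    }
}

// Toy operand wired the way RMAppImpl wires NEW --START--> SUBMITTED.
enum AppState { NEW, SUBMITTED }
enum AppEvtType { START }

final class DemoApp {
    static final MiniStateMachineFactory<DemoApp, AppState, AppEvtType, String> FACTORY =
        new MiniStateMachineFactory<DemoApp, AppState, AppEvtType, String>(AppState.NEW)
            .addTransition(AppState.NEW, AppState.SUBMITTED, AppEvtType.START,
                (app, event) -> app.createNewAttempt());

    int attempts = 0;

    void createNewAttempt() {
        attempts++;
    }
}
```

A second START in state SUBMITTED finds no arc and throws, which is the role InvalidStateTransitonException plays in the original.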
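This completes the tour of the classes involved in a typical state-machine-driven event flow. In summary:
(1) AsyncDispatcher dispatches each event it receives to the EventDispatcher registered for that event's type.
(2) The EventDispatcher obtains the object A that handles this kind of event and passes the event to A.
(3) A calls the state machine's doTransition() method, which determines the state change for the event type and the class wrapping the operation to run.
(4) That class, which implements the SingleArcTransition interface, has its transition() method called to carry out the state change.
Because the state machine handling code is largely the same everywhere, the rest of this section is organized by event, describing for each event the state change, the classes involved, and the operations performed.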
4) RMAppEventType.START
EventDispatcher: ApplicationEventDispatcher
Handler class: RMAppImpl
State change: RMAppState.NEW -> RMAppState.SUBMITTED
Operations: create an RMAppAttemptImpl object with initial state RMAppAttemptState.NEW
Triggers: RMAppAttemptEventType.START
5) RMAppAttemptEventType.START
EventDispatcher: ApplicationAttemptEventDispatcher
Handler class: RMAppAttemptImpl
State change: RMAppAttemptState.NEW -> RMAppAttemptState.SUBMITTED
Operations: register this AppAttempt with ApplicationMasterService
Triggers: AppAddedSchedulerEvent
6) AppAddedSchedulerEvent
EventDispatcher: SchedulerEventDispatcher
Handler class: FifoScheduler/CapacityScheduler
Operations: create a SchedulerApp object
Triggers: RMAppAttemptEventType.APP_ACCEPTED
7) RMAppAttemptEventType.APP_ACCEPTED
EventDispatcher: ApplicationAttemptEventDispatcher
Handler class: RMAppAttemptImpl
State change: RMAppAttemptState.SUBMITTED -> RMAppAttemptState.SCHEDULED
Operations: call the ResourceScheduler's allocate() to request from the ResourceManager the Container needed to run the ApplicationMaster
Triggers: RMAppEventType.APP_ACCEPTED
8) RMAppEventType.APP_ACCEPTED
EventDispatcher: ApplicationEventDispatcher
Handler class: RMAppImpl
State change: RMAppState.SUBMITTED -> RMAppState.ACCEPTED
9) A NodeManager sends a heartbeat to the ResourceManager
10) On receiving the heartbeat, the ResourceManager's ResourceTrackerService triggers an RMNodeStatusEvent carrying RMNodeEventType.STATUS_UPDATE
11) RMNodeEventType.STATUS_UPDATE
EventDispatcher: NodeEventDispatcher
Handler class: RMNodeImpl
State change: RMNodeState.RUNNING -> RMNodeState.RUNNING
Operations: update the node's health status
Triggers: NodeUpdateSchedulerEvent
12) NodeUpdateSchedulerEvent
EventDispatcher: SchedulerEventDispatcher
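Handler class: FifoScheduler/CapacityScheduler
Operations: create a SchedulerApp object and call assignContainers() to assign a container to the application (at this point the container has only been assigned, not yet actually handed out)
Triggers: RMContainerEventType.START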
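To make "assign on heartbeat" concrete, here is a deliberately simplified, memory-only sketch of FIFO assignment when a node reports free capacity. `MiniFifoScheduler`, `addApp` and `nodeUpdate` are hypothetical names, and the rule used (walk apps in submission order, give a container to each one whose request still fits, skip the rest) is an illustrative assumption, not FifoScheduler's actual algorithm, which also deals with priorities, locality and minimum allocations.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, memory-only simplification of FIFO assignment on a node heartbeat;
// not FifoScheduler's actual algorithm.
final class MiniFifoScheduler {
    // LinkedHashMap keeps insertion order, i.e. submission order.
    private final Map<String, Integer> pendingMb = new LinkedHashMap<>();

    // Roughly the AppAddedSchedulerEvent step: remember the app and its AM request.
    void addApp(String appId, int requestMb) {
        pendingMb.put(appId, requestMb);
    }

    // Roughly the NodeUpdateSchedulerEvent step: walk apps in submission order and
    // assign a container to each one that still fits on the node. Returns
    // "appId@nodeId" decisions; nothing has been launched yet.
    List<String> nodeUpdate(String nodeId, int availableMb) {
        List<String> assigned = new ArrayList<>();
        Iterator<Map.Entry<String, Integer>> it = pendingMb.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> app = it.next();
            if (app.getValue() <= availableMb) {
                availableMb -= app.getValue();
                assigned.add(app.getKey() + "@" + nodeId);
                it.remove();
            }
        }
        return assigned;
    }
}
```

For example, with requests of 1024, 4096 and 512 MB and a node reporting 2048 MB free, the first and third apps are assigned on that heartbeat and the 4096 MB request waits for a larger node.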
13) RMContainerEventType.START
EventDispatcher: NodeEventDispatcher
Handler class: RMContainerImpl
State change: RMContainerState.NEW -> RMContainerState.ALLOCATED
Triggers: RMAppAttemptContainerAllocatedEvent (RMAppAttemptEventType.CONTAINER_ALLOCATED)
14) RMAppAttemptEventType.CONTAINER_ALLOCATED
EventDispatcher: ApplicationAttemptEventDispatcher
Handler class: RMAppAttemptImpl
State change: RMAppAttemptState.SCHEDULED -> RMAppAttemptState.ALLOCATED
Operations: call the Scheduler's allocate() to obtain a container
Triggers: AMLauncherEventType.LAUNCH
15) AMLauncherEventType.LAUNCH
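Handler class: ApplicationMasterLauncher
State change: none (ApplicationMasterLauncher has no state machine; the attempt already moved from SCHEDULED to ALLOCATED in step 14)
Operations: create an AMLauncher object and add it to the masterEvents queue;
LauncherThread keeps taking items from masterEvents, processes them, and calls AMLauncher.launch();
AMLauncher.launch() calls ContainerManager.startContainer() to create the container
Triggers: RMAppAttemptEventType.LAUNCHED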
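The masterEvents queue plus LauncherThread is a plain producer/consumer arrangement: handling LAUNCH only enqueues a task, and a separate thread runs it, presumably so the event-dispatching thread is never blocked by the remote startContainer() call. A minimal sketch with hypothetical names (`MiniAMLauncherService`, `handleLaunch`); log strings stand in for the real startContainer() call and the LAUNCHED event.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical analogue of ApplicationMasterLauncher; not the Hadoop class.
final class MiniAMLauncherService {
    private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
    final List<String> log = Collections.synchronizedList(new ArrayList<>());
    private final Thread launcherThread = new Thread(() -> {
        try {
            while (true) {
                masterEvents.take().run();   // blocks until a launcher is queued
            }
        } catch (InterruptedException e) {
            // stop() interrupts the thread; just exit
        }
    }, "LauncherThread");

    void start() {
        launcherThread.setDaemon(true);
        launcherThread.start();
    }

    void stop() {
        launcherThread.interrupt();
    }

    // Handling a LAUNCH event only builds a launcher task and queues it.
    void handleLaunch(String attemptId, CountDownLatch launched) {
        masterEvents.add(() -> {
            log.add("startContainer " + attemptId);  // where launch() would call startContainer()
            log.add("LAUNCHED " + attemptId);        // then the attempt gets a LAUNCHED event
            launched.countDown();
        });
    }

    static List<String> demo() {
        MiniAMLauncherService service = new MiniAMLauncherService();
        CountDownLatch launched = new CountDownLatch(1);
        service.start();
        service.handleLaunch("appattempt_1", launched);
        try {
            launched.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        service.stop();
        return new ArrayList<>(service.log);
    }
}
```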
16) RMAppAttemptEventType.LAUNCHED
EventDispatcher: ApplicationAttemptEventDispatcher
Handler class: RMAppAttemptImpl
State change: RMAppAttemptState.ALLOCATED -> RMAppAttemptState.LAUNCHED
Operations: register with AMLivelinessMonitor so that the Application's status is monitored in real time
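A liveliness monitor boils down to "last heard from" timestamps plus an expiry interval. As we understand it, the ApplicationMaster's subsequent calls to ApplicationMasterService act as pings, and an attempt not heard from within the interval is expired. The sketch below is hypothetical (`MiniLivelinessMonitor` and its methods are invented) and takes the current time as a parameter so it stays deterministic; the real monitor checks periodically from a background thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, clock-injected simplification of a liveliness monitor such as
// AMLivelinessMonitor; the real monitor uses a background checker thread.
final class MiniLivelinessMonitor {
    private final long expireIntervalMs;
    private final Map<String, Long> lastPing = new HashMap<>();

    MiniLivelinessMonitor(long expireIntervalMs) {
        this.expireIntervalMs = expireIntervalMs;
    }

    // Start tracking the attempt once it is LAUNCHED.
    synchronized void register(String attemptId, long nowMs) {
        lastPing.put(attemptId, nowMs);
    }

    // Each call from the ApplicationMaster refreshes its timestamp.
    synchronized void receivedPing(String attemptId, long nowMs) {
        if (lastPing.containsKey(attemptId)) {
            lastPing.put(attemptId, nowMs);
        }
    }

    synchronized void unregister(String attemptId) {
        lastPing.remove(attemptId);
    }

    // Removes and returns every attempt not heard from within the interval;
    // a real RM would then send each of them an expiry event.
    synchronized List<String> checkExpired(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastPing.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs > entry.getValue() + expireIntervalMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

With a 1000 ms interval, an attempt registered at t=0 and never pinged expires at the t=1500 check, while one pinged at t=900 survives until after t=1900.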
17) The ApplicationMaster, launched on the NodeManager that sent the heartbeat in step 9), calls AMRMProtocol.registerApplicationMaster() to register with ApplicationMasterService
18) ApplicationMasterService.registerApplicationMaster()
Extracts the ApplicationAttempt's information from the request
Triggers an RMAppAttemptEventType.REGISTERED event
Returns a response to the caller
19) RMAppAttemptEventType.REGISTERED
EventDispatcher: ApplicationAttemptEventDispatcher
Handler class: RMAppAttemptImpl
State change: RMAppAttemptState.LAUNCHED -> RMAppAttemptState.RUNNING
Operations: record the Application's post-registration information, such as the host it runs on, its port, and the tracking URL
Triggers: RMAppEventType.ATTEMPT_REGISTERED
20) RMAppEventType.ATTEMPT_REGISTERED
EventDispatcher: ApplicationEventDispatcher
Handler class: RMAppImpl
State change: RMAppState.ACCEPTED -> RMAppState.RUNNING
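The happy-path transitions listed in the steps above can be collected into two tables and replayed as a consistency check. The tables below are transcribed only from this walkthrough (no failure, kill or retry arcs), and `LaunchPathTables` and `replay` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Transition tables transcribed from the steps above (happy path only).
final class LaunchPathTables {
    static final Map<String, String> RM_APP = new HashMap<>();
    static final Map<String, String> RM_APP_ATTEMPT = new HashMap<>();

    static {
        // RMAppImpl: key is "state|eventType", value is the next state
        RM_APP.put("NEW|START", "SUBMITTED");
        RM_APP.put("SUBMITTED|APP_ACCEPTED", "ACCEPTED");
        RM_APP.put("ACCEPTED|ATTEMPT_REGISTERED", "RUNNING");

        // RMAppAttemptImpl
        RM_APP_ATTEMPT.put("NEW|START", "SUBMITTED");
        RM_APP_ATTEMPT.put("SUBMITTED|APP_ACCEPTED", "SCHEDULED");
        RM_APP_ATTEMPT.put("SCHEDULED|CONTAINER_ALLOCATED", "ALLOCATED");
        RM_APP_ATTEMPT.put("ALLOCATED|LAUNCHED", "LAUNCHED");
        RM_APP_ATTEMPT.put("LAUNCHED|REGISTERED", "RUNNING");
    }

    // Applies events in order starting from NEW; fails on any pair missing from the table.
    static String replay(Map<String, String> table, List<String> events) {
        String state = "NEW";
        for (String event : events) {
            String next = table.get(state + "|" + event);
            if (next == null) {
                throw new IllegalStateException("No transition for " + event + " in " + state);
            }
            state = next;
        }
        return state;
    }
}
```

In the actual flow the two machines interleave: app START, attempt START, attempt APP_ACCEPTED, app APP_ACCEPTED, attempt CONTAINER_ALLOCATED, attempt LAUNCHED, attempt REGISTERED, and finally app ATTEMPT_REGISTERED.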
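At this point the ApplicationMaster (MRAppMaster) has been created. From here on, the ApplicationMaster (MRAppMaster) takes over running the Application: it requests from the ResourceManager the resources needed to run the subtasks, monitors the subtasks' status, and reports the Application's status to the ResourceManager.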