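In the previous article on job acceptance, the RM represents a job with the RMAppImpl class, stores the job according to the scheduling policy, and then tries to run it; at that point the scheduler allocates containers for the job. A container is the form in which the system's resources are allocated on the RM side. allocate() in fact only collects the already-allocated containers (RMContainer objects) from the newlyAllocatedContainers list, issues each of them a usage permit (an NMToken object), and packs everything into an Allocation object. Each time a container is collected from the newlyAllocatedContainers list, an RMContainerEventType.ACQUIRED event is sent to that container (more precisely, to its state machine).
1. Heartbeat push
yarn\server\nodemanager\NodeStatusUpdaterImpl.java
Now that the "ball" has rolled this far, we have to look at where containers come from. On the NM side, the heartbeat is pushed by NodeStatusUpdaterImpl, which periodically collects the resource information of its node and pushes the statistics to the RM.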
public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdater {
// RPC protocol
private ResourceTracker resourceTracker;
// Started together with the service
protected void startStatusUpdater() {
statusUpdaterRunnable = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
int lastHeartbeatID = 0;
while (!isStopped) { // loop until the service stops
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsHandler.getNodeLabelsForHeartbeat();
// Get the node status
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
// Build the request
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat);
// Send the request
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
updateMasterKeys(response);
nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response);
// As instructed by the RM, clean up containers and applications that no longer need to run
removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM());
// Update the node's resource capacity
Resource newResource = response.getResource();
if (newResource != null) {
updateNMResource(newResource);
if (LOG.isDebugEnabled()) {
LOG.debug("Node's resource is updated to " +
newResource.toString());
}
}
} catch (ConnectException e) {
......
}
}
}
}
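The loop above boils down to "send status, read the reply, wait for the interval the RM asked for". Below is a minimal sketch of that shape only. `TrackerSketch`, `HeartbeatReply` and `HeartbeatLoopSketch` are hypothetical names, not Hadoop classes, and feeding the reply's ID into the next request is an assumption of this sketch, since that step is not shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the ResourceTracker RPC protocol; not the Hadoop interface.
interface TrackerSketch {
    HeartbeatReply heartbeat(int lastResponseId);
}

// Hypothetical, simplified reply holding only the fields this sketch needs.
final class HeartbeatReply {
    final int responseId;
    final long nextIntervalMs;

    HeartbeatReply(int responseId, long nextIntervalMs) {
        this.responseId = responseId;
        this.nextIntervalMs = nextIntervalMs;
    }
}

final class HeartbeatLoopSketch {
    // Runs a fixed number of heartbeat rounds and returns the response IDs seen.
    static List<Integer> run(TrackerSketch tracker, int rounds) {
        List<Integer> seen = new ArrayList<>();
        int lastResponseId = 0;
        for (int i = 0; i < rounds; i++) {
            // Send the heartbeat and remember the ID of the reply (assumed behaviour).
            HeartbeatReply reply = tracker.heartbeat(lastResponseId);
            lastResponseId = reply.responseId;
            seen.add(lastResponseId);
            try {
                // The receiver decides how long the sender waits before the next round.
                Thread.sleep(reply.nextIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return seen;
    }
}
```

A fake tracker such as `last -> new HeartbeatReply(last + 1, 1L)` is enough to exercise the loop without any RPC.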
yarn\server\resourcemanager\ResourceTrackerService.java
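On the RM side, a ResourceTrackerService receives the heartbeats. The RM keeps an RMNodeImpl object for every NM registered in the cluster. Each time a heartbeat report arrives from a node, the RM updates the corresponding RMNodeImpl object according to the report, sends a STATUS_UPDATE event to its state machine, and then returns a response to the heartbeat through Hadoop's RPC mechanism. Like the report, the response can carry information and commands, mainly about cleaning up containers and applications that no longer need to run.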
public class ResourceTrackerService extends AbstractService implements ResourceTracker {
// Receives the heartbeat pushed by an NM
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
// Get the node status
NodeStatus remoteNodeStatus = request.getNodeStatus();
// Node ID
NodeId nodeId = remoteNodeStatus.getNodeId();
// 1. Reject nodes that are not valid members of the cluster
if (!this.nodesListManager.isValidNode(nodeId.getHost())
&& !isNodeInDecommissioning(nodeId)) {
return YarnServerBuilderUtils.newNodeHeartbeatResponse(
NodeAction.SHUTDOWN, message);
}
// 2. Check that the node has registered
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode == null) {
return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
message);
}
// Send ping
this.nmLivelinessMonitor.receivedPing(nodeId);
// 3. Check whether this heartbeat is a duplicate
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
if (getNextResponseId(
remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse
.getResponseId()) {
return lastNodeHeartbeatResponse;
} else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse
.getResponseId()) {
// TODO: Just sending reboot is not enough. Think more.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
message);
}
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(
getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
// Record this response as the RMNode's latest heartbeat response
rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
// Populate the rest of the response
populateKeys(request, nodeHeartBeatResponse);
ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
rmContext.getSystemCredentialsForApps();
if (!systemCredentials.isEmpty()) {
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
}
// 4. Update the RMNode state: RMNodeEventType.STATUS_UPDATE drives RMNodeImpl
RMNodeStatusEvent nodeStatusEvent =
new RMNodeStatusEvent(nodeId, remoteNodeStatus);
if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) {
nodeStatusEvent.setLogAggregationReportsForApps(request
.getLogAggregationReportsForApps());
}
this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);
// 5. Update the RM's NodeLabelManager
if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) {
try {
updateNodeLabelsFromNMReport(
NodeLabelsUtils.convertToStringSet(request.getNodeLabels()),
nodeId);
nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
} catch (IOException ex) {
//ensure the error message is captured and sent across in response
nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage());
nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(false);
}
}
// 6. Check the resource capacity configured for this node
String nid = nodeId.toString();
Resource capability = loadNodeResourceFromDRConfiguration(nid);
// sync back with new resource if not null.
if (capability != null) {
nodeHeartBeatResponse.setResource(capability);
}
return nodeHeartBeatResponse;
}
}
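Step 3 above is easy to misread, so here is a minimal sketch of the three-way comparison on response IDs. The class and enum names are hypothetical, and `nextResponseId` is an assumption: the excerpt does not show how `getNextResponseId` is computed, so this version simply adds one and wraps to stay non-negative.

```java
// Hypothetical names; a sketch of the duplicate / resync / normal decision only.
final class ResponseIdCheckSketch {
    enum Outcome { DUPLICATE, RESYNC, NORMAL }

    // Assumption: IDs advance by one and wrap around to stay non-negative.
    static int nextResponseId(int id) {
        return (id + 1) & Integer.MAX_VALUE;
    }

    // remoteId: the ID the NM reports; lastId: the ID of the last response the RM sent.
    static Outcome classify(int remoteId, int lastId) {
        if (nextResponseId(remoteId) == lastId) {
            // The NM is one step behind, so it never saw the last response: resend it.
            return Outcome.DUPLICATE;
        } else if (remoteId != lastId) {
            // The IDs are out of step in any other way: ask the node to resync.
            return Outcome.RESYNC;
        }
        // The NM acknowledges the last response: process the heartbeat normally.
        return Outcome.NORMAL;
    }
}
```

With the last response ID at 5, a heartbeat carrying 4 is a duplicate, 5 is normal, and anything else leads to a resync.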
yarn\server\resourcemanager\rmnode\RMNodeImpl.java
Node health status update
public static class StatusUpdateWhenHealthyTransition implements
MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
@Override
public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
NodeHealthStatus remoteNodeHealthStatus =
statusEvent.getNodeHealthStatus();
// Store the node's health report
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime());
rmNode.setAggregatedContainersUtilization(
statusEvent.getAggregatedContainersUtilization());
rmNode.setNodeUtilization(statusEvent.getNodeUtilization());
NodeState initialState = rmNode.getState();
......
// If this node is no longer healthy
if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
// if a node in decommissioning receives an unhealthy report,
// it will stay in decommissioning.
if (isNodeDecommissioning) {
return NodeState.DECOMMISSIONING;
} else {
reportNodeUnusable(rmNode, NodeState.UNHEALTHY);
return NodeState.UNHEALTHY;
}
}
rmNode.handleContainerStatus(statusEvent.getContainers());
rmNode.handleReportedIncreasedContainers(
statusEvent.getNMReportedIncreasedContainers());
List<LogAggregationReport> logAggregationReportsForApps =
statusEvent.getLogAggregationReportsForApps();
if (logAggregationReportsForApps != null
&& !logAggregationReportsForApps.isEmpty()) {
rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
}
// Consume the next-heartbeat flag
if(rmNode.nextHeartBeat) {
rmNode.nextHeartBeat = false;
// Drive the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode));
}
.......
return initialState;
}
}
yarn\server\resourcemanager\scheduler\fifo\FifoScheduler.java
Container turnover
public class FifoScheduler extends AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements Configurable {
// Node update
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
// Comes from rmNode.nodeUpdateQueue, filled earlier
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
// Covers two kinds of containers: newly launched and completed
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Handle the two kinds of containers separately
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// For each completed container (its resources are released)
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
super.completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
node.releaseContainer(containerId, true);
}
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
node.getAvailableResource(),minimumAllocation)) {
// Allocate containers
assignContainers(node);
}
updateAvailableResourcesMetrics();
}
}
yarn\server\resourcemanager\scheduler\AbstractYarnScheduler.java
Driving the container
public abstract class AbstractYarnScheduler <T extends SchedulerApplicationAttempt,
N extends SchedulerNode> extends AbstractService implements ResourceScheduler {
protected synchronized void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
// Get the application attempt that owns the launched container
SchedulerApplicationAttempt application = getCurrentAttemptForContainer(containerId);
if (application == null) {
// Unknown application: ask the node to clean up this container
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
}
// Fires a LAUNCHED event on this container's state machine
application.containerLaunchedOnNode(containerId, node.getNodeID());
node.containerStarted(containerId);
}
}
2. Container allocation
yarn\server\nodemanager\containermanager\container\Container.java
public interface Container extends EventHandler<ContainerEvent> { ...... }
yarn\server\nodemanager\containermanager\container\ContainerImpl.java
public class ContainerImpl implements Container {
private final Dispatcher dispatcher; // state-machine event dispatcher
private final NMStateStoreService stateStore; // NM state store service
private final Credentials credentials; // security credentials
private final NodeManagerMetrics metrics; // NM metrics
private final ContainerLaunchContext launchContext; // container launch context
private final ContainerTokenIdentifier containerTokenIdentifier; // container token identifier
private final ContainerId containerId; // container ID
private volatile Resource resource; // resources
// Local resources still pending
private final Map<LocalResourceRequest,List<String>> pendingResources =
new HashMap<LocalResourceRequest,List<String>>();
// Resources already localized
private final Map<Path,List<String>> localizedResources =
new HashMap<Path,List<String>>();
// Public resources
private final List<LocalResourceRequest> publicRsrcs =
new ArrayList<LocalResourceRequest>();
// Private resources
private final List<LocalResourceRequest> privateRsrcs =
new ArrayList<LocalResourceRequest>();
// App resources
private final List<LocalResourceRequest> appRsrcs =
new ArrayList<LocalResourceRequest>();
......
}
app\src\main\java\org\apache\hadoop\mapreduce\v2\app\rm\RMContainerRequestor.java
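In a real cluster, nodes differ by location: at the very least every machine is mounted in some rack, a rack holds several machines, and communication between machines in the same rack may be more efficient. More importantly, where an application's containers should be allocated and run generally depends on where its data is; this is the principle of "moving computation to the data". An App's resource requests therefore usually carry locality requirements, and Hadoop defines three locality types for this: NODE_LOCAL, RACK_LOCAL, and OFF_SWITCH. In its resource request an App can specify a node name (NodeName) or a rack name (RackName).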
static class ContainerRequest {
final TaskAttemptId attemptID;
final Resource capability; // which resources are requested
final String[] hosts; // on which nodes they are requested
final String[] racks; // on which racks they are requested
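To make the three locality types concrete, the sketch below classifies a candidate node against the hosts and racks a request names. It is a simplified illustration, not the scheduler's logic: `LocalitySketch` and its `Locality` enum are hypothetical names, and the inputs are assumed to be non-null arrays.

```java
import java.util.Arrays;

// Hypothetical helper; the enum only mirrors the three locality names from the text.
final class LocalitySketch {
    enum Locality { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    // Classifies a candidate node against the hosts and racks named in a request.
    static Locality classify(String[] hosts, String[] racks,
                             String nodeHost, String nodeRack) {
        if (Arrays.asList(hosts).contains(nodeHost)) {
            return Locality.NODE_LOCAL;   // the request named this very node
        }
        if (Arrays.asList(racks).contains(nodeRack)) {
            return Locality.RACK_LOCAL;   // same rack as a requested location
        }
        return Locality.OFF_SWITCH;       // anywhere else in the cluster
    }
}
```

For a request naming host `h1` on rack `/r1`, node `h1` is NODE_LOCAL, another node on `/r1` is RACK_LOCAL, and a node on any other rack is OFF_SWITCH.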
yarn\server\resourcemanager\scheduler\fifo\FifoScheduler.java
private void assignContainers(FiCaSchedulerNode node) {
// For every App known to the scheduler, try to satisfy its container requests
for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications.entrySet()) {
// Get the App's current AppAttempt object
FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
application.showRequests();
synchronized (application) {
// Skip this App if it is not suited to this node, i.e. it has blacklisted the node
if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
continue;
}
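// For the same App's container requests at each priority level
for (Priority priority : application.getPriorities()) {
// Maximum number of containers that can be allocated to this App at the given locality level (Node, Rack, or OffSwitch)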
int maxContainers = getMaxAllocatableContainers(application, priority, node,
NodeType.OFF_SWITCH);
if (maxContainers > 0) {
// First try to satisfy the containers it requested on this specific node
int assignedContainers = assignContainersOnNode(node, application, priority);
......
}
}
}
// All priority levels of this App have been scanned; stop looping once the node runs short of resources
if (Resources.lessThan(resourceCalculator, clusterResource,
node.getAvailableResource(), minimumAllocation)) {
break;
}
}
// All accepted Apps have been scanned
for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
FiCaSchedulerApp attempt = (FiCaSchedulerApp) application.getCurrentAppAttempt();
if (attempt == null) {
continue;
}
// Update the headroom (upper bound on available resources) of each FiCaSchedulerApp
updateAppHeadRoom(attempt);
}
}
yarn\server\resourcemanager\scheduler\fifo\FifoScheduler.java
private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, int assignableContainers,
ResourceRequest request, NodeType type) {
// The resources requested by this particular App
Resource capability = request.getCapability();
// In terms of memory, how many such tasks fit on this node
int availableContainers =
(int) (node.getAvailableResource().getMemorySize() / capability
.getMemorySize());
// Take the smaller of what is requested and what is possible
int assignedContainers =
Math.min(assignableContainers, availableContainers);
// Allocate these containers one by one
if (assignedContainers > 0) {
for (int i=0; i < assignedContainers; ++i) {
NodeId nodeId = node.getRMNode().getNodeID();
// Generate an ID
ContainerId containerId = BuilderUtils.newContainerId(application
.getApplicationAttemptId(), application.getNewContainerId());
// Create a container, i.e. a ContainerPBImpl object
Container container =
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
.getHttpAddress(), capability, priority, null);
// Allocate!
......
}
}
return assignedContainers;
}
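The count computed in assignContainer is plain integer arithmetic: divide the node's free memory by the memory of one request, then cap the result at the number of containers still wanted. A minimal sketch with a hypothetical helper name, assuming memory is given in MB and the request size is greater than zero:

```java
// Hypothetical helper reproducing only the arithmetic of the count.
final class AssignCountSketch {
    // availableMemMb: free memory on the node; requestMemMb: memory of one container (> 0);
    // assignable: how many containers the App may still receive here.
    static int containersToAssign(long availableMemMb, long requestMemMb, int assignable) {
        // Integer division: partial containers do not count.
        int fit = (int) (availableMemMb / requestMemMb);
        // Take the smaller of demand and capacity.
        return Math.min(assignable, fit);
    }
}
```

For instance, a node with 2048 MB free and requests of 1024 MB yields 2 containers even if 5 are wanted, while a node with 512 MB free yields none.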
yarn\server\resourcemanager\rmcontainer\RMContainerImpl.java
public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
// Initial transition
addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED,
RMContainerEventType.START, new ContainerStartedTransition())
private static final class ContainerStartedTransition extends
BaseTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
container.eventHandler.handle(new RMAppAttemptEvent(
container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
}
}
}
Once the job has been allocated a container, RMAppAttempt is driven by its state machine and starts preparing to launch the AM.
public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Transition rule
addTransition(RMAppAttemptState.SCHEDULED, EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.CONTAINER_ALLOCATED,
new AMContainerAllocatedTransition())
private static final class AMContainerAllocatedTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
// Ask the scheduler once more for the AM's container
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
null, null, null);
// If that still fails, create a thread that retries after a while
if (amContainerAllocation.getContainers().size() == 0) {
appAttempt.retryFetchingAMContainer(appAttempt);
return RMAppAttemptState.SCHEDULED;
}
// amContainerAllocation holds at least one container, so we can move on
appAttempt.setMasterContainer(amContainerAllocation.getContainers()
.get(0));
RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
.getRMContainer(appAttempt.getMasterContainer().getId());
rmMasterContainer.setAMContainer(true);
appAttempt.rmContext.getNMTokenSecretManager()
.clearNodeSetForAttempt(appAttempt.applicationAttemptId);
appAttempt.getSubmissionContext().setResource(
appAttempt.getMasterContainer().getResource());
appAttempt.storeAttempt(); // emits and handles the STORE_APP_ATTEMPT event
return RMAppAttemptState.ALLOCATED_SAVING;
}
}
// Transition rule
addTransition(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.ALLOCATED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
// Prepare to launch
private static final class AttemptStoredTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
// AMLauncherEventType.LAUNCH
appAttempt.registerClientToken();
appAttempt.launchAttempt();
}
}
}
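The two transition rules above form a small state machine: SCHEDULED either stays put (no container yet) or moves to ALLOCATED_SAVING, which moves to ALLOCATED once the attempt has been stored. The sketch below models only these two rules with a plain switch. `AttemptStateSketch` is a hypothetical name, and the real attempt has more states and events than shown here.

```java
// Hypothetical model of the two transition rules discussed above.
final class AttemptStateSketch {
    enum State { SCHEDULED, ALLOCATED_SAVING, ALLOCATED }
    enum Event { CONTAINER_ALLOCATED, ATTEMPT_NEW_SAVED }

    // containersReturned: how many containers the scheduler handed back
    // (only consulted for CONTAINER_ALLOCATED).
    static State next(State current, Event event, int containersReturned) {
        switch (current) {
            case SCHEDULED:
                if (event == Event.CONTAINER_ALLOCATED) {
                    // Multiple-arc transition: stay and retry, or move forward.
                    return containersReturned == 0
                        ? State.SCHEDULED
                        : State.ALLOCATED_SAVING;
                }
                break;
            case ALLOCATED_SAVING:
                if (event == Event.ATTEMPT_NEW_SAVED) {
                    // The attempt is stored; the launch can now be triggered.
                    return State.ALLOCATED;
                }
                break;
            default:
                break;
        }
        throw new IllegalStateException(
            "No transition from " + current + " on " + event);
    }
}
```

Returning the same state for an empty allocation mirrors the `return RMAppAttemptState.SCHEDULED;` branch, where a retry is scheduled instead of moving on.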
yarn\server\resourcemanager\amlauncher\ApplicationMasterLauncher.java
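The launcher runs a thread that loops, taking launch tasks from a queue and handing each AMLauncher to a thread pool for execution.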
public class ApplicationMasterLauncher extends AbstractService implements EventHandler<AMLauncherEvent> {
public synchronized void handle(AMLauncherEvent appEvent) {
AMLauncherEventType event = appEvent.getType();
RMAppAttempt application = appEvent.getAppAttempt();
switch (event) {
case LAUNCH:
launch(application);
break;
case CLEANUP:
cleanup(application);
break;
default:
break;
}
}
// Queue a launch task
private void launch(RMAppAttempt application) {
Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.LAUNCH);
masterEvents.add(launcher);
}
// A single thread launches in a loop
private class LauncherThread extends Thread {
public LauncherThread() {
super("ApplicationMaster Launcher");
}
@Override
public void run() {
while (!this.isInterrupted()) {
Runnable toLaunch;
try {
// Take from the queue; toLaunch is the AMLauncher that was queued earlier
toLaunch = masterEvents.take();
// Use one thread from the pool for the AMLauncher, which runs AMLauncher.run()
launcherPool.execute(toLaunch);
} catch (InterruptedException e) {
LOG.warn(this.getClass().getName() + " interrupted. Returning.");
return;
}
}
}
}
}
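The pattern used here, one dispatcher thread draining a blocking queue into a thread pool, can be tried in isolation with plain `java.util.concurrent`. The sketch below is a stand-alone illustration under assumptions of its own: `LauncherSketch` is a hypothetical name, the pool size of 2 and the 5-second wait are arbitrary, and the queued tasks only bump a counter instead of launching anything.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone model of "queue + dispatcher thread + pool".
final class LauncherSketch {
    // Queues taskCount dummy launch tasks and returns how many actually ran.
    static int launchAll(int taskCount) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger launched = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);

        // Dispatcher: blocks on take() and hands each task to the pool.
        Thread dispatcher = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    pool.execute(queue.take());
                } catch (InterruptedException e) {
                    return; // interrupted while waiting: stop dispatching
                }
            }
        }, "launcher-sketch");
        dispatcher.start();

        // Producer side: the equivalent of adding a launcher to the queue.
        for (int i = 0; i < taskCount; i++) {
            queue.add(() -> {
                launched.incrementAndGet();
                done.countDown();
            });
        }

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatcher.interrupt();
            pool.shutdown();
        }
        return launched.get();
    }
}
```

Keeping the dispatcher separate from the workers means a slow launch occupies a pool thread but never blocks the queue from being drained.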
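In fact, this AMLauncher thread sends a ContainerLaunchContext describing the master container to the assigned node, where the NodeManager starts a JVM process on its host to execute the command line given in the ContainerLaunchContext. That command line, in turn, comes from the ApplicationSubmissionContext submitted by the client.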