Hadoop核心源码解析——Yarn源码解析

YARN工作机制,如下图所示:

大数据培训Hadoop核心源码解析——Yarn源码解析

 

1、Yarn客户端创建并提交

1)在WordCount程序的驱动类中点击waitForCompletion()方法

Job.java

// Driver code: submit the job and block until it finishes; verbose=true means progress is monitored and printed.
boolean result = job.waitForCompletion(true);
 
public boolean waitForCompletion(boolean verbose
                                 ) throwsIOException, InterruptedException,
                                         ClassNotFoundException {
  if (state ==JobState.DEFINE) {
    submit();
  }
  if (verbose) {
   monitorAndPrintJob();
  } else {
    // get thecompletion poll interval from the client.
    int completionPollIntervalMillis=
     Job.getCompletionPollInterval(cluster.getConf());
    while(!isComplete()) {
      try {
       Thread.sleep(completionPollIntervalMillis);
      } catch(InterruptedException ie) {
      }
    }
  }
  returnisSuccessful();
}
 
public void submit()
       throwsIOException, InterruptedException, ClassNotFoundException {
 ensureState(JobState.DEFINE);
 setUseNewAPI();
  connect();
  final JobSubmitter submitter =
     getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status =ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
   ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state =JobState.RUNNING;
 LOG.info("The url to track the job: " + getTrackingURL());
 }

点击submitJobInternal()

JobSubmitter.java

/**
 * Prepares the job submission and hands it to the submit client (most of the
 * body is elided in this excerpt).
 *
 * <p>The visible part calls submitClient.submitJob with the job ID, the job
 * submission directory and the job's credentials; on YARN the implementation
 * is YARNRunner.
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
  throws ClassNotFoundException, InterruptedException, IOException {
  ... ...
  // Hand the job off to the client protocol implementation.
  status =submitClient.submitJob(
          jobId,submitJobDir.toString(), job.getCredentials());
  ... ...
}
 
/** Protocol method that submits a job; its YARN implementation is YARNRunner.submitJob. */
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)throws IOException, InterruptedException;

2)创建提交环境

按Ctrl + Alt + B查找submitJob的实现类:YARNRunner.java

/**
 * Submits a MapReduce job to YARN.
 *
 * <p>Builds an ApplicationSubmissionContext (which carries the command that
 * launches MRAppMaster) and submits it to the ResourceManager. It fails fast if
 * the RM returns no report, or reports the application as FAILED or KILLED.
 *
 * @param jobId        the MapReduce job ID
 * @param jobSubmitDir the job submission directory
 * @param ts           credentials shipped with the application
 * @return the job status obtained from the client cache
 * @throws IOException if the application is FAILED/KILLED or has no report;
 *                     any YarnException is wrapped in an IOException
 */
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {

  addHistoryToken(ts);
  // Build the submission context.
  ApplicationSubmissionContext appContext =
    createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    // appContext carries the command that launches MRAppMaster in its container.
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    // Fetch the report for the just-submitted application.
    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);

    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
/**
 * Builds the ApplicationSubmissionContext used to submit a MapReduce job to YARN
 * (parts are elided in this excerpt).
 *
 * @param jobConf      the job configuration
 * @param jobSubmitDir the job submission directory
 * @param ts           credentials serialized into the AM container's security tokens
 * @return the populated submission context
 */
public ApplicationSubmissionContext createApplicationSubmissionContext(
    Configuration jobConf, String jobSubmitDir, Credentials ts)
    throws IOException {
  ApplicationId applicationId = resMgrDelegate.getApplicationId();

  // Setup LocalResources built from the job submission directory.
  Map<String, LocalResource> localResources =
      setupLocalResources(jobConf, jobSubmitDir);

  // Setup security tokens: serialize the credentials into a ByteBuffer.
  DataOutputBuffer dob = new DataOutputBuffer();
  ts.writeTokenStorageToStream(dob);
  ByteBuffer securityTokens =
      ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

  // Setup ContainerLaunchContext for AM container.
  // vargs is the command line that starts MRAppMaster.
  List<String> vargs = setupAMCommand(jobConf);
  ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
      jobConf, localResources, securityTokens, vargs);

  ... ...

  return appContext;
}
 
private List<String> setupAMCommand(Configuration jobConf) {
 List<String> vargs = new ArrayList<>(8);
  // Java进程启动命令开始
 vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
      + "/bin/java");
 
  Path amTmpDir=
      new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
         YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
 vargs.add("-Djava.io.tmpdir=" + amTmpDir);
 MRApps.addLog4jSystemProperties(null, vargs, conf);
 
  // Check forJava Lib Path usage in MAP and REDUCE configs
 warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
     "map",
     MRJobConfig.MAP_JAVA_OPTS,
     MRJobConfig.MAP_ENV);
 warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
     "map",
     MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
     MRJobConfig.MAPRED_ADMIN_USER_ENV);
 warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
     "reduce",
     MRJobConfig.REDUCE_JAVA_OPTS,
     MRJobConfig.REDUCE_ENV);
 warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
      "reduce",
     MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
     MRJobConfig.MAPRED_ADMIN_USER_ENV);
 
  // Add AMadmin command opts before user command opts
  // so that itcan be overridden by user
  String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
     MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
 warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
     MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
 vargs.add(mrAppMasterAdminOptions);
 
  // Add AM usercommand opts 用户命令参数
  String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
     MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
 warnForJavaLibPath(mrAppMasterUserOptions, "app master",
     MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
 vargs.add(mrAppMasterUserOptions);
 
  if(jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
     MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
    final StringprofileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS,
       MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
    if(profileParams != null) {
     vargs.add(String.format(profileParams,
         ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
              +TaskLog.LogName.PROFILE));
    }
  }
 
  // 封装了要启动的mrappmaster全类名
  // org.apache.hadoop.mapreduce.v2.app.MRAppMaster
  vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR+
     Path.SEPARATOR + ApplicationConstants.STDOUT);
 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR+
     Path.SEPARATOR + ApplicationConstants.STDERR);
  return vargs;
}

3)向Yarn提交

点击submitJob方法中的submitApplication()

YARNRunner.java

// Call site in YARNRunner.submitJob: submission goes through the ResourceMgrDelegate.
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);
 
/**
 * Submits the application by delegating to the wrapped YARN client.
 *
 * @param appContext the application submission context
 * @return the ID of the submitted application
 */
public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
    throws YarnException, IOException {
  final ApplicationId submittedId = client.submitApplication(appContext);
  return submittedId;
}

按Ctrl + Alt + B查找submitApplication的实现类:YarnClientImpl.java

/**
 * Submits an application to the ResourceManager, then polls its report
 * (the polling logic is partly elided in this excerpt).
 *
 * <p>If the RM reports ApplicationNotFoundException while polling, the same
 * request is submitted again.
 *
 * @param appContext the submission context; must carry an ApplicationId
 * @return the submitted application's ID
 * @throws ApplicationIdNotProvidedException if appContext has no ApplicationId
 */
public ApplicationId
    submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }
  // Build the submit request.
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  ... ...

  //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  // Send the request over RPC; the implementation is ApplicationClientProtocolPBClientImpl.
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  // Presumably consulted by the elided state checks inside the loop below.
  EnumSet<YarnApplicationState> waitingStates =
      EnumSet.of(YarnApplicationState.NEW,
          YarnApplicationState.NEW_SAVING,
          YarnApplicationState.SUBMITTED);
  EnumSet<YarnApplicationState> failToSubmitStates =
      EnumSet.of(YarnApplicationState.FAILED,
          YarnApplicationState.KILLED);
  while (true) {
    try {
      // Ask the RM for the application's current state.
      ApplicationReport appReport = getApplicationReport(applicationId);
      YarnApplicationState state = appReport.getYarnApplicationState();
      ... ...
    } catch (ApplicationNotFoundException ex) {
      // Failover or RM restart happened before RMStateStore saved the
      // ApplicationState, so the application is unknown: submit it again.
      LOG.info("Re-submit application " + applicationId + " with the " +
          "same ApplicationSubmissionContext");
      rmClient.submitApplication(request);
    }
  }

  return applicationId;
}

按Ctrl + Alt + B查找rmClient.submitApplication的实现类:
ApplicationClientProtocolPBClientImpl.java

/**
 * Sends a submit-application request to the RM over protobuf RPC.
 *
 * <p>The request's protobuf form is passed to the RPC proxy, and the reply is
 * wrapped in a SubmitApplicationResponsePBImpl. The AM launch command
 * (java ... MRAppMaster) is part of the request's submission context: it is
 * data sent to the RM, not a command executed here.
 */
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException, IOException {
  final SubmitApplicationRequestProto proto =
      ((SubmitApplicationRequestPBImpl) request).getProto();
  try {
    return new SubmitApplicationResponsePBImpl(
        proxy.submitApplication(null, proto));
  } catch (ServiceException se) {
    // Presumably always rethrows the unwrapped cause; the return below only
    // satisfies the compiler.
    RPCUtil.unwrapAndThrowException(se);
    return null;
  }
}
public class ApplicationClientProtocolPBClientImplimplements ApplicationClientProtocol,
    Closeable {
 
  private ApplicationClientProtocolPB proxy;
 
  public ApplicationClientProtocolPBClientImpl(long clientVersion,
     InetSocketAddress addr, Configuration conf) throws IOException {
   RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
     ProtobufRpcEngine.class);
 
    proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
  }
  ... ...
}

2、启动MRAppMaster

0)在pom.xml中增加如下依赖

<!-- Provides the MRAppMaster sources so the class can be looked up in the IDE. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-app</artifactId>
    <version>3.1.3</version>
</dependency>

按Ctrl + N查找MRAppMaster类,搜索main方法

/**
 * Entry point of the MRAppMaster process: the main class named in the AM launch
 * command. Parts are elided in this excerpt.
 *
 * <p>Derives the application attempt from this AM's container ID, constructs the
 * MRAppMaster, then initializes and starts it. Any Throwable is logged and
 * terminates the process with exit code 1.
 */
public static void main(String[] args) {
  try {
    ... ...

    // Parse this AM's container ID and derive the application attempt from it.
    ContainerId containerId = ContainerId.fromString(containerIdStr);
    ApplicationAttemptId applicationAttemptId =
        containerId.getApplicationAttemptId();
    if (applicationAttemptId != null) {
      CallerContext.setCurrent(new CallerContext.Builder(
          "mr_appmaster_" + applicationAttemptId.toString()).build());
    }
    long appSubmitTime = Long.parseLong(appSubmitTimeStr);

    // Create the MRAppMaster object.
    MRAppMaster appMaster =
        new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
            Integer.parseInt(nodePortString),
            Integer.parseInt(nodeHttpPortString), appSubmitTime);
    ... ...

    // Initialize and start the AppMaster.
    initAndStartAppMaster(appMaster, conf, jobUserName);
  } catch (Throwable t) {
    LOG.error("Error starting MRAppMaster", t);
    ExitUtil.terminate(1, t);
  }
}
 
/**
 * Initializes and starts the given MRAppMaster as appMasterUgi. Setup of the UGI
 * and credentials is elided in this excerpt.
 *
 * <p>Inside doAs, an IOException is raised if the AM's errorHappenedShutDown flag
 * is set after start.
 */
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final JobConf conf, String jobUserName) throws IOException,
    InterruptedException {
  ... ...
  conf.getCredentials().addAll(credentials);
  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      // Lifecycle init; calls MRAppMaster.serviceInit.
      appMaster.init(conf);
      // Lifecycle start; calls MRAppMaster.serviceStart.
      appMaster.start();
      if (appMaster.errorHappenedShutDown) {
        throw new IOException("Was asked to shut down.");
      }
      return null;
    }
  });
}
 
/**
 * Lifecycle init inherited by the service (presumably AbstractService; parts are
 * elided in this excerpt).
 *
 * <p>Runs under stateChangeLock. If the service was not already INITED, it
 * stores the configuration and calls serviceInit(config), which is MRAppMaster's
 * override in this walkthrough. Listeners are notified if the service is still
 * INITED afterwards. If serviceInit throws, the failure is recorded, the service
 * is stopped quietly, and the exception is rethrown via
 * ServiceStateException.convert.
 *
 * @param conf the configuration to initialize with
 */
public void init(Configuration conf) {
  ... ...
  synchronized (stateChangeLock) {
    // NOTE(review): enterState presumably returns the previous state, so this
    // branch runs only on the first transition into INITED — confirm.
    if (enterState(STATE.INITED) !=STATE.INITED) {
      setConfig(conf);
      try {
        // Calls the subclass's serviceInit() (MRAppMaster.serviceInit here).
        serviceInit(config);
        if (isInState(STATE.INITED)) {
          // if the service ended up here during init,
          // notify the listeners
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG,this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

按Ctrl + Alt + B查找serviceInit的实现类:MRAppMaster.java

/**
 * MRAppMaster initialization (most of it is elided in this excerpt).
 *
 * <p>The visible part creates and initializes the client service, then creates
 * the container allocator.
 */
protected void serviceInit(final Configuration conf) throws Exception {
  ... ...
  // Create the AM's client service from the AM context.
  clientService = createClientService(context);

  // Initialize the client service with this configuration.
  clientService.init(conf);

  // Create the container allocator; it receives the client service and the AM context.
  containerAllocator =createContainerAllocator(clientService, context);
  ... ...
}

点击MRAppMaster.java 中的initAndStartAppMaster 方法中的appMaster.start();

/**
 * Lifecycle start inherited by the service (presumably AbstractService).
 *
 * <p>Returns immediately if already STARTED. Otherwise, under stateChangeLock,
 * it enters STARTED, records the start time, and calls serviceStart
 * (MRAppMaster's override in this walkthrough). Listeners are notified if the
 * service is still STARTED afterwards. On failure, the error is recorded, the
 * service is stopped quietly, and the exception is rethrown via
 * ServiceStateException.convert.
 */
public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
        // Calls the subclass's serviceStart() (MRAppMaster.serviceStart here).
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          LOG.debug("Service {} is started", getName());
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}
 
/**
 * Starts the MRAppMaster (component start-up is elided in this excerpt).
 *
 * <p>If job initialization failed, a JOB_INIT_FAILED event is sent to the job
 * event dispatcher. Otherwise the job is started via startJobs().
 */
protected void serviceStart() throws Exception {
  ... ...
  if (initFailed) {
    JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
    jobEventDispatcher.handle(initFailedEvent);
  } else {
    // All components have started, start the job.
    startJobs();
  }
}
 
/**
 * Kicks off job execution by posting a JobStartEvent to the AM's dispatcher.
 */
protected void startJobs() {
  // create a job-start event to get this ball rolling
  JobEvent startJobEvent = new JobStartEvent(job.getID(),
      recoveredJobStartTime);
  // send the job-start event. this triggers the job execution.
  // dispatcher is an AsyncDispatcher; getEventHandler() returns its
  // GenericEventHandler, which only enqueues the event on the dispatcher's
  // own event queue (this is not a YARN scheduler queue).
  dispatcher.getEventHandler().handle(startJobEvent);
}

按Ctrl + Alt + B查找handle的实现类:GenericEventHandler(定义在AsyncDispatcher.java中)

/**
 * Event handler returned by the dispatcher's getEventHandler() (parts are elided
 * in this excerpt). handle() only enqueues the event; it does not process it.
 */
class GenericEventHandler implements EventHandler<Event> {
  public void handle(Event event) {
    ... ...
    try {
      // Put the event on the dispatcher's event queue (not a YARN scheduler
      // queue); presumably a separate dispatcher thread drains it — confirm.
      eventQueue.put(event);
    } catch (InterruptedException e) {
      ... ...
    }
  };
}

3、ResourceManager启动

0)在pom.xml中增加如下依赖

<!-- Provides the ResourceManager sources so the class can be looked up in the IDE. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
    <version>3.1.3</version>
</dependency>

按Ctrl + N查找ResourceManager类,搜索main方法

/**
 * ResourceManager entry point.
 *
 * <p>After generic options are parsed:
 * <ul>
 *   <li>with no remaining arguments, it creates, initializes and starts a
 *       ResourceManager (with a shutdown hook registered);</li>
 *   <li>{@code -format-state-store} deletes the RM state store;</li>
 *   <li>{@code -remove-application-from-state-store <appId>} removes that
 *       application from the store;</li>
 *   <li>anything else prints usage.</li>
 * </ul>
 * Any Throwable is logged as fatal, followed by System.exit(-1).
 */
public static void main(String[] argv) {
  Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
  try {
    Configuration conf = new YarnConfiguration();
    GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
    argv = hParser.getRemainingArgs();
    // If -format-state-store, then delete RMStateStore; else startup normally
    if (argv.length >= 1) {
      if (argv[0].equals("-format-state-store")) {
        deleteRMStateStore(conf);
      } else if (argv[0].equals("-remove-application-from-state-store")
          && argv.length == 2) {
        removeApplication(conf, argv[1]);
      } else {
        printUsage(System.err);
      }
    } else {
      // Create the RM object.
      ResourceManager resourceManager = new ResourceManager();
      ShutdownHookManager.get().addShutdownHook(
          new CompositeServiceShutdownHook(resourceManager),
          SHUTDOWN_HOOK_PRIORITY);
      // Initialize the RM (calls ResourceManager.serviceInit).
      resourceManager.init(conf);
      // Start the RM.
      resourceManager.start();
    }
  } catch (Throwable t) {
    LOG.fatal("Error starting ResourceManager", t);
    System.exit(-1);
  }
}
 
/**
 * Lifecycle init inherited by the service (presumably AbstractService; parts are
 * elided in this excerpt).
 *
 * <p>Runs under stateChangeLock. If the service was not already INITED, it
 * stores the configuration and calls serviceInit(config), which is
 * ResourceManager's override in this walkthrough. Listeners are notified if the
 * service is still INITED afterwards. If serviceInit throws, the failure is
 * recorded, the service is stopped quietly, and the exception is rethrown via
 * ServiceStateException.convert.
 *
 * @param conf the configuration to initialize with
 */
public void init(Configuration conf) {
  ... ...
  synchronized (stateChangeLock) {
    // NOTE(review): enterState presumably returns the previous state, so this
    // branch runs only on the first transition into INITED — confirm.
    if (enterState(STATE.INITED) !=STATE.INITED) {
      setConfig(conf);
      try {
        // Calls the subclass's serviceInit() (ResourceManager.serviceInit here).
        serviceInit(config);
        if (isInState(STATE.INITED)) {
          // if the service ended up here during init,
          // notify the listeners
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG,this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}
 
/**
 * Initializes the ResourceManager. In order, it:
 * <ol>
 *   <li>creates the RM context;</li>
 *   <li>loads core-site.xml and yarn-site.xml through the configuration
 *       provider, then validates the configuration;</li>
 *   <li>enables HA if configured;</li>
 *   <li>logs in;</li>
 *   <li>creates and registers the RM's services: the central dispatcher, the
 *       admin service, an optional embedded leader elector, the active
 *       services, the application history writer, an optional timeline
 *       collector manager, and the system metrics publisher;</li>
 *   <li>delegates to the superclass.</li>
 * </ol>
 *
 * @param conf the RM configuration
 * @throws YarnRuntimeException if the secure login fails
 */
protected void serviceInit(Configuration conf) throws Exception {
  this.conf = conf;
  UserGroupInformation.setConfiguration(conf);
  this.rmContext = new RMContextImpl();
  rmContext.setResourceManager(this);

  this.configurationProvider =
      ConfigurationProviderFactory.getConfigurationProvider(conf);
  this.configurationProvider.init(this.conf);
  rmContext.setConfigurationProvider(configurationProvider);

  // load core-site.xml
  loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);

  // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml
  // Or use RM specific configurations to overwrite the common ones first
  // if they exist
  RMServerUtils.processRMProxyUsersConf(conf);
  ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);

  // load yarn-site.xml
  loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);

  validateConfigs(this.conf);

  // Set HA configuration should be done before login
  this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
  if (this.rmContext.isHAEnabled()) {
    HAUtil.verifyAndSetConfiguration(this.conf);
  }

  // Set UGI and do login
  // If security is enabled, use login user
  // If security is not enabled, use current user
  this.rmLoginUGI = UserGroupInformation.getCurrentUser();
  try {
    doSecureLogin();
  } catch (IOException ie) {
    throw new YarnRuntimeException("Failed to login", ie);
  }

  // register the handlers for all AlwaysOn services using setupDispatcher().
  // Create the RM's central event dispatcher, register it as a service and
  // record it in rmContext.
  rmDispatcher = setupDispatcher();
  addIfService(rmDispatcher);
  rmContext.setDispatcher(rmDispatcher);

  // The order of services below should not be changed as services will be
  // started in same order
  // As elector service needs admin service to be initialized and started,
  // first we add admin service then elector service

  adminService = createAdminService();
  addService(adminService);
  rmContext.setRMAdminService(adminService);

  // elector must be added post admin service
  if (this.rmContext.isHAEnabled()) {
    // If the RM is configured to use an embedded leader elector,
    // initialize the leader elector.
    if (HAUtil.isAutomaticFailoverEnabled(conf)
        && HAUtil.isAutomaticFailoverEmbedded(conf)) {
      EmbeddedElector elector = createEmbeddedElector();
      addIfService(elector);
      rmContext.setLeaderElectorService(elector);
    }
  }

  rmContext.setYarnConfiguration(conf);

  createAndInitActiveServices(false);

  webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
      YarnConfiguration.RM_BIND_HOST,
      WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));

  RMApplicationHistoryWriter rmApplicationHistoryWriter =
      createRMApplicationHistoryWriter();
  addService(rmApplicationHistoryWriter);
  rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);

  // initialize the RM timeline collector first so that the system metrics
  // publisher can bind to it
  if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
    RMTimelineCollectorManager timelineCollectorManager =
        createRMTimelineCollectorManager();
    addService(timelineCollectorManager);
    rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
  }

  SystemMetricsPublisher systemMetricsPublisher =
      createSystemMetricsPublisher();
  addIfService(systemMetricsPublisher);
  rmContext.setSystemMetricsPublisher(systemMetricsPublisher);

  registerMXBean();

  super.serviceInit(this.conf);
}

4、YarnChild启动

1)启动MapTask

按Ctrl + N查找YarnChild类,搜索main方法

/**
 * Entry point of a YarnChild task process (most setup is elided in this excerpt).
 *
 * <p>Obtains the task to run and executes it as childUGI. Before running it, the
 * method sets the encrypted spill key if required and switches to the job's
 * working directory. It then calls Task.run, which is MapTask.run or
 * ReduceTask.run.
 */
public static void main(String[] args) throws Throwable {
  Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  LOG.debug("Child starting");

  ... ...

  task = myTask.getTask();
  YarnChild.taskid = task.getTaskID();
  ... ...

  // Create a final reference to the task for the doAs block
  final Task taskFinal = task;
  childUGI.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // use job-specified working directory
        setEncryptedSpillKeyIfRequired(taskFinal);
        FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
        // Run the task (a MapTask or a ReduceTask).
        taskFinal.run(job, umbilical); // run the task
        return null;
      }
    });
  } // closes a block opened in the elided code above
  ... ...
}

按Ctrl + Alt + B查找run的实现类:MapTask.java

/**
 * Runs a map task (setup is partly elided in this excerpt).
 *
 * <p>Registers progress phases:
 * <ul>
 *   <li>with no reducers, a single "map" phase;</li>
 *   <li>otherwise "map" at 66.7% and "sort" at 33.3%.</li>
 * </ul>
 * It then runs the mapper through the new or old API and reports completion via
 * done(umbilical, reporter).
 */
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  // Progress phases are only registered for map tasks.
  if (isMapTask()) {
    // If there are no reducers then there won't be any sort. Hence the map
    // phase will govern the entire attempt's progress.
    if (conf.getNumReduceTasks() == 0) {
      mapPhase = getProgress().addPhase("map", 1.0f);
    } else {
      // If there are reducers then the entire attempt's progress will be
      // split between the map phase (67%) and the sort phase (33%).
      mapPhase = getProgress().addPhase("map", 0.667f);
      sortPhase = getProgress().addPhase("sort", 0.333f);
    }
  }
  ... ...
  if (useNewApi) {
    // New-API path.
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}
 
/**
 * Runs the mapper through the new API. Creation of the input, output, mapper and
 * context is elided in this excerpt.
 *
 * <p>The method:
 * <ol>
 *   <li>initializes the input with the split;</li>
 *   <li>runs {@code mapper.run};</li>
 *   <li>marks the map phase complete and moves to SORT;</li>
 *   <li>closes input and output.</li>
 * </ol>
 * The finally block closes whichever of input/output was not closed. They are
 * nulled after a successful close, and closeQuietly is presumably a no-op for
 * null.
 */
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  ... ...

  try {
    input.initialize(split, mapperContext);
    // Run the mapper: setup, map() per record, cleanup.
    mapper.run(mapperContext);

    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}

Mapper.java(mapper.run()在这里循环调用用户编写的map()方法)

/**
 * Drives the mapper.
 *
 * <p>Calls setup once, then map for every key/value pair the context yields.
 * cleanup always runs afterwards, even if map throws.
 */
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    boolean hasRecord = context.nextKeyValue();
    while (hasRecord) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
      hasRecord = context.nextKeyValue();
    }
  } finally {
    cleanup(context);
  }
}

2)启动ReduceTask

在YarnChild.java的main方法中,按Ctrl + Alt + B查找run的实现类:ReduceTask.java

/**
 * Runs a reduce task (setup is elided in this excerpt).
 *
 * <p>Executes the reducer through the new or old API, then closes the shuffle
 * plugin and reports completion via done(umbilical, reporter).
 */
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, InterruptedException,ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS,isSkipping());

  ... ...

  if (useNewApi) {
    // New-API path: run the reducer via runNewReducer.
    runNewReducer(job, umbilical,reporter, rIter, comparator,
                  keyClass, valueClass);
  } else {
    runOldReducer(job, umbilical, reporter,rIter, comparator,
                  keyClass, valueClass);
  }

  shuffleConsumerPlugin.close();
  done(umbilical, reporter);
}
 
/**
 * Runs the reducer through the new API (context and reducer setup are elided in
 * this excerpt). The tracked record writer is always closed, even if the reducer
 * throws.
 *
 * <p>INKEY/INVALUE appear in the signature, so they must be declared as the
 * method's type parameters. OUTKEY/OUTVALUE are presumably used by the elided
 * body.
 */
<INKEY, INVALUE, OUTKEY, OUTVALUE>
void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException, InterruptedException,
                            ClassNotFoundException {
  ... ...
  try {
    // Reducer.run calls the user's reduce() once per key.
    reducer.run(reducerContext);
  } finally {
    trackedRW.close(reducerContext);
  }
}

Reducer.java(reducer.run()在这里对每个key调用用户编写的reduce()方法)

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      reduce(context.getCurrentKey(),context.getValues(), context);
      // If a back up store is used, reset it
      Iterator<VALUEIN> iter =context.getValues().iterator();
      if(iter instanceofReduceContext.ValueIterator) {
        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();       
      }
    }
  } finally {
    cleanup(context);
  }
}

想要了解更多关于

大数据培训

课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。