Must-Read! Flink Source Code Analysis, Environment Setup and Submission Flow: The Job Submission Process

Published: October 13, 2021   Author: atguigu   Views: 1,174

The submission flow of Flink in YARN per-job mode is shown in the figure below:

[Figure: Flink YARN per-job mode submission flow]

 

1. Start the YARN client

AbstractJobClusterExecutor.java

public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
    final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
    // Create and start the YARN client
    try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
        final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
        // Get the cluster configuration parameters
        final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
        // Deploy the cluster
        final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
                .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
        LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

        return CompletableFuture.completedFuture(
                new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
    }
}
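The `execute()` method above opens the cluster descriptor in a try-with-resources block, so the descriptor is closed as soon as deployment returns, while the caller gets back a future that is already completed. A minimal plain-Java sketch of that shape follows; `FakeDescriptor`, `ExecuteSketch` and the `"cluster-for-"` handle are hypothetical stand-ins, not Flink classes.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a ClusterDescriptor that owns a client connection.
class FakeDescriptor implements AutoCloseable {
    boolean closed = false;

    String deploy(String jobId) {
        if (closed) {
            throw new IllegalStateException("descriptor already closed");
        }
        // Pretend the cluster was deployed and return a handle to it.
        return "cluster-for-" + jobId;
    }

    @Override
    public void close() {
        // A real descriptor would typically stop its YARN client here.
        closed = true;
    }
}

class ExecuteSketch {
    static FakeDescriptor lastDescriptor;

    // Same shape as execute(): deploy inside try-with-resources,
    // then return a future that is already completed.
    static CompletableFuture<String> execute(String jobId) {
        try (FakeDescriptor descriptor = new FakeDescriptor()) {
            lastDescriptor = descriptor;
            String handle = descriptor.deploy(jobId);
            return CompletableFuture.completedFuture(handle);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = execute("job-1");
        System.out.println(future.join() + ", descriptor closed: " + lastDescriptor.closed);
    }
}
```

The returned handle stays usable after the descriptor is closed because it only carries what is needed to reach the deployed cluster.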

YarnClusterClientFactory.java

public YarnClusterDescriptor createClusterDescriptor(Configuration configuration){
... ...
         return getClusterDescriptor(configuration);
}
 
private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
    final YarnClient yarnClient = YarnClient.createYarnClient();
    final YarnConfiguration yarnConfiguration = new YarnConfiguration();

    yarnClient.init(yarnConfiguration);
    yarnClient.start();

    return new YarnClusterDescriptor(
            configuration,
            yarnConfiguration,
            yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient),
            false);
}
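`getClusterDescriptor()` calls `yarnClient.init(...)` before `yarnClient.start()`. YarnClient is a Hadoop service, and Hadoop services move through the states NOTINITED, INITED, STARTED and STOPPED in that order. The sketch below models that ordering with a hypothetical `MiniService` class. It is not Hadoop's `AbstractService`, only an illustration of why the two calls appear in this order.

```java
import java.util.Map;

// Hypothetical, simplified model of a Hadoop-style service lifecycle.
class MiniService {
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    private State state = State.NOTINITED;
    private Map<String, String> conf;

    State getState() {
        return state;
    }

    void init(Map<String, String> conf) {
        if (state != State.NOTINITED) {
            throw new IllegalStateException("init() called in state " + state);
        }
        this.conf = conf;
        state = State.INITED;
    }

    void start() {
        // A service must be configured (init) before it can be started.
        if (state != State.INITED) {
            throw new IllegalStateException("start() called in state " + state);
        }
        state = State.STARTED;
    }

    void stop() {
        state = State.STOPPED;
    }

    public static void main(String[] args) {
        MiniService client = new MiniService();
        client.init(Map.of("yarn.resourcemanager.address", "rm-host:8032"));
        client.start();
        System.out.println(client.getState());
    }
}
```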

2. Get the cluster configuration parameters

AbstractContainerizedClusterClientFactory.java

public ClusterSpecification getClusterSpecification(Configuration configuration) {
... ...
    final int jobManagerMemoryMB = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
            configuration,
            JobManagerOptions.TOTAL_PROCESS_MEMORY)
        .getTotalProcessMemorySize()
        .getMebiBytes();

    final int taskManagerMemoryMB = TaskExecutorProcessUtils
            .processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                    configuration, TaskManagerOptions.TOTAL_PROCESS_MEMORY))
            .getTotalProcessMemorySize()
            .getMebiBytes();

    int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);

    return new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(jobManagerMemoryMB)
            .setTaskManagerMemoryMB(taskManagerMemoryMB)
            .setSlotsPerTaskManager(slotsPerTaskManager)
            .createClusterSpecification();
}
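`getClusterSpecification()` reduces the JobManager and TaskManager process sizes to whole mebibytes (`getMebiBytes()`) and packs them, together with the slot count, into a `ClusterSpecification` through a builder. A plain-Java sketch of the same builder shape; `SpecSketch` and its members are hypothetical, the positivity check is an added assumption, and the memory figures are example values only.

```java
// Hypothetical, simplified counterpart of ClusterSpecification and its builder.
final class SpecSketch {
    final int masterMemoryMB;
    final int taskManagerMemoryMB;
    final int slotsPerTaskManager;

    private SpecSketch(int masterMemoryMB, int taskManagerMemoryMB, int slotsPerTaskManager) {
        this.masterMemoryMB = masterMemoryMB;
        this.taskManagerMemoryMB = taskManagerMemoryMB;
        this.slotsPerTaskManager = slotsPerTaskManager;
    }

    // Converts a byte count to whole mebibytes (1 MiB = 2^20 bytes), rounding down.
    static int toMebiBytes(long bytes) {
        return (int) (bytes >> 20);
    }

    static final class Builder {
        private int masterMemoryMB;
        private int taskManagerMemoryMB;
        private int slotsPerTaskManager;

        Builder setMasterMemoryMB(int mb) { this.masterMemoryMB = mb; return this; }
        Builder setTaskManagerMemoryMB(int mb) { this.taskManagerMemoryMB = mb; return this; }
        Builder setSlotsPerTaskManager(int slots) { this.slotsPerTaskManager = slots; return this; }

        SpecSketch build() {
            // Assumed validation: every value must have been set to something positive.
            if (masterMemoryMB <= 0 || taskManagerMemoryMB <= 0 || slotsPerTaskManager <= 0) {
                throw new IllegalArgumentException("all values must be positive");
            }
            return new SpecSketch(masterMemoryMB, taskManagerMemoryMB, slotsPerTaskManager);
        }
    }

    public static void main(String[] args) {
        long jobManagerBytes = 1600L * 1024 * 1024; // example value: 1600 MiB
        SpecSketch spec = new Builder()
                .setMasterMemoryMB(toMebiBytes(jobManagerBytes))
                .setTaskManagerMemoryMB(1728)
                .setSlotsPerTaskManager(2)
                .build();
        System.out.println(spec.masterMemoryMB + " MB JM, " + spec.taskManagerMemoryMB
                + " MB TM, " + spec.slotsPerTaskManager + " slots");
    }
}
```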

3. Deploy the cluster

YarnClusterDescriptor.java

public ClusterClientProvider<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException {
    try {
        return deployInternal(
                clusterSpecification,
                "Flink per-job cluster",
                getYarnJobClusterEntrypoint(),  // gets YarnJobClusterEntrypoint, the entry point used to start the AM
                jobGraph,
                detached);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    }
}
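`deployJobCluster()` turns any failure from `deployInternal()` into a `ClusterDeploymentException` and keeps the original exception as its cause, so the client sees a single exception type without losing the root cause. A minimal sketch of that wrapping; `DeploymentFailed`, `DeploySketch` and the simulated HDFS error are hypothetical.

```java
import java.io.IOException;

// Hypothetical checked exception standing in for ClusterDeploymentException.
class DeploymentFailed extends Exception {
    DeploymentFailed(String message, Throwable cause) {
        super(message, cause);
    }
}

class DeploySketch {
    // Hypothetical internal step that may fail with any exception.
    static String deployInternal(boolean fail) throws Exception {
        if (fail) {
            throw new IOException("upload to HDFS failed");
        }
        return "application_0001";
    }

    static String deployJobCluster(boolean fail) throws DeploymentFailed {
        try {
            return deployInternal(fail);
        } catch (Exception e) {
            // Wrap, but keep the root cause so callers can still inspect it.
            throw new DeploymentFailed("Could not deploy Yarn job cluster.", e);
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println(deployJobCluster(false));
            deployJobCluster(true);
        } catch (DeploymentFailed e) {
            System.out.println(e.getMessage() + " cause: " + e.getCause().getMessage());
        }
    }
}
```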

Upload the jar files and configuration files to HDFS

YarnClusterDescriptor.java

private ClusterClientProvider<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {
... ...
    // Create the application
    final YarnClientApplication yarnApplication = yarnClient.createApplication();
... ...
    ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            validClusterSpecification);
... ...
}
private ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification) throws Exception {
... ...
    // Initialize the file system (HDFS)
    final FileSystem fs = FileSystem.get(yarnConfiguration);
... ...
    ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

    final List<Path> providedLibDirs = getRemoteSharedPaths(configuration);
    // Utility class that uploads the files
    final YarnApplicationFileUploader fileUploader = YarnApplicationFileUploader.from(
            fs,
            fs.getHomeDirectory(),
            providedLibDirs,
            appContext.getApplicationId(),
            getFileReplication());
... ...
    final ApplicationId appId = appContext.getApplicationId();
... ...
    if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
        // Number of YARN application attempts; defaults to 2
        appContext.setMaxAppAttempts(
                configuration.getInteger(
                        YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

        activateHighAvailabilitySupport(appContext);
    } else {
        // Without high availability, the number of attempts defaults to 1
        appContext.setMaxAppAttempts(
                configuration.getInteger(
                        YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                        1));
    }
... ...

    // The upload-to-HDFS method is called several times, for:
    // => systemShipFiles: the logging configuration files and the jars under lib/ other than flink-dist
    // => shipOnlyFiles: the files under plugins/
    // => userJarFiles: the jars containing the user code
    fileUploader.registerMultipleLocalResources(... ...);
... ...
    // Upload and register the ApplicationMaster jar: flink-dist*.jar
    final YarnLocalResourceDescriptor localResourceDescFlinkJar = fileUploader.uploadFlinkDist(flinkJarPath);
... ...
    // Upload the Flink configuration file
    String flinkConfigKey = "flink-conf.yaml";
    Path remotePathConf = setupSingleLocalResource(
            flinkConfigKey,
            fs,
            appId,
            new Path(tmpConfigurationFile.getAbsolutePath()),
            localResources,
            homeDir,
            "");
... ...
    // Write the JobGraph to a tmp file, add it to the local resources, and upload it to HDFS
    fileUploader.registerSingleLocalResource(
            jobGraphFilename,
            new Path(tmpJobGraphFile.toURI()),
            "",
            true,
            false);
... ...
    // Upload the Flink configuration file
    String flinkConfigKey = "flink-conf.yaml";
    fileUploader.registerSingleLocalResource(
            flinkConfigKey,
            new Path(tmpConfigurationFile.getAbsolutePath()),
            "",
            true,
            true);
... ...
    final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
            flinkConfiguration,
            JobManagerOptions.TOTAL_PROCESS_MEMORY);
    // Build the Java command that launches the AM container
    final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
            yarnClusterEntrypoint,
            hasKrb5,
            processSpec);
... ...
    appContext.setApplicationName(customApplicationName);
    appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(capability);
... ...
    yarnClient.submitApplication(appContext);
... ...
}
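The branch on high availability in `startAppMaster()` decides how many times YARN may restart the ApplicationMaster: an explicit setting always wins; otherwise the fallback is YARN's default (2) with HA enabled and 1 without. A plain-Java sketch of that decision, assuming the option key is `yarn.application-attempts` and modelling the Flink `Configuration` as a hypothetical `Map<String, String>`:

```java
import java.util.Map;

class AttemptsSketch {
    // Assumed to mirror YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS (2).
    static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2;
    // Assumed key of YarnConfigOptions.APPLICATION_ATTEMPTS.
    static final String APPLICATION_ATTEMPTS_KEY = "yarn.application-attempts";

    // An explicit setting wins; otherwise HA falls back to the YARN default, non-HA to 1.
    static int maxAppAttempts(Map<String, String> conf, boolean haEnabled) {
        int fallback = haEnabled ? DEFAULT_RM_AM_MAX_ATTEMPTS : 1;
        String configured = conf.get(APPLICATION_ATTEMPTS_KEY);
        return configured == null ? fallback : Integer.parseInt(configured.trim());
    }

    public static void main(String[] args) {
        System.out.println(maxAppAttempts(Map.of(), true));
        System.out.println(maxAppAttempts(Map.of(), false));
        System.out.println(maxAppAttempts(Map.of(APPLICATION_ATTEMPTS_KEY, "4"), false));
    }
}
```

Keeping the non-HA fallback at 1 makes sense because, without HA, a restarted AM has no persisted job state to recover from.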

Set up the AM parameters and command

YarnClusterDescriptor.java

ContainerLaunchContext setupApplicationMasterContainer(
        String yarnClusterEntrypoint,
        boolean hasKrb5,
        JobManagerProcessSpec processSpec) {

    // respect custom JVM options in the YAML file
    String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
    if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
        javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
    }
    // applicable only for YarnMiniCluster secure test run
    // krb5.conf file will be available as local resource in JM/TM container
    if (hasKrb5) {
        // The leading space keeps this option separate from the preceding JVM options
        javaOpts += " -Djava.security.krb5.conf=krb5.conf";
    }

    // Create the container launch context for the AM
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

    final Map<String, String> startCommandValues = new HashMap<>();
    startCommandValues.put("java", "$JAVA_HOME/bin/java");

    String jvmHeapMem = JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
    startCommandValues.put("jvmmem", jvmHeapMem);

    startCommandValues.put("jvmopts", javaOpts);
    startCommandValues.put("logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));

    startCommandValues.put("class", yarnClusterEntrypoint);
    startCommandValues.put("redirects",
            "1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
            "2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
    startCommandValues.put("args", "");

    final String commandTemplate = flinkConfiguration
            .getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                    ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
    final String amCommand =
            BootstrapTools.getStartCommand(commandTemplate, startCommandValues);

    amContainer.setCommands(Collections.singletonList(amCommand));

    LOG.debug("Application Master start command: " + amCommand);

    return amContainer;
}
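The AM start command is produced by filling `%key%` placeholders in a command template with the values collected in `startCommandValues`. The sketch below reproduces that substitution under two assumptions: that the default template is `%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%`, and that each placeholder is replaced literally (so `$JAVA_HOME` is left for the container shell to expand). `StartCommandSketch` and `appendOpt` are hypothetical names; `appendOpt` shows why the krb5 option needs a leading space.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class StartCommandSketch {
    // Assumed default start-command template for YARN containers.
    static final String TEMPLATE =
            "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%";

    // Replaces every %key% placeholder with its value (literal replacement, no regex).
    static String getStartCommand(String template, Map<String, String> values) {
        String result = template;
        for (Map.Entry<String, String> e : values.entrySet()) {
            result = result.replace("%" + e.getKey() + "%", e.getValue());
        }
        return result;
    }

    // Appends a JVM option with a separating space so it never fuses with the previous one.
    static String appendOpt(String javaOpts, String opt) {
        return javaOpts + " " + opt;
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("java", "$JAVA_HOME/bin/java");
        values.put("jvmmem", "-Xmx1073741824");
        values.put("jvmopts", appendOpt("-XX:+UseG1GC", "-Djava.security.krb5.conf=krb5.conf"));
        values.put("logging", "");
        values.put("class", "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint");
        values.put("args", "");
        values.put("redirects", "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err");
        System.out.println(getStartCommand(TEMPLATE, values));
    }
}
```

Empty values such as `logging` and `args` simply leave extra spaces in the command, which the shell ignores.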

Set up the AM parameters:

private ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification) throws Exception {

    ... ...
    final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
            yarnClusterEntrypoint,
            hasKrb5,
            processSpec);
    ... ...
    // Set up the AM classpath and environment variables
    final Map<String, String> appMasterEnv = new HashMap<>();
    // set user specified app master environment variables
    appMasterEnv.putAll(
            ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
    // set Flink app class path
    appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());

    // set Flink on YARN internal configuration values
    appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
    appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
    appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
    appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList()));
    appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
    appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, fileUploader.getApplicationDir().toUri().toString());

    // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
    appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());

    if (localizedKeytabPath != null) {
        appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
        String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
        appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
        if (remotePathKeytab != null) {
            appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
        }
    }

    // To support Yarn Secure Integration Test Scenario
    if (remoteYarnSiteXmlPath != null) {
        appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
    }
    if (remoteKrb5Path != null) {
        appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
    }

    // set classpath from YARN configuration
    Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
    // Attach the environment to the AM container
    amContainer.setEnvironment(appMasterEnv);
    ... ...
    yarnClient.submitApplication(appContext);
... ...
}
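The first entries of `appMasterEnv` come from `ConfigurationUtils.getPrefixedKeyValuePairs(...)`: every configuration key that starts with the master-env prefix becomes an environment variable named after the rest of the key. A sketch of that extraction, assuming the prefix is `containerized.master.env.` and modelling the configuration as a plain map; `MasterEnvSketch` and its method are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class MasterEnvSketch {
    // Assumed value of ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX.
    static final String MASTER_ENV_PREFIX = "containerized.master.env.";

    // Keeps only keys that start with the prefix (and have something after it), stripping the prefix.
    static Map<String, String> prefixedKeyValuePairs(String prefix, Map<String, String> conf) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(
                "containerized.master.env.JAVA_HOME", "/opt/jdk",
                "taskmanager.numberOfTaskSlots", "2");
        System.out.println(prefixedKeyValuePairs(MASTER_ENV_PREFIX, conf));
    }
}
```

For example, setting `containerized.master.env.JAVA_HOME: /opt/jdk` in the configuration would give the AM container `JAVA_HOME=/opt/jdk`, while unrelated keys are ignored.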

4. Submit the application

YarnClientImpl.java

public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    ... ...
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);

    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    rmClient.submitApplication(request);
    ... ...
}

ApplicationClientProtocolPBClientImpl.java

public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException,
    IOException {
    // Extract the protobuf message from the request record
    SubmitApplicationRequestProto requestProto =
        ((SubmitApplicationRequestPBImpl) request).getProto();
    // Send the message to the server and wrap the returned result in a response
    try {
      return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,
        requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
}

ApplicationClientProtocolPBServiceImpl.java

public SubmitApplicationResponseProto submitApplication(RpcController arg0,
    SubmitApplicationRequestProto proto) throws ServiceException {
    // The server rebuilds the request record from the message
    SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);
    ......

    SubmitApplicationResponse response = real.submitApplication(request);
    return ((SubmitApplicationResponsePBImpl) response).getProto();
    ......
}
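The client-side and server-side translators form a round trip: the client unwraps its request record into a protobuf message, the RPC proxy ships that message to the ResourceManager, the server rebuilds a request record and hands it to the real service (`real`), and the response travels back the same way. The self-contained sketch below imitates that pattern with hypothetical types; a `byte[]` plays the role of the serialized protobuf message, and no Hadoop or protobuf classes are used.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class RpcRoundTripSketch {
    // Hypothetical request/response records (stand-ins for the PBImpl classes).
    static final class SubmitRequest {
        final String applicationId;
        SubmitRequest(String applicationId) { this.applicationId = applicationId; }
    }

    static final class SubmitResponse {
        final boolean accepted;
        SubmitResponse(boolean accepted) { this.accepted = accepted; }
    }

    // Client side: turn the record into wire bytes (the "proto").
    static byte[] toWire(SubmitRequest request) {
        return request.applicationId.getBytes(StandardCharsets.UTF_8);
    }

    // Server side: rebuild the record, call the real service, encode the answer.
    static byte[] serve(byte[] wire, Function<SubmitRequest, SubmitResponse> realService) {
        SubmitRequest rebuilt = new SubmitRequest(new String(wire, StandardCharsets.UTF_8));
        SubmitResponse response = realService.apply(rebuilt);
        return new byte[] { (byte) (response.accepted ? 1 : 0) };
    }

    // Client side again: the call to serve() stands in for proxy.submitApplication(...).
    static SubmitResponse submitApplication(SubmitRequest request,
                                            Function<SubmitRequest, SubmitResponse> realService) {
        byte[] responseWire = serve(toWire(request), realService);
        return new SubmitResponse(responseWire[0] == 1);
    }

    public static void main(String[] args) {
        Function<SubmitRequest, SubmitResponse> service =
                req -> new SubmitResponse(req.applicationId.startsWith("application_"));
        System.out.println(submitApplication(new SubmitRequest("application_0001"), service).accepted);
    }
}
```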

ClientRMService.java

public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException {
         ... ...
         // Hand the application request to the RMAppManager in YARN, which submits the job
         this.rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user);
         ... ...
}

5. Create the Dispatcher and ResourceManager

In per-job mode, the AM container is loaded and run through the main() method of YarnJobClusterEntrypoint.

YarnJobClusterEntrypoint.java

public static void main(String[] args) {
    ... ...
    Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env);

    YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);

    ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
}

ClusterEntrypoint.java

private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager);
        ... ...
        // 1. Create the factory for the Dispatcher and ResourceManager objects
        //    (this includes re-creating the JobGraph from the local file)
        final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
        // 2. Use the factory to create the Dispatcher and ResourceManager objects
        //    Passes in the started RpcService, HAService, BlobServer, HeartbeatServices, MetricRegistry, ExecutionGraphStore, etc.
        clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                ioExecutor,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);
        ... ...
    }
}

DefaultDispatcherResourceManagerComponentFactory.java

public DispatcherResourceManagerComponent create(
        Configuration configuration,
        Executor ioExecutor,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    LeaderRetrievalService dispatcherLeaderRetrievalService = null;
    LeaderRetrievalService resourceManagerRetrievalService = null;
    WebMonitorEndpoint<?> webMonitorEndpoint = null;
    ResourceManager<?> resourceManager = null;
    ResourceManagerMetricGroup resourceManagerMetricGroup = null;
    DispatcherRunner dispatcherRunner = null;

    try {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

        resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

        final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                10,
                Time.milliseconds(50L));

        final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                10,
                Time.milliseconds(50L));

        ... ...

        // Create the endpoint that receives REST requests from the front end
        webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        ... ...
        // Create the ResourceManager object; this returns a new YarnResourceManager
        // Call chain: AbstractDispatcherResourceManagerComponentFactory
        //                 -> ActiveResourceManagerFactory
        //                 -> YarnResourceManagerFactory
        resourceManager = resourceManagerFactory.createResourceManager(
                configuration,
                ResourceID.generate(),
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                fatalErrorHandler,
                new ClusterInformation(hostname, blobServer.getPort()),
                webMonitorEndpoint.getRestBaseUrl(),
                resourceManagerMetricGroup);

        ... ...

        // Create and start the dispatcherRunner
        log.debug("Starting Dispatcher.");
        dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
                highAvailabilityServices.getDispatcherLeaderElectionService(),
                fatalErrorHandler,
                new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                ioExecutor,
                rpcService,
                partialDispatcherServices);

        // Start the ResourceManager
        log.debug("Starting ResourceManager.");
        resourceManager.start();

        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return new DispatcherResourceManagerComponent(
                dispatcherRunner,
                resourceManager,
                dispatcherLeaderRetrievalService,
                resourceManagerRetrievalService,
                webMonitorEndpoint);

    }
    ... ...
}

Create the YarnResourceManager

ResourceManagerFactory.java

public ResourceManager<T> createResourceManager(
                  Configuration configuration,
                  ResourceID resourceId,
                  RpcService rpcService,
                  HighAvailabilityServices highAvailabilityServices,
                  HeartbeatServices heartbeatServices,
                  FatalErrorHandler fatalErrorHandler,
                  ClusterInformation clusterInformation,
                  @Nullable String webInterfaceUrl,
                  MetricRegistry metricRegistry,
                  String hostname) throws Exception {

         final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, hostname);
         final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry, hostname);

         final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices(
                  configuration, rpcService, highAvailabilityServices, slotManagerMetricGroup);
 
         return createResourceManager(
                  configuration,
                  resourceId,
                  rpcService,
                  highAvailabilityServices,
                  heartbeatServices,
                  fatalErrorHandler,
                  clusterInformation,
                  webInterfaceUrl,
                  resourceManagerMetricGroup,
                  resourceManagerRuntimeServices);
}

YarnResourceManagerFactory.java

public ResourceManager<YarnWorkerNode> createResourceManager(
                  Configuration configuration,
                  ResourceID resourceId,
                  RpcService rpcService,
                  HighAvailabilityServices highAvailabilityServices,
                  HeartbeatServices heartbeatServices,
                  FatalErrorHandler fatalErrorHandler,
                  ClusterInformation clusterInformation,
                  @Nullable String webInterfaceUrl,
                  ResourceManagerMetricGroup resourceManagerMetricGroup,
                  ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
 
         return new YarnResourceManager(
                  rpcService,
                  resourceId,
                  configuration,
                  System.getenv(),
                  highAvailabilityServices,
                  heartbeatServices,
                  resourceManagerRuntimeServices.getSlotManager(),
                  ResourceManagerPartitionTrackerImpl::new,
                  resourceManagerRuntimeServices.getJobLeaderIdService(),
                  clusterInformation,
                  fatalErrorHandler,
                  webInterfaceUrl,
                  resourceManagerMetricGroup);
}
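The two `createResourceManager` methods above split the work in a template-method style: the base factory first builds the shared runtime services (including the SlotManager), then delegates to an overload that the YARN-specific subclass implements to return a `YarnResourceManager`. A plain-Java sketch of that split; `RuntimeServices`, `RmFactorySketch` and `YarnRmFactorySketch` are hypothetical, and strings stand in for the real objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder standing in for ResourceManagerRuntimeServices.
class RuntimeServices {
    final String slotManager;

    RuntimeServices(String slotManager) {
        this.slotManager = slotManager;
    }
}

// Hypothetical base factory: builds the shared services, then delegates.
abstract class RmFactorySketch<T> {
    final List<String> calls = new ArrayList<>();

    final T createResourceManager(String config) {
        calls.add("createRuntimeServices");
        RuntimeServices services = new RuntimeServices("SlotManager(" + config + ")");
        return createResourceManager(config, services);
    }

    // Subclasses decide which concrete ResourceManager to build.
    protected abstract T createResourceManager(String config, RuntimeServices services);
}

class YarnRmFactorySketch extends RmFactorySketch<String> {
    @Override
    protected String createResourceManager(String config, RuntimeServices services) {
        calls.add("newYarnResourceManager");
        return "YarnResourceManager using " + services.slotManager;
    }

    public static void main(String[] args) {
        YarnRmFactorySketch factory = new YarnRmFactorySketch();
        System.out.println(factory.createResourceManager("conf"));
        System.out.println(factory.calls);
    }
}
```

The point of the design is that every deployment target (YARN, Kubernetes, standalone) gets the same SlotManager setup for free and only supplies the final constructor call.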

While the YarnResourceManager is being created, the SlotManager is created as well:

ResourceManagerFactory.java

private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
                  Configuration configuration,
                  RpcService rpcService,
                  HighAvailabilityServices highAvailabilityServices,
                  SlotManagerMetricGroup slotManagerMetricGroup) throws ConfigurationException {
 
         return ResourceManagerRuntimeServices.fromConfiguration(
                  createResourceManagerRuntimeServicesConfiguration(configuration),
                  highAvailabilityServices,
                  rpcService.getScheduledExecutor(),
                  slotManagerMetricGroup);
}

ResourceManagerRuntimeServices.java

public static ResourceManagerRuntimeServices fromConfiguration(
                  ResourceManagerRuntimeServicesConfiguration configuration,
                  HighAvailabilityServices highAvailabilityServices,
                  ScheduledExecutor scheduledExecutor,
                  SlotManagerMetricGroup slotManagerMetricGroup) {
 
         final SlotManager slotManager = createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup);
 
         final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                  highAvailabilityServices,
                  scheduledExecutor,
                  configuration.getJobTimeout());
 
         return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);
}

Create and start the Dispatcher

DefaultDispatcherRunnerFactory.java

public DispatcherRunner createDispatcherRunner(
                  LeaderElectionService leaderElectionService,
                  FatalErrorHandler fatalErrorHandler,
                  JobGraphStoreFactory jobGraphStoreFactory,
                  Executor ioExecutor,
                  RpcService rpcService,
                  PartialDispatcherServices partialDispatcherServices) throws Exception {
 
         final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory(
                  jobGraphStoreFactory,
                  ioExecutor,
                  rpcService,
                  partialDispatcherServices,
                  fatalErrorHandler);
 
         return DefaultDispatcherRunner.create(
                  leaderElectionService,
                  fatalErrorHandler,
                  dispatcherLeaderProcessFactory);
}

DefaultDispatcherRunner.java

public static DispatcherRunner create(
                  LeaderElectionService leaderElectionService,
                  FatalErrorHandler fatalErrorHandler,
                  DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {
         final DefaultDispatcherRunner dispatcherRunner = new DefaultDispatcherRunner(
                  leaderElectionService,
                  fatalErrorHandler,
                  dispatcherLeaderProcessFactory);
         return DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner, leaderElectionService);
}

DispatcherRunnerLeaderElectionLifecycleManager.java

public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
         return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService);
}

private DispatcherRunnerLeaderElectionLifecycleManager(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
         this.dispatcherRunner = dispatcherRunner;
         this.leaderElectionService = leaderElectionService;
         // Start leader election for the Dispatcher
         leaderElectionService.start(dispatcherRunner);
}

StandaloneLeaderElectionService.java

public void start(LeaderContender newContender) throws Exception {
         ... ...
         contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
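With the standalone (non-HA) election service shown above there is nothing to compete for, so `start()` grants leadership to the contender immediately using a fixed default leader ID. A minimal sketch of that behaviour; `LeaderContenderSketch` and `StandaloneElectionSketch` are hypothetical names, and we assume the default ID is `new UUID(0, 0)` as with Flink's `HighAvailabilityServices.DEFAULT_LEADER_ID`.

```java
import java.util.UUID;

// Hypothetical stand-in for Flink's LeaderContender interface.
interface LeaderContenderSketch {
    void grantLeadership(UUID leaderSessionId);
}

class StandaloneElectionSketch {
    // Assumed to match HighAvailabilityServices.DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    private LeaderContenderSketch contender;

    // No competing instances exist, so leadership is granted right away.
    void start(LeaderContenderSketch newContender) {
        if (contender != null) {
            throw new IllegalStateException("election service already started");
        }
        contender = newContender;
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }

    public static void main(String[] args) {
        new StandaloneElectionSketch().start(id -> System.out.println("granted leadership with " + id));
    }
}
```

In the Dispatcher's case the contender is the `DefaultDispatcherRunner`, so this immediate grant is what triggers `grantLeadership()` in the next excerpt.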

DefaultDispatcherRunner.java

public void grantLeadership(UUID leaderSessionID) {
         runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
}
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
         ... ...
         previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}

AbstractDispatcherLeaderProcess.java

public final void start() {
         runIfStateIs(
                  State.CREATED,
                  this::startInternal);
}
private void startInternal() {
         log.info("Start {}.",getClass().getSimpleName());
         state = State.RUNNING;
         onStart();
}
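Two guards are at work here: the new leader process is started only after the previous one has terminated (`thenRun` on the termination future), and `start()` only has an effect while the process is still in the CREATED state (`runIfStateIs`). The sketch below imitates both with a hypothetical `LeaderProcessSketch` class; it is not Flink's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class LeaderProcessSketch {
    enum State { CREATED, RUNNING, STOPPED }

    private State state = State.CREATED;
    final AtomicInteger onStartCalls = new AtomicInteger();

    // Like runIfStateIs(State.CREATED, this::startInternal): only the first start() has an effect.
    synchronized void start() {
        if (state == State.CREATED) {
            state = State.RUNNING;
            onStartCalls.incrementAndGet(); // stands in for onStart()
        }
    }

    synchronized State getState() {
        return state;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> previousTermination = new CompletableFuture<>();
        LeaderProcessSketch next = new LeaderProcessSketch();
        // The new process starts only once the previous one has terminated.
        previousTermination.thenRun(next::start);
        System.out.println(next.getState());
        previousTermination.complete(null);
        next.start(); // ignored: the process is already RUNNING
        System.out.println(next.getState() + " " + next.onStartCalls.get());
    }
}
```

Because `thenRun` is registered before the future completes, the start action runs in the thread that calls `complete(null)`.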

JobDispatcherLeaderProcess.java

protected void onStart() {
         final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(
                  DispatcherId.fromUuid(getLeaderSessionId()),
                  Collections.singleton(jobGraph),
                  ThrowingJobGraphWriter.INSTANCE);

         completeDispatcherSetup(dispatcherService);
}
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
                  DispatcherId fencingToken,
                  Collection<JobGraph> recoveredJobs,
                  JobGraphWriter jobGraphWriter) {
         ... ...
         // Start the Dispatcher
         dispatcher.start();
         ... ...
}

Start the ResourceManager

DefaultDispatcherResourceManagerComponentFactory.java

public DispatcherResourceManagerComponent create(
                  Configuration configuration,
                  Executor ioExecutor,
                  RpcService rpcService,
                  HighAvailabilityServices highAvailabilityServices,
                  BlobServer blobServer,
                  HeartbeatServices heartbeatServices,
                  MetricRegistry metricRegistry,
                  ArchivedExecutionGraphStore archivedExecutionGraphStore,
                  MetricQueryServiceRetriever metricQueryServiceRetriever,
                  FatalErrorHandler fatalErrorHandler) throws Exception {
         ... ...
                  // Start the ResourceManager
                  log.debug("Starting ResourceManager.");
                  resourceManager.start();
         ... ...
}

ResourceManager.java

public void onStart() throws Exception {
         ... ...
                  startResourceManagerServices();
         ... ...
}
private void startResourceManagerServices() throws Exception {
         try {
                  leaderElectionService =highAvailabilityServices.getResourceManagerLeaderElectionService();
 
                  initialize();
 
                  leaderElectionService.start(this);
                  jobLeaderIdService.start(new JobLeaderIdActionsImpl());
 
                  registerTaskExecutorMetrics();
         } catch (Exception e) {
                  handleStartResourceManagerServicesException(e);
         }
}

6. Dispatcher starts the JobManager

Dispatcher.java

public void onStart() throws Exception {
         try {
                  // Start the Dispatcher services
                  startDispatcherServices();
         }
         ... ...
         // Start the recovered jobs
         startRecoveredJobs();
         ... ...
}

Dispatcher.java

CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
         final RpcService rpcService = getRpcService();
         return CompletableFuture.supplyAsync(
                  () -> {
                          try {
                                   JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
                                            jobGraph,
                                            configuration,
                                            rpcService,
                                            highAvailabilityServices,
                                            heartbeatServices,
                                            jobManagerSharedServices,
                                            new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                                            fatalErrorHandler,
                                            initializationTimestamp);
                                   // Start the JobManagerRunner
                                   runner.start();
                                   return runner;
                          }
                  ... ...
}

JobManagerRunnerImpl.java

public void start() throws Exception {
         try {
                  leaderElectionService.start(this);
         } catch (Exception e) {
                  log.error("Could not start the JobManager because the leader election service did not start.", e);
                  throw new Exception("Could not start the leader election service.", e);
         }
}

StandaloneLeaderElectionService.java

public void start(LeaderContender newContender) throws Exception {
         ... ...
         contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
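With the standalone (non-HA) services used here there is no real election: start() hands leadership straight back to the contender with a fixed leader ID. A minimal sketch of that behaviour follows. The `Contender` interface and the class names are hypothetical, and the fixed `new UUID(0L, 0L)` stands in for `HighAvailabilityServices.DEFAULT_LEADER_ID` (assumed here to be that value). Rejecting a second start() is a choice made for this sketch.

```java
import java.util.UUID;

final class StandaloneElectionSketch {

    /** Hypothetical counterpart of Flink's LeaderContender. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    // Assumption: a fixed, well-known ID, playing the role of HighAvailabilityServices.DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    /** Standalone "election": the only contender wins immediately. */
    static final class StandaloneElection {
        private Contender contender;

        void start(Contender newContender) {
            if (contender != null) {
                // This sketch refuses to be started twice.
                throw new IllegalStateException("Election service already started");
            }
            contender = newContender;
            // No quorum and no ZooKeeper: grant leadership synchronously.
            contender.grantLeadership(DEFAULT_LEADER_ID);
        }
    }

    public static void main(String[] args) {
        StandaloneElection election = new StandaloneElection();
        election.start(id -> System.out.println("granted " + id));
        // prints: granted 00000000-0000-0000-0000-000000000000
    }
}
```

This is why the same `grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID)` line shows up for the Dispatcher, the JobManagerRunner and the ResourceManager: each component "wins" as soon as it registers.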

JobManagerRunnerImpl.java

public void grantLeadership(final UUID leaderSessionID){
         synchronized (lock) {
                  if (shutdown) {
                          log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
                          return;
                  }
 
                  leadershipOperation =leadershipOperation.thenCompose(
                          (ignored) -> {
                                   synchronized(lock) {
                                            // Verify the job's scheduling status, then start the JobManager
                                            return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                                   }
                          });
 
                  handleException(leadershipOperation,"Could not start the job manager.");
         }
}
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
         final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
 
         return jobSchedulingStatusFuture.thenCompose(
                  jobSchedulingStatus -> {
                          if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                                   return jobAlreadyDone();
                          } else {
                                   return startJobMaster(leaderSessionId);
                          }
                  });
}
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
         ... ...
                  startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
         ... ...
}
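grantLeadership() appends each leadership change to `leadershipOperation` with thenCompose, so leadership operations run strictly one after another, and each appended step either reports the job as already done or starts the JobMaster. The sketch below reproduces that control flow under simplifying assumptions: `LeadershipSketch`, `SchedulingStatus` and the recorded strings are hypothetical placeholders for JobSchedulingStatus, jobAlreadyDone() and startJobMaster(), and the status lookup is simulated with supplyAsync.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

final class LeadershipSketch {

    enum SchedulingStatus { PENDING, RUNNING, DONE }

    private final Object lock = new Object();
    private final List<String> actions = new ArrayList<>();
    private final SchedulingStatus status;
    private CompletableFuture<Void> leadershipOperation = CompletableFuture.completedFuture(null);

    LeadershipSketch(SchedulingStatus status) {
        this.status = status;
    }

    void grantLeadership(UUID leaderSessionId) {
        synchronized (lock) {
            // Chain onto the previous operation so leadership changes never overlap.
            leadershipOperation = leadershipOperation.thenCompose(
                ignored -> verifyStatusAndStart(leaderSessionId));
        }
    }

    private CompletableFuture<Void> verifyStatusAndStart(UUID leaderSessionId) {
        return CompletableFuture.supplyAsync(() -> status)   // stands in for getJobSchedulingStatus()
            .thenAccept(s -> {
                synchronized (lock) {
                    if (s == SchedulingStatus.DONE) {
                        actions.add("jobAlreadyDone");        // finished under an earlier leader
                    } else {
                        actions.add("startJobMaster " + leaderSessionId);
                    }
                }
            });
    }

    /** Waits for all queued leadership operations and returns what they did. */
    List<String> awaitActions() {
        CompletableFuture<Void> pending;
        synchronized (lock) {
            pending = leadershipOperation;
        }
        pending.join();
        synchronized (lock) {
            return new ArrayList<>(actions);
        }
    }

    public static void main(String[] args) {
        LeadershipSketch fresh = new LeadershipSketch(SchedulingStatus.PENDING);
        fresh.grantLeadership(new UUID(0L, 1L));
        System.out.println(fresh.awaitActions());

        LeadershipSketch finished = new LeadershipSketch(SchedulingStatus.DONE);
        finished.grantLeadership(new UUID(0L, 2L));
        System.out.println(finished.awaitActions()); // [jobAlreadyDone]
    }
}
```

The DONE branch is what stops a job that has already finished from being executed a second time when leadership is granted again.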

JobMaster.java

public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
         // make sure we receive RPC and async calls
         start();
 
         return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
         ... ...
         // Start the JobMaster services
         startJobMasterServices();
 
         log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
         // Reset the scheduler and start scheduling
         resetAndStartScheduler();
         ... ...
}

7. ResourceManager starts the SlotManager

ResourceManager.java

public final void onStart() throws Exception {
         ... ...
                  startResourceManagerServices();
         ... ...
}
private void startResourceManagerServices() throws Exception {
         try {
                  leaderElectionService =highAvailabilityServices.getResourceManagerLeaderElectionService();
 
                  initialize();
 
                  leaderElectionService.start(this);
                  jobLeaderIdService.start(new JobLeaderIdActionsImpl());
 
                  registerTaskExecutorMetrics();
         } catch (Exception e) {
                  handleStartResourceManagerServicesException(e);
         }
}

Create the YARN ResourceManager (RM) and NodeManager (NM) clients

ActiveResourceManager.java

protected void initialize() throws ResourceManagerException{
         try {
                  resourceManagerDriver.initialize(
                                   this,
                                   new GatewayMainThreadExecutor(),
                                   ioExecutor);
         } catch (Exception e) {
                  throw new ResourceManagerException("Cannot initialize resource provider.", e);
         }
}

AbstractResourceManagerDriver.java

public final void initialize(
                  ResourceEventHandler<WorkerType> resourceEventHandler,
                  ScheduledExecutor mainThreadExecutor,
                  Executor ioExecutor) throws Exception {
         this.resourceEventHandler =Preconditions.checkNotNull(resourceEventHandler);
         this.mainThreadExecutor =Preconditions.checkNotNull(mainThreadExecutor);
         this.ioExecutor =Preconditions.checkNotNull(ioExecutor);
 
         initializeInternal();
}

YarnResourceManagerDriver.java

protected void initializeInternal() throws Exception {
         final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
         try {
                  // Create and start the YARN ResourceManager client
                  resourceManagerClient =yarnResourceManagerClientFactory.createResourceManagerClient(
                          yarnHeartbeatIntervalMillis,
                          yarnContainerEventHandler);
                  resourceManagerClient.init(yarnConfig);
                  resourceManagerClient.start();
 
                  final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
                  getContainersFromPreviousAttempts(registerApplicationMasterResponse);
                  taskExecutorProcessSpecContainerResourcePriorityAdapter =
                          new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
                                   registerApplicationMasterResponse.getMaximumResourceCapability(),
                                   ExternalResourceUtils.getExternalResources(flinkConfig,YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
         } catch (Exception e) {
                  throw new ResourceManagerException("Could not start resource manager client.", e);
         }
        
         // Create and start the YARN NodeManager client
         nodeManagerClient =yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
         nodeManagerClient.init(yarnConfig);
         nodeManagerClient.start();
}

Start the SlotManager

ResourceManager.java

private void startServicesOnLeadership() {
         startHeartbeatServices();
 
         slotManager.start(getFencingToken(),getMainThreadExecutor(), new ResourceActionsImpl());
 
         onLeadership();
}

SlotManagerImpl.java

public void start(ResourceManagerId newResourceManagerId,Executor newMainThreadExecutor, ResourceActions newResourceActions) {
         LOG.info("Starting the SlotManager.");
 
         this.resourceManagerId =Preconditions.checkNotNull(newResourceManagerId);
         mainThreadExecutor =Preconditions.checkNotNull(newMainThreadExecutor);
         resourceActions =Preconditions.checkNotNull(newResourceActions);
 
         started = true;
 
         taskManagerTimeoutsAndRedundancyCheck =scheduledExecutor.scheduleWithFixedDelay(
                  () ->mainThreadExecutor.execute(
                          () -> checkTaskManagerTimeoutsAndRedundancy()),
                  0L,
                  taskManagerTimeout.toMilliseconds(),
                  TimeUnit.MILLISECONDS);
 
         slotRequestTimeoutCheck =scheduledExecutor.scheduleWithFixedDelay(
                  () ->mainThreadExecutor.execute(
                          () -> checkSlotRequestTimeouts()),
                  0L,
                  slotRequestTimeout.toMilliseconds(),
                  TimeUnit.MILLISECONDS);
 
         registerSlotManagerMetrics();
}
void checkTaskManagerTimeoutsAndRedundancy() {
         if (!taskManagerRegistrations.isEmpty()){
                  long currentTime =System.currentTimeMillis();
 
                  ArrayList<TaskManagerRegistration> timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());
 
                  // first retrieve the timed-out TaskManagers
                  for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
                          if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
                                   // we collect the instance ids first in order to avoid concurrent modifications by the
                                   // ResourceActions.releaseResource call
                                   timedOutTaskManagers.add(taskManagerRegistration);
                          }
                  }
 
                  int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size();
                  if (freeSlots.size() == slots.size()) {
                          // No need to keep redundant TaskManagers if no job is running:
                          // release all timed-out TaskManagers.
                          releaseTaskExecutors(timedOutTaskManagers, timedOutTaskManagers.size());
                  } else if (slotsDiff > 0) {
                          // Keep enough redundant TaskManagers available at all times.
                          int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
                          allocateRedundantTaskManagers(requiredTaskManagers);
                  } else {
                          // second we trigger the release resource callback which can decide upon the resource release
                          int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
                          releaseTaskExecutors(timedOutTaskManagers, Math.min(maxReleaseNum, timedOutTaskManagers.size()));
                  }
                  }
         }
}
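The three branches at the end of checkTaskManagerTimeoutsAndRedundancy() boil down to integer arithmetic on slot counts. The sketch below isolates that arithmetic as a pure function so each case can be checked by hand. `RedundancySketch`, `Decision` and the parameter names are hypothetical, and `divideRoundUp` is re-implemented here as plain ceiling division (assumed to match how MathUtils.divideRoundUp is used above). Unlike the original, it returns a decision instead of calling releaseTaskExecutors or allocateRedundantTaskManagers.

```java
final class RedundancySketch {

    /** Hypothetical result type: TaskManagers to start, and an upper bound on idle ones to release. */
    static final class Decision {
        final int toAllocate;
        final int maxToRelease;

        Decision(int toAllocate, int maxToRelease) {
            this.toAllocate = toAllocate;
            this.maxToRelease = maxToRelease;
        }

        @Override
        public String toString() {
            return "allocate=" + toAllocate + ", release<=" + maxToRelease;
        }
    }

    /** Ceiling division for non-negative dividends and positive divisors. */
    static int divideRoundUp(int dividend, int divisor) {
        if (dividend < 0 || divisor <= 0) {
            throw new IllegalArgumentException("need dividend >= 0 and divisor > 0");
        }
        return dividend == 0 ? 0 : (dividend - 1) / divisor + 1;
    }

    static Decision decide(int redundantTaskManagerNum, int numSlotsPerWorker,
                           int freeSlots, int totalSlots, int timedOutTaskManagers) {
        int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots;
        if (freeSlots == totalSlots) {
            // No job is running: every timed-out TaskManager may be released.
            return new Decision(0, timedOutTaskManagers);
        } else if (slotsDiff > 0) {
            // Fewer free slots than the redundancy target: start enough whole TaskManagers.
            return new Decision(divideRoundUp(slotsDiff, numSlotsPerWorker), 0);
        } else {
            // Surplus free slots: release whole workers' worth, but only timed-out ones.
            int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
            return new Decision(0, Math.min(maxReleaseNum, timedOutTaskManagers));
        }
    }

    public static void main(String[] args) {
        // 2 redundant TMs x 4 slots = 8 wanted free, only 3 free -> 5 missing -> 2 TaskManagers
        System.out.println(decide(2, 4, 3, 10, 1));  // allocate=2, release<=0
        // 13 free vs 8 wanted -> 5 surplus -> at most 1 whole TaskManager can go
        System.out.println(decide(2, 4, 13, 20, 3)); // allocate=0, release<=1
    }
}
```

Rounding up when allocating and rounding down when releasing both err on the side of keeping the redundancy target satisfied.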

8. JobManager requests slots

Start the SlotPool

Continuing from step 6: when the JobMaster starts, it starts the SlotPool and registers with the ResourceManager.

JobMaster.java

private void startJobMasterServices() throws Exception {
         // Start the heartbeat services
         startHeartbeatServices();
 
         // Start the SlotPool
         slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
 
         // Reconnect to the previously known ResourceManager
         reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
 
         // Once connected to the ResourceManager, the SlotPool starts requesting slots from the SlotManager
         resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}

Register with the ResourceManager

This goes through the following chain of calls:

resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

-> ResourceManagerLeaderListener.notifyLeaderAddress()

-> notifyOfNewResourceManagerLeader()

-> reconnectToResourceManager()

-> tryConnectToResourceManager()

-> connectToResourceManager()

private void connectToResourceManager() {
         ... ...
 
         resourceManagerConnection = new ResourceManagerConnection(
                  log,
                  jobGraph.getJobID(),
                  resourceId,
                  getAddress(),
                  getFencingToken(),
                  resourceManagerAddress.getAddress(),
                  resourceManagerAddress.getResourceManagerId(),
                  scheduledExecutorService);
 
         resourceManagerConnection.start();
}

RegisteredRpcConnection.java

public void start() {
         ... ...
 
         final RetryingRegistration<F, G,S> newRegistration = createNewRegistration();
 
         if (REGISTRATION_UPDATER.compareAndSet(this,null, newRegistration)) {
                  newRegistration.startRegistration();
         } else {
                  // concurrent start operation
                  newRegistration.cancel();
         }
}
private RetryingRegistration<F, G, S> createNewRegistration() {
         RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
 
         ... ...
}
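RegisteredRpcConnection.start() uses an AtomicReferenceFieldUpdater so that, if start() races with itself, exactly one registration is installed and started while the loser cancels its own attempt. A self-contained sketch of that idiom follows. `Registration` and `Connection` are hypothetical stand-ins for RetryingRegistration and RegisteredRpcConnection.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

final class CasStartSketch {

    /** Hypothetical stand-in for RetryingRegistration. */
    static final class Registration {
        volatile boolean started;
        volatile boolean canceled;

        void startRegistration() { started = true; }

        void cancel() { canceled = true; }
    }

    static final class Connection {
        // The updater requires a volatile, non-static field of the declared type.
        private static final AtomicReferenceFieldUpdater<Connection, Registration> REGISTRATION_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(Connection.class, Registration.class, "pendingRegistration");

        private volatile Registration pendingRegistration;

        /** Returns the registration created by this call; it is started only if it won the CAS. */
        Registration start() {
            Registration newRegistration = new Registration();
            if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
                newRegistration.startRegistration();
            } else {
                // Concurrent (or repeated) start: discard this attempt.
                newRegistration.cancel();
            }
            return newRegistration;
        }
    }

    public static void main(String[] args) {
        Connection connection = new Connection();
        Registration first = connection.start();
        Registration second = connection.start();
        System.out.println(first.started + " " + second.canceled); // true true
    }
}
```

A field updater gives the same compare-and-set guarantee as an AtomicReference without allocating an extra object per connection.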

Inner class ResourceManagerConnection of JobMaster.java

protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway,JobMasterRegistrationSuccess> generateRegistration() {
         return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>(
                  log,
                  getRpcService(),
                  "ResourceManager",
                  ResourceManagerGateway.class,
                  getTargetAddress(),
                  getTargetLeaderId(),
                  jobMasterConfiguration.getRetryingRegistrationConfiguration()){
 
                  @Override
                  protected CompletableFuture<RegistrationResponse> invokeRegistration(
                                   ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) {
                          Time timeout =Time.milliseconds(timeoutMillis);
 
                          return gateway.registerJobManager(
                                   jobMasterId,
                                   jobManagerResourceID,
                                   jobManagerRpcAddress,
                                   jobID,
                                   timeout);
                  }
         };
}
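generateRegistration() only defines the single remote call (invokeRegistration); the RetryingRegistration base class is what repeats that call until the ResourceManager accepts it. The loop below is a simplified, synchronous sketch of such retry behaviour with a doubling, capped delay. It is not Flink's implementation, which is asynchronous and configured through RetryingRegistrationConfiguration; `register`, `initialDelayMillis` and `maxDelayMillis` are hypothetical names, and the delays are summed rather than actually waited so the example runs instantly.

```java
import java.util.function.IntPredicate;

final class RetrySketch {

    /** Outcome of the sketch: which attempt succeeded and how much back-off was accumulated. */
    static final class Result {
        final int attempts;
        final long totalDelayMillis;

        Result(int attempts, long totalDelayMillis) {
            this.attempts = attempts;
            this.totalDelayMillis = totalDelayMillis;
        }
    }

    /**
     * Calls {@code invokeRegistration} (attempt number -> accepted?) until it returns true
     * or {@code maxAttempts} is reached. The delay doubles after each failure, capped at
     * {@code maxDelayMillis}.
     */
    static Result register(IntPredicate invokeRegistration, int maxAttempts,
                           long initialDelayMillis, long maxDelayMillis) {
        long delay = initialDelayMillis;
        long totalDelay = 0L;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (invokeRegistration.test(attempt)) {
                return new Result(attempt, totalDelay);
            }
            totalDelay += delay;                          // a real client would wait here
            delay = Math.min(delay * 2, maxDelayMillis);
        }
        throw new IllegalStateException("registration not accepted after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        // Pretend the ResourceManager accepts the third attempt.
        Result r = register(attempt -> attempt == 3, 10, 100L, 1_000L);
        System.out.println(r.attempts + " attempts, " + r.totalDelayMillis + " ms of back-off"); // 3 attempts, 300 ms of back-off
    }
}
```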

SlotPool requests slots

Once registration succeeds, onRegistrationSuccess() is called, and the SlotPool requests slots from the ResourceManager:

Inner class ResourceManagerConnection of JobMaster.java

protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
         runAsync(() -> {
                  // filter out outdated connections
                  //noinspection ObjectEquality
                  if (this ==resourceManagerConnection) {
                          establishResourceManagerConnection(success);
                  }
         });
}
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
         ... ...
         slotPool.connectToResourceManager(resourceManagerGateway);
         ... ...
}

SlotPoolImpl.java

public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
         this.resourceManagerGateway =checkNotNull(resourceManagerGateway);
 
         // work on all slots waiting for this connection
         for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
                  // Request a slot from the ResourceManager
                  requestSlotFromResourceManager(resourceManagerGateway,pendingRequest);
         }
 
         // all sent off
         waitingForResourceManager.clear();
}
private void requestSlotFromResourceManager(
                  final ResourceManagerGateway resourceManagerGateway,
                  final PendingRequest pendingRequest) {
         ... ...
         CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
                  jobMasterId,
                  new SlotRequest(jobId,allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
                  rpcTimeout);
 
         ... ...
}
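connectToResourceManager() flushes every slot request that was parked while no ResourceManager was known, then clears the queue; from then on new requests go straight to the gateway. A sketch of that buffering pattern follows. `Gateway`, `SlotPoolBufferSketch` and the String IDs are hypothetical simplifications of ResourceManagerGateway, SlotPoolImpl, AllocationID and ResourceProfile.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SlotPoolBufferSketch {

    /** Hypothetical stand-in for ResourceManagerGateway.requestSlot(...). */
    interface Gateway {
        void requestSlot(String allocationId, String resourceProfile);
    }

    // allocationId -> resource profile; insertion order kept so requests go out in arrival order.
    private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
    private Gateway resourceManagerGateway;

    void requestNewSlot(String allocationId, String resourceProfile) {
        if (resourceManagerGateway == null) {
            // No ResourceManager known yet: park the request.
            waitingForResourceManager.put(allocationId, resourceProfile);
        } else {
            resourceManagerGateway.requestSlot(allocationId, resourceProfile);
        }
    }

    void connectToResourceManager(Gateway gateway) {
        this.resourceManagerGateway = gateway;
        // work on all requests waiting for this connection ...
        for (Map.Entry<String, String> pending : waitingForResourceManager.entrySet()) {
            gateway.requestSlot(pending.getKey(), pending.getValue());
        }
        // ... and drop them once they are all sent off
        waitingForResourceManager.clear();
    }

    int waitingCount() {
        return waitingForResourceManager.size();
    }

    public static void main(String[] args) {
        SlotPoolBufferSketch pool = new SlotPoolBufferSketch();
        List<String> sent = new ArrayList<>();
        pool.requestNewSlot("a1", "1 slot");
        pool.requestNewSlot("a2", "1 slot");
        pool.connectToResourceManager((allocationId, profile) -> sent.add(allocationId));
        pool.requestNewSlot("a3", "1 slot");
        System.out.println(sent + " waiting=" + pool.waitingCount()); // [a1, a2, a3] waiting=0
    }
}
```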

ResourceManager.java: the request is handled by the SlotManager inside the ResourceManager

public CompletableFuture<Acknowledge> requestSlot(
                  JobMasterId jobMasterId,
                  SlotRequest slotRequest,
                  final Time timeout) {
 
         ... ...
                          try {
                                   // Let the SlotManager handle the slot request
                                   slotManager.registerSlotRequest(slotRequest);
                          }
         ... ...
}

SlotManagerImpl.java

public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
         checkInit();
 
         ... ...
 
                  PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
 
                  pendingSlotRequests.put(slotRequest.getAllocationId(),pendingSlotRequest);
 
                  try {
                          internalRequestSlot(pendingSlotRequest);
                  }
         ... ...
}
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
         final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
 
         OptionalConsumer.of(findMatchingSlot(resourceProfile))
                  .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
                  .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
         ... ...
         if (!pendingTaskManagerSlotOptional.isPresent()) {
                  pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
         }
 
         ... ...
}
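internalRequestSlot() first looks for a registered free slot whose resource profile can satisfy the request; only if none exists does it fall back to a pending TaskManager slot, and if there is no pending slot either, allocateResource() asks for a new worker. A simplified sketch of that decision order follows, treating a resource profile as just CPU cores and memory in MB. The class `SlotMatchSketch`, its fields and the returned strings are hypothetical; Flink's ResourceProfile matching covers more dimensions and the pending-slot lookup is more involved.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SlotMatchSketch {

    /** Hypothetical two-dimensional resource profile. */
    static final class Profile {
        final double cpuCores;
        final int memoryMb;

        Profile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        boolean isMatching(Profile required) {
            return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
        }
    }

    final List<Profile> freeSlots = new ArrayList<>();
    final List<Profile> pendingSlots = new ArrayList<>(); // slots of TaskManagers still starting up
    int requestedWorkers;

    /** Returns where the request was routed: "free", "pending" or "new-worker". */
    String internalRequestSlot(Profile required) {
        for (Iterator<Profile> it = freeSlots.iterator(); it.hasNext(); ) {
            if (it.next().isMatching(required)) {
                it.remove();                  // allocateSlot(...)
                return "free";
            }
        }
        for (Iterator<Profile> it = pendingSlots.iterator(); it.hasNext(); ) {
            if (it.next().isMatching(required)) {
                it.remove();                  // assign the request to a pending TaskManager slot
                return "pending";
            }
        }
        requestedWorkers++;                   // allocateResource(...): ask YARN for a new container
        return "new-worker";
    }

    public static void main(String[] args) {
        SlotMatchSketch slotManager = new SlotMatchSketch();
        slotManager.freeSlots.add(new Profile(1.0, 1024));
        Profile small = new Profile(0.5, 512);
        System.out.println(slotManager.internalRequestSlot(small)); // free
        System.out.println(slotManager.internalRequestSlot(small)); // new-worker
    }
}
```

The new-worker branch is where step 9 below takes over.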

9. ResourceManager requests resources

ResourceManager.java

public boolean allocateResource(WorkerResourceSpec workerResourceSpec){
         validateRunsInMainThread();
         return startNewWorker(workerResourceSpec);
}

ActiveResourceManager.java

public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
         requestNewWorker(workerResourceSpec);
         return true;
}
private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
         // Derive the TaskExecutor process spec from the configuration
         final TaskExecutorProcessSpec taskExecutorProcessSpec =
                          TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
         ... ...
         // Request the resource
         CompletableFuture<WorkerType> requestResourceFuture = resourceManagerDriver.requestResource(taskExecutorProcessSpec);
         ... ...
}

YarnResourceManagerDriver.java

public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
         checkInitialized();
 
         final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();
 
         final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource>priorityAndResourceOpt =
         taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);
 
         if(!priorityAndResourceOpt.isPresent()) {
                  requestResourceFuture.completeExceptionally(
                          new ResourceManagerException(
                                   String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s. " +
                                                     "This usually indicates the requested resource is larger than Yarn's max container resource limit.",
                                            taskExecutorProcessSpec)));
         } else {
                  final Priority priority =priorityAndResourceOpt.get().getPriority();
                  final Resource resource =priorityAndResourceOpt.get().getResource();
                  resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));
 
                  // make sure we transmit the request fast and receive fast news of granted allocations
                  resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
 
                  requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec,ignore -> new LinkedList<>()).add(requestResourceFuture);
 
                  log.info("Requesting new TaskExecutor container with resource {}, priority {}.", taskExecutorProcessSpec, priority);
         }
 
         return requestResourceFuture;
}
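requestResource() returns a future that is not yet complete and files it under its TaskExecutorProcessSpec in `requestResourceFutures`; the future is completed later, when YARN reports an allocated container. A sketch of that bookkeeping follows, assuming for illustration that an allocated container is matched to the oldest outstanding request with the same spec. `ContainerRequestSketch`, the String spec and container IDs, and the `onContainerAllocated` callback are all hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class ContainerRequestSketch {

    private final Map<String, Queue<CompletableFuture<String>>> requestResourceFutures = new HashMap<>();

    /** Files a new, not-yet-completed future under its spec, as requestResource() does. */
    CompletableFuture<String> requestResource(String spec) {
        CompletableFuture<String> future = new CompletableFuture<>();
        requestResourceFutures.computeIfAbsent(spec, ignore -> new LinkedList<>()).add(future);
        return future;
    }

    /** Hypothetical callback: YARN granted a container for {@code spec}. Returns false if nobody asked. */
    boolean onContainerAllocated(String spec, String containerId) {
        Queue<CompletableFuture<String>> pending = requestResourceFutures.get(spec);
        if (pending == null || pending.isEmpty()) {
            return false; // surplus container; a real driver would hand it back to YARN
        }
        CompletableFuture<String> oldest = pending.poll();
        if (pending.isEmpty()) {
            requestResourceFutures.remove(spec);
        }
        oldest.complete(containerId);
        return true;
    }

    public static void main(String[] args) {
        ContainerRequestSketch driver = new ContainerRequestSketch();
        CompletableFuture<String> first = driver.requestResource("spec-A");
        CompletableFuture<String> second = driver.requestResource("spec-A");
        driver.onContainerAllocated("spec-A", "container_01");
        System.out.println(first.getNow("pending") + ", " + second.getNow("pending")); // container_01, pending
    }
}
```

Keying the queues by spec lets one driver serve requests for differently sized TaskExecutors at the same time.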

10. TaskManager startup

YarnTaskExecutorRunner.java

public static void main(String[] args) {
         EnvironmentInformation.logEnvironmentInfo(LOG,"YARN TaskExecutor runner", args);
         SignalHandler.register(LOG);
         JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
         runTaskManagerSecurely(args);
}
private static void runTaskManagerSecurely(String[] args) {
         try {
                  LOG.debug("All environment variables: {}", ENV);
 
                  final String currDir = ENV.get(Environment.PWD.key());
                  LOG.info("Current working Directory: {}", currDir);
 
                  final Configuration configuration = TaskManagerRunner.loadConfiguration(args);
                  setupAndModifyConfiguration(configuration,currDir, ENV);
 
                  TaskManagerRunner.runTaskManagerSecurely(configuration);
         }
         catch (Throwable t) {
                  final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
                  // make sure that everything ends up in the log
                  LOG.error("YARN TaskManagerinitialization failed.", strippedThrowable);
                  System.exit(INIT_ERROR_EXIT_CODE);
         }
}
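Before logging and exiting, runTaskManagerSecurely() unwraps UndeclaredThrowableException, which wraps a checked exception that escaped through a dynamic proxy or a similar wrapper, so the log shows the real cause. A sketch of that unwrapping loop follows; `strip` is a hypothetical re-implementation of what ExceptionUtils.stripException is used for here, not Flink's code.

```java
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;

final class StripExceptionSketch {

    /** Peels off wrappers of type {@code typeToStrip} for as long as they carry a cause. */
    static Throwable strip(Throwable throwable, Class<? extends Throwable> typeToStrip) {
        Throwable current = throwable;
        while (typeToStrip.isInstance(current) && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Throwable wrapped = new UndeclaredThrowableException(
            new UndeclaredThrowableException(new IOException("cannot reach ResourceManager")));
        Throwable root = strip(wrapped, UndeclaredThrowableException.class);
        System.out.println(root); // java.io.IOException: cannot reach ResourceManager
    }
}
```

A wrapper without a cause is returned unchanged, so the loop cannot end with a null.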

TaskManagerRunner.java

public void start() throws Exception {
         taskExecutorService.start();
}

TaskExecutorToServiceAdapter.java

public void start() {
         taskExecutor.start();
}

TaskExecutor.java

public void onStart() throws Exception {
         try {
                  startTaskExecutorServices();
         } catch (Throwable t) {
                  final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);
                  onFatalError(exception);
                  throw exception;
         }
 
         startRegistrationTimeout();
}

11. Register with the ResourceManager

TaskExecutor.java

private void startTaskExecutorServices() throws Exception {
         try {
                  // start by connecting to the ResourceManager
                  resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 
                  // tell the task slot table who's responsible for the task slot actions
                  taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
 
                  // start the job leader service
                  jobLeaderService.start(getAddress(),getRpcService(), haServices, new JobLeaderListenerImpl());
 
                  fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(),blobCacheService.getPermanentBlobService());
         } catch (Exception e) {
                  handleStartTaskExecutorServicesException(e);
         }
}

resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

-> ResourceManagerLeaderListener.notifyLeaderAddress()

-> TaskExecutor.notifyOfNewResourceManagerLeader()

-> TaskExecutor.reconnectToResourceManager()

-> TaskExecutor.tryConnectToResourceManager()

-> TaskExecutor.connectToResourceManager()

-> TaskExecutor's resourceManagerConnection.start()

which executes createNewRegistration() -> generateRegistration():

TaskExecutorToResourceManagerConnection.java

protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway,TaskExecutorRegistrationSuccess> generateRegistration() {
         return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
                  log,
                  rpcService,
                  getTargetAddress(),
                  getTargetLeaderId(),
                  retryingRegistrationConfiguration,
                  taskExecutorRegistration);
}

To start the registration, newRegistration.startRegistration() calls invokeRegistration():

Inner class ResourceManagerRegistration of TaskExecutorToResourceManagerConnection.java

private static class ResourceManagerRegistration
                  extends RetryingRegistration<ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
 
         private final TaskExecutorRegistration taskExecutorRegistration;
 
         ResourceManagerRegistration(
                          Logger log,
                          RpcService rpcService,
                          String targetAddress,
                          ResourceManagerId resourceManagerId,
                          RetryingRegistrationConfiguration retryingRegistrationConfiguration,
                          TaskExecutorRegistration taskExecutorRegistration) {
 
                  super(log, rpcService,"ResourceManager", ResourceManagerGateway.class, targetAddress,resourceManagerId, retryingRegistrationConfiguration);
                  this.taskExecutorRegistration= taskExecutorRegistration;
         }
 
         @Override
         protected CompletableFuture<RegistrationResponse> invokeRegistration(
                          ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
 
                  Time timeout =Time.milliseconds(timeoutMillis);
                  return resourceManager.registerTaskExecutor(
                          taskExecutorRegistration,
                          timeout);
         }
}

On successful registration, onRegistrationSuccess() is called:

protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
         log.info("Successful registration at resource manager {} under registration id {}.",
                  getTargetAddress(),success.getRegistrationId());
 
         registrationListener.onRegistrationSuccess(this, success);
}

Inner class ResourceManagerRegistrationListener of TaskExecutor.java

public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
         final ResourceID resourceManagerId = success.getResourceManagerId();
         final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
         final ClusterInformation clusterInformation = success.getClusterInformation();
         final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
 
         runAsync(
                  () -> {
                          // filter out outdated connections
                          //noinspection ObjectEquality
                          if(resourceManagerConnection == connection) {
                                   try {
                                            establishResourceManagerConnection(
                                                     resourceManagerGateway,
                                                     resourceManagerId,
                                                     taskExecutorRegistrationId,
                                                     clusterInformation);
                                   } catch(Throwable t) {
                                            log.error("Establishing Resource Manager connection in Task Executor failed", t);
                                   }
                          }
                  });
}
private void establishResourceManagerConnection(
                  ResourceManagerGateway resourceManagerGateway,
                  ResourceID resourceManagerResourceId,
                  InstanceID taskExecutorRegistrationId,
                  ClusterInformation clusterInformation) {
        
         // Register the slots with the ResourceManager by sending a slot report
         final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
                  getResourceID(),
                  taskExecutorRegistrationId,
                  taskSlotTable.createSlotReport(getResourceID()),
                  taskManagerConfiguration.getTimeout());
 
         ... ...
}
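Both the JobMaster and the TaskExecutor run the registration-success callback via runAsync() and then compare the connection object by identity (`resourceManagerConnection == connection`). If the component reconnected in the meantime, the callback of the old connection still arrives but no longer matches the current field, so it is dropped. A sketch of that guard follows; `Endpoint`, `Connection` and `establishedWith` are hypothetical names, and the callback is invoked directly instead of through runAsync().

```java
final class StaleCallbackSketch {

    static final class Connection {
        final String target;

        Connection(String target) {
            this.target = target;
        }
    }

    static final class Endpoint {
        private Connection resourceManagerConnection;
        private String establishedWith;

        Connection connect(String target) {
            resourceManagerConnection = new Connection(target); // replaces any older connection
            return resourceManagerConnection;
        }

        /** Registration-success callback; in Flink this body runs on the main thread via runAsync. */
        void onRegistrationSuccess(Connection connection) {
            // Identity check on purpose: two connections to the same address are still different attempts.
            if (resourceManagerConnection == connection) {
                establishedWith = connection.target;
            }
        }

        String establishedWith() {
            return establishedWith;
        }
    }

    public static void main(String[] args) {
        Endpoint taskExecutor = new Endpoint();
        Connection old = taskExecutor.connect("rm-1");
        Connection current = taskExecutor.connect("rm-2"); // reconnect before the old callback arrives
        taskExecutor.onRegistrationSuccess(old);            // ignored: outdated
        taskExecutor.onRegistrationSuccess(current);
        System.out.println(taskExecutor.establishedWith()); // rm-2
    }
}
```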

ResourceManager.java

public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
         final WorkerRegistration<WorkerType> workerTypeWorkerRegistration =taskExecutors.get(taskManagerResourceId);
 
         if(workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)){
                  if (slotManager.registerTaskManager(workerTypeWorkerRegistration,slotReport)) {
                          onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
                  }
                  return CompletableFuture.completedFuture(Acknowledge.get());
         } else {
                  return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
         }
}       
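sendSlotReport() only accepts a report whose registration ID matches the one the ResourceManager handed out during registerTaskExecutor; a report carrying a stale registration ID fails the returned future instead. A sketch of that check follows. Maps of Strings replace ResourceID and InstanceID, the slot report is reduced to a slot count, and the class name is hypothetical. Unlike the excerpt above, the sketch also treats an unknown ResourceID as an error rather than dereferencing a missing registration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class SlotReportSketch {

    // ResourceID -> InstanceID issued at registration time (hypothetical String stand-ins).
    private final Map<String, String> taskExecutors = new HashMap<>();
    private final Map<String, Integer> registeredSlots = new HashMap<>();

    void registerTaskExecutor(String resourceId, String instanceId) {
        taskExecutors.put(resourceId, instanceId);
    }

    CompletableFuture<String> sendSlotReport(String resourceId, String registrationId, int numSlots) {
        String expected = taskExecutors.get(resourceId);
        if (expected != null && expected.equals(registrationId)) {
            registeredSlots.put(resourceId, numSlots); // stands in for slotManager.registerTaskManager(...)
            return CompletableFuture.completedFuture("ack");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
            String.format("Unknown TaskManager registration id %s.", registrationId)));
        return failed;
    }

    Integer slotsOf(String resourceId) {
        return registeredSlots.get(resourceId);
    }

    public static void main(String[] args) {
        SlotReportSketch rm = new SlotReportSketch();
        rm.registerTaskExecutor("tm-1", "instance-2");
        System.out.println(rm.sendSlotReport("tm-1", "instance-2", 4).join());                     // ack
        System.out.println(rm.sendSlotReport("tm-1", "instance-1", 4).isCompletedExceptionally()); // true
    }
}
```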

SlotManagerImpl.java

public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
         checkInit();
 
         LOG.debug("Registering TaskManager{} under {} at the SlotManager.",taskExecutorConnection.getResourceID().getStringWithMetadata(),taskExecutorConnection.getInstanceID());
 
         // we identify task managers by their instance id;
         // this tells us whether a TaskManager has already been registered
         if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
                  // Already registered: report its slot allocations to update the slot status
                  reportSlotStatus(taskExecutorConnection.getInstanceID(),initialSlotReport);
                  return false;
         } else {
                  if(isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
                          LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);
                          resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation."));
                          return false;
                  }
 
                  // first register the TaskManager
                  ArrayList<SlotID> reportedSlots= new ArrayList<>();
 
                  for (SlotStatus slotStatus :initialSlotReport) {
                          reportedSlots.add(slotStatus.getSlotID());
                  }
 
                  TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
                          taskExecutorConnection,
                          reportedSlots);
 
                  taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(),taskManagerRegistration);
 
                  // next register the new slots
                  for (SlotStatus slotStatus :initialSlotReport) {
                          // Register the new slot; it is then allocated according to the pending slot requests
                          registerSlot(
                                   slotStatus.getSlotID(),
                                   slotStatus.getAllocationID(),
                                   slotStatus.getJobID(),
                                   slotStatus.getResourceProfile(),
                                   taskExecutorConnection);
                  }
 
                  return true;
         }
 
}
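To make the three outcomes of registerTaskManager easier to follow, here is a simplified model in plain Java. It is only a sketch under stated assumptions: `SlotRegistry` and `register` are hypothetical names, instance IDs and slot IDs are plain strings, and the max-slot check ignores the pending TaskManager slots that Flink's `isMaxSlotNumExceededAfterRegistration` also takes into account.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of SlotManagerImpl#registerTaskManager (not Flink code). */
class SlotRegistry {
    private final int maxSlotNum;
    // instance id -> slot ids reported when that TaskManager registered
    private final Map<String, List<String>> registrations = new HashMap<>();
    private int registeredSlots = 0;

    SlotRegistry(int maxSlotNum) {
        this.maxSlotNum = maxSlotNum;
    }

    /** Returns true only if a previously unknown TaskManager was registered. */
    boolean register(String instanceId, List<String> reportedSlotIds) {
        if (registrations.containsKey(instanceId)) {
            // Known instance id: Flink calls reportSlotStatus(...) here and returns false.
            return false;
        }
        if (registeredSlots + reportedSlotIds.size() > maxSlotNum) {
            // Too many slots: Flink releases the TaskManager via resourceActions.releaseResource(...).
            return false;
        }
        registrations.put(instanceId, new ArrayList<>(reportedSlotIds));
        registeredSlots += reportedSlotIds.size();
        return true;
    }

    int registeredSlotCount() {
        return registeredSlots;
    }
}
```

A duplicate registration returns `false` without touching the slot count, which corresponds to the `reportSlotStatus(...)` branch; only a genuinely new TaskManager increases the number of registered slots.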

12. ResourceManager Allocates Slots

SlotManagerImpl.java

private void registerSlot(
                  SlotID slotId,
                  AllocationID allocationId,
                  JobID jobId,
                  ResourceProfile resourceProfile,
                  TaskExecutorConnection taskManagerConnection) {
 
         if (slots.containsKey(slotId)) {
                  // remove the old slot first
                  removeSlot(
                           slotId,
                           new SlotManagerException(
                                    String.format(
                                             "Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.",
                                             slotId)));
         }
 
         // create and register the TaskManager slot
         final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
 
         final PendingTaskManagerSlot pendingTaskManagerSlot;
 
         if (allocationId == null) {
                  pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
         } else {
                  pendingTaskManagerSlot = null;
         }
 
         if (pendingTaskManagerSlot == null) {
                  updateSlot(slotId, allocationId, jobId);
         } else {
                  pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
                  final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();
 
                  // hand the slot to the request assigned to the pending slot, or treat it as free
                  if (assignedPendingSlotRequest == null) {
                           handleFreeSlot(slot);
                  } else {
                           assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
                           allocateSlot(slot, assignedPendingSlotRequest);
                  }
         }
}
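registerSlot ends with a three-way decision: update the slot directly, treat it as free, or allocate it to the request that was waiting on a matching pending slot. The sketch below isolates that decision. `SlotRegistrationModel`, `PendingSlot`, and `SlotOutcome` are hypothetical, and resource profiles are reduced to strings compared for exact equality as a stand-in for `findExactlyMatchingPendingTaskManagerSlot`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/** Possible results of registering one slot (hypothetical names). */
enum SlotOutcome { UPDATE_SLOT, HANDLE_FREE_SLOT, ALLOCATE_TO_PENDING_REQUEST }

/** Hypothetical stand-in for PendingTaskManagerSlot. */
class PendingSlot {
    final String resourceProfile;
    final String assignedRequest; // null if no slot request is waiting on this pending slot

    PendingSlot(String resourceProfile, String assignedRequest) {
        this.resourceProfile = resourceProfile;
        this.assignedRequest = assignedRequest;
    }
}

/** Simplified model of the decision at the end of SlotManagerImpl#registerSlot. */
class SlotRegistrationModel {
    private final List<PendingSlot> pendingSlots = new ArrayList<>();

    void addPending(PendingSlot pendingSlot) {
        pendingSlots.add(pendingSlot);
    }

    int pendingCount() {
        return pendingSlots.size();
    }

    SlotOutcome registerSlot(String allocationId, String resourceProfile) {
        PendingSlot matched = null;
        if (allocationId == null) {
            // Only a free slot can fulfil a pending TaskManager slot: look for an exact profile match.
            Iterator<PendingSlot> it = pendingSlots.iterator();
            while (it.hasNext()) {
                PendingSlot candidate = it.next();
                if (Objects.equals(candidate.resourceProfile, resourceProfile)) {
                    matched = candidate;
                    it.remove(); // mirrors pendingSlots.remove(...)
                    break;
                }
            }
        }
        if (matched == null) {
            return SlotOutcome.UPDATE_SLOT; // updateSlot(slotId, allocationId, jobId)
        }
        return matched.assignedRequest == null
                ? SlotOutcome.HANDLE_FREE_SLOT             // handleFreeSlot(slot)
                : SlotOutcome.ALLOCATE_TO_PENDING_REQUEST; // allocateSlot(slot, request)
    }
}
```

A slot reported together with an allocation ID never consumes a pending TaskManager slot, because it is already in use.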
 
private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
         ... ...
         taskManagerRegistration.markUsed();
 
         // RPC call to the task manager
         CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
                  slotId,
                  pendingSlotRequest.getJobId(),
                  allocationId,
                  pendingSlotRequest.getResourceProfile(),
                  pendingSlotRequest.getTargetAddress(),
                  resourceManagerId,
                  taskManagerRequestTimeout);
 
         ... ...
}
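allocateSlot marks the TaskManager as used and then issues the RPC, which returns a `CompletableFuture<Acknowledge>` immediately instead of blocking. The following sketch shows the same non-blocking shape under assumptions: `FakeTaskExecutorGateway` and `SlotAllocationSketch` are hypothetical, the acknowledgement is a plain `String`, and the timeout is applied with `CompletableFuture.orTimeout` (Java 9+) in place of the `taskManagerRequestTimeout` argument that the excerpt passes to the gateway.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the requestSlot RPC of TaskExecutorGateway. */
interface FakeTaskExecutorGateway {
    CompletableFuture<String> requestSlot(String slotId, String jobId);
}

/** Hypothetical sketch of the "mark used, then request asynchronously" step. */
class SlotAllocationSketch {
    private boolean taskManagerUsed = false;

    CompletableFuture<String> allocate(FakeTaskExecutorGateway gateway, String slotId, String jobId, long timeoutMs) {
        // Corresponds to taskManagerRegistration.markUsed() before the RPC is sent.
        taskManagerUsed = true;
        // The call returns at once; the acknowledgement arrives later through the future.
        CompletableFuture<String> ack = gateway.requestSlot(slotId, jobId);
        // Java 9+: complete the future with a TimeoutException if no answer arrives in time.
        return ack.orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
    }

    boolean isTaskManagerUsed() {
        return taskManagerUsed;
    }
}
```

Callers attach callbacks to the returned future rather than waiting on it, which keeps the SlotManager free to process other registrations while the TaskManager answers.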

13. TaskManager Offers Slots

TaskExecutor.java

public CompletableFuture<Acknowledge> requestSlot(
         final SlotID slotId,
         final JobID jobId,
         final AllocationID allocationId,
         final ResourceProfile resourceProfile,
         final String targetAddress,
         final ResourceManagerId resourceManagerId,
         final Time timeout) {
         ... ...
 
         try { 
                  // allocate the slot on this TaskManager
                  allocateSlot(
                          slotId,
                          jobId,
                          allocationId,
                          resourceProfile);
         } catch (SlotAllocationException sae) {
                  return FutureUtils.completedExceptionally(sae);
         }
 
         final JobTable.Job job;
 
         try {
                  job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
         } catch (Exception e) {
                  // free the allocated slot
                  try {
                          taskSlotTable.freeSlot(allocationId);
                  } catch (SlotNotFoundException slotNotFoundException) {
                           // slot no longer existent, this should actually never happen, because we've
                           // just allocated the slot. So let's fail hard in this case!
                          onFatalError(slotNotFoundException);
                  }
 
                  // release local state under the allocation id.
                  localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
 
                  // sanity check
                  if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                           onFatalError(new Exception("Could not free slot " + slotId));
                  }
 
                  return FutureUtils.completedExceptionally(new SlotAllocationException("Could not create new job.", e));
         }
 
         if (job.isConnected()) {
                  // the job is connected: offer the slots to the JobManager
                  offerSlotsToJobManager(jobId);
         }
 
         return CompletableFuture.completedFuture(Acknowledge.get());
}
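requestSlot follows an allocate-then-roll-back pattern: the slot is reserved first, and if the job cannot be created the reservation is freed again before an exceptional future is returned. A minimal model, assuming hypothetical names (`TaskExecutorModel`, and a `Supplier<Boolean>` standing in for `registerNewJobAndCreateServices` that reports whether the new job is already connected), using `IllegalStateException` in place of `SlotAllocationException`, and simplifying by rejecting any slot ID that is already reserved:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical, simplified model of TaskExecutor#requestSlot (not Flink code). */
class TaskExecutorModel {
    private final Set<String> allocatedSlots = new HashSet<>();
    // job id -> whether the job is already connected to its JobManager
    private final Map<String, Boolean> jobTable = new HashMap<>();
    int offersSent = 0;

    CompletableFuture<String> requestSlot(String slotId, String jobId, Supplier<Boolean> createJob) {
        // Step 1: reserve the slot locally (allocateSlot(...) in the excerpt).
        if (!allocatedSlots.add(slotId)) {
            return failed(new IllegalStateException("Slot " + slotId + " is already allocated."));
        }
        boolean connected;
        try {
            // Step 2: computeIfAbsent plays the role of jobTable.getOrCreateJob(jobId, factory).
            connected = jobTable.computeIfAbsent(jobId, id -> createJob.get());
        } catch (RuntimeException e) {
            // Step 3: job creation failed, so undo the reservation (taskSlotTable.freeSlot(...)).
            allocatedSlots.remove(slotId);
            return failed(new IllegalStateException("Could not create new job.", e));
        }
        if (connected) {
            offersSent++; // stands in for offerSlotsToJobManager(jobId)
        }
        return CompletableFuture.completedFuture("ACK");
    }

    boolean isAllocated(String slotId) {
        return allocatedSlots.contains(slotId);
    }

    private static CompletableFuture<String> failed(Throwable cause) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }
}
```

If the job is not yet connected, the request is still acknowledged; in this model no offer is sent, matching the `if (job.isConnected())` guard in the excerpt.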
 
private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
         final JobID jobId = jobManagerConnection.getJobId();
 
         if (taskSlotTable.hasAllocatedSlots(jobId)) {
                  log.info("Offer reserved slots to the leader of job {}.", jobId);
 
                  final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
 
                  final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
                  final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
 
                  final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
 
                  while (reservedSlotsIterator.hasNext()) {
                           SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
                          reservedSlots.add(offer);
                  }
 
                  CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
                          getResourceID(),
                          reservedSlots,
                          taskManagerConfiguration.getTimeout());
 
                  acceptedSlotsFuture.whenCompleteAsync(
                           handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
                          getMainThreadExecutor());
         } else {
                  log.debug("There are no unassigned slots for the job {}.", jobId);
         }
}
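The offer step collects every slot allocated to the job, sends them to the JobMaster in a single RPC, and processes the answer asynchronously on the TaskExecutor's main thread executor. The sketch below reproduces that flow under assumptions: `SlotOfferSketch` is hypothetical, slots are strings, the JobMaster is a function returning a future of accepted slots, and rejected slots are simply dropped from the job's reservation, which only approximates `handleAcceptedSlotOffers` since its body is not shown here. It uses `thenAcceptAsync` where the original uses `whenCompleteAsync`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

/** Hypothetical sketch of TaskExecutor#internalOfferSlotsToJobManager (not Flink code). */
class SlotOfferSketch {
    // job id -> slot ids allocated to that job; a stand-in for TaskSlotTable
    private final Map<String, Set<String>> allocatedByJob;
    final Set<String> activeSlots = new HashSet<>();

    SlotOfferSketch(Map<String, Set<String>> allocatedByJob) {
        this.allocatedByJob = allocatedByJob;
    }

    CompletableFuture<Void> offerSlots(
            String jobId,
            Function<Collection<String>, CompletableFuture<Collection<String>>> jobMaster,
            Executor mainThreadExecutor) {
        Set<String> reserved = allocatedByJob.get(jobId);
        if (reserved == null || reserved.isEmpty()) {
            return CompletableFuture.completedFuture(null); // "no unassigned slots for the job"
        }
        // Snapshot the reserved slots into one batch of offers.
        Set<String> offers = new HashSet<>(reserved);
        // One RPC for the whole batch; the answer is handled on the given executor.
        return jobMaster.apply(offers).thenAcceptAsync(accepted -> {
            for (String slot : offers) {
                if (accepted.contains(slot)) {
                    activeSlots.add(slot); // accepted: the slot is now used by the job
                } else {
                    reserved.remove(slot); // rejected: give the reservation back (assumption)
                }
            }
        }, mainThreadExecutor);
    }

    boolean isReserved(String jobId, String slotId) {
        Set<String> reserved = allocatedByJob.get(jobId);
        return reserved != null && reserved.contains(slotId);
    }
}
```

Passing `Runnable::run` as the executor runs the callback on the completing thread, which keeps a test deterministic; the excerpt passes `getMainThreadExecutor()` instead, so slot-table state is only mutated from a single thread.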

JobMaster.java

public CompletableFuture<Collection<SlotOffer>> offerSlots(
                  final ResourceID taskManagerId,
                  final Collection<SlotOffer> slots,
                  final Time timeout) {
 
         Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
 
         if (taskManager == null) {
                  return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
         }
 
         final TaskManagerLocation taskManagerLocation = taskManager.f0;
         final TaskExecutorGateway taskExecutorGateway = taskManager.f1;
 
         final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());
 
         return CompletableFuture.completedFuture(
                  slotPool.offerSlots(
                          taskManagerLocation,
                          rpcTaskManagerGateway,
                          slots));
}
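JobMaster.offerSlots first checks that the offering TaskManager is registered and fails the returned future otherwise; the early `return` is what keeps the method from dereferencing a null `taskManager`. A hypothetical model (`JobMasterOfferSketch`, with a `Predicate` standing in for the slot pool's per-slot acceptance decision), using plain `CompletableFuture` calls instead of Flink's `FutureUtils`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

/** Hypothetical sketch of JobMaster#offerSlots (not Flink code). */
class JobMasterOfferSketch {
    // resource id -> TaskManager location, standing in for registeredTaskManagers
    private final Map<String, String> registeredTaskManagers = new HashMap<>();

    void registerTaskManager(String resourceId, String location) {
        registeredTaskManagers.put(resourceId, location);
    }

    CompletableFuture<Collection<String>> offerSlots(
            String taskManagerId, Collection<String> slots, Predicate<String> slotPoolAccepts) {
        String location = registeredTaskManagers.get(taskManagerId);
        if (location == null) {
            // Unknown sender: fail the future and return early, as in the excerpt.
            CompletableFuture<Collection<String>> failed = new CompletableFuture<>();
            failed.completeExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
            return failed;
        }
        // Known sender: the slot pool decides per slot; only accepted offers are returned.
        Collection<String> accepted = new ArrayList<>();
        for (String slot : slots) {
            if (slotPoolAccepts.test(slot)) {
                accepted.add(slot);
            }
        }
        return CompletableFuture.completedFuture(accepted);
    }
}
```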

SlotPoolImpl.java

public Collection<SlotOffer> offerSlots(
                  TaskManagerLocation taskManagerLocation,
                  TaskManagerGateway taskManagerGateway,
                  Collection<SlotOffer> offers) {
 
         ArrayList<SlotOffer> result = new ArrayList<>(offers.size());
 
         for (SlotOffer offer : offers) {
                  if (offerSlot(
                          taskManagerLocation,
                          taskManagerGateway,
                          offer)) {
 
                          result.add(offer);
                  }
         }
 
         return result;
}
 
boolean offerSlot(
                  final TaskManagerLocation taskManagerLocation,
                  final TaskManagerGateway taskManagerGateway,
                  final SlotOffer slotOffer) {
 
         ... ...
 
         // use the slot to fulfill pending requests, in requested order
         tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
 
         // we accepted the request in any case. The slot will be released after it idled for
         // too long and timed out
         return true;
}
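offerSlot hands an offered slot to pending requests in the order they were made and otherwise keeps it as available; either way it returns `true`. The sketch below models that with an insertion-ordered `LinkedHashMap`. It is a simplified assumption: `SlotPoolSketch` is hypothetical, and "matching" is reduced to comparing a memory size in MB, whereas Flink matches on resource profiles.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of SlotPoolImpl#offerSlot / tryFulfillSlotRequestOrMakeAvailable. */
class SlotPoolSketch {
    // Insertion-ordered, so iteration follows the order in which requests arrived.
    private final Map<String, Integer> pendingRequests = new LinkedHashMap<>(); // request id -> required MB
    private final Map<String, CompletableFuture<String>> requestFutures = new LinkedHashMap<>();
    final List<String> availableSlots = new ArrayList<>();

    CompletableFuture<String> requestSlot(String requestId, int requiredMb) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(requestId, requiredMb);
        requestFutures.put(requestId, future);
        return future;
    }

    /** Always returns true: a slot that fulfils no request simply becomes available. */
    boolean offerSlot(String slotId, int slotMb) {
        Iterator<Map.Entry<String, Integer>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> request = it.next();
            if (request.getValue() <= slotMb) { // hypothetical matching rule
                it.remove();
                requestFutures.remove(request.getKey()).complete(slotId);
                return true;
            }
        }
        availableSlots.add(slotId);
        return true;
    }
}
```

Because the map preserves insertion order, an earlier request that fits always wins over a later one, while a request that does not fit is skipped rather than blocking the queue.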
