Hadoop核心源码解析——NameNode启动源码

发布时间:2021年11月03日作者:atguigu浏览次数:847

Hadoop NameNode工作机制,如下图所示:

Hadoop核心源码解析——NameNode启动源码

 

NameNode启动流程源码如下图所示:

Hadoop核心源码解析——NameNode启动源码

 

0)在pom.xml中增加如下依赖

<dependencies>
       <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-client</artifactId>
              <version>3.1.3</version>
       </, dependency>
 
       <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-hdfs</artifactId>
              <version>3.1.3</version>
       </dependency>
 
       <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-hdfs-client</artifactId>
              <version>3.1.3</version>
              <scope>provided</scope>
       </dependency>
</dependencies>

1)ctrl + n 全局查找namenode,进入NameNode.java

NameNode官方说明

NameNode servesas both directory namespace manager and “inode table” for the HadoopDFS. There is a single NameNode running in any DFS deployment. (Well, exceptwhen there is a second backup/failover NameNode, or when using federatedNameNodes.) The NameNode controls two critical tables: 1) filename->blocksequence(namespace) 2) block->machinelist (“inodes”) The first table isstored on disk and is very precious. The second table is rebuilt every time theNameNode comes up. ‘NameNode’ refers to both this class as well as the ‘NameNodeserver’. The ‘FSNamesystem’ class actually performs most of the filesystemmanagement. The majority of the ‘NameNode’ class itself is concerned withexposing the IPC interface and the HTTP server to the outside world, plus someconfiguration management. NameNode implements the ClientProtocol interface,which allows clients to ask for DFS services. ClientProtocol is not designedfor direct use by authors of DFS client code. End-users should instead use theFileSystem class. NameNode also implements the DatanodeProtocol interface, usedby DataNodes that actually store DFS data blocks. These methods are invokedrepeatedly and automatically by all the DataNodes in a DFS deployment. NameNodealso implements the NamenodeProtocol interface, used by secondary namenodes orrebalancing processes to get partial NameNode state, for example partialblocksMap etc.

2)ctrl + f,查找main方法

NameNode.java

public static void main(String argv[]) throws Exception {
       if (DFSUtil.parseHelpArgument(argv,NameNode.USAGE, System.out, true)) {
              System.exit(0);
       }
 
       try {
              StringUtils.startupShutdownMessage(NameNode.class,argv, LOG);
              // 创建NameNode
              NameNode namenode = createNameNode(argv, null);
              if (namenode != null) {
                     namenode.join();
              }
       } catch (Throwable e) {
              LOG.error("Failed to startnamenode.", e);
              terminate(1, e);
       }
}

点击createNameNode

public static NameNode createNameNode(String argv[], Configurationconf)
    throws IOException {
  … …
  StartupOption startOpt = parseArguments(argv);
  if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
  setStartupOption(conf, startOpt);
 
  boolean aborted = false;
  switch (startOpt) {
  case FORMAT:
    aborted = format(conf,startOpt.getForceFormat(),
        startOpt.getInteractiveFormat());
    terminate(aborted ? 1 : 0);
    return null; // avoid javac warning
  case GENCLUSTERID:
    … …
  default:
   DefaultMetricsSystem.initialize("NameNode");
       // 创建NameNode对象
    return new NameNode(conf);
  }
}

点击NameNode

public NameNode(Configuration conf) throwsIOException {
  this(conf, NamenodeRole.NAMENODE);
}
 
protected NameNode(Configuration conf,NamenodeRole role)
    throws IOException {
  ... ...
 
  try {
    initializeGenericKeys(conf, nsId, namenodeId);
    initialize(getConf());
    ... ...
  } catch (IOException e) {
    this.stopAtException(e);
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stopAtException(e);
    throw e;
  }
  this.started.set(true);
}

点击initialize

protected void initialize(Configuration conf) throwsIOException {
  ... ...
 
  if (NamenodeRole.NAMENODE == role) {
       // 启动HTTP服务端(9870)
    startHttpServer(conf);
  }
 
  // 加载镜像文件和编辑日志到内存
  loadNamesystem(conf);
  startAliasMapServerIfNecessary(conf);
 
  // 创建NN的RPC服务端
  rpcServer = createRpcServer(conf);
 
  initReconfigurableBackoffKey();
 
  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Setit now using
    // the RPC server's bind address.
    clientNamenodeAddress =
       NetUtils.getHostPortString(getNameNodeAddress());
    LOG.info("Clients are to use " +clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
   httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
 
  // NN启动资源检查
  startCommonServices(conf);
  startMetricsLogger(conf);
}

一、启动9870端口服务

1)点击startHttpServer

NameNode.java
private void startHttpServer(final Configuration conf)throws IOException {
       httpServer = new NameNodeHttpServer(conf,this, getHttpServerBindAddress(conf));
       httpServer.start();
       httpServer.setStartupProgress(startupProgress);
}
 
protected InetSocketAddress getHttpServerBindAddress(Configurationconf) {
  InetSocketAddress bindAddress = getHttpServerAddress(conf);
 
  ... ...
  return bindAddress;
}
 
protected InetSocketAddress getHttpServerAddress(Configurationconf) {
  return getHttpAddress(conf);
}
 
public static InetSocketAddress getHttpAddress(Configurationconf) {
       return NetUtils.createSocketAddr(
     conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}
 
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
 
public static final int    DFS_NAMENODE_HTTP_PORT_DEFAULT =
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
 
int  DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;

2)点击startHttpServer方法中的httpServer.start();

NameNodeHttpServer.java

void start() throws IOException {
  ... ...
  // Hadoop自己封装了HttpServer,形成自己的HttpServer2
  HttpServer2.Builder builder= DFSUtil.httpServerTemplateForNNAndJN(conf,
      httpAddr, httpsAddr, "hdfs",
     DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
     DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
  ... ...
 
  httpServer=builder.build();
 
  ... ...
 
 httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
 httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
  setupServlets(httpServer,conf);
  httpServer.start();
 
  ... ...
}

点击setupServlets

private static void setupServlets(HttpServer2 httpServer,Configuration conf) {
       httpServer.addInternalServlet("startupProgress",
              StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
       httpServer.addInternalServlet("fsck","/fsck", FsckServlet.class,
              true);
       httpServer.addInternalServlet("imagetransfer",ImageServlet.PATH_SPEC,
      ImageServlet.class, true);
}

二、加载镜像文件和编辑日志

1)点击loadNamesystem

NameNode.java

protected void loadNamesystem(Configuration conf) throwsIOException {
       this.namesystem = FSNamesystem.loadFromDisk(conf);
}
 
static FSNamesystem loadFromDisk(Configuration conf) throwsIOException {
 
  checkConfiguration(conf);
 
  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
 
  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt =NameNode.getStartupOption(conf);
  if (startOpt == StartupOption.RECOVER) {
   namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }
 
  long loadStart = monotonicNow();
  try {
    namesystem.loadFSImage(startOpt);
  } catch (IOException ioe) {
    LOG.warn("Encountered exceptionloading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  long timeTakenToLoadFSImage = monotonicNow()- loadStart;
  LOG.info("Finished loading FSImage in" + timeTakenToLoadFSImage + " msecs");
  NameNodeMetrics nnMetrics =NameNode.getNameNodeMetrics();
  if (nnMetrics != null) {
    nnMetrics.setFsImageLoadTime((int)timeTakenToLoadFSImage);
  }
 namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
  return namesystem;
}

三、初始化NN的RPC服务端

1)点击createRpcServer

NameNode.java

protected NameNodeRpcServer createRpcServer(Configurationconf)
    throws IOException {
  return new NameNodeRpcServer(conf, this);
}

NameNodeRpcServer.java

public NameNodeRpcServer(Configuration conf, NameNodenn)
      throws IOException {
       ... ....     
    serviceRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(serviceRpcAddr.getPort())
        .setNumHandlers(serviceHandlerCount)
        .setVerbose(false)
       .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();
       ... ....     
}

四、NN启动资源检查

1)点击startCommonServices

NameNode.java

private void startCommonServices(Configuration conf) throwsIOException {
 
  namesystem.startCommonServices(conf, haContext);
 
  registerNNSMXBean();
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
   httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  rpcServer.start();
  try {
    plugins =conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
        ServicePlugin.class);
  } catch (RuntimeException e) {
    String pluginsValue =conf.get(DFS_NAMENODE_PLUGINS_KEY);
    LOG.error("Unable to load NameNodeplugins. Specified list of plugins: " +
        pluginsValue, e);
    throw e;
  }
  … …
}

2)点击startCommonServices

FSNamesystem.java

void startCommonServices(Configuration conf,HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBeanfor the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // 检查是否有足够的磁盘存储元数据(fsimage(默认100m) editLog(默认100m))
    checkAvailableResources();
 
    assert!blockManager.isPopulatingReplQueues();
    StartupProgress prog =NameNode.getStartupProgress();
    prog.beginPhase(Phase.SAFEMODE);
longcompleteBlocksTotal = getCompleteBlocksTotal();
 
    // 安全模式
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);
 
    // 启动块服务
    blockManager.activate(conf,completeBlocksTotal);
  } finally {
   writeUnlock("startCommonServices");
  }
 
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
   dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress =NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress !=null) ?
      serviceAddress.getHostName() :"";
}

点击NameNodeResourceChecker

NameNodeResourceChecker.java

public NameNodeResourceChecker(Configurationconf) throws IOException {
  this.conf = conf;
  volumes = new HashMap<String,CheckedVolume>();
 
  // dfs.namenode.resource.du.reserved默认值 1024 * 1024 * 100 =》100m
  duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
      DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
 
  Collection<URI> extraCheckedVolumes =Util.stringCollectionAsURIs(conf
     .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
 
  Collection<URI> localEditDirs =Collections2.filter(
      FSNamesystem.getNamespaceEditsDirs(conf),
      new Predicate<URI>() {
        @Override
        public boolean apply(URI input) {
          if(input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
            return true;
          }
         return false;
        }
      });
 
  // 对所有路径进行资源检查
  for (URI editsDirToCheck : localEditDirs) {
    addDirToCheck(editsDirToCheck,
       FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
            editsDirToCheck));
  }
 
  // All extra checked volumes are marked"required"
  for (URI extraDirToCheck :extraCheckedVolumes) {
    addDirToCheck(extraDirToCheck, true);
  }
 
  minimumRedundantVolumes = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
     DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
}

点击checkAvailableResources

FNNamesystem.java

void checkAvailableResources() {
       long resourceCheckTime = monotonicNow();
       Preconditions.checkState(nnResourceChecker!= null,
              "nnResourceChecker notinitialized");
 
       // 判断资源是否足够,不够返回false
       hasResourcesAvailable=nnResourceChecker.hasAvailableDiskSpace();
 
       resourceCheckTime = monotonicNow() -resourceCheckTime;
       NameNode.getNameNodeMetrics().addResourceCheckTime(resourceCheckTime);
}

NameNodeResourceChecker.java

public boolean hasAvailableDiskSpace() {
       return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
      minimumRedundantVolumes);
}

NameNodeResourcePolicy.java

static boolean areResourcesAvailable(
    Collection<? extendsCheckableNameNodeResource> resources,
    int minimumRedundantResources) {
 
  // TODO: workaround:
  // - during startup, if there are no editsdirs on disk, then there is
  // a call to areResourcesAvailable() with nodirs at all, which was
  // previously causing the NN to entersafemode
  if (resources.isEmpty()) {
    return true;
  }
 
  int requiredResourceCount = 0;
  int redundantResourceCount = 0;
  int disabledRedundantResourceCount = 0;
 
  // 判断资源是否充足
  for(CheckableNameNodeResource resource : resources) {
    if (!resource.isRequired()) {
      redundantResourceCount++;
      if (!resource.isResourceAvailable()) {
        disabledRedundantResourceCount++;
      }
    } else {
      requiredResourceCount++;
      if (!resource.isResourceAvailable()) {
        // Short circuit - a required resourceis not available. 不充足返回false
        return false;
      }
    }
  }
 
  if (redundantResourceCount == 0) {
    // If there are no redundant resources,return true if there are any
    // required resources available.
    return requiredResourceCount > 0;
  } else {
    return redundantResourceCount -disabledRedundantResourceCount >=
        minimumRedundantResources;
  }
}
 
interface CheckableNameNodeResource {
 
  public boolean isResourceAvailable();
 
  public boolean isRequired();
}

ctrl + h,查找实现类CheckedVolume

NameNodeResourceChecker.java

public boolean isResourceAvailable() {
 
  // 获取当前目录的空间大小
  long availableSpace = df.getAvailable();
 
  if (LOG.isDebugEnabled()) {
    LOG.debug("Space available on volume'" + volume + "' is "
        + availableSpace);
  }
 
  // 如果当前空间大小,小于100m,返回false
  if (availableSpace < duReserved) {
    LOG.warn("Space available onvolume '" + volume + "' is "
       + availableSpace +
       ", which is below the configured reserved amount " +duReserved);
   return false;
  } else {
    return true;
  }
}

五、NN对心跳超时判断

Ctrl + n 搜索namenode,ctrl + f搜索startCommonServices

点击
namesystem.startCommonServices(conf,haContext);

点击blockManager.activate(conf,completeBlocksTotal);

点击datanodeManager.activate(conf);

DatanodeManager.java

void activate(final Configuration conf) {
  datanodeAdminManager.activate(conf);
  heartbeatManager.activate();
}
       DatanodeManager.java
void activate() {
  // 启动的线程,搜索run方法
  heartbeatThread.start();
}
 
public void run() {
  while(namesystem.isRunning()) {
    restartHeartbeatStopWatch();
    try {
      final long now = Time.monotonicNow();
      if (lastHeartbeatCheck +heartbeatRecheckInterval < now) {
              // 心跳检查
       heartbeatCheck();
        lastHeartbeatCheck = now;
      }
      if (blockManager.shouldUpdateBlockKey(now- lastBlockKeyUpdate)) {
        synchronized(HeartbeatManager.this) {
          for(DatanodeDescriptor d : datanodes){
            d.setNeedKeyUpdate(true);
          }
        }
        lastBlockKeyUpdate = now;
      }
    } catch (Exception e) {
      LOG.error("Exception while checkingheartbeat", e);
    }
    try {
      Thread.sleep(5000);  // 5 seconds
    } catch (InterruptedException ignored) {
    }
    // avoid declaring nodes dead for anothercycle if a GC pause lasts
    // longer than the node recheck interval
    if (shouldAbortHeartbeatCheck(-5000)) {
      LOG.warn("Skipping next heartbeatscan due to excessive pause");
      lastHeartbeatCheck = Time.monotonicNow();
    }
  }
}
 
void heartbeatCheck() {
  final DatanodeManager dm =blockManager.getDatanodeManager();
 
  boolean allAlive = false;
  while (!allAlive) {
    // locate the first dead node.
    DatanodeDescriptor dead = null;
 
    // locate the first failed storage thatisn't on a dead node.
    DatanodeStorageInfo failedStorage = null;
 
    // check the number of stale nodes
    int numOfStaleNodes = 0;
    int numOfStaleStorages = 0;
    synchronized(this) {
      for (DatanodeDescriptor d : datanodes) {
        // check if an excessive GC pause hasoccurred
        if (shouldAbortHeartbeatCheck(0)) {
          return;
        }
              // 判断DN节点是否挂断
        if (dead == null && dm.isDatanodeDead(d)) {
          stats.incrExpiredHeartbeats();
          dead = d;
        }
        if (d.isStale(dm.getStaleInterval())) {
          numOfStaleNodes++;
        }
        DatanodeStorageInfo[] storageInfos =d.getStorageInfos();
        for(DatanodeStorageInfo storageInfo :storageInfos) {
          if(storageInfo.areBlockContentsStale()) {
            numOfStaleStorages++;
          }
 
          if (failedStorage == null &&
             storageInfo.areBlocksOnFailedStorage() &&
              d != dead) {
            failedStorage = storageInfo;
          }
        }
      }
     
      // Set the number of stale nodes in theDatanodeManager
      dm.setNumStaleNodes(numOfStaleNodes);
     dm.setNumStaleStorages(numOfStaleStorages);
    }
    ... ...
  }
}
 
boolean isDatanodeDead(DatanodeDescriptor node) {
  return (node.getLastUpdateMonotonic() <
          (monotonicNow() - heartbeatExpireInterval));
}
 
private long heartbeatExpireInterval;
// 10分钟 + 30秒
this.heartbeatExpireInterval= 2 * heartbeatRecheckInterval + 10 * 1000 *heartbeatIntervalSeconds;
 
private volatile int heartbeatRecheckInterval;
heartbeatRecheckInterval = conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
 
private volatile long heartbeatIntervalSeconds;
heartbeatIntervalSeconds= conf.getTimeDuration(
       DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
public staticfinal long   DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;

六、安全模式

FSNamesystem.java

void startCommonServices(Configuration conf,HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBeanfor the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // 检查是否有足够的磁盘存储元数据(fsimage(默认100m) editLog(默认100m))
    checkAvailableResources();
 
    assert!blockManager.isPopulatingReplQueues();
    StartupProgress prog =NameNode.getStartupProgress();
 
    // 开始进入安全模式
    prog.beginPhase(Phase.SAFEMODE);
 
    // 获取所有可以正常使用的block
long completeBlocksTotal = getCompleteBlocksTotal();
 
    prog.setTotal(Phase.SAFEMODE,STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);
 
    // 启动块服务
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }
 
  registerMXBean();
 DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
   dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress =NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress !=null) ?
      serviceAddress.getHostName() :"";
}

点击getCompleteBlocksTotal

public long getCompleteBlocksTotal() {
  // Calculate number of blocks underconstruction
  long numUCBlocks = 0;
  readLock();
  try {
    // 获取正在构建的block
    numUCBlocks =leaseManager.getNumUnderConstructionBlocks();
       // 获取所有的块 - 正在构建的block = 可以正常使用的block
    return getBlocksTotal() - numUCBlocks;
  } finally {
   readUnlock("getCompleteBlocksTotal");
  }
}

点击activate

public void activate(Configuration conf, longblockTotal) {
       pendingReconstruction.start();
       datanodeManager.activate(conf);
 
       this.redundancyThread.setName("RedundancyMonitor");
       this.redundancyThread.start();
 
       storageInfoDefragmenterThread.setName("StorageInfoMonitor");
       storageInfoDefragmenterThread.start();
       this.blockReportThread.start();
 
       mxBeanName = MBeans.register("NameNode","BlockStats", this);
 
       bmSafeMode.activate(blockTotal);
}

点击activate

void activate(long total) {
  assert namesystem.hasWriteLock();
  assert status == BMSafeModeStatus.OFF;
 
  startTime = monotonicNow();
 
  // 计算是否满足块个数的阈值
  setBlockTotal(total);
 
  // 判断DataNode节点和块信息是否达到退出安全模式标准
  if (areThresholdsMet()) {
    boolean exitResult = leaveSafeMode(false);
    Preconditions.checkState(exitResult,"Failed to leave safe mode.");
  } else {
    // enter safe mode
status =BMSafeModeStatus.PENDING_THRESHOLD;
 
initializeReplQueuesIfNecessary();
 
    reportStatus("STATE* Safe modeON.", true);
    lastStatusReport = monotonicNow();
  }
}

点击setBlockTotal

void setBlockTotal(long total) {
  assert namesystem.hasWriteLock();
  synchronized (this) {
    this.blockTotal = total;
       // 计算阈值:例如:1000个正常的块 * 0.999 = 999
   this.blockThreshold = (long) (total * threshold);
  }
 
  this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
}
 
this.threshold =conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
       DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
 
public staticfinal float  DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;

点击areThresholdsMet

private boolean areThresholdsMet() {
  assert namesystem.hasWriteLock();
  // Calculating the number of live datanodesis time-consuming
  // in large clusters. Skip it whendatanodeThreshold is zero.
  int datanodeNum = 0;
 
  if (datanodeThreshold > 0) {
    datanodeNum =blockManager.getDatanodeManager().getNumLiveDataNodes();
  }
  synchronized (this) {
  // 已经正常注册的块数》= 块的最小阈值》=最小可用DataNode
   return blockSafe >= blockThreshold && datanodeNum >=datanodeThreshold;
  }
}


想要了解跟多关于

大数据培训

课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程

视频供广大学员下载学习。

上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)