A Hands-On Source Code Walkthrough of HDFS File Upload: the create Process

Published: 2021-11-04 | Author: atguigu

The HDFS write data flow is shown in the figure below:

[Figure: HDFS write data flow]

 

The source code path of an HDFS upload, as analyzed in this article, is summarized in the figure below:

[Figure: source code call chain of an HDFS upload]

 

0) Add the following dependencies to pom.xml:

<dependencies>
       <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-client</artifactId>
              <version>3.1.3</version>
       </dependency>
 
       <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-hdfs</artifactId>
              <version>3.1.3</version>
       </dependency>
 
       <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-hdfs-client</artifactId>
              <version>3.1.3</version>
              <scope>provided</scope>
       </dependency>
 
       <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
              <version>4.12</version>
       </dependency>
       <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
              <version>1.7.30</version>
       </dependency>
</dependencies>

I. The client sends the create request to the NN

The user's own code:

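@Test
public void testPut2() throws IOException {
       // fs is assumed to be a FileSystem field initialized elsewhere in the test class
       FSDataOutputStream fos = fs.create(new Path("/input"));
 
       fos.write("helloworld".getBytes());
       // close the stream so buffered data is flushed and the file is finalized
       fos.close();
}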

FileSystem.java

public FSDataOutputStream create(Path f) throws IOException {
       return create(f, true);
}
 
public FSDataOutputStream create(Path f, boolean overwrite)
  throws IOException {
 
       return create(f, overwrite,
                       getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
                              IO_FILE_BUFFER_SIZE_DEFAULT),
                       getDefaultReplication(f),
                       getDefaultBlockSize(f));
}
 
public FSDataOutputStream create(Path f,
       boolean overwrite,
       int bufferSize,
       short replication,
       long blockSize) throws IOException {
      
       return create(f, overwrite, bufferSize, replication, blockSize,null);
}
 
public FSDataOutputStream create(Path f,
       boolean overwrite,
       int bufferSize,
       short replication,
       long blockSize,
       Progressable progress
       ) throws IOException {
                                                                     
       return this.create(f, FsCreateModes.applyUMask(
       FsPermission.getFileDefault(),FsPermission.getUMask(getConf())),
       overwrite, bufferSize, replication,blockSize, progress);
}
 
public abstract FSDataOutputStream create(Path f,
       FsPermission permission,
       boolean overwrite,
       int bufferSize,
       short replication,
       long blockSize,
       Progressable progress) throws IOException;
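Each overload above only fills in one more default and delegates, until it reaches the abstract method that a concrete file system such as DistributedFileSystem has to implement. The sketch below reproduces that telescoping pattern using only the JDK. The class names `SketchFileSystem` and `SketchDfs` are hypothetical, the `progress` parameter is left out, and the defaults (4096-byte buffer, replication 3, 128 MB blocks, file mode 0666, umask 022) are assumed common defaults rather than values read from a Configuration. It also shows the arithmetic behind `FsCreateModes.applyUMask(FsPermission.getFileDefault(), ...)`: the umask bits are cleared from the default mode, so 0666 & ~022 = 0644.

```java
// Hypothetical sketch of FileSystem's telescoping create(...) overloads (not Hadoop code).
abstract class SketchFileSystem {
    static final int DEFAULT_BUFFER_SIZE = 4096;                // assumed io.file.buffer.size
    static final short DEFAULT_REPLICATION = 3;                 // assumed dfs.replication
    static final long DEFAULT_BLOCK_SIZE = 128L * 1024 * 1024;  // assumed dfs.blocksize
    static final int FILE_DEFAULT_PERMISSION = 0666;            // rw-rw-rw-
    static final int DEFAULT_UMASK = 022;                       // assumed umask

    // Clear the umask bits from the requested mode: 0666 & ~022 == 0644.
    static int applyUMask(int mode, int umask) {
        return mode & ~umask;
    }

    String create(String path) {
        return create(path, true); // overwrite defaults to true
    }

    String create(String path, boolean overwrite) {
        return create(path, overwrite, DEFAULT_BUFFER_SIZE, DEFAULT_REPLICATION, DEFAULT_BLOCK_SIZE);
    }

    String create(String path, boolean overwrite, int bufferSize, short replication, long blockSize) {
        int permission = applyUMask(FILE_DEFAULT_PERMISSION, DEFAULT_UMASK);
        return create(path, permission, overwrite, bufferSize, replication, blockSize);
    }

    // The only variant a concrete file system must implement.
    abstract String create(String path, int permission, boolean overwrite,
                           int bufferSize, short replication, long blockSize);
}

// Stand-in for DistributedFileSystem: reports the fully resolved arguments.
class SketchDfs extends SketchFileSystem {
    @Override
    String create(String path, int permission, boolean overwrite,
                  int bufferSize, short replication, long blockSize) {
        return String.format("%s perm=%o overwrite=%b buf=%d repl=%d block=%d",
                path, permission, overwrite, bufferSize, replication, blockSize);
    }
}
```

For example, `new SketchDfs().create("/input")` returns `/input perm=644 overwrite=true buf=4096 repl=3 block=134217728`: the caller passed only a path, and every other argument came from a default.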

Select create and press Ctrl+H to find its implementing class, DistributedFileSystem.java, then look for the create method there.

DistributedFileSystem.java

@Override
public FSDataOutputStream create(Path f,FsPermission permission,
  boolean overwrite, int bufferSize, short replication, long blockSize,
  Progressable progress) throws IOException {
 
       return this.create(f, permission,
       overwrite ? EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE)
              : EnumSet.of(CreateFlag.CREATE),bufferSize, replication,
       blockSize, progress, null);
}
 
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
  final EnumSet<CreateFlag> cflags, final int bufferSize,
  final short replication, final long blockSize,
  final Progressable progress, final ChecksumOpt checksumOpt)
  throws IOException {
 
       statistics.incrementWriteOps(1);
       storageStatistics.incrementOpCounter(OpType.CREATE);
       Path absF = fixRelativePart(f);
      
       return new FileSystemLinkResolver<FSDataOutputStream>() {
 
        @Override
        public FSDataOutputStream doCall(final Path p) throws IOException {
 
              // create an output stream object (DFSOutputStream)
              final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
                     cflags, replication, blockSize, progress, bufferSize,
                     checksumOpt);
              // wrap the dfsos created above and return it
              return dfs.createWrappedOutputStream(dfsos, statistics);
         }
 
        @Override
        public FSDataOutputStream next(final FileSystem fs, final Path p)
               throws IOException {
              return fs.create(p, permission,cflags, bufferSize,
                     replication, blockSize,progress, checksumOpt);
         }
       }.resolve(this, absF);
}
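`FileSystemLinkResolver` separates the operation itself (`doCall`) from what to do when the path turns out to be a symlink whose target lives on a different file system (`next`). Below is a simplified, hypothetical model of that contract, not Hadoop's implementation: `resolve` retries `doCall` on the link target while the target stays on the same file system, hands over to `next` otherwise, and gives up after an assumed hop limit. Links are encoded as `fsName:/path` strings purely for the demo, and all class names are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for UnresolvedLinkException; target is "fsName:/path".
class SketchUnresolvedLink extends Exception {
    final String target;

    SketchUnresolvedLink(String target) {
        super(target);
        this.target = target;
    }
}

// Simplified model of the doCall/next contract (not Hadoop's implementation).
abstract class SketchLinkResolver<T> {
    static final int MAX_LINKS = 32; // assumed limit on symlink hops

    abstract T doCall(String path) throws SketchUnresolvedLink; // run the op on this file system
    abstract T next(String fsName, String path);               // run the op on another file system

    T resolve(String fsName, String path) {
        String p = path;
        for (int hops = 0; hops <= MAX_LINKS; hops++) {
            try {
                return doCall(p);
            } catch (SketchUnresolvedLink e) {
                String[] target = e.target.split(":", 2);
                if (!target[0].equals(fsName)) {
                    return next(target[0], target[1]); // link leaves this file system
                }
                p = target[1]; // same file system: retry doCall on the link target
            }
        }
        throw new IllegalStateException("too many symlinks while resolving " + path);
    }
}

// Demo: /a links to hdfs:/b, /x links to a different file system.
class DemoCreateResolver extends SketchLinkResolver<String> {
    private final Map<String, String> links = new HashMap<>();

    DemoCreateResolver() {
        links.put("/a", "hdfs:/b");
        links.put("/x", "other:/y");
    }

    @Override
    String doCall(String path) throws SketchUnresolvedLink {
        String target = links.get(path);
        if (target != null) {
            throw new SketchUnresolvedLink(target);
        }
        return "created hdfs:" + path;
    }

    @Override
    String next(String fsName, String path) {
        return "delegated to " + fsName + ":" + path;
    }
}
```

A plain path such as `/input` never throws, so `resolve` returns the result of the first `doCall`; that is the normal case for the create call traced here.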

Click create to jump into DFSClient.java:

public DFSOutputStream create(String src,FsPermission permission,
  EnumSet<CreateFlag> flag, short replication, long blockSize,
  Progressable progress, int buffersize, ChecksumOpt checksumOpt)
  throws IOException {
 
       return create(src, permission, flag, true,
       replication, blockSize, progress,buffersize, checksumOpt, null);
}
 
public DFSOutputStream create(String src,FsPermission permission,
  EnumSet<CreateFlag> flag, boolean createParent, short replication,
  long blockSize, Progressable progress, int buffersize,
  ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
  throws IOException {
 
       return create(src, permission, flag, createParent, replication,blockSize,
       progress, buffersize, checksumOpt,favoredNodes, null);
}
 
public DFSOutputStream create(String src,FsPermission permission,
  EnumSet<CreateFlag> flag, boolean createParent, short replication,
  long blockSize, Progressable progress, int buffersize,
  ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
  String ecPolicyName) throws IOException {
 
       checkOpen();
      
       final FsPermission masked =applyUMask(permission);
       LOG.debug("{}: masked={}", src,masked);
      
       final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
              src, masked, flag, createParent,replication, blockSize, progress,
              dfsClientConf.createChecksum(checksumOpt),
              getFavoredNodesStr(favoredNodes),ecPolicyName);
             
       beginFileLease(result.getFileId(),result);
      
       return result;
}
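`beginFileLease` ties the new stream to the client's lease: the NN lets only the lease holder write the file, and the client has to keep renewing that lease while any file is open (the soft-limit recovery in `startFile` further down relies on this). The following is a minimal sketch of the client-side bookkeeping, assuming a simplified model in which open files are tracked by inode id and a periodic renewer sends one renewal per client (not per file), and only while at least one file is open. `SketchLeaseTracker` and the inode id 16386 are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of client-side lease bookkeeping (not Hadoop code).
class SketchLeaseTracker {
    // inode id -> path of every file this client is currently writing
    private final Map<Long, String> filesBeingWritten = new HashMap<>();
    private int renewals = 0;

    // Called right after the create RPC succeeded (cf. beginFileLease).
    synchronized void beginFileLease(long inodeId, String path) {
        filesBeingWritten.put(inodeId, path);
    }

    // Called when the stream is closed.
    synchronized void endFileLease(long inodeId) {
        filesBeingWritten.remove(inodeId);
    }

    // Run periodically by a renewer thread: one renewal per client, only while files are open.
    synchronized boolean renewLease() {
        if (filesBeingWritten.isEmpty()) {
            return false;
        }
        renewals++; // stand-in for the renewLease RPC to the NN
        return true;
    }

    static String demo() {
        SketchLeaseTracker client = new SketchLeaseTracker();
        client.beginFileLease(16386L, "/input");
        boolean renewedWhileOpen = client.renewLease();
        client.endFileLease(16386L);
        boolean renewedAfterClose = client.renewLease();
        return renewedWhileOpen + "," + renewedAfterClose + "," + client.renewals;
    }
}
```

`SketchLeaseTracker.demo()` returns `true,false,1`: the lease is renewed while `/input` is open and no renewal is sent once the file is closed.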

Click newStreamForCreate to jump into DFSOutputStream.java:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
  FsPermission masked,EnumSet<CreateFlag> flag, boolean createParent,
  short replication, long blockSize,Progressable progress,
  DataChecksum checksum, String[] favoredNodes,String ecPolicyName)
  throws IOException {
 
       try (TraceScope ignored =
                      dfsClient.newPathTraceScope("newStreamForCreate",src)) {
         HdfsFileStatus stat = null;
 
        // Retry the create if we get a RetryStartFileException up to a maximum
        // number of times
        boolean shouldRetry = true;
        int retryCount = CREATE_RETRY_COUNT;
 
        while (shouldRetry) {
              shouldRetry = false;
              try {
               // the client sends the create request to the NN (RPC)
               stat = dfsClient.namenode.create(src, masked,dfsClient.clientName,
                       new EnumSetWritable<>(flag),createParent, replication,
                       blockSize, SUPPORTED_CRYPTO_VERSIONS,ecPolicyName);
               break;
              } catch (RemoteException re) {
               … ….
              }
         }
        Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
        final DFSOutputStream out;
 
        if (stat.getErasureCodingPolicy() != null) {
              out = new DFSStripedOutputStream(dfsClient, src, stat,
                     flag, progress, checksum, favoredNodes);
         } else {
              out = new DFSOutputStream(dfsClient, src, stat,
                     flag, progress, checksum, favoredNodes, true);
         }
 
         // start the thread (run): DataStreamer extends Daemon extends Thread
         out.start();
 
        return out;
       }
}
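The `while (shouldRetry)` loop gives the create RPC a bounded number of retries. The sketch below assumes the elided `catch` block retries only when the NN answers with RetryStartFileException, up to `CREATE_RETRY_COUNT` times, and rethrows anything else; `RetryStartFile` and the stub RPC are hypothetical stand-ins, and the retry count is passed in rather than taken from Hadoop.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical sketch of the bounded retry around the create RPC (not Hadoop code).
class SketchCreateRetry {
    // Stand-in for RetryStartFileException.
    static class RetryStartFile extends IOException {
        RetryStartFile() {
            super("retry start file");
        }
    }

    // One initial attempt plus at most retryCount retries; other exceptions propagate.
    static <T> T createWithRetry(Callable<T> rpc, int retryCount) throws Exception {
        while (true) {
            try {
                return rpc.call();
            } catch (RetryStartFile e) {
                if (retryCount <= 0) {
                    throw new IOException("Too many retries", e);
                }
                retryCount--;
            }
        }
    }

    // Demo: a stub RPC that fails 'failures' times before returning a status.
    static String demo(int failures, int retryCount) {
        int[] calls = {0};
        try {
            String stat = createWithRetry(() -> {
                if (calls[0]++ < failures) {
                    throw new RetryStartFile();
                }
                return "HdfsFileStatus";
            }, retryCount);
            return stat + " after " + calls[0] + " calls";
        } catch (Exception e) {
            return "failed after " + calls[0] + " calls";
        }
    }
}
```

`demo(2, 10)` returns `HdfsFileStatus after 3 calls`, while `demo(5, 2)` gives up and returns `failed after 3 calls` (one attempt plus two retries). Only after a status comes back does the code pick DFSStripedOutputStream or DFSOutputStream and start the streamer.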

II. The NN handles the client's create request

1) Click create:

ClientProtocol.java

HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException;

2) Press Ctrl+H on create to find its implementations, choose NameNodeRpcServer, and search for create in NameNodeRpcServer.java:

NameNodeRpcServer.java

public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException {
  // check that the NN has finished starting up
  checkNNStartup();
  ... ...
 
  HdfsFileStatus status = null;
  try {
    PermissionStatus perm = new PermissionStatus(getRemoteUser()
        .getShortUserName(), null, masked);
    // key step: create the file in the namespace
    status = namesystem.startFile(src, perm, clientName,clientMachine,
        flag.get(), createParent, replication,blockSize, supportedVersions,
        ecPolicyName, cacheEntry != null);
  } finally {
    RetryCache.setState(cacheEntry, status !=null, status);
  }
 
  metrics.incrFilesCreated();
  metrics.incrCreateFileOps();
  return status;
}
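`RetryCache.setState(...)` and the `cacheEntry != null` argument above belong to the NameNode's retry cache. create is not idempotent: if the client's RPC times out after the NN has already created the file, a blind re-execution would end up in the "already exists" branch of `startFile`. The sketch models the idea with a map keyed by a (clientId, callId) pair; it is a hypothetical, single-threaded simplification that skips the real cache's handling of in-progress calls and entry expiry.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical, single-threaded model of a retry cache (not Hadoop's RetryCache):
// a non-idempotent call runs at most once per (clientId, callId); a retry of a
// call that already succeeded gets the recorded result instead of re-running it.
class SketchRetryCache {
    private final Map<String, Object> completed = new HashMap<>();
    private int executions = 0;

    @SuppressWarnings("unchecked")
    <T> T invoke(String clientId, int callId, Supplier<T> op) {
        String key = clientId + "#" + callId;
        if (completed.containsKey(key)) {
            return (T) completed.get(key); // retried call: replay the cached payload
        }
        executions++;
        T result = op.get();
        completed.put(key, result); // only reached on success, cf. setState(entry, true, payload)
        return result;
    }

    static String demo() {
        Set<String> namespace = new HashSet<>();
        Supplier<String> create = () -> {
            if (!namespace.add("/input")) {
                throw new IllegalStateException("/input already exists");
            }
            return "HdfsFileStatus(/input)";
        };
        SketchRetryCache cache = new SketchRetryCache();
        String first = cache.invoke("client-1", 7, create);
        String retried = cache.invoke("client-1", 7, create); // same call id, e.g. after an RPC timeout
        return first + " | " + retried + " | executions=" + cache.executions;
    }
}
```

Without the cache, the second call would throw `/input already exists`; with it, `demo()` returns the same status twice and the create runs only once.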

FSNamesystem.java

HdfsFileStatus startFile(String src, PermissionStatus permissions,
    String holder, String clientMachine,EnumSet<CreateFlag> flag,
    boolean createParent, short replication,long blockSize,
    CryptoProtocolVersion[] supportedVersions,String ecPolicyName,
    boolean logRetryCache) throws IOException {
 
  HdfsFileStatus status;
  try {
    status = startFileInt(src, permissions, holder, clientMachine, flag,
        createParent, replication, blockSize,supportedVersions, ecPolicyName,
        logRetryCache);
  } catch (AccessControlException e) {
    logAuditEvent(false, "create",src);
    throw e;
  }
  logAuditEvent(true, "create", src,status);
  return status;
}
 
private HdfsFileStatus startFileInt(String src,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, CryptoProtocolVersion[] supportedVersions,
    String ecPolicyName, boolean logRetryCache) throws IOException {
       ... ...
       stat = FSDirWriteFileOp.startFile(this, iip, permissions,holder,
        clientMachine, flag, createParent,replication, blockSize, feInfo,
        toRemoveBlocks, shouldReplicate,ecPolicyName, logRetryCache);
       ... ...
}
 
// FSDirWriteFileOp.java
static HdfsFileStatus startFile(
    ... ...)
    throws IOException {
      
  ... ...
  FSDirectory fsd = fsn.getFSDirectory();
 
  // check whether the file path already exists
  if (iip.getLastINode() != null) {
    if (overwrite) {
      List<INode> toRemoveINodes = new ChunkedArrayList<>();
      List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
      long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
                                      toRemoveINodes, toRemoveUCFiles, now());
      if (ret >= 0) {
        iip = INodesInPath.replace(iip, iip.length() - 1, null);
        FSDirDeleteOp.incrDeletedFileCount(ret);
        fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
      }
    } else {
      // If lease soft limit time is expired, recover the lease
      fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
                               src, holder, clientMachine, false);
      throw new FileAlreadyExistsException(src + " for client " +
          clientMachine + " already exists");
    }
  }
  fsn.checkFsObjectLimit();
  INodeFile newNode = null;
  INodesInPath parent = FSDirMkdirOp.createAncestorDirectories(fsd,iip, permissions);
  if (parent != null) {
    // add the file's metadata (a new INodeFile)
    iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
        replication, blockSize, holder, clientMachine, shouldReplicate,
        ecPolicyName);
    newNode = iip != null ? iip.getLastINode().asFile() : null;
  }
  ... ...
  setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
  fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
        src + " inode " + newNode.getId() + " " + holder);
  }
  return FSDirStatAndListingOp.getFileInfo(fsd,iip, false, false);
}
 
private static INodesInPath addFile(
    FSDirectory fsd, INodesInPath existing,byte[] localName,
    PermissionStatus permissions, short replication, long preferredBlockSize,
    String clientName, String clientMachine,boolean shouldReplicate,
    String ecPolicyName) throws IOException {
 
  Preconditions.checkNotNull(existing);
  long modTime = now();
  INodesInPath newiip;
  fsd.writeLock();
  try {
    … …
 
    newiip = fsd.addINode(existing, newNode, permissions.getPermission());
  } finally {
    fsd.writeUnlock();
  }
  ... ...
  return newiip;
}
 
// FSDirectory.java
INodesInPath addINode(INodesInPath existing, INode child,
                      FsPermission modes)
    throws QuotaExceededException,UnresolvedLinkException {
  cacheName(child);
  writeLock();
  try {
    // insert the new INode into the directory tree
    return addLastINode(existing, child, modes, true);
  } finally {
    writeUnlock();
  }
}
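Taken together, startFile, addFile and addINode come down to three namespace steps: handle an existing path (delete it when OVERWRITE is set, otherwise throw FileAlreadyExistsException), create missing parent directories, and insert the new file while holding the FSDirectory write lock. The sketch models those steps with a `TreeMap` from path to type instead of a real INode tree (in HDFS, each INodeDirectory keeps its children in a sorted list). `SketchNamespace` is hypothetical, refuses to overwrite directories, and leaves out leases, quotas, blocks and the edit log.

```java
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical in-memory namespace (not Hadoop code) modeling startFile/addFile/addINode:
//   1. if the path exists: delete it when overwrite is set, otherwise fail;
//   2. create missing ancestor directories (createParent assumed true);
//   3. add the new file entry, all while holding the namespace write lock.
class SketchNamespace {
    // path -> "dir" or "file"; a TreeMap keeps paths sorted
    final TreeMap<String, String> inodes = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    SketchNamespace() {
        inodes.put("/", "dir");
    }

    void startFile(String src, boolean overwrite) {
        lock.writeLock().lock();
        try {
            String existing = inodes.get(src);
            if (existing != null) {
                // directories are never overwritten in this sketch
                if (existing.equals("dir") || !overwrite) {
                    throw new IllegalStateException(src + " already exists");
                }
                inodes.remove(src); // stands in for FSDirDeleteOp.delete(...)
            }
            // create ancestors, e.g. /a and /a/b for /a/b/c.txt
            for (int i = src.indexOf('/', 1); i > 0; i = src.indexOf('/', i + 1)) {
                String ancestor = src.substring(0, i);
                if ("file".equals(inodes.putIfAbsent(ancestor, "dir"))) {
                    throw new IllegalStateException(ancestor + " is not a directory");
                }
            }
            inodes.put(src, "file"); // stands in for addFile(...) -> addINode(...)
        } finally {
            lock.writeLock().unlock();
        }
    }

    static String demo() {
        SketchNamespace ns = new SketchNamespace();
        ns.startFile("/a/b/c.txt", false);
        StringBuilder out = new StringBuilder(ns.inodes.toString());
        try {
            ns.startFile("/a/b/c.txt", false); // second create without overwrite
        } catch (IllegalStateException e) {
            out.append(" | ").append(e.getMessage());
        }
        ns.startFile("/a/b/c.txt", true); // overwrite: delete, then re-add
        return out.append(" | ").append(ns.inodes.size()).toString();
    }
}
```

`SketchNamespace.demo()` returns `{/=dir, /a=dir, /a/b=dir, /a/b/c.txt=file} | /a/b/c.txt already exists | 4`: the parents are created once, the second plain create fails, and the overwrite replaces the file without adding entries.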

III. DataStreamer startup

After the NN has handled the request, control returns to the client, which starts the corresponding thread:

DFSOutputStream.java

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
  FsPermission masked,EnumSet<CreateFlag> flag, boolean createParent,
  short replication, long blockSize,Progressable progress,
  DataChecksum checksum, String[] favoredNodes,String ecPolicyName)
  throws IOException {
       ... ...
       // the client sends the create request to the NN (RPC)
       stat = dfsClient.namenode.create(src,masked, dfsClient.clientName,
        new EnumSetWritable<>(flag), createParent, replication,
        blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
       ... ...
      
       // create the output stream
       out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
       // start the thread (run): DataStreamer extends Daemon extends Thread
       out.start();
 
       return out;
}

Click DFSOutputStream to open its constructor:

protected DFSOutputStream(DFSClient dfsClient, String src,
    HdfsFileStatus stat,EnumSet<CreateFlag> flag, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
  this(dfsClient, src, flag, progress, stat,checksum);
  this.shouldSyncBlock =flag.contains(CreateFlag.SYNC_BLOCK);
 
  // Directory => File => Block (128 MB) => packet (64 KB) => chunk (512-byte chunk + 4-byte checksum)
  computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
      bytesPerChecksum);
 
  if (createStreamer) {
    streamer = new DataStreamer(stat, null,dfsClient, src, progress,
        checksum, cachingStrategy, byteArrayManager,favoredNodes,
        addBlockFlags);
  }
}
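The comment above summarizes the write granularity. Assuming the defaults of a 64 KB write packet (`dfs.client-write-packet-size`), 512 bytes per checksum and a 4-byte CRC checksum, the re-computation below assumes `computePacketChunkSize` first subtracts the maximum packet header length from the packet size and then fits as many whole 516-byte chunks as possible into the rest. The header length does not appear in this article; 33 bytes is an assumed value, and `SketchPacketSizing` is hypothetical.

```java
// Hypothetical re-computation of packet/chunk sizing (assumed to mirror computePacketChunkSize).
class SketchPacketSizing {
    // Returns {chunkSize, chunksPerPacket, packetSize}.
    static int[] compute(int writePacketSize, int bytesPerChecksum,
                         int checksumSize, int maxHeaderLen) {
        int bodySize = writePacketSize - maxHeaderLen;    // room left after the packet header
        int chunkSize = bytesPerChecksum + checksumSize;  // data bytes + their checksum
        int chunksPerPacket = Math.max(bodySize / chunkSize, 1);
        int packetSize = chunkSize * chunksPerPacket;     // bytes of chunks in a full packet
        return new int[] {chunkSize, chunksPerPacket, packetSize};
    }
}
```

`compute(65536, 512, 4, 33)` gives `{516, 126, 65016}`: 126 chunks per full packet, i.e. 126 × 512 = 64,512 bytes of user data plus their checksums.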

Click out.start() in newStreamForCreate to jump into DFSOutputStream.java:

protected synchronized void start() {
       getStreamer().start();
}
 
protected DataStreamer getStreamer() {
       return streamer;
}

Click DataStreamer to open DataStreamer.java:

class DataStreamer extends Daemon {
    ... ...
}

Click Daemon to open Daemon.java:

public class Daemon extends Thread {
    ... ...
}
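Because DataStreamer extends Daemon and Daemon extends Thread, `out.start()` -> `getStreamer().start()` is an ordinary `Thread.start()`: the JVM calls `run()` on a new thread. The sketch assumes, as in Hadoop's Daemon class, that the constructor marks the thread as a daemon so it cannot keep the JVM alive on its own. `SketchDaemon` and `SketchStreamer` are hypothetical names.

```java
// Hypothetical stand-in for Hadoop's Daemon: a Thread marked as daemon on construction.
class SketchDaemon extends Thread {
    SketchDaemon() {
        setDaemon(true);
    }
}

// Stand-in for DataStreamer: start() causes run() to execute on the new thread.
class SketchStreamer extends SketchDaemon {
    private volatile String ranOn;

    @Override
    public void run() {
        ranOn = Thread.currentThread().getName();
    }

    static String demo() {
        SketchStreamer streamer = new SketchStreamer();
        streamer.setName("DataStreamer-sketch");
        streamer.start(); // like out.start() -> getStreamer().start()
        try {
            streamer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "daemon=" + streamer.isDaemon() + ", ran on " + streamer.ranOn;
    }
}
```

`SketchStreamer.demo()` returns `daemon=true, ran on DataStreamer-sketch`, confirming that `run()` executed on the streamer thread rather than on the caller's thread.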

Note: out.start() actually starts a thread. Go back to DataStreamer and search for its run method:

DataStreamer.java

@Override
public void run(){
 
       long lastPacket = Time.monotonicNow();
       TraceScope scope = null;
       while (!streamerClosed && dfsClient.clientRunning) {
        // if the Responder encountered an error, shutdown Responder
        if (errorState.hasError()) {
              closeResponder();
         }
 
         DFSPacket one;
        try {
              // process datanode IO errors if any
              boolean doSleep = processDatanodeOrExternalError();
 
              final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
              synchronized (dataQueue) {
               // wait for a packet to be sent.
               … …
                     try {
                       // if dataQueue is empty, the thread blocks here
                       dataQueue.wait(timeout);
                     } catch (InterruptedException e) {
                       LOG.warn("Caught exception", e);
                     }
                     doSleep = false;
                     now = Time.monotonicNow();
               }
               … …
                     // the queue is not empty: take the packet at its head
                     one = dataQueue.getFirst(); // regular data packet
                     SpanId[] parents = one.getTraceParents();
                     if (parents.length > 0){
                       scope = dfsClient.getTracer().
                              newScope("dataStreamer",parents[0]);
                       scope.getSpan().setParents(parents);
                     }
               }
              }
              … …
}
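The run loop and the writing side communicate through `dataQueue` with the classic wait/notify handshake: the streamer holds the `dataQueue` monitor, waits while the queue is empty, and takes the head packet with `getFirst()`; the writer appends packets and calls `notifyAll()`. Below is a self-contained producer/consumer sketch of that handshake. `SketchDataQueue`, the string packets and `close()` are hypothetical, and moving the packet to an `ackQueue` mirrors what the streamer does later in `run()` (DataNode acks then remove it), which is not part of the excerpt above.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical producer/consumer sketch of the dataQueue handshake (not Hadoop code).
class SketchDataQueue {
    private final LinkedList<String> dataQueue = new LinkedList<>();
    private final LinkedList<String> ackQueue = new LinkedList<>();
    private boolean closed = false; // guarded by dataQueue's monitor

    // Writer side: append a full packet and wake up the streamer.
    void enqueue(String packet) {
        synchronized (dataQueue) {
            dataQueue.addLast(packet);
            dataQueue.notifyAll();
        }
    }

    // Writer side: no more packets will come.
    void close() {
        synchronized (dataQueue) {
            closed = true;
            dataQueue.notifyAll();
        }
    }

    // Streamer side: block while the queue is empty; null once closed and drained.
    String nextPacket(long timeoutMs) throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.isEmpty() && !closed) {
                dataQueue.wait(timeoutMs);
            }
            if (dataQueue.isEmpty()) {
                return null;
            }
            String one = dataQueue.getFirst(); // head of the queue, as in run()
            dataQueue.removeFirst();           // keep it in ackQueue until acked
            ackQueue.addLast(one);
            return one;
        }
    }

    static List<String> demo() {
        SketchDataQueue q = new SketchDataQueue();
        List<String> sent = new ArrayList<>();
        Thread streamer = new Thread(() -> {
            try {
                String p;
                while ((p = q.nextPacket(1000)) != null) {
                    sent.add(p); // stand-in for writing the packet to the pipeline
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        streamer.start();
        for (int i = 0; i < 3; i++) {
            q.enqueue("packet-" + i);
        }
        q.close();
        try {
            streamer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent;
    }
}
```

`SketchDataQueue.demo()` returns `[packet-0, packet-1, packet-2]`: whether the streamer starts waiting before or after the writer enqueues, packets leave in FIFO order, and the `while` loop around `wait` guards against spurious wake-ups.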
