首页 > 分布式 > Hadoop > Hadoop RPC机制及HDFS源码分析
2015
04-28

Hadoop RPC机制及HDFS源码分析

二、HDFS源码分析

1、NameNode启动过程分析

为了使我们的源码分析更有目的性,首先需要回想下在NameNode启动过程干了哪些事,如下:

  • 启动http服务器并提供web服务:在启动HDFS后,可以访问http://hadoop01:50070获取其提供的服务。(见《Hadoop基础及伪分布式环境搭建》一文)
  • 加载fsimage到内存,并执行edits中的操作:初始化HDFS系统中的元数据
  • 启动RPC server并对外提供服务:在通过jps查看到的namenode进程实际上是一个RPC server对外提供服务(接受来自用户或datanode的请求)

我们重点分析最后一个过程,当然前面两个过程也会在分析中提及。Hadoop中提供的如下脚本命令都可以启动namenode:

  • sbin/start-dfs.sh
  • bin/hdfs namenode
  • bin/hadoop namenode

不过这些脚本命令最后调用的都是org.apache.hadoop.hdfs.server.namenode.NameNode类的main方法,因此对namenode启动过程的分析就是对NameNode执行过程的分析。

整个执行过程如下:

1)在main()方法中调用createNameNode()方法,它返回一个NameNode实例;于是跟踪到createNameNode()方法中,该方法直接调用了NameNode的构造函数产生了一个实例并返回,不过在这之前会根据命令行传入的参数作出一些选择。

main(argv){
   NameNode namenode = createNameNode(argv, null);
}

NameNode createNameNode(argv, conf) {
   StartupOption startOpt = parseArguments(argv);

   switch (startOpt) { //根据命令行传入的参数作出不同的选择
     case FORMAT: {
       ……
       return null;
     }
     case ……
     default: { //如果是启动NameNode,就直接调用构造函数
       return new NameNode(conf);
     }
   }
}

扩展:在使用HDFS之前,会有一个格式化的过程:hadoop namenode -format,该命令也调用了NameNode,只是执行到createNameNode()方法时,会调用switch中的第一个case,该分支下并不产生NameNode的实例。有兴趣的童鞋可以扒一扒格式化HDFS的代码。

2)在NameNode构造函数中又调用了initialize(conf)方法初始化NameNode实例,该方法是整个NameNode启动过程的核心。我们先看看initialize方法中发生了什么:

private NameNodeRpcServer rpcServer; //rpcServer是NameNode中的一个字段
protected void initialize(Configuration conf) {
    if (NamenodeRole.NAMENODE == role) {
      startHttpServer(conf);
    }
    loadNamesystem(conf);

    rpcServer = createRpcServer(conf);

    startCommonServices(conf);
}

还记得本小节一开始提到的NameNode启动过程中所做的三件事么?它们全出现在initialize()方法中了:

  • startHttpServer(conf):启动http服务器对外提供Web服务,Hadoop中使用的是内嵌的jetty服务器。想要研究这一过程的可以继续跟进去。
  • loadNamesystem(conf):完成NameNode启动时的元数据恢复。同样,有兴趣的自己翻代码。
  • createRpcServer(conf)、startCommonServices(conf):创建、启动RPC server,对外提供服务。这是下面需要详解的过程。

3)在讲解创建、启动RPC server过程之前,先来分析NameNodeRpcServer类:

/**
 * NameNodeRpcServer 类负责处理NameNode节点上的所有RPC请求
 * It is created, started, and stopped by NameNode.
 */
class NameNodeRpcServer implements NamenodeProtocols {
  /** The RPC server that listens to requests from DataNodes */
  private final RPC.Server serviceRpcServer;
  /** The RPC server that listens to requests from clients */
  protected final RPC.Server clientRpcServer;

  public NameNodeRpcServer(Configuration conf, NameNode nn) {
    this.serviceRpcServer = new RPC.Builder(conf)
          .setProtocol().setInstance().setBindAddress().setPort()
          .build();

    this.clientRpcServer = new RPC.Builder(conf)
          .setProtocol().setInstance().setBindAddress().setPort()
          .build();
  }

  void start() { //Start client and service RPC servers
    clientRpcServer.start();
    if (serviceRpcServer != null) {
      serviceRpcServer.start();
    }
  }
}

public interface NamenodeProtocols
  extends ClientProtocol,
          DatanodeProtocol,
          NamenodeProtocol,
          RefreshAuthorizationPolicyProtocol,
          RefreshUserMappingsProtocol,
          RefreshCallQueueProtocol,
          GetUserMappingsProtocol,
          HAServiceProtocol {
}

关于NameNodeRpcServer类说明如下:

① NameNodeRpcServer实现了ClientProtocol、DatanodeProtocol、NamenodeProtocol等协议中的方法。其中ClientProtocol用于和客户端(DFSClient类,下一小节内容)通信,DatanodeProtocol用于和DatanodeProtocol通信,NamenodeProtocol用于和SecondaryNameNode通信。

② NameNodeRpcServer类中其实封装了两个RPC server,分别为上述两个协议的客户端服务。

③ 在上面的构造函数中,故意将setProtocol、setInstance这些方法参数留空,这是因为在RPC server端,使用了protobuf来封装协议数据,为了减少读者阅读难度,将其留空,读者可以将其理解为setProtocol(ClientProtocol.class)、setInstance(new NameNodeRpcServer())等等。

若读者想初步了解protobuf请阅读《开源点评:Protocol Buffers介绍》:http://blog.csdn.net/program_think/article/details/4229773,若想进一步了解protobuf在NameNodeRpcServer中的使用请阅读《Hadoop基于Protocol Buffer的RPC实现代码分析-Server端》:http://blog.csdn.net/bluejoe2000/article/details/41343857

好了,回到启动NameNode的流程,接下来就很简单了:createRpcServer(conf)方法中直接调用了NameNodeRpcServer的构造函数,创建了NameNode节点上的rpcServer;startCommonServices(conf)方法调用了rpcServer的start方法。

总结:在启动NameNode的过程中涉及到的类(接口)和方法如下图:

04. Hadoop RPC机制及HDFS源码分析9465

2、获取FileSystem子类实例过程

本小节不打算像上面那么仔细地讲解,而是对着如下的类图快速讲解,获取FileSystem子类实例对应着FileSystem.get()的执行过程,在该过程中完成:

  • 创建FileSystem子类实例:即DistributedFileSystem类的实例
  • 客户端应该有一个RPC client:即图中的DFSClient
  • 客户端应获取服务的代理对象:即DFSClient中的namenode字段

04. Hadoop RPC机制及HDFS源码分析9683

对上图的说明如下:

① 在FileSystem.get(conf)方法中间接调用createFileSystem()获取FileSystem子类实例。

② createFileSystem()方法部分如下:

Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);

FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
return fs;

其中uri字段的值为hdfs://hadoop:9000 (你的也许不是),于是FileSystem根据uri.getScheme的值为hdfs获取DistributedFileSystem的字节码,进而通过反射创建DistributedFileSystem的实例fs。但这时候还不能直接返回fs,因为fs中的字段均为null,需要使用initialize()方法初始化一次。

③ 在initialize()过程中对dfs字段进行赋值:this.dfs = new DFSClient(uri, conf, statistics); dfs就是一个RPC client。

④ 实例化DFSClient的过程如下:

proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
this.namenode = proxyInfo.getProxy();

在NameNodeProxies.createProxy中最终会调用RPC.getProxy方法获取服务的代理对象,此处不再进行深入。

3、HDFS读文件过程分析

首先通过下图对客户端读取HDFS文件(下载文件)的过程有一个大概的了解:

04. Hadoop RPC机制及HDFS源码分析10505

对上图的说明(分别对应图中的序号):

open:调用FileSystem的open()打开文件

get block locations:客户端通过RPC调用,请求NameNode获得文件的数据块元数据信息,并返回FSDataInputStream

③④⑤ read:HDFS客户端调用stream的read()函数直接从DataNode读取数据

close:读完文件时,调用FSDataInputStream的close函数

下面重点分析FIleSystem.open()的执行过程,open()方法对应上面的过程①②。该过程涉及到的类如下图所示,其中的序号表示过程中涉及到的方法调用:

04. Hadoop RPC机制及HDFS源码分析10808

对上图的说明如下(分别对应图中的序号):

① FileSystem.open()会调用子类DistributedFileSystem的open()方法

② DistributedFileSystem.open方法继续调用dfsClient.open方法

③ DFSClient.open方法中直接实例化一个DFSInputStream对象返回:

return new DFSInputStream(this, src, buffersize, verifyChecksum);

DFSInputStream对象中封装了一个locatedBlocks字段,locatedBlocks中记录了HDFS文件对应的文件块和所在DataNode节点列表等信息。因此,客户端在拿到DFSInputStream对象后,就可以直接从DataNode上读取数据了。

④⑤⑥ 实例化DFSInputStream的过程包括给一些必要的成员变量赋值,如DFSClient;最后又相继调用了openInfo()、fetchLocatedBlocksAndGetLastBlockLength()等方法,目的也是初始化DFSInputStream对象中的成员。相关代码如下:

public class DFSInputStream {
  private final DFSClient dfsClient;
// HDFS文件的路径
  private final String src;
// blockReader对象用于从一个DataNode节点上读取一个block块,在fileSystem.read()过程要用到,open()过程不涉及
  private BlockReader blockReader = null;
  private LocatedBlocks locatedBlocks = null;
// HDFS文件对应的block,除了最后一个,其余块的大小都是固定的,因此有必要将最后一个block大小记录下来
  private long lastBlockBeingWrittenLength = 0;

  DFSInputStream(dfsClient, src, buffersize, verifyChecksum){
    this.dfsClient = dfsClient;
    this.src = src;

    openInfo();
  }

  /**
   * Grab the open-file info from namenode
   */
  synchronized void openInfo() {
    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
  }

  private long fetchLocatedBlocksAndGetLastBlockLength() {
final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
locatedBlocks = newInfo;

    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
        final long len = readBlockLength(last);
        lastBlockBeingWrittenLength = len;
      }
    }
    return lastBlockBeingWrittenLength;
  }
}

⑦ 在fetchLocatedBlocksAndGetLastBlockLength()中调用了dfsClient.getLocatedBlocks方法,该方法通过调用NameNode上的getBlockLocations过程(ClientProtocol协议方法)。

4、HDFS写文件过程分析

向HDFS上写一个文件(上传文件)的过程如下图:

04. Hadoop RPC机制及HDFS源码分析12595

对上图进行简单的说明:

create:Client调用DistributedFileSystem.create(),并返回FSDataOutputStream

create:在①的过程中,客户端进行一次RPC调用,在NameNode上的Namespace中创建一个文件条目(Entry),该条目没有任何的block,你可以认为是一个空的元数据

write:做一些写数据的准备工作,首先将数据写入FSDataOutputStream对象内部的Buffer中,并被分割成一个个Packet数据包;然后通知NameNode分配一组DataNode节点来存储数据块(每块默认复制3块),这组DataNode节点组成了一个pipeline。

write packet:以Packet单位,将数据块写入pipeline中的第一个DataNode,第一个DataNode节点将数据块写到给第二个数据节点,第二个写第三个……依次类推。

ack packet:这组DataNode组成的Pipeline反方向上,发送ack,最终由Pipeline中第一个DataNode节点将Pipeline ack发送给Client

close:完成向DataNode写数据,关闭流

complete:调用DistributedFileSystem对象的complete方法,通知NameNode文件写入成功。

FileSystem.create()的源码分析类似于open(),有兴趣的读者可以阅读。可以参考《HDFS写文件过程分析》一文:http://shiyanjun.cn/archives/942.html


留下一个回复

你的email不会被公开。