How Does Namenode Find The Nearest Datanode

2014/07/21

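The full question: a client always talks to the NameNode first, and then reads or writes data directly with the DataNodes the NameNode designates. In this process, the NameNode finds the DataNode nearest to the client and hands it to the client. How is this step implemented?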

Depending on how deep you go, I have three different answers:

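1. The NameNode determines which network the client is on, then finds a DataNode that holds the required block and sits on the same LAN or rack as the client.
2. The NameNode maintains the network topology of the cluster, and in that topology it can determine which DataNode holding the block is nearest to the client.
3. At the source-code level: which functions does the NameNode actually use to do this?

Personally, I think the third answer is the best. Talk is cheap, show me the code.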
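
Answer 2 can be made concrete. A common way to model a cluster is as a tree of locations such as /datacenter/rack/host, where the distance between two nodes is the number of hops each one climbs to reach their closest common ancestor. The following is a minimal sketch under that assumption; the class TopologyDistanceSketch, its methods and the example paths are hypothetical illustrations, not Hadoop's NetworkTopology API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only (not Hadoop code): nodes are identified by hypothetical
// location paths like "/d1/r1/n1" (data center / rack / host).
class TopologyDistanceSketch {

    // Split "/d1/r1/n1" into [d1, r1, n1], ignoring empty segments.
    static List<String> components(String path) {
        List<String> parts = new ArrayList<>();
        for (String p : path.split("/")) {
            if (!p.isEmpty()) {
                parts.add(p);
            }
        }
        return parts;
    }

    // Hops from a up to the closest common ancestor plus hops from b up to it.
    static int distance(String a, String b) {
        List<String> pa = components(a);
        List<String> pb = components(b);
        int common = 0;
        while (common < pa.size() && common < pb.size()
                && pa.get(common).equals(pb.get(common))) {
            common++;
        }
        return (pa.size() - common) + (pb.size() - common);
    }

    // Pick the replica with the smallest distance; ties keep the earlier one.
    static String nearest(String client, List<String> replicas) {
        String best = null;
        int bestDistance = Integer.MAX_VALUE;
        for (String r : replicas) {
            int d = distance(client, r);
            if (d < bestDistance) {
                best = r;
                bestDistance = d;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n1")); // 0: same node
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n2")); // 2: same rack
        System.out.println(distance("/d1/r1/n1", "/d1/r2/n3")); // 4: same data center, other rack
        System.out.println(distance("/d1/r1/n1", "/d2/r3/n4")); // 6: other data center
        List<String> replicas = Arrays.asList("/d2/r3/n4", "/d1/r2/n3", "/d1/r1/n2");
        System.out.println(nearest("/d1/r1/n1", replicas)); // /d1/r1/n2
    }
}
```

With this rule, the same node is at distance 0, another host on the same rack at 2, a host on another rack in the same data center at 4, and a host in another data center at 6.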

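“The input and output streams are the most complex parts of the DFSClient implementation: they have to talk not only to the NameNode but also to the DataNodes. By comparison, the input stream is simpler than the output stream. When reading data, the NameNode provides only two remote methods, getBlockLocations() and reportBadBlocks().”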

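Before reading, the client makes an RPC call to the NameNode to obtain the BlockLocations, and during this call the NameNode gives the client the DataNodes nearest to it. NameNode.getBlockLocations() looks like this: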

/**org.apache.hadoop.hdfs.server.namenode.NameNode*/
/** {@inheritDoc} */
public LocatedBlocks getBlockLocations(String src,
    long offset,
    long length) throws IOException {
    myMetrics.incrNumGetBlockLocations(); // update the metrics counter
    return namesystem.getBlockLocations(getClientMachine(),
        src, offset, length); // delegate to the namesystem along with the client's address
}
/** Get the client's address */
private static String getClientMachine() {
    String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress();
    if (clientMachine == null) { //not a web client
        clientMachine = Server.getRemoteAddress();
    }
    if (clientMachine == null) { //not a RPC client
        clientMachine = "";
    }
    return clientMachine;
}

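Now for the key part: the FSNamesystem version of getBlockLocations(), which receives the client's address.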

/**
 * Get block locations within the specified range.
 *
 * @see #getBlockLocations(String, long, long)
 */
LocatedBlocks getBlockLocations(String clientMachine, String src,
        long offset, long length) throws IOException {
    LocatedBlocks blocks = getBlockLocations(src, offset, length, true,
            true, true);// 1. Get the blocks of src that overlap the requested range
    if (blocks != null) {
        // sort the blocks. 2. Order each block's replica locations from nearest to farthest
        // In some deployment cases, cluster is with separation of task tracker
        // and datanode which means client machines will not always be recognized
        // as known data nodes, so here we should try to get node (but not
        // datanode only) for locality based sort.
        Node client = host2DataNodeMap.getDatanodeByHost(clientMachine);// 3. Is the client host itself a DataNode?
        if (client == null) {// 3.1 If not, resolve its rack via dnsToSwitchMapping and wrap it in a NodeBase
            List<String> hosts = new ArrayList<String>(1);
            hosts.add(clientMachine);
            String rName = dnsToSwitchMapping.resolve(hosts).get(0);
            if (rName != null)
                client = new NodeBase(clientMachine, rName);
        }

        DFSUtil.StaleComparator comparator = null;
        if (avoidStaleDataNodesForRead) {
            comparator = new DFSUtil.StaleComparator(staleInterval);
        }
        // Note: the last block is also included and sorted
        // 4. For every block, reorder its replica locations by network distance from the client
        for (LocatedBlock b : blocks.getLocatedBlocks()) {
            // 4.1 clusterMap is a NetworkTopology instance and performs the (pseudo-)sort
            clusterMap.pseudoSortByDistance(client, b.getLocations());
            if (avoidStaleDataNodesForRead) {
                // 4.2 move stale DataNodes behind the fresh ones
                Arrays.sort(b.getLocations(), comparator);
            }
        }
    }
    return blocks;
}
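
Steps 3 and 3.1 boil down to a two-level lookup: use the registered DataNode entry if the client host has one, otherwise ask the rack resolver. The sketch below assumes host2DataNodeMap can be modeled as a plain Map from host to node and dnsToSwitchMapping as a Function from host to rack path; SimpleNode, resolveClient and the sample addresses are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for Hadoop's Node/NodeBase: a host plus its rack path.
final class SimpleNode {
    final String host;
    final String rack;

    SimpleNode(String host, String rack) {
        this.host = host;
        this.rack = rack;
    }
}

class ClientResolutionSketch {
    // Mirrors steps 3 and 3.1: prefer the registered DataNode entry for the
    // client's host; otherwise ask the rack resolver and build a fresh node.
    // Returns null when the rack cannot be resolved either.
    static SimpleNode resolveClient(String clientMachine,
                                    Map<String, SimpleNode> registeredDatanodes,
                                    Function<String, String> rackResolver) {
        SimpleNode datanode = registeredDatanodes.get(clientMachine);
        if (datanode != null) {
            return datanode; // the client runs on a DataNode
        }
        String rack = rackResolver.apply(clientMachine);
        return rack == null ? null : new SimpleNode(clientMachine, rack);
    }

    public static void main(String[] args) {
        Map<String, SimpleNode> registered = new HashMap<>();
        registered.put("10.0.0.5", new SimpleNode("10.0.0.5", "/rack1"));
        // Hypothetical resolver: unknown 10.0.1.x hosts live on /rack2.
        Function<String, String> resolver =
                host -> host.startsWith("10.0.1.") ? "/rack2" : null;

        System.out.println(resolveClient("10.0.0.5", registered, resolver).rack); // /rack1
        System.out.println(resolveClient("10.0.1.7", registered, resolver).rack); // /rack2
        System.out.println(resolveClient("192.168.0.1", registered, resolver));   // null
    }
}
```

Falling back to the resolver is what lets a client that is not a DataNode (for example, a separate TaskTracker host, as the source comment mentions) still get a rack-aware ordering.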

The sorting itself is done by clusterMap.pseudoSortByDistance(), a method of NetworkTopology.
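
As far as I can tell from the Hadoop 1.x source, pseudoSortByDistance() does not fully sort the locations: it moves the client's own node to the front, then one node on the client's rack, and if neither exists it swaps a random node into the first position. The optional StaleComparator pass then moves stale nodes behind fresh ones. The following sketch reproduces that behavior on a hypothetical Replica type with plain host and rack strings; it is an illustration, not Hadoop's code.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;

class PseudoSortSketch {
    // Hypothetical replica descriptor: host name, rack path and staleness flag.
    static final class Replica {
        final String host;
        final String rack;
        final boolean stale;

        Replica(String host, String rack, boolean stale) {
            this.host = host;
            this.rack = rack;
            this.stale = stale;
        }

        @Override
        public String toString() {
            return host;
        }
    }

    static void swap(Replica[] nodes, int i, int j) {
        Replica tmp = nodes[i];
        nodes[i] = nodes[j];
        nodes[j] = tmp;
    }

    // Fixes at most the first two slots: the reader's own host, then one
    // replica on the reader's rack. Other replicas keep their positions
    // except for whatever was swapped out of those slots.
    static void pseudoSortByDistance(String readerHost, String readerRack,
                                     Replica[] nodes, Random random) {
        int next = 0;
        if (readerHost != null) {
            for (int i = 0; i < nodes.length; i++) {
                if (nodes[i].host.equals(readerHost)) {
                    swap(nodes, 0, i); // local replica goes first
                    next = 1;
                    break;
                }
            }
            for (int i = next; i < nodes.length; i++) {
                if (nodes[i].rack.equals(readerRack)) {
                    swap(nodes, next, i); // then one rack-local replica
                    next++;
                    break;
                }
            }
        }
        // Neither local nor rack-local: pick a random first replica to spread load.
        if (next == 0 && nodes.length > 0) {
            swap(nodes, 0, random.nextInt(nodes.length));
        }
    }

    // Stable sort with false < true, so fresh replicas precede stale ones
    // and the order produced above is kept inside each group.
    static void moveStaleToEnd(Replica[] nodes) {
        Arrays.sort(nodes, Comparator.comparing((Replica r) -> r.stale));
    }

    public static void main(String[] args) {
        Replica[] nodes = {
            new Replica("h1", "/r2", false),
            new Replica("h2", "/r1", false),
            new Replica("h3", "/r1", true),
            new Replica("h4", "/r3", false),
        };
        pseudoSortByDistance("h3", "/r1", nodes, new Random(42));
        System.out.println(Arrays.toString(nodes)); // [h3, h2, h1, h4]
        moveStaleToEnd(nodes);
        System.out.println(Arrays.toString(nodes)); // [h2, h1, h4, h3]
    }
}
```

Because Arrays.sort on object arrays is stable, the staleness pass keeps the locality order within the fresh group and within the stale group.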

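At this point the process is roughly clear. Still, it needs to be read together with the NameNode's overall data structures; otherwise it is easy to miss the forest for the trees.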