您当前的位置:首页 > 互联网百科 > 大数据

大数据架构师,带你HDFS读文件过程分析:读取文件的Block数据

时间:2021-04-09 10:40:54  来源:大数据架构师  作者:

前言

我们可以从JAVA.io.InputStream类中看到,抽象出一个read方法,用来读取已经打开的InputStream实例中的字节,每次调用read方法,会读取一个字节数据,该方法抽象定义,如下所示:public abstract int read() throws IOException;Hadoop的DFSClient.DFSInputStream类实现了该抽象逻辑,如果我们清楚了如何从HDFS中读取一个文件的一个block的一个字节的原理,更加抽象的顶层只需要迭代即可获取到该文件的全部数据。
从HDFS读文件过程分析:获取文件对应的Block列表中,我们已经获取到一个文件对应的Block列表信息,打开一个文件,接下来就要读取实际的物理块数据,我们从下面的几个方面来详细说明读取数据的过程。

大数据架构师,带你HDFS读文件过程分析:读取文件的Block数据

 

Client从Datanode读取文件的一个字节

下面,我们通过分析DFSClient.DFSInputStream中实现的代码,读取HDFS上文件的内容。首先从下面的方法开始:

@Override
public synchronized int read() throws IOException {
int ret = read( oneByteBuf, 0, 1 );
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
}

上面调用read(oneByteBuf, 0, 1)读取一个字节到单字节缓冲区oneByteBuf中,具体实现见如下方法:

@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
checkOpen(); // 检查Client是否正在运行
if (closed) {
throw new IOException("Stream closed");
}
failures = 0;

if (pos < getFileLength()) { // getFileLength()获取文件所包含的总字节数,pos表示读取当前文件的第(pos+1)个字节
int retries = 2;
while (retries > 0) {
try {
if (pos > blockEnd) { // blockEnd表示文件的长度(字节数)
currentNode = blockSeekTo(pos); // 找到第pos个字节数据所在的Datanode(实际根据该字节数据所在的block元数据来定位)
}
int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
int result = readBuffer(buf, off, realLen); // 读取一个字节到缓冲区中

if (result >= 0) {
pos += result; // 每成功读取result个字节,pos增加result
} else {
// got a EOS from reader though we expect more data on it.
throw new IOException("Unexpected EOS from the reader");
}
if (stats != null && result != -1) {
stats.incrementBytesRead(result);
}
return result;
} catch (ChecksumException ce) {
throw ce;
} catch (IOException e) {
if (retries == 1) {
LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
}
blockEnd = -1;
if (currentNode != null) { addToDeadNodes(currentNode); }
if (--retries == 0) {
throw e;
}
}
}
}
return -1;
}

读取文件数据的一个字节,具体过程如下:

  1. 检查流对象是否处于打开状态(前面已经获取到文件对应的block列表的元数据,并打开一个InputStream对象)
  2. 从文件的第一个block开始读取,首先需要找到第一个block对应的数据块所在的Datanode,可以从缓存的block列表中查询到(如果查找不到,则会与Namenode进行一次RPC通信请求获取到)
  3. 打开一个到该读取的block所在Datanode节点的流,准备读取block数据
  4. 建立了到Datanode的连接后,读取一个字节数据到字节缓冲区中,返回读取的字节数(1个字节)

在读取的过程中,以字节为单位,通过判断某个偏移位置的字节属于哪个block(根据block元数据所限定的字节偏移范围),在根据这个block去定位某一个Datanode节点,这样就可连续地读取一个文件的全部数据(组成文件的、连续的多个block数据块)。

大数据架构师,带你HDFS读文件过程分析:读取文件的Block数据

 

查找待读取的一个字节所在的Datanode节点

上面public synchronized int read(byte buf[], int off, int len) throws IOException方法,调用了blockSeekTo方法来获取,文件某个字节索引位置的数据所在的Datanode节点。其实,很容易就能想到,想要获取到数据所在的Datanode节点,一定是从block元数据中计算得到,然后根据Client缓存的block映射列表,找到block对应的Datanode列表,我们看一下blockSeekTo方法的代码实现:

private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
... ...

DatanodeInfo chosenNode = null;
int refetchToken = 1; // only need to get a new access token once
while (true) {
LocatedBlock targetBlock = getBlockAt(target, true); // 获取字节偏移位置为target的字节数据所在的block元数据对象
assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
long offsetIntoBlock = target - targetBlock.getStartOffset();

DNAddrPair retval = chooseDataNode(targetBlock); // 选择一个Datanode去读取数据
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;

// 先尝试从本地读取数据,如果数据不在本地,则正常去读取远程的Datanode节点
Block blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
if (shouldTryShortCircuitRead(targetAddr)) {
try {
blockReader = getLocalBlockReader(conf, src, blk, accessToken,
chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock); // 创建一个用来读取本地数据的BlockReader对象
return chosenNode;
} catch (AccessControlException ex) {
LOG.warn("Short circuit access failed ", ex);
//Disable short circuit reads
shortCircuitLocalReads = false;
} catch (IOException ex) {
if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
/* Get a new access token and retry. */
refetchToken--;
fetchBlockAt(target);
continue;
} else {
LOG.info("Failed to read " + targetBlock.getBlock()
+ " on local machine" + StringUtils.stringifyException(ex));
LOG.info("Try reading via the datanode on " + targetAddr);
}
}
}

// 本地读取失败,按照更一般的方式去读取远程的Datanode节点来获取数据
try {
s = socketFactory.createSocket();
LOG.debug("Connecting to " + targetAddr);
NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
s.setSoTimeout(socketTimeout);
blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
accessToken,
blk.getGenerationStamp(),
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum, clientName); // 创建一个远程的BlockReader对象
return chosenNode;
} catch (IOException ex) {
if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
refetchToken--;
fetchBlockAt(target);
} else {
LOG.warn("Failed to connect to " + targetAddr
+ ", add to deadNodes and continue" + ex);
if (LOG.isDebugEnabled()) {
LOG.debug("Connection failure", ex);
}
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode); // 读取失败,会将选择的Datanode加入到Client的dead node列表,为下次读取选择合适的Datanode读取文件数据提供参考元数据信息
}
if (s != null) {
try {
s.close();
} catch (IOException iex) { }
}
s = null;
}
}
}

上面代码中,主要包括如下几个要点:

  • 选择合适的Datanode节点,提高读取效率

在读取文件的时候,首先会从Namenode获取文件对应的block列表元数据,返回的block列表是按照Datanode的网络拓扑结构进行排序过的(本地节点优先,其次是同一机架节点),而且,Client还维护了一个dead node列表,只要此时bock对应的Datanode列表中节点不出现在dead node列表中就会被返回,用来作为读取数据的Datanode节点。

  • 如果Client为集群Datanode节点,尝试从本地读取block

通过调用chooseDataNode方法返回一个Datanode结点,通过判断,如果该节点地址是本地地址,并且该节点上对应的block元数据信息的状态不是正在创建的状态,则满足从本地读取数据块的条件,然后会创建一个LocalBlockReader对象,直接从本地读取。在创建LocalBlockReader对象的过程中,会先从缓存中查找一个本地Datanode相关的LocalDatanodeInfo对象,该对象定义了与从本地Datanode读取数据的重要信息,以及缓存了待读取block对应的本地路径信息,可以从LocalDatanodeInfo类定义的属性来说明:

private ClientDatanodeProtocol proxy = null;
private final Map<Block, BlockLocalPathInfo> cache;

如果缓存中存在待读取的block的相关信息,可以直接进行读取;否则,会创建一个proxy对象,以及计算待读取block的路径信息BlockLocalPathInfo,最后再加入到缓存,为后续可能的读取加速。我们看一下如果没有从缓存中找到LocalDatanodeInfo信息(尤其是BlockLocalPathInfo),则会执行如下逻辑:

// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);

上面proxy为ClientDatanodeProtocol类型,Client与Datanode进行RPC通信的协议,RPC调用getBlockLocalPathInfo获取block对应的本地路径信息,可以在Datanode类中查看具体实现,如下所示:

BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);

Datanode调用FSDataset(实现接口FSDatasetInterface)的getBlockLocalPathInfo,如下所示:

@Override //FSDatasetInterface
public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
throws IOException {
File datafile = getBlockFile(block); // 获取本地block在本地Datanode文件系统中的文件路径
File metafile = getMetaFile(datafile, block); // 获取本地block在本地Datanode文件系统中的元数据的文件路径
BlockLocalPathInfo info = new BlockLocalPathInfo(block, datafile.getAbsolutePath(), metafile.getAbsolutePath());
return info;
}

接着可以直接去读取该block文件(如果需要检查校验和文件,会读取block的元数据文件metafile):

... // BlockReaderLocal类的newBlockReader静态方法
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
dataIn = new FileInputStream(blkfile);

if (!skipChecksum) { // 如果检查block的校验和
// get the metadata file
File metafile = new File(pathinfo.getMetaPath());
checksumIn = new FileInputStream(metafile);

// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(new DataInputStream(checksumIn));
short version = header.getVersion();
if (version != FSDataset.METADATA_VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for " + blk + " ignoring ...");
}
DataChecksum checksum = header.getChecksum();
localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, checksum, true, dataIn, checksumIn);
} else {
localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, dataIn);
}

在上面代码中,返回了BlockLocalPathInfo,但是很可能在这个过程中block被删除了,在删除block的时候,Namenode会调度指派该Datanode删除该block,恰好在这个时间间隔内block对应的BlockLocalPathInfo信息已经失效(文件已经被删除),所以上面这段代码再try中会抛出异常,并在catch中捕获到IO异常,会从缓存中再清除掉失效的block到BlockLocalPathInfo的映射信息。

  • 如果Client非集群Datanode节点,远程读取block

如果Client不是Datanode本地节点,则只能跨网络节点远程读取,首先创建Socket连接:

s = socketFactory.createSocket();
LOG.debug("Connecting to " + targetAddr);
NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
s.setSoTimeout(socketTimeout);

建立Client到目标Datanode(targetAddr)的连接,然后同样也是创建一个远程BlockReader对象RemoteBlockReader来辅助读取block数据。创建RemoteBlockReader过程中,首先向目标Datanode发送RPC请求:

// in and out will be closed when sock is closed (by the caller)
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));

//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // Client与Datanode之间传输数据的版本号
out.write( DataTransferProtocol.OP_READ_BLOCK ); // 传输操作类型:读取block
out.writeLong( blockId ); // block ID
out.writeLong( genStamp ); // 时间戳信息
out.writeLong( startOffset ); // block起始偏移量
out.writeLong( len ); // block长度
Text.writeString(out, clientName); // 客户端标识
accessToken.write(out);
out.flush();

然后获取到DataInputStream对象来读取Datanode的响应信息:

DataInputStream in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize));

最后,返回一个对象RemoteBlockReader:

return new RemoteBlockReader(file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock);

借助BlockReader来读取block字节

我们再回到blockSeekTo方法中,待读取block所在的Datanode信息、BlockReader信息都已经具备,接着就可以从包含输入流(InputStream)对象的BlockReader中读取数据块中一个字节数据:

int result = readBuffer(buf, off, realLen);

将block数据中一个字节读取到buf中,如下所示:

private synchronized int readBuffer(byte buf[], int off, int len) throws IOException {
IOException ioe;
boolean retryCurrentNode = true;

while (true) {
// retry as many times as seekToNewSource allows.
try {
return blockReader.read(buf, off, len); // 调用blockReader的read方法读取字节数据到buf中
} catch ( ChecksumException ce ) {
LOG.warn("Found Checksum error for " + currentBlock + " from " + currentNode.getName() + " at " + ce.getPos());
reportChecksumFailure(src, currentBlock, currentNode);
ioe = ce;
retryCurrentNode = false; // 只尝试读取当前选择的Datanode一次,失败的话就会被加入到Client的dead node列表中
} catch ( IOException e ) {
if (!retryCurrentNode) {
LOG.warn("Exception while reading from " + currentBlock + " of " + src + " from " + currentNode + ": " + StringUtils.stringifyException(e));
}
ioe = e;
}
boolean sourceFound = false;
if (retryCurrentNode) {
/* possibly retry the same node so that transient errors don't
* result in Application level failures (e.g. Datanode could have
* closed the connection because the client is idle for too long).
*/
sourceFound = seekToBlockSource(pos);
} else {
addToDeadNodes(currentNode); // 加入到Client的dead node列表中
sourceFound = seekToNewSource(pos); // 从当前选择的Datanode上读取数据失败,会再次选择一个Datanode,这里seekToNewSource方法内部调用了blockSeekTo方法去选择一个Datanode
}
if (!sourceFound) {
throw ioe;
}
retryCurrentNode = false;
}
}

通过BlockReaderLocal或者RemoteBlockReader来读取block数据,逻辑非常类似,主要是控制读取字节的偏移量,记录偏移量的状态信息,详细可以查看它们的源码。(原创时延军(包含链接:http://shiyanjun.cn))

DataNode节点处理读文件Block请求

我们可以在DataNode端看一下,如何处理一个读取Block的请求。如果Client与DataNode不是同一个节点,则为远程读取文件Block,首先Client需要发送一个请求头信息,代码如下所示:

//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // Client与Datanode之间传输数据的版本号
out.write( DataTransferProtocol.OP_READ_BLOCK ); // 传输操作类型:读取block
out.writeLong( blockId ); // block ID
out.writeLong( genStamp ); // 时间戳信息
out.writeLong( startOffset ); // block起始偏移量
out.writeLong( len ); // block长度
Text.writeString(out, clientName); // 客户端标识
accessToken.write(out);
out.flush();

DataNode节点端通过验证数据传输版本号(
DataTransferProtocol.DATA_TRANSFER_VERSION)一致以后,会判断传输操作类型,如果是读操作DataTransferProtocol.OP_READ_BLOCK,则会通过Client建立的Socket来创建一个OutputStream对象,然后通过BlockSender向Client发送Block数据,代码如下所示:

try {
blockSender = new BlockSender(block, startOffset, length, true, true, false, datanode, clientTraceFmt); // 创建BlockSender对象
} catch(IOException e) {
out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
throw e;
}

out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // 回复一个响应Header信息:成功状态
long read = blockSender.sendBlock(out, baseStream, null); // 发送请求的Block数据


Tags:大数据   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
为啥这几年偷税漏税的新闻这么多?不是偷的人多了,是因为国家有了查税大杀器: ...【详细内容】
2021-12-24  Tags: 大数据  点击:(8)  评论:(0)  加入收藏
张欣安科瑞电气股份有限公司 上海嘉定 201801 摘要:随着电力行业各系统接入,海量数据涌现,如何利用电网信息化中大量数据,对客户需求进行判断分析,服务于营销链条,提升企业市场竞...【详细内容】
2021-12-14  Tags: 大数据  点击:(9)  评论:(0)  加入收藏
1、什么是数据分析结合分析工具,运用数据分析思维,分析庞杂数据信息,为业务赋能。 2、数据分析师工作的核心流程:(1)界定问题:明确具体问题是什么;●what 发生了什么(是什么)●why 为...【详细内容】
2021-12-01  Tags: 大数据  点击:(25)  评论:(0)  加入收藏
数据作为新的生产要素,其蕴含的价值日益凸显,而安全问题却愈发突出。密码技术,是实现数据安全最经济、最有效、最可靠的手段,对数据进行加密,并结合有效的密钥保护手段,可在开放环...【详细内容】
2021-11-26  Tags: 大数据  点击:(17)  评论:(0)  加入收藏
导读:网易大数据平台的底层数据查询引擎,选用了Impala作为OLAP查询引擎,不但支撑了网易大数据的交互式查询与自助分析,还为外部客户提供了商业化的产品与服务。今天将为大家分享...【详细内容】
2021-11-26  Tags: 大数据  点击:(15)  评论:(0)  加入收藏
日前,北京市人力资源和社会保障局发布《2021年北京市人力资源市场薪酬大数据报告》,《报告》基于本市2020年度相关调研数据,按照行业、职位、群体等维度对薪酬数据进行了分析,首...【详细内容】
2021-11-04  Tags: 大数据  点击:(28)  评论:(0)  加入收藏
架构是数据仓库建设的总体规划,从整体视角描述了解决方案的高层模型,描述了各个子系统的功能以及关系,描述了数据从源系统到决策系统的数据流程。业务需求回答了要做什么,架构就...【详细内容】
2021-11-03  Tags: 大数据  点击:(35)  评论:(0)  加入收藏
同一产品对老客户的要价竟然比新客户要高?这是当下“大数据杀熟”的直接结果。近年来,随着平台经济的蓬勃发展,大数据在为用户服务之外,也引发了多种不合理现象。为了有效遏制“...【详细内容】
2021-10-29  Tags: 大数据  点击:(31)  评论:(0)  加入收藏
如今社会,手机电话在中国的使用率已达到99%以上,大大的地增强了我们的生活水平。而电话不但用以日常生活,还可以用以工作中,例如电话营销,电话便是他们的武器装备,他们根据手机的...【详细内容】
2021-10-26  Tags: 大数据  点击:(44)  评论:(0)  加入收藏
《个人信息保护法》11月1日即将生效,在大数据营销充斥在网络上的现在,如何引导大数据为善,如何更好的使用开发大数据,变得既重要也有现实意义。...【详细内容】
2021-10-26  Tags: 大数据  点击:(35)  评论:(0)  加入收藏
▌简易百科推荐
张欣安科瑞电气股份有限公司 上海嘉定 201801 摘要:随着电力行业各系统接入,海量数据涌现,如何利用电网信息化中大量数据,对客户需求进行判断分析,服务于营销链条,提升企业市场竞...【详细内容】
2021-12-14  安科瑞张欣    Tags:大数据   点击:(9)  评论:(0)  加入收藏
1、什么是数据分析结合分析工具,运用数据分析思维,分析庞杂数据信息,为业务赋能。 2、数据分析师工作的核心流程:(1)界定问题:明确具体问题是什么;●what 发生了什么(是什么)●why 为...【详细内容】
2021-12-01  逆风北极光    Tags:大数据   点击:(25)  评论:(0)  加入收藏
在实际工作中,我们经常需要整理各个业务部门发来的数据。不仅分散,而且数据量大、格式多。单是从不同地方汇总整理这些原始数据就花了大量的时间,更不用说还要把有效的数据收集...【详细内容】
2021-11-30  百数    Tags:数据   点击:(21)  评论:(0)  加入收藏
数据作为新的生产要素,其蕴含的价值日益凸显,而安全问题却愈发突出。密码技术,是实现数据安全最经济、最有效、最可靠的手段,对数据进行加密,并结合有效的密钥保护手段,可在开放环...【详细内容】
2021-11-26  炼石网络    Tags:数据存储   点击:(17)  评论:(0)  加入收藏
导读:网易大数据平台的底层数据查询引擎,选用了Impala作为OLAP查询引擎,不但支撑了网易大数据的交互式查询与自助分析,还为外部客户提供了商业化的产品与服务。今天将为大家分享...【详细内容】
2021-11-26  DataFunTalk    Tags:大数据   点击:(15)  评论:(0)  加入收藏
导读:数据挖掘是一种发现知识的手段。数据挖掘要求数据分析师通过合理的方法,从数据中获取与挖掘项目相关的知识。作者:赵仁乾 田建中 叶本华 常国珍来源:华章科技数据挖掘是一...【详细内容】
2021-11-23  华章科技  今日头条  Tags:数据挖掘   点击:(20)  评论:(0)  加入收藏
今天再给大家分享一个不错的可视化大屏分析平台模板DataColour。 data-colour 可视化分析平台采用前后端分离模式,后端架构设计采用微服务架构模式。 前端技术:Angularjs、Jq...【详细内容】
2021-11-04  web前端进阶    Tags:DashboardClient   点击:(39)  评论:(0)  加入收藏
在Kubernetes已经成了事实上的容器编排标准之下,微服务的部署变得非常容易。但随着微服务规模的扩大,服务治理带来的挑战也会越来越大。在这样的背景下出现了服务可观测性(obs...【详细内容】
2021-11-02  大数据推荐杂谈    Tags:Prometheus   点击:(40)  评论:(0)  加入收藏
同一产品对老客户的要价竟然比新客户要高?这是当下“大数据杀熟”的直接结果。近年来,随着平台经济的蓬勃发展,大数据在为用户服务之外,也引发了多种不合理现象。为了有效遏制“...【详细内容】
2021-10-29    海外网   Tags:大数据   点击:(31)  评论:(0)  加入收藏
本人03年开始从事贸易行业,多年来一直致力于外贸获客和跨境电商选品等领域,最近有些小伙伴反馈海关数据演示的都挺好为啥用起来不是那么回事?大家看到数据时关注的有产品、采购...【详细内容】
2021-10-28  QD云龙    Tags:数据   点击:(33)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条