7.1 I/O操作中的数据检查

Apache的Hadoop官网上有一个名为Sort900的具体的Hadoop配置实例,所谓Sort900就是在900台主机上对9TB的数据进行排序。一般而言,在Hadoop集群的实际应用中,主机的数目是很大的,Sort900使用了900台主机,而淘宝目前则使用了1100台主机来存储他们的数据(据说计划扩充到1500台)。在这么多的主机同时运行时,你会发现主机损坏是非常常见的,这就会涉及很多程序上的预处理了。对于本章而言,就体现在Hadoop中进行数据完整性检查的重要性上。

校验和方式是检查数据完整性的重要方式。一般会通过对比新旧校验和来确定数据情况,如果两者不同则说明数据已经损坏。比如,在传输数据前生成了一个校验和,将数据传输到目的主机时再次计算校验和,如果两次的校验和不同,则说明数据已经损坏。或者在系统启动时计算校验和,如果其值和硬盘上已经存在的校验和不同,那么也说明数据已经损坏。校验和不能恢复数据,只能检测错误。

Hadoop采用CRC-32(Cyclic Redundancy Check—-循环冗余校验,32指生成的校验和是32位的)的方式检查数据完整性。这是一种非常常见的校验和验证方式,检错能力强,开销小,易于实现。如果大家有兴趣可以自行查阅资料了解。

Hadoop采用HDFS作为默认的文件系统,因此我们需要讨论两方面的数据完整性:

1)本地文件系统的数据完整性;

2)HDFS的数据完整性。

1.对本地文件I/O的检查

在Hadoop中,本地文件系统的数据完整性由客户端负责。重点是在存储和读取文件时进行校验和的处理。

具体做法是,每当Hadoop创建文件a时,Hadoop就会同时在同一文件夹下创建隐藏文件.a.crc,这个文件记录了文件a的校验和。针对数据文件的大小,每512个字节Hadoop就会生成一个32位的校验和(4字节),你可以在src/core/core-default.xml中通过修改io.bytes.per.checksum的大小来修改每个校验和所针对的文件的大小。如下所示:


<property>

<name>io.bytes.per.checksum</name>

<value>512</value>

<description>The number of bytes per checksum.Must not be larger than io.file.

buffer.size.</description>

</property>


一般来说,主流的文件系统都能在一定程度上保证数据的完整性,因此有可能你并不需要Hadoop的这部分功能。如果不需要,你可以通过修改文件src/core/core-default.xml中fs.file.impl的值来禁用校验和机制,如下所示:


<property>

<name>fs.file.impl</name>

<value>org.apache.hadoop.fs.LocalFileSystem</value>

<description>The FileSystem for file:uris.</description>

</property>


把值修改为org.apache.hadoop.fs.RawLocalFileSystem即可禁用校验和机制。

如果你只想在程序中对某些读取禁用校验和检验,那么你可以声明RawLocalFileSystem实例。例如:


FileSystem fs=new RawFileSystem();

Fs.initialize(null, conf);


在Hadoop中,校验和系统单独为一类—org.apache.hadoop.fs.ChecksumFileSystem,当需要校验和机制时,你可以很方便地调用它来为你服务。

引用方法为:


FileSystem rawFS=……;

FileSystem checksumFS=new ChecksumFileSystem(rawFS);


事实上,org.apache.hadoop.fs.ChecksumFileSystem是org.apache.hadoop.fs.FileSystem子类的子类,其继承关系如下:


java.lang.Object

-org.apache.hadoop.conf.Configured

-org.apache.hadoop.fs.FileSystem

-org.apache.hadoop.fs.FilterFileSystem

-org.apache.hadoop.fs.ChecksumFileSystem

-org.apache.hadoop.fs.LocalFileSystem


如果大家对这些类的作用感兴趣,可以查阅Hadoop的app文档,地址为http://hadoop.apache.org/common/docs/current/api/index.html。

读取文件时,如果ChecksumFileSystem检测到错误,便会调用reportChecksumFailure。这是一个布尔类型的函数,此时,LocalFileSystem会把这些问题文件及其校验和一起移动到同一台主机的次级目录下,命名为bad_files。一般而言,使用者需要经常处理这些文件。

2.对HDFS的I/O数据进行检查

一般来说,HDFS会在三种情况下检验校验和:

(1)DataNode接收数据后存储数据前

要了解这种情况,大家先要了解DataNode一般会在什么时候接收数据。它接收数据一般有两种情况:一是用户从客户端上传数据;二是DataNode从其他DataNode上接收数据。一般来说,客户端往往也是DataNode,不过有时候客户端仅仅是客户端而已,并不是Hadoop集群中的节点。当客户端上传数据时,Hadoop会根据预定规则形成一条数据管线。图7-1就是一个典型的副本管线(数据备份为3)。数据0是原数据,数据1、数据2、数据3是备份。

7.1 I/O操作中的数据检查 - 图1

图 7-1 数据管线及数据备份流程图

数据将按管线流动以完成数据的上传及备份过程,图7-1中顺序就是先在客户端这个节点上保存数据(在这张图上,客户端也是Hadoop集群中的一个节点)。注意这个流动的过程,备份1在接收数据的同时也会把接收到的数据发送给备份2所在的机器,因此如果过程执行顺利,三个备份形成的时间相差不多(相对依次备份而言)。这里面涉及一个负载均衡的问题,不过这个问题不是本章的重点,这里不再详述。我们在这里只关心数据完整性的问题。在传输数据的最开始阶段,Hadoop会简单地检查数据块的完整性信息,这一点从DataNode的源代码也可以看出。下面是DataNode在各个待传输节点之间传输数据的主要函数transferBlock(Block block, DataNodeInfo xferTargets[]),其中检查的主要代码如下:


//检查数据块是否真正存在

if(!data.isValidBlock(block)){

……

return;

}

//检查NameNode上数据块长度和硬盘数据块长度是否匹配

long onDiskLength=data.getLength(block);

if(block.getNumBytes()>onDiskLength){

……

return;

}


上面简单地检查之后,就开始向各个DataNode传输数据,在传输过程中会一同发送数据头信息,包括块信息、源DataNode信息、备份个数、校验和等,可参考DataTransfer中run函数的部分代码:


//数据头信息out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);

//数据传输版本

out.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);

out.writeLong(b.getBlockId());//块ID

out.writeLong(b.getGenerationStamp());//生成时间戳

……

srcNode.write(out);//写入源DataNode信息

out.writeInt(targets.length-1);//备份个数

for(int i=1;i<targets.length;i++){

targets[i].write(out);

}

blockSender.sendBlock(out, baseStream, null);//数据块和校验和


Hadoop不会在数据每流动到一个DataNode时都检查校验和,它只会在数据流动到最后一个节点时才检验校验和。也就是说Hadoop会在备份3所在的DataNode接受完数据后检查校验和。具体核心代码如BlockSender.java中的部分代码:


//通过设置的DataNode序列流正常传输数据

IOUtils.readFully(blockIn, buf, dataOff, len);

//传输结束后,根据配置的verifyChecksum来检测数据完整性

if(verifyChecksum){

……

for(int i=0;i<numChunks;i++){

checksum.reset();

int dLen=Math.min(dLeft, bytesPerChecksum);

checksum.update(buf, dOff, dLen);

if(!checksum.compare(buf, cOff)){

throw new ChecksumException("Checksum failed at"+

(offset+len-dLeft),len);

}

……

}

}


这就是从客户端上传数据时Hadoop对数据完整性检测进行的相关处理。

DataNode从其他DataNode接收数据时也是同样的处理过程。

(2)客户端读取DataNode上的数据时

Hadoop会在客户端读取DataNode上的数据时,使用DFSClient中的read函数先将数据读入到用户的数据缓冲区,然后再检验校验和。具体代码片段如下:


//读取数据到缓冲区

int nRead=super.read(buf, off, len);

if(dnSock!=null&&gotEOS&&!eosBefore&&nRead>=0

&&needChecksum()){

//检查校验和

checksumOk(dnSock);

}


(3)DataNode后台守护进程的定期检测

DataNode会在后台运行DataBlockScanner,这个程序会定期检测此DataNode上的所有数据块。从DataNode.java中startDataNode函数的源代码就可以看出:


//根据配置信息初始化DataNode上的定期数据扫描器

String reason=null;

if(conf.getInt("dfs.DataNode.scan.period.hours",0)<0){

reason="verification is turned off by configuration";

}else if(!(data instanceof FSDataset)){

reason="verifcation is supported only with FSDataset";

}

if(reason==null){

blockScanner=new DataBlockScanner(this,(FSDataset)data, conf);

}else{

LOG.info("Periodic Block Verification is disabled because"+reason+".");

}

……

//将扫描服务加入DataNode服务中

this.infoServer.addServlet(null,"/blockScannerReport",

DataBlockScanner.Servlet.class);

……

this.infoServer.start();


3.数据恢复策略

在Hadoop上进行数据读操作时,如果发现某数据块失效,读操作涉及的用户、DataNode和NameNode都会尝试来恢复数据块,恢复成功后会设置标签,防止其他角色重复恢复。下面以DataNode端的恢复为例说明恢复数据块的详细步骤,代码参见DataNode中的recoverBlock函数。

(1)检查已恢复标签

检查一致的数据块恢复标记,如果已经恢复,则直接跳过恢复阶段。


//如果数据块已经被回复,则直接跳过恢复阶段

synchronized(ongoingRecovery){

Block tmp=new Block();

tmp.set(block.getBlockId(),block.getNumBytes(),GenerationStamp.WILDCARD_STAMP);

if(ongoingRecovery.get(tmp)!=null){

String msg="Block"+block+"is already being recovered,"+"

ignoring this request to recover it.";

LOG.info(msg);

throw new IOException(msg);

}

ongoingRecovery.put(block, block);

}


(2)统计各个备份数据块恢复状态

在这个阶段,DataNode会检查所有出错数据块备份的DataNode,查看这些节点上数据块的恢复信息,然后将所有版本正确的数据块信息、DataNode信息作为一条记录保存在数据块记录表中。


//检查每个数据块备份DataNode

for(DataNodeID id:datanodeids){

try{

//获取数据块信息

BlockRecoveryInfo info=datanode.startBlockRecovery(block);

//数据块已不存在

if(info==null){

continue;

}

//数据块版本较晚

if(info.getBlock().getGenerationStamp()<block.getGenerationStamp()){

continue;

}

//正确版本数据块的信息保存起来

blockRecords.add(new BlockRecord(id, datanode, info));

if(info.wasRecoveredOnStartup()){

rwrCount++;//等待回复数

}else{

rbwCount++;//正在恢复数

}

}catch(IOException e){

++errorCount;//出错数

}

}


(3)找出所有正确版本数据块中最小长度的版本

在这一步骤中,DataNode会逐个扫描上一阶段中保存的数据块记录,首先判断当前副本是否正在恢复,如果正在恢复则跳过,如果不是正在恢复并且配置参数设置了恢复需要保持原副本长度,则将恢复长度相同的副本加入待恢复队列,否则将所有版本正确的副本加入待恢复队列。


for(BlockRecord record:blockRecords){

BlockRecoveryInfo info=record.info;

if(!shouldRecoverRwrs&&info.wasRecoveredOnStartup()){

continue;

}

if(keepLength){

if(info.getBlock().getNumBytes()==block.getNumBytes())

{syncList.add(record);}

}else{

syncList.add(record);

if(info.getBlock().getNumBytes()<minlength){

minlength=info.getBlock().getNumBytes();

}

}

}


(4)副本同步

如果需要保持副本长度,那么直接同步长度相同的副本即可,否则以长度最小的副本同步其他副本。


if(!keepLength){

block.setNumBytes(minlength);

}

return syncBlock(block, syncList, targets, closeFile);


与读取本地文件的情况相同,用户也可以使用命令来禁用检验和检验(从前面的代码中也可以看出,通常在检查校验和之前都有needChecksum等选项)。有两种方法可以达到这个目的。

一个是在使用open()读取文件前,设置FileSystem中的setVerifyChecksum值为false。


FileSystem fs=new FileSystem();

Fs.setVerifyChecksum(false);


另一个是使用shell命令,比如get命令和copyToLocal命令。

get命令的使用方法如下所示:


hadoop fs-get[-ignoreCrc][-crc]<src><localdst>


举个例子:


hadoop fs-get-ignoreCrc input~/Desktop/


get命令会复制文件到本地文件系统。可用-ignorecrc选项复制CRC校验失败的文件,或者使用-crc选项复制文件,以及CRC信息。

copyToLocal的使用方法如下所示:


hadoop fs-copyToLocal[-ignorecrc][-crc]URI<localdst>


再举个例子:


hadoop fs-copyToLocal-ignoreCrc input~/Desktop


除了要限定目标路径是一个本地文件外,其他和get命令类似。

禁用校验和检验的最主要目的并不是节约时间,用于检验校验和的开销一般情况都是可以接受的,禁用校验和检验的主要原因是,如果不禁用校验和检验,就无法下载那些已经损坏的文件来查看是否可以挽救,而有时候即使是只能挽救一小部分文件也是很值得的。