7.1 I/O操作中的数据检查
Apache的Hadoop官网上有一个名为Sort900的具体的Hadoop配置实例,所谓Sort900就是在900台主机上对9TB的数据进行排序。一般而言,在Hadoop集群的实际应用中,主机的数目是很大的,Sort900使用了900台主机,而淘宝目前则使用了1100台主机来存储他们的数据(据说计划扩充到1500台)。在这么多的主机同时运行时,你会发现主机损坏是非常常见的,这就会涉及很多程序上的预处理了。对于本章而言,就体现在Hadoop中进行数据完整性检查的重要性上。
校验和方式是检查数据完整性的重要方式。一般会通过对比新旧校验和来确定数据情况,如果两者不同则说明数据已经损坏。比如,在传输数据前生成了一个校验和,将数据传输到目的主机时再次计算校验和,如果两次的校验和不同,则说明数据已经损坏。或者在系统启动时计算校验和,如果其值和硬盘上已经存在的校验和不同,那么也说明数据已经损坏。校验和不能恢复数据,只能检测错误。
Hadoop采用CRC-32(Cyclic Redundancy Check—-循环冗余校验,32指生成的校验和是32位的)的方式检查数据完整性。这是一种非常常见的校验和验证方式,检错能力强,开销小,易于实现。如果大家有兴趣可以自行查阅资料了解。
Hadoop采用HDFS作为默认的文件系统,因此我们需要讨论两方面的数据完整性:
1)本地文件系统的数据完整性;
2)HDFS的数据完整性。
1.对本地文件I/O的检查
在Hadoop中,本地文件系统的数据完整性由客户端负责。重点是在存储和读取文件时进行校验和的处理。
具体做法是,每当Hadoop创建文件a时,Hadoop就会同时在同一文件夹下创建隐藏文件.a.crc,这个文件记录了文件a的校验和。针对数据文件的大小,每512个字节Hadoop就会生成一个32位的校验和(4字节),你可以在src/core/core-default.xml中通过修改io.bytes.per.checksum的大小来修改每个校验和所针对的文件的大小。如下所示:
<property>
<name>io.bytes.per.checksum</name>
<value>512</value>
<description>The number of bytes per checksum.Must not be larger than io.file.
buffer.size.</description>
</property>
一般来说,主流的文件系统都能在一定程度上保证数据的完整性,因此有可能你并不需要Hadoop的这部分功能。如果不需要,你可以通过修改文件src/core/core-default.xml中fs.file.impl的值来禁用校验和机制,如下所示:
<property>
<name>fs.file.impl</name>
<value>org.apache.hadoop.fs.LocalFileSystem</value>
<description>The FileSystem for file:uris.</description>
</property>
把值修改为org.apache.hadoop.fs.RawLocalFileSystem即可禁用校验和机制。
如果你只想在程序中对某些读取禁用校验和检验,那么你可以声明RawLocalFileSystem实例。例如:
FileSystem fs=new RawFileSystem();
Fs.initialize(null, conf);
在Hadoop中,校验和系统单独为一类—org.apache.hadoop.fs.ChecksumFileSystem,当需要校验和机制时,你可以很方便地调用它来为你服务。
引用方法为:
FileSystem rawFS=……;
FileSystem checksumFS=new ChecksumFileSystem(rawFS);
事实上,org.apache.hadoop.fs.ChecksumFileSystem是org.apache.hadoop.fs.FileSystem子类的子类,其继承关系如下:
java.lang.Object
-org.apache.hadoop.conf.Configured
-org.apache.hadoop.fs.FileSystem
-org.apache.hadoop.fs.FilterFileSystem
-org.apache.hadoop.fs.ChecksumFileSystem
-org.apache.hadoop.fs.LocalFileSystem
如果大家对这些类的作用感兴趣,可以查阅Hadoop的app文档,地址为http://hadoop.apache.org/common/docs/current/api/index.html。
读取文件时,如果ChecksumFileSystem检测到错误,便会调用reportChecksumFailure。这是一个布尔类型的函数,此时,LocalFileSystem会把这些问题文件及其校验和一起移动到同一台主机的次级目录下,命名为bad_files。一般而言,使用者需要经常处理这些文件。
2.对HDFS的I/O数据进行检查
一般来说,HDFS会在三种情况下检验校验和:
(1)DataNode接收数据后存储数据前
要了解这种情况,大家先要了解DataNode一般会在什么时候接收数据。它接收数据一般有两种情况:一是用户从客户端上传数据;二是DataNode从其他DataNode上接收数据。一般来说,客户端往往也是DataNode,不过有时候客户端仅仅是客户端而已,并不是Hadoop集群中的节点。当客户端上传数据时,Hadoop会根据预定规则形成一条数据管线。图7-1就是一个典型的副本管线(数据备份为3)。数据0是原数据,数据1、数据2、数据3是备份。
图 7-1 数据管线及数据备份流程图
数据将按管线流动以完成数据的上传及备份过程,图7-1中顺序就是先在客户端这个节点上保存数据(在这张图上,客户端也是Hadoop集群中的一个节点)。注意这个流动的过程,备份1在接收数据的同时也会把接收到的数据发送给备份2所在的机器,因此如果过程执行顺利,三个备份形成的时间相差不多(相对依次备份而言)。这里面涉及一个负载均衡的问题,不过这个问题不是本章的重点,这里不再详述。我们在这里只关心数据完整性的问题。在传输数据的最开始阶段,Hadoop会简单地检查数据块的完整性信息,这一点从DataNode的源代码也可以看出。下面是DataNode在各个待传输节点之间传输数据的主要函数transferBlock(Block block, DataNodeInfo xferTargets[]),其中检查的主要代码如下:
//检查数据块是否真正存在
if(!data.isValidBlock(block)){
……
return;
}
//检查NameNode上数据块长度和硬盘数据块长度是否匹配
long onDiskLength=data.getLength(block);
if(block.getNumBytes()>onDiskLength){
……
return;
}
上面简单地检查之后,就开始向各个DataNode传输数据,在传输过程中会一同发送数据头信息,包括块信息、源DataNode信息、备份个数、校验和等,可参考DataTransfer中run函数的部分代码:
//数据头信息out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
//数据传输版本
out.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);
out.writeLong(b.getBlockId());//块ID
out.writeLong(b.getGenerationStamp());//生成时间戳
……
srcNode.write(out);//写入源DataNode信息
out.writeInt(targets.length-1);//备份个数
for(int i=1;i<targets.length;i++){
targets[i].write(out);
}
blockSender.sendBlock(out, baseStream, null);//数据块和校验和
Hadoop不会在数据每流动到一个DataNode时都检查校验和,它只会在数据流动到最后一个节点时才检验校验和。也就是说Hadoop会在备份3所在的DataNode接受完数据后检查校验和。具体核心代码如BlockSender.java中的部分代码:
//通过设置的DataNode序列流正常传输数据
IOUtils.readFully(blockIn, buf, dataOff, len);
//传输结束后,根据配置的verifyChecksum来检测数据完整性
if(verifyChecksum){
……
for(int i=0;i<numChunks;i++){
checksum.reset();
int dLen=Math.min(dLeft, bytesPerChecksum);
checksum.update(buf, dOff, dLen);
if(!checksum.compare(buf, cOff)){
throw new ChecksumException("Checksum failed at"+
(offset+len-dLeft),len);
}
……
}
}
这就是从客户端上传数据时Hadoop对数据完整性检测进行的相关处理。
DataNode从其他DataNode接收数据时也是同样的处理过程。
(2)客户端读取DataNode上的数据时
Hadoop会在客户端读取DataNode上的数据时,使用DFSClient中的read函数先将数据读入到用户的数据缓冲区,然后再检验校验和。具体代码片段如下:
//读取数据到缓冲区
int nRead=super.read(buf, off, len);
if(dnSock!=null&&gotEOS&&!eosBefore&&nRead>=0
&&needChecksum()){
//检查校验和
checksumOk(dnSock);
}
(3)DataNode后台守护进程的定期检测
DataNode会在后台运行DataBlockScanner,这个程序会定期检测此DataNode上的所有数据块。从DataNode.java中startDataNode函数的源代码就可以看出:
//根据配置信息初始化DataNode上的定期数据扫描器
String reason=null;
if(conf.getInt("dfs.DataNode.scan.period.hours",0)<0){
reason="verification is turned off by configuration";
}else if(!(data instanceof FSDataset)){
reason="verifcation is supported only with FSDataset";
}
if(reason==null){
blockScanner=new DataBlockScanner(this,(FSDataset)data, conf);
}else{
LOG.info("Periodic Block Verification is disabled because"+reason+".");
}
……
//将扫描服务加入DataNode服务中
this.infoServer.addServlet(null,"/blockScannerReport",
DataBlockScanner.Servlet.class);
……
this.infoServer.start();
3.数据恢复策略
在Hadoop上进行数据读操作时,如果发现某数据块失效,读操作涉及的用户、DataNode和NameNode都会尝试来恢复数据块,恢复成功后会设置标签,防止其他角色重复恢复。下面以DataNode端的恢复为例说明恢复数据块的详细步骤,代码参见DataNode中的recoverBlock函数。
(1)检查已恢复标签
检查一致的数据块恢复标记,如果已经恢复,则直接跳过恢复阶段。
//如果数据块已经被回复,则直接跳过恢复阶段
synchronized(ongoingRecovery){
Block tmp=new Block();
tmp.set(block.getBlockId(),block.getNumBytes(),GenerationStamp.WILDCARD_STAMP);
if(ongoingRecovery.get(tmp)!=null){
String msg="Block"+block+"is already being recovered,"+"
ignoring this request to recover it.";
LOG.info(msg);
throw new IOException(msg);
}
ongoingRecovery.put(block, block);
}
(2)统计各个备份数据块恢复状态
在这个阶段,DataNode会检查所有出错数据块备份的DataNode,查看这些节点上数据块的恢复信息,然后将所有版本正确的数据块信息、DataNode信息作为一条记录保存在数据块记录表中。
//检查每个数据块备份DataNode
for(DataNodeID id:datanodeids){
try{
//获取数据块信息
BlockRecoveryInfo info=datanode.startBlockRecovery(block);
//数据块已不存在
if(info==null){
continue;
}
//数据块版本较晚
if(info.getBlock().getGenerationStamp()<block.getGenerationStamp()){
continue;
}
//正确版本数据块的信息保存起来
blockRecords.add(new BlockRecord(id, datanode, info));
if(info.wasRecoveredOnStartup()){
rwrCount++;//等待回复数
}else{
rbwCount++;//正在恢复数
}
}catch(IOException e){
++errorCount;//出错数
}
}
(3)找出所有正确版本数据块中最小长度的版本
在这一步骤中,DataNode会逐个扫描上一阶段中保存的数据块记录,首先判断当前副本是否正在恢复,如果正在恢复则跳过,如果不是正在恢复并且配置参数设置了恢复需要保持原副本长度,则将恢复长度相同的副本加入待恢复队列,否则将所有版本正确的副本加入待恢复队列。
for(BlockRecord record:blockRecords){
BlockRecoveryInfo info=record.info;
if(!shouldRecoverRwrs&&info.wasRecoveredOnStartup()){
continue;
}
if(keepLength){
if(info.getBlock().getNumBytes()==block.getNumBytes())
{syncList.add(record);}
}else{
syncList.add(record);
if(info.getBlock().getNumBytes()<minlength){
minlength=info.getBlock().getNumBytes();
}
}
}
(4)副本同步
如果需要保持副本长度,那么直接同步长度相同的副本即可,否则以长度最小的副本同步其他副本。
if(!keepLength){
block.setNumBytes(minlength);
}
return syncBlock(block, syncList, targets, closeFile);
与读取本地文件的情况相同,用户也可以使用命令来禁用检验和检验(从前面的代码中也可以看出,通常在检查校验和之前都有needChecksum等选项)。有两种方法可以达到这个目的。
一个是在使用open()读取文件前,设置FileSystem中的setVerifyChecksum值为false。
FileSystem fs=new FileSystem();
Fs.setVerifyChecksum(false);
另一个是使用shell命令,比如get命令和copyToLocal命令。
get命令的使用方法如下所示:
hadoop fs-get[-ignoreCrc][-crc]<src><localdst>
举个例子:
hadoop fs-get-ignoreCrc input~/Desktop/
get命令会复制文件到本地文件系统。可用-ignorecrc选项复制CRC校验失败的文件,或者使用-crc选项复制文件,以及CRC信息。
copyToLocal的使用方法如下所示:
hadoop fs-copyToLocal[-ignorecrc][-crc]URI<localdst>
再举个例子:
hadoop fs-copyToLocal-ignoreCrc input~/Desktop
除了要限定目标路径是一个本地文件外,其他和get命令类似。
禁用校验和检验的最主要目的并不是节约时间,用于检验校验和的开销一般情况都是可以接受的,禁用校验和检验的主要原因是,如果不禁用校验和检验,就无法下载那些已经损坏的文件来查看是否可以挽救,而有时候即使是只能挽救一小部分文件也是很值得的。