7.4 针对Mapreduce的文件类

Hadoop定义了一些文件数据结构以适应Mapreduce编程框架的需要,其中SequenceFile和MapFile两种类型非常重要,Map输出的中间结果就是由它们表示的。其中,MapFile是经过排序并带有索引的SequenceFile。

7.4.1 SequenceFile类

SequenceFile记录的是key/value对的列表,是序列化之后的二进制文件,因此是不能直接查看的,我们可以通过如下命令来查看这个文件的内容。


hadoop fs-text MySequenceFile(你的SequenceFile文件)


Sequence有三种不同类型的结构:

1)未压缩的key/value对;

2)记录压缩的Key/value对(这种情况下只有value被压缩);

3)Block压缩的key/value对(在这种情况下,key与value被分别记录到块中并压缩)。

下面详细介绍它们的结构。

1.未压缩和只压缩value的SequenceFile数据格式

未压缩和只压缩value的SequenceFile数据格式基本是相同的。

Header是头,它记录的内容如图7-4所示,现在一一对其进行解释:

7.4 针对Mapreduce的文件类 - 图1

图 7-4 SequenceFile数据格式(未压缩和Record压缩格式)

version(版本号):这是一个形如SEQ4或SEQ5的字节数组,一共占四个字节;

keyClassName(key类名)和valueClassName(value类名):这两个都是String类型,记录的是key和value的数据类型;

compression(压缩):这是一个布尔类型,它记录的是在这个文件中压缩是否启用;

blockCompression(Block压缩):布尔类型,记录Block压缩是否启用;

compressor class(压缩类):这是Hadoop内封装的用于压缩key和value的代码;

metadata(元数据):用于记录文件的元数据,文件的元数据是一个<属性名,值>对的列表;

Record:它是数据内容,其内容简单明了,相信大家看图就很容易明白。

Sync-marker:它是一个标记,可以允许程序快速找到文件中随机的一个点。它可以使

MapReduce程序更有效率地分割大文件。

需要注意的是,Sync-marker每隔几百个字节会出现一次,因此最后的SequenceFile会是形如图7-5所示的序列文件。

7.4 针对Mapreduce的文件类 - 图2

图 7-5 SequenceFile数据存储示例

Sync出现的位置取决于字节数,而不是间隔的Recorder的个数。

从上面的内容可以知道,未压缩与只压缩value的SequenceFile数据格式有两点不同,一是compression(是否压缩)的值不同,二是value存储的数据是否经过了压缩不同。

2.Block压缩的SequenceFile数据格式

Block压缩的SequenceFile数据格式与上面两种也很相似,它们的头与上面是一样的,同时也会标记一个Sync-marker。不过它们的Recorder格式是不同的,并且Sync-marker是标记在每个块前面的。下面是Block压缩的SequenceFile的Recorder格式。如图7-6所示。

7.4 针对Mapreduce的文件类 - 图3

图 7-6 SequenceFile数据格式Recorder部分(Block压缩)

Block压缩一次会压缩多个Recorder, Recorder在达到一个值时被记录,这个值是由io.seqfile.compress.blocksize定义的。Block压缩的SequenceFile是形成图7-7所示的序列文件。

7.4 针对Mapreduce的文件类 - 图4

图 7-7 SequenceFile数据存储示例(Block压缩)

我们可以通过编写程序生成读取SequenceFile文件来实践一下。

程序如下(注意这个程序生成的数据大概会有150MB,需要的话可以减少循环次数以缩短运行时间):


package cn.edn.ruc.cloudcomputing.book.chapter07;

import java.io.IOException;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.*;

import org.apache.hadoop.io.*;

public class SequenceFileWriteDemo{

private static String[]myValue={

"hello world",

"bye world",

"hello hadoop",

"bye hadoop"

};

public static void main(String[]args)throws IOException{

String uri="你想要生成的SequenceFile的位置";

Configuration conf=new Configuration();

FileSystem fs=FileSystem.get(URI.create(uri),conf);

Path path=new Path(uri);

IntWritable key=new IntWritable();

Text value=new Text();

SequenceFile.Writer writer=null;

try{

writer=SequenceFile.createWriter(fs, conf, path, key.getClass(),value.

getClass());

for(int i=0;i<5000000;i++){

key.set(5000000-i);

value.set(myValue[i%myValue.length]);

writer.append(key, value);

}

}finally{

IOUtils.closeStream(writer);

}

}

}


程序结果是生成了一个SequenceFile文件,你可以使用前文提到的命令:Hadoop fs-text你的SequenceFile文件名,来查看这个文件。因为内容太多只展示一部分,其内容如下:


5000000 hello world

4999999 bye world

4999998 hello hadoop

4999997 bye hadoop

4999996 hello world

4999995 bye world

4999994 hello hadoop

4999993 bye hadoop

4999992 hello world

4999991 bye world

……

10 hello hadoop

9 bye hadoop

8 hello world

7 bye world

6 hello hadoop

5 bye hadoop

4 hello world

3 bye world

2 hello hadoop

1 bye hadoop


这个程序的关键是下面这段代码:


SequenceFile.Writer writer=null;

writer=SequenceFile.createWriter(fs, conf, path, key.getClass(),value.getClass());

writer.append(key, value);


我们需要声明SequenceFile.Writer类并使用函数SequenceFile.createWriter()来给它赋值。这个函数中至少要指定四个参数,即输出流(fs)、conf对象(conf)、key的类型、value的类型,同时它还有很多重构函数,可以设置压缩等。然后我们就可以使用writer.append()来向流中写入key/value对了。

读取SequenceFile文件内容的程序也很简单,如下所示。

SequenceFileReadFile


package cn.edn.ruc.cloudcomputing.book.chapter07;

import java.io.IOException;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.SequenceFile;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadFile{

public static void main(String[]args)throws IOException{

String uri="你想要读取的SequenceFile所在位置";

Configuration conf=new Configuration();

FileSystem fs=FileSystem.get(URI.create(uri),conf);

Path path=new Path(uri);

SequenceFile.Reader reader=null;

try{

reader=new SequenceFile.Reader(fs, path, conf);

Writable key=(Writable)ReflectionUtils.newInsta

nce(reader.getKeyClass(),conf);

Writable value=(WritableReflectionUtils.newInsta

nce(reader.getValueClass(),conf);

long position=reader.getPosition();

while(reader.next(key, value)){

String syncSeen=reader.syncSeen()?"*":"";

System.out.printf("[%s%s]\t%s\t%s\n",position, syncSeen, key, value);

position=reader.getPosition();//beginning of next record

}

}finally{

IOUtils.closeStream(reader);

}

}

}


读取SequenceFile文件的程序关键是以下代码:


SequenceFile.Reader reader=null;

reader=new SequenceFile.Reader(fs, path, conf);

reader.next(key, value);

Writable key=(Writable)ReflectionUtils.newInstance(reader.

getKeyClass(),conf);

Writable value=(Writable)ReflectionUtils.newInstance(reader.

getValueClass(),conf);


很简单,声明reader并赋值之后,我们可以通过getKeyClass()和getValueClass()得到key和value的类型,并通过ReflectionUtils直接实例化对象,然后就可以通过reader.next()跳到下一个key/value值,以遍历文件中所有的key/value对。

根据前面所述,生成SequenceFile文件时是可以采用压缩方式的,下面就采用Block压缩方式生成SequenceFile文件。此程序与生成不压缩SequenceFile文件的程序基本相同,只是在SequenceFile.createWrite()时修改了一下设置,如下所示:


SequenceFile.createWriter(fs,conf,path,key.getClass(),value.

getClass(),CompressionType.BLOCK)


然后查看生成的两个文件的大小:


-rwxrwxrwx 1 u u 10214801 2011-01-14 16:31 MySequenceOutput

-rwxrwxrwx 1 u u 159062628 2011-01-14 16:25 MySequenceOutput2


文件大小是以byte显示的,可以看到,采用Block压缩的文件是不压缩的1/16左右。

我们可以将这个Java文件编译打包,在运行时使用time函数记录这两个jar包的执行时间,如下所示:


//这是不使用压缩的程序

time hadoop jar UnComSequenceFileWriteFile.jar UnComSequence

FileWriteFile

real 0m47.668s

//这是使用压缩的程序

time hadoop jar ComSequenceFileWriteFile.jar ComSequenceFile

WriteFile

real 0m7.539s


上面记录了程序具体运行的时间,以毫秒为单位。可以看出,使用压缩的程序其执行效率要远远高于不使用压缩的程序。我们推测这个时间的差距主要是受硬盘写入时间的影响,再加上传输10MB的数据所花的时间要远远少于传输159MB的数据的。这就能很好地解释为什么在MapReduce程序中采用压缩会提高效率了(因为一般而言,这是Map的输出文件)。