7.3 数据的I/O中序列化操作
序列化是将对象转化为字节流的方法,或者说用字节流描述对象的方法。与序列化相对的是反序列化,反序列化是将字节流转化为对象的方法。序列化有两个目的:
1)进程间通信;
2)数据持久性存储。
Hadoop采用RPC来实现进程间通信。一般而言,RPC的序列化机制有以下特点:
1)紧凑:紧凑的格式可以充分利用带宽,加快传输速度;
2)快速:能减少序列化和反序列化的开销,这会有效地减少进程间通信的时间;
3)可扩展:可以逐步改变,是客户端与服务器端直接相关的,例如,可以随时加入一个新的参数方法调用;
4)互操作性:支持不同语言编写的客户端与服务器交换数据。
Hadoop也希望数据持久性存储同样具有以上这些优点,因此它的数据序列化机制就是依照以上这些目的而设计的(或者说是希望设计成这样)。
在Hadoop中,序列化处于核心地位。因为无论是存储文件还是在计算中传输数据,都需要执行序列化的过程。序列化与反序列化的速度,序列化后的数据大小等都会影响数据传输的速度,以致影响计算的效率。正是因为这些原因,Hadoop并没有采用Java提供的序列化机制(Java Object Serialization),而是自己重新写了一个序列化机制Writeables。Writeables具有紧凑、快速的优点(但不易扩展,也不利于不同语言的互操作),同时也允许对自己定义的类加入序列化与反序列化方法,而且很方便。
7.3.1 Writable类
Writable是Hadoop的核心,Hadoop通过它定义了Hadoop中基本的数据类型及其操作。一般来说,无论是上传下载数据还是运行Mapreduce程序,你无时无刻不需要使用Writable类,因此Hadoop中具有庞大的一类Writable类(见图7-2),不过Writable类本身却很简单。
Writable类中只定义了两个方法:
//序列化输出数据流
void write(DataOutput out)throws IOException
//反序列化输入数据流
void readFields(DataInput in)throws IOException
Hadoop还有很多其他的Writable类。比如WritableComparable、ArrayWritable、Two-DArrayWritable及AbstractMapWritable,它们直接继承自Writable类。还有一些类,如BooleanWritale、ByteWritable等,它们不是直接继承于Writable类,而是继承自WritableComparable类。Hadoop的基本数据类型就是由这些类构成的。这些类构成了以下的层次关系(如图7-2所示)。
图 7-2 Writable类层次关系图
1.Hadoop的比较器
WritableComparable是Hadoop中非常重要的接口类。它继承自org.apache.hadoop.io.Writable类和java.lang.Comparable类。WritableComparator是Writablecomparable的比较器,它是RawComparator针对WritableComparate类的一个通用实现,而RawComparator则继承自java.util.Comparator,它们之间的关系如图7-3所示。
图 7-3 WritableComparable和WritableComparablor类层次关系图
这两个类对MapReduce而言至关重要,大家都知道,MapReduce执行时,Reducer(执行Reduce任务的机器)会搜集相同key值的key/value对,并且在Reduce之前会有一个排序过程,这些键值的比较都是对WritableComparate类型进行的。
Hadoop在RawComparator中实现了对未反序列化对象的读取。这样做的好处是,可以不必创建对象就能比较想要比较的内容(多是key值),从而省去了创建对象的开销。例如,大家可以使用如下函数,对指定了开始位置(s1和s2)及固定长度(l1和l2)的数组进行比较:
public interface RawComparator<T>extends Comparator<T>{
public int compare(byte[]b1,int s1,int l1,byte[]b2,int s2,int l2);
}
WritableComparator是RawComparator的子类,在这里,添加了一个默认的对象进行反序列化,并调用了比较函数compare()进行比较。下面是WritableComparator中对固定字节反序列化的执行情况,以及比较的实现过程:
public int compare(byte[]b1,int s1,int l1,byte[]b2,int s2,int l2){
try{
buffer.reset(b1,s1,l1);//parse key1
key1.readFields(buffer);
buffer.reset(b2,s2,l2);//parse key2
key2.readFields(buffer);
}catch(IOException e){
throw new RuntimeException(e);
}
return compare(key1,key2);//compare them
}
2.Writable类中的数据类型
(1)基本类
Writable中封装有很多Java的基本类,如表7-3所示。
其中最简单的要数Hadoop中对Boolean的实现,如下所示:
package cn.edn.ruc.cloudcomputing.book.chapter07;
import java.io.*;
public class BooleanWritable implements WritableComparable{
private boolean value;
public BooleanWritable(){};
public BooleanWritable(boolean value){
set(value);
}
public void set(boolean value){
this.value=value;
}
public boolean get(){
return value;
}
public void readFields(DataInput in)throws IOException{
value=in.readBoolean();
}
public void write(DataOutput out)throws IOException{
out.writeBoolean(value);
}
public boolean equals(Object o){
if(!(o instanceof BooleanWritable)){
return false;
}
BooleanWritable other=(BooleanWritable)o;
return this.value==other.value;
}
public int hashCode(){
return value?0:1;
}
public int compareTo(Object o){
boolean a=this.value;
boolean b=((BooleanWritable)o).value;
return((a==b)?0:(a==false)?-1:1);
}
public String toString(){
return Boolean.toString(get());
}
public static class Comparator extends WritableComparator{
public Comparator(){
super(BooleanWritable.class);
}
public int compare(byte[]b1,int s1,int l1,
byte[]b2,int s2,int l2){
boolean a=(readInt(b1,s1)==1)?true:false;
boolean b=(readInt(b2,s2)==1)?true:false;
return((a==b)?0:(a==false)?-1:1);
}
}
static{
WritableComparator.define(BooleanWritable.class, new Comparator());
}
}
可以看到Hadoop直接将boolean写入到字节流(out.writeBoolean(value))中了,并没有采用Java的序列化机制。同时,除了构造函数、set()函数、get()函数等外,Hadoop还定义了三个用于比较的函数:equals()、compareTo()、compare()。前两个很简单,第三个就是前文中重点介绍的比较器。Hadoop中封装定义的其他Java基本数据类型(如Boolean、byte、int、float、long、double)都是相似的。
如果大家对Java流处理比较了解的话可能会知道,Java流处理中并没有DataOutput.writeVInt()。实际上,这是Hadoop自己定义的变长类型(VInt, VLong),而且VInt和VLong的处理方式实际上是一样的。
public static void writeVInt(DataOutput stream, int i)throws IOException{
writeVLong(stream, i);
}
Hadoop对VLong类型的处理方法如下:
public static void writeVLong(DataOutput stream, long i)throws IOException{
if(i>=-112&&i<=127){
stream.writeByte((byte)i);
return;
}
int len=-112;
if(i<0){
i^=-1L;//take one's complement'
len=-120;
}
long tmp=i;
while(tmp!=0){
tmp=tmp>>8;
len—;
}
stream.writeByte((byte)len);
len=(len<-120)?-(len+120):-(len+112);
for(int idx=len;idx!=0;idx—){
int shiftbits=(idx-1)*8;
long mask=0xFFL<<shiftbits;
stream.writeByte((byte)((i&mask)>>shiftbits));
}
}
上面代码的意思是如果数值较小(在-112和127之间),那么就直接将这个数值写入数据流内(stream.writeByte((byte)i))。如果不是,则先用len表示字节长度与正负,并写入数据流中,然后在其后写入这个数值。
(2)其他类
下面将按照先易后难的顺序一一讲解。
1)NullWritable。这是一个占位符,它的序列化长度为零,没有数值从流中读出或是写入流中。
public void readFields(DataInput in)throws IOException{}
public void write(DataOutput out)throws IOException{}
在任何编程语言或编程框架时,占位符都是很有用的,这个类型不可以和其他类型比较,在MapReduce,你可以将任何键或值设为空值。
2)BytesWritable和ByteWritable。ByteWritable是一个二进制数据的封装。它的所有方法都是基于单个Byte来处理的。BytesWritable是一个二进制数据数组的封装。它对输出流的处理如下所示:
public BytesWritable(byte[]bytes){
this.bytes=bytes;
this.size=bytes.length;
}
public void write(DataOutput out)throws IOException{
out.writeInt(size);
out.write(bytes,0,size);
}
可以看到,它首先会把这个二进制数据数组的长度写入输入流中,这个长度一般是在声明时所获得的二进制数据数组的实际长度。当然这个值也可以人为设定。如果要把长度为3、位置为129的字节数组序列化,根据程序可知,结果应为:
Size=00000003 bytes[]={(01),(02),(09)}
数据流中的值就是:
00000003010209
3)Text。这可能是这几个自定义类型中相对复杂的一个了。实际上,这是Hadoop中对string类型的重写,但是又与其有一些不同。Text使用标准的UTF-8编码,同时Hadoop使用变长类型VInt来存储字符串,其存储上限是2GB。
Text类型与String类型的主要差别如下:
String的长度定义为String包含的字符个数;Text的长度定义为UTF-8编码的字节数。
String内的indexOf()方法返回的是char类型字符的索引,比如字符串(1234),字符3的位置就是2(字符1的位置是0);而Text的find()方法返回的是字节偏移量。
String的charAt()方法返回的是指定位置的char字符;而Text的charAT()方法需要指定偏移量。
另外,Text内定义了一个方法toString(),它用于将Text类型转化为String类型。
看如下这个例子:
package cn.edn.ruc.cloudcomputing.book.chapter07;
import java.io.*;
import org.apache.hadoop.io.*;
public class MyMapre{
public static void strings(){
String s="\u0041\u00DF\u6771\uD801\uDC00";
System.out.println(s.length());
System.out.println(s.indexOf("\u0041"));
System.out.println(s.indexOf("\u00DF"));
System.out.println(s.indexOf("\u6771"));
System.out.println(s.indexOf("\uD801\uDC00"));
}
public static void texts(){
Text t=new Text("\u0041\u00DF\u6771\uD801\uDC00");
System.out.println(t.getLength());
System.out.println(t.find("\u0041"));
System.out.println(t.find("\u00DF"));
System.out.println(t.find("\u6771"));
System.out.println(t.find("\uD801\uDC00"));
}
public static void main(String args[]){
strings();
texts();
}
}
输出结果为
5
0
1
2
3
10
0
1
3
6
上面例子可以验证前面所列的那些差别。
4)ObjectWritable。ObjectWritable是一种多类型的封装。可以适用于Java的基本类型、字符串等。不过,这并不是一个好方法,因为Java在每次被序列化时,都要写入被封装类型的类名。但是如果类型过多,使用静态数组难以表示时,采用这个类仍是不错的做法。
5)ArrayWritable和TwoDArrayWritable。ArrayWritable和TwoDArrayWritable,顾名思义,是针对数组和二维数组构建的数据类型。这两个类型声明的变量需要在使用时指定类型,因为ArrayWritable和TwoDArrayWritable并没有空值的构造函数。
ArrayWritable a=new ArrayWritable(IntWritable.class)
同样,在声明它们的子类时,必须使用super()来指定ArrayWritable和TwoDArrayWritable的数据类型。
public class IntArrayWritable extends ArrayWritable{
public IntArrayWritable(){
super(IntWritable.class);
}
}
一般情况下,ArrayWritable和TwoDArrayWritable都有set()和get()函数,在将Text转化为String时,它们也都提供了一个转化函数toArray()。但是它们没有提供比较器comparator,这点需要注意。同时从TwoDArrayWritable的write和readFields可以看出是横向读写的,同时还会读写每一维的数据长度。
public void readFields(DataInput in)throws IOException{
for(int i=0;i<values.length;i++){
for(int j=0;j<values[i].length;j++){
……
value.readFields(in);
values[i][j]=value;//保存读取的数据
}
}
}
public void write(DataOutput out)throws IOException{
for(int i=0;i<values.length;i++){
out.writeInt(values[i].length);
}
for(int i=0;i<values.length;i++){
for(int j=0;j<values[i].length;j++){
values[i][j].write(out);
}
}
}
6)MapWritable和SortedMapWritable。MapWritable和SortedMapWritable分别是java.util.Map()和java.util.SortedMap()的实现。
这两个实例是按照如下格式声明的:
private Map<Writable, Writable>instance;
private SortedMap<WritableComparable, Writable>instance;
我们可以用Hadoop定义的Writable类型来填充key或value,也可以使用自己定义的Writable类型来填充。
在java.util.Map()和java.util.SortedMap()中定义的功能,如getKey()、getValue()、keySet()等,在这两个类中均有实现。Map的使用也很简单,见如下程序,需要注意的是,不同key值对应的value数据类型可以不同。
package cn.edn.rm.cloodcomputing.book.chapter07;
import java.io.*;
import java.util.*;
import org.apache.hadoop.io.*;
public class MyMapre{
public static void main(String args[])throws IOException{
MapWritable a=new MapWritable();
a.put(new IntWritable(1),new Text("Hello"));
a.put(new IntWritable(2),new Text("World"));
MapWritable b=new MapWritable();
WritableUtils.cloneInto(b, a);
System.out.println(b.get(new IntWritable(1)));
System.out.println(b.get(new IntWritable(2)));
}
}
显示结果为
Hello
World
7)CompressedWritable。CompressedWritable是保存压缩数据的数据结构。跟之前介绍的数据结构不同,它实现Writable接口,主要面向在Map和Reduce阶段中的大数据对象操作,对这些大数据对象的压缩能够大大加快数据的传输速率。它的主要数据结构是一个byte数组,提供给用户必须实现的函数是readFieldsCompressed和writeCompressed。CompressedWritable在读取数据时先读取二进制字节流,然后调用ensureInflated函数进行解压,在写数据时,将输出的二进制字节流封装成压缩后的二进制字节流。
8)GenericWritable。这个数据类型是一个通用的数据封装类型。由于是通用的数据封装,它需要保存数据和数据的原始类型,其数据结构如下:
private static final byte NOT_SET=-1;
private byte type=NOT_SET;
private Writable instance;
private Configuration conf=null;
由于其特殊的数据结构,在读写时也需要读写对应的数据结构:实际数据和数据类型,并且要保证固定的顺序。
public void readFields(DataInput in)throws IOException{
//先读取数据类型
type=in.readByte();
……
//再读取数据
instance.readFields(in);
}
public void write(DataOutput out)throws IOException{
if(type==NOT_SET||instance==null)
throw new IOException("The GenericWritable has NOT been set correctly.type="
+type+",instance="+instance);
//先写出数据类型
out.writeByte(type);
//在写出数据
instance.write(out);
}
9)VersionedWritable。VersionedWritable是一个抽象的版本检查类,它主要保证在一个类的发展过程中,使用旧类编写的程序仍然能由新类解析处理。在这个类的实现中只有简单的三个函数:
//返回版本信息
public abstract byte getVersion();
//写出版本信息
public void write(DataOutput out)throws IOException{
out.writeByte(getVersion());
}
//读入版本信息
public void readFields(DataInput in)throws IOException{
byte version=in.readByte();
if(version!=getVersion())
throw new VersionMismatchException(getVersion(),version);
}