7.3 数据的I/O中序列化操作

序列化是将对象转化为字节流的方法,或者说用字节流描述对象的方法。与序列化相对的是反序列化,反序列化是将字节流转化为对象的方法。序列化有两个目的:

1)进程间通信;

2)数据持久性存储。

Hadoop采用RPC来实现进程间通信。一般而言,RPC的序列化机制有以下特点:

1)紧凑:紧凑的格式可以充分利用带宽,加快传输速度;

2)快速:能减少序列化和反序列化的开销,这会有效地减少进程间通信的时间;

3)可扩展:可以逐步改变,是客户端与服务器端直接相关的,例如,可以随时加入一个新的参数方法调用;

4)互操作性:支持不同语言编写的客户端与服务器交换数据。

Hadoop也希望数据持久性存储同样具有以上这些优点,因此它的数据序列化机制就是依照以上这些目的而设计的(或者说是希望设计成这样)。

在Hadoop中,序列化处于核心地位。因为无论是存储文件还是在计算中传输数据,都需要执行序列化的过程。序列化与反序列化的速度,序列化后的数据大小等都会影响数据传输的速度,以致影响计算的效率。正是因为这些原因,Hadoop并没有采用Java提供的序列化机制(Java Object Serialization),而是自己重新写了一个序列化机制Writeables。Writeables具有紧凑、快速的优点(但不易扩展,也不利于不同语言的互操作),同时也允许对自己定义的类加入序列化与反序列化方法,而且很方便。

7.3.1 Writable类

Writable是Hadoop的核心,Hadoop通过它定义了Hadoop中基本的数据类型及其操作。一般来说,无论是上传下载数据还是运行Mapreduce程序,你无时无刻不需要使用Writable类,因此Hadoop中具有庞大的一类Writable类(见图7-2),不过Writable类本身却很简单。

Writable类中只定义了两个方法:


//序列化输出数据流

void write(DataOutput out)throws IOException

//反序列化输入数据流

void readFields(DataInput in)throws IOException


Hadoop还有很多其他的Writable类。比如WritableComparable、ArrayWritable、Two-DArrayWritable及AbstractMapWritable,它们直接继承自Writable类。还有一些类,如BooleanWritale、ByteWritable等,它们不是直接继承于Writable类,而是继承自WritableComparable类。Hadoop的基本数据类型就是由这些类构成的。这些类构成了以下的层次关系(如图7-2所示)。

7.3 数据的I/O中序列化操作 - 图1

图 7-2 Writable类层次关系图

1.Hadoop的比较器

WritableComparable是Hadoop中非常重要的接口类。它继承自org.apache.hadoop.io.Writable类和java.lang.Comparable类。WritableComparator是Writablecomparable的比较器,它是RawComparator针对WritableComparate类的一个通用实现,而RawComparator则继承自java.util.Comparator,它们之间的关系如图7-3所示。

7.3 数据的I/O中序列化操作 - 图2

图 7-3 WritableComparable和WritableComparablor类层次关系图

这两个类对MapReduce而言至关重要,大家都知道,MapReduce执行时,Reducer(执行Reduce任务的机器)会搜集相同key值的key/value对,并且在Reduce之前会有一个排序过程,这些键值的比较都是对WritableComparate类型进行的。

Hadoop在RawComparator中实现了对未反序列化对象的读取。这样做的好处是,可以不必创建对象就能比较想要比较的内容(多是key值),从而省去了创建对象的开销。例如,大家可以使用如下函数,对指定了开始位置(s1和s2)及固定长度(l1和l2)的数组进行比较:


public interface RawComparator<T>extends Comparator<T>{

public int compare(byte[]b1,int s1,int l1,byte[]b2,int s2,int l2);

}


WritableComparator是RawComparator的子类,在这里,添加了一个默认的对象进行反序列化,并调用了比较函数compare()进行比较。下面是WritableComparator中对固定字节反序列化的执行情况,以及比较的实现过程:


public int compare(byte[]b1,int s1,int l1,byte[]b2,int s2,int l2){

try{

buffer.reset(b1,s1,l1);//parse key1

key1.readFields(buffer);

buffer.reset(b2,s2,l2);//parse key2

key2.readFields(buffer);

}catch(IOException e){

throw new RuntimeException(e);

}

return compare(key1,key2);//compare them


}

2.Writable类中的数据类型

(1)基本类

Writable中封装有很多Java的基本类,如表7-3所示。

7.3 数据的I/O中序列化操作 - 图3

其中最简单的要数Hadoop中对Boolean的实现,如下所示:


package cn.edn.ruc.cloudcomputing.book.chapter07;

import java.io.*;

public class BooleanWritable implements WritableComparable{

private boolean value;

public BooleanWritable(){};

public BooleanWritable(boolean value){

set(value);

}

public void set(boolean value){

this.value=value;

}

public boolean get(){

return value;

}

public void readFields(DataInput in)throws IOException{

value=in.readBoolean();

}

public void write(DataOutput out)throws IOException{

out.writeBoolean(value);

}

public boolean equals(Object o){

if(!(o instanceof BooleanWritable)){

return false;

}

BooleanWritable other=(BooleanWritable)o;

return this.value==other.value;

}

public int hashCode(){

return value?0:1;

}

public int compareTo(Object o){

boolean a=this.value;

boolean b=((BooleanWritable)o).value;

return((a==b)?0:(a==false)?-1:1);

}

public String toString(){

return Boolean.toString(get());

}

public static class Comparator extends WritableComparator{

public Comparator(){

super(BooleanWritable.class);

}

public int compare(byte[]b1,int s1,int l1,

byte[]b2,int s2,int l2){

boolean a=(readInt(b1,s1)==1)?true:false;

boolean b=(readInt(b2,s2)==1)?true:false;

return((a==b)?0:(a==false)?-1:1);

}

}

static{

WritableComparator.define(BooleanWritable.class, new Comparator());

}

}


可以看到Hadoop直接将boolean写入到字节流(out.writeBoolean(value))中了,并没有采用Java的序列化机制。同时,除了构造函数、set()函数、get()函数等外,Hadoop还定义了三个用于比较的函数:equals()、compareTo()、compare()。前两个很简单,第三个就是前文中重点介绍的比较器。Hadoop中封装定义的其他Java基本数据类型(如Boolean、byte、int、float、long、double)都是相似的。

如果大家对Java流处理比较了解的话可能会知道,Java流处理中并没有DataOutput.writeVInt()。实际上,这是Hadoop自己定义的变长类型(VInt, VLong),而且VInt和VLong的处理方式实际上是一样的。


public static void writeVInt(DataOutput stream, int i)throws IOException{

writeVLong(stream, i);

}


Hadoop对VLong类型的处理方法如下:


public static void writeVLong(DataOutput stream, long i)throws IOException{

if(i>=-112&&i<=127){

stream.writeByte((byte)i);

return;

}

int len=-112;

if(i<0){

i^=-1L;//take one's complement'

len=-120;

}

long tmp=i;

while(tmp!=0){

tmp=tmp>>8;

len—;

}

stream.writeByte((byte)len);

len=(len<-120)?-(len+120):-(len+112);

for(int idx=len;idx!=0;idx—){

int shiftbits=(idx-1)*8;

long mask=0xFFL<<shiftbits;

stream.writeByte((byte)((i&mask)>>shiftbits));

}

}


上面代码的意思是如果数值较小(在-112和127之间),那么就直接将这个数值写入数据流内(stream.writeByte((byte)i))。如果不是,则先用len表示字节长度与正负,并写入数据流中,然后在其后写入这个数值。

(2)其他类

下面将按照先易后难的顺序一一讲解。

1)NullWritable。这是一个占位符,它的序列化长度为零,没有数值从流中读出或是写入流中。


public void readFields(DataInput in)throws IOException{}

public void write(DataOutput out)throws IOException{}


在任何编程语言或编程框架时,占位符都是很有用的,这个类型不可以和其他类型比较,在MapReduce,你可以将任何键或值设为空值。

2)BytesWritable和ByteWritable。ByteWritable是一个二进制数据的封装。它的所有方法都是基于单个Byte来处理的。BytesWritable是一个二进制数据数组的封装。它对输出流的处理如下所示:


public BytesWritable(byte[]bytes){

this.bytes=bytes;

this.size=bytes.length;

}

public void write(DataOutput out)throws IOException{

out.writeInt(size);

out.write(bytes,0,size);

}


可以看到,它首先会把这个二进制数据数组的长度写入输入流中,这个长度一般是在声明时所获得的二进制数据数组的实际长度。当然这个值也可以人为设定。如果要把长度为3、位置为129的字节数组序列化,根据程序可知,结果应为:


Size=00000003 bytes[]={(01),(02),(09)}


数据流中的值就是:


00000003010209


3)Text。这可能是这几个自定义类型中相对复杂的一个了。实际上,这是Hadoop中对string类型的重写,但是又与其有一些不同。Text使用标准的UTF-8编码,同时Hadoop使用变长类型VInt来存储字符串,其存储上限是2GB。

Text类型与String类型的主要差别如下:

String的长度定义为String包含的字符个数;Text的长度定义为UTF-8编码的字节数。

String内的indexOf()方法返回的是char类型字符的索引,比如字符串(1234),字符3的位置就是2(字符1的位置是0);而Text的find()方法返回的是字节偏移量。

String的charAt()方法返回的是指定位置的char字符;而Text的charAT()方法需要指定偏移量。

另外,Text内定义了一个方法toString(),它用于将Text类型转化为String类型。

看如下这个例子:


package cn.edn.ruc.cloudcomputing.book.chapter07;

import java.io.*;

import org.apache.hadoop.io.*;

public class MyMapre{

public static void strings(){

String s="\u0041\u00DF\u6771\uD801\uDC00";

System.out.println(s.length());

System.out.println(s.indexOf("\u0041"));

System.out.println(s.indexOf("\u00DF"));

System.out.println(s.indexOf("\u6771"));

System.out.println(s.indexOf("\uD801\uDC00"));

}

public static void texts(){

Text t=new Text("\u0041\u00DF\u6771\uD801\uDC00");

System.out.println(t.getLength());

System.out.println(t.find("\u0041"));

System.out.println(t.find("\u00DF"));

System.out.println(t.find("\u6771"));

System.out.println(t.find("\uD801\uDC00"));

}

public static void main(String args[]){

strings();

texts();

}

}

输出结果为

5

0

1

2

3

10

0

1

3

6


上面例子可以验证前面所列的那些差别。

4)ObjectWritable。ObjectWritable是一种多类型的封装。可以适用于Java的基本类型、字符串等。不过,这并不是一个好方法,因为Java在每次被序列化时,都要写入被封装类型的类名。但是如果类型过多,使用静态数组难以表示时,采用这个类仍是不错的做法。

5)ArrayWritable和TwoDArrayWritable。ArrayWritable和TwoDArrayWritable,顾名思义,是针对数组和二维数组构建的数据类型。这两个类型声明的变量需要在使用时指定类型,因为ArrayWritable和TwoDArrayWritable并没有空值的构造函数。


ArrayWritable a=new ArrayWritable(IntWritable.class)


同样,在声明它们的子类时,必须使用super()来指定ArrayWritable和TwoDArrayWritable的数据类型。


public class IntArrayWritable extends ArrayWritable{

public IntArrayWritable(){

super(IntWritable.class);

}

}


一般情况下,ArrayWritable和TwoDArrayWritable都有set()和get()函数,在将Text转化为String时,它们也都提供了一个转化函数toArray()。但是它们没有提供比较器comparator,这点需要注意。同时从TwoDArrayWritable的write和readFields可以看出是横向读写的,同时还会读写每一维的数据长度。


public void readFields(DataInput in)throws IOException{

for(int i=0;i<values.length;i++){

for(int j=0;j<values[i].length;j++){

……

value.readFields(in);

values[i][j]=value;//保存读取的数据

}

}

}

public void write(DataOutput out)throws IOException{

for(int i=0;i<values.length;i++){

out.writeInt(values[i].length);

}

for(int i=0;i<values.length;i++){

for(int j=0;j<values[i].length;j++){

values[i][j].write(out);

}

}

}


6)MapWritable和SortedMapWritable。MapWritable和SortedMapWritable分别是java.util.Map()和java.util.SortedMap()的实现。

这两个实例是按照如下格式声明的:


private Map<Writable, Writable>instance;

private SortedMap<WritableComparable, Writable>instance;


我们可以用Hadoop定义的Writable类型来填充key或value,也可以使用自己定义的Writable类型来填充。

在java.util.Map()和java.util.SortedMap()中定义的功能,如getKey()、getValue()、keySet()等,在这两个类中均有实现。Map的使用也很简单,见如下程序,需要注意的是,不同key值对应的value数据类型可以不同。


package cn.edn.rm.cloodcomputing.book.chapter07;

import java.io.*;

import java.util.*;

import org.apache.hadoop.io.*;

public class MyMapre{

public static void main(String args[])throws IOException{

MapWritable a=new MapWritable();

a.put(new IntWritable(1),new Text("Hello"));

a.put(new IntWritable(2),new Text("World"));

MapWritable b=new MapWritable();

WritableUtils.cloneInto(b, a);

System.out.println(b.get(new IntWritable(1)));

System.out.println(b.get(new IntWritable(2)));

}

}

显示结果为

Hello

World


7)CompressedWritable。CompressedWritable是保存压缩数据的数据结构。跟之前介绍的数据结构不同,它实现Writable接口,主要面向在Map和Reduce阶段中的大数据对象操作,对这些大数据对象的压缩能够大大加快数据的传输速率。它的主要数据结构是一个byte数组,提供给用户必须实现的函数是readFieldsCompressed和writeCompressed。CompressedWritable在读取数据时先读取二进制字节流,然后调用ensureInflated函数进行解压,在写数据时,将输出的二进制字节流封装成压缩后的二进制字节流。

8)GenericWritable。这个数据类型是一个通用的数据封装类型。由于是通用的数据封装,它需要保存数据和数据的原始类型,其数据结构如下:


private static final byte NOT_SET=-1;

private byte type=NOT_SET;

private Writable instance;

private Configuration conf=null;


由于其特殊的数据结构,在读写时也需要读写对应的数据结构:实际数据和数据类型,并且要保证固定的顺序。


public void readFields(DataInput in)throws IOException{

//先读取数据类型

type=in.readByte();

……

//再读取数据

instance.readFields(in);

}

public void write(DataOutput out)throws IOException{

if(type==NOT_SET||instance==null)

throw new IOException("The GenericWritable has NOT been set correctly.type="

+type+",instance="+instance);

//先写出数据类型

out.writeByte(type);

//在写出数据

instance.write(out);

}


9)VersionedWritable。VersionedWritable是一个抽象的版本检查类,它主要保证在一个类的发展过程中,使用旧类编写的程序仍然能由新类解析处理。在这个类的实现中只有简单的三个函数:


//返回版本信息

public abstract byte getVersion();

//写出版本信息

public void write(DataOutput out)throws IOException{

out.writeByte(getVersion());

}

//读入版本信息

public void readFields(DataInput in)throws IOException{

byte version=in.readByte();

if(version!=getVersion())

throw new VersionMismatchException(getVersion(),version);

}