7.4.3 ArrayFile, SetFile, and BloomMapFile

ArrayFile extends MapFile and stores a mapping from integer keys to values. Its implementation makes this clear:


// ArrayFile.Writer: the key class is fixed to LongWritable
public Writer(Configuration conf, FileSystem fs,
              String file, Class<? extends Writable> valClass)
    throws IOException {
    super(conf, fs, file, LongWritable.class, valClass);
}

public static class Reader extends MapFile.Reader {
    // the reader can declare its key as a LongWritable directly
    private LongWritable key = new LongWritable();

    public Reader(FileSystem fs, String file, Configuration conf) throws IOException {
        super(fs, file, conf);
    }
}


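As the code above shows, the writer fixes the key type to LongWritable instead of accepting a caller-supplied key class or comparator as MapFile does (for example, WritableComparator.get(keyClass)), so the reader can declare its key directly as a LongWritable. This more specific definition narrows the range of cases ArrayFile applies to, but it also makes the class easier to use and harder to misuse.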
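
The effect of fixing the key type can be illustrated without Hadoop. The following is a minimal, stdlib-only sketch, not ArrayFile's actual code: the hypothetical class DenseLongIndexedStore assigns consecutive long keys itself, on the assumption that this mirrors how an ArrayFile writer numbers the values appended to it, so callers never choose or compare keys.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory analogy of a store whose keys are always consecutive longs. */
final class DenseLongIndexedStore<V> {
    private final List<V> values = new ArrayList<>();

    /** Appends a value; its key is the running count, so the caller never supplies one. */
    long append(V value) {
        values.add(value);
        return values.size() - 1L;
    }

    /** Returns the value stored under key n, or null when n is out of range. */
    V get(long n) {
        if (n < 0 || n >= values.size()) {
            return null;
        }
        return values.get((int) n);
    }

    public static void main(String[] args) {
        DenseLongIndexedStore<String> store = new DenseLongIndexedStore<>();
        long first = store.append("alpha");
        long second = store.append("beta");
        System.out.println(first + " -> " + store.get(first));
        System.out.println(second + " -> " + store.get(second));
    }
}
```

Because the store hands out the keys, there is no way to append out of order or with the wrong key type, which is the trade-off described above: less flexibility in exchange for fewer ways to go wrong.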

SetFile also extends MapFile. Like a Set in Java, it is only a collection of keys and stores no values.


// SetFile.Writer
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass,
              SequenceFile.CompressionType compress)
    throws IOException {
    this(conf, fs, dirName, WritableComparator.get(keyClass), compress);
}

public void append(WritableComparable key) throws IOException {
    append(key, NullWritable.get());  // NullWritable is a placeholder value
}

// SetFile.Reader
public Reader(FileSystem fs, String dirName, WritableComparator comparator,
              Configuration conf)
    throws IOException {
    super(fs, dirName, comparator, conf);
}

public boolean seek(WritableComparable key)
    throws IOException {
    return super.seek(key);
}

public boolean next(WritableComparable key)
    throws IOException {
    return next(key, NullWritable.get());  // only the key is read
}


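The SetFile code above (writer construction, append, reader construction, seek, and next) confirms that it is a collection of keys rather than a mapping: every key is stored with NullWritable as a placeholder value. Note that keys must be appended in ascending order: a key that sorts before the last key already written is rejected. A SetFile is therefore an ordered set of keys.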
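
The ordering constraint can be sketched in plain Java. This is a stdlib-only analogy under stated assumptions, not SetFile's implementation: the hypothetical class SortedKeySetSketch keeps its keys in a list, rejects any key that sorts before the last one appended, and relies on that order to answer seek with a binary search.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stdlib-only analogy of an append-only, ordered key set. */
final class SortedKeySetSketch<K extends Comparable<K>> {
    private final List<K> keys = new ArrayList<>();

    /** Appends a key; rejects any key that sorts before the last key written. */
    void append(K key) {
        if (!keys.isEmpty() && keys.get(keys.size() - 1).compareTo(key) > 0) {
            throw new IllegalArgumentException(
                "key out of order: " + key + " after " + keys.get(keys.size() - 1));
        }
        keys.add(key);
    }

    /** Binary search is valid only because append keeps the keys sorted. */
    boolean seek(K key) {
        return Collections.binarySearch(keys, key) >= 0;
    }

    public static void main(String[] args) {
        SortedKeySetSketch<String> set = new SortedKeySetSketch<>();
        set.append("apple");
        set.append("banana");
        System.out.println(set.seek("banana"));
        System.out.println(set.seek("cherry"));
        try {
            set.append("avocado"); // sorts before "banana", so it is rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Enforcing the order at append time is what keeps lookups cheap: the structure never has to sort, and a lookup only needs to search.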

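BloomMapFile itself does not extend MapFile, but its two core inner classes, Writer and Reader, extend the corresponding inner classes of MapFile. In practice it plays the same role as MapFile, with filtering added: it uses a dynamic Bloom Filter (see Chapter 5 of this book) to check whether a key belongs to the set of keys held in the file. A BloomMapFile therefore consists of the key/value mapping plus one Bloom Filter. When writing, the Writer first initializes the Bloom Filter from the configuration; each appended key is added to the Bloom Filter as its key/value pair is written; and the Bloom Filter itself is written out last, when the output is closed. The code is as follows: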


public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass) throws IOException {
    super(conf, fs, dirName, comparator, valClass);
    this.fs = fs;
    this.dir = new Path(dirName);
    initBloomFilter(conf);  // set up the Bloom Filter from the configuration
}

private synchronized void initBloomFilter(Configuration conf) {
    ……
}

@Override
public synchronized void append(WritableComparable key, Writable val)
    throws IOException {
    ……
    bloomFilter.add(bloomKey);  // add the key to the Bloom Filter
}

@Override
public synchronized void close() throws IOException {
    super.close();
    DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true);
    bloomFilter.write(out);  // write out the Bloom Filter
    out.flush();
    out.close();
}


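Reading mirrors this. The Bloom Filter is set up when the Reader is constructed, which immediately loads the Bloom Filter stored alongside the data; the key/value data is read afterwards. The code is as follows: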


public Reader(FileSystem fs, String dirName, WritableComparator comparator,
              Configuration conf) throws IOException {
    super(fs, dirName, comparator, conf);
    initBloomFilter(fs, dirName, conf);
}

private void initBloomFilter(FileSystem fs, String dirName,
                             Configuration conf) {
    try {
        DataInputStream in = fs.open(new Path(dirName, BLOOM_FILE_NAME));
        bloomFilter = new DynamicBloomFilter();
        bloomFilter.readFields(in);  // load the stored Bloom Filter
        in.close();
    } catch (IOException ioe) {
        // The filter cannot be read: fall back to plain MapFile lookups
        bloomFilter = null;
    }
}
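
The write-on-close, read-on-open pattern can be tried in isolation. The sketch below is stdlib-only and makes several assumptions: the hypothetical class FilterSideFileSketch stands in for the writer and reader, a java.util.BitSet stands in for the Bloom Filter, an in-memory byte array stands in for the filter file, and the length-prefixed layout is an illustrative choice rather than Hadoop's on-disk format. It also shows one way a reader might tolerate a missing or damaged filter by returning null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.BitSet;

/** Hypothetical sketch: a filter's bits saved as a side file and reloaded, with a fallback. */
final class FilterSideFileSketch {

    /** Serializes the bits as a length-prefixed byte array, as a writer might do on close. */
    static byte[] write(BitSet bits) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            byte[] raw = bits.toByteArray();
            out.writeInt(raw.length);
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    /** Reloads the bits; returns null when the data is missing or unreadable. */
    static BitSet readOrNull(byte[] stored) {
        if (stored == null) {
            return null; // no filter was stored
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(stored))) {
            int length = in.readInt();
            if (length < 0 || length > stored.length - 4) {
                return null; // length prefix does not match the data
            }
            byte[] raw = new byte[length];
            in.readFully(raw);
            return BitSet.valueOf(raw);
        } catch (IOException e) {
            return null; // truncated data: the caller falls back to unfiltered lookups
        }
    }

    public static void main(String[] args) {
        BitSet bits = new BitSet();
        bits.set(3);
        bits.set(70);
        byte[] stored = write(bits);
        System.out.println("round trip equal: " + bits.equals(readOrNull(stored)));
        System.out.println("missing filter: " + readOrNull(null));
    }
}
```

Returning null instead of failing keeps the key/value data usable even when the filter is lost; the only cost is that no lookup can be skipped.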


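Besides basic reading and writing, BloomMapFile provides two operations that rely on the Bloom Filter: probablyHasKey and get. probablyHasKey tests whether a key may be present in the BloomMapFile; as with any Bloom Filter, a true result can be a false positive, while a false result means the key is definitely absent. get returns the value for a key if the key is in the BloomMapFile, and returns null at once when the filter rules the key out. The code is as follows: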


public boolean probablyHasKey(WritableComparable key) throws IOException {
    if (bloomFilter == null) {
        return true;  // no filter available, so the key cannot be ruled out
    }
    buf.reset();
    key.write(buf);  // serialize the key into the buffer
    bloomKey.set(buf.getData(), 1.0);
    return bloomFilter.membershipTest(bloomKey);
}

@Override
public synchronized Writable get(WritableComparable key, Writable val)
    throws IOException {
    if (!probablyHasKey(key)) {
        return null;  // definitely absent: skip the MapFile lookup
    }
    return super.get(key, val);
}
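
The way get avoids unnecessary lookups can be illustrated with a small stand-alone sketch. Everything in it is an assumption made for illustration: the hypothetical class BloomGuardedMapSketch uses a fixed-size BitSet with three hash positions derived by double hashing, not Hadoop's DynamicBloomFilter, and a HashMap stands in for the underlying MapFile. A counter records how often the backing store is actually consulted.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a map guarded by a small fixed-size Bloom filter. */
final class BloomGuardedMapSketch {
    private static final int BITS = 1 << 12; // filter size, chosen arbitrarily
    private static final int HASHES = 3;     // number of bit positions per key, chosen arbitrarily

    private final BitSet filter = new BitSet(BITS);
    private final Map<String, String> store = new HashMap<>(); // stands in for the MapFile
    int storeLookups = 0; // how often the backing store was consulted

    /** Derives HASHES bit positions from two base hashes of the key bytes. */
    private static int[] positions(String key) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int h1 = Arrays.hashCode(bytes);
        int h2 = 0x811C9DC5; // FNV-1a offset basis
        for (byte b : bytes) {
            h2 = (h2 ^ (b & 0xFF)) * 0x01000193; // FNV-1a prime
        }
        int[] result = new int[HASHES];
        for (int i = 0; i < HASHES; i++) {
            result[i] = Math.floorMod(h1 + i * h2, BITS);
        }
        return result;
    }

    /** Stores the pair and records the key in the filter. */
    void put(String key, String value) {
        store.put(key, value);
        for (int p : positions(key)) {
            filter.set(p);
        }
    }

    /** False means definitely absent; true means present or a false positive. */
    boolean probablyHasKey(String key) {
        for (int p : positions(key)) {
            if (!filter.get(p)) {
                return false;
            }
        }
        return true;
    }

    /** Consults the backing store only when the filter cannot rule the key out. */
    String get(String key) {
        if (!probablyHasKey(key)) {
            return null;
        }
        storeLookups++;
        return store.get(key);
    }

    public static void main(String[] args) {
        BloomGuardedMapSketch map = new BloomGuardedMapSketch();
        map.put("user:1", "alice");
        map.put("user:2", "bob");
        System.out.println(map.get("user:1"));
        System.out.println(map.get("user:404"));
        System.out.println("store lookups: " + map.storeLookups);
    }
}
```

A key that was stored always passes the filter, so the filter never hides existing data; an absent key is usually rejected before the store is touched, and in the rare false-positive case the store lookup itself still returns null.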


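Because the Bloom Filter can rule out keys that are definitely absent before the underlying MapFile is consulted, BloomMapFile lets MapReduce jobs avoid handling useless keys, which speeds up data transfer and processing.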