7.4.3 ArrayFile, SetFile, and BloomMapFile
ArrayFile extends MapFile and stores a mapping from integer indices (held as LongWritable keys) to values. Its implementation makes this clear:
// ArrayFile.Writer: the key class is fixed to LongWritable.
public Writer(Configuration conf, FileSystem fs,
              String file, Class<? extends Writable> valClass)
    throws IOException {
    super(conf, fs, file, LongWritable.class, valClass);
}

// ArrayFile.Reader: the key is always a LongWritable.
public static class Reader extends MapFile.Reader {
    private LongWritable key = new LongWritable();

    public Reader(FileSystem fs, String file, Configuration conf) throws IOException {
        super(fs, file, conf);
    }
}
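As the code shows, the Writer fixes the key type to LongWritable instead of passing WritableComparator.get(keyClass) as MapFile does, so the Reader can declare its key directly as a LongWritable. This more specific definition narrows the range of uses for ArrayFile, but it also makes the class easier to use and harder to misuse.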
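The effect of fixing the key type can be illustrated without Hadoop. The following is a minimal sketch, assuming an in-memory list in place of the on-disk file; `ArrayFileSketch` and its methods are hypothetical names, and the choice to number values consecutively from 0 on append is an assumption of the sketch, not something shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the idea behind ArrayFile; not the Hadoop class.
class ArrayFileSketch {
    private final List<String> values = new ArrayList<>();
    private long count = 0; // next key to assign (assumption: keys start at 0)

    // Append a value; the caller never supplies a key, it is the insertion index.
    long append(String value) {
        values.add(value);
        return count++;
    }

    // Return the value stored at index n, or null if n is out of range.
    String get(long n) {
        if (n < 0 || n >= count) {
            return null;
        }
        return values.get((int) n);
    }

    public static void main(String[] args) {
        ArrayFileSketch f = new ArrayFileSketch();
        f.append("alpha");
        f.append("beta");
        System.out.println(f.get(1)); // prints "beta"
    }
}
```

Because the key is always a long index, callers cannot pass a key of the wrong type, which is the usability gain the text describes.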
SetFile also extends MapFile. Like a Set in Java, it is simply a collection of keys with no values.
// SetFile.Writer: only a key class is supplied; there is no value class.
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass,
              SequenceFile.CompressionType compress)
    throws IOException {
    this(conf, fs, dirName, WritableComparator.get(keyClass), compress);
}

// Appending a key stores NullWritable as its placeholder value.
public void append(WritableComparable key) throws IOException {
    append(key, NullWritable.get());
}

// SetFile.Reader
public Reader(FileSystem fs, String dirName, WritableComparator comparator,
              Configuration conf)
    throws IOException {
    super(fs, dirName, comparator, conf);
}

// Look up a key.
public boolean seek(WritableComparable key)
    throws IOException {
    return super.seek(key);
}

// Read the next key; the NullWritable value is discarded.
public boolean next(WritableComparable key)
    throws IOException {
    return next(key, NullWritable.get());
}
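The SetFile code above (constructing the writer and reader, appending, seeking, and advancing to the next key) confirms that it is only a collection of keys, not a mapping. Note that a key appended to a SetFile must be greater than every key already in the set; in other words, a SetFile is really an ordered set of keys.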
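The ordering rule can be sketched in plain Java. This is an illustration under stated assumptions, not the SetFile implementation: `OrderedKeySetSketch` is a hypothetical class, keys are plain strings, the data lives in memory, and the choice of exception type is arbitrary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory ordered key set; not the Hadoop SetFile class.
class OrderedKeySetSketch {
    private final List<String> keys = new ArrayList<>();

    // Accept a key only if it is strictly greater than the last key appended.
    void append(String key) {
        if (!keys.isEmpty() && keys.get(keys.size() - 1).compareTo(key) >= 0) {
            throw new IllegalArgumentException("key out of order: " + key);
        }
        keys.add(key);
    }

    // Because the keys are sorted, membership can be checked by binary search.
    boolean seek(String key) {
        return Collections.binarySearch(keys, key) >= 0;
    }

    public static void main(String[] args) {
        OrderedKeySetSketch set = new OrderedKeySetSketch();
        set.append("apple");
        set.append("cherry");
        System.out.println(set.seek("apple"));  // prints "true"
        System.out.println(set.seek("banana")); // prints "false"
    }
}
```

Requiring sorted appends is what keeps lookups cheap: the keys never need to be re-sorted, and a search can skip most of them.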
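BloomMapFile does not itself extend MapFile, but its two core inner classes, Writer and Reader, extend the corresponding inner classes of MapFile. In practice it plays much the same role as MapFile, with filtering added. It uses a dynamic Bloom filter (see Chapter 5 of this book) to check whether a key belongs to the set of keys stored in the file. A BloomMapFile consists of the key/value mapping plus a Bloom filter. When writing, the Writer first initializes the Bloom filter from the configuration; for each appended pair it adds the key to the filter and writes the key/value data; finally, when the output stream is closed, it writes out the Bloom filter. The code is as follows: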
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass) throws IOException {
    super(conf, fs, dirName, comparator, valClass);
    this.fs = fs;
    this.dir = new Path(dirName);
    initBloomFilter(conf); // set up the filter from the configuration
}

private synchronized void initBloomFilter(Configuration conf) {
    ……
}

@Override
public synchronized void append(WritableComparable key, Writable val)
    throws IOException {
    ……
    bloomFilter.add(bloomKey); // add the key to the Bloom filter
}

@Override
public synchronized void close() throws IOException {
    super.close();
    DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true);
    bloomFilter.write(out); // write out the Bloom filter
    out.flush();
    out.close();
}
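When reading, the Reader likewise initializes the Bloom filter in its constructor, immediately loading the filter stored alongside the data; the key/value data is read afterwards. The code is as follows: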
public Reader(FileSystem fs, String dirName, WritableComparator comparator,
              Configuration conf) throws IOException {
    super(fs, dirName, comparator, conf);
    initBloomFilter(fs, dirName, conf);
}

private void initBloomFilter(FileSystem fs, String dirName,
                             Configuration conf) {
    try {
        DataInputStream in = fs.open(new Path(dirName, BLOOM_FILE_NAME));
        bloomFilter = new DynamicBloomFilter();
        bloomFilter.readFields(in); // load the stored Bloom filter
        in.close();
    } catch (IOException ioe) {
        // The filter could not be read: fall back to plain MapFile behavior.
        bloomFilter = null;
    }
}
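The write-on-close and read-on-open steps amount to a serialization round trip of the filter. The following is a minimal sketch of that round trip, assuming a `java.util.BitSet` in place of DynamicBloomFilter and a byte array in place of the file on the FileSystem; `FilterRoundTripSketch`, the length-prefixed layout, and returning null on a read failure are all assumptions of the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.BitSet;

// Hypothetical round trip of a filter's bits; not the Hadoop on-disk format.
class FilterRoundTripSketch {
    // Serialize the bits with a length prefix, as a writer might do when closing.
    static byte[] write(BitSet filter) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            byte[] raw = filter.toByteArray();
            out.writeInt(raw.length);
            out.write(raw);
        }
        return bytes.toByteArray();
    }

    // Rebuild the bits, as a reader might do when opening.
    // Returns null if the stored data cannot be read.
    static BitSet read(byte[] stored) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(stored))) {
            byte[] raw = new byte[in.readInt()];
            in.readFully(raw);
            return BitSet.valueOf(raw);
        } catch (IOException e) {
            return null; // the caller must then treat every key as possibly present
        }
    }

    public static void main(String[] args) throws IOException {
        BitSet filter = new BitSet();
        filter.set(3);
        filter.set(70);
        BitSet restored = read(write(filter));
        System.out.println(filter.equals(restored)); // prints "true"
    }
}
```

Loading the whole filter up front is what lets later lookups consult it in memory before touching the key/value data.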
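In addition to basic reads and writes, BloomMapFile provides two operations backed by the Bloom filter: probablyHasKey and get. The first tests whether a key may be present in the BloomMapFile. Because a Bloom filter can report false positives but never false negatives, a false result is definitive while a true result is not; if no filter is available, the method simply returns true. The second returns the value for a key only when the filter reports that the key may be present; otherwise it returns null without consulting the underlying MapFile. The implementation is as follows: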
public boolean probablyHasKey(WritableComparable key) throws IOException {
    if (bloomFilter == null) {
        return true; // no filter available, so the key cannot be ruled out
    }
    buf.reset();
    key.write(buf); // serialize the key into the reusable buffer
    bloomKey.set(buf.getData(), 1.0);
    return bloomFilter.membershipTest(bloomKey);
}

@Override
public synchronized Writable get(WritableComparable key, Writable val)
    throws IOException {
    if (!probablyHasKey(key)) {
        return null; // definitely absent: skip the MapFile lookup
    }
    return super.get(key, val);
}
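The filter-then-lookup pattern above can be sketched without Hadoop. This is an illustration under stated assumptions: `BloomMapSketch` is a hypothetical class, the filter is a fixed-size `BitSet` rather than a DynamicBloomFilter, the two hash functions and the sizes are arbitrary choices, and a `HashMap` stands in for the key/value data.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical map guarded by a simple Bloom filter; not the Hadoop BloomMapFile.
class BloomMapSketch {
    private static final int BITS = 1 << 12; // filter size in bits (arbitrary)
    private static final int HASHES = 3;     // bit positions per key (arbitrary)

    private final BitSet filter = new BitSet(BITS);
    private final Map<String, String> data = new HashMap<>();
    int lookups = 0; // how many times the backing map was actually consulted

    // Derive the i-th bit position for a key from two simple hashes.
    private int index(String key, int i) {
        int h1 = 17;
        int h2 = 31;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h1 = h1 * 31 + b;
            h2 = h2 * 131 + b;
        }
        return Math.floorMod(h1 + i * h2, BITS);
    }

    // Record the key in the filter and store the pair.
    void put(String key, String value) {
        for (int i = 0; i < HASHES; i++) {
            filter.set(index(key, i));
        }
        data.put(key, value);
    }

    // False means the key is definitely absent; true means it may be present.
    boolean probablyHasKey(String key) {
        for (int i = 0; i < HASHES; i++) {
            if (!filter.get(index(key, i))) {
                return false;
            }
        }
        return true;
    }

    // Consult the backing map only when the filter cannot rule the key out.
    String get(String key) {
        if (!probablyHasKey(key)) {
            return null;
        }
        lookups++;
        return data.get(key);
    }

    public static void main(String[] args) {
        BloomMapSketch map = new BloomMapSketch();
        map.put("k1", "v1");
        System.out.println(map.get("k1")); // prints "v1"
    }
}
```

A key that was stored always passes the filter, so correctness never depends on the filter; it only saves work for keys that were never stored.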
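With BloomMapFile, lookups for keys that are not in the file can be rejected by the Bloom filter without reading the key/value data. This reduces the amount of useless key data a MapReduce job has to handle and speeds up data transfer and processing.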