4.7.3 过滤数据
数据过滤主要指在面对海量输入数据作业时,在作业执行之前先将数据中无用数据、噪声数据和异常数据清除。通过数据过滤可以降低数据处理的规模,较大程度地提高数据处理效率,同时避免异常数据或不规范数据对最终结果造成负面影响。
在数据处理的时候如何进行数据过滤呢?在MapReduce中可以根据过滤条件利用很多办法完成数据预处理中的数据过滤,比如编写预处理程序,在程序中加上过滤条件,形成真正的处理数据;也可以在数据处理任务的最开始代码处加上过滤条件;还可以使用特殊的过滤器数据结果来完成过滤。下面笔者以一种在并行程序中功能强大的过滤器结构为例来介绍如何在MapReduce中对海量数据进行过滤。
Bloom Filter是在1970年由Howard Bloom提出的二进制向量数据结构。在保存所有集合元素特征的同时,它能在保证高效空间效率和一定出错率的前提下迅速检测一个元素是不是集合中的成员。Bloom Filter的误报(false positive)只会发生在检测集合内的数据上,而不会对集合外的数据产生漏报(false negative)。这样每个检测请求返回有“在集合内(可能错误)”和“不在集合内(绝对不在集合内)”两种情况,可见Bloom Filter牺牲了极少正确率换取时间和空间,所以它不适合那些“零错误”的应用场合。在MapReduce中,Bloom Filter由Bloom Filter类(此类继承了Filter类,Filter类实现了Writable序列化接口)实现,使用add(Key key)函数将一个key值加入Filter,使用membershipTest(Key key)来测试某个key是否在Filter内。
以上说明了Bloom Filter的大概思想,那么在实践中如何使用Bloom Filter呢?假设有两个表需要进行内连接,其中一个表非常大,另一个表非常小,这时为了加快处理速度和减小网络带宽,可以基于小表创建连接列上的Bloom Filter。具体做法是先创建Bloom Filter对象,将小表中所有连接列上的值都保存到Bloom Filter中,然后开始通过MapReduce作业执行内连接。在连接的Map阶段,读小表的数据时直接输出以连接列值为key、以数据为value的<key, value>对;读大表数据时,在输出前先判断当前元组的连接列值是否在Bloom Filter内,如果不存在就说明在后面的连接阶段不会使用到,不需要输出,如果存在就采用与小表同样的输出方式输出。最后在Reduce阶段,针对每个连接列值连接两个表的元组并输出结果。
大家已经知道了Bloom Filter的作用和使用方法,那么Bloom Filter具体是如何实现的呢?又是如何保证空间和时间的高效性呢?如何用正确率换取时间和空间的呢?(基于MapReduce中实现的BloomFilter代码进行分析)Bloom Filter自始至终是一个M位的位数组:
private static final byte[]bitvalues=new byte[]{
(byte)0x01,
(byte)0x02,
(byte)0x04,
(byte)0x08,
(byte)0x10,
(byte)0x20,
(byte)0x40,
(byte)0x80
};
它有两个重要接口,分别是add()和membershipTest(),add()负责保存集合元素的特征到位数组(类似于一个学习的过程),在保存所有集合元素特征之后可以使用membershipTest()来判断某个值是否是集合中的元素。
在初始状态下,Bloom Filter的所有位都被初始化为0。为了表示集合中的所有元素,Bloom Fliter使用k个互相独立的Hash函数,它们分别将集合中的每个元素映射到(1,2,……,M)这个范围上,映射的位置作为此元素特征值的一维,并将位数组中此位置的值设置为1,最终得到的k个Hash函数值将形成集合元素的特征值向量,同时此向量也被保存在位数组中。从获取k个Hash函数值到修改对应位数组值,这就是add接口所完成的任务。
public void add(Key key){
if(key==null){
throw new NullPointerException("key cannot be null");
}
int[]h=hash.hash(key);
hash.clear();
for(int i=0;i<nbHash;i++){
bits.set(h[i]);
}
}
利用add接口将所有集合元素的特征值向量保存到Bloom Filter之后,就可以使用此过滤器也就是membershipTest接口来判断某个值是否是集合元素。在判断时,首先还是计算待判断值的特征值向量,也就是k个Hash函数值,然后判断特征值向量每一维对应的位数组位置上的值是否是1,如果全部是1,那么membershipTest返回true,否则返回false,这就是判断值是否存在于集合中的原理。
public boolean membershipTest(Key key){
if(key==null){
throw new NullPointerException("key cannot be null");
}
int[]h=hash.hash(key);
hash.clear();
for(int i=0;i<nbHash;i++){
if(!bits.get(h[i])){
return false;
}
}
return true;
}
从上面add接口和membershipTest接口实现的原理可以看出,正是Hash函数冲突的可能性导致误判的可能。由于Hash函数冲突,两个值的特征值向量也有可能冲突(k个Hash函数全部冲突)。如果两个值中只有一个是集合元素,那么该值的特征值向量会保存在位数组中,从而在判断另外一个非集合元素的值时,会发现该值的特征值向量已经保存在位数组中,最终返回true,形成误判。那么都有哪些因素影响了错误率呢?通过上面的分析可以看出,Hash函数的个数和位数组的大小影响了错误率。位数组越大,特征值向量冲突的可能性越小,错误率也小。在位数组大小一定的情况下,Hash函数个数越多,形成的特征值向量维数越多,冲突的可能性越小;但是维数越多,占用的位数组位置越多,又提高了冲突的可能性。所以在实际应用中,在使用Bloom Filter时应根据实际需要和一定的估计来确定合适的数组规模和哈希函数规模。
通过上面的介绍和分析可以发现,在Bloom Filter中插入元素和查询值都是O(1)的操作;同时它并不保存元素而是采用位数组保存特征值,并且每一位都可以重复利用。所以同集合、链表和树等传统方法相比,Bloom Filter无疑在时间和空间性能上都极为优秀。但错误率限制了Bloom Filter的使用场景,只允许误报(false positive)的场景;同时由于一位多用,因此Bloom Filter并不支持删除集合元素,在删除某个元素时可能会同时删除另外一个元素的部分特征值。图4-3是一个简单的例子,既说明了Bloom Filter的实现过程,又说明了错误发生的原因(步骤⑤判断的值是包含在集合中的,但是返回值为true)。
图 4-3 Bloom Filter实现过程图