3.3.5 Partitioner接口的设计与实现
Partitioner的作用是对Mapper产生的中间结果进行分片,以便将同一分组的数据交给同一个Reducer处理,它直接影响Reduce阶段的负载均衡。旧版API中Partitioner的类图如图3-20所示。它继承了JobConfigurable,可通过configure方法初始化。它本身只包含一个待实现的方法getPartition。该方法包含三个参数,均由框架自动传入,前面两个参数是key/value,第三个参数numPartitions表示每个Mapper的分片数,也就是Reducer的个数。
图 3-20 旧版API的Partitioner类图
MapReduce提供了两个Partitioner实现:HashPartitioner和TotalOrderPartitioner。其中HashPartitioner是默认实现,它实现了一种基于哈希值的分片方法,代码如下:
public int getPartition(K2 key, V2 value,
int numReduceTasks){
return(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks;
}
TotalOrderPartitioner提供了一种基于区间的分片方法,通常用在数据全排序中。在MapReduce环境中,容易想到的全排序方案是归并排序,即在Map阶段,每个Map Task进行局部排序;在Reduce阶段,启动一个Reduce Task进行全局排序。由于作业只能有一个Reduce Task,因而Reduce阶段会成为作业的瓶颈。为了提高全局排序的性能和扩展性,MapReduce提供了TotalOrderPartitioner。它能够按照大小将数据分成若干个区间(分片),并保证后一个区间的所有数据均大于前一个区间数据,这使得全排序的步骤如下:
步骤1 数据采样。在Client端通过采样获取分片的分割点。Hadoop自带了几个采样算法,如IntercalSampler、RandomSampler、SplitSampler等(具体见org.apache.hadoop.mapred.lib包中的InputSampler类)。下面举例说明。
采样数据为:b, abc, abd, bcd, abcd, efg, hii, afd, rrr, mnk
经排序后得到:abc, abcd, abd, afd, b,bcd, efg, hii, mnk, rrr
如果Reduce Task个数为4,则采样数据的四等分点为abd、bcd、mnk,将这3个字符串作为分割点。
步骤2 Map阶段。本阶段涉及两个组件,分别是Mapper和Partitioner。其中,Mapper可采用IdentityMapper,直接将输入数据输出,但Partitioner必须选用TotalOrderPartitioner,它将步骤1中获取的分割点保存到trie树中以便快速定位任意一个记录所在的区间,这样,每个Map Task产生R(Reduce Task个数)个区间,且区间之间有序。
TotalOrderPartitioner通过trie树查找每条记录所对应的Reduce Task编号。如图3-21所示,我们将分割点保存在深度为2的trie树中,假设输入数据中有两个字符串“abg”和“mnz”,则字符串“abg”对应partition1,即第2个Reduce Task,字符串“mnz”对应partition3,即第4个Reduce Task。
图 3-21 利用trie树对数据进行分片
步骤3 Reduce阶段。每个Reducer对分配到的区间数据进行局部排序,最终得到全排序数据。
从以上步骤可以看出,基于TotalOrderPartitioner全排序的效率跟key分布规律和采样算法有直接关系;key值分布越均匀且采样越具有代表性,则Reduce Task负载越均衡,全排序效率越高。
TotalOrderPartitioner有两个典型的应用实例:TeraSort和HBase批量数据导入。其中,TeraSort是Hadoop自带的一个应用程序实例。它曾在TB级数据排序基准评估中赢得第一名[1],而TotalOrderPartitioner正是从该实例中提炼出来的。HBase[2]是一个构建在Hadoop之上的NoSQL数据仓库。它以Region为单位划分数据,Region内部数据有序(按key排序),Region之间也有序。很明显,一个MapReduce全排序作业的R个输出文件正好可对应HBase的R个Region。
新版API中的Partitioner类图如图3-22所示。它不再实现JobConfigurable接口。当用户需要让Partitioner通过某个JobConf对象初始化时,可自行实现Configurable接口,如:
public class TotalOrderPartitioner<K, V>
extends Partitioner<K, V>implements Configurable
图 3-22 新版API中的Partitioner类图
[1]http://sortbenchmark. org/
[2]http://hbase. apache.org/