6.3 中位数
海量数据中通常有统计集合中位数的计算需求,读者可以通过以下示例了解Spark求中位数的方式。
1.实例描述
若有很大一组数据,数据的个数是N,在分布式数据存储情况下,找到这N个数的中位数。
数据输入是以下整型数据。
- 1、2、3、4、5、6、8、9、11、12、34
输出为:
- 6
2.设计思路
海量数据求中位数有很多解决方案。假设海量数据已经预先排序本例的解决方案为:将整个数据空间划分为K个桶。第一轮,在mapPartition阶段先将每个分区内的数据划分为K个桶,统计桶中的数据量,然后通过reduceByKey聚集整个RDD每个桶中的数据量。第二轮,根据桶统计的结果和总的数据量,可以判读数据落在哪个桶里,以及中位数的偏移量(offset)。针对这个桶的数据进行排序或者采用Top K的方式,获取到偏移为offset的数据。
3.代码示例
- import org.apache.spark.{SparkContext, SparkConf}
- import org.apache.spark.SparkContext.rddToPairRDDFunctions
- */
- object Median {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("Spark Pi")
- val spark = new SparkContext(conf)
- val data = spark.textFile("data")
- /*将数据逻辑划分为10个桶,这里用户可以自行设置桶数量,统计每个桶中落入的数据量*/
- val mappeddata =data.map(num => {
- (num/1000 , num)
- })
- val count = mappeddata.reduceByKey((a , b) => {
- a+b
- }).collect()
- /*根据总的数据量,逐次根据桶序号由低到高依次累加,判断中位数落在哪个
- 桶中,并获取到中位数在桶中的偏移量*/
- val sum_count=count.map(data => {
- data._2
- }).sum
- var temp = 0
- var index = 0
- var mid = sum_count/2
- for( i <- 0 to 10) {
- temp=temp+count(i)
- if(temp >= mid) {
- index=i
- break
- }
- }
- /*中位数在桶中的偏移量*/
- val offset = temp - mid
- /*获取到中位数所在桶中的偏移量为offset的数,也就是中位数*/
- val result = mappeddata.filter(num => num._1 == index ).takeOrdered(offset)
- println("Median is " + result(offset))
- spark.stop()
- }
- }
4.应用场景
统计海量数据时,经常需要预估中位数,由中位数大致了解某列数据,做机器学习和数据挖掘的很多公式中也需要用到中位数。