6.3 中位数

海量数据中通常有统计集合中位数的计算需求,读者可以通过以下示例了解Spark求中位数的方式。

1.实例描述

若有很大一组数据,数据的个数是N,在分布式数据存储情况下,找到这N个数的中位数。

数据输入是以下整型数据。


  1. 12345689111234

输出为:


  1. 6

2.设计思路

海量数据求中位数有很多解决方案。假设海量数据已经预先排序本例的解决方案为:将整个数据空间划分为K个桶。第一轮,在mapPartition阶段先将每个分区内的数据划分为K个桶,统计桶中的数据量,然后通过reduceByKey聚集整个RDD每个桶中的数据量。第二轮,根据桶统计的结果和总的数据量,可以判读数据落在哪个桶里,以及中位数的偏移量(offset)。针对这个桶的数据进行排序或者采用Top K的方式,获取到偏移为offset的数据。

3.代码示例


  1. import org.apache.spark.{SparkContext SparkConf}
  2. import org.apache.spark.SparkContext.rddToPairRDDFunctions
  3. */
  4. object Median {
  5. def mainargs Array[String]) {
  6. val conf = new SparkConf().setAppName"Spark Pi"
  7. val spark = new SparkContextconf
  8. val data = spark.textFile"data"
  9. /*将数据逻辑划分为10个桶,这里用户可以自行设置桶数量,统计每个桶中落入的数据量*/
  10. val mappeddata =data.mapnum => {
  11. num/1000 num
  12. })
  13. val count = mappeddata.reduceByKey((a b => {
  14. a+b
  15. }).collect()
  16. /*根据总的数据量,逐次根据桶序号由低到高依次累加,判断中位数落在哪个
  17. 桶中,并获取到中位数在桶中的偏移量*/
  18. val sum_count=count.mapdata => {
  19. data._2
  20. }).sum
  21. var temp = 0
  22. var index = 0
  23. var mid = sum_count/2
  24. for i <- 0 to 10 {
  25. temp=temp+counti
  26. iftemp >= mid {
  27. index=i
  28. break
  29. }
  30. }
  31. /*中位数在桶中的偏移量*/
  32. val offset = temp - mid
  33. /*获取到中位数所在桶中的偏移量为offset的数,也就是中位数*/
  34. val result = mappeddata.filternum => num._1 == index ).takeOrderedoffset
  35. println"Median is " + resultoffset))
  36. spark.stop()
  37. }
  38. }

4.应用场景

统计海量数据时,经常需要预估中位数,由中位数大致了解某列数据,做机器学习和数据挖掘的很多公式中也需要用到中位数。