6.2 Top K

Top K算法有两步,一是统计词频,二是找出词频最高的前K个词。

1.实例描述

假设取Top 1,则有如下输入和输出。

输入:


  1. Hello World Bye World
  2. Hello Hadoop Bye Hadoop
  3. Bye Hadoop Hello Hadoop

输出:


  1. Hadoop 词频4

2.设计思路

首先统计WordCount的词频,将数据转化为(词,词频)的数据对,第二个阶段采用分治的思想,求出RDD每个分区的Top K,最后将每个分区的Top K结果合并以产生新的集合,在集合中统计出Top K的结果。每个分区由于存储在单机的,所以可以采用单机求Top K的方式。本例采用堆的方式。也可以直接维护一个含K个元素的数组,感兴趣的读者可以参考其他资料了解堆的实现。

3.代码示例

Top K算法示例代码如下:


  1. import org.apache.spark.SparkContext
  2. import org.apache.spark.SparkContext._
  3. object TopK {
  4. def mainargsArray[String]) {
  5. /*执行WordCount,统计出最高频的词*/
  6. val spark = new SparkContext"local" "TopK"
  7. System.getenv"SPARK_HOME"), SparkContext.jarOfClassthis.getClass))
  8. val count = spark.textFile"data").flatMapline =>
  9. line.split" ")).mapword =>
  10. word 1)).reduceByKey_ + _
  11. /*统计RDD每个分区内的Top K查询*/
  12. val topk = count.mapPartitionsiter => {
  13. whileiter.hasNext {
  14. putToHeapiter.next())
  15. }
  16. getHeap().iterator
  17. }
  18. ).collect()
  19. /*将每个分区内统计出的TopK查询合并为一个新的集合,统计出TopK查询*/
  20. val iter = topk.iterator
  21. whileiter.hasNext {
  22. putToHeapiter.next())
  23. }
  24. val outiter=getHeap().iterator
  25. /*输出TopK的值*/
  26. println"Topk 值 :"
  27. whileoutiter.hasNext {
  28. println"\n 词频:"+outiter.next()._1+" 词:"+outiter.next()._2
  29. }
  30. spark.stop()
  31. }
  32. }
  33. def putToHeapiter String Int)) {
  34. /*数据加入含k个元素的堆中*/
  35. ……
  36. }
  37. def getHeap(): Array[(String Int)] = {
  38. /*获取含k个元素的堆中的元素*/
  39. val a=new Array[(String Int)]()
  40. ……
  41. }

4.应用场景

Top K的示例模型可以应用在求过去一段时间消费次数最多的消费者、访问最频繁的IP地址和最近、更新、最频繁的微博等应用场景。