difficulty: Medium
tags:

  • Big Data
  • Map Reduce
  • EditorsChoice

title: Top K Frequent Words (Map Reduce)

Top K Frequent Words (Map Reduce)

Problem

Description

Find top k frequent words with map reduce framework.

The mapper's key is the document id and its value is the content of the document; words in a document are separated by spaces.

For the reducer, the output should be at most k key-value pairs: the top k words and their frequencies in this reducer. The judge will take care of merging the different reducers' results into the global top k frequent words, so you don't need to handle that part.

The value k is given in the constructor of the TopK class.

Notice

For words with the same frequency, rank them alphabetically.

Example

Given document A =

  1. lintcode is the best online judge
  2. I love lintcode

and document B =

  1. lintcode is an online judge for coding interview
  2. you can test your code online at lintcode

The top 2 words and their frequencies should be

  1. lintcode, 4
  2. online, 3

Solution

Solving Top K with Map Reduce adds the Map and Reduce phases on top of the classic Top K problem. The MapReduce model is an abstraction distilled from distributed data processing, and it consists of two main phases, Map and Reduce.

  • Map phase: the data is partitioned into shards, each processed by one Map task; without sharding there is nothing to distribute.
  • Reduce phase: the results of the previous phase are reduced in parallel to produce the final result.

The actual MapReduce programming model can be broken into the following five distributed steps:

  1. Parse the input data into <key, value> pairs
  2. Map each input <key, value> into another <key, value>
  3. Group the mapped data by key
  4. Reduce each group from the previous step into new <key, value> pairs
  5. Post-process and persist the output of the Reduce phase
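As a toy, single-process illustration of the five steps above (not part of the judge's framework; all class and method names here are invented for the sketch), the pipeline can be walked through with plain Java collections:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory walkthrough: parse -> map -> group by key ->
// reduce -> emit, all in one process instead of across machines.
public class MiniMapReduce {
    // Step 2: map a document's content into <word, 1> pairs.
    static List<Map.Entry<String, Integer>> map(String content) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : content.split(" ")) {
            if (!w.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(w, 1));
            }
        }
        return out;
    }

    // Steps 3 + 4: group the mapped pairs by key, then reduce each
    // group to the sum of its values.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.addAll(map("lintcode is the best online judge"));
        pairs.addAll(map("I love lintcode"));
        Map<String, Integer> counts = reduce(pairs);
        System.out.println(counts.get("lintcode")); // 2
    }
}
```

In a real framework, step 3 (the shuffle) moves pairs with the same key to the same reducer over the network; here a `TreeMap` plays that role locally.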

According to the problem statement, we only need to implement the Map and Reduce steps: output the K most frequent words, breaking frequency ties alphabetically. If we maintained a max-heap over all words, we could emit the result by repeatedly removing the root. That works, but wastes a fair amount of space; ideally we only ever keep a heap of size K. The remaining question is then how to maintain such a K-sized heap so that, when a new element arrives, the entry evicted is the last-ranked (smallest) one.

Java

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Definition of OutputCollector:
 * class OutputCollector<K, V> {
 *     public void collect(K key, V value);
 *         // Adds a key/value pair to the output buffer
 * }
 * Definition of Document:
 * class Document {
 *     public int id;
 *     public String content;
 * }
 */
class KeyFreq implements Comparable<KeyFreq> {
    public String key = null;
    public int freq = 0;

    public KeyFreq(String key, int freq) {
        this.key = key;
        this.freq = freq;
    }

    @Override
    public int compareTo(KeyFreq kf) {
        if (kf.freq != this.freq) {
            return this.freq - kf.freq;
        }
        // On equal frequency, reverse the alphabetical order so that the
        // lexicographically smaller word is "larger" and survives in the
        // min-heap.
        return kf.key.compareTo(this.key);
    }
}

public class TopKFrequentWords {
    public static class Map {
        public void map(String key, Document value,
                        OutputCollector<String, Integer> output) {
            // Output the results into the output buffer,
            // i.e. output.collect(String key, int value);
            if (value == null || value.content == null) {
                return;
            }
            String[] splits = value.content.split(" ");
            for (String split : splits) {
                if (split.length() > 0) {
                    output.collect(split, 1);
                }
            }
        }
    }

    public static class Reduce {
        private int k = 0;
        private PriorityQueue<KeyFreq> pq = null;

        public void setup(int k) {
            // initialize your data structure here
            this.k = k;
            pq = new PriorityQueue<KeyFreq>(k);
        }

        public void reduce(String key, Iterator<Integer> values) {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next();
            }
            KeyFreq kf = new KeyFreq(key, sum);
            if (pq.size() < k) {
                pq.offer(kf);
            } else if (pq.peek().compareTo(kf) <= 0) {
                // The new word outranks the current minimum: replace it.
                pq.poll();
                pq.offer(kf);
            }
        }

        public void cleanup(OutputCollector<String, Integer> output) {
            // Output the top k pairs <word, times> into the output buffer,
            // i.e. output.collect(String key, Integer value);
            List<KeyFreq> kfList = new ArrayList<KeyFreq>(k);
            while (!pq.isEmpty()) {
                kfList.add(pq.poll());
            }
            // The min-heap yields ascending order; emit in reverse so the
            // most frequent words come first.
            for (int i = kfList.size() - 1; i >= 0; i--) {
                KeyFreq kf = kfList.get(i);
                output.collect(kf.key, kf.freq);
            }
        }
    }
}
```

Source Code Analysis

We use Java's built-in PriorityQueue as the heap. Since a custom ordering is needed, the helper class implements the compareTo method of the Comparable interface. Note that PriorityQueue is natively a min-heap, so when overriding compareTo we must be careful with the string comparison: words with the same frequency are ranked alphabetically, i.e. we prefer to keep the lexicographically smaller word, so the key comparison runs in the opposite direction to the freq comparison. Finally, because this is a min-heap, the polled results must be reversed once before output. Also note that the PriorityQueue used in this Java implementation is not thread-safe; in practice, consider whether the thread-safe PriorityBlockingQueue is needed.
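The tie-breaking trick in the comparator (reverse alphabetical order inside a min-heap of fixed size K) can be demonstrated standalone; the class and method names below are invented for this sketch:

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical demo: keep only K entries in a min-heap whose root is the
// current "worst" word (lowest frequency; among equals, the alphabetically
// largest), so the root is always the right entry to evict.
public class TopKHeap {
    static List<String> topK(Map<String, Integer> freq, int k) {
        PriorityQueue<Map.Entry<String, Integer>> pq = new PriorityQueue<>(
            (a, b) -> a.getValue().equals(b.getValue())
                // Ties: compare keys in reverse so the lexicographically
                // larger word sinks to the root and is evicted first.
                ? b.getKey().compareTo(a.getKey())
                : a.getValue() - b.getValue());
        for (Map.Entry<String, Integer> e : freq.entrySet()) {
            pq.offer(e);
            if (pq.size() > k) {
                pq.poll(); // evict the worst; heap never exceeds size K
            }
        }
        // Polling yields ascending order; prepend to reverse it.
        LinkedList<String> result = new LinkedList<>();
        while (!pq.isEmpty()) {
            result.addFirst(pq.poll().getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> freq = new HashMap<>();
        freq.put("lintcode", 4);
        freq.put("online", 3);
        freq.put("judge", 2);
        freq.put("is", 2);
        System.out.println(topK(freq, 2)); // [lintcode, online]
    }
}
```

With "judge" and "is" both at frequency 2 and k = 3, the comparator makes "judge" the heap root, so "is" (alphabetically smaller) is the one kept, matching the problem's tie-breaking rule.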

For Java, although the standard library has no fixed-capacity PriorityQueue, the widely used Google Guava library already provides one, MinMaxPriorityQueue, so there is no need to reinvent the wheel.

Complexity Analysis

Each heap insertion or deletion on a heap of fixed size K costs O(log K), so over n elements the time complexity is about O(n log K); the heap itself takes O(K) extra space.
