6.1 WordCount

WordCount是大数据领域的经典范例,如同程序设计中的Hello World一样,是一个入门程序。本节主要从并行处理的角度出发,介绍设计Spark程序的过程。

1.实例描述

输入:


  1. Hello World Bye World
  2. Hello Hadoop Bye Hadoop
  3. Bye Hadoop Hello Hadoop

输出:


  1. <Bye,3>
  2. <Hadoop,4>
  3. <Hello,3>
  4. <World,2>

2.设计思路

在map阶段会将数据映射为:


  1. <Hello,1>
  2. <World,1>
  3. <Bye,1>
  4. <World,1>
  5. <Hello,1>
  6. <Hadoop,1>
  7. <Bye,1>
  8. <Hadoop,1>
  9. <Bye,1>
  10. <Hadoop,1>
  11. <Hello,1>
  12. <Hadoop,1>

在reduceByKey阶段会将相同key的数据合并,并将合并结果相加。


  1. <Bye,1,1,1>
  2. <Hadoop,1,1,1,1>
  3. <Hello,1,1,1>
  4. <World,1,1>

3.代码示例

WordCount的主要功能是统计输入中所有单词出现的总次数,编写步骤如下。

(1)初始化

创建一个SparkContext对象,该对象有4个参数:Spark master位置、应用程序名称、Spark安装目录和Jar存放位置。

需要引入下面两个文件。


  1. import org.apache.spark._
  2. import SparkContext._
  3. val sc = new SparkContextargs0), "WordCount"
  4. System.getenv"SPARK_HOME"),
  5. SeqSystem.getenv"SPARK_TEST_JAR")))

(2)加载输入数据

从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用Hadoop中的TextInputFormat解析输入数据。


  1. val textRDD = sc.textFileargs1))

textFile中的每个Hadoop Block相当于一个RDD分区。

(3)词频统计

对于WordCount而言,首先需要从输入数据中的每行字符串中解析出单词,然后分而治之,将相同单词放到一个组中,统计每个组中每个单词出现的频率。


  1. val result = textRDD.flatMap{
  2. casekey value => value.toString().split"\\s+");
  3. }.mapword => word 1)). reduceByKey _ + _

其中,flatMap函数每条记录转换,转换后,如果每个记录是一个集合;则将集合中的元素变为RDD中的记录;map函数将一条记录映射为另一条记录;reduceByKey函数将key相同的关键字的数据聚合到一起进行函数运算。

(4)存储结果

可以使用SparkContext中的saveAsTextFile函数将数据集保存到HDFS目录下。


  1. result.saveAsSequenceFileargs2))

4.应用场景

WordCount的模型可以在很多场景中使用,如统计过去一年中访客的浏览量、最近一段时间相同查询的数量和海量文本中的词频等。