6.1 WordCount
WordCount是大数据领域的经典范例,如同程序设计中的Hello World一样,是一个入门程序。本节主要从并行处理的角度出发,介绍设计Spark程序的过程。
1.实例描述
输入:
- Hello World Bye World
- Hello Hadoop Bye Hadoop
- Bye Hadoop Hello Hadoop
输出:
- <Bye,3>
- <Hadoop,4>
- <Hello,3>
- <World,2>
2.设计思路
在map阶段会将数据映射为:
- <Hello,1>
- <World,1>
- <Bye,1>
- <World,1>
- <Hello,1>
- <Hadoop,1>
- <Bye,1>
- <Hadoop,1>
- <Bye,1>
- <Hadoop,1>
- <Hello,1>
- <Hadoop,1>
在reduceByKey阶段会将相同key的数据合并,并将合并结果相加。
- <Bye,1,1,1>
- <Hadoop,1,1,1,1>
- <Hello,1,1,1>
- <World,1,1>
3.代码示例
WordCount的主要功能是统计输入中所有单词出现的总次数,编写步骤如下。
(1)初始化
创建一个SparkContext对象,该对象有4个参数:Spark master位置、应用程序名称、Spark安装目录和Jar存放位置。
需要引入下面两个文件。
- import org.apache.spark._
- import SparkContext._
- val sc = new SparkContext(args(0), "WordCount",
- System.getenv("SPARK_HOME"),
- Seq(System.getenv("SPARK_TEST_JAR")))
(2)加载输入数据
从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用Hadoop中的TextInputFormat解析输入数据。
- val textRDD = sc.textFile(args(1))
textFile中的每个Hadoop Block相当于一个RDD分区。
(3)词频统计
对于WordCount而言,首先需要从输入数据中的每行字符串中解析出单词,然后分而治之,将相同单词放到一个组中,统计每个组中每个单词出现的频率。
- val result = textRDD.flatMap{
- case(key, value) => value.toString().split("\\s+");
- }.map(word => (word, 1)). reduceByKey (_ + _)
其中,flatMap函数每条记录转换,转换后,如果每个记录是一个集合;则将集合中的元素变为RDD中的记录;map函数将一条记录映射为另一条记录;reduceByKey函数将key相同的关键字的数据聚合到一起进行函数运算。
(4)存储结果
可以使用SparkContext中的saveAsTextFile函数将数据集保存到HDFS目录下。
- result.saveAsSequenceFile(args(2))
4.应用场景
WordCount的模型可以在很多场景中使用,如统计过去一年中访客的浏览量、最近一段时间相同查询的数量和海量文本中的词频等。