8.3.4 运行实例
上文介绍了GraphX的组件和API。下面通过Berkeley的示例构建一个真实的图数据分析流水线。[1]示例中将会使用Wikipedia的连接数据,使用Spark的操作符清洗数据和抽取结构,使用GraphX操作符分析图结构,最后检验和评价图分析的结果。上面的操作可以通过Spark Shell执行。
GraphX为了达到最佳的性能表现需要使用Kyro序列化器。用户可以通过Spark的Web UI确认使用了哪种序列化器。在浏览器中输入链接(见图8-23):http://<MASTER_URL>:4040/environment/,检查spark.serializer属性:
在默认情况下,Spark没有使用Kyro序列化器。在实战中,用户可以首先退出Spark Shell。
编译配置文件/root/spark/conf/spark-env.sh,在文件中加入下面的内容。
- SPARK_JAVA_OPTS+='
- -Dspark.serializer=org.apache.spark.serializer.KryoSerializer
- -Dspark.kryo.registrator=org.apache.spark.graphx.GraphKryoRegistrator 'export SPARK_JAVA_OPTS
也可以使用下面的命令在命令行配置文件。
- echo -e "SPARK_JAVA_OPTS+=' -Dspark.serializer=org.apache.spark.serializer.KryoSerializer -Dspark.kryo.registrator=org.apache.spark.graphx.GraphKryoRegistrator ' \nexport SPARK_JAVA_OPTS" >> /root/spark/conf/spark-env.sh
如果用户在ec2的环境下,则运行下面的命令将配置文件更新到集群中的所有机器中。
- /root/spark-ec2/copy-dir.sh /root/spark/conf
图8-23 Spark配置监测UI
如果用户是在其他环境下,则可以使用pssh等集群分发工具将/conf文件同步到所有机器中。例如,在pssh环境下:
- ./pscp -h hosts.txt -r /root/spark/conf /root/spark/conf
最终通过下面的命令重启整个集群。
- /root/spark/sbin/stop-all.sh
- sleep 3/root/spark/sbin/start-all.sh
在启动Spark Shell之后,查看http://<MASTER_URL>:4040/environment/中的spark.serializer属性。
如果配置成功,则显示已经配置为org.apache.spark.serializer.KryoSerializer.
下面开始应用开发流程。
(1)开始
启动Spark Shell。
- /root/spark/bin/spark-shell
下面的命令都是在spark-shell中输入的,引入需要的标准包。
- import org.apache.spark.graphx._
- import org.apache.spark.rdd.RDD
(2)加载Wikipedia数据
加载原生数据进入Spark。假定读者已经将Wikipedia的数据加载进HDFS。下面将HDFS的数据加载进RDD。
将数据直接加载至内存,这样可以减少重复的磁盘IO开销。调用coalesce函数将分区紧缩至20个分区,以减少过量的通信开销。
- val wiki: RDD[String] = sc.textFile("/wiki_links/part*").coalesce(20)
查看第一个数据项:用户可以通过RDD中的first方法呈现第一篇文章。
- wiki.first
- /* res0: String = AccessibleComputing [[Computer accessibility]]*/
(3)数据清洗
清洗数据和抽取图结构。在这个例子中,将会抽取连接图,也可以在其他数据集迁移示例(如文档中的关键词图、贡献者图等)。在已经采样的数据中,可以观察到一些数据中蕴含的结构。每一行的第一个词是文章的名称,其余字符串包含这个文章中的链接。
使用已经观察到的结构进行数据清理。
- /*定义Article类 */
- case class Article(val title: String, val body: String)
- /*根据分隔符\t分隔文章 */
- val articles = wiki.map(_.split('\t')).
- /* 过滤字符串,保留字符串长度大于1,且第二个字符串中包含REDIRECT关键字的元组 */
- filter(line => (line.length > 1 && !(line(1) contains "REDIRECT"))).
- /*将结果存储到对象中,便于后续访问*/
- map(line => new Article(line(0).trim, line(1).trim)).cache
- 程序可以通过count函数查看清洗后文章的剩余数量。
- articles.count
(4)创建一个顶点RDD
此时,数据已经清洗,可以创建顶点RDD。由于之前的目的是从数据中抽取链接图,所以顶点的一个自然属性就是文章的标题。之后仍需要将文章标题哈希映射为顶点ID。
- //将文章标题转换为ID的哈希函数
- def pageHash(title: String): VertexId = {
- title.toLowerCase.replace(" ", "").hashCode.toLong
- }
- // 数据项为ID和文章标题的顶点RDD
- /* val vertices: RDD[(VertexId, String)] = /* implement */*/
- /*创建顶点RDD*/
- val vertices = articles.map(a => (pageHash(a.title), a.title)).cache
- /*通过 Action算子count强制触发RDD的执行*/
- vertices.count
(5)创建边RDD
数据清洗的下一步是抽取边,进而构建连接图的结构。MeidaWiki语法中是以格式“[[链接到的文章]].”存储数据,用户可以通过正则表达式抽取[[]]中的内容。通过上面的方法抽取所有的文章中包含的链接,返回包含在文章中的所有链接。
读者可以将下面的代码拷贝到Spark shell中执行。
- val pattern = "\\[\\[.+?\\]\\]".r
- val edges: RDD[Edge[Double]] = articles.flatMap { a =>
- val srcVid = pageHash(a.title)
- pattern.findAllIn(a.body).map { link =>
- val dstVid = pageHash(link.replace("[[", "").replace("]]", ""))
- Edge(srcVid, dstVid, 1.0)
- }
- }
这段代码抽取每个页面的所有导出链接,然后给边RDD的每个边分配一个统一的权重。
(6)创建图
之前的准备工作完成后,就可以开始创建图了。之前的准备工作是使用Spark的核心代码以关系表的视角创建数据。使用之前的顶点RDD、边RDD和默认的顶点属性创建图。默认的顶点属性是为了初始化那些现在还未在顶点RDD中的顶点。但是Wikipedia数据中经常有链接指向不存在的页面。在本例中将会使用一个空的文章标题字符串作为默认的顶点属性代表一个损坏的链接。注意:之所以进行这样的考虑,是因为在真实的数据中可能有脏数据。
- val graph = Graph(vertices, edges, "").subgraph(vpred = {(v, d) => d.nonEmpty}).cache
通过Action算子count强制计算图的一些属性(这大约会耗费2分钟)。
- graph.vertices.count
在第一次创建图时,GraphX针对所有图中的顶点创建索引,发现并重新分配丢失的顶点。由于创建了索引,所以会更快地计算图中的三元组。
- graph.triplets.count
(7)在Wikipedia数据集上运行PageRank算法
到此为止,用户可以进行一些实际的图分析操作了。在这个例子中,将会运行PageRank计算图中最重要的页面。
首先需要进行一些初始化工作。
- val prGraph=graph.staticPageRank(5).cache
Graph.staticPageRank方法返回的顶点属性是每个页面PageRank值的图。这里读者可能会怀疑,新的图属性包含的是PageRank值,不再包含原始的顶点属性文章标题了。其实之前的包含文章标题顶点属性的prGraph仍然存在,可以将两个图Join,这样就会返回一个包含两类信息的一个新图,两个图的所有顶点属性将以元组的形式存储作为一个新顶点属性。之后在这个新的顶点序列中还可以进行更深入的基于表的操作,如找到最重要的10个顶点(也就是PageRank值最高的10个顶点),并打印出它们的标题。将这些操作融合在一起就实现了在WiKi文章图中找到最重要的10个文章标题的功能。
- val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices)
- { (v, title, rank) => (rank.getOrElse(0.0), title)
- }
- titleAndPrGraph.vertices.top(10) {
- Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
- }.foreach(t => println(t._2._2 + ": " + t._2._1))
最后,可以通过下面的例子,在提到“Berkeley”的文章所构成的图中,找到最重要的页面。
- val berkeleyGraph = graph.subgraph(vpred = (v, t) => t.toLowerCase contains "berkeley")
- val prBerkeley = berkeleyGraph.staticPageRank(5).cache
- berkeleyGraph.outerJoinVertices(prBerkeley.vertices) {
- (v, title, r) => (r.getOrElse(0.0), title)
- }.vertices.top(10) {
- Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
- }.foreach(t => println(t._2._2 + ": " + t._2._1))
读者可以以上面的方式在GraphX平台进行更加深入的数据分析。
[1] 示例参考:http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html