8.3.2 GraphX的使用
类似Spark在RDD上提供了一组基本操作符(如map、filter、reduce),GraphX同样也有针对Graph的基本操作符,用户可以在这些操作符传入自定义函数和通过修改图的节点属性或结构生成新的图。
GraphX提供了丰富的针对图数据的操作符。Graph类中定义了核心的、优化过的操作符。一些更加方便的由底层核心操作符组合而成的上层操作符在GraphOps中定义。正是通过Scala语言的implicit关键字,GraphOps中定义的操作符可以作为Graph中的成员。这样做的目的是未来GrpahX会支持不同类型的图,而每种类型图的呈现必须实现核心的操作符和复用大部分GrpahOps中实现的操作符。
下面将操作符分为几个类别进行介绍。
1.属性操作符
属性操作符如表8-1所示。
表8-1 属性操作符
2.结构操作符
结构操作符如表8-2所示。
表8-2 结构操作符
3.图信息属性
图信息属性如表8-3所示。
表8-3 图信息属性
4.邻接聚集操作符与Join操作符
邻接聚集操作符与Join操作符如表8-4所示。
表8-4 邻接聚集操作符与Join操作符
5.缓存操作符
缓存操作符如表8-5所示。
表8-5 缓存操作符
6.Pregel API
Pregel基于BSP模型,提供了3个重要的需要用户书写的函数。通过官方PageRank算法进一步理解这3个函数的使用。
- def run[VD: ClassTag, ED: ClassTag](
- graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
- {
- val pagerankGraph: Graph[Double, Double] = graph
- /* 将节点的度与图中节点关联*/
- .outerJoinVertices(graph.outDegrees) {
- (vid, vdata, deg) => deg.getOrElse(0)
- }
- /* 根据度设置边的权重*/
- .mapTriplets(e => 1.0 / e.srcAttr)
- /* 设置节点属性为PageRank算法初始值*/
- .mapVertices((id, attr) => 1.0)
- def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
- resetProb + (1.0 - resetProb) * msgSum
- def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator [(VertexId, Double)] =
- Iterator((edge.dstId, edge.srcAttr * edge.attr))
- def messageCombiner(a: Double, b: Double): Double = a + b
- val initialMessage = 0.0
- /* 以固定迭代次数执行Pregel*/
- Pregel(pagerankGraph, initialMessage, numIter)(
- vertexProgram, sendMessage, messageCombiner)
- }
这3个函数按顺序执行完一次是一个迭代轮次,numIter决定需要执行多少伦次完成迭代。
但初始由于vprog函数是没有输入的,所以还需要用户输入initialMessage作为第一轮的初始化数据。
下面通过表8-6介绍Pregel APl。
表8-6 Pregel API