Spark GraphX实例(3)

BAT 批处理程序 2017-05-30

7. 图的聚合操作

图的聚合操作主要的方法有:

(1) Graph.mapReduceTriplets():该方法有一个mapFunc和一个reduceFunc,mapFunc对图中的每一个EdgeTriplet进行处理,生成一个或者多个消息,并且将这些消息发送个Edge的一个或者两个顶点,reduceFunc对发送到每一个顶点上的消息进行合并,生成最终的消息,最后返回一个VertexRDD(不包括没有收到消息的顶点);

(2) Graph.pregel():该方法采用BSP模型,包括三个函数vprog、sendMsg和mergeMsg,vprog是运行在每个节点上的顶点更新函数,接收消息,然后对顶点属性更新,sendMsg生成发送给下一次迭代的消息,mergeMsg对同一个顶点接收到的多个消息进行合并,迭代一直进行到收敛,或者达到了设置的最大迭代次数为止。

代码:

// 聚合操作
    println("*************************************************************")
    println("聚合操作")
    println("*************************************************************")
    println("找出年纪最大的追求者:")
    val oldestFollower:VertexRDD[(String,Int)] = userGraph.mapReduceTriplets[(String,Int)](
      // 将源顶点的属性发送给目标顶点,map过程
      edge => Iterator((edge.dstId,(edge.srcAttr.name,edge.srcAttr.age))),
      // 得到最大追求者,reduce过程
      (a,b) => if(a._2>b._2) a else b
    )
    userGraph.vertices.leftJoin(oldestFollower){(id,user,optOldestFollower) =>
      optOldestFollower match{
        case None => s"${user.name} does not have any followers."
        case Some(oldestAge) => s"The oldest age of ${user.name} \'s followers is ${oldestAge._2}(${oldestAge._1})."
      }
    }.collect.foreach{case(id,str) => println(str)}
    println

    // 找出追求者的平均年龄
    println("找出追求者的平均年龄:")
    val averageAge:VertexRDD[Double] = userGraph.mapReduceTriplets[(Int,Double)](
      // 将源顶点的属性(1,Age)发送给目标顶点,map过程
      edge => Iterator((edge.dstId,(1,edge.srcAttr.age.toDouble))),
      // 得到追求者的数量和总年龄
      (a,b) => ((a._1+b._1),(a._2+b._2))
    ).mapValues((id,p) => p._2/p._1)

    userGraph.vertices.leftJoin(averageAge){(id,user,optAverageAge) =>
      optAverageAge match{
        case None => s"${user.name} does not have any followers."
        case Some(avgAge) => s"The average age of ${user.name} \'s followers is $avgAge."
      }
    }.collect.foreach{case(id,str) => println(str)}
    println

    // 聚合操作2
    println("*************************************************************")
    println("聚合操作2")
    println("*************************************************************")
    println("找出3到各顶点的最短距离:")
    // 定义源点
    val sourceId:VertexId = 3L
    val initialGraph = graph.mapVertices((id,_) => if(id==sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id,dist,newDist) => math.min(dist,newDist),
      // 权重计算
      triplet=>{
        if(triplet.srcAttr + triplet.attr < triplet.dstAttr){
          Iterator((triplet.dstId, triplet.srcAttr+triplet.attr))
        } else{
          Iterator.empty
        }
      },
      // 最短距离
      (a,b) => math.min(a,b)
    )
    println(sssp.vertices.collect.mkString("\n"))

运行结果:

*************************************************************
聚合操作
*************************************************************
找出年纪最大的追求者:
The oldest age of Peter 's followers is 27(Henry).
The oldest age of Kate 's followers is 55(Charlie).
The oldest age of Henry 's followers is 55(Charlie).
The oldest age of Alice 's followers is 32(Peter).
The oldest age of Charlie 's followers is 35(Mike).
Mike does not have any followers.

找出追求者的平均年龄:
The average age of Peter 's followers is 27.0.
The average age of Kate 's followers is 45.0.
The average age of Henry 's followers is 45.0.
The average age of Alice 's followers is 29.5.
The average age of Charlie 's followers is 35.0.
Mike does not have any followers.

*************************************************************
聚合操作2
*************************************************************
找出3到各顶点的最短距离:
(4,9.0)
(6,3.0)
(2,7.0)
(1,10.0)
(3,0.0)
(5,Infinity)

相关推荐