adayan0 2019-12-19
数据结构:时间戳,省份,城市,用户,广告,中间字段使用空格分割。
样本如下:
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
package Spark02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
// 需求:统计出每一个省份广告被点击次数的Top3
object Practice {
def main(args: Array[String]): Unit = {
// 1.初始化Spark配置信息并建立与Spark的连接
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Practice")
val sc = new SparkContext(conf)
// 2.读取数据生成RDD
val line: RDD[String] = sc.textFile("G:\\input\\agent\\agent.log",1)
// 3.按照最小粒度聚合 key=(Province,AD) value=1
val provinceADtoOne: RDD[((String, String), Int)] = line.map(x => {
val fields: Array[String] = x.split(" ")
(((fields(1), fields(4)), 1))
})
// 4.计算每个省中每个广告被点击次数的总和
val provinceAdToSum: RDD[((String, String), Int)] = provinceADtoOne.reduceByKey(_+_)
// 5.将省份作为key,广告加点击次数作为value,(Province,(AD,sum))
val provinceToAdSum: RDD[(String, (String, Int))] = provinceAdToSum.map(x=>(x._1._1,(x._1._2,x._2)))
// 6.将同一个省份的所有广告进行聚合(Province,List((AD1,sum1),(AD2,sum2)))
val provinceGroup: RDD[(String, Iterable[(String, Int)])] = provinceToAdSum.groupByKey()
// 7.对同一省份的所有广告的集合进行排序并取前三条,排序规则为广告点击总次数
val result: RDD[(String, List[(String, Int)])] = provinceGroup.mapValues(x => {
x.toList.sortWith((x, y) => {
x._2 > y._2
}).take(3)
})
// 8.保存结果
// result.collect.foreach(println)
result.saveAsTextFile("output/practice")
sc.stop()
}
}