Hhanwen 2020-05-04
从一开始 RDD 就是 Spark 提供的面向用户的主要 API。从根本上来说,一个 RDD 就是你的数据的一个不可变的分布式元素集合,在集群中跨节点分布,可以通过若干提供了转换和处理的底层 API 进行并行处理。关于RDD的详细介绍可以参考这篇文章:https://www.cnblogs.com/xiexiandong/p/12817807.html。
下面是使用 RDD 的场景和常见案例:
import spark.implicits._ // //这里的spark是SparkSession的变量名
testDF map { case Row(col1:String,col2:Int)=> println(col1);println(col2) col1 case _=> // 为了提高稳健性,最好后面有一个_通配操作(匹配所有) "" }
case class Coltest(col1:String,col2:Int) //定义字段名和类型 testDS map { case Coltest(col1:String,col2:Int)=> println(col1);println(col2) col1 case _=> "" }
DataFrame:
testDF.foreach{ line => val col1=line.getAs[String]("col1") val col2=line.getAs[String]("col2") }
dataDF.createOrReplaceTempView("tmp") spark.sql("select ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)
//保存 val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test") datawDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save() //读取 val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test") val datarDF= spark.read.options(options).format("com.databricks.spark.csv").load() // 利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定
Dataset:
case class Coltest(name: String, col2: Int) //定义字段名和类型 /** * rdd * ("a", 1) * ("b", 1) * ("a", 1) **/ val test: Dataset[Coltest] = rdd map { line => Coltest (line._1, line._2) }.toDS test map { line => println (line.name) println (line.col2) }
DataFrame/Dataset转RDD:
val rdd1=testDF.rdd val rdd2=testDS.rdd
RDD转DataFrame:
import spark.implicits._ val testDF = rdd.map {line=> (line._1,line._2) }.toDF("col1","col2") // 一般用元组把一行的数据写在一起,然后在toDF中指定字段名
RDD转Dataset:
import spark.implicits._ case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型 val testDS = rdd.map {line=> Coltest(line._1,line._2) }.toDS // 可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可
Dataset转DataFrame:
import spark.implicits._ val testDF = testDS.toDF
DataFrame转Dataset:
import spark.implicits._ case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型 val testDS = testDF.as[Coltest]
Series是一种类似于一维数组的对象,由一组数据以及一组与之对应的索引组成。 index: 索引序列,必须是唯一的,且与数据的长度相同. 如果没有传入索引参数,则默认会自动创建一个从0~N的整数索引