Hhanwen 2020-05-04
从一开始 RDD 就是 Spark 提供的面向用户的主要 API。从根本上来说,一个 RDD 就是你的数据的一个不可变的分布式元素集合,在集群中跨节点分布,可以通过若干提供了转换和处理的底层 API 进行并行处理。关于RDD的详细介绍可以参考这篇文章:https://www.cnblogs.com/xiexiandong/p/12817807.html。
下面是使用 RDD 的场景和常见案例:
import spark.implicits._ // //这里的spark是SparkSession的变量名
testDF map {
case Row(col1:String,col2:Int)=>
println(col1);println(col2)
col1
case _=> // 为了提高稳健性,最好后面有一个_通配操作(匹配所有)
""
}case class Coltest(col1:String,col2:Int) //定义字段名和类型
testDS map {
case Coltest(col1:String,col2:Int)=>
println(col1);println(col2)
col1
case _=>
""
}DataFrame:
testDF.foreach{
line =>
val col1=line.getAs[String]("col1")
val col2=line.getAs[String]("col2")
}dataDF.createOrReplaceTempView("tmp")
spark.sql("select ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)//保存
val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test")
datawDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
//读取
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test")
val datarDF= spark.read.options(options).format("com.databricks.spark.csv").load()
// 利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定Dataset:
case class Coltest(name: String, col2: Int)
//定义字段名和类型
/**
* rdd
* ("a", 1)
* ("b", 1)
* ("a", 1)
**/
val test: Dataset[Coltest] = rdd map {
line =>
Coltest (line._1, line._2)
}.toDS
test map {
line =>
println (line.name)
println (line.col2)
}DataFrame/Dataset转RDD:
val rdd1=testDF.rdd val rdd2=testDS.rdd
RDD转DataFrame:
import spark.implicits._
val testDF = rdd.map {line=>
(line._1,line._2)
}.toDF("col1","col2")
// 一般用元组把一行的数据写在一起,然后在toDF中指定字段名RDD转Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
Coltest(line._1,line._2)
}.toDS
// 可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可Dataset转DataFrame:
import spark.implicits._ val testDF = testDS.toDF
DataFrame转Dataset:
import spark.implicits._ case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型 val testDS = testDF.as[Coltest]
Series是一种类似于一维数组的对象,由一组数据以及一组与之对应的索引组成。 index: 索引序列,必须是唯一的,且与数据的长度相同. 如果没有传入索引参数,则默认会自动创建一个从0~N的整数索引