Spark SQL与DataFrame详解以及使用

huiqinbo 2018-06-18

Spark SQL是Spark的一个结构化数据处理模块,提供一个DataFrame编程模型抽象,可以看做是一个分布式SQL查询引擎。Spark SQL主要由Catalyst优化,Spark SQL内核,Hive支持三部分组成。

Catalyst优化处理查询语句的整个过程,包括解析,绑定,优化,物理计划等,主要由关系代数,表达式以及查询优化组成。

Spark SQL内核处理数据的输入输出,从不同的数据源(结构化Parquet文件和JSON文件,Hive表,外部数据库,创建RDD)获取数据,执行查询,并将结果输出成DataFrame。

Hive支持是值对Hive数据的处理,主要包括HiveSQL,MetaStore,SerDes,UDFS等。

Spark SQL架构

Spark SQL对SQL语句的处理和关系型数据库SQL处理类似,将SQL语句解析成一颗数(Tree),然后通过规则(rule)的模式匹配,对树进行绑定,优化等,然后得到查询结果。

Tree的具体操作是通过TreeNode实现的;Rule是一个抽象类,是通过RuleExecutor完成的,应用与Spark SQL的Analyzer,Optimizer,Spark Planner等组件中,可以简便,模块化地对Tree进行Transform操作。

在整个SQL语句执行的过程中,主要依赖了优化框架Catalyst,把SQL语句解析,绑定,优化等,最终将逻辑计划优化后并且转换为物理执行计划,最后变成DataFrame模型。

Spark SQL的整个架构图如下所示:

Spark SQL与DataFrame详解以及使用

  1. 使用SqlParser对SQL语句进行解析,生成Unresolved逻辑计划(没有提取schema信息)。
  2. 使用Catalyst分析器,结合数据字典(catalog)进行绑定,生成Analyzed逻辑计划,在此过程中,Schema Catalog则要提取schema信息。
  3. 使用Catalyst优化器对Analyzed逻辑计划进行优化,按照优化规则得到Optimized逻辑计划。
  4. 接着和Spark Planner交互,使用相应的策略将逻辑计划转换为物理计划,然后调用next函数,生成可执行物理计划。
  5. 调用toDF,最后生成DataFrame。

Spark SQL有以下几点特征:

1.兼容多种数据格式,如上面所说的parquet文件,HIve表,JSON文件等等。

2.方便扩展,它的优化器,解析器都可以重新定义。

3.性能优化方面:采用了内存列式存储,动态字节码生成等技术,还采用了内存缓存数据。

4.支持多种语言操作,包括JAVA,SCALA,PYTHON,R语言等。

Spark SQL性能

1.内存列式存储

Spark SQL内部使用内存列式模式缓存表,仅扫描需要的列,并且自动调整压缩比使内存使用率和GC压力最小化,如果缓存了数据,则下次执行时不需要重复读取数据。

2.动态代码生成和字节码生成技术

对于一个简单的查询,如:

select a+b from table
1

这个查询,在传统的方式中,会将SQL语句生成一个表达树,然后调用虚函数确认Add两边的数据类型,然后再调用虚函数计算装箱最后返回结果,计算时需要多次涉及虚函数的使用,打断了CPU的利用,减缓了执行效率。

因此,使用字节码技术会将表达式使用特定的代码动态 编译,为每一个查询生成自定义字节码,然后运行。最后需要说明的是DataFrame也是惰性的,在遇见Action操作的时候才会真正的去执行。

创建DataFrame

创建一个json格式文件,内容如下所示;

{"name":"Mirckel"}
{"name":"Andy","age":30}
{"name":"Jsutin","age":13}
1
2
3

创建以及相关操作如下代码所示:

val sqlContext=new sql.SQLContext(sc)
 val df=sqlContext.jsonFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql01.json")
 println(df.show()) //打印表数据
 println(df.printSchema()) //以树的形式打印DataFrame的Schema
 println(df.select(df("name"),df("age")+1).show())
1
2
3
4
5
6
7
8

下面有两种方式来操作RDD数据源,即将现有的RDD转化为DataFrame。

操作的文件内容格式如下:

shinelon,19
mike,20
wangwu,25
1
2
3

1.以反射机制推断RDD模式

主要分下面三个步骤:

  1. 必须创建case类,只有case类才能隐式转换为DataFrame。
  2. 必须生成DataFrame,进行注册临时表操作。
  3. 必须在内存中register成临时表,才能供查询使用。
val conf=new SparkConf()
 .setMaster("local")
 .setAppName("SparkSqlDemo01")
 val sc=new SparkContext(conf)
 val sqlContext=new sql.SQLContext(sc)
 //使用case定义Schema(不能超过22个属性),实现Person接口
 //只有case类才能隐式转换为一个DataFrame
 case class Person(name:String,age:Int)
 //使用前缀hdfs://来标识HDFS存储系统的文件
 val people=sc.textFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql02.txt")
 .map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF
 //DataFrame注册临时表
 people.refisterTempTable("person")
 //使用sql运行SQL表达式
 val result=sqlContext.sql("SELECT name,age from person WHERE age>=19")
 println(result.map(t=>"Name:"+t(0)).collect())
 println(result.map(t=>"Name:"+t.getAs[String](1)).collect())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

2.以编程方式定义RDD模型

主要分三个步骤:

  1. 从原始RDD中创建一个Rows的RDD。
  2. 创建一个表示为StructType类型的Schema,匹配在第一步创建的RDD的Rows的结构。
  3. 通过SQLContext提供的createDataFrame方法,应用Schema到Rows的RDD。

代码如下所示:

val conf=new SparkConf()
 .setMaster("local")
 .setAppName("SparkSqlDemo01")
 val sc=new SparkContext(conf)
 val sqlContext=new sql.SQLContext(sc)
val sqlContext=new sql.SQLContext(sc)
 //使用case定义Schema(不能超过22个属性),实现Person接口
 //只有case类才能隐式转换为一个DataFrame
 case class Person(name:String,age:Int)
 //使用前缀hdfs://来标识HDFS存储系统的文件
 val people=sc.textFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql02.txt")
 .map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF
 //DataFrame注册临时表
 people.refisterTempTable("person")
 //使用sql运行SQL表达式
 val result=sqlContext.sql("SELECT name,age from person WHERE age>=19")
 println(result.map(t=>"Name:"+t(0)).collect())
 println(result.map(t=>"Name:"+t.getAs[String](1)).collect())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

至此,就介绍了Spark SQL的基本原理以及初步使用,Spark SQL凭借着内存的快速访问快速在大数据界发展,有人说它取代了HIVE,这种说法过于狭隘,Hive还是应用广泛,只能说代替了Hive的 部分功能,并不能完全代替Hive来使用。

相关推荐