SparkSQL简介
基本特点
SparkSQL是Spark用来操作结构化和半结构化数据的接口,其提供了三大功能:
- SparkSQL可以从各种数据源中读取数据(JSON,Hive…)
- SparkSQL不仅支持Spark程序内部使用SQL查询,也支持外部工具(Tableau等)使用JDBC等进行连接
- SparkSQL支持SQL与常见的Python/Java/Scala等代码高度整合,包括连接RDD和SQL表,公开的自定义SQL函数接口等。
其运行原理也十分简单:把Spark SQL转化为RDD,然后提交到集群运行。
SparkSession
SparkSession是Spark2.0引入的新概念。为用户提供了一个统一的切入点,让用户来使用Spark的各项功能。
在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
SparkSession是SQLContext和HiveContext的组合,所以两者的API在SparkSession上都是可用的。SparkSession内部封装了SparkContext,其计算是由SparkContext完成的。
与Spark交互的时候不需要显示地创建SparkConf,SparkContext和SQLcontext,这些对象已经封闭在了SparkSession中。
使用SparkSQL
DataFrame
在Spark中DataFrame是一种以RDD为基础的分布式数据集,类似于我们传统数据库中的表。其与RDD的区别是DataFrame带有Schema元信息,即DataFrame所表示的二维数据集的每一列都带有名称和类型。SparkSQL依据此进行了优化,以大幅提升运行时的效率。
读取和存储数据
SparkSQL支持很多结构化的数据源,包括Hive表、JSON和Parquet文件等等,也指定结构信息可以把RDD转化为DataFrame。
Hive
SparkSQL对Hive的支持十分友好。支持Hive的支持的任何存储格式(Serde)。
我们提供一份Hive配置即可把SparkSQL连接到已经配置好的Hive上,具体操作Goole一下就好。
示例1
2
3
4
5from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("select key,value from my_table")
key = rows.map(lambda row:row[0])
Parquest
Parquest是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。
首先我们可以通过HiveContext.parquestFile或者SQLContext.parquestFile来读取数据1
2
3
rows = hiveCtx.parquestFile(parquestfile)
names = rows.map(lambda row:row.name)
其次我们也可以把Parquest文件注册为SparkSQL的临时表,然后再这张表上进行查询。
JSON
我们只需要调用hiveCtx中的jsonFile()方法即可读取JSON数据。1
input = hiveCtx.jsonFile(jsonfile)
基于RDD
除了读取数据,我们也可以基于RDD创建DataFrame。Python中可以创建一个Row对象组成的RDD,然后调用inferSchema()。1
2
3RDDn = sc.parallelize([Row(name="zy",tel="186")])
SchemaRDDn = hiveCtx.inferSchema(RDDn)
SchemaRDDn.registerTempTable("info")
UDF
SparkSQL不仅有自己的UDF接口,也支持Hive的UDF接口。
SparkSQL UDF
我们可以使用Spark支持的内建语言编写好函数,然后通过SparkSQL内建的方法传递进来,配合Python/Scala很好用。
Hive UDF
标准的Hive UDF已经自动添加在SparkSQL中,如果需要添加用户自定义的UDF,我们就需要确保该UDF的JAR包已经在应用中。
SparkSQL性能调优
这个估计是要单写一篇文章讲的吧…