Spark学习笔记之SparkSQL

SparkSQL简介

基本特点

SparkSQL是Spark用来操作结构化和半结构化数据的接口,其提供了三大功能:

  • SparkSQL可以从各种数据源中读取数据(JSON,Hive…)
  • SparkSQL不仅支持Spark程序内部使用SQL查询,也支持外部工具(Tableau等)使用JDBC等进行连接
  • SparkSQL支持SQL与常见的Python/Java/Scala等代码高度整合,包括连接RDD和SQL表,公开的自定义SQL函数接口等。

其运行原理也十分简单:把Spark SQL转化为RDD,然后提交到集群运行。

SparkSession

SparkSession是Spark2.0引入的新概念。为用户提供了一个统一的切入点,让用户来使用Spark的各项功能。

在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。

SparkSession是SQLContext和HiveContext的组合,所以两者的API在SparkSession上都是可用的。SparkSession内部封装了SparkContext,其计算是由SparkContext完成的。

与Spark交互的时候不需要显示地创建SparkConf,SparkContext和SQLcontext,这些对象已经封闭在了SparkSession中。

使用SparkSQL

DataFrame

在Spark中DataFrame是一种以RDD为基础的分布式数据集,类似于我们传统数据库中的表。其与RDD的区别是DataFrame带有Schema元信息,即DataFrame所表示的二维数据集的每一列都带有名称和类型。SparkSQL依据此进行了优化,以大幅提升运行时的效率。

RDD与DataFrame

读取和存储数据

SparkSQL支持很多结构化的数据源,包括Hive表、JSON和Parquet文件等等,也指定结构信息可以把RDD转化为DataFrame。

Hive

SparkSQL对Hive的支持十分友好。支持Hive的支持的任何存储格式(Serde)。

我们提供一份Hive配置即可把SparkSQL连接到已经配置好的Hive上,具体操作Goole一下就好。

示例

1
2
3
4
5
from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("select key,value from my_table")
key = rows.map(lambda row:row[0])

Parquest

Parquest是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。

首先我们可以通过HiveContext.parquestFile或者SQLContext.parquestFile来读取数据

1
2
3

rows = hiveCtx.parquestFile(parquestfile)
names = rows.map(lambda row:row.name)

其次我们也可以把Parquest文件注册为SparkSQL的临时表,然后再这张表上进行查询。

JSON

我们只需要调用hiveCtx中的jsonFile()方法即可读取JSON数据。

1
input = hiveCtx.jsonFile(jsonfile)

基于RDD

除了读取数据,我们也可以基于RDD创建DataFrame。Python中可以创建一个Row对象组成的RDD,然后调用inferSchema()。

1
2
3
RDDn = sc.parallelize([Row(name="zy",tel="186")])
SchemaRDDn = hiveCtx.inferSchema(RDDn)
SchemaRDDn.registerTempTable("info")

UDF

SparkSQL不仅有自己的UDF接口,也支持Hive的UDF接口。

SparkSQL UDF

我们可以使用Spark支持的内建语言编写好函数,然后通过SparkSQL内建的方法传递进来,配合Python/Scala很好用。

Hive UDF

标准的Hive UDF已经自动添加在SparkSQL中,如果需要添加用户自定义的UDF,我们就需要确保该UDF的JAR包已经在应用中。

SparkSQL性能调优

这个估计是要单写一篇文章讲的吧…