Spark学习笔记之RDD

什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:

  • 自动容错
  • 位置感知性调度
  • 可伸缩性

RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度(持久化)。

RDD核心概念

首先我们需要初始化Spark的上下文环境,Spark2.4.0版本应该是默认初始化了的。

1
2
from pyspark import SparkConf, SparkContext
sc = SparkContext()

然后再创建RDD

1
2
3
4
5
intRDD = sc.parallelize([1,2,3,4,5,6])
intRDD.collect()

stringRDD = sc.parallelize(['zhang','yang'])
stringRDD.collect()

转化运算

转化运算是惰性的

RDD基本转化运算

map运算

1
intRDD.map(lambda x:x+1).collect()
[2, 3, 4, 5, 6, 7, 3, 4]
1
intRDD.map(lambda x:x**2).collect()
[1, 4, 9, 16, 25, 36, 4, 9]
1
2
lines = sc.textFile('README.md')
lines.first()
'[![buildstatus](https://travis-ci.org/holdenk/learning-spark-examples.svg?branch=master)](https://travis-ci.org/holdenk/learning-spark-examples)'

filter运算

1
2
3
4
5
6
pairs = lines.map(lambda x:(x.split(' ')[0],x))

#对第二个元素进行筛选

result = pairs.filter(lambda keyValue:len(keyValue[1])<20)
result.collect()[0]
('===============', '===============')

distinct运算

1
intRDD.distinct().collect()
[4, 1, 5, 2, 6, 3]

randomsplit运算

1
2
3
4
sRDD = intRDD.randomSplit([0.1,0.1])
print(len(sRDD))
print(sRDD[0].collect())
print(sRDD[1].collect())
2
[3, 4, 5, 2, 3]
[1, 2, 6]

groupby运算

1
2
3
result = intRDD.groupBy(lambda x : x % 2).collect()
for x,y in result:
print(x,sorted(y))
0 [2, 2, 4, 6]
1 [1, 3, 3, 5]

多个RDD转化运算

1
2
3
intRDD1 = sc.parallelize([3,1,2,5,5])
intRDD2 = sc.parallelize([5,6,2])
intRDD3 = sc.parallelize([2,7])

取并集

1
2
3
# union()
print(intRDD1.union(intRDD2).union(intRDD3).collect()) #取并集
print(intRDD1.union(intRDD2).union(intRDD3).distinct().collect()) #并集且去重
[3, 1, 2, 5, 5, 5, 6, 2, 2, 7]
[1, 2, 3, 5, 6, 7]

取交集

1
2
# intersection()
print(intRDD1.intersection(intRDD2).intersection(intRDD3).collect())
[2]

取差集

1
2
# subtract()
print(intRDD1.subtract(intRDD2).collect())
[1, 3]

笛卡尔积运算 cartesian()

1
2
# cartsection()
print(intRDD1.cartesian(intRDD2).collect())
[(3, 5), (3, 6), (3, 2), (1, 5), (1, 6), (1, 2), (2, 5), (2, 6), (2, 2), (5, 5), (5, 5), (5, 6), (5, 6), (5, 2), (5, 2)]

动作运算

基本动作运算

读取元素

1
2
3
4
5
6
7
8
#取第一条数据
print (intRDD.first())
#取前两条数据
print (intRDD.take(2))
#升序排列,并取前3条数据
print (intRDD.takeOrdered(3))
#降序排列,并取前3条数据
print (intRDD.takeOrdered(3,lambda x:-x))
1
[1, 2]
[1, 2, 2]
[6, 5, 4]

统计值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#统计
print (intRDD.stats())
#最小值
print (intRDD.min())
#最大值
print (intRDD.max())
#标准差
print (intRDD.stdev())
#计数
print (intRDD.count())
#求和
print (intRDD.sum())
#平均
print (intRDD.mean())
(count: 8, mean: 3.25, stdev: 1.5612494995995996, max: 6.0, min: 1.0)
1
6
1.5612494995995996
8
26
3.25

k-v转换

1
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
1
print(kvRDD1.keys().collect())
[3, 3, 5, 1]
1
print(kvRDD1.values().collect())
[4, 6, 6, 2]

筛选元素

1
print (kvRDD1.filter(lambda x:x[0] < 5).collect())
[(3, 4), (3, 6), (1, 2)]

值运算

使用mapvalue()进行排序

1
print(kvRDD1.mapValues(lambda x:x**2).collect())
[(3, 16), (3, 36), (5, 36), (1, 4)]

按照key排序

sortByKey(),传入的参数默认为True,表示从小到大排序

1
print(kvRDD1.sortByKey().collect())
[(1, 2), (3, 4), (3, 6), (5, 6)]
1
print(kvRDD1.sortByKey(False).collect())
[(5, 6), (3, 4), (3, 6), (1, 2)]

合并相同key值的数据

reduceByKey()

1
print(kvRDD1.reduceByKey(lambda x,y:x+y).collect())
[(5, 6), (1, 2), (3, 10)]

多个RDD kv转化运算

1
2
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
kvRDD2 = sc.parallelize([(3,8)])

内连接

1
print(kvRDD1.join(kvRDD2).collect())
[(3, (4, 8)), (3, (6, 8))]

左外连接

1
print(kvRDD1.leftOuterJoin(kvRDD2).collect())
[(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))]

右外连接

1
print(kvRDD1.rightOuterJoin(kvRDD2).collect())
[(3, (4, 8)), (3, (6, 8))]

删除相同key值数据

1
print(kvRDD1.subtractByKey(kvRDD2).collect())
[(1, 2), (5, 6)]

kv-动作运算

读取数据

1
2
3
4
5
6
7
8
#读取第一条数据
print(kvRDD1.first())
#读取前两条数据
print(kvRDD1.take(2))
#读取第一条数据的key值
print(kvRDD1.first()[0])
#读取第一条数据的value值
print(kvRDD1.first()[1])
(3, 4)
[(3, 4), (3, 6)]
3
4

按key值统计

1
print(kvRDD1.countByKey())
defaultdict(<class 'int'>, {3: 2, 5: 1, 1: 1})

lookup查找

1
print(kvRDD1.lookup(3))
[4, 6]

持久化操作

spark RDD的持久化机制,可以将重复运算的RDD存储在内存中,可以大幅提升运算效率

持久化

1
kvRDD1.persist()
ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:195
等级 说明
MEMORY_ONLY 以反序列化的JAVA对象的方式存储在JVM中. 如果内存不够, RDD的一些分区将不会被缓存, 这样当再次需要这些分区的时候,将会重新计算。这是默认的级别。
MEMORY_AND_DISK 以反序列化的JAVA对象的方式存储在JVM中. 如果内存不够, RDD的一些分区将将会缓存在磁盘上,再次需要的时候从磁盘读取。
MEMORY_ONLY_SER 以序列化JAVA对象的方式存储 (每个分区一个字节数组). 相比于反序列化的方式,这样更高效的利用空间, 尤其是使用快速序列化时。但是读取是CPU操作很密集。
MEMORY_AND_DISK_SER 与MEMORY_ONLY_SER相似, 区别是但内存不足时,存储在磁盘上而不是每次重新计算。
DISK_ONLY 只存储RDD在磁盘
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 与上面的级别相同,只不过每个分区的副本只存储在两个集群节点上。
OFF_HEAP (experimental) 将RDD以序列化的方式存储在 Tachyon. 与 MEMORY_ONLY_SER相比, OFF_HEAP减少了垃圾回收。允许执行体更小通过共享一个内存池。因此对于拥有较大堆内存和高并发的环境有较大的吸引力。更重要的是,因为RDD存储在Tachyon上,执行体的崩溃不会造成缓存的丢失。在这种模式下.Tachyon中的内存是可丢弃的,这样 Tachyon 对于从内存中挤出的块不会试图重建它。如果你打算使用Tachyon作为堆缓存,Spark提供了与Tachyon相兼容的版本。
1
2
3
4
5
6
7
8
9

#pyspark中封装了StorageLevel类,实现不同存储等级的存储
from pyspark.storagelevel import StorageLevel
def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1):
self.useDisk = useDisk
self.useMemory = useMemory
self.useOffHeap = useOffHeap
self.deserialized = deserialized
self.replication = replication

取消持久化

1
kvRDD1.unpersist()
ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:195