什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:
- 自动容错
- 位置感知性调度
- 可伸缩性
RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度(持久化)。
RDD核心概念
首先我们需要初始化Spark的上下文环境,Spark2.4.0版本应该是默认初始化了的。1
2from pyspark import SparkConf, SparkContext
sc = SparkContext()
然后再创建RDD1
2
3
4
5intRDD = sc.parallelize([1,2,3,4,5,6])
intRDD.collect()
stringRDD = sc.parallelize(['zhang','yang'])
stringRDD.collect()
转化运算
转化运算是惰性的
RDD基本转化运算
map运算
1 | intRDD.map(lambda x:x+1).collect() |
[2, 3, 4, 5, 6, 7, 3, 4]
1 | intRDD.map(lambda x:x**2).collect() |
[1, 4, 9, 16, 25, 36, 4, 9]
1 | lines = sc.textFile('README.md') |
'[](https://travis-ci.org/holdenk/learning-spark-examples)'
filter运算
1 | pairs = lines.map(lambda x:(x.split(' ')[0],x)) |
('===============', '===============')
distinct运算
1 | intRDD.distinct().collect() |
[4, 1, 5, 2, 6, 3]
randomsplit运算
1 | sRDD = intRDD.randomSplit([0.1,0.1]) |
2
[3, 4, 5, 2, 3]
[1, 2, 6]
groupby运算
1 | result = intRDD.groupBy(lambda x : x % 2).collect() |
0 [2, 2, 4, 6]
1 [1, 3, 3, 5]
多个RDD转化运算
1 | intRDD1 = sc.parallelize([3,1,2,5,5]) |
取并集
1 | # union() |
[3, 1, 2, 5, 5, 5, 6, 2, 2, 7]
[1, 2, 3, 5, 6, 7]
取交集
1 | # intersection() |
[2]
取差集
1 | # subtract() |
[1, 3]
笛卡尔积运算 cartesian()
1 | # cartsection() |
[(3, 5), (3, 6), (3, 2), (1, 5), (1, 6), (1, 2), (2, 5), (2, 6), (2, 2), (5, 5), (5, 5), (5, 6), (5, 6), (5, 2), (5, 2)]
动作运算
基本动作运算
读取元素
1 | #取第一条数据 |
1
[1, 2]
[1, 2, 2]
[6, 5, 4]
统计值
1 | #统计 |
(count: 8, mean: 3.25, stdev: 1.5612494995995996, max: 6.0, min: 1.0)
1
6
1.5612494995995996
8
26
3.25
k-v转换
1 | kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) |
1 | print(kvRDD1.keys().collect()) |
[3, 3, 5, 1]
1 | print(kvRDD1.values().collect()) |
[4, 6, 6, 2]
筛选元素
1 | print (kvRDD1.filter(lambda x:x[0] < 5).collect()) |
[(3, 4), (3, 6), (1, 2)]
值运算
使用mapvalue()进行排序
1 | print(kvRDD1.mapValues(lambda x:x**2).collect()) |
[(3, 16), (3, 36), (5, 36), (1, 4)]
按照key排序
sortByKey(),传入的参数默认为True,表示从小到大排序
1 | print(kvRDD1.sortByKey().collect()) |
[(1, 2), (3, 4), (3, 6), (5, 6)]
1 | print(kvRDD1.sortByKey(False).collect()) |
[(5, 6), (3, 4), (3, 6), (1, 2)]
合并相同key值的数据
reduceByKey()
1 | print(kvRDD1.reduceByKey(lambda x,y:x+y).collect()) |
[(5, 6), (1, 2), (3, 10)]
多个RDD kv转化运算
1 | kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) |
内连接
1 | print(kvRDD1.join(kvRDD2).collect()) |
[(3, (4, 8)), (3, (6, 8))]
左外连接
1 | print(kvRDD1.leftOuterJoin(kvRDD2).collect()) |
[(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))]
右外连接
1 | print(kvRDD1.rightOuterJoin(kvRDD2).collect()) |
[(3, (4, 8)), (3, (6, 8))]
删除相同key值数据
1 | print(kvRDD1.subtractByKey(kvRDD2).collect()) |
[(1, 2), (5, 6)]
kv-动作运算
读取数据
1 | #读取第一条数据 |
(3, 4)
[(3, 4), (3, 6)]
3
4
按key值统计
1 | print(kvRDD1.countByKey()) |
defaultdict(<class 'int'>, {3: 2, 5: 1, 1: 1})
lookup查找
1 | print(kvRDD1.lookup(3)) |
[4, 6]
持久化操作
spark RDD的持久化机制,可以将重复运算的RDD存储在内存中,可以大幅提升运算效率
持久化
1 | kvRDD1.persist() |
ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:195
等级 | 说明 |
---|---|
MEMORY_ONLY | 以反序列化的JAVA对象的方式存储在JVM中. 如果内存不够, RDD的一些分区将不会被缓存, 这样当再次需要这些分区的时候,将会重新计算。这是默认的级别。 |
MEMORY_AND_DISK | 以反序列化的JAVA对象的方式存储在JVM中. 如果内存不够, RDD的一些分区将将会缓存在磁盘上,再次需要的时候从磁盘读取。 |
MEMORY_ONLY_SER | 以序列化JAVA对象的方式存储 (每个分区一个字节数组). 相比于反序列化的方式,这样更高效的利用空间, 尤其是使用快速序列化时。但是读取是CPU操作很密集。 |
MEMORY_AND_DISK_SER | 与MEMORY_ONLY_SER相似, 区别是但内存不足时,存储在磁盘上而不是每次重新计算。 |
DISK_ONLY | 只存储RDD在磁盘 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 与上面的级别相同,只不过每个分区的副本只存储在两个集群节点上。 |
OFF_HEAP (experimental) | 将RDD以序列化的方式存储在 Tachyon. 与 MEMORY_ONLY_SER相比, OFF_HEAP减少了垃圾回收。允许执行体更小通过共享一个内存池。因此对于拥有较大堆内存和高并发的环境有较大的吸引力。更重要的是,因为RDD存储在Tachyon上,执行体的崩溃不会造成缓存的丢失。在这种模式下.Tachyon中的内存是可丢弃的,这样 Tachyon 对于从内存中挤出的块不会试图重建它。如果你打算使用Tachyon作为堆缓存,Spark提供了与Tachyon相兼容的版本。 |
1 |
|
取消持久化
1 | kvRDD1.unpersist() |
ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:195