Spark学习笔记之累加器与广播变量

概述

在我们与Spark的交互中,我们的操作在远程节点运行时,Spark操作的实际上是这个函数所用变量的一个独立副本。这些变量会被复制到每台机器上,并且这些变量在远程机器上所有更新不会被传递回驱动程序。同时,跨任务的读写变量是很低效的。

Spark为两种较为常见情况的提供了共享变量:广播变量和累加器

累加器

常见用途是在调试时对作业的执行过程中的事件进行计数

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
# 定义一个累加器
blackLines = sc.accumulator(0)


def extractlines(line):
global blackLines
if line == "\'\'\,":
blackLines += 1
return line.split(' ')

textRDD.flatMap(extractlines)
print('blackline %s'%(blackLines))

注意事项

  • 累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。

广播变量

如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么这个值只会被发送到各节点一次,使用BitTorrent的通信机制。

1
2
3
4
5
6
7

# 定义一个广播变量
a = 'xxx'
broadcast = sc.broadcast(a)

# 还原一个广播变量
b = broadcast.value

变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改

注意事项:

  • 不能将一个RDD使用广播变量广播出去,可以广播RDD的结果

  • 广播变量只能在Driver端定义,不能在Executor端定义。

  • 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

  • 果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。

  • 如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。