技术成就梦想

spark之广播变量设计和实现 原 spark之广播变量设计和实现 加米谷大数据



spark 广播的方式

spark 历史上采用了两种广播的方式:

一种是通过 Http 协议传输数据;

一种是通过 Torrent 协议来传输数据。

但是最新的 spark 版本中, http 的方式已经废弃了(pr 在此https://github.com/apache/spark/pull/10531), spark 是在 spark 1.1 版本中引入了 TorrentBroadcast, 此后就没有更新 HttpBroadcast 和相关文档了, spark2.0 的时候完全可以删除 HttpBroadcast 了, 之后统一把 TorrentBroadcast 作为广播变量的唯一实现方式。 但是代码没有写死, 还是保留了扩展性(BroadcastFactory 作为一个 trait, TorrentBroadcastFactory 只是一种实现方式, 符合依赖倒置原则, 依赖抽象,不依赖具体实现), 万一之后想到了更牛x 的实现方式, 可以方便的加上,但是我估计一时半会应该没有了。本着过时不讲的原则, 我们这里只说 TorrentBroadcast 大家可以到这里看下图

加米谷大数据.png

你能看到不同的数据块是来自不同的节点, 多个节点一起组成一个网络,在你下载的同时,你也在上传,所以说在享受别人提供的下载的同时,你也在贡献,最终所有人一起受益。

我们看下 BitTorrent 协议, wiki 定义

BitTorrent协议(简称BT,俗称比特洪流、BT下载)是用在对等网络中文件分享的网络协议程序。和点对点(point-to-point)的协议程序不同,它是用户群对用户群(peer-to-peer),而且用户越多,下载同一文件的人越多,下载该档案的速度越快。且下载后,继续维持上传的状态,就可以“分享”,成为其用户端节点下载的种子文件(.torrent),同时上传及下载。

关键的几个点

1、下载者要下载文件内容,需要先得到相应的种子文件,然后使用BT客户端软件进行下载。

2、提供下载的文件虚拟分成大小相等的块, 并把每个块的索引信息和Hash验证码写入种子文件中。

3、有一个 Tracker 负责维护元信息, 所有的客户端都可以通过 Tracker 找到每个快离自己最近的其他下载者。

4、下载时,BT客户端首先解析种子文件得到Tracker地址,然后连接Tracker服务器。Tracker服务器回应下载者的请求,提供下载者其他下载者(包括发布者)的IP。下载者再连接其他下载者,根据种子文件,两者分别告知对方自己已经有的块,然后交换对方所没有的数据。此时不需要其他服务器参与,分散了单个线路上的数据流量,因此减轻了服务器负担。

5、下载者每得到一个块,需要算出下载块的Hash验证码与种子文件中的对比,如果一样则说明块正确,不一样则需要重新下载这个块。这种规定是为了解决下载内容准确性的问题。

针对以上的几个点, spark 是怎么做的

TorrentBroadcast 底层使用的是 BlockManager, 下载每个数据块先要去 master 去获取 Block 所在的位置 (location)。

在把大变量写到广播变量的时候, 通过 ChunkedByteBufferOutputStream把输入的数据分成多个小块, zipWithIndex 中, 为每个小块加一个唯一标识, 形如 broadcast_broadcastId_pieceId。 作为BlockId, 存储在 BlockManager 中。 而且对每个小的数据块加上一个校验码。

BlockManagerMaster 作为 tracker 维护所有 Block块的元信息, 知道每个数据块所在的 executor和存储级别。 Broadcast 变量中维护属于自己的所有小块的 BlockId 通过 value 方法读取 Boradcast 变量的时候, 取出所有小块的 BlockId, 对于每个 BlockId, 通过BlockManagerMaster 获取了该BlockId的位置的集合, 随机化,位置集合被打乱, 优先找同主机的地址(这样可以走回环),然后从随机的地址集合按顺序取地址一个一个尝试去获取数据,因为随机化了地址,那么executor不只会从Driver去获取数据。分散了driver 上的压力。

取到 Block piece 后, 使用校验码进行校验,看看数据块有没有损坏, 如果没有损坏, 然后按照顺序拼在一起。

大家比较一下, 流程是不是差不多, 基本贯穿了 BitTorrent 的思想原理。

BitTorrent 的思想原理.png

大家看下上面的图, 开始的时候, 大家都是通过 driver 拿数据, 但是一旦其他 executor 上有了数据块之后, 所有的 executor 都是有机会通过别的 executor 来获取数据块, 这样就分散了 driver 的压力。 套用一句话, 下载的 executor 越多, 下载的越快。

spark 广播变量的使用姿势

广播变量的使用姿势.png

上面的一个小的 demo 就是把一个 数组通过 broadcast 的方式广播出去, 然后就可以在task里面使用数组变量了, 这个数组变量是驻留在executor上的,不用每次调度 task运行的时候都得传输一次数组。

我们可以看到对于broadcast的使用, 无非就是 sc.broadcast 定义了一个 广播变量 和 broadcasted.value 使用广播变量的 value 方法,找到真正的数组。

spark context 初始化的时候, sparkEnv 中初始化了一个 broadcastManager,初始化方法里面, 现在默认使用的 TorrentBroadcastFactory, 调用 sc.broadcast 方法, 就会使用工厂模式创建一个 TorrentBroadcast,这时候就会调用写操作, 把数据分成小块写到 BlockManager 中, broadcasted 只是一个 TorrentBroadcast 类型的实例, 并没有数组数据, 这个实例只维护了数据的 元信息, 也就是一组BlockId 信息, 这个实例被序列化被传到 executor上, 在 executor 上调用这个实例的 value 方法,才会触发去 BlockManager 上读真正的数据。

加米谷是一家专注于大数据培训机构,作为成都大数据联盟会员单位、长虹集团软件中心实训基地、中科招商·中科创大创业孵化基地、中科创商·中科创大创业孵化基地、翰林科技实训基地等。由来自阿里、华为、京东、星环等国内知名企业的多位技术大牛联合创办,技术底蕴丰厚,勤奋创新,精通主流前沿大数据及人工智能相关技术。