从一个问题出发思考Spark的特性

文章目录
  1. 1. 遇到的问题
  2. 2. 问题之后的思考
  3. 3. RDD的定义和特性

我在Spark的学习过程中,和大多数同学一样,我也遇到了这样一个问题,下面的这段代码是求data数据有小到大排列后的,下标为4的数字,正确的结果应该是43,但是在注释掉A.cache()后结果会变为67。这中间究竟发生了什么,使得结果发生变化?这与RDD的特性息息相关。

遇到的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# import pyspark modules
from pyspark import SparkContext
sc = SparkContext()

# Linear-time selection
data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
# 12 21 26 34 43 44 47 56 67 74 89

A = sc.parallelize(data,2) # slice = 2
# print(A.collect())
k = 4 # find the element with index = 4 from small to big---> 43 (index begin from 0)

while True:
x = A.first()
A1 = A.filter(lambda z: z < x)
A2 = A.filter(lambda z: z > x)
mid = A1.count()
if mid == k:
print (x)
break
if k < mid:
A = A1
else:
A = A2
k = k - mid - 1
A.cache()

要理解这个问题,首先要明白两个问题:

  • 继承(也叫依赖)
  • 重计算

我们在编程中常认为=运算是赋值的意思,但在RDD中这里面除了赋值,还有一层继承的关系,何为继承呢?我们先看下面一个简单的例子:

1
2
3
4
5
data = [2, 5, 9, 7, 8, 1, 3, 4]
x = 5
A = sc.parallelize(data,2)
A1 = A.filter(lambda z: z < x)
A2 = A.filter(lambda z: z > x)

A可以认为是一个父结点,它是一个RDD对象,在代码第4,5行,我们使用赋值语句时,会创建两个新的RDD对象,并且构建如下图的依赖关系,我们可以说A1, A2是从父结点A继承而来的。
1
2
A = [2,5,9,7,8,1,3,4] ---- |--(小于5)--> A1 = [2,1,3,4]
|--(大于5)--> A2 = [9,7,8]

我们再运行下面的语句,又会发生什么呢?

1
A = A2

如果按照我们以前编程的知识,是不是认为A只是被重新赋了值,即A = [9,7,8]。但在RDD中,这会新创建一个RDD对象,如下图所示:
1
2
A = [2,5,9,7,8,1,3,4] ----- |--(小于5)--> A1 = [2,1,3,4]
|--(大于5)--> A2 = [9,7,8] --(等于)--> A = [9,7,8]

如果我们再运行下面的代码,取出A中的第一个元素,这时会发生什么呢?这就要涉及到重计算的概念了。

1
x = A.first()

因为我们并没有运行A.cache()A.persist()将A持久化,这就会导致在执行完A.first()得到x = 9后,还会按照上面的依赖关系从头到尾再计算一遍。由于x值的改变,会导致A1,A2发生变化,A1是小于9的集合,而A2变成了空集,进而导致A也变成了空集,但是最开始的那个父结点A并不受影响,虽然都是A但是是两个不同的RDD对象。
1
2
A = [2,5,9,7,8,1,3,4] ---- |--(小于9)--> A1 = [2,5,7,8,1,3,4]
|--(大于9)--> A2 = [] --(等于)--> A = []

我们回到真正的问题上,为什么不用A.cache()会导致最终的输出是x=67,我们画出在第一次循环执行完的RDD-依赖链式关系图:
1
2
3
A = [34,67,21,56,47,89,12,44,74,43,26] ---- |--(小于34)--> A1 = [21,12,26]
|--(大于34)--> A2 = [67,56,47,89,44,74,43]
|--(等于)--> A = [67,56,47,89,44,74,43]

由于没有运行A.cache()在第二次循环执行A.first()语句得到x = 67后,Spark在后面偷偷摸摸地重新计算,也就是
1
2
3
A = [34,67,21,56,47,89,12,44,74,43,26] ---- |--(小于67)--> A1 = [34,21,56,47,12,44,43,26]
|--(大于67)--> A2 = [89,74]
|--(等于)--> A = [89,74]

这时候运行下面的代码
1
2
A1 = A.filter(lambda z: z < x)
A2 = A.filter(lambda z: z > x)

会继续延展依赖关系,即
1
2
3
4
5
6
A = [34,67,21,56,47,89,12,44,74,43,26] ---- |--(小于67)--> A1 = [34,21,56,47,12,44,43,26]
|--(大于67)--> A2 = [89,74]
|--(等于)--> A = [89,74]
|--(小于67)--> A1 = []
|--(大于67)--> A2 = [89,74]
|--(等于)--> A = [89,74]

这时的mid = A1.count()应该会得到0等于k,所以输出x = 67

问题之后的思考

显然从上面的问题中,我们管窥蠡测,瞥得RDD的一部分特性。RDD ( Resilient Distributed Datasets ),中文翻译叫做弹性分布式数据集,被称为Spark最基本的数据抽象,那什么又是“弹性的”,什么又是“分布式的”呢?

在大数据时代来临之际,面对海量数据的巨大压力,单台机器的运算能力已经远远不够,于是人们想到了“众人拾柴火焰高”的道理,建立一个大规模的计算机集群,把数据分成许多部分,然后分发给不同的机器去运算,最后将结果汇总起来,实现大数据的计算,这个就是“分布式”的意思。而实现分布式的计算,必然需要一个适合它的数据结构,或者说数据模型,什么意思呢?

举个例子,你现在有一堆书需要布置到你的书房,方便你查阅资料和学习,你会怎么做?我想,你应该会买一个书柜,在书架上分门别类地、整齐地摆放好你的书。但如果现在你毕业了,这些书你需要收起来,你又会怎么做?这时候应该需要收纳箱吧,你需要将书紧密地塞在箱子里面,尽可能地在一个箱子中装更多的书,最后将箱子摞起来,给房间腾出更多的空间。

当我们的目的不同,我们对书采取的策略也会不同,如果我们把书看作是“数据”,那么数据结构的特点也不同,我们看下面这张表:

目的 措施 数据结构的特点 优点 缺点
将书布置在房间里 放书架 分门别类、整齐 查阅方便快捷 占空间
将书收起来 放箱子里 紧密、无序 省空间 不方便查阅、取书麻烦

这种“数据结构”的概念其实就在我们的生活中,无处不在,我们总会选取最适合我们当前任务的数据结构来进行工作,为了展示工作成果,我们会将数据组织成PPT的形式;写报告和文章,我们会用word;公司的信息数据的管理,我们会用数据库;笔记本、草稿本和错题本,我们会分开使用,而且记录的方法也不同。

那么在应对大规模的数据,以及需要成千上万的机器协同计算的场景呢?我们自然需要去设计一套最适合这样的场景的“数据结构”,它需要有许多新颖的特性,以满足这样的需求,RDD就是其中一种数据结构。

RDD的定义和特性

jaceklaskowski在RDD — Resilient Distributed Dataset中对RDD做了很好的总结,我们能看到RDD的最初定义、Apache官方的定义以及它的特性。

RDD的特点 (根据RDD的全称):

  • 弹性的(Resilient), 即在RDD的继承关系(Lineage,有些文献称为血缘关系)的帮助下具有容错能力,因此能够重新计算由于节点故障而丢失或损坏的分区。
  • 分布式的(Distributed),数据驻留在群集中的多个节点上。
  • 数据集(Dataset),具有原始值或值的分区数据的集合,例如元组或其他对象(表示您使用的数据的记录)。

我们可以看出,我们最开始提出的那个问题,其中涉及的继承与重计算,是属于RDD“弹性”的特点。

来自org.apache.spark.rdd.RDD的scaladoc :

弹性分布式数据集(RDD),Spark中的基本抽象。表示可以并行操作的不可变的分区元素集合。

从关于RDD的原始论文 - 弹性分布式数据集:内存中集群计算的容错抽象:

弹性分布式数据集(RDD)是一种分布式内存抽象,它允许程序员以容错的方式在大型集群上执行内存计算。

除了上述特征之外,它还具有以下附加特征:

  • 内存中(In-Memory),即RDD内的数据尽可能多(大小)和长(时间)存储在内存中。
  • 不可变或只读(Immutable or Read-Only),即一旦创建它就不会改变,只能使用转换(transition)转换为新的RDD。
  • 延迟评估(Lazy evaluated), 即RDD内的数据不可用或转换,直到执行触发执行的操作。
  • 可缓存(Cacheable),即您可以将所有数据保存在持久性“存储”中,如内存(默认和最优选)或磁盘(由于访问速度最不受欢迎)。
  • 并行(Parallel),即并行处理数据。
  • 类型(Typed),RDD记录具有类型,例如Longin RDD[Long](Int, String)in RDD[(Int, String)]
  • 分区(Partitioned),记录被分区(拆分为逻辑分区)并分布在群集中的节点上。
  • 位置粘性(Location-Stickiness),RDD可以定义放置首选项以计算分区(尽可能接近记录)。