
3.1 RDD基础
本节概述RDD编程基本要点,对RDD编程兴趣不大的读者,可在本章的学习中只阅读3.1节RDD基础、3.2节RDD简单实例等内容跳过本章,继续进行Spark SQL这一核心内容的学习,因为在后面的章节中只涉及少量RDD转化问题,所以读者掌握了RDD的基本内容即可理解Spark RDD、DataFrame的区别与共性,进而了解它们各自的编程特点以及应用场合,强烈推荐有志于深入理解Spark的读者全面学习本章内容。
Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区(partitions),这些分区被分发到集群中的不同节点上进行计算。RDD可以包含Python、Java、Scala中任意类型的对象,甚至可以包含用户自定义的对象。
3.1.1 创建RDD
用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动程序里转化驱动程序中的对象集合(比如list和set)为RDD。
例3-1:使用 textFile() 创建一个字符串的 RDD

3.1.2 RDD转化操作、行动操作
创建出来后,RDD支持两种类型的操作:转化操作(transformations)和行动操作(actions)。
1.转化操作
转化操作会由一个RDD生成一个新的RDD,例如,RDD通过map(func)函数遍历并利用func处理每一个元素,进而生成新的RDD就是一个常见的转化操作。
在示例3-2中,map遍历RDD[String]中的每一个String对象,此时的每一个String对象表示的便是文件的每一行,进而借助传入map的(s => s.length)匿名函数求出每一行(String对象)长度,转化为记录着每一行长度的新的RDD(lineLengths)。
例3-2:调用转化操作map()

2.行动操作
另一方面,行动操作会对RDD计算出一个结果,是向应用程序返回值,或向存储系统导出数据的那些操作,例如count(返回RDD中的元素个数)、collect(返回RDD所有元素)、save(将RDD输出到存储系统)。
take(n)是返回RDD前n个元素的一个行动操作,如例3-3所示,查看前二十行的字数。
例3-3:调用take()行动操作

reduce()是并行整合所有RDD数据的行动操作,例如求和操作,如例3-4,根据例3-2得到记录每行字数的RDD(lineLengths),可用reduce()对每行字数进行求和,进而求出文件总字长。
例3-4:调用reduce()行动操作

3.1.3 惰性求值
转化操作和行动操作的区别在于Spark计算RDD的方式不同。虽然你可以在任何时候通过转化操作定义新的RDD,Spark只是记录RDD的转换过程,不会直接进行计算,它们只有第一次在一个行动操作中用到时,才会真正触发计算。
大家看下面的示例(见图3-1)。

图3-1
该示例中,笔者通过SparkContext(图中第一行中的sc)提供的方法textFile()读取本地文件(/etc/profile2)来创建RDD,哪怕实际上该文件并不存在,也能成功创建RDD。当RDD遇到第一个行动(actions)操作时,需要对RDD进行计算,此时才会报错,也就说明了转化操作的本质:记录旧RDD如何转化成新RDD,而不会立即进行计算,以免浪费资源。
这种策略刚开始看起来可能会显得有些奇怪,不过在大数据领域却十分有道理。
比如,看看例3-2和例3-3,我们以一个文本文件定义了RDD,然后借助map(s=>s.length)定义了一个新的记录着每一行字数的新的RDD。如果Spark在运行val lines =sc.textFile("data.txt")、val lineLengths = lines.map(s => s.length)这样的转化操作时,就把文件中所有的行都读取并存储起来,并进行对每一行字数的计算,就会消耗很多存储空间和计算资源。相反,一旦Spark了解了完整的转化、行动操作链之后,它就可以只计算求结果时真正需要的数据,以及必要的运算。事实上,如例3-3在运行lineLengths.take(20).foreach(println)行动操作时,Spark只需要扫描文件直到找到前20行进行计算即可,即在例3-3中,不管数据源文件多大,真正读取并进行字数计算的只有该文件前20行,因为take()行动操作只涉及文件前20行,而不需要读取整个文件,从而节省了大量存储、计算资源。
3.1.4 RDD缓存概述
默认情况下,Spark的RDD会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个RDD,可以使用RDD.persist() /RDD.cache()让Spark把这个RDD缓存下来。我们可以让Spark把数据以多种形式持久化到许多不同的地方(memory、disk),可用的选项会在3.8节具体讲述。在第一次对持久化的RDD计算之后(假如我们的持久化级别是MEMORY_ONLY),Spark会把RDD的内容保存到内存中(以分区方式存储到集群中的各节点上),这样在之后的行动操作中就可以重用这些数据了。我们也可以把RDD缓存到磁盘上而不是内存中。
默认不进行持久化可能也显得有些奇怪,不过这对于大规模数据集是很有意义的:在实际情况中,通常大部分的数据只使用一次。我们可以用Spark遍历数据一遍,计算得出我们想要的结果,所以我们没有必要浪费存储空间来将这些数据持久化。Spark在计算过后就默认释放掉这些使用过的数据,这种方式可以避免内存的浪费。
在实际操作中,会经常用persist()来把数据的一部分读取到内存中,并反复查询这部分数据。例如,如果我们想多次对记录着文件每一行字数的RDD(lineLengths)进行计算,就可以将lineLengths持久化到内存,如例3-5所示。
例3-5:把RDD持久化到内存中

3.1.5 RDD基本编程步骤
RDD的基本编程步骤如下:
读取内、外部数据源创建RDD。
使用诸如map()、filter()这样的转化操作对RDD进行转化,以定义新的RDD。
对需要被重用的RDD手动执行persist()/cache()操作。
使用行动操作,例如count()和first()等,来触发一次并行计算,Spark会对记录下来的RDD转换过程进行优化后再执行计算。