1.RDD创建
1.1 从集合(内存)中创建 RDD
从集合中创建RDD,Spark主要提供了两个方法:parallelize
和makeRDD
从底层代码实现来讲,makeRDD方法其实就是parallelize方法
1.2 从外部存储(文件)创建 RDD
由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集
比如HDFS、HBase等。
1.3 从其他 RDD 创建
主要是通过一个RDD 运算完后,再产生新的 RDD
1.4 直接创建 RDD new
使用new 的方式直接构造 RDD
2.RDD 并行度与分区
Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。– 并行执行的任务数量,并不是指的切分任务的数量
读取内存数据时,数据可以按照并行度的设定进行数据的分区操作;读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异。
3.RDD 转换算子
RDD根据数据处理方式
的不同将算子整体上分为Value类型
、双Value类型
和Key-Value类型
3.1 Value类型
3.1.1 map
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
3.1.2 map Partitions
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
3.1.3 map和mapPartitions的区别
- 数据处理角度
- Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。
- 功能的角度
- Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
- 性能的角度
- Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。
3.1.4 mapPartitionsWithIndex
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
3.1.5 flatMap
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
将List(List(1,2),3,List(4,5))进行扁平化操作
3.1.6 glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
3.1.7 groupBy
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中
3.1.8 filter
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
3.1.9 sample
根据指定的规则从数据集中抽取数据
3.1.10 distinct
将数据集中重复的数据去重
3.1.11 coalesce
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本
3.1.12 repartition
该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
3.1.13 sortBy
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程
3.2 双Value类型
3.2.1 intersection
对源RDD和参数RDD求交集后返回一个新的RDD
3.2.2 union
对源RDD和参数RDD求并集后返回一个新的RDD
3.2.3 subtract
以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集
3.2.4 zip
将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素。
3.3 Key-Value类型
3.3.1 partitionBy
将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner
3.3.2 reduceByKey
可以将数据按照相同的Key对Value进行聚合
3.3.3 groupByKey
将数据源的数据根据key对value进行分组
从shuffle
的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。
从功能
的角度:reduceByKey其实包含分组和聚合的功能。GroupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey。
3.3.4 aggregateByKey
将数据根据不同的规则进行分区内计算和分区间计算
3.3.5 foldByKey
当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey
3.3.6 combineByKey
最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
3.3.7 reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
- reduceByKey: 相同key的第一个数据不进行任何计算,分区内和分区间计算规则相同
- FoldByKey: 相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
- AggregateByKey:相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
- CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
3.3.8 sortByKey
在一个(K,V)的RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的
3.3.9 join
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的
3.3.10 leftOuterJoin
类似于SQL语句的左外连接
3.3.11 cogroup
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
4.RDD 行动算子
4.1 reduce
聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
4.2 collect
在驱动程序中,以数组Array的形式返回数据集的所有元素
4.3 count
返回RDD中元素的个数
4.4 first
返回RDD中的第一个元素
4.5 take
返回一个由RDD的前n个元素组成的数组
4.6 takeOrdered
返回该RDD排序后的前n个元素组成的数组
4.7 aggregate
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
4.8 fold
折叠操作,aggregate的简化版操作
4.9 countByKey
统计每种key的个数
4.10 save 相关算子
4.10.1 函数签名
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
4.10.2 函数说明
将数据保存到不同格式的文件中
4.11 foreach
分布式遍历RDD中的每一个元素,调用指定函数