算子分类
1、 Transformation算子(转换算子),此类算子操作是延迟计算的,即是将要从一个RDD转换成另一个RDD的操作,但不是马上执行,并不触发提交Job作业,需要等到有Action操作的时候才会真正触发运算。
根据操作数据类型的不同,可细分为 Value数据类型的Transformation算子 和 Key-Value数据类型的Transfromation算子
2、Action算子(行动算子), 这类算子会触发 SparkContext 提交Job作业,并将数据输出 Spark系统。
重难 Transformation算子
glom
该函数是将RDD中每一个分区中各个元素合并成一个Array,这样每一个分区就只有一个数组元素1
2
3
4val a = sc.parallelize(1 to 9, 3)
a.glom.collect
//输出
res66: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))
mapPartitions(function)
与 map 类似,但函数单独在 RDD 的每个分区上运行1
2
3
4
5
6
7
8
9
10val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitions(iterator => {
val buffer = new ListBuffer[Int]
while (iterator.hasNext) {
buffer.append(iterator.next() * 100)
}
buffer.toIterator
}).foreach(println)
//输出结果
100 200 300 400 500 600
join
在一个(K, V)和(K, W)类型的RDD 上调用时,返回一个(K, (V, W)) pairs 的 RDD,等价于内连接操作(不含 V或W 为空的)。
执行外连接,可以使用:
leftOuterJoin (不含 W 为空)
rightOuterJoin (不含 W 为空)
fullOuterJoin (包含V 和 W 为空)
sample
数据采样。有三个可选参数:设置是否放回 (withReplacement)、采样的百分比 (fraction,小于等于1)、随机数生成器的种子 (seed)1
2
3val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list).sample(withReplacement = false, fraction = 0.5).foreach(println)
//输出结果随机
groupByKey
按照键进行分组,在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable1
2
3
4
5
6val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)
//输出:
(spark,List(3, 5))
(hadoop,List(2, 2))
(storm,List(6))
cogroup
先同一个 (K, V) RDD 中的元素先按照 key 进行分组,然后再对不同 RDD 中的元素按照 key 进行分组1
2
3
4
5
6
7
8val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e"))
val list02 = List((1, "A"), (2, "B"), (3, "E"))
val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"),(3, "eE"))
sc.parallelize(list01).cogroup(sc.parallelize(list02),sc.parallelize(list03)).foreach(println)
// 输出
(1,(CompactBuffer(a, a),CompactBuffer(A),CompactBuffer([ab])))
(3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE, eE)))
(2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB]))
reduceByKey
按照键进行归约操作1
2
3
4
5
6val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).reduceByKey(_ + _).foreach(println)
//输出
(spark,8)
(hadoop,4)
(storm,6)
sortBy(function) & sortByKey
按照键进行排序,需要 collect 等action算子后才是有序的1
2
3
4
5
6val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm"))
sc.parallelize(list01).sortByKey(ascending = false).collect.foreach(println)
// 输出
(120,storm)
(100,hadoop)
(90,spark)
按照指定function进行排序,需要 collect 等action算子后才是有序的1
2
3
4
5
6val list02 = List(("hadoop",100), ("spark",90), ("storm",120))
sc.parallelize(list02).sortBy(x=>x._2,ascending=false).collect.foreach(println)
// 输出
(storm,120)
(hadoop,100)
(spark,90)
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
当针对(K,V)对的数据集时,返回(K,U)对的数据集,先对分区内执行seqOp函数,zeroValue 聚合每个键的值,再对分区间执行combOp函数。与groupByKey 类似,reduce 任务的数量可通过第二个参数 numPartitions 进行配置。示例如下:1
2
3
4
5
6
7
8
9
10// 为了清晰,以下所有参数均使用具名传参
val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions = 3)(
seqOp = math.max(_, _),
combOp = _ + _
).collect.foreach(println)
//输出结果:
(hadoop,3)
(storm,8)
(spark,7)
这里使用了 numSlices = 2 指定 aggregateByKey 父操作 parallelize 的分区数量为 2,其执行流程如下:
基于同样的执行流程,如果 numSlices = 1,则意味着只有输入一个分区,则其最后一步 combOp 相当于是无效的,执行结果为:1
2
3(hadoop,3)
(storm,8)
(spark,4)
同样的,如果每个单词对一个分区,即 numSlices = 6,此时相当于求和操作,执行结果为:1
2
3(hadoop,5)
(storm,14)
(spark,7)
aggregateByKey(zeroValue = 0,numPartitions = 3) 的第二个参数 numPartitions 决定的是输出 RDD 的分区数量,想要验证这个问题,可以对上面代码进行改写,使用 getNumPartitions 方法获取分区数量
combineByKeyC C,
mergeValue:(C, V) C,
mergeCombiners:(C, C) C,
partitioner:Partitioner,
mapSideCombine:Boolean=true,
serializer:Serializer=null
):RDD[(K,C)]
参数:createCombiner:V=>C 分组内的创建组合的函数。即是对读进来的数据进行初始化,其把当前的值作为参数,可以对该值做一些转换操作,转换为我们想要的数据格式
参数:mergeValue:(C,V)=>C 该函数主要是分区内的合并函数,作用在每一个分区内部。其功能主要是将V合并到之前(createCombiner)的元素C上,注意,这里的C指的是上一函数转换之后的数据格式,而这里的V指的是原始数据格式(上一函数为转换之前的)
参数:mergeCombiners:(C,C)=>R 该函数主要是进行分区之间合并,此时是将两个C合并为一个C,例如两个C:(Int)进行相加之后得到一个R:(Int)
参数:partitioner:自定义分区数,默认是hashPartitioner
参数:mapSideCombine:Boolean=true 该参数是设置是否在map端进行combine操作,为了减小传输量,很多 combine 可以在 map 端先做,比如叠加,可以先在一个 partition 中把所有相同的 key 的 value 叠加,
参数:serializerClass: String = null,传输需要序列化,用户可以自定义序列化类
1 | val ls3 = List(("001", "011"), ("001","012"), ("002", "011"), ("002", "013"), ("002", "014")) |
重难 Action算子
takeOrdered
按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 n 个元素。需要注意的是 takeOrdered 使用隐式参数进行隐式转换,以下为其源码。所以在使用自定义排序时,需要继承 Ordering[T] 实现自定义比较器,然后将其作为隐式参数引入。1
2
3
4
5
6
7
8
9
10
11// 继承 Ordering[T],实现自定义比较器,按照 value 值的长度进行排序
class CustomOrdering extends Ordering[(Int, String)] {
override def compare(x: (Int, String), y: (Int, String)): Int
= if (x._2.length > y._2.length) 1 else -1
}
val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))
// 引入隐式默认值
implicit val implicitOrdering = new CustomOrdering
sc.parallelize(list).takeOrdered(5)
//输出
Array((1,hive), (1,storm), (1,hadoop), (1,azkaban)
take(n)
将RDD中的前 n 个元素作为一个 array 数组返回,是无序的。
first
返回 RDD 中的第一个元素,等价于 take(1)。
top(num:Int)(implicit ord:Ordering[T]):Array[T]
默认返回最大的k个元素,可以定义排序的方式Ordering[T]。1
2
3
4
5
6
7
8
9
10class CustomOrdering extends Ordering[Int] {
override def compare(x: Int, y: Int): Int
= if (x > y) -1 else 1
}
val list0 = List(3,5,1,6,2)
// 引入隐式默认值
implicit val implicitOrdering = new CustomOrdering
sc.parallelize(list0).top(5)
//输出
Array(1,2,3,5,6)