Spark Operators

Notes on Spark's RDD operators.

Posted by PsycheLee on 2015-09-08


Transformation

mapValues

For a K-V RDD, applies the function to each value (V), leaving the key unchanged.

e.g., add 1 to each V:

val rdd = sc.parallelize(List(("x",1),("ys",3),("ab",5),("ab",6)))
// what the operator does, written with plain map
rdd.map { case (k, v) => (k, v + 1) }.foreach(println)
// using the operator
rdd.mapValues(_ + 1).foreach(println)

(ab,7)
(x,2)
(ys,4)
(ab,6)
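
A practical difference from a plain map: mapValues preserves the parent RDD's partitioner, while map drops it, because map is allowed to change the keys. A minimal sketch of checking this in the shell, assuming the rdd above:

import org.apache.spark.HashPartitioner

val partitioned = rdd.partitionBy(new HashPartitioner(2))
// mapValues cannot touch the keys, so the partitioner is kept
println(partitioned.mapValues(_ + 1).partitioner)                  // Some(...HashPartitioner...)
// map might change the keys, so Spark discards the partitioner
println(partitioned.map { case (k, v) => (k, v + 1) }.partitioner) // None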

flatMapValues

For a K-V RDD, maps each value to a collection and flattens the result, so one pair (k, v) becomes (k, v1), (k, v2), ...

val rdd = sc.parallelize(Array(("spark","1,2,3"),("hadoop","4,5,6"),("flink","7,8")))
// what the operator does, written with plain flatMap
rdd.flatMap { case (k, v) => v.split(",").map(x => (k, x)) }.foreach(println)
// using the operator
rdd.flatMapValues(v => v.split(",")).foreach(println)
// output
(spark,1)
(spark,2)
(spark,3)
(hadoop,4)
(hadoop,5)
(hadoop,6)
(flink,7)
(flink,8)
// If you pass v => v without splitting, the String is treated as a sequence of Chars,
// so every character, the commas included, is emitted as a separate value
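
A quick way to see that caveat (print order is nondeterministic with foreach):

rdd.flatMapValues(v => v).foreach(println)
// each Char of the value String becomes its own pair:
// (spark,1)
// (spark,,)
// (spark,2)
// ...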

keys

Returns all keys of a K-V RDD.

// source
def keys: RDD[K] = self.map(_._1)

rdd.keys.collect
// Array(spark, hadoop, flink)

values

Returns the value of each tuple in the RDD.

// source
def values: RDD[V] = self.map(_._2)

rdd.values.foreach(println)

1,2,3
4,5,6
7,8

keyBy

Turns a single-value RDD into (k, v) pairs by computing a key from each element.

val rdd = sc.parallelize(List("a","ab","ccdd","dde"))
// what the operator does, written with map
rdd.map(x => (x.length, x)).foreach(println)
// using the operator
rdd.keyBy(_.length).foreach(println)

(1,a)
(2,ab)
(4,ccdd)
(3,dde)

reduceByKey

Signature: def reduceByKey(func: (V, V) => V): RDD[(K, V)]

Causes a shuffle, so the job is split into two stages.

Under the hood it runs a map-side combiner (pre-aggregation); only the pre-aggregated data goes through the shuffle.

val rdd = sc.parallelize(List("ab,ac","ac,bb,aa","bb"))
rdd.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).foreach(println)

(aa,1)
(ac,2)
(ab,1)
(bb,2)
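
To see the shuffle boundary, print the lineage (a sketch; the exact format varies by Spark version):

val wc = rdd.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
println(wc.toDebugString)
// the ShuffledRDD line marks the stage boundary; everything above it
// runs map-side, where the combiner pre-aggregates before the shuffle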

groupByKey

Returns: RDD[(K, Iterable[V])]

Causes a shuffle, but does no map-side combine.

val rdd = sc.parallelize(List("ab,ac","ac,bb,aa","bb"))
rdd.flatMap(_.split(",")).map((_,1)).groupByKey().foreach(println)

(aa,CompactBuffer(1))
(ac,CompactBuffer(1, 1))
(ab,CompactBuffer(1))
(bb,CompactBuffer(1, 1))
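
Word count can also be built on groupByKey, but every raw 1 crosses the shuffle instead of a pre-aggregated partial sum, so reduceByKey is the cheaper choice:

rdd.flatMap(_.split(",")).map((_, 1)).groupByKey().mapValues(_.sum).foreach(println)
// (aa,1)
// (ac,2)
// (ab,1)
// (bb,2)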

groupBy

Returns: RDD[(K, Iterable[T])]

Works on a single-value RDD; the grouping rule is a user-defined function.

val rdd = sc.parallelize(1 to 10)
rdd.groupBy(x => x % 2 == 0).map(x => if (x._1) ("even", x._2) else ("odd", x._2)).foreach(println)

(odd,CompactBuffer(1, 3, 5, 7, 9))
(even,CompactBuffer(2, 4, 6, 8, 10))

Internally it uses map to turn each element t into the pair (f(t), t), then groups with groupByKey.

val rdd = sc.parallelize(List("ab","ac","ac","bb","aa","bb"))
// what the operator does: key each element by f(x), keep the element as the value
rdd.map(x => (x, x)).groupByKey().foreach(println)
// using the operator
rdd.groupBy(x => x).foreach(println)

(aa,CompactBuffer(aa))
(ac,CompactBuffer(ac, ac))
(ab,CompactBuffer(ab))
(bb,CompactBuffer(bb, bb))

sortBy

Sorts the RDD; ascending by default (the second parameter, ascending, defaults to true).

val rdd = sc.parallelize(List(("jim",20),("tom",40),("abby",10)))
rdd.sortBy(_._1, false).collect.foreach(println)

(tom,40)
(jim,20)
(abby,10)

rdd.sortBy(_._2, false).collect.foreach(println)
(tom,40)
(jim,20)
(abby,10)

sortByKey

Only for K-V RDDs; sorts by the key.

val rdd = sc.parallelize(List(("jim",20),("tom",40),("abby",10)))
rdd.sortByKey().collect.foreach(println)

(abby,10)
(jim,20)
(tom,40)
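
For descending order, pass false as the first parameter (ascending, default true):

rdd.sortByKey(false).collect.foreach(println)
// (tom,40)
// (jim,20)
// (abby,10)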

distinct