action and transformation
The post "learn spark by an example" introduced two kinds of operations: transformation and action. org.apache.spark.rdd.RDD contains both kinds of operators, and they can be told apart by their return type:

transformation: returns an RDD, e.g. map, flatMap, union
action: returns a concrete value, e.g. count, collect

Many sources state that transformations are evaluated lazily while actions trigger execution immediately. Why is that?
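A quick way to see this for yourself, assuming a live SparkContext named sc (a sketch; the printed messages exist only to expose when the closure actually runs):

```scala
val rdd = sc.parallelize(1 to 10)

// map is a transformation: nothing is printed here and no job is launched yet.
val doubled = rdd.map { x => println(s"computing $x"); x * 2 }

// count is an action: only now is a job submitted, and the println side effects
// show up (in the executor logs, or on the console in local mode).
val n = doubled.count()
```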
Take map as an example:

```scala
// org.apache.spark.rdd.RDD
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

/**
 * An RDD that applies the provided function to every partition of the parent RDD.
 */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }
}
```
As you can see, the map operator merely constructs and returns a new MapPartitionsRDD; no actual computation takes place.
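You can observe the lineage that such lazy calls build up with toDebugString (a quick sketch, again assuming a live SparkContext sc):

```scala
// Each map call just wraps the previous RDD in a new MapPartitionsRDD; nothing runs yet.
val lineage = sc.parallelize(1 to 4).map(_ + 1).map(_ * 2)

// Prints the chain: two MapPartitionsRDDs stacked on a ParallelCollectionRDD.
println(lineage.toDebugString)
```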
The following operators work the same way:

```scala
/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
```
Now let's look at the actions:

```scala
/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

/**
 * Applies a function f to each partition of this RDD.
 */
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

/**
 * Return an array that contains all of the elements in this RDD.
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
```
As you can see, every action ultimately calls SparkContext's runJob method, which runs a job on all partitions of the RDD and returns an array with one element per partition result:

```scala
/**
 * Run a job on all partitions in an RDD and return the results in an array.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @return in-memory collection with a result of the job (each collection element will contain
 * a result from one partition)
 */
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
```
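A small sketch of this "one result per partition" behaviour, using the public runJob overload shown above (assumes a live SparkContext sc):

```scala
// Spread 1..100 across 4 partitions and count the elements in each one.
val rdd = sc.parallelize(1 to 100, numSlices = 4)

// runJob returns an Array[U] with exactly one entry per partition.
val perPartitionCounts: Array[Long] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.size.toLong)

println(perPartitionCounts.mkString(", "))  // e.g. 25, 25, 25, 25
println(perPartitionCounts.sum)             // 100, the same value rdd.count() returns
```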
SparkContext itself will be covered another time.
dependency
A dependency describes how an RDD depends on its parent RDD(s).
An RDD's dependencies are passed in through its constructor, so the constructor call often tells you which kind of dependency is involved:

```scala
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging
```

Sometimes the dependencies passed to the constructor are Nil; in that case the dependency type is determined by overriding the getDependencies method:

```scala
protected def getDependencies: Seq[Dependency[_]] = deps
```
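For a quick hands-on check of which dependency a transformation produces, inspect rdd.dependencies in the shell (a sketch, assuming a live SparkContext sc):

```scala
val pairs = sc.parallelize(1 to 10).map(x => (x % 3, x))

// map keeps the lineage narrow: OneToOneDependency
println(pairs.dependencies.map(_.getClass.getSimpleName))

// reduceByKey repartitions by key: ShuffleDependency
println(pairs.reduceByKey(_ + _).dependencies.map(_.getClass.getSimpleName))
```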
The dependency types are summarized below:

| parent RDD : child RDD | possible dependency |
| --- | --- |
| 1:1 | OneToOneDependency |
| N:1 | NarrowDependency (N:1) |
| N:N | NarrowDependency (N:N, e.g. Cartesian product) |

There is also ShuffleDependency; see ShuffledRDD.
Sometimes an RDD defines its dependencies with anonymous inner classes that do not appear in the table above, e.g. the Cartesian-product RDD, CartesianRDD:
```scala
private[spark]
class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1 : RDD[T],
    var rdd2 : RDD[U])
  extends RDD[(T, U)](sc, Nil)
  with Serializable {

  val numPartitionsInRdd2 = rdd2.partitions.length

  override def getPartitions: Array[Partition] = {
    // create the cross product split
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
  }

  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }

  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

  ...
}
```
NarrowDependency vs. ShuffleDependency corresponds to full dependency vs. partial dependency:
NarrowDependency: each partition of the child RDD depends on a fixed set of partitions of the parent RDD.
ShuffleDependency: the child RDD may depend on every partition of the parent RDD; each parent partition is split into several pieces that feed different child partitions, as in reduceByKey.
OneToOneDependency: one-to-one, e.g. map, mapPartitions.
RangeDependency: used only by UnionRDD.
PruneDependency: used by PartitionPruningRDD ("prune" means to trim away).
For example, if an RDD is range-partitioned and the DAG filters on a key, the partitions whose ranges cannot contain that key never need a task at all; see the sketch below.
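A minimal sketch of that pruning idea, using the PartitionPruningRDD.create factory; the partition-to-key-range mapping here is made up purely for illustration, and a live SparkContext sc is assumed:

```scala
import org.apache.spark.rdd.PartitionPruningRDD

// Pretend partition i holds keys in [10 * i + 1, 10 * (i + 1)].
val ranged = sc.parallelize(1 to 100, 10)
val wantedKey = 42

// Keep only the partition whose (hypothetical) range can contain wantedKey;
// no tasks are ever launched for the pruned partitions.
val pruned = PartitionPruningRDD.create(ranged, partitionIndex => partitionIndex == (wantedKey - 1) / 10)

println(pruned.partitions.length)  // 1
```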
Sometimes the dependency produced by a transformation is not fixed, e.g. cogroup in PairRDDFunctions:

```scala
class PairRDDFunctions[K, V](self: RDD[(K, V)])
```
When the child RDD's partitioner equals the parent RDD's partitioner, a OneToOneDependency is produced; otherwise, a ShuffleDependency.
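That decision is made in CoGroupedRDD.getDependencies; paraphrasing the Spark source (details vary slightly between versions):

```scala
// For each input RDD: if it is already partitioned the way the cogroup wants (part),
// a OneToOneDependency is enough; otherwise the data has to be shuffled.
override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] =>
    if (rdd.partitioner == Some(part)) {
      new OneToOneDependency(rdd)
    } else {
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
    }
  }
}
```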
The RDD on the parent side of a ShuffleDependency must hold records of type [K, V]. Across the ShuffleDependency, records with the same key are aggregated together, and the computation logic is then applied to the aggregated records. In practice, many transformations such as groupByKey() and reduceByKey() aggregate data and apply the computation at the same time, i.e. aggregate while compute(). Spark implements this aggregate + compute() primitive with combineByKeyWithClassTag:

```scala
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)
```
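For instance, reduceByKey is a thin wrapper over this primitive; roughly, per the Spark source:

```scala
// The same function folds a value into a combiner and merges two combiners,
// so aggregation and computation happen in a single pass.
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
```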
stage and task
How are stages and tasks determined?
Walk backwards from the finalRDD: every time a ShuffleDependency is encountered, a new Stage is cut; RDDs connected by a NarrowDependency are folded into the current Stage. The number of Tasks in a Stage is determined by the partition count of the last RDD in that Stage. If the Stage produces the final result, its Tasks are ResultTasks; otherwise they are ShuffleMapTasks.
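As an illustrative example of where the boundary falls, consider a word count (the input path below is a placeholder):

```scala
// textFile -> flatMap -> map are all NarrowDependencies, so they pipeline into Stage 0,
// whose tasks are ShuffleMapTasks (they write shuffle output).
// reduceByKey introduces a ShuffleDependency, which cuts a new Stage; count() makes that
// second Stage produce the result, so its tasks are ResultTasks.
val counts = sc.textFile("hdfs://...")        // placeholder input path
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

println(counts.count())
```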
pipeline
The idea: data is computed only when it is needed, and records flow directly to where the computation happens.
The stage in the figure below contains map, union, partitionBy and join.
Take the Cartesian product as an example.
There are 6 ResultTasks in total. Each Task computes 3 RDDs and reads 2 data blocks. Computing a partition of the CartesianRDD means fetching records from both parent RDDs, but since this all happens inside a single ResultTask, no shuffle is needed.
Whether the relationship is 1:1, N:1 or N:N, any chain of NarrowDependencies can be pipelined; the sketch below shows what that means in practice.
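A minimal pipelining sketch (a live SparkContext sc is assumed; the println calls exist only to expose the evaluation order):

```scala
// All three operators sit in one Stage, so within a task they are just nested iterators:
// each record flows through map -> filter -> map before the next record is read.
val out = sc.parallelize(1 to 5, numSlices = 1)
  .map    { x => println(s"map     $x"); x + 1 }
  .filter { x => println(s"filter  $x"); x % 2 == 0 }
  .map    { x => println(s"map2    $x"); x * 10 }

out.collect()
// The printed lines interleave per record (map 1, filter 2, map2 2, map 2, filter 3, ...),
// showing that no intermediate collection is materialized between the operators.
```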
The CartesianRDD source is shown below; it is quite simple:

```scala
private[spark]
class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1 : RDD[T],
    var rdd2 : RDD[U])
  extends RDD[(T, U)](sc, Nil)
  with Serializable {

  val numPartitionsInRdd2 = rdd2.partitions.length

  // m * n partitions
  override def getPartitions: Array[Partition] = {
    // create the cross product split
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
  }

  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }

  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

  override def clearDependencies() {
    super.clearDependencies()
    rdd1 = null
    rdd2 = null
  }
}
```
Now look at the MapPartitionsRDD source once more:

```scala
/**
 * An RDD that applies the provided function to every partition of the parent RDD.
 */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  // firstParent: the first RDD among the parents
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }
}
```
As you can see, compute calls the parent RDD's iterator method and fetches the records of the parent RDD (or of the underlying data block) one at a time.
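For completeness, iterator itself checks the storage level and the checkpoint before falling back to compute; roughly, per the RDD source (details differ between Spark versions):

```scala
// If the RDD is marked for caching, try the cache / block manager first;
// otherwise compute the partition or read it back from a checkpoint.
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
```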