In Spark's execution flow there are two kinds of operations: transformations and actions. Actions split an application into jobs, and the shuffles inside transformations split a job into stages.
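A minimal illustration of this, as a hypothetical spark-shell session (not part of GroupByTest):

```scala
val rdd = sc.parallelize(1 to 100, 4)  // transformations are lazy: nothing runs yet
val doubled = rdd.map(_ * 2)           // narrow transformation: stays in the same stage
val grouped = doubled.groupBy(_ % 10)  // shuffle transformation: cuts a stage boundary
grouped.count()                        // action: triggers one job with two stages
```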
We take the GroupByTest program from Spark's bundled examples as a case study.

Code:
```scala
object GroupByTest {
  // ... (full listing below)
}
```
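For reference, here is the body of GroupByTest roughly as it ships with Spark 2.3.1 (comments added for this walkthrough; consult the Spark source tree for the exact file):

```scala
package org.apache.spark.examples

import java.util.Random

import org.apache.spark.sql.SparkSession

object GroupByTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("GroupBy Test").getOrCreate()

    // Positional arguments; the spark-submit below passes 2 1000 1000 2.
    val numMappers = if (args.length > 0) args(0).toInt else 2
    val numKVPairs = if (args.length > 1) args(1).toInt else 1000
    val valSize = if (args.length > 2) args(2).toInt else 1000
    val numReducers = if (args.length > 3) args(3).toInt else numMappers

    // numMappers partitions, each generating numKVPairs random (Int, Array[Byte]) pairs.
    val pairs1 = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { p =>
      val ranGen = new Random
      val arr1 = new Array[(Int, Array[Byte])](numKVPairs)
      for (i <- 0 until numKVPairs) {
        val byteArr = new Array[Byte](valSize)
        ranGen.nextBytes(byteArr)
        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
      }
      arr1
    }.cache()
    // First action: forces the flatMap to run and the result to be cached.
    pairs1.count()

    // Second action: groupByKey shuffles, then count runs on the reduce side.
    println(pairs1.groupByKey(numReducers).count())

    spark.stop()
  }
}
```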
Submit it to a standalone cluster (the examples jar is the application jar, so it is passed positionally; the four trailing arguments set numMappers=2, numKVPairs=1000, valSize=1000, numReducers=2):

```
bin/spark-submit --master spark://localhost:7077 \
  --class org.apache.spark.examples.GroupByTest \
  /opt/spark/examples/jars/spark-examples_2.11-2.3.1.jar 2 1000 1000 2
```
After the run finishes, open the Spark History Server on port 18080 and look at the Jobs page.
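Seeing the finished application there assumes event logging is enabled and the history server is running; a minimal setup (the log directory is illustrative) looks like:

```
# conf/spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               file:/tmp/spark-events
spark.history.fs.logDirectory    file:/tmp/spark-events
```

Then start the server with sbin/start-history-server.sh.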
The Jobs page shows the application split into two jobs by the two count() actions. Focus on Job Id 1: groupByKey splits it into two stages, with stage 1 performing the shuffle write and stage 2 performing the shuffle read.
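The cut happens because the DAGScheduler starts a new stage wherever it encounters a shuffle dependency. This is visible on the RDDs themselves (hypothetical spark-shell session, with pairs1 as in GroupByTest; object addresses are illustrative):

```scala
scala> pairs1.dependencies               // flatMap: narrow dependency, same stage
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1b2c3d4e)

scala> pairs1.groupByKey().dependencies  // groupByKey: shuffle dependency, new stage
res1: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@5e6f7a8b)
```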
If we skip count() and call println(pairs1.toDebugString) directly, the output is:

```
(2) MapPartitionsRDD[1] at flatMap at GroupByTest.scala:40 [Memory Deserialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at GroupByTest.scala:40 [Memory Deserialized 1x Replicated]
```
The cache details are missing from this output: cache() merely marks pairs1 to be cached, and since no action has run yet, nothing has actually been computed or stored.
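One way to confirm this from a spark-shell session is SparkContext.getRDDStorageInfo, which reports only RDDs that have materialized blocks (a sketch; this is a developer API):

```scala
// Before any action: cache() has been called on pairs1, but no blocks exist yet.
println(sc.getRDDStorageInfo.length)   // 0

pairs1.count()                         // the action computes and caches the partitions

// After the action: the cached MapPartitionsRDD shows up with its memory size.
sc.getRDDStorageInfo.foreach(println)
```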
If we instead run count() first and then println(pairs1.toDebugString), the output becomes:

```
(2) MapPartitionsRDD[1] at flatMap at GroupByTest.scala:40 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 2.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[0] at parallelize at GroupByTest.scala:40 [Memory Deserialized 1x Replicated]
```
The cached size checks out: 2 mappers × 1000 pairs × a 1000-byte value each ≈ 2.0 MB. However, the ShuffledRDD is still absent, which is expected: toDebugString only walks an RDD's ancestor dependencies, and the ShuffledRDD created by groupByKey lies downstream of pairs1.
To see the ShuffledRDD, assign the result of groupByKey to its own variable and inspect that instead:

```scala
val groupedRdd = pairs1.groupByKey()
println(groupedRdd.count())
println(groupedRdd.toDebugString)
```
This prints:

```
(2) ShuffledRDD[2] at groupByKey at GroupByTest.scala:53 []
 +-(2) MapPartitionsRDD[1] at flatMap at GroupByTest.scala:40 []
    |       CachedPartitions: 2; MemorySize: 2.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  ParallelCollectionRDD[0] at parallelize at GroupByTest.scala:40 []
```

Here the leading (2) on each RDD is its partition count, and the indentation break at +- marks the shuffle boundary, i.e. where the job is cut into stages.
This matches the API documentation for RDD.toDebugString: "A description of this RDD and its recursive dependencies for debugging."
Summary:

The two count() calls split the application into two jobs.

Job 0 is triggered by the first action, count(). It contains a single stage with numMappers ResultTasks. Because cache() was called, the MapPartitionsRDD partitions are cached in executor memory. After the tasks finish, the driver collects each task's per-partition count and sums them; job 0 is done.
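That collect-and-sum step mirrors what count() does internally: it runs a job whose tasks each return the size of their partition's iterator, and the driver sums the results. A sketch, assuming pairs1 and spark from the listing above:

```scala
// Each task counts the elements in its own partition; the driver sums them.
val perPartitionCounts: Array[Long] = spark.sparkContext.runJob(
  pairs1,
  (iter: Iterator[(Int, Array[Byte])]) => iter.size.toLong
)
println(perPartitionCounts.sum)  // same value as pairs1.count()
```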
Job 1 is triggered by the second action, count(), and is split into two stages by groupByKey:

stage 1 contains numMappers ShuffleMapTasks, which read the cached partitions and perform the shuffle write;
stage 2 contains numReducers ResultTasks, which perform the shuffle read and the count.

After the tasks finish, the driver again collects each task's result and sums them; job 1 is done.