Spark DAGScheduler Stage Submission
Video walkthrough: https://player.bilibili.com/player.html?aid=37445077&page=1
The DAGScheduler handles the job-submission event: it first builds the final ResultStage, then calls the submitStage() method with finalStage as the argument.
```scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)

  submitWaitingStages()
}
```
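For context, handleJobSubmitted is reached from user code through an action: the action calls SparkContext.runJob, which posts a JobSubmitted event to the DAGScheduler's event loop. Below is a minimal driver program that exercises this path (the object and app names are illustrative, not from the original text):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo app: any action on an RDD with a shuffle will do.
object StageSubmissionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("stage-submission-demo").setMaster("local[2]"))

    // reduceByKey introduces a ShuffleDependency, so the job is divided into
    // a ShuffleMapStage (the map side) and a ResultStage (the final stage).
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // collect() is an action: it calls sc.runJob, which posts a JobSubmitted
    // event that the DAGScheduler handles in handleJobSubmitted above.
    println(counts.collect().toSeq)
    sc.stop()
  }
}
```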
submitStage() performs the following checks before actually submitting tasks:

- It only proceeds if the job the current stage belongs to is still active (if the job has been cancelled, continuing is pointless).
- It checks waitingStages, runningStages, and failedStages so that a stage is never submitted twice.
- Before a stage is submitted, its set of missing parent stages must be empty; only then can it be submitted. Otherwise the parents are submitted first and the stage itself is parked in waitingStages.
- Once all partitions of a ShuffleMapStage have been computed, its isAvailable flag becomes true, meaning the stage is finished and does not need to run again; at that point its child stages become eligible for submission.
```scala
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
```
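To make the parent-first order concrete, here is a toy model of this recursion (plain Scala, not Spark code; ToyStage and its fields are invented for illustration). On a linear DAG stage0 → stage1 → stage2 where nothing is available yet, only stage0 gets its tasks submitted, while stage1 and stage2 are parked in the waiting set:

```scala
import scala.collection.mutable

// Toy stand-in for a Spark stage: an id, parent links, and an
// "available" flag that mimics ShuffleMapStage.isAvailable.
case class ToyStage(id: Int, parents: Seq[ToyStage], var available: Boolean = false)

object SubmitOrderDemo {
  val waiting = mutable.Set[Int]()            // plays the role of waitingStages
  val submitted = mutable.ListBuffer[Int]()   // stages whose tasks were submitted

  def submit(stage: ToyStage): Unit = {
    val missing = stage.parents.filterNot(_.available).sortBy(_.id)
    if (missing.isEmpty) {
      submitted += stage.id      // corresponds to submitMissingTasks(stage, ...)
    } else {
      missing.foreach(submit)    // submit missing parents first
      waiting += stage.id        // corresponds to waitingStages += stage
    }
  }

  def main(args: Array[String]): Unit = {
    val s0 = ToyStage(0, Nil)
    val s1 = ToyStage(1, Seq(s0))
    val s2 = ToyStage(2, Seq(s1))
    submit(s2)
    // prints: submitted = ListBuffer(0), waiting = Set(1, 2)
    println(s"submitted = $submitted, waiting = $waiting")
  }
}
```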
Finding parent stages:

- getMissingParentStages() walks up the dependency chain, effectively recursing all the way to the ancestor stages (it uses an explicit stack instead of recursion to avoid StackOverflowError).
- This is where the isAvailable check happens: once all partitions of a ShuffleMapStage are done, isAvailable is true, so the stage is complete, is not reported as missing, and its child stage can be submitted.
- getShuffleMapStage performs the same stage division that was done when the final stage was built; the stage for each ShuffleDependency was already cached at that time, so here it is simply looked up by shuffleId and reused (see the sketch after the code below).
```scala
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
```
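The shuffleId-based reuse mentioned above happens inside getShuffleMapStage. The following is a simplified sketch of its Spark 1.6 shape (the registration of ancestor shuffle dependencies is elided), showing how a stage that was already created while building the final stage is returned straight from the shuffleToMapStage cache:

```scala
// Simplified sketch, not the full Spark source: the ancestor-dependency
// registration inside the None branch is omitted here.
private def getShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      // The stage was already created (and cached) while the final
      // stage was being built, so it is reused as-is.
      stage
    case None =>
      // Otherwise create the stage and cache it under its shuffleId.
      val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      stage
  }
}
```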