diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 7c03bad90ebbc..58fff2d4a1a29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -177,7 +177,10 @@ object SQLExecution extends Logging { shuffleIds.foreach { shuffleId => queryExecution.shuffleCleanupMode match { case RemoveShuffleFiles => - SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId) + // Same as what we do in ContextCleaner.doCleanupShuffle, but do not unregister + // the shuffle on MapOutputTracker, so that stage retries would be triggered. + // Set blocking to Utils.isTesting to deflake unit tests. + sc.shuffleDriverComponents.removeShuffle(shuffleId, Utils.isTesting) case SkipMigration => SparkEnv.get.blockManager.migratableResolver.addShuffleToSkip(shuffleId) case _ => // this should not happen diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 3608e7c920767..974be2f627998 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -319,7 +319,7 @@ class QueryExecutionSuite extends SharedSparkSession { val blockManager = spark.sparkContext.env.blockManager blockManager.diskBlockManager.getAllBlocks().foreach { case ShuffleIndexBlockId(shuffleId, _, _) => - spark.sparkContext.env.shuffleManager.unregisterShuffle(shuffleId) + spark.sparkContext.shuffleDriverComponents.removeShuffle(shuffleId, true) case _ => } }