Skip SortExec for partitioning columns in OPTIMIZE
Signed-off-by: Eunjin Song <sezruby@gmail.com>
sezruby committed Oct 18, 2022
1 parent 94fdfc2 · commit 13927b5
Showing 3 changed files with 75 additions and 6 deletions.
@@ -53,7 +53,7 @@ case class DeltaInvariantChecker(
object DeltaInvariantCheckerStrategy extends SparkStrategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case DeltaInvariantChecker(child, constraints) =>
DeltaInvariantCheckerExec(planLater(child), constraints) :: Nil
DeltaInvariantCheckerExec(planLater(child), constraints, None) :: Nil
case _ => Nil
}
}
@@ -64,7 +64,8 @@ object DeltaInvariantCheckerStrategy extends SparkStrategy {
*/
case class DeltaInvariantCheckerExec(
child: SparkPlan,
constraints: Seq[Constraint]) extends UnaryExecNode {
constraints: Seq[Constraint],
childOrdering: Option[Seq[SortOrder]]) extends UnaryExecNode {

override def output: Seq[Attribute] = child.output

@@ -83,7 +84,7 @@ case class DeltaInvariantCheckerExec(
}
}

override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override def outputOrdering: Seq[SortOrder] = childOrdering.getOrElse(child.outputOrdering)

override def outputPartitioning: Partitioning = child.outputPartitioning

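The new childOrdering parameter does not sort anything; it only lets the caller override the ordering this node reports to its parent. Spark's FileFormatWriter derives a required ordering from the partition columns and adds a SortExec only when the plan's reported outputOrdering does not already satisfy it, so a plan that claims the partition-column ordering is written without the extra sort. A minimal sketch of that check, paraphrased from memory (the exact logic varies across Spark versions, so treat it as an approximation rather than the Spark source):

import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
import org.apache.spark.sql.execution.{SortExec, SparkPlan}

// Approximation of FileFormatWriter's decision, not the literal Spark code.
def planForWrite(plan: SparkPlan, partitionColumns: Seq[Attribute]): SparkPlan = {
  val requiredOrdering = partitionColumns                 // the writer wants rows clustered by these
  val actualOrdering = plan.outputOrdering.map(_.child)   // what the plan claims to produce
  val orderingMatched =
    requiredOrdering.length <= actualOrdering.length &&
      requiredOrdering.zip(actualOrdering).forall {
        case (required, actual) => required.semanticEquals(actual)
      }
  if (orderingMatched) plan // the OPTIMIZE path after this change
  else SortExec(requiredOrdering.map(SortOrder(_, Ascending)), global = false, child = plan)
}

Before this change DeltaInvariantCheckerExec always reported child.outputOrdering, so the check failed for OPTIMIZE rewrites and every compaction job paid for a sort on columns that are constant within its bin.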
@@ -34,9 +34,9 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, HadoopFsRelation, LogicalRelation, WriteJobStatsTracker}
import org.apache.spark.sql.functions.{col, to_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -348,16 +348,22 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl

val constraints =
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
val isOptimize = isOptimizeCommand(queryExecution.analyzed)

SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
val outputSpec = FileFormatWriter.OutputSpec(
outputPath.toString,
Map.empty,
output)

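// For OPTIMIZE on a partitioned table, declare the partition-column ordering up front so
// FileFormatWriter sees its required ordering satisfied and skips the SortExec; each
// compaction bin already holds rows from a single partition.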
val sortOrder = if (isOptimize && partitioningColumns.nonEmpty) {
Some(partitioningColumns.map(SortOrder(_, Ascending)))
} else {
None
}
val empty2NullPlan = convertEmptyToNullIfNeeded(queryExecution.executedPlan,
partitioningColumns, constraints)
val physicalPlan = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
val physicalPlan = DeltaInvariantCheckerExec(empty2NullPlan, constraints, sortOrder)

val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

@@ -414,4 +420,13 @@

resultFiles.toSeq ++ committer.changeFiles
}

private[sql] def isOptimizeCommand(plan: LogicalPlan): Boolean = {
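// An OPTIMIZE rewrite scans its input through a single TahoeBatchFileIndex tagged with
// actionType "Optimize"; any other leaf shape means this is a normal write.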
val leaves = plan.collectLeaves()
leaves.size == 1 && leaves.head.collect {
case LogicalRelation(HadoopFsRelation(
index: TahoeBatchFileIndex, _, _, _, _, _), _, _, _) =>
index.actionType.equals("Optimize")
}.headOption.getOrElse(false)
}
}
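isOptimizeCommand leans on how OPTIMIZE builds the DataFrame it rewrites: each compaction bin is scanned through a TahoeBatchFileIndex created with actionType "Optimize", so the analyzed plan of such a write has exactly one leaf of the shape matched above. A rough sketch of that producing side; the createDataFrame signature here is recalled from the Delta code base and should be treated as an assumption:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.delta.{DeltaLog, OptimisticTransaction}
import org.apache.spark.sql.delta.actions.AddFile

// Sketch only: roughly how an OPTIMIZE bin job obtains its input. The actionTypeOpt
// value ends up as TahoeBatchFileIndex.actionType, which is what isOptimizeCommand
// compares against "Optimize".
def readBin(deltaLog: DeltaLog, txn: OptimisticTransaction, bin: Seq[AddFile]): DataFrame =
  deltaLog.createDataFrame(txn.snapshot, bin, actionTypeOpt = Some("Optimize"))

Because a bin only ever contains files from a single partition, the rows reaching writeFiles on this path are already clustered on the partitioning columns, which is what makes the declared sort order safe.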
@@ -32,8 +32,12 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.util.QueryExecutionListener

/**
* Base class containing tests for Delta table Optimize (file compaction)
@@ -562,6 +566,55 @@ class OptimizeCompactionScalaSuite extends OptimizeCompactionSuiteBase
DeltaTable.forPath(path).optimize().executeCompaction()
}
}

test("test optimize plan for isOptimizeCommand") {
class TestQueryListener extends QueryExecutionListener {
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
assert(false, "execution failure")
}
var plan: Option[LogicalPlan] = None
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
if (funcName.equals("deltaTransactionalWrite")) {
plan = Some(qe.analyzed)
}
}
}

val listener = new TestQueryListener
spark.listenerManager.register(listener)

val numPartitions = 2
withTempDir { tempDir =>
val path = tempDir.getAbsolutePath
spark.range(100)
.withColumn("pCol", rand() % numPartitions)
.repartition(10)
.write
.format("delta")
.partitionBy("pCol")
.save(path)

val deltaLog = DeltaLog.forTable(spark, path)
val txn = deltaLog.startTransaction()

{
val df = spark.read.format("delta").load(path).limit(10)
df.write.format("delta")
.partitionBy("pCol")
.mode("append")
.save(path)
val plan = listener.plan
assert(plan.isDefined && !txn.isOptimizeCommand(plan.get))
}

{
DeltaTable.forPath(path).optimize().executeCompaction()
val plan = listener.plan
assert(plan.isDefined && txn.isOptimizeCommand(plan.get))
}
spark.listenerManager.unregister(listener)
}
}
}

