Support Optimized Write #1198

Open · wants to merge 2 commits into master
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -393,6 +393,16 @@ trait DeltaConfigsBase extends DeltaLogging {
_ => true,
"needs to be a boolean.")

/**
* Whether this table will automagically optimize the layout of files while writing data.
*/
val OPTIMIZE_WRITE = buildConfig[Boolean](
"autoOptimize.optimizeWrite",
"false",
_.toBoolean,
_ => true,
"needs to be a boolean.")

/**
* The number of columns to collect stats on for data skipping. A value of -1 means collecting
* stats for all columns. Updating this conf does not trigger stats re-collection, but redefines
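For context, this new table property would typically be enabled per table. A minimal sketch (not part of this diff; the table name events is a placeholder, and the full property name assumes the standard delta. prefix that DeltaConfigs adds to its keys):

// Enable optimize write for one table via its table property.
spark.sql("""
  ALTER TABLE events
  SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
""")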
TransactionalWrite.scala
@@ -34,9 +34,10 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.delta.util.DeltaShufflePartitionsUtil
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, HadoopFsRelation, LogicalRelation, WriteJobStatsTracker}
import org.apache.spark.sql.functions.{col, to_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -294,6 +295,8 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
val constraints =
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints

val isOptimize = isOptimizeCommand(queryExecution.analyzed)

SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
val outputSpec = FileFormatWriter.OutputSpec(
outputPath.toString,
@@ -302,7 +305,9 @@

val empty2NullPlan = convertEmptyToNullIfNeeded(queryExecution.executedPlan,
partitioningColumns, constraints)
val physicalPlan = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
val optimizeWritePlan =
applyOptimizeWriteIfNeeded(spark, empty2NullPlan, partitionSchema, isOptimize)
val physicalPlan = DeltaInvariantCheckerExec(optimizeWritePlan, constraints)

val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

@@ -359,4 +364,32 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl

resultFiles.toSeq ++ committer.changeFiles
}

private def applyOptimizeWriteIfNeeded(
spark: SparkSession,
physicalPlan: SparkPlan,
partitionSchema: StructType,
isOptimize: Boolean): SparkPlan = {
val optimizeWriteEnabled = !isOptimize &&
spark.sessionState.conf.getConf(DeltaSQLConf.OPTIMIZE_WRITE_ENABLED)
.getOrElse(DeltaConfigs.OPTIMIZE_WRITE.fromMetaData(metadata))
if (optimizeWriteEnabled) {
val planWithoutTopRepartition =
DeltaShufflePartitionsUtil.removeTopRepartition(physicalPlan)
val partitioning = DeltaShufflePartitionsUtil.partitioningForRebalance(
physicalPlan.output, partitionSchema, spark.sessionState.conf.numShufflePartitions)
OptimizeWriteExchangeExec(partitioning, planWithoutTopRepartition)
} else {
physicalPlan
}
}

private def isOptimizeCommand(plan: LogicalPlan): Boolean = {
val leaves = plan.collectLeaves()
leaves.size == 1 && leaves.head.collect {
case LogicalRelation(HadoopFsRelation(
index: TahoeBatchFileIndex, _, _, _, _, _), _, _, _) =>
index.actionType.equals("Optimize")
}.headOption.getOrElse(false)
}
}
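To make the resolution order in applyOptimizeWriteIfNeeded explicit: the session configuration, when set, wins over the table property, and an OPTIMIZE command never gets the rebalance exchange. A hedged sketch of a session-level override (the spark.databricks.delta. prefix is assumed from how DeltaSQLConf builds its keys):

// Session conf, when set, takes precedence over delta.autoOptimize.optimizeWrite.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", false)
// Remove the override to fall back to the table property.
spark.conf.unset("spark.databricks.delta.optimizeWrite.enabled")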
DeltaSQLConf.scala
@@ -756,6 +756,34 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val OPTIMIZE_WRITE_ENABLED =
buildConf(key = "optimizeWrite.enabled")
.internal()
.doc("Enable optimize write.")
.booleanConf
.createOptional

val OPTIMIZE_WRITE_BIN_SIZE =
buildConf(key = "optimizeWrite.binSize")
.internal()
.doc("File size hint for optimize write.")
.longConf
.createWithDefault(134217728)

val OPTIMIZE_WRITE_SMALL_PARTITION_FACTOR =
buildConf("optimizeWrite.smallPartitionFactor")
.internal()
.doc("Factor used to coalesce partitions for optimize write.")
.doubleConf
.createWithDefault(0.5)

val OPTIMIZE_WRITE_MERGED_PARTITION_FACTOR =
buildConf("optimizeWrite.mergedPartitionFactor")
.internal()
.doc("Factor used to rebalance partitions for optimize write.")
.doubleConf
.createWithDefault(1.2)

val DELTA_ALTER_TABLE_CHANGE_COLUMN_CHECK_EXPRESSIONS =
buildConf("alterTable.changeColumn.checkExpressions")
.internal()
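A hedged example of tuning these knobs for a session (values are illustrative, not recommendations; the spark.databricks.delta. prefix is again assumed):

// Turn the feature on and target ~256 MB bins instead of the 128 MB default.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", true)
spark.conf.set("spark.databricks.delta.optimizeWrite.binSize", 256L * 1024 * 1024)
// The merge thresholds used when rebalancing reducer bins (defaults shown).
spark.conf.set("spark.databricks.delta.optimizeWrite.smallPartitionFactor", "0.5")
spark.conf.set("spark.databricks.delta.optimizeWrite.mergedPartitionFactor", "1.2")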
core/src/main/scala/org/apache/spark/sql/delta/util/DeltaShufflePartitionsUtil.scala (new file)
@@ -0,0 +1,199 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.util

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RoundRobinPartitioning}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}
import org.apache.spark.sql.execution.{CoalesceExec, PartialReducerPartitionSpec, SparkPlan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}

import scala.collection.mutable.ArrayBuffer

/**
* Utility functions to rebalance shuffle partitions for OptimizeWrite.
*/
object DeltaShufflePartitionsUtil {

// scalastyle:off line.size.limit
/**
* Splits the skewed partition based on the map size and the target partition size
* after split, and creates a list of `PartialReducerPartitionSpec`. Returns None if it can't split.
*
* The function is copied from Spark 3.2:
* https://github.com/apache/spark/blob/v3.2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala#L376
* EDIT: Configurable smallPartitionFactor and mergedPartitionFactor.
*
* @param shuffleId Shuffle ID used to retrieve partition info.
* @param reducerId Reducer ID (partition ID) used to retrieve partition info.
* @param targetSize Target bin size for splitting.
* @param smallPartitionFactor Threshold to avoid too-small partitions. If a partial partition is
*                             smaller than targetSize * smallPartitionFactor, merge it into an
*                             adjacent partition.
* @param mergedPartitionFactor Threshold to avoid too-large partitions. If a partial partition is
*                              larger than targetSize * mergedPartitionFactor, do not merge other
*                              adjacent partitions into it.
*/
// scalastyle:on
def createSkewPartitionSpecs(
shuffleId: Int,
reducerId: Int,
targetSize: Long,
smallPartitionFactor: Double,
mergedPartitionFactor: Double): Option[Seq[PartialReducerPartitionSpec]] = {
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
if (mapPartitionSizes.exists(_ < 0)) return None
val mapStartIndices = splitSizeListByTargetSize(
mapPartitionSizes,
targetSize,
smallPartitionFactor,
mergedPartitionFactor)
if (mapStartIndices.length > 1) {
Some(mapStartIndices.indices.map { i =>
val startMapIndex = mapStartIndices(i)
val endMapIndex = if (i == mapStartIndices.length - 1) {
mapPartitionSizes.length
} else {
mapStartIndices(i + 1)
}
val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
Contributor:

There was an update on the Spark side to make this more performant: apache/spark@9e1d00c

Is the main reason for not just using the Spark versions directly to add the configurable mergedPartitionFactor, or to avoid relying on those internal helper functions?

Contributor (author):

Yes, it's for both: to make the factors configurable and to avoid relying on Spark's internal util, which can change frequently. But I'm also okay with using the Spark one. I'm just waiting for the Databricks team to get back on this (and on auto compaction too).

Contributor (author):

https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/optimize-write-for-apache-spark

FYI, the code has been used in production for at least 6 months with no major issues so far.
We might need to increase the binSize config / PARQUET COMPRESSION RATIO for larger files (60~80 MB average for now).

Contributor:

Hmm, I tried building and using this myself and I don't seem to be getting my large partitions split. I'm going to add some more logging to try to see what's happening.

Contributor (author):

Yes, that's a case this approach (Spark's rebalance logic) cannot handle, because the unit of rebalancing is determined by the source partition layout. If the data is skewed or spread over only a few partitions, it cannot be rebalanced properly.
Please refer to the figure in #1158 (comment).

If the source dataframe is all the same key (key=1) sitting in a single 10 GB partition, it cannot be split,
e.g. df.repartition(1, col("key")).select(col("key")).write.format("delta").parquet("path")

I removed the redundant repartition execution plan on top of the child plan, but that cannot cover all the cases.

Contributor:

The smallest unit of splitting is the single-task map output for a single reducer ID, right? That wasn't what I was seeing: all my map tasks had shuffle writes < 1 GB, but some reducer tasks were reading > 10 GB of shuffle data. After digging into how map output sizes work a bit, I'm going to try again and see if this is a side effect of HighlyCompressedMapStatus for large numbers of reduce partitions (> 2000 by default). My only thought is some effect of a lot of small blocks being "averaged out" when computing map output sizes per reducer.

Contributor:

That appeared to be it. I dropped my shuffle partitions to 1k and it behaved as I would expect. Not sure how common my case is with particular types of data skew, but maybe it would be good to log a warning when the number of shuffle partitions exceeds the threshold for using HighlyCompressedMapStatus: that can limit the ability to properly split skewed partitions, because the average map output size is used for individual per-reducer outputs instead of the real value (a possible form of such a warning is sketched after this function).

PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
})
} else {
None
}
}
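On the HighlyCompressedMapStatus discussion above: a hedged sketch of the kind of warning the reviewer suggests, not part of this PR. It assumes the internal Spark setting spark.shuffle.minNumPartitionsToHighlyCompress (default 2000), a SparkSession named spark, and a Logging mixin providing logWarning:

// Hypothetical guard: above this threshold Spark keeps only average block sizes per map
// output (HighlyCompressedMapStatus), so skewed reducer partitions may not split accurately.
val highlyCompressedThreshold = SparkEnv.get.conf
  .getInt("spark.shuffle.minNumPartitionsToHighlyCompress", 2000)
val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
if (numShufflePartitions > highlyCompressedThreshold) {
  logWarning(s"Optimize write: $numShufflePartitions shuffle partitions exceeds " +
    s"$highlyCompressedThreshold; map output sizes are averaged, so skewed partitions " +
    "may not be split accurately.")
}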

// scalastyle:off line.size.limit
/**
* Given a list of sizes, returns an array of indices to split the list into multiple partitions,
* so that the size sum of each partition is close to the target size. Each index indicates the
* start of a partition.
*
* The function is copied from Spark 3.2:
* https://github.com/apache/spark/blob/v3.2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala#L319
* EDIT: Configurable smallPartitionFactor and mergedPartitionFactor.
*
* @param sizes List of partition sizes to split.
* @param targetSize Target bin size for splitting.
* @param smallPartitionFactor Threshold to avoid too-small partitions. If a partial partition is
*                             smaller than targetSize * smallPartitionFactor, merge it into an
*                             adjacent partition.
* @param mergedPartitionFactor Threshold to avoid too-large partitions. If a partial partition is
*                              larger than targetSize * mergedPartitionFactor, do not merge other
*                              adjacent partitions into it.
*/
// scalastyle:on
// Visible for testing
private[sql] def splitSizeListByTargetSize(
sizes: Seq[Long],
targetSize: Long,
smallPartitionFactor: Double,
mergedPartitionFactor: Double): Array[Int] = {
val partitionStartIndices = ArrayBuffer[Int]()
partitionStartIndices += 0
var i = 0
var currentPartitionSize = 0L
var lastPartitionSize = -1L

def tryMergePartitions() = {
// When we are going to start a new partition, it's possible that the current partition or
// the previous partition is very small and it's better to merge the current partition into
// the previous partition.
val shouldMergePartitions = lastPartitionSize > -1 &&
((currentPartitionSize + lastPartitionSize) < targetSize * mergedPartitionFactor ||
(currentPartitionSize < targetSize * smallPartitionFactor ||
lastPartitionSize < targetSize * smallPartitionFactor))
if (shouldMergePartitions) {
// We decide to merge the current partition into the previous one, so the start index of
// the current partition should be removed.
partitionStartIndices.remove(partitionStartIndices.length - 1)
lastPartitionSize += currentPartitionSize
} else {
lastPartitionSize = currentPartitionSize
}
}

while (i < sizes.length) {
// If including the next size in the current partition exceeds the target size, package the
// current partition and start a new partition.
if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
tryMergePartitions()
partitionStartIndices += i
currentPartitionSize = sizes(i)
} else {
currentPartitionSize += sizes(i)
}
i += 1
}
tryMergePartitions()
partitionStartIndices.toArray
}
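A worked example of the splitting logic above (illustrative only; the method is private[sql], so a call like this would live in test code under an org.apache.spark.sql subpackage):

// targetSize = 100, smallPartitionFactor = 0.5, mergedPartitionFactor = 1.2 (the PR defaults).
// i = 1: 30 + 80 > 100, so a new bin starts at index 1                   -> starts = [0, 1]
// i = 3: 90 + 90 > 100; the previous bin (30) is below 100 * 0.5, so the
//        bin being closed ([80, 10]) merges into it and a new bin starts
//        at index 3                                                       -> starts = [0, 3]
// Final bins: [30, 80, 10] (size 120) and [90].
val starts = DeltaShufflePartitionsUtil.splitSizeListByTargetSize(
  Seq(30L, 80L, 10L, 90L),
  targetSize = 100L,
  smallPartitionFactor = 0.5,
  mergedPartitionFactor = 1.2)
assert(starts.sameElements(Array(0, 3)))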

// scalastyle:off line.size.limit
/**
* Gets the map output sizes for the specific shuffle and reduce ID. Note that some map outputs can
* be missing due to issues like executor loss. The size will be -1 for missing map outputs, and the
* caller side should take care of it.
*
* The function is copied from Spark 3.2:
* https://github.com/apache/spark/blob/v3.2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala#L365
*/
// scalastyle:on
private[sql] def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
mapOutputTracker.shuffleStatuses(shuffleId).withMapStatuses(_.map { stat =>
if (stat == null) -1 else stat.getSizeForBlock(partitionId)
})
}

private[sql] def removeTopRepartition(plan: SparkPlan): SparkPlan = {
Contributor:

Can you explain each of the cases with comments? These are pretty complicated to reason about.

Contributor (author):

Thanks for the review! I'll add some comments and classdoc and try to improve the documentation in #1158.

Contributor (author):

@tdas I added an example for OptimizeWrite with partitioned data:
#1158 (comment)
Could you have a look and let me know if anything is unclear?

For non-partitioned data, I use RoundRobinPartitioning here, but it could be inefficient in some cases since it distributes all rows across all partitions, which is unnecessary. I think we could improve that later.

plan match {
// AQE is enabled and the top node wraps a user-specified shuffle (e.g. repartition(n) or
// repartition(col)), i.e. one not inserted by the planner to satisfy distribution
// requirements: drop that shuffle, since OptimizeWrite adds its own exchange on top.
case p@AdaptiveSparkPlanExec(inputPlan: ShuffleExchangeExec, _, _, _, _)
if !inputPlan.shuffleOrigin.equals(ENSURE_REQUIREMENTS) =>
Contributor:

I am trying to understand this and map it to the Spark code. What you are trying to do is remove a shuffle if it wasn't added automatically by the planner to ensure requirements. Doesn't that mean that if the user asked for repartitioning a certain way with an explicit programmatic API (e.g., DataFrame.repartition), we will be ignoring that completely?

Contributor (author):

Yes. For Optimize Write we add repartition(partitionColumns) (+ rebalancing) at the top of the plan, so an unnecessary repartition(n) or coalesce(n) can be removed.

See #1158 - Things to do - 3 for details.

p.copy(inputPlan = inputPlan.child)
// Same as above, but without AQE: drop a top-level user-specified shuffle.
case ShuffleExchangeExec(_, child, shuffleOrigin)
if !shuffleOrigin.equals(ENSURE_REQUIREMENTS) =>
child
// A user-specified coalesce(n) at the top of the plan, with AQE: drop it.
case AdaptiveSparkPlanExec(inputPlan: CoalesceExec, _, _, _, _) =>
inputPlan.child
// A user-specified coalesce(n) at the top of the plan, without AQE: drop it.
case CoalesceExec(_, child) =>
child
// Nothing to remove.
case _ =>
plan
}
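A hedged illustration of the cases above (writePlan is a placeholder for the physical plan handed to the transactional write, not a name from this diff):

// A user-added df.repartition(10) directly under the write shows up as a top-level
// ShuffleExchangeExec with shuffleOrigin REPARTITION_BY_NUM, so it is stripped here;
// an exchange the planner inserted itself (ENSURE_REQUIREMENTS) is kept.
val withoutUserShuffle = DeltaShufflePartitionsUtil.removeTopRepartition(writePlan)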

private[sql] def partitioningForRebalance(
outputColumns: Seq[Attribute],
partitionSchema: StructType,
numShufflePartitions: Int): Partitioning = {
if (partitionSchema.fields.isEmpty) {
// Non-partitioned data.
RoundRobinPartitioning(numShufflePartitions)
} else {
// Partitioned data.
val partitionColumnsExpr = partitionSchema.fields.map { f =>
outputColumns.find(c => c.name.equals(f.name)).get
}
HashPartitioning(partitionColumnsExpr, numShufflePartitions)
}
}
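For example, a sketch of the two branches with assumed names (a table partitioned by a single date column, the default 200 shuffle partitions, and imports for org.apache.spark.sql.types.{DateType, StructField}):

// Partitioned table: hash-distribute rows by the partition column so each reducer
// writes to few partition directories; OptimizeWriteExchangeExec then splits or
// coalesces the reducer bins toward optimizeWrite.binSize.
val partitioning = DeltaShufflePartitionsUtil.partitioningForRebalance(
  outputColumns = writePlan.output, // writePlan: the write's physical plan (placeholder)
  partitionSchema = StructType(StructField("date", DateType, nullable = true) :: Nil),
  numShufflePartitions = 200)
// => HashPartitioning over the "date" attribute with 200 partitions.
// With an empty partitionSchema this would be RoundRobinPartitioning(200) instead.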

}