[SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable should not be changed by falling back to v1 command

### What changes were proposed in this pull request?

This is a follow-up of apache#47772. The behavior of SaveAsTable should not be changed by switching from the v1 command to the v2 command; this is similar to apache#47995. In the case of a `DelegatingCatalogExtension`, saveAsTable needs to go through the v1 commands to stay consistent with the previous behavior.
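
For illustration only (not part of this patch), the sketch below shows the setup this follow-up preserves: a session catalog that merely extends `DelegatingCatalogExtension` keeps `saveAsTable` on the v1 command path. The class name `MySessionCatalog` and the local-mode session are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension

// A custom session catalog that only delegates to the built-in catalog (hypothetical).
class MySessionCatalog extends DelegatingCatalogExtension

val spark = SparkSession.builder()
  .master("local[*]")
  // Plug the custom implementation into the session catalog slot.
  .config("spark.sql.catalog.spark_catalog", classOf[MySessionCatalog].getName)
  .getOrCreate()

// With this follow-up, the write below still goes through the v1 commands,
// as it did before apache#47772, rather than switching to the v2 write path.
spark.range(10).write.mode("overwrite").format("csv").saveAsTable("t")
```

Only when the configured session catalog is not a `DelegatingCatalogExtension` (or the source is a v2 provider) does `saveAsTable` take the v2 path.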

### Why are the changes needed?

Behavior regression.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#48019 from amaliujia/regress_v2.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan and amaliujia committed Sep 9, 2024
1 parent 3ed5a4d commit 37b39b4
Showing 4 changed files with 19 additions and 17 deletions.
@@ -426,8 +426,10 @@ final class DataFrameWriterImpl[T] private[sql](ds: Dataset[T]) extends DataFram
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val session = df.sparkSession
- val canUseV2 = lookupV2Provider().isDefined ||
-   df.sparkSession.sessionState.conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
+ val canUseV2 = lookupV2Provider().isDefined || (df.sparkSession.sessionState.conf.getConf(
+   SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
+   !df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
+     .isInstanceOf[DelegatingCatalogExtension])

session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAg
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.internal.{SqlApiConf, SQLConf}
- import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType}

class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
@@ -158,7 +157,6 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}

test("disable bucketing on collated string column") {
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
def createTable(bucketColumns: String*): Unit = {
val tableName = "test_partition_tbl"
withTable(tableName) {
@@ -760,7 +758,6 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}

test("disable partition on collated string column") {
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
def createTable(partitionColumns: String*): Unit = {
val tableName = "test_partition_tbl"
withTable(tableName) {
@@ -55,8 +55,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
"and a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
- val format = spark.sessionState.conf.defaultDataSourceName
- sql(s"CREATE TABLE same_name(id LONG) USING $format")
+ sql(s"CREATE TABLE same_name(id LONG) USING $v2Format")
spark.range(10).createTempView("same_name")
spark.range(20).write.format(v2Format).mode(SaveMode.Append).saveAsTable("same_name")
checkAnswer(spark.table("same_name"), spark.range(10).toDF())
@@ -88,6 +87,15 @@ class DataSourceV2DataFrameSessionCatalogSuite
assert(tableInfo.properties().get("provider") === v2Format)
}
}

test("SPARK-49246: saveAsTable with v1 format") {
withTable("t") {
sql("CREATE TABLE t(c INT) USING csv")
val df = spark.range(10).toDF()
df.write.mode(SaveMode.Overwrite).format("csv").saveAsTable("t")
verifyTable("t", df)
}
}
}

class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable] {
@@ -22,8 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean

import scala.jdk.CollectionConverters._

- import org.apache.spark.sql.catalyst.catalog.CatalogTableType
- import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table}
+ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

@@ -53,14 +52,10 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
if (tables.containsKey(ident)) {
tables.get(ident)
} else {
- // Table was created through the built-in catalog
- super.loadTable(ident) match {
-   case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW => v1Table
-   case t =>
-     val table = newTable(t.name(), t.schema(), t.partitioning(), t.properties())
-     addTable(ident, table)
-     table
- }
+ // Table was created through the built-in catalog via v1 command, this is OK as the
+ // `loadTable` should always be invoked, and we set the `tableCreated` to pass validation.
+ tableCreated.set(true)
+ super.loadTable(ident)
}
}
