diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
index 7248a2d3f056e..f0eef9ae1cbb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -426,8 +426,10 @@ final class DataFrameWriterImpl[T] private[sql](ds: Dataset[T]) extends DataFram
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
     val session = df.sparkSession
-    val canUseV2 = lookupV2Provider().isDefined ||
-      df.sparkSession.sessionState.conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
+    val canUseV2 = lookupV2Provider().isDefined || (df.sparkSession.sessionState.conf.getConf(
+      SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
+      !df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
+        .isInstanceOf[DelegatingCatalogExtension])
 
     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index 5e7feec149c97..da8aad16f55d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAg
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.{SqlApiConf, SQLConf}
-import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType}
 
 class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
@@ -158,7 +157,6 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
   }
 
   test("disable bucketing on collated string column") {
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
     def createTable(bucketColumns: String*): Unit = {
       val tableName = "test_partition_tbl"
       withTable(tableName) {
@@ -760,7 +758,6 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
   }
 
   test("disable partition on collated string column") {
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
     def createTable(partitionColumns: String*): Unit = {
       val tableName = "test_partition_tbl"
       withTable(tableName) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 7bbb6485c273f..fe078c5ae4413 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -55,8 +55,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
       "and a same-name temp view exist") {
     withTable("same_name") {
       withTempView("same_name") {
-        val format = spark.sessionState.conf.defaultDataSourceName
-        sql(s"CREATE TABLE same_name(id LONG) USING $format")
+        sql(s"CREATE TABLE same_name(id LONG) USING $v2Format")
         spark.range(10).createTempView("same_name")
         spark.range(20).write.format(v2Format).mode(SaveMode.Append).saveAsTable("same_name")
         checkAnswer(spark.table("same_name"), spark.range(10).toDF())
@@ -88,6 +87,15 @@ class DataSourceV2DataFrameSessionCatalogSuite
       assert(tableInfo.properties().get("provider") === v2Format)
     }
   }
+
+  test("SPARK-49246: saveAsTable with v1 format") {
+    withTable("t") {
+      sql("CREATE TABLE t(c INT) USING csv")
+      val df = spark.range(10).toDF()
+      df.write.mode(SaveMode.Overwrite).format("csv").saveAsTable("t")
+      verifyTable("t", df)
+    }
+  }
 }
 
 class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index ff944dbb805cb..2254abef3fcb6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -22,8 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
 
@@ -53,14 +52,10 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
     if (tables.containsKey(ident)) {
       tables.get(ident)
     } else {
-      // Table was created through the built-in catalog
-      super.loadTable(ident) match {
-        case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW => v1Table
-        case t =>
-          val table = newTable(t.name(), t.schema(), t.partitioning(), t.properties())
-          addTable(ident, table)
-          table
-      }
+      // Table was created through the built-in catalog via v1 command, this is OK as the
+      // `loadTable` should always be invoked, and we set the `tableCreated` to pass validation.
+      tableCreated.set(true)
+      super.loadTable(ident)
     }
   }
 
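
Note on the change above (editorial sketch, not part of the patch): with the new `canUseV2` condition, a write of a v1 source (no V2 provider) through `saveAsTable` takes the DataSource V2 path only when `spark.sql.catalog.spark_catalog` is set to a catalog that is not a `DelegatingCatalogExtension`; an extension that merely delegates to the built-in catalog now keeps the v1 write path. A minimal repro sketch follows, assuming a hypothetical catalog class `com.example.MyDelegatingCatalog` that extends `DelegatingCatalogExtension` and is on the classpath:

    // Hypothetical repro sketch for SPARK-49246; the catalog class name is
    // illustrative and not part of the patch.
    import org.apache.spark.sql.{SaveMode, SparkSession}

    object SaveAsTableV1FormatRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          // This is the config key behind SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.
          .config("spark.sql.catalog.spark_catalog", "com.example.MyDelegatingCatalog")
          .getOrCreate()

        spark.sql("CREATE TABLE t(c INT) USING csv")
        val df = spark.range(10).toDF()
        // Previously, canUseV2 was true whenever the conf above was set, so this
        // v1-format write was routed through the v2 code path. With the patch, a
        // session catalog that is a DelegatingCatalogExtension falls back to the
        // v1 saveAsTable path, mirroring the new test added above.
        df.write.mode(SaveMode.Overwrite).format("csv").saveAsTable("t")
        spark.table("t").show()
        spark.stop()
      }
    }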