From d9f0fccd967b5c8686353d524d2b31e27b7a473b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 8 Dec 2023 12:54:20 -0800
Subject: [PATCH] [SPARK-46332][SQL] Migrate `CatalogNotFoundException` to the error class `CATALOG_NOT_FOUND`

### What changes were proposed in this pull request?
In the PR, I propose to migrate the `CatalogNotFoundException` exception to the new error class `CATALOG_NOT_FOUND`, improve the format of the exception message, and prohibit creation of the exception without an error class.

### Why are the changes needed?
This is part of the migration onto error classes and the new error framework. The changes improve the user experience with Spark SQL and make `CatalogNotFoundException` consistent with other Spark exceptions.

### Does this PR introduce _any_ user-facing change?
Yes, if user code depends on the error message format of `CatalogNotFoundException`.
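For example, code that previously matched on the message text can switch to the stable error class. A minimal sketch (`no_such_cat` is a placeholder catalog name and `spark` is an active `SparkSession`):

```scala
import org.apache.spark.sql.connector.catalog.CatalogNotFoundException

try {
  spark.sql("SET CATALOG no_such_cat")
} catch {
  // Before: e.getMessage.contains("plugin class not found")
  // After: match on the error class, which is stable across releases.
  case e: CatalogNotFoundException if e.getErrorClass == "CATALOG_NOT_FOUND" =>
    println(s"Unknown catalog: ${e.getMessageParameters.get("catalogName")}")
}
```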
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "core/testOnly *SparkThrowableSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44259 from MaxGekk/catalog-plugin-not-found.

Authored-by: Max Gekk
Signed-off-by: Dongjoon Hyun
---
 R/pkg/tests/fulltests/test_sparkSQL.R                  |  5 +----
 .../utils/src/main/resources/error/error-classes.json |  6 ++++++
 .../test/scala/org/apache/spark/sql/CatalogSuite.scala |  6 +++---
 docs/sql-error-conditions.md                           |  6 ++++++
 .../connector/catalog/CatalogNotFoundException.scala   | 10 +++++++---
 .../apache/spark/sql/connector/catalog/Catalogs.scala  |  2 +-
 .../apache/spark/sql/errors/QueryExecutionErrors.scala |  7 +++++--
 .../sql/connector/catalog/CatalogLoadingSuite.java     |  7 ++-----
 .../sql/catalyst/analysis/TableLookupCacheSuite.scala  |  6 +++---
 .../sql/connector/catalog/LookupCatalogSuite.scala     |  5 +++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala     |  6 ++++--
 .../execution/command/AlignAssignmentsSuiteBase.scala  |  5 +++--
 .../sql/execution/command/PlanResolutionSuite.scala    |  9 ++++-----
 13 files changed, 48 insertions(+), 32 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index f2bef7a004467..0d96f708a544f 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4103,10 +4103,7 @@ test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", {
   expect_equal(currentCatalog(), "spark_catalog")
   expect_error(setCurrentCatalog("spark_catalog"), NA)
   expect_error(setCurrentCatalog("zxwtyswklpf"),
-               paste0("Error in setCurrentCatalog : ",
-                      "org.apache.spark.sql.connector.catalog.CatalogNotFoundException: ",
-                      "Catalog 'zxwtyswklpf' plugin class not found: ",
-                      "spark.sql.catalog.zxwtyswklpf is not defined"))
+               "[CATALOG_NOT_FOUND]*`zxwtyswklpf`*")
   catalogs <- collect(listCatalogs())
 })
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 7a672fa5e557a..62d10c0d34cb3 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -407,6 +407,12 @@
     ],
     "sqlState" : "22003"
   },
+  "CATALOG_NOT_FOUND" : {
+    "message" : [
+      "The catalog <catalogName> was not found. Consider setting the SQL config <config> to a catalog plugin."
+    ],
+    "sqlState" : "42P08"
+  },
   "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND" : {
     "message" : [
       "Checkpoint block <rddBlockId> not found!",
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
index cefa63ecd353e..d646fad00c075 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
@@ -66,10 +66,10 @@ class CatalogSuite extends RemoteSparkSession with SQLHelper {
     val catalogs = spark.catalog.listCatalogs().collect()
     assert(catalogs.length == 1)
     assert(catalogs.map(_.name) sameElements Array("spark_catalog"))
-    val message = intercept[SparkException] {
+    val exception = intercept[SparkException] {
       spark.catalog.setCurrentCatalog("notExists")
-    }.getMessage
-    assert(message.contains("plugin class not found"))
+    }
+    assert(exception.getErrorClass == "CATALOG_NOT_FOUND")
     spark.catalog.setCurrentCatalog("testcat")
     assert(spark.catalog.currentCatalog().equals("testcat"))
     val catalogsAfterChange = spark.catalog.listCatalogs().collect()
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index d97e2ceef4c2b..82befaae81df3 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -335,6 +335,12 @@ The value `<value>` of the type `<sourceType>` cannot be cast to `<targetType>`
 Fail to assign a value of `<sourceType>` type to the `<targetType>` type column or variable `<columnName>` due to an overflow. Use `try_cast` on the input value to tolerate overflow and return NULL instead.
 
+### CATALOG_NOT_FOUND
+
+[SQLSTATE: 42P08](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The catalog `<catalogName>` was not found. Consider setting the SQL config `<config>` to a catalog plugin.
+
 ### CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND
 
 SQLSTATE: 56000
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala
index d376b98afa415..4a8910fde4c5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala
@@ -21,8 +21,12 @@ import org.apache.spark.SparkException
 import org.apache.spark.annotation.Experimental
 
 @Experimental
-class CatalogNotFoundException(message: String, cause: Throwable)
-  extends SparkException(message, cause) {
+class CatalogNotFoundException(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable)
+  extends SparkException(errorClass, messageParameters, cause) {
 
-  def this(message: String) = this(message, null)
+  def this(errorClass: String, messageParameters: Map[String, String]) =
+    this(errorClass, messageParameters, null)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala
index 5a49883be4084..419191f8f9c00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala
@@ -53,7 +53,7 @@ private[sql] object Catalogs {
       _pluginClassName
     } catch {
       case _: NoSuchElementException =>
-        throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)
+        throw QueryExecutionErrors.catalogNotFoundError(name)
     }
     val loader = Utils.getContextOrSparkClassLoader
     try {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 24332479f1937..113f995968a0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1811,9 +1811,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       cause = null)
   }
 
-  def catalogPluginClassNotFoundError(name: String): Throwable = {
+  def catalogNotFoundError(name: String): Throwable = {
     new CatalogNotFoundException(
-      s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not defined")
+      errorClass = "CATALOG_NOT_FOUND",
+      messageParameters = Map(
+        "catalogName" -> toSQLId(name),
+        "config" -> toSQLConf(s"spark.sql.catalog.$name")))
   }
 
   def catalogPluginClassNotImplementedError(name: String, pluginClassName: String): Throwable = {
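Note that `toSQLId` wraps the catalog name in backticks and `toSQLConf` wraps the config key in double quotes; the assertions in the test suites below rely on exactly this formatting. A minimal sketch of the resulting parameters (illustrative only, and assuming the caller compiles inside the `org.apache.spark.sql` package, since `QueryExecutionErrors` is `private[sql]`):

```scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.errors.QueryExecutionErrors

val e = QueryExecutionErrors.catalogNotFoundError("mycat").asInstanceOf[SparkThrowable]
assert(e.getErrorClass == "CATALOG_NOT_FOUND")
// toSQLId adds backticks around the identifier:
assert(e.getMessageParameters.get("catalogName") == "`mycat`")
// toSQLConf adds double quotes around the config key:
assert(e.getMessageParameters.get("config") == "\"spark.sql.catalog.mycat\"")
```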
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java
index e6c6a18623b34..238b8ac04e7e6 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java
@@ -80,11 +80,8 @@ public void testLoadWithoutConfig() {
     SparkException exc = Assertions.assertThrows(CatalogNotFoundException.class,
         () -> Catalogs.load("missing", conf));
 
-    Assertions.assertTrue(
-        exc.getMessage().contains("plugin class not found: spark.sql.catalog.missing is not defined"),
-        "Should complain that implementation is not configured");
-    Assertions.assertTrue(exc.getMessage().contains("missing"),
-        "Should identify the catalog by name");
+    Assertions.assertEquals("CATALOG_NOT_FOUND", exc.getErrorClass());
+    Assertions.assertEquals("`missing`", exc.getMessageParameters().get("catalogName"));
   }
 
   @Test
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
index 2c4215e70287a..189509e317364 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
@@ -29,7 +29,8 @@ import org.scalatest.matchers.must.Matchers
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, InMemoryTable, InMemoryTableCatalog, Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryTable, InMemoryTableCatalog, Table}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 
 class TableLookupCacheSuite extends AnalysisTest with Matchers {
@@ -60,8 +61,7 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers {
     when(catalogManager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => {
       invocation.getArgument[String](0) match {
         case CatalogManager.SESSION_CATALOG_NAME => v2Catalog
-        case name =>
-          throw new CatalogNotFoundException(s"No such catalog: $name")
+        case name => throw QueryExecutionErrors.catalogNotFoundError(name)
       }
     })
     when(catalogManager.v1SessionCatalog).thenReturn(v1Catalog)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
index 0db758d5147f0..49e119b56bc86 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -46,7 +47,7 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
     val manager = mock(classOf[CatalogManager])
     when(manager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => {
       val name = invocation.getArgument[String](0)
-      catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
+      catalogs.getOrElse(name, throw QueryExecutionErrors.catalogNotFoundError(name))
     })
     when(manager.currentCatalog).thenReturn(sessionCatalog)
     when(manager.v2SessionCatalog).thenReturn(sessionCatalog)
@@ -114,7 +115,7 @@ class LookupCatalogWithDefaultSuite extends SparkFunSuite with LookupCatalog wit
     val manager = mock(classOf[CatalogManager])
     when(manager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => {
       val name = invocation.getArgument[String](0)
-      catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
+      catalogs.getOrElse(name, throw QueryExecutionErrors.catalogNotFoundError(name))
     })
     when(manager.currentCatalog).thenReturn(catalogs("prod"))
     when(manager.currentNamespace).thenReturn(Array.empty[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 2b93e8bd3200b..302a8e5d41db1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2763,8 +2763,10 @@ class DataSourceV2SQLSuiteV1Filter
       exception = intercept[CatalogNotFoundException] {
         sql("SET CATALOG not_exist_catalog")
       },
-      errorClass = null,
-      parameters = Map.empty)
+      errorClass = "CATALOG_NOT_FOUND",
+      parameters = Map(
+        "catalogName" -> "`not_exist_catalog`",
+        "config" -> "\"spark.sql.catalog.not_exist_catalog\""))
   }
 
   test("SPARK-35973: ShowCatalogs") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala
index 2bc747c0abee4..ebb719a35a8bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala
@@ -30,8 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogV2Util, Column, ColumnDefaultValue, Identifier, SupportsRowLevelOperations, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, ColumnDefaultValue, Identifier, SupportsRowLevelOperations, TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, IntegerType, StructType}
@@ -177,7 +178,7 @@ abstract class AlignAssignmentsSuiteBase extends AnalysisTest {
       invocation.getArguments()(0).asInstanceOf[String] match {
         case "testcat" => v2Catalog
         case CatalogManager.SESSION_CATALOG_NAME => v2SessionCatalog
-        case name => throw new CatalogNotFoundException(s"No such catalog: $name")
+        case name => throw QueryExecutionErrors.catalogNotFoundError(name)
       }
     })
     when(manager.currentCatalog).thenReturn(v2Catalog)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 69b3285fc7f12..db6c7175c526f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -36,9 +36,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCom
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.connector.FakeV2Provider
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Column, ColumnDefaultValue, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, ColumnDefaultValue, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -194,8 +195,7 @@ class PlanResolutionSuite extends AnalysisTest {
         testCat
       case CatalogManager.SESSION_CATALOG_NAME =>
         v2SessionCatalog
-      case name =>
-        throw new CatalogNotFoundException(s"No such catalog: $name")
+      case name => throw QueryExecutionErrors.catalogNotFoundError(name)
     }
   })
   when(manager.currentCatalog).thenReturn(testCat)
@@ -211,8 +211,7 @@ class PlanResolutionSuite extends AnalysisTest {
     invocation.getArguments()(0).asInstanceOf[String] match {
       case "testcat" =>
         testCat
-      case name =>
-        throw new CatalogNotFoundException(s"No such catalog: $name")
+      case name => throw QueryExecutionErrors.catalogNotFoundError(name)
     }
   })
   when(manager.currentCatalog).thenReturn(v2SessionCatalog)
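As a usage note, the error is raised only while the catalog's plugin config is undefined. A minimal sketch mirroring how the suites above register test catalogs (`InMemoryTableCatalog` is a test-only class; any `CatalogPlugin` implementation works, and `spark` is an active `SparkSession`):

```scala
import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog

// Defining spark.sql.catalog.<name> lets Catalogs.load resolve the plugin.
spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
spark.sql("SET CATALOG testcat")        // succeeds

// An undefined name still fails with the new error class.
spark.sql("SET CATALOG undefined_cat")  // throws CATALOG_NOT_FOUND
```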