[SPARK-46332][SQL] Migrate CatalogNotFoundException to the error class `CATALOG_NOT_FOUND`

### What changes were proposed in this pull request?
In this PR, I propose to migrate the `CatalogNotFoundException` exception to the new error class `CATALOG_NOT_FOUND`, improve the format of the exception message, and prohibit creating the exception without an error class.

### Why are the changes needed?
This is part of the migration to error classes and the new error framework. The changes improve the user experience with Spark SQL and make `CatalogNotFoundException` consistent with other Spark exceptions.

### Does this PR introduce _any_ user-facing change?
Yes, if the user's code depends on the error message format of `CatalogNotFoundException`.
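
For illustration, the message for an unknown catalog `zxwtyswklpf` changes roughly as follows (the new rendering is inferred from the message template and the updated test expectations in this diff, not quoted from Spark's output):

```
Before: Catalog 'zxwtyswklpf' plugin class not found: spark.sql.catalog.zxwtyswklpf is not defined
After:  [CATALOG_NOT_FOUND] The catalog `zxwtyswklpf` not found. Consider to set the SQL config "spark.sql.catalog.zxwtyswklpf" to a catalog plugin.
```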

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "core/testOnly *SparkThrowableSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44259 from MaxGekk/catalog-plugin-not-found.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
MaxGekk authored and dongjoon-hyun committed Dec 8, 2023
1 parent 3224cdd, commit d9f0fcc
Showing 13 changed files with 48 additions and 32 deletions.
R/pkg/tests/fulltests/test_sparkSQL.R (1 addition, 4 deletions)

```diff
@@ -4103,10 +4103,7 @@ test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", {
   expect_equal(currentCatalog(), "spark_catalog")
   expect_error(setCurrentCatalog("spark_catalog"), NA)
   expect_error(setCurrentCatalog("zxwtyswklpf"),
-               paste0("Error in setCurrentCatalog : ",
-                      "org.apache.spark.sql.connector.catalog.CatalogNotFoundException: ",
-                      "Catalog 'zxwtyswklpf' plugin class not found: ",
-                      "spark.sql.catalog.zxwtyswklpf is not defined"))
+               "[CATALOG_NOT_FOUND]*`zxwtyswklpf`*")
   catalogs <- collect(listCatalogs())
 })
 
```

common/utils/src/main/resources/error/error-classes.json (6 additions)

```diff
@@ -407,6 +407,12 @@
     ],
     "sqlState" : "22003"
   },
+  "CATALOG_NOT_FOUND" : {
+    "message" : [
+      "The catalog <catalogName> not found. Consider to set the SQL config <config> to a catalog plugin."
+    ],
+    "sqlState" : "42P08"
+  },
   "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND" : {
     "message" : [
       "Checkpoint block <rddBlockId> not found!",
```

```diff
@@ -66,10 +66,10 @@ class CatalogSuite extends RemoteSparkSession with SQLHelper {
     val catalogs = spark.catalog.listCatalogs().collect()
     assert(catalogs.length == 1)
     assert(catalogs.map(_.name) sameElements Array("spark_catalog"))
-    val message = intercept[SparkException] {
+    val exception = intercept[SparkException] {
       spark.catalog.setCurrentCatalog("notExists")
-    }.getMessage
-    assert(message.contains("plugin class not found"))
+    }
+    assert(exception.getErrorClass == "CATALOG_NOT_FOUND")
     spark.catalog.setCurrentCatalog("testcat")
     assert(spark.catalog.currentCatalog().equals("testcat"))
     val catalogsAfterChange = spark.catalog.listCatalogs().collect()
```

docs/sql-error-conditions.md (6 additions)

```diff
@@ -335,6 +335,12 @@ The value `<value>` of the type `<sourceType>` cannot be cast to `<targetType>`
 
 Fail to assign a value of `<sourceType>` type to the `<targetType>` type column or variable `<columnName>` due to an overflow. Use `try_cast` on the input value to tolerate overflow and return NULL instead.
 
+### CATALOG_NOT_FOUND
+
+[SQLSTATE: 42P08](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The catalog `<catalogName>` not found. Consider to set the SQL config `<config>` to a catalog plugin.
+
 ### CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND
 
 SQLSTATE: 56000
```

```diff
@@ -21,8 +21,12 @@ import org.apache.spark.SparkException
 import org.apache.spark.annotation.Experimental
 
 @Experimental
-class CatalogNotFoundException(message: String, cause: Throwable)
-  extends SparkException(message, cause) {
+class CatalogNotFoundException(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable)
+  extends SparkException(errorClass, messageParameters, cause) {
 
-  def this(message: String) = this(message, null)
+  def this(errorClass: String, messageParameters: Map[String, String]) =
+    this(errorClass, messageParameters, null)
 }
```
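
In practice, "prohibit creation of the exception without the error class" means the message-only constructors are gone. A minimal sketch of the new construction pattern (the catalog name `my_catalog` is hypothetical):

```scala
import org.apache.spark.sql.connector.catalog.CatalogNotFoundException

// Callers must now supply an error class and its message parameters;
// there is no constructor that accepts a free-form message string.
val e = new CatalogNotFoundException(
  errorClass = "CATALOG_NOT_FOUND",
  messageParameters = Map(
    "catalogName" -> "`my_catalog`",                // hypothetical catalog name
    "config" -> "\"spark.sql.catalog.my_catalog\""))
```
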
```diff
@@ -53,7 +53,7 @@ private[sql] object Catalogs {
         _pluginClassName
       } catch {
         case _: NoSuchElementException =>
-          throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)
+          throw QueryExecutionErrors.catalogNotFoundError(name)
       }
     val loader = Utils.getContextOrSparkClassLoader
     try {
```

```diff
@@ -1811,9 +1811,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       cause = null)
   }
 
-  def catalogPluginClassNotFoundError(name: String): Throwable = {
+  def catalogNotFoundError(name: String): Throwable = {
     new CatalogNotFoundException(
-      s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not defined")
+      errorClass = "CATALOG_NOT_FOUND",
+      messageParameters = Map(
+        "catalogName" -> toSQLId(name),
+        "config" -> toSQLConf(s"spark.sql.catalog.$name")))
   }
 
   def catalogPluginClassNotImplementedError(name: String, pluginClassName: String): Throwable = {
```
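
Here `toSQLId` and `toSQLConf` apply Spark's standard quoting to the message parameters: the catalog name is wrapped in backticks and the config key in double quotes, which is why the updated test expectations below check for a backticked catalog name and a double-quoted config key.
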
```diff
@@ -80,11 +80,8 @@ public void testLoadWithoutConfig() {
     SparkException exc = Assertions.assertThrows(CatalogNotFoundException.class,
         () -> Catalogs.load("missing", conf));
 
-    Assertions.assertTrue(
-        exc.getMessage().contains("plugin class not found: spark.sql.catalog.missing is not defined"),
-        "Should complain that implementation is not configured");
-    Assertions.assertTrue(exc.getMessage().contains("missing"),
-        "Should identify the catalog by name");
+    Assertions.assertEquals(exc.getErrorClass(), "CATALOG_NOT_FOUND");
+    Assertions.assertEquals(exc.getMessageParameters().get("catalogName"), "`missing`");
   }
 
   @Test
```
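
User code that previously matched on message substrings can switch to the stable error class. A hedged Scala sketch (assuming an active `spark` session and an unconfigured catalog named `missing`):

```scala
import org.apache.spark.SparkThrowable

try {
  spark.catalog.setCurrentCatalog("missing")
} catch {
  // Before this change, callers had to match substrings such as
  // "plugin class not found"; the error class is now a stable contract.
  case e: SparkThrowable if e.getErrorClass == "CATALOG_NOT_FOUND" =>
    val name = e.getMessageParameters.get("catalogName") // "`missing`"
}
```
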
```diff
@@ -29,7 +29,8 @@ import org.scalatest.matchers.must.Matchers
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, InMemoryTable, InMemoryTableCatalog, Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryTable, InMemoryTableCatalog, Table}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 
 class TableLookupCacheSuite extends AnalysisTest with Matchers {
@@ -60,8 +61,7 @@
     when(catalogManager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => {
       invocation.getArgument[String](0) match {
         case CatalogManager.SESSION_CATALOG_NAME => v2Catalog
-        case name =>
-          throw new CatalogNotFoundException(s"No such catalog: $name")
+        case name => throw QueryExecutionErrors.catalogNotFoundError(name)
       }
     })
     when(catalogManager.v1SessionCatalog).thenReturn(v1Catalog)
```

```diff
@@ -26,6 +26,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -46,7 +47,7 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
     val manager = mock(classOf[CatalogManager])
     when(manager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => {
       val name = invocation.getArgument[String](0)
-      catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
+      catalogs.getOrElse(name, throw QueryExecutionErrors.catalogNotFoundError(name))
     })
     when(manager.currentCatalog).thenReturn(sessionCatalog)
     when(manager.v2SessionCatalog).thenReturn(sessionCatalog)
@@ -114,7 +115,7 @@ class LookupCatalogWithDefaultSuite extends SparkFunSuite with LookupCatalog wit
     val manager = mock(classOf[CatalogManager])
     when(manager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => {
       val name = invocation.getArgument[String](0)
-      catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
+      catalogs.getOrElse(name, throw QueryExecutionErrors.catalogNotFoundError(name))
     })
     when(manager.currentCatalog).thenReturn(catalogs("prod"))
     when(manager.currentNamespace).thenReturn(Array.empty[String])
```

```diff
@@ -2763,8 +2763,10 @@ class DataSourceV2SQLSuiteV1Filter
       exception = intercept[CatalogNotFoundException] {
         sql("SET CATALOG not_exist_catalog")
       },
-      errorClass = null,
-      parameters = Map.empty)
+      errorClass = "CATALOG_NOT_FOUND",
+      parameters = Map(
+        "catalogName" -> "`not_exist_catalog`",
+        "config" -> "\"spark.sql.catalog.not_exist_catalog\""))
   }
 
   test("SPARK-35973: ShowCatalogs") {
```

```diff
@@ -30,8 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogV2Util, Column, ColumnDefaultValue, Identifier, SupportsRowLevelOperations, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, ColumnDefaultValue, Identifier, SupportsRowLevelOperations, TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, IntegerType, StructType}
@@ -177,7 +178,7 @@ abstract class AlignAssignmentsSuiteBase extends AnalysisTest {
       invocation.getArguments()(0).asInstanceOf[String] match {
         case "testcat" => v2Catalog
         case CatalogManager.SESSION_CATALOG_NAME => v2SessionCatalog
-        case name => throw new CatalogNotFoundException(s"No such catalog: $name")
+        case name => throw QueryExecutionErrors.catalogNotFoundError(name)
       }
     })
     when(manager.currentCatalog).thenReturn(v2Catalog)
```

```diff
@@ -36,9 +36,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCom
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.connector.FakeV2Provider
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Column, ColumnDefaultValue, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, ColumnDefaultValue, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -194,8 +195,7 @@ class PlanResolutionSuite extends AnalysisTest {
           testCat
         case CatalogManager.SESSION_CATALOG_NAME =>
           v2SessionCatalog
-        case name =>
-          throw new CatalogNotFoundException(s"No such catalog: $name")
+        case name => throw QueryExecutionErrors.catalogNotFoundError(name)
       }
     })
     when(manager.currentCatalog).thenReturn(testCat)
@@ -211,8 +211,7 @@
       invocation.getArguments()(0).asInstanceOf[String] match {
         case "testcat" =>
           testCat
-        case name =>
-          throw new CatalogNotFoundException(s"No such catalog: $name")
+        case name => throw QueryExecutionErrors.catalogNotFoundError(name)
       }
     })
     when(manager.currentCatalog).thenReturn(v2SessionCatalog)
```
