diff --git a/Cargo.lock b/Cargo.lock index d1ea4fc82b3c..17247bffd21e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6354,6 +6354,7 @@ dependencies = [ "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", + "risingwave_sqlparser", "risingwave_test_runner", "scopeguard", "serde", diff --git a/e2e_test/ddl/alter_rename_relation.slt b/e2e_test/ddl/alter_rename_relation.slt new file mode 100644 index 000000000000..a14bd2db3eb0 --- /dev/null +++ b/e2e_test/ddl/alter_rename_relation.slt @@ -0,0 +1,148 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +CREATE TABLE t (v1 INT primary key, v2 STRUCT>); + +statement ok +CREATE TABLE t_as AS ( WITH source_data AS ( SELECT 1 AS id) SELECT * FROM source_data); + +statement ok +CREATE MATERIALIZED VIEW mv AS SELECT v1, (t.v2).v1 AS v21 FROM t; + +statement ok +CREATE SINK sink AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv AS mv3 WITH ( + connector = 'blackhole' +); + +statement ok +CREATE VIEW v1 AS ( SELECT * FROM t_as WHERE id = 1); + +statement ok +CREATE VIEW v2 AS (SELECT COUNT(*) FROM t, t AS t2 WHERE t.v1 = t2.v1); + +statement ok +CREATE VIEW v3 AS (SELECT MAX((t.v2).v1) FROM t AS t); + +statement ok +CREATE VIEW v4 AS (SELECT * FROM t join t as t2 on (t.v1 = t2.v1) ORDER BY t.v1, t2.v1); + +statement ok +CREATE index idx ON t(v1); + +query TT +SHOW CREATE TABLE t; +---- +public.t CREATE TABLE t (v1 INT PRIMARY KEY, v2 STRUCT>) + +# alter table rename with alias conflict +statement ok +ALTER TABLE t RENAME TO t2; + +query TT +SHOW CREATE TABLE t2; +---- +public.t2 CREATE TABLE t2 (v1 INT PRIMARY KEY, v2 STRUCT>) + +query TT +SHOW CREATE VIEW v2; +---- +public.v2 CREATE VIEW v2 AS (SELECT COUNT(*) FROM t2 AS t, t2 AS t2 WHERE t.v1 = t2.v1) + +query TT +SHOW CREATE VIEW v3; +---- +public.v3 CREATE VIEW v3 AS (SELECT MAX((t.v2).v1) FROM t2 AS t) + +query TT +SHOW CREATE VIEW v4; +---- +public.v4 CREATE VIEW v4 AS (SELECT * FROM t2 AS t JOIN t2 AS t2 ON (t.v1 = t2.v1) ORDER BY t.v1, t2.v1) + +query TT +SHOW CREATE MATERIALIZED VIEW mv; +---- +public.mv CREATE MATERIALIZED VIEW mv AS SELECT v1, (t.v2).v1 AS v21 FROM t2 AS t + +# alter mview rename +statement ok +ALTER MATERIALIZED VIEW mv RENAME TO mv2; + +query TT +SHOW CREATE MATERIALIZED VIEW mv2; +---- +public.mv2 CREATE MATERIALIZED VIEW mv2 AS SELECT v1, (t.v2).v1 AS v21 FROM t2 AS t + +statement ok +ALTER SINK sink RENAME TO sink1; + +# alter mview rename with alias conflict, used by sink1 +statement ok +ALTER MATERIALIZED VIEW mv2 RENAME TO mv3; + +statement ok +ALTER TABLE t_as RENAME TO t_as_1; + +# alter view rename +statement ok +ALTER VIEW v1 RENAME TO v5; + +query TT +SHOW CREATE VIEW v5; +---- +public.v5 CREATE VIEW v5 AS (SELECT * FROM t_as_1 AS t_as WHERE id = 1) + +statement ok +ALTER INDEX idx RENAME TO idx1; + +statement ok +INSERT INTO t2 VALUES(1,(1,(1,2))); + +statement ok +INSERT INTO t2 VALUES(2,(2,(2,4))); + +query II rowsort +SELECT * from mv3 +---- +1 1 +2 2 + +query I +SELECT * from v2 +---- +2 + +query I +SELECT * from v3 +---- +2 + +query IIII rowsort +SELECT * from v4 +---- +1 (1,(1,2)) 1 (1,(1,2)) +2 (2,(2,4)) 2 (2,(2,4)) + +statement ok +DROP SINK sink1; + +statement ok +DROP VIEW v5; + +statement ok +DROP VIEW v4; + +statement ok +DROP VIEW v3; + +statement ok +DROP VIEW v2; + +statement ok +DROP MATERIALIZED VIEW mv3; + +statement ok +DROP TABLE t2; + +statement ok +DROP TABLE t_as_1; diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 81c8ead1f851..cb460b97845b 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -139,6 +139,21 @@ message CreateTableResponse { uint64 version = 3; } +message AlterRelationNameRequest { + oneof relation { + uint32 table_id = 1; + uint32 view_id = 2; + uint32 index_id = 3; + uint32 sink_id = 4; + } + string new_name = 20; +} + +message AlterRelationNameResponse { + common.Status status = 1; + uint64 version = 2; +} + message CreateFunctionRequest { catalog.Function function = 1; } @@ -279,6 +294,7 @@ service DdlService { rpc CreateMaterializedView(CreateMaterializedViewRequest) returns (CreateMaterializedViewResponse); rpc DropMaterializedView(DropMaterializedViewRequest) returns (DropMaterializedViewResponse); rpc CreateTable(CreateTableRequest) returns (CreateTableResponse); + rpc AlterRelationName(AlterRelationNameRequest) returns (AlterRelationNameResponse); rpc DropTable(DropTableRequest) returns (DropTableResponse); rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse); rpc CreateView(CreateViewRequest) returns (CreateViewResponse); diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 275ece08b67d..3dc55c511425 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -227,6 +227,10 @@ impl Binder { self.param_types.export() } + pub fn shared_views(&self) -> &HashMap { + &self.shared_views + } + fn push_context(&mut self) { let new_context = std::mem::take(&mut self.context); self.context.cte_to_relation = new_context.cte_to_relation.clone(); diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index 18e0fdd20674..4ead248cafb4 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -203,6 +203,21 @@ impl Binder { Self::resolve_single_name(name.0, "index name") } + /// return the `view_name` + pub fn resolve_view_name(name: ObjectName) -> Result { + Self::resolve_single_name(name.0, "view name") + } + + /// return the `sink_name` + pub fn resolve_sink_name(name: ObjectName) -> Result { + Self::resolve_single_name(name.0, "sink name") + } + + /// return the `table_name` + pub fn resolve_table_name(name: ObjectName) -> Result { + Self::resolve_single_name(name.0, "table name") + } + /// return the `user_name` pub fn resolve_user_name(name: ObjectName) -> Result { Self::resolve_single_name(name.0, "user name") diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 6700f94ff8e7..80114869c376 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -23,6 +23,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::catalog::{ PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, }; +use risingwave_pb::ddl_service::alter_relation_name_request::Relation; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_rpc_client::MetaClient; use tokio::sync::watch::Receiver; @@ -114,6 +115,14 @@ pub trait CatalogWriter: Send + Sync { async fn drop_index(&self, index_id: IndexId) -> Result<()>; async fn drop_function(&self, function_id: FunctionId) -> Result<()>; + + async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()>; + + async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()>; + + async fn alter_index_name(&self, index_id: u32, index_name: &str) -> Result<()>; + + async fn alter_sink_name(&self, sink_id: u32, sink_name: &str) -> Result<()>; } #[derive(Clone)] @@ -264,6 +273,38 @@ impl CatalogWriter for CatalogWriterImpl { let version = self.meta_client.drop_database(database_id).await?; self.wait_version(version).await } + + async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> { + let version = self + .meta_client + .alter_relation_name(Relation::TableId(table_id), table_name) + .await?; + self.wait_version(version).await + } + + async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()> { + let version = self + .meta_client + .alter_relation_name(Relation::ViewId(view_id), view_name) + .await?; + self.wait_version(version).await + } + + async fn alter_index_name(&self, index_id: u32, index_name: &str) -> Result<()> { + let version = self + .meta_client + .alter_relation_name(Relation::IndexId(index_id), index_name) + .await?; + self.wait_version(version).await + } + + async fn alter_sink_name(&self, sink_id: u32, sink_name: &str) -> Result<()> { + let version = self + .meta_client + .alter_relation_name(Relation::SinkId(sink_id), sink_name) + .await?; + self.wait_version(version).await + } } impl CatalogWriterImpl { diff --git a/src/frontend/src/catalog/index_catalog.rs b/src/frontend/src/catalog/index_catalog.rs index 9cd7c44365eb..1763cfbc18bc 100644 --- a/src/frontend/src/catalog/index_catalog.rs +++ b/src/frontend/src/catalog/index_catalog.rs @@ -23,8 +23,9 @@ use risingwave_pb::catalog::PbIndex; use risingwave_pb::expr::expr_node::RexNode; use super::ColumnId; -use crate::catalog::{DatabaseId, SchemaId, TableCatalog}; +use crate::catalog::{DatabaseId, RelationCatalog, SchemaId, TableCatalog}; use crate::expr::{Expr, InputRef}; +use crate::user::UserId; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct IndexCatalog { @@ -152,3 +153,9 @@ impl IndexCatalog { } } } + +impl RelationCatalog for IndexCatalog { + fn owner(&self) -> UserId { + self.index_table.owner + } +} diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index d961554f5cc8..b563dd38bb8e 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -238,6 +238,14 @@ impl Catalog { .drop_source(source_id); } + pub fn update_source(&mut self, proto: &PbSource) { + self.get_database_mut(proto.database_id) + .unwrap() + .get_schema_mut(proto.schema_id) + .unwrap() + .update_source(proto); + } + pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) { self.get_database_mut(db_id) .unwrap() @@ -246,6 +254,14 @@ impl Catalog { .drop_sink(sink_id); } + pub fn update_sink(&mut self, proto: &PbSink) { + self.get_database_mut(proto.database_id) + .unwrap() + .get_schema_mut(proto.schema_id) + .unwrap() + .update_sink(proto); + } + pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) { self.get_database_mut(db_id) .unwrap() @@ -262,6 +278,14 @@ impl Catalog { .drop_view(view_id); } + pub fn update_view(&mut self, proto: &PbView) { + self.get_database_mut(proto.database_id) + .unwrap() + .get_schema_mut(proto.schema_id) + .unwrap() + .update_view(proto); + } + pub fn drop_function( &mut self, db_id: DatabaseId, @@ -379,6 +403,30 @@ impl Catalog { .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string())) } + // Used by test_utils only. + pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) { + let (mut database_id, mut schema_id) = (0, 0); + let mut found = false; + for database in self.database_by_name.values() { + if !found { + for schema in database.iter_schemas() { + if schema.iter_table().any(|t| t.id() == *table_id) { + found = true; + database_id = database.id(); + schema_id = schema.id(); + break; + } + } + } + } + + if found { + let mut table = self.get_table_by_id(table_id).unwrap(); + table.name = table_name.to_string(); + self.update_table(&table.to_prost(schema_id, database_id)); + } + } + #[cfg(test)] pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) { self.table_by_id.insert( diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index faa8b281f5a6..4a529c7ecb4e 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -81,6 +81,11 @@ impl SchemaCatalog { let table: TableCatalog = prost.into(); let table_ref = Arc::new(table); + let old_table = self.table_by_id.get(&id).unwrap(); + // check if table name get updated. + if old_table.name() != name { + self.table_by_name.remove(old_table.name()); + } self.table_by_name.insert(name, table_ref.clone()); self.table_by_id.insert(id, table_ref); } @@ -88,6 +93,7 @@ impl SchemaCatalog { pub fn update_index(&mut self, prost: &PbIndex) { let name = prost.name.clone(); let id = prost.id.into(); + let old_index = self.index_by_id.get(&id).unwrap(); let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap(); let primary_table = self .get_table_by_id(&prost.primary_table_id.into()) @@ -95,6 +101,10 @@ impl SchemaCatalog { let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table); let index_ref = Arc::new(index); + // check if index name get updated. + if old_index.name != name { + self.index_by_name.remove(&old_index.name); + } self.index_by_name.insert(name, index_ref.clone()); self.index_by_id.insert(id, index_ref.clone()); @@ -177,6 +187,21 @@ impl SchemaCatalog { self.source_by_name.remove(&source_ref.name).unwrap(); } + pub fn update_source(&mut self, prost: &PbSource) { + let name = prost.name.clone(); + let id = prost.id; + let source = SourceCatalog::from(prost); + let source_ref = Arc::new(source); + + // check if source name get updated. + let old_source = self.source_by_id.get(&id).unwrap(); + if old_source.name != name { + self.source_by_name.remove(&old_source.name); + } + self.source_by_name.insert(name, source_ref.clone()); + self.source_by_id.insert(id, source_ref); + } + pub fn create_sink(&mut self, prost: &PbSink) { let name = prost.name.clone(); let id = prost.id; @@ -194,6 +219,22 @@ impl SchemaCatalog { self.sink_by_name.remove(&sink_ref.name).unwrap(); } + pub fn update_sink(&mut self, prost: &PbSink) { + let name = prost.name.clone(); + let id = prost.id; + let sink = SinkCatalog::from(prost); + let sink_ref = Arc::new(sink); + + let old_sink = self.sink_by_id.get(&id).unwrap(); + // check if sink name get updated. + if old_sink.name != name { + self.sink_by_name.remove(&old_sink.name); + } + + self.sink_by_name.insert(name, sink_ref.clone()); + self.sink_by_id.insert(id, sink_ref); + } + pub fn create_view(&mut self, prost: &PbView) { let name = prost.name.clone(); let id = prost.id; @@ -211,6 +252,22 @@ impl SchemaCatalog { self.view_by_name.remove(&view_ref.name).unwrap(); } + pub fn update_view(&mut self, prost: &PbView) { + let name = prost.name.clone(); + let id = prost.id; + let view = ViewCatalog::from(prost); + let view_ref = Arc::new(view); + + let old_view = self.view_by_id.get(&id).unwrap(); + // check if view name get updated. + if old_view.name != name { + self.view_by_name.remove(&old_view.name); + } + + self.view_by_name.insert(name, view_ref.clone()); + self.view_by_id.insert(id, view_ref); + } + pub fn create_function(&mut self, prost: &PbFunction) { let name = prost.name.clone(); let id = prost.id; diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 9fc4073c48d6..5f89b030ecd2 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -158,7 +158,7 @@ impl TableType { } } - fn to_prost(self) -> PbTableType { + pub(crate) fn to_prost(self) -> PbTableType { match self { Self::Table => PbTableType::Table, Self::MaterializedView => PbTableType::MaterializedView, diff --git a/src/frontend/src/handler/alter_relation_rename.rs b/src/frontend/src/handler/alter_relation_rename.rs new file mode 100644 index 000000000000..2b72b918e8d6 --- /dev/null +++ b/src/frontend/src/handler/alter_relation_rename.rs @@ -0,0 +1,198 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::error::{ErrorCode, Result}; +use risingwave_sqlparser::ast::ObjectName; + +use super::{HandlerArgs, RwPgResponse}; +use crate::catalog::root_catalog::SchemaPath; +use crate::catalog::table_catalog::TableType; +use crate::Binder; + +pub async fn handle_rename_table( + handler_args: HandlerArgs, + table_type: TableType, + table_name: ObjectName, + new_table_name: ObjectName, +) -> Result { + let session = handler_args.session; + let db_name = session.database(); + let (schema_name, real_table_name) = + Binder::resolve_schema_qualified_name(db_name, table_name.clone())?; + let new_table_name = Binder::resolve_table_name(new_table_name)?; + let search_path = session.config().get_search_path(); + let user_name = &session.auth_context().user_name; + + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + let table_id = { + let reader = session.env().catalog_reader().read_guard(); + let (table, schema_name) = + reader.get_table_by_name(db_name, schema_path, &real_table_name)?; + if table_type != table.table_type { + return Err(ErrorCode::InvalidInputSyntax(format!( + "\"{table_name}\" is not a {}", + table_type.to_prost().as_str_name() + )) + .into()); + } + + session.check_privilege_for_drop_alter(schema_name, &**table)?; + table.id + }; + + let catalog_writer = session.env().catalog_writer(); + catalog_writer + .alter_table_name(table_id.table_id, &new_table_name) + .await?; + + let stmt_type = match table_type { + TableType::Table => StatementType::ALTER_TABLE, + TableType::MaterializedView => StatementType::ALTER_MATERIALIZED_VIEW, + _ => unreachable!(), + }; + Ok(PgResponse::empty_result(stmt_type)) +} + +pub async fn handle_rename_index( + handler_args: HandlerArgs, + index_name: ObjectName, + new_index_name: ObjectName, +) -> Result { + let session = handler_args.session; + let db_name = session.database(); + let (schema_name, real_index_name) = + Binder::resolve_schema_qualified_name(db_name, index_name.clone())?; + let new_index_name = Binder::resolve_index_name(new_index_name)?; + let search_path = session.config().get_search_path(); + let user_name = &session.auth_context().user_name; + + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + let index_id = { + let reader = session.env().catalog_reader().read_guard(); + let (index, schema_name) = + reader.get_index_by_name(db_name, schema_path, &real_index_name)?; + session.check_privilege_for_drop_alter(schema_name, &**index)?; + index.id + }; + + let catalog_writer = session.env().catalog_writer(); + catalog_writer + .alter_index_name(index_id.index_id, &new_index_name) + .await?; + + Ok(PgResponse::empty_result(StatementType::ALTER_INDEX)) +} + +pub async fn handle_rename_view( + handler_args: HandlerArgs, + view_name: ObjectName, + new_view_name: ObjectName, +) -> Result { + let session = handler_args.session; + let db_name = session.database(); + let (schema_name, real_view_name) = + Binder::resolve_schema_qualified_name(db_name, view_name.clone())?; + let new_view_name = Binder::resolve_view_name(new_view_name)?; + let search_path = session.config().get_search_path(); + let user_name = &session.auth_context().user_name; + + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + let view_id = { + let reader = session.env().catalog_reader().read_guard(); + let (view, schema_name) = reader.get_view_by_name(db_name, schema_path, &real_view_name)?; + session.check_privilege_for_drop_alter(schema_name, &**view)?; + view.id + }; + + let catalog_writer = session.env().catalog_writer(); + catalog_writer + .alter_view_name(view_id, &new_view_name) + .await?; + + Ok(PgResponse::empty_result(StatementType::ALTER_VIEW)) +} + +pub async fn handle_rename_sink( + handler_args: HandlerArgs, + sink_name: ObjectName, + new_sink_name: ObjectName, +) -> Result { + let session = handler_args.session; + let db_name = session.database(); + let (schema_name, real_sink_name) = + Binder::resolve_schema_qualified_name(db_name, sink_name.clone())?; + let new_sink_name = Binder::resolve_sink_name(new_sink_name)?; + let search_path = session.config().get_search_path(); + let user_name = &session.auth_context().user_name; + + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + let sink_id = { + let reader = session.env().catalog_reader().read_guard(); + let (sink, schema_name) = reader.get_sink_by_name(db_name, schema_path, &real_sink_name)?; + session.check_privilege_for_drop_alter(schema_name, &**sink)?; + sink.id + }; + + let catalog_writer = session.env().catalog_writer(); + catalog_writer + .alter_sink_name(sink_id.sink_id, &new_sink_name) + .await?; + + Ok(PgResponse::empty_result(StatementType::ALTER_SINK)) +} + +#[cfg(test)] +mod tests { + + use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME}; + + use crate::catalog::root_catalog::SchemaPath; + use crate::test_utils::LocalFrontend; + + #[tokio::test] + async fn test_alter_table_name_handler() { + let frontend = LocalFrontend::new(Default::default()).await; + let session = frontend.session_ref(); + let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME); + + let sql = "create table t (i int, r real);"; + frontend.run_sql(sql).await.unwrap(); + + let table_id = { + let catalog_reader = session.env().catalog_reader().read_guard(); + catalog_reader + .get_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t") + .unwrap() + .0 + .id + }; + + // Alter table name. + let sql = "alter table t rename to t1;"; + frontend.run_sql(sql).await.unwrap(); + + let catalog_reader = session.env().catalog_reader().read_guard(); + let altered_table_name = catalog_reader + .get_table_by_id(&table_id) + .unwrap() + .name() + .to_string(); + assert_eq!(altered_table_name, "t1"); + } +} diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 3f19cf96c09d..44f2b0c3d44a 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::catalog::PbTable; @@ -87,7 +88,11 @@ pub fn gen_create_mv_plan( let bound = { let mut binder = Binder::new_for_stream(session); - binder.bind_query(query)? + let bound = binder.bind_query(query)?; + // FIXME: We should record the views into mv's dependent relations to to refine the drop + // check and recursive rename of the views. + let _views = binder.shared_views().keys().cloned().collect_vec(); + bound }; let col_names = get_column_names(&bound, session, columns)?; diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 01cd757f21c3..418597c8bec2 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -29,10 +29,12 @@ use risingwave_sqlparser::ast::*; use self::util::DataChunkToRowSetAdapter; use self::variable::handle_set_time_zone; +use crate::catalog::table_catalog::TableType; use crate::scheduler::{DistributedQueryStream, LocalQueryStream}; use crate::session::SessionImpl; use crate::utils::WithOptions; +mod alter_relation_rename; mod alter_system; mod alter_table_column; pub mod alter_user; @@ -322,7 +324,6 @@ pub async fn handle( name, columns, query, - with_options: _, // It is put in OptimizerContext or_replace, // not supported emit_mode, @@ -387,6 +388,43 @@ pub async fn handle( operation @ (AlterTableOperation::AddColumn { .. } | AlterTableOperation::DropColumn { .. }), } => alter_table_column::handle_alter_table_column(handler_args, name, operation).await, + Statement::AlterTable { + name, + operation: AlterTableOperation::RenameTable { table_name }, + } => { + alter_relation_rename::handle_rename_table( + handler_args, + TableType::Table, + name, + table_name, + ) + .await + } + Statement::AlterIndex { + name, + operation: AlterIndexOperation::RenameIndex { index_name }, + } => alter_relation_rename::handle_rename_index(handler_args, name, index_name).await, + Statement::AlterView { + materialized, + name, + operation: AlterViewOperation::RenameView { view_name }, + } => { + if materialized { + alter_relation_rename::handle_rename_table( + handler_args, + TableType::MaterializedView, + name, + view_name, + ) + .await + } else { + alter_relation_rename::handle_rename_view(handler_args, name, view_name).await + } + } + Statement::AlterSink { + name, + operation: AlterSinkOperation::RenameSink { sink_name }, + } => alter_relation_rename::handle_rename_sink(handler_args, name, sink_name).await, Statement::AlterSystem { param, value } => { alter_system::handle_alter_system(handler_args, param, value).await } diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 353e9fe4c2e4..ae22e1f8913a 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -204,11 +204,12 @@ impl FrontendObserverNode { let old_table = catalog_guard.get_table_by_id(&table.id.into()).unwrap(); catalog_guard.update_table(table); - assert!(old_table.fragment_id != table.fragment_id); - // FIXME: the frontend node delete its fragment for the update - // operation by itself. - self.worker_node_manager - .remove_fragment_mapping(&old_table.fragment_id); + if old_table.fragment_id != table.fragment_id { + // FIXME: the frontend node delete its fragment for the update + // operation by itself. + self.worker_node_manager + .remove_fragment_mapping(&old_table.fragment_id); + } } _ => panic!("receive an unsupported notify {:?}", resp), }, @@ -219,6 +220,7 @@ impl FrontendObserverNode { source.schema_id, source.id, ), + Operation::Update => catalog_guard.update_source(source), _ => panic!("receive an unsupported notify {:?}", resp), }, RelationInfo::Sink(sink) => match resp.operation() { @@ -226,6 +228,7 @@ impl FrontendObserverNode { Operation::Delete => { catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id) } + Operation::Update => catalog_guard.update_sink(sink), _ => panic!("receive an unsupported notify {:?}", resp), }, RelationInfo::Index(index) => match resp.operation() { @@ -243,6 +246,7 @@ impl FrontendObserverNode { Operation::Delete => { catalog_guard.drop_view(view.database_id, view.schema_id, view.id) } + Operation::Update => catalog_guard.update_view(view), _ => panic!("receive an unsupported notify {:?}", resp), }, RelationInfo::Function(function) => match resp.operation() { diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 1d33ae1d3806..f1d86f8874ad 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -73,15 +73,15 @@ impl SessionManager for LocalFrontend { } fn cancel_queries_in_session(&self, _session_id: SessionId) { - todo!() + unreachable!() } fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) { - todo!() + unreachable!() } fn end_session(&self, _session: &Self::Session) { - todo!() + unreachable!() } } @@ -294,7 +294,7 @@ impl CatalogWriter for MockCatalogWriter { } async fn create_function(&self, _function: PbFunction) -> Result<()> { - todo!() + unreachable!() } async fn drop_table(&self, source_id: Option, table_id: TableId) -> Result<()> { @@ -321,7 +321,7 @@ impl CatalogWriter for MockCatalogWriter { } async fn drop_view(&self, _view_id: u32) -> Result<()> { - todo!() + unreachable!() } async fn drop_materialized_view(&self, table_id: TableId) -> Result<()> { @@ -383,7 +383,7 @@ impl CatalogWriter for MockCatalogWriter { } async fn drop_function(&self, _function_id: FunctionId) -> Result<()> { - todo!() + unreachable!() } async fn drop_database(&self, database_id: u32) -> Result<()> { @@ -396,6 +396,25 @@ impl CatalogWriter for MockCatalogWriter { self.catalog.write().drop_schema(database_id, schema_id); Ok(()) } + + async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> { + self.catalog + .write() + .alter_table_name_by_id(&table_id.into(), table_name); + Ok(()) + } + + async fn alter_view_name(&self, _view_id: u32, _view_name: &str) -> Result<()> { + unreachable!() + } + + async fn alter_index_name(&self, _index_id: u32, _index_name: &str) -> Result<()> { + unreachable!() + } + + async fn alter_sink_name(&self, _sink_id: u32, _sink_name: &str) -> Result<()> { + unreachable!() + } } impl MockCatalogWriter { diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index d5a13237e98b..e1784a35fbe7 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -52,6 +52,7 @@ risingwave_hummock_sdk = { path = "../storage/hummock_sdk" } risingwave_object_store = { path = "../object_store" } risingwave_pb = { path = "../prost" } risingwave_rpc_client = { path = "../rpc_client" } +risingwave_sqlparser = { path = "../sqlparser" } scopeguard = "1.1.0" serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index d678ba3445e5..01f24a358511 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -343,6 +343,14 @@ impl DatabaseManager { } } + pub fn ensure_view_id(&self, view_id: ViewId) -> MetaResult<()> { + if self.views.contains_key(&view_id) { + Ok(()) + } else { + Err(MetaError::catalog_id_not_found("view", view_id)) + } + } + pub fn ensure_table_id(&self, table_id: TableId) -> MetaResult<()> { if self.tables.contains_key(&table_id) { Ok(()) @@ -351,6 +359,30 @@ impl DatabaseManager { } } + pub fn ensure_source_id(&self, source_id: SourceId) -> MetaResult<()> { + if self.sources.contains_key(&source_id) { + Ok(()) + } else { + Err(MetaError::catalog_id_not_found("source", source_id)) + } + } + + pub fn ensure_sink_id(&self, sink_id: SinkId) -> MetaResult<()> { + if self.sinks.contains_key(&sink_id) { + Ok(()) + } else { + Err(MetaError::catalog_id_not_found("sink", sink_id)) + } + } + + pub fn ensure_index_id(&self, index_id: IndexId) -> MetaResult<()> { + if self.indexes.contains_key(&index_id) { + Ok(()) + } else { + Err(MetaError::catalog_id_not_found("index", index_id)) + } + } + // TODO(zehua): refactor when using SourceId. pub fn ensure_table_or_source_id(&self, table_id: &TableId) -> MetaResult<()> { if self.tables.contains_key(table_id) || self.sources.contains_key(table_id) { diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 85de6c7d08eb..6446f5dbc864 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -16,6 +16,7 @@ mod connection; mod database; mod fragment; mod user; +mod utils; use std::collections::{HashMap, HashSet, VecDeque}; use std::iter; @@ -93,6 +94,8 @@ use risingwave_pb::expr::expr_node::RexNode; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::{CreatingJobInfo, Relation, RelationGroup}; +use crate::manager::catalog::utils::{alter_relation_rename, alter_relation_rename_refs}; + pub type CatalogManagerRef = Arc>; /// `CatalogManager` manages database catalog information and user information, including @@ -462,41 +465,40 @@ where pub async fn drop_view(&self, view_id: ViewId) -> MetaResult { let core = &mut *self.core.lock().await; let database_core = &mut core.database; + database_core.ensure_view_id(view_id)?; + let user_core = &mut core.user; let mut views = BTreeMapTransaction::new(&mut database_core.views); let mut users = BTreeMapTransaction::new(&mut user_core.user_info); - let view = views.remove(view_id); - if let Some(view) = view { - match database_core.relation_ref_count.get(&view_id) { - Some(ref_count) => Err(MetaError::permission_denied(format!( - "Fail to delete view `{}` because {} other relation(s) depend on it", - view.name, ref_count - ))), - None => { - let users_need_update = - Self::update_user_privileges(&mut users, &[Object::ViewId(view_id)]); - commit_meta!(self, views, users)?; + let view = views.remove(view_id).unwrap(); - user_core.decrease_ref(view.owner); + match database_core.relation_ref_count.get(&view_id) { + Some(ref_count) => Err(MetaError::permission_denied(format!( + "Fail to delete view `{}` because {} other relation(s) depend on it", + view.name, ref_count + ))), + None => { + let users_need_update = + Self::update_user_privileges(&mut users, &[Object::ViewId(view_id)]); + commit_meta!(self, views, users)?; - for &dependent_relation_id in &view.dependent_relations { - database_core.decrease_ref_count(dependent_relation_id); - } + user_core.decrease_ref(view.owner); - for user in users_need_update { - self.notify_frontend(Operation::Update, Info::User(user)) - .await; - } - let version = self - .notify_frontend_relation_info(Operation::Delete, RelationInfo::View(view)) - .await; + for &dependent_relation_id in &view.dependent_relations { + database_core.decrease_ref_count(dependent_relation_id); + } - Ok(version) + for user in users_need_update { + self.notify_frontend(Operation::Update, Info::User(user)) + .await; } + let version = self + .notify_frontend_relation_info(Operation::Delete, RelationInfo::View(view)) + .await; + + Ok(version) } - } else { - Err(MetaError::catalog_id_not_found("view", view_id)) } } @@ -866,76 +868,327 @@ where ) -> MetaResult { let core = &mut *self.core.lock().await; let database_core = &mut core.database; + database_core.ensure_index_id(index_id)?; + let user_core = &mut core.user; let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes); let mut tables = BTreeMapTransaction::new(&mut database_core.tables); let mut users = BTreeMapTransaction::new(&mut user_core.user_info); - let index = indexes.remove(index_id); - if let Some(index) = index { - assert_eq!(index_table_id, index.index_table_id); - - // drop index table - let table = tables.remove(index_table_id); - if let Some(table) = table { - match database_core - .relation_ref_count - .get(&index_table_id) - .cloned() - { - Some(ref_count) => Err(MetaError::permission_denied(format!( - "Fail to delete table `{}` because {} other relation(s) depend on it", - table.name, ref_count - ))), - None => { - let dependent_relations = table.dependent_relations.clone(); - - let objects = &[Object::TableId(table.id)]; + let index = indexes.remove(index_id).unwrap(); + assert_eq!(index_table_id, index.index_table_id); - let users_need_update = Self::update_user_privileges(&mut users, objects); + // drop index table + let table = tables.remove(index_table_id); + if let Some(table) = table { + match database_core + .relation_ref_count + .get(&index_table_id) + .cloned() + { + Some(ref_count) => Err(MetaError::permission_denied(format!( + "Fail to delete table `{}` because {} other relation(s) depend on it", + table.name, ref_count + ))), + None => { + let dependent_relations = table.dependent_relations.clone(); - commit_meta!(self, tables, indexes, users)?; + let objects = &[Object::TableId(table.id)]; - // index table and index. - user_core.decrease_ref_count(index.owner, 2); + let users_need_update = Self::update_user_privileges(&mut users, objects); - for user in users_need_update { - self.notify_frontend(Operation::Update, Info::User(user)) - .await; - } + commit_meta!(self, tables, indexes, users)?; - for dependent_relation_id in dependent_relations { - database_core.decrease_ref_count(dependent_relation_id); - } + // index table and index. + user_core.decrease_ref_count(index.owner, 2); - let version = self - .notify_frontend( - Operation::Delete, - Info::RelationGroup(RelationGroup { - relations: vec![ - Relation { - relation_info: RelationInfo::Table(table.to_owned()) - .into(), - }, - Relation { - relation_info: RelationInfo::Index(index).into(), - }, - ], - }), - ) + for user in users_need_update { + self.notify_frontend(Operation::Update, Info::User(user)) .await; + } - Ok(version) + for dependent_relation_id in dependent_relations { + database_core.decrease_ref_count(dependent_relation_id); } + + let version = self + .notify_frontend( + Operation::Delete, + Info::RelationGroup(RelationGroup { + relations: vec![ + Relation { + relation_info: RelationInfo::Table(table.to_owned()).into(), + }, + Relation { + relation_info: RelationInfo::Index(index).into(), + }, + ], + }), + ) + .await; + + Ok(version) } - } else { - bail!("index table doesn't exist",) } } else { - bail!("index doesn't exist",) + bail!("index table doesn't exist",) } } + pub async fn alter_table_name( + &self, + table_id: TableId, + table_name: &str, + ) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + database_core.ensure_table_id(table_id)?; + + // 1. validate new table name. + let mut table = database_core.tables.get(&table_id).unwrap().clone(); + let old_name = table.name.clone(); + database_core.check_relation_name_duplicated(&( + table.database_id, + table.schema_id, + table_name.to_string(), + ))?; + + let source = table.optional_associated_source_id.as_ref().map( + |OptionalAssociatedSourceId::AssociatedSourceId(id)| { + let mut source = database_core.sources.get(id).unwrap().clone(); + source.name = table_name.to_string(); + source + }, + ); + + // 2. rename table and its definition. + table.name = table_name.to_string(); + table.definition = alter_relation_rename(&table.definition, table_name); + + // 3. update all relations that depend on this table, note that indexes are not included. + self.alter_relation_name_refs_inner( + database_core, + table_id, + &old_name, + table_name, + vec![table], + vec![], + vec![], + source, + ) + .await + } + + // TODO: refactor dependency cache in catalog manager for better performance. + #[allow(clippy::too_many_arguments)] + async fn alter_relation_name_refs_inner( + &self, + database_mgr: &mut DatabaseManager, + relation_id: RelationId, + from: &str, + to: &str, + mut to_update_tables: Vec, + mut to_update_views: Vec, + mut to_update_sinks: Vec, + to_update_source: Option, + ) -> MetaResult { + for table in database_mgr.tables.values() { + if table.dependent_relations.contains(&relation_id) { + let mut table = table.clone(); + table.definition = alter_relation_rename_refs(&table.definition, from, to); + to_update_tables.push(table); + } + } + + for view in database_mgr.views.values() { + if view.dependent_relations.contains(&relation_id) { + let mut view = view.clone(); + view.sql = alter_relation_rename_refs(&view.sql, from, to); + to_update_views.push(view); + } + } + + for sink in database_mgr.sinks.values() { + if sink.dependent_relations.contains(&relation_id) { + let mut sink = sink.clone(); + sink.definition = alter_relation_rename_refs(&sink.definition, from, to); + to_update_sinks.push(sink); + } + } + + // commit meta. + let mut tables = BTreeMapTransaction::new(&mut database_mgr.tables); + let mut views = BTreeMapTransaction::new(&mut database_mgr.views); + let mut sinks = BTreeMapTransaction::new(&mut database_mgr.sinks); + let mut sources = BTreeMapTransaction::new(&mut database_mgr.sources); + to_update_tables.iter().for_each(|table| { + tables.insert(table.id, table.clone()); + }); + // TODO: there are some inconsistencies in the process of notifying the frontend, we need to + // support batch notification. + to_update_views.iter().for_each(|view| { + views.insert(view.id, view.clone()); + }); + to_update_sinks.iter().for_each(|sink| { + sinks.insert(sink.id, sink.clone()); + }); + if let Some(source) = &to_update_source { + sources.insert(source.id, source.clone()); + } + commit_meta!(self, tables, views, sinks, sources)?; + + // 5. notify frontend. + assert!( + !to_update_tables.is_empty() + || !to_update_views.is_empty() + || !to_update_sinks.is_empty() + || to_update_source.is_some() + ); + let version = self + .notify_frontend( + Operation::Update, + Info::RelationGroup(RelationGroup { + relations: to_update_tables + .into_iter() + .map(|table| Relation { + relation_info: RelationInfo::Table(table).into(), + }) + .chain(to_update_views.into_iter().map(|view| Relation { + relation_info: RelationInfo::View(view).into(), + })) + .chain(to_update_sinks.into_iter().map(|sink| Relation { + relation_info: RelationInfo::Sink(sink).into(), + })) + .chain(to_update_source.into_iter().map(|source| Relation { + relation_info: RelationInfo::Source(source).into(), + })) + .collect(), + }), + ) + .await; + + Ok(version) + } + + pub async fn alter_view_name( + &self, + view_id: ViewId, + view_name: &str, + ) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + database_core.ensure_view_id(view_id)?; + + // 1. validate new view name. + let mut view = database_core.views.get(&view_id).unwrap().clone(); + let old_name = view.name.clone(); + database_core.check_relation_name_duplicated(&( + view.database_id, + view.schema_id, + view_name.to_string(), + ))?; + + // 2. rename view, note that there's no need to update its definition since it only stores + // the query part. + view.name = view_name.to_string(); + + // 3. update all relations that depend on this view. + self.alter_relation_name_refs_inner( + database_core, + view_id, + &old_name, + view_name, + vec![], + vec![view], + vec![], + None, + ) + .await + } + + pub async fn alter_sink_name( + &self, + sink_id: SinkId, + sink_name: &str, + ) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + database_core.ensure_sink_id(sink_id)?; + + // 1. validate new sink name. + let mut sink = database_core.sinks.get(&sink_id).unwrap().clone(); + database_core.check_relation_name_duplicated(&( + sink.database_id, + sink.schema_id, + sink_name.to_string(), + ))?; + + // 2. rename sink and its definition. + sink.name = sink_name.to_string(); + sink.definition = alter_relation_rename(&sink.definition, sink_name); + + // 3. commit meta. + let mut sinks = BTreeMapTransaction::new(&mut database_core.sinks); + sinks.insert(sink_id, sink.clone()); + commit_meta!(self, sinks)?; + + let version = self + .notify_frontend_relation_info(Operation::Update, RelationInfo::Sink(sink)) + .await; + + Ok(version) + } + + pub async fn alter_index_name( + &self, + index_id: IndexId, + index_name: &str, + ) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + database_core.ensure_index_id(index_id)?; + + // 1. validate new index name. + let mut index = database_core.indexes.get(&index_id).unwrap().clone(); + database_core.check_relation_name_duplicated(&( + index.database_id, + index.schema_id, + index_name.to_string(), + ))?; + let mut index_table = database_core + .tables + .get(&index.index_table_id) + .unwrap() + .clone(); + + // 2. rename index name. + index.name = index_name.to_string(); + index_table.name = index_name.to_string(); + let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes); + let mut tables = BTreeMapTransaction::new(&mut database_core.tables); + indexes.insert(index_id, index.clone()); + tables.insert(index.index_table_id, index_table.clone()); + commit_meta!(self, indexes, tables)?; + + let version = self + .notify_frontend( + Operation::Update, + Info::RelationGroup(RelationGroup { + relations: vec![ + Relation { + relation_info: RelationInfo::Table(index_table).into(), + }, + Relation { + relation_info: RelationInfo::Index(index).into(), + }, + ], + }), + ) + .await; + + Ok(version) + } + pub async fn start_create_source_procedure(&self, source: &Source) -> MetaResult<()> { let core = &mut *self.core.lock().await; let database_core = &mut core.database; @@ -1007,40 +1260,36 @@ where pub async fn drop_source(&self, source_id: SourceId) -> MetaResult { let core = &mut *self.core.lock().await; let database_core = &mut core.database; + database_core.ensure_source_id(source_id)?; + let user_core = &mut core.user; let mut sources = BTreeMapTransaction::new(&mut database_core.sources); let mut users = BTreeMapTransaction::new(&mut user_core.user_info); - let source = sources.remove(source_id); - if let Some(source) = source { - match database_core.relation_ref_count.get(&source_id) { - Some(ref_count) => Err(MetaError::permission_denied(format!( - "Fail to delete source `{}` because {} other relation(s) depend on it", - source.name, ref_count - ))), - None => { - let users_need_update = - Self::update_user_privileges(&mut users, &[Object::SourceId(source_id)]); - commit_meta!(self, sources, users)?; + let source = sources.remove(source_id).unwrap(); - user_core.decrease_ref(source.owner); + match database_core.relation_ref_count.get(&source_id) { + Some(ref_count) => Err(MetaError::permission_denied(format!( + "Fail to delete source `{}` because {} other relation(s) depend on it", + source.name, ref_count + ))), + None => { + let users_need_update = + Self::update_user_privileges(&mut users, &[Object::SourceId(source_id)]); + commit_meta!(self, sources, users)?; - for user in users_need_update { - self.notify_frontend(Operation::Update, Info::User(user)) - .await; - } - let version = self - .notify_frontend_relation_info( - Operation::Delete, - RelationInfo::Source(source), - ) - .await; + user_core.decrease_ref(source.owner); - Ok(version) + for user in users_need_update { + self.notify_frontend(Operation::Update, Info::User(user)) + .await; } + let version = self + .notify_frontend_relation_info(Operation::Delete, RelationInfo::Source(source)) + .await; + + Ok(version) } - } else { - Err(MetaError::catalog_id_not_found("source", source_id)) } } @@ -1516,73 +1765,70 @@ where ) -> MetaResult { let core = &mut *self.core.lock().await; let database_core = &mut core.database; + database_core.ensure_sink_id(sink_id)?; + let user_core = &mut core.user; let mut sinks = BTreeMapTransaction::new(&mut database_core.sinks); let mut tables = BTreeMapTransaction::new(&mut database_core.tables); let mut users = BTreeMapTransaction::new(&mut user_core.user_info); - let sink = sinks.remove(sink_id); - if let Some(sink) = sink { - match database_core.relation_ref_count.get(&sink_id).cloned() { - Some(_) => bail!("No relation should depend on Sink"), - None => { - let dependent_relations = sink.dependent_relations.clone(); - - let objects = &[Object::SinkId(sink.id)] - .into_iter() - .chain( - internal_table_ids - .iter() - .map(|table_id| Object::TableId(*table_id)) - .collect_vec(), - ) - .collect_vec(); + let sink = sinks.remove(sink_id).unwrap(); + match database_core.relation_ref_count.get(&sink_id).cloned() { + Some(_) => bail!("No relation should depend on Sink"), + None => { + let dependent_relations = sink.dependent_relations.clone(); - let internal_tables = internal_table_ids - .iter() - .map(|internal_table_id| { - tables - .remove(*internal_table_id) - .expect("internal table should exist") - }) - .collect_vec(); + let objects = &[Object::SinkId(sink.id)] + .into_iter() + .chain( + internal_table_ids + .iter() + .map(|table_id| Object::TableId(*table_id)), + ) + .collect_vec(); - let users_need_update = Self::update_user_privileges(&mut users, objects); + let internal_tables = internal_table_ids + .iter() + .map(|internal_table_id| { + tables + .remove(*internal_table_id) + .expect("internal table should exist") + }) + .collect_vec(); - commit_meta!(self, sinks, tables, users)?; + let users_need_update = Self::update_user_privileges(&mut users, objects); - user_core.decrease_ref(sink.owner); + commit_meta!(self, sinks, tables, users)?; - for user in users_need_update { - self.notify_frontend(Operation::Update, Info::User(user)) - .await; - } + user_core.decrease_ref(sink.owner); - for dependent_relation_id in dependent_relations { - database_core.decrease_ref_count(dependent_relation_id); - } - - let version = self - .notify_frontend( - Operation::Delete, - Info::RelationGroup(RelationGroup { - relations: vec![Relation { - relation_info: RelationInfo::Sink(sink.to_owned()).into(), - }] - .into_iter() - .chain(internal_tables.into_iter().map(|internal_table| Relation { - relation_info: RelationInfo::Table(internal_table).into(), - })) - .collect_vec(), - }), - ) + for user in users_need_update { + self.notify_frontend(Operation::Update, Info::User(user)) .await; + } - Ok(version) + for dependent_relation_id in dependent_relations { + database_core.decrease_ref_count(dependent_relation_id); } + + let version = self + .notify_frontend( + Operation::Delete, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: RelationInfo::Sink(sink.to_owned()).into(), + }] + .into_iter() + .chain(internal_tables.into_iter().map(|internal_table| Relation { + relation_info: RelationInfo::Table(internal_table).into(), + })) + .collect_vec(), + }), + ) + .await; + + Ok(version) } - } else { - Err(MetaError::catalog_id_not_found("sink", sink_id)) } } @@ -2059,6 +2305,7 @@ where grant_user.extend(user_ids); let mut version = 0; + // FIXME: user might not be updated. for user in user_updated { version = self .notify_frontend(Operation::Update, Info::User(user)) @@ -2217,6 +2464,7 @@ where core.build_grant_relation_map(); let mut version = 0; + // FIXME: user might not be updated. for (_, user_info) in user_updated { version = self .notify_frontend(Operation::Update, Info::User(user_info)) diff --git a/src/meta/src/manager/catalog/utils.rs b/src/meta/src/manager/catalog/utils.rs new file mode 100644 index 000000000000..0d19dd3a2154 --- /dev/null +++ b/src/meta/src/manager/catalog/utils.rs @@ -0,0 +1,371 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_sqlparser::ast::{ + Array, CreateSink, CreateSinkStatement, CreateSourceStatement, Distinct, Expr, Function, + FunctionArg, FunctionArgExpr, Ident, ObjectName, Query, SelectItem, SetExpr, Statement, + TableAlias, TableFactor, TableWithJoins, +}; +use risingwave_sqlparser::parser::Parser; + +/// `alter_relation_rename` renames a relation to a new name in its `Create` statement, and returns +/// the updated definition raw sql. Note that the `definition` must be a `Create` statement and the +/// `new_name` must be a valid identifier, it should be validated before calling this function. To +/// update all relations that depend on the renamed one, use `alter_relation_rename_refs`. +pub fn alter_relation_rename(definition: &str, new_name: &str) -> String { + // This happens when we try to rename a table that's created by `CREATE TABLE AS`. Remove it + // when we support `SHOW CREATE TABLE` for `CREATE TABLE AS`. + if definition.is_empty() { + tracing::warn!("found empty definition when renaming relation, ignored."); + return definition.into(); + } + let ast = Parser::parse_sql(definition).expect("failed to parse relation definition"); + let mut stmt = ast + .into_iter() + .exactly_one() + .expect("should contains only one statement"); + + match &mut stmt { + Statement::CreateTable { name, .. } + | Statement::CreateView { name, .. } + | Statement::CreateIndex { name, .. } + | Statement::CreateSource { + stmt: CreateSourceStatement { + source_name: name, .. + }, + } + | Statement::CreateSink { + stmt: CreateSinkStatement { + sink_name: name, .. + }, + } => replace_table_name(name, new_name), + _ => unreachable!(), + }; + + stmt.to_string() +} + +/// `alter_relation_rename_refs` updates all references of renamed-relation in the definition of +/// target relation's `Create` statement. +pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> String { + let ast = Parser::parse_sql(definition).expect("failed to parse relation definition"); + let mut stmt = ast + .into_iter() + .exactly_one() + .expect("should contains only one statement"); + + match &mut stmt { + Statement::CreateTable { + query: Some(query), .. + } + | Statement::CreateView { query, .. } + | Statement::Query(query) // Used by view, actually we store a query as the definition of view. + | Statement::CreateSink { + stmt: + CreateSinkStatement { + sink_from: CreateSink::AsQuery(query), + .. + }, + } => { + QueryRewriter::rewrite_query(query, from, to); + } + Statement::CreateIndex { table_name, .. } + | Statement::CreateSink { + stmt: + CreateSinkStatement { + sink_from: CreateSink::From(table_name), + .. + }, + } => replace_table_name(table_name, to), + _ => unreachable!(), + }; + stmt.to_string() +} + +/// Replace the last ident in the `table_name` with the given name, the object name is ensured to be +/// non-empty. e.g. `schema.table` or `database.schema.table`. +fn replace_table_name(table_name: &mut ObjectName, to: &str) { + let idx = table_name.0.len() - 1; + table_name.0[idx] = Ident::new_unchecked(to); +} + +/// `QueryRewriter` is a visitor that updates all references of relation named `from` to `to` in the +/// given query, which is the part of create statement of `relation`. +struct QueryRewriter<'a> { + from: &'a str, + to: &'a str, +} + +impl QueryRewriter<'_> { + fn rewrite_query(query: &mut Query, from: &str, to: &str) { + let rewriter = QueryRewriter { from, to }; + rewriter.visit_query(query) + } + + /// Visit the query and update all references of relation named `from` to `to`. + fn visit_query(&self, query: &mut Query) { + if let Some(with) = &mut query.with { + for cte_table in &mut with.cte_tables { + self.visit_query(&mut cte_table.query); + } + } + self.visit_set_expr(&mut query.body); + for expr in &mut query.order_by { + self.visit_expr(&mut expr.expr); + } + } + + /// Visit table factor and update all references of relation named `from` to `to`. + /// Rewrite idents(i.e. `schema.table`, `table`) that contains the old name in the + /// following pattern: + /// 1. `FROM a` to `FROM new_a AS a` + /// 2. `FROM a AS b` to `FROM new_a AS b` + /// + /// So that we DON'T have to: + /// 1. rewrite the select and expr part like `schema.table.column`, `table.column`, + /// `alias.column` etc. + /// 2. handle the case that the old name is used as alias. + /// 3. handle the case that the new name is used as alias. + fn visit_table_factor(&self, table_factor: &mut TableFactor) { + match table_factor { + TableFactor::Table { name, alias, .. } => { + let idx = name.0.len() - 1; + if name.0[idx].real_value() == self.from { + if alias.is_none() { + *alias = Some(TableAlias { + name: Ident::new_unchecked(self.from), + columns: vec![], + }); + } + name.0[idx] = Ident::new_unchecked(self.to); + } + } + TableFactor::Derived { subquery, .. } => self.visit_query(subquery), + TableFactor::TableFunction { args, .. } => { + for arg in args { + self.visit_function_args(arg); + } + } + TableFactor::NestedJoin(table_with_joins) => { + self.visit_table_with_joins(table_with_joins); + } + } + } + + /// Visit table with joins and update all references of relation named `from` to `to`. + fn visit_table_with_joins(&self, table_with_joins: &mut TableWithJoins) { + self.visit_table_factor(&mut table_with_joins.relation); + for join in &mut table_with_joins.joins { + self.visit_table_factor(&mut join.relation); + } + } + + /// Visit query body expression and update all references. + fn visit_set_expr(&self, set_expr: &mut SetExpr) { + match set_expr { + SetExpr::Select(select) => { + if let Distinct::DistinctOn(exprs) = &mut select.distinct { + for expr in exprs { + self.visit_expr(expr); + } + } + for select_item in &mut select.projection { + self.visit_select_item(select_item); + } + for from_item in &mut select.from { + self.visit_table_with_joins(from_item); + } + if let Some(where_clause) = &mut select.selection { + self.visit_expr(where_clause); + } + for expr in &mut select.group_by { + self.visit_expr(expr); + } + if let Some(having) = &mut select.having { + self.visit_expr(having); + } + } + SetExpr::Query(query) => self.visit_query(query), + SetExpr::SetOperation { left, right, .. } => { + self.visit_set_expr(left); + self.visit_set_expr(right); + } + SetExpr::Values(_) => {} + } + } + + /// Visit function arguments and update all references. + fn visit_function_args(&self, function_args: &mut FunctionArg) { + match function_args { + FunctionArg::Unnamed(arg) | FunctionArg::Named { arg, .. } => match arg { + FunctionArgExpr::Expr(expr) | FunctionArgExpr::ExprQualifiedWildcard(expr, _) => { + self.visit_expr(expr) + } + FunctionArgExpr::QualifiedWildcard(_) | FunctionArgExpr::Wildcard => {} + }, + } + } + + /// Visit function and update all references. + fn visit_function(&self, function: &mut Function) { + for arg in &mut function.args { + self.visit_function_args(arg); + } + } + + /// Visit expression and update all references. + fn visit_expr(&self, expr: &mut Expr) { + match expr { + Expr::FieldIdentifier(expr, ..) + | Expr::IsNull(expr) + | Expr::IsNotNull(expr) + | Expr::IsTrue(expr) + | Expr::IsNotTrue(expr) + | Expr::IsFalse(expr) + | Expr::IsNotFalse(expr) + | Expr::InList { expr, .. } + | Expr::SomeOp(expr) + | Expr::AllOp(expr) + | Expr::UnaryOp { expr, .. } + | Expr::Cast { expr, .. } + | Expr::TryCast { expr, .. } + | Expr::AtTimeZone { + timestamp: expr, .. + } + | Expr::Extract { expr, .. } + | Expr::Substring { expr, .. } + | Expr::Overlay { expr, .. } + | Expr::Trim { expr, .. } + | Expr::Nested(expr) + | Expr::ArrayIndex { obj: expr, .. } => self.visit_expr(expr), + + Expr::InSubquery { expr, subquery, .. } => { + self.visit_expr(expr); + self.visit_query(subquery); + } + Expr::Between { + expr, low, high, .. + } => { + self.visit_expr(expr); + self.visit_expr(low); + self.visit_expr(high); + } + + Expr::IsDistinctFrom(expr1, expr2) + | Expr::IsNotDistinctFrom(expr1, expr2) + | Expr::BinaryOp { + left: expr1, + right: expr2, + .. + } => { + self.visit_expr(expr1); + self.visit_expr(expr2); + } + Expr::Function(function) => self.visit_function(function), + Expr::Exists(query) | Expr::Subquery(query) => self.visit_query(query), + + Expr::GroupingSets(exprs_vec) | Expr::Cube(exprs_vec) | Expr::Rollup(exprs_vec) => { + for exprs in exprs_vec { + for expr in exprs { + self.visit_expr(expr); + } + } + } + + Expr::Row(exprs) | Expr::Array(Array { elem: exprs, .. }) => { + for expr in exprs { + self.visit_expr(expr); + } + } + + // No need to visit. + Expr::Identifier(_) + | Expr::CompoundIdentifier(_) + | Expr::Collate { .. } + | Expr::Value(_) + | Expr::Parameter { .. } + | Expr::TypedString { .. } + | Expr::Case { .. } => {} + } + } + + /// Visit select item and update all references. + fn visit_select_item(&self, select_item: &mut SelectItem) { + match select_item { + SelectItem::UnnamedExpr(expr) + | SelectItem::ExprQualifiedWildcard(expr, _) + | SelectItem::ExprWithAlias { expr, .. } => self.visit_expr(expr), + SelectItem::QualifiedWildcard(_) | SelectItem::Wildcard => {} + } + } +} + +#[cfg(test)] +mod tests { + use crate::manager::catalog::utils::{alter_relation_rename, alter_relation_rename_refs}; + + #[test] + fn test_alter_table_rename() { + let definition = "CREATE TABLE foo (a int, b int)"; + let new_name = "bar"; + let expected = "CREATE TABLE bar (a INT, b INT)"; + let actual = alter_relation_rename(definition, new_name); + assert_eq!(expected, actual); + } + + #[test] + fn test_rename_index_refs() { + let definition = "CREATE INDEX idx1 ON foo(v1 DESC, v2)"; + let from = "foo"; + let to = "bar"; + let expected = "CREATE INDEX idx1 ON bar(v1 DESC, v2)"; + let actual = alter_relation_rename_refs(definition, from, to); + assert_eq!(expected, actual); + } + + #[test] + fn test_rename_sink_refs() { + let definition = + "CREATE SINK sink_t FROM foo WITH (connector = 'kafka', format = 'append_only')"; + let from = "foo"; + let to = "bar"; + let expected = + "CREATE SINK sink_t FROM bar WITH (connector = 'kafka', format = 'append_only')"; + let actual = alter_relation_rename_refs(definition, from, to); + assert_eq!(expected, actual); + } + + #[test] + fn test_rename_with_alias_refs() { + let definition = + "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, foo.v2 AS m2v FROM foo"; + let from = "foo"; + let to = "bar"; + let expected = + "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, foo.v2 AS m2v FROM bar AS foo"; + let actual = alter_relation_rename_refs(definition, from, to); + assert_eq!(expected, actual); + + let definition = "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, (foo.v2).v3 AS m2v FROM foo WHERE foo.v1 = 1 AND (foo.v2).v3 IS TRUE"; + let expected = "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, (foo.v2).v3 AS m2v FROM bar AS foo WHERE foo.v1 = 1 AND (foo.v2).v3 IS TRUE"; + let actual = alter_relation_rename_refs(definition, from, to); + assert_eq!(expected, actual); + + let definition = "CREATE MATERIALIZED VIEW mv1 AS SELECT bar.v1 AS m1v, (bar.v2).v3 AS m2v FROM foo AS bar WHERE bar.v1 = 1"; + let expected = "CREATE MATERIALIZED VIEW mv1 AS SELECT bar.v1 AS m1v, (bar.v2).v3 AS m2v FROM bar AS bar WHERE bar.v1 = 1"; + let actual = alter_relation_rename_refs(definition, from, to); + assert_eq!(expected, actual); + } +} diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index f4c45cbfeae8..82e853d9063d 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -15,6 +15,7 @@ use itertools::Itertools; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::catalog::{Connection, Database, Function, Schema, Source, Table, View}; +use risingwave_pb::ddl_service::alter_relation_name_request::Relation; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; @@ -62,9 +63,10 @@ pub enum DdlCommand { DropFunction(FunctionId), CreateView(View), DropView(ViewId), - CreatingStreamingJob(StreamingJob, StreamFragmentGraphProto), + CreateStreamingJob(StreamingJob, StreamFragmentGraphProto), DropStreamingJob(StreamingJobId), ReplaceTable(StreamingJob, StreamFragmentGraphProto, ColIndexMapping), + AlterRelationName(Relation, String), CreateConnection(Connection), DropConnection(String), } @@ -135,7 +137,7 @@ where DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await, DdlCommand::CreateView(view) => ctrl.create_view(view).await, DdlCommand::DropView(view_id) => ctrl.drop_view(view_id).await, - DdlCommand::CreatingStreamingJob(stream_job, fragment_graph) => { + DdlCommand::CreateStreamingJob(stream_job, fragment_graph) => { ctrl.create_streaming_job(stream_job, fragment_graph).await } DdlCommand::DropStreamingJob(job_id) => ctrl.drop_streaming_job(job_id).await, @@ -143,6 +145,9 @@ where ctrl.replace_table(stream_job, fragment_graph, table_col_index_mapping) .await } + DdlCommand::AlterRelationName(relation, name) => { + ctrl.alter_relation_name(relation, &name).await + } DdlCommand::CreateConnection(connection) => { ctrl.create_connection(connection).await } @@ -715,4 +720,33 @@ where .cancel_replace_table_procedure(table) .await } + + async fn alter_relation_name( + &self, + relation: Relation, + new_name: &str, + ) -> MetaResult { + match relation { + Relation::TableId(table_id) => { + self.catalog_manager + .alter_table_name(table_id, new_name) + .await + } + Relation::ViewId(view_id) => { + self.catalog_manager + .alter_view_name(view_id, new_name) + .await + } + Relation::IndexId(index_id) => { + self.catalog_manager + .alter_index_name(index_id, new_name) + .await + } + Relation::SinkId(sink_id) => { + self.catalog_manager + .alter_sink_name(sink_id, new_name) + .await + } + } + } } diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index 0186bd6525b4..7b55c1188f5b 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -236,7 +236,7 @@ where let version = self .ddl_controller - .run_command(DdlCommand::CreatingStreamingJob(stream_job, fragment_graph)) + .run_command(DdlCommand::CreateStreamingJob(stream_job, fragment_graph)) .await?; Ok(Response::new(CreateSinkResponse { @@ -279,7 +279,7 @@ where let version = self .ddl_controller - .run_command(DdlCommand::CreatingStreamingJob(stream_job, fragment_graph)) + .run_command(DdlCommand::CreateStreamingJob(stream_job, fragment_graph)) .await?; Ok(Response::new(CreateMaterializedViewResponse { @@ -328,7 +328,7 @@ where let version = self .ddl_controller - .run_command(DdlCommand::CreatingStreamingJob(stream_job, fragment_graph)) + .run_command(DdlCommand::CreateStreamingJob(stream_job, fragment_graph)) .await?; Ok(Response::new(CreateIndexResponse { @@ -436,7 +436,7 @@ where let version = self .ddl_controller - .run_command(DdlCommand::CreatingStreamingJob(stream_job, fragment_graph)) + .run_command(DdlCommand::CreateStreamingJob(stream_job, fragment_graph)) .await?; Ok(Response::new(CreateTableResponse { @@ -563,6 +563,21 @@ where } } + async fn alter_relation_name( + &self, + request: Request, + ) -> Result, Status> { + let AlterRelationNameRequest { relation, new_name } = request.into_inner(); + let version = self + .ddl_controller + .run_command(DdlCommand::AlterRelationName(relation.unwrap(), new_name)) + .await?; + Ok(Response::new(AlterRelationNameResponse { + status: None, + version, + })) + } + async fn get_ddl_progress( &self, _request: Request, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 6d2fa541d805..6638b4307360 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -43,6 +43,7 @@ use risingwave_pb::catalog::{ Connection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, }; use risingwave_pb::common::{HostAddress, WorkerType}; +use risingwave_pb::ddl_service::alter_relation_name_request::Relation; use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient; use risingwave_pb::ddl_service::drop_table_request::SourceId; use risingwave_pb::ddl_service::*; @@ -350,6 +351,19 @@ impl MetaClient { Ok((resp.table_id.into(), resp.version)) } + pub async fn alter_relation_name( + &self, + relation: Relation, + name: &str, + ) -> Result { + let request = AlterRelationNameRequest { + relation: Some(relation), + new_name: name.to_string(), + }; + let resp = self.inner.alter_relation_name(request).await?; + Ok(resp.version) + } + pub async fn replace_table( &self, table: PbTable, @@ -1382,6 +1396,7 @@ macro_rules! for_all_meta_rpc { ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse } ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse } ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse } + ,{ ddl_client, alter_relation_name, AlterRelationNameRequest, AlterRelationNameResponse } ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse } ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse } ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse } diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 4a6dfb6ba9bb..b6aa7ecb3629 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -77,6 +77,27 @@ pub enum AlterTableOperation { }, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] +pub enum AlterIndexOperation { + RenameIndex { index_name: ObjectName }, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] +pub enum AlterViewOperation { + RenameView { view_name: ObjectName }, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] +pub enum AlterSinkOperation { + RenameSink { sink_name: ObjectName }, +} + impl fmt::Display for AlterTableOperation { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -133,6 +154,36 @@ impl fmt::Display for AlterTableOperation { } } +impl fmt::Display for AlterIndexOperation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlterIndexOperation::RenameIndex { index_name } => { + write!(f, "RENAME TO {index_name}") + } + } + } +} + +impl fmt::Display for AlterViewOperation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlterViewOperation::RenameView { view_name } => { + write!(f, "RENAME TO {view_name}") + } + } + } +} + +impl fmt::Display for AlterSinkOperation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlterSinkOperation::RenameSink { sink_name } => { + write!(f, "RENAME TO {sink_name}") + } + } + } +} + /// An `ALTER COLUMN` (`Statement::AlterTable`) operation #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 52f591afba58..0ade589b9418 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -43,6 +43,7 @@ pub use self::query::{ }; pub use self::statement::*; pub use self::value::{DateTimeField, DollarQuotedString, TrimWhereField, Value}; +pub use crate::ast::ddl::{AlterIndexOperation, AlterSinkOperation, AlterViewOperation}; use crate::keywords::Keyword; use crate::parser::{Parser, ParserError}; @@ -991,6 +992,25 @@ pub enum Statement { name: ObjectName, operation: AlterTableOperation, }, + /// ALTER INDEX + AlterIndex { + /// Index name + name: ObjectName, + operation: AlterIndexOperation, + }, + /// ALTER VIEW + AlterView { + /// View name + name: ObjectName, + materialized: bool, + operation: AlterViewOperation, + }, + /// ALTER SINK + AlterSink { + /// Sink name + name: ObjectName, + operation: AlterSinkOperation, + }, /// DESCRIBE TABLE OR SOURCE Describe { /// Table or Source name @@ -1363,7 +1383,7 @@ impl fmt::Display for Statement { if_not_exists = if *if_not_exists { "IF NOT EXISTS " } else { "" }, name = name, table_name = table_name, - columns = display_separated(columns, ","), + columns = display_comma_separated(columns), include = if include.is_empty() { "".to_string() } else { @@ -1386,6 +1406,15 @@ impl fmt::Display for Statement { Statement::AlterTable { name, operation } => { write!(f, "ALTER TABLE {} {}", name, operation) } + Statement::AlterIndex { name, operation } => { + write!(f, "ALTER INDEX {} {}", name, operation) + } + Statement::AlterView { materialized, name, operation } => { + write!(f, "ALTER {}VIEW {} {}", if *materialized { "MATERIALIZED " } else { "" }, name, operation) + } + Statement::AlterSink { name, operation } => { + write!(f, "ALTER SINK {} {}", name, operation) + } Statement::Drop(stmt) => write!(f, "DROP {}", stmt), Statement::DropFunction { if_exists, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 9da52fc43f49..aa3a4ab86cb5 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -24,7 +24,9 @@ use core::fmt; use tracing::{debug, instrument}; -use crate::ast::ddl::SourceWatermark; +use crate::ast::ddl::{ + AlterIndexOperation, AlterSinkOperation, AlterViewOperation, SourceWatermark, +}; use crate::ast::{ParseTo, *}; use crate::keywords::{self, Keyword}; use crate::tokenizer::*; @@ -2415,6 +2417,14 @@ impl Parser { pub fn parse_alter(&mut self) -> Result { if self.parse_keyword(Keyword::TABLE) { self.parse_alter_table() + } else if self.parse_keyword(Keyword::INDEX) { + self.parse_alter_index() + } else if self.parse_keyword(Keyword::VIEW) { + self.parse_alter_view(false) + } else if self.parse_keywords(&[Keyword::MATERIALIZED, Keyword::VIEW]) { + self.parse_alter_view(true) + } else if self.parse_keyword(Keyword::SINK) { + self.parse_alter_sink() } else if self.parse_keyword(Keyword::USER) { self.parse_alter_user() } else if self.parse_keyword(Keyword::SYSTEM) { @@ -2515,6 +2525,70 @@ impl Parser { }) } + pub fn parse_alter_index(&mut self) -> Result { + let index_name = self.parse_object_name()?; + let operation = if self.parse_keyword(Keyword::RENAME) { + if self.parse_keyword(Keyword::TO) { + let index_name = self.parse_object_name()?; + AlterIndexOperation::RenameIndex { index_name } + } else { + return self.expected("TO after RENAME", self.peek_token()); + } + } else { + return self.expected("RENAME after ALTER INDEX", self.peek_token()); + }; + + Ok(Statement::AlterIndex { + name: index_name, + operation, + }) + } + + pub fn parse_alter_view(&mut self, materialized: bool) -> Result { + let view_name = self.parse_object_name()?; + let operation = if self.parse_keyword(Keyword::RENAME) { + if self.parse_keyword(Keyword::TO) { + let view_name = self.parse_object_name()?; + AlterViewOperation::RenameView { view_name } + } else { + return self.expected("TO after RENAME", self.peek_token()); + } + } else { + return self.expected( + &format!( + "RENAME after ALTER {}VIEW", + if materialized { "MATERIALIZED " } else { "" } + ), + self.peek_token(), + ); + }; + + Ok(Statement::AlterView { + materialized, + name: view_name, + operation, + }) + } + + pub fn parse_alter_sink(&mut self) -> Result { + let sink_name = self.parse_object_name()?; + let operation = if self.parse_keyword(Keyword::RENAME) { + if self.parse_keyword(Keyword::TO) { + let sink_name = self.parse_object_name()?; + AlterSinkOperation::RenameSink { sink_name } + } else { + return self.expected("TO after RENAME", self.peek_token()); + } + } else { + return self.expected("RENAME after ALTER SINK", self.peek_token()); + }; + + Ok(Statement::AlterSink { + name: sink_name, + operation, + }) + } + pub fn parse_alter_system(&mut self) -> Result { self.expect_keyword(Keyword::SET)?; let param = self.parse_identifier()?; diff --git a/src/sqlparser/tests/sqlparser_common.rs b/src/sqlparser/tests/sqlparser_common.rs index 1d3d280d3e56..361940cc3d0b 100644 --- a/src/sqlparser/tests/sqlparser_common.rs +++ b/src/sqlparser/tests/sqlparser_common.rs @@ -3514,7 +3514,7 @@ fn parse_rollback() { #[test] fn parse_create_index() { - let sql = "CREATE UNIQUE INDEX IF NOT EXISTS idx_name ON test(name,age DESC) INCLUDE(other) DISTRIBUTED BY(name)"; + let sql = "CREATE UNIQUE INDEX IF NOT EXISTS idx_name ON test(name, age DESC) INCLUDE(other) DISTRIBUTED BY(name)"; let indexed_columns = vec![ OrderByExpr { expr: Expr::Identifier(Ident::new_unchecked("name")), diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index 14e9c989fb6c..a5572f0e4abe 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -63,7 +63,11 @@ pub enum StatementType { DROP_SCHEMA, DROP_DATABASE, DROP_USER, + ALTER_INDEX, + ALTER_VIEW, ALTER_TABLE, + ALTER_MATERIALIZED_VIEW, + ALTER_SINK, ALTER_SYSTEM, REVOKE_PRIVILEGE, // Introduce ORDER_BY statement type cuz Calcite unvalidated AST has SqlKind.ORDER_BY. Note