Skip to content

Commit

Permalink
feat: support alter rename relations including table/mview/view/sink/…
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Mar 27, 2023
1 parent 0069678 commit 96ff27f
Show file tree
Hide file tree
Showing 27 changed files with 1,651 additions and 176 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

148 changes: 148 additions & 0 deletions e2e_test/ddl/alter_rename_relation.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
CREATE TABLE t (v1 INT primary key, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>);

statement ok
CREATE TABLE t_as AS ( WITH source_data AS ( SELECT 1 AS id) SELECT * FROM source_data);

statement ok
CREATE MATERIALIZED VIEW mv AS SELECT v1, (t.v2).v1 AS v21 FROM t;

statement ok
CREATE SINK sink AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv AS mv3 WITH (
connector = 'blackhole'
);

statement ok
CREATE VIEW v1 AS ( SELECT * FROM t_as WHERE id = 1);

statement ok
CREATE VIEW v2 AS (SELECT COUNT(*) FROM t, t AS t2 WHERE t.v1 = t2.v1);

statement ok
CREATE VIEW v3 AS (SELECT MAX((t.v2).v1) FROM t AS t);

statement ok
CREATE VIEW v4 AS (SELECT * FROM t join t as t2 on (t.v1 = t2.v1) ORDER BY t.v1, t2.v1);

statement ok
CREATE index idx ON t(v1);

query TT
SHOW CREATE TABLE t;
----
public.t CREATE TABLE t (v1 INT PRIMARY KEY, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>)

# alter table rename with alias conflict
statement ok
ALTER TABLE t RENAME TO t2;

query TT
SHOW CREATE TABLE t2;
----
public.t2 CREATE TABLE t2 (v1 INT PRIMARY KEY, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>)

query TT
SHOW CREATE VIEW v2;
----
public.v2 CREATE VIEW v2 AS (SELECT COUNT(*) FROM t2 AS t, t2 AS t2 WHERE t.v1 = t2.v1)

query TT
SHOW CREATE VIEW v3;
----
public.v3 CREATE VIEW v3 AS (SELECT MAX((t.v2).v1) FROM t2 AS t)

query TT
SHOW CREATE VIEW v4;
----
public.v4 CREATE VIEW v4 AS (SELECT * FROM t2 AS t JOIN t2 AS t2 ON (t.v1 = t2.v1) ORDER BY t.v1, t2.v1)

query TT
SHOW CREATE MATERIALIZED VIEW mv;
----
public.mv CREATE MATERIALIZED VIEW mv AS SELECT v1, (t.v2).v1 AS v21 FROM t2 AS t

# alter mview rename
statement ok
ALTER MATERIALIZED VIEW mv RENAME TO mv2;

query TT
SHOW CREATE MATERIALIZED VIEW mv2;
----
public.mv2 CREATE MATERIALIZED VIEW mv2 AS SELECT v1, (t.v2).v1 AS v21 FROM t2 AS t

statement ok
ALTER SINK sink RENAME TO sink1;

# alter mview rename with alias conflict, used by sink1
statement ok
ALTER MATERIALIZED VIEW mv2 RENAME TO mv3;

statement ok
ALTER TABLE t_as RENAME TO t_as_1;

# alter view rename
statement ok
ALTER VIEW v1 RENAME TO v5;

query TT
SHOW CREATE VIEW v5;
----
public.v5 CREATE VIEW v5 AS (SELECT * FROM t_as_1 AS t_as WHERE id = 1)

statement ok
ALTER INDEX idx RENAME TO idx1;

statement ok
INSERT INTO t2 VALUES(1,(1,(1,2)));

statement ok
INSERT INTO t2 VALUES(2,(2,(2,4)));

query II rowsort
SELECT * from mv3
----
1 1
2 2

query I
SELECT * from v2
----
2

query I
SELECT * from v3
----
2

query IIII rowsort
SELECT * from v4
----
1 (1,(1,2)) 1 (1,(1,2))
2 (2,(2,4)) 2 (2,(2,4))

statement ok
DROP SINK sink1;

statement ok
DROP VIEW v5;

statement ok
DROP VIEW v4;

statement ok
DROP VIEW v3;

statement ok
DROP VIEW v2;

statement ok
DROP MATERIALIZED VIEW mv3;

statement ok
DROP TABLE t2;

statement ok
DROP TABLE t_as_1;
16 changes: 16 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,21 @@ message CreateTableResponse {
uint64 version = 3;
}

message AlterRelationNameRequest {
oneof relation {
uint32 table_id = 1;
uint32 view_id = 2;
uint32 index_id = 3;
uint32 sink_id = 4;
}
string new_name = 20;
}

message AlterRelationNameResponse {
common.Status status = 1;
uint64 version = 2;
}

message CreateFunctionRequest {
catalog.Function function = 1;
}
Expand Down Expand Up @@ -279,6 +294,7 @@ service DdlService {
rpc CreateMaterializedView(CreateMaterializedViewRequest) returns (CreateMaterializedViewResponse);
rpc DropMaterializedView(DropMaterializedViewRequest) returns (DropMaterializedViewResponse);
rpc CreateTable(CreateTableRequest) returns (CreateTableResponse);
rpc AlterRelationName(AlterRelationNameRequest) returns (AlterRelationNameResponse);
rpc DropTable(DropTableRequest) returns (DropTableResponse);
rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);
rpc CreateView(CreateViewRequest) returns (CreateViewResponse);
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ impl Binder {
self.param_types.export()
}

pub fn shared_views(&self) -> &HashMap<ViewId, ShareId> {
&self.shared_views
}

fn push_context(&mut self) {
let new_context = std::mem::take(&mut self.context);
self.context.cte_to_relation = new_context.cte_to_relation.clone();
Expand Down
15 changes: 15 additions & 0 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,21 @@ impl Binder {
Self::resolve_single_name(name.0, "index name")
}

/// return the `view_name`
pub fn resolve_view_name(name: ObjectName) -> Result<String> {
Self::resolve_single_name(name.0, "view name")
}

/// return the `sink_name`
pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
Self::resolve_single_name(name.0, "sink name")
}

/// return the `table_name`
pub fn resolve_table_name(name: ObjectName) -> Result<String> {
Self::resolve_single_name(name.0, "table name")
}

/// return the `user_name`
pub fn resolve_user_name(name: ObjectName) -> Result<String> {
Self::resolve_single_name(name.0, "user name")
Expand Down
41 changes: 41 additions & 0 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
};
use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_rpc_client::MetaClient;
use tokio::sync::watch::Receiver;
Expand Down Expand Up @@ -114,6 +115,14 @@ pub trait CatalogWriter: Send + Sync {
async fn drop_index(&self, index_id: IndexId) -> Result<()>;

async fn drop_function(&self, function_id: FunctionId) -> Result<()>;

async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()>;

async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()>;

async fn alter_index_name(&self, index_id: u32, index_name: &str) -> Result<()>;

async fn alter_sink_name(&self, sink_id: u32, sink_name: &str) -> Result<()>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -264,6 +273,38 @@ impl CatalogWriter for CatalogWriterImpl {
let version = self.meta_client.drop_database(database_id).await?;
self.wait_version(version).await
}

async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_relation_name(Relation::TableId(table_id), table_name)
.await?;
self.wait_version(version).await
}

async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_relation_name(Relation::ViewId(view_id), view_name)
.await?;
self.wait_version(version).await
}

async fn alter_index_name(&self, index_id: u32, index_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_relation_name(Relation::IndexId(index_id), index_name)
.await?;
self.wait_version(version).await
}

async fn alter_sink_name(&self, sink_id: u32, sink_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_relation_name(Relation::SinkId(sink_id), sink_name)
.await?;
self.wait_version(version).await
}
}

impl CatalogWriterImpl {
Expand Down
9 changes: 8 additions & 1 deletion src/frontend/src/catalog/index_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use risingwave_pb::catalog::PbIndex;
use risingwave_pb::expr::expr_node::RexNode;

use super::ColumnId;
use crate::catalog::{DatabaseId, SchemaId, TableCatalog};
use crate::catalog::{DatabaseId, RelationCatalog, SchemaId, TableCatalog};
use crate::expr::{Expr, InputRef};
use crate::user::UserId;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct IndexCatalog {
Expand Down Expand Up @@ -152,3 +153,9 @@ impl IndexCatalog {
}
}
}

impl RelationCatalog for IndexCatalog {
fn owner(&self) -> UserId {
self.index_table.owner
}
}
48 changes: 48 additions & 0 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ impl Catalog {
.drop_source(source_id);
}

pub fn update_source(&mut self, proto: &PbSource) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_source(proto);
}

pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
self.get_database_mut(db_id)
.unwrap()
Expand All @@ -246,6 +254,14 @@ impl Catalog {
.drop_sink(sink_id);
}

pub fn update_sink(&mut self, proto: &PbSink) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_sink(proto);
}

pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
self.get_database_mut(db_id)
.unwrap()
Expand All @@ -262,6 +278,14 @@ impl Catalog {
.drop_view(view_id);
}

pub fn update_view(&mut self, proto: &PbView) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_view(proto);
}

pub fn drop_function(
&mut self,
db_id: DatabaseId,
Expand Down Expand Up @@ -379,6 +403,30 @@ impl Catalog {
.ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
}

// Used by test_utils only.
pub fn alter_table_name_by_id(&mut self, table_id: &TableId, table_name: &str) {
let (mut database_id, mut schema_id) = (0, 0);
let mut found = false;
for database in self.database_by_name.values() {
if !found {
for schema in database.iter_schemas() {
if schema.iter_table().any(|t| t.id() == *table_id) {
found = true;
database_id = database.id();
schema_id = schema.id();
break;
}
}
}
}

if found {
let mut table = self.get_table_by_id(table_id).unwrap();
table.name = table_name.to_string();
self.update_table(&table.to_prost(schema_id, database_id));
}
}

#[cfg(test)]
pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
self.table_by_id.insert(
Expand Down
Loading

0 comments on commit 96ff27f

Please sign in to comment.