diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt
index 4258636e804a..48a5b9197927 100644
--- a/e2e_test/source/cdc/cdc.load.slt
+++ b/e2e_test/source/cdc/cdc.load.slt
@@ -57,7 +57,6 @@ create table shipments (
  username = 'postgres',
  password = 'postgres',
  database.name = 'cdc_test',
- schema.name = 'public',
  table.name = 'shipments',
  slot.name = 'shipments'
 );
diff --git a/e2e_test/source/cdc/cdc.validate.postgres.slt b/e2e_test/source/cdc/cdc.validate.postgres.slt
index ddc46fe92c37..15fce0b554dc 100644
--- a/e2e_test/source/cdc/cdc.validate.postgres.slt
+++ b/e2e_test/source/cdc/cdc.validate.postgres.slt
@@ -16,7 +16,6 @@ create table shipments (
  username = 'posres',
  password = 'postgres',
  database.name = 'cdc_test',
- schema.name = 'public',
  table.name = 'shipments',
  slot.name = 'shipments'
 );
@@ -38,7 +37,6 @@ create table shipments (
  username = 'postgres',
  password = 'otgres',
  database.name = 'cdc_test',
- schema.name = 'public',
  table.name = 'shipments',
  slot.name = 'shipments'
 );
@@ -59,7 +57,6 @@ create table shipments (
  username = 'postgres',
  password = 'postgres',
  database.name = 'cdc_test',
- schema.name = 'public',
  table.name = 'shipment',
  slot.name = 'shipments'
 );
diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java
index 8d9cf56ca8bc..686caf5ef046 100644
--- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java
+++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java
@@ -3,7 +3,22 @@
 import java.util.HashMap;
 import java.util.Map;
 
-public class ConnectorConfig {
+public class ConnectorConfig extends HashMap<String, String> {
+
+    public ConnectorConfig() {}
+
+    public ConnectorConfig(Map<String, String> m) {
+        super(m);
+    }
+
+    public String getNonNull(String key) {
+        String value = super.get(key);
+        if (value == null) {
+            throw new RuntimeException(key + " cannot be null");
+        }
+        return value;
+    }
+
     /* Common configs */
     public static final String HOST = "hostname";
     public static final String PORT = "port";
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java
index 14f715aa4f7d..cd246e894017 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java
@@ -37,7 +37,7 @@ public void handle(ConnectorServiceProto.GetEventStreamRequest request) {
                             SourceTypeE.valueOf(startRequest.getSourceType()),
                             startRequest.getSourceId(),
                             startRequest.getStartOffset(),
-                            startRequest.getPropertiesMap());
+                            new ConnectorConfig(startRequest.getPropertiesMap()));
             if (handler == null) {
                 LOG.error("failed to create source handler");
                 responseObserver.onCompleted();
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java
index 9b3a6239126d..d862fa7cbfba 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java
@@ -1,10 +1,10 @@
 package com.risingwave.sourcenode.core;
 
+import com.risingwave.connector.api.source.ConnectorConfig;
 import com.risingwave.connector.api.source.SourceHandler;
 import com.risingwave.connector.api.source.SourceTypeE;
 import com.risingwave.sourcenode.mysql.MySqlSourceConfig;
 import com.risingwave.sourcenode.postgres.PostgresSourceConfig;
-import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -12,7 +12,7 @@ public abstract class SourceHandlerFactory {
     static final Logger LOG = LoggerFactory.getLogger(SourceHandlerFactory.class);
 
     public static SourceHandler createSourceHandler(
-            SourceTypeE type, long sourceId, String startOffset, Map<String, String> userProps) {
+            SourceTypeE type, long sourceId, String startOffset, ConnectorConfig userProps) {
         switch (type) {
             case MYSQL:
                 return DefaultSourceHandler.newWithConfig(
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java
index 2939e3e5d367..6b13ce97a865 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java
@@ -5,7 +5,6 @@
 import com.risingwave.connector.api.source.SourceTypeE;
 import com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore;
 import com.risingwave.sourcenode.common.DebeziumCdcUtils;
-import java.util.Map;
 import java.util.Properties;
 
 /** MySQL Source Config */
@@ -15,7 +14,7 @@ public class MySqlSourceConfig implements SourceConfig {
     private final long id;
     private final String sourceName;
 
-    public MySqlSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
+    public MySqlSourceConfig(long sourceId, String startOffset, ConnectorConfig userProps) {
         id = sourceId;
         props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
         props.setProperty(
@@ -42,9 +41,9 @@ public MySqlSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
         props.setProperty("database.include.list", userProps.get(ConnectorConfig.DB_NAME));
         // only captures data of the specified table
         String tableFilter =
-                userProps.get(ConnectorConfig.DB_NAME)
+                userProps.getNonNull(ConnectorConfig.DB_NAME)
                         + "."
-                        + userProps.get(ConnectorConfig.TABLE_NAME);
+                        + userProps.getNonNull(ConnectorConfig.TABLE_NAME);
         props.setProperty("table.include.list", tableFilter);
 
         // disable schema change events for current stage
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java
index a314f037e8ae..8c2451d86c69 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java
@@ -7,7 +7,6 @@
 import com.risingwave.sourcenode.common.DebeziumCdcUtils;
 import io.debezium.heartbeat.Heartbeat;
 import java.time.Duration;
-import java.util.Map;
 import java.util.Properties;
 
 /** Postgres Source Config */
@@ -18,7 +17,7 @@ public class PostgresSourceConfig implements SourceConfig {
     private final String sourceName;
     private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();
 
-    public PostgresSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
+    public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig userProps) {
         id = sourceId;
         props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
         props.setProperty(
@@ -32,12 +31,16 @@ public PostgresSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
[NOTE(review): the body of this hunk, the "diff --git" header of the next file, and that file's first "@@" line were lost in text extraction; only the orphaned fragment ") -> bool" survived. The file path below is reconstructed from the remaining hunks' content (resolve_source_schema / validate_compatibility / handle_create_source live in create_source.rs); its hunk line numbers are unrecoverable.]
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ ... @@
 pub(crate) async fn resolve_source_schema(
     source_schema: SourceSchema,
     columns: &mut Vec<ColumnCatalog>,
-    with_properties: &HashMap<String, String>,
+    with_properties: &mut HashMap<String, String>,
     row_id_index: &mut Option<usize>,
     pk_column_ids: &mut Vec<ColumnId>,
     is_materialized: bool,
@@ -213,11 +209,7 @@ pub(crate) async fn resolve_source_schema(
             columns_extend(
                 columns,
-                extract_protobuf_table_schema(
-                    protobuf_schema,
-                    with_properties.clone().into_iter().collect(),
-                )
-                .await?,
+                extract_protobuf_table_schema(protobuf_schema, with_properties.clone()).await?,
             );
 
             StreamSourceInfo {
@@ -525,7 +521,7 @@ fn source_shema_to_row_format(source_schema: &SourceSchema) -> RowFormatType {
 
 fn validate_compatibility(
     source_schema: &SourceSchema,
-    props: &HashMap<String, String>,
+    props: &mut HashMap<String, String>,
 ) -> Result<()> {
     let connector = get_connector(props);
     let row_format = source_shema_to_row_format(source_schema);
@@ -561,6 +557,19 @@ fn validate_compatibility(
             connector, row_format
         ))));
     }
+
+    if connector == POSTGRES_CDC_CONNECTOR {
+        if !props.contains_key("slot.name") {
+            // Build a random slot name with UUID
+            // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815"
+            let uuid = uuid::Uuid::new_v4().to_string().replace('-', "");
+            props.insert("slot.name".into(), format!("rw_cdc_{}", uuid));
+        }
+        if !props.contains_key("schema.name") {
+            // Default schema name is "public"
+            props.insert("schema.name".into(), "public".into());
+        }
+    }
 
     Ok(())
 }
@@ -576,7 +585,7 @@ pub async fn handle_create_source(
     let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, stmt.source_name)?;
     let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
 
-    let with_properties = handler_args
+    let mut with_properties = handler_args
         .with_options
         .inner()
         .clone()
@@ -606,7 +615,7 @@ pub async fn handle_create_source(
     let source_info = resolve_source_schema(
         stmt.source_schema,
         &mut columns,
-        &with_properties,
+        &mut with_properties,
         &mut row_id_index,
         &mut pk_column_ids,
         false,
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 5276ae8c8fe5..571685f9a5e5 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -292,7 +292,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
     append_only: bool,
 ) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
     let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?;
-    let properties = context.with_options().inner().clone().into_iter().collect();
+    let mut properties = context.with_options().inner().clone().into_iter().collect();
 
     let (mut columns, mut pk_column_ids, mut row_id_index) =
         bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?;
@@ -311,7 +311,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
     let source_info = resolve_source_schema(
         source_schema,
         &mut columns,
-        &properties,
+        &mut properties,
         &mut row_id_index,
         &mut pk_column_ids,
         true,
@@ -322,6 +322,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
         context.into(),
         table_name,
         columns,
+        properties,
         pk_column_ids,
         row_id_index,
         Some(source_info),
@@ -346,12 +347,14 @@ pub(crate) fn gen_create_table_plan(
     let definition = context.normalized_sql().to_owned();
     let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?;
+    let properties = context.with_options().inner().clone().into_iter().collect();
 
     gen_create_table_plan_without_bind(
         context,
         table_name,
         column_descs,
         pk_column_id_from_columns,
         constraints,
+        properties,
         definition,
         source_watermarks,
         append_only,
@@ -366,6 +369,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
     column_descs: Vec<ColumnDesc>,
     pk_column_id_from_columns: Option<ColumnId>,
     constraints: Vec<TableConstraint>,
+    properties: HashMap<String, String>,
     definition: String,
     source_watermarks: Vec<SourceWatermark>,
     append_only: bool,
@@ -385,6 +389,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
         context.into(),
         table_name,
         columns,
+        properties,
         pk_column_ids,
         row_id_index,
         None,
@@ -400,6 +405,7 @@ fn gen_table_plan_inner(
     context: OptimizerContextRef,
     table_name: ObjectName,
     columns: Vec<ColumnCatalog>,
+    properties: HashMap<String, String>,
     pk_column_ids: Vec<ColumnId>,
     row_id_index: Option<usize>,
     source_info: Option<StreamSourceInfo>,
@@ -425,7 +431,7 @@ fn gen_table_plan_inner(
             .map(|column| column.to_protobuf())
             .collect_vec(),
         pk_column_ids: pk_column_ids.iter().map(Into::into).collect_vec(),
-        properties: context.with_options().inner().clone().into_iter().collect(),
+        properties,
         info: Some(source_info),
         owner: session.user_id(),
         watermark_descs: watermark_descs.clone(),
diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs
index 57f6fc5eb02e..4db3dbd5cfa6 100644
--- a/src/frontend/src/handler/create_table_as.rs
+++ b/src/frontend/src/handler/create_table_as.rs
@@ -87,12 +87,19 @@ pub async fn handle_create_as(
     let (graph, source, table) = {
         let context = OptimizerContext::from_handler_args(handler_args.clone());
 
+        let properties = handler_args
+            .with_options
+            .inner()
+            .clone()
+            .into_iter()
+            .collect();
         let (plan, source, table) = gen_create_table_plan_without_bind(
             context,
             table_name.clone(),
             column_descs,
             None,
             vec![],
+            properties,
             "".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS`
             vec![], // No watermark should be defined in for `CREATE TABLE AS`
             append_only,