From 8ee7c2c4378cf7a91ce3c9ab0dd02e496a3702f0 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 9 Mar 2023 17:13:40 +0800 Subject: [PATCH 01/12] fix: should throw error for some null configs --- .../connector/api/source/ConnectorConfig.java | 18 +++++++++++++++++- .../sourcenode/SourceRequestHandler.java | 2 +- .../sourcenode/core/SourceHandlerFactory.java | 3 ++- .../sourcenode/mysql/MySqlSourceConfig.java | 6 +++--- .../postgres/PostgresSourceConfig.java | 16 ++++++++-------- 5 files changed, 31 insertions(+), 14 deletions(-) diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java index 8d9cf56ca8bc..7428bf9c4d74 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java @@ -3,7 +3,23 @@ import java.util.HashMap; import java.util.Map; -public class ConnectorConfig { +public class ConnectorConfig extends HashMap { + + public ConnectorConfig() { + } + + public ConnectorConfig(Map m) { + super(m); + } + + public String getNonNull(String key) { + String value = super.get(key); + if (value == null) { + throw new RuntimeException(key + "cannot be null"); + } + return value; + } + /* Common configs */ public static final String HOST = "hostname"; public static final String PORT = "port"; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java index 14f715aa4f7d..cd246e894017 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java @@ -37,7 +37,7 @@ public void handle(ConnectorServiceProto.GetEventStreamRequest request) { SourceTypeE.valueOf(startRequest.getSourceType()), startRequest.getSourceId(), startRequest.getStartOffset(), - startRequest.getPropertiesMap()); + new ConnectorConfig(startRequest.getPropertiesMap())); if (handler == null) { LOG.error("failed to create source handler"); responseObserver.onCompleted(); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java index 9b3a6239126d..b55dad5da5bb 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java @@ -1,5 +1,6 @@ package com.risingwave.sourcenode.core; +import com.risingwave.connector.api.source.ConnectorConfig; import com.risingwave.connector.api.source.SourceHandler; import com.risingwave.connector.api.source.SourceTypeE; import com.risingwave.sourcenode.mysql.MySqlSourceConfig; @@ -12,7 +13,7 @@ public abstract class SourceHandlerFactory { static final Logger LOG = LoggerFactory.getLogger(SourceHandlerFactory.class); public static SourceHandler createSourceHandler( - SourceTypeE type, long sourceId, String startOffset, Map userProps) { + SourceTypeE type, long sourceId, String startOffset, ConnectorConfig userProps) { switch (type) { case MYSQL: return DefaultSourceHandler.newWithConfig( diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java index 2939e3e5d367..1a39ebb22068 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java @@ -15,7 +15,7 @@ public class MySqlSourceConfig implements SourceConfig { private final long id; private final String sourceName; - public MySqlSourceConfig(long sourceId, String startOffset, Map userProps) { + public MySqlSourceConfig(long sourceId, String startOffset, ConnectorConfig userProps) { id = sourceId; props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector"); props.setProperty( @@ -42,9 +42,9 @@ public MySqlSourceConfig(long sourceId, String startOffset, Map props.setProperty("database.include.list", userProps.get(ConnectorConfig.DB_NAME)); // only captures data of the specified table String tableFilter = - userProps.get(ConnectorConfig.DB_NAME) + userProps.getNonNull(ConnectorConfig.DB_NAME) + "." - + userProps.get(ConnectorConfig.TABLE_NAME); + + userProps.getNonNull(ConnectorConfig.TABLE_NAME); props.setProperty("table.include.list", tableFilter); // disable schema change events for current stage diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java index a314f037e8ae..762a2014a431 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java @@ -18,7 +18,7 @@ public class PostgresSourceConfig implements SourceConfig { private final String sourceName; private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis(); - public PostgresSourceConfig(long sourceId, String startOffset, Map userProps) { + public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig userProps) { id = sourceId; props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); props.setProperty( @@ -64,23 +64,23 @@ public PostgresSourceConfig(long sourceId, String startOffset, Map Date: Thu, 9 Mar 2023 17:17:02 +0800 Subject: [PATCH 02/12] PG default schema name: 'public' --- .../connector/api/source/ConnectorConfig.java | 2 ++ .../postgres/PostgresSourceConfig.java | 18 ++++++++---------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java index 7428bf9c4d74..c7dbe1b095d1 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java @@ -36,6 +36,8 @@ public String getNonNull(String key) { public static final String PG_SLOT_NAME = "slot.name"; public static final String PG_SCHEMA_NAME = "schema.name"; + public static final String PG_DEFAULT_SCHEMA = "public"; + public static Map extractDebeziumProperties(Map properties) { // retain only debezium properties if any var userProps = new HashMap<>(properties); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java index 762a2014a431..271cfb94eaef 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java @@ -32,12 +32,16 @@ public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig u props.setProperty(ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } + String dbName = userProps.getNonNull(ConnectorConfig.DB_NAME); + String schema = userProps.getOrDefault(ConnectorConfig.PG_SCHEMA_NAME, ConnectorConfig.PG_DEFAULT_SCHEMA); + String table = userProps.getNonNull(ConnectorConfig.TABLE_NAME); + // Begin of connector configs props.setProperty("database.hostname", userProps.get(ConnectorConfig.HOST)); props.setProperty("database.port", userProps.get(ConnectorConfig.PORT)); props.setProperty("database.user", userProps.get(ConnectorConfig.USER)); props.setProperty("database.password", userProps.get(ConnectorConfig.PASSWORD)); - props.setProperty("database.dbname", userProps.get(ConnectorConfig.DB_NAME)); + props.setProperty("database.dbname", dbName); // The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server. // Supported values are decoderbufs, and pgoutput. // The wal2json plug-in is deprecated and scheduled for removal. @@ -63,10 +67,8 @@ public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig u Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString()); - String tableFilter = - userProps.getNonNull(ConnectorConfig.PG_SCHEMA_NAME) - + "." - + userProps.getNonNull(ConnectorConfig.TABLE_NAME); + + String tableFilter = schema + "." + table; props.setProperty("table.include.list", tableFilter); props.setProperty("database.server.name", DB_SERVER_NAME_PREFIX + tableFilter); @@ -76,11 +78,7 @@ public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig u + ":" + userProps.getNonNull(ConnectorConfig.PORT) + ":" - + userProps.getNonNull(ConnectorConfig.DB_NAME) - + "." - + userProps.getNonNull(ConnectorConfig.PG_SCHEMA_NAME) - + "." - + userProps.getNonNull(ConnectorConfig.TABLE_NAME); + + dbName + "." + schema + "." + table; props.setProperty("name", sourceName); // pass through debezium properties if any From 8da3a377b3e96eda18227b45852a56a8aa16f79d Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 9 Mar 2023 17:27:12 +0800 Subject: [PATCH 03/12] auto generate slot for PG-cdc --- .../connector/api/source/ConnectorConfig.java | 12 ++++++++++-- .../sourcenode/postgres/PostgresSourceConfig.java | 10 ++++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java index c7dbe1b095d1..43f51a6505b8 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java @@ -2,11 +2,11 @@ import java.util.HashMap; import java.util.Map; +import java.util.function.Supplier; public class ConnectorConfig extends HashMap { - public ConnectorConfig() { - } + public ConnectorConfig() {} public ConnectorConfig(Map m) { super(m); @@ -20,6 +20,14 @@ public String getNonNull(String key) { return value; } + public String getOrCompute(String key, Supplier fn) { + String value = super.get(key); + if (value == null) { + return fn.get(); + } + return value; + } + /* Common configs */ public static final String HOST = "hostname"; public static final String PORT = "port"; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java index 271cfb94eaef..b98eab310dc1 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java @@ -7,8 +7,8 @@ import com.risingwave.sourcenode.common.DebeziumCdcUtils; import io.debezium.heartbeat.Heartbeat; import java.time.Duration; -import java.util.Map; import java.util.Properties; +import java.util.UUID; /** Postgres Source Config */ public class PostgresSourceConfig implements SourceConfig { @@ -56,7 +56,8 @@ public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig u // Slot names must conform to PostgreSQL replication slot naming rules, // which state: "Each replication slot has a name, which can contain lower-case letters, // numbers, and the underscore character." - props.setProperty("slot.name", userProps.get(ConnectorConfig.PG_SLOT_NAME)); + props.setProperty("slot.name", userProps.getOrCompute(ConnectorConfig.PG_SLOT_NAME, + PostgresSourceConfig::generateReplicaSlot)); // Sending heartbeat messages enables the connector to send the latest retrieved LSN to the // database, which allows the database to reclaim disk space being @@ -105,4 +106,9 @@ public SourceTypeE getSourceType() { public Properties getProperties() { return props; } + + private static String generateReplicaSlot() { + // Example: rw_cdc_f9a3567e6dd54bf5900444c8b1c03815 + return "rw_cdc_" + UUID.randomUUID().toString().replace("-", ""); + } } From d3045af840dc0d32d0d09016d582a4273d853ff8 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 9 Mar 2023 17:52:41 +0800 Subject: [PATCH 04/12] fix checkstyle --- .../sourcenode/core/SourceHandlerFactory.java | 1 - .../sourcenode/mysql/MySqlSourceConfig.java | 1 - .../postgres/PostgresSourceConfig.java | 17 ++++++++++++----- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java index b55dad5da5bb..d862fa7cbfba 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java @@ -5,7 +5,6 @@ import com.risingwave.connector.api.source.SourceTypeE; import com.risingwave.sourcenode.mysql.MySqlSourceConfig; import com.risingwave.sourcenode.postgres.PostgresSourceConfig; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java index 1a39ebb22068..6b13ce97a865 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java @@ -5,7 +5,6 @@ import com.risingwave.connector.api.source.SourceTypeE; import com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore; import com.risingwave.sourcenode.common.DebeziumCdcUtils; -import java.util.Map; import java.util.Properties; /** MySQL Source Config */ diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java index b98eab310dc1..325247f8dc63 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java @@ -33,7 +33,9 @@ public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig u } String dbName = userProps.getNonNull(ConnectorConfig.DB_NAME); - String schema = userProps.getOrDefault(ConnectorConfig.PG_SCHEMA_NAME, ConnectorConfig.PG_DEFAULT_SCHEMA); + String schema = + userProps.getOrDefault( + ConnectorConfig.PG_SCHEMA_NAME, ConnectorConfig.PG_DEFAULT_SCHEMA); String table = userProps.getNonNull(ConnectorConfig.TABLE_NAME); // Begin of connector configs @@ -56,8 +58,10 @@ public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig u // Slot names must conform to PostgreSQL replication slot naming rules, // which state: "Each replication slot has a name, which can contain lower-case letters, // numbers, and the underscore character." - props.setProperty("slot.name", userProps.getOrCompute(ConnectorConfig.PG_SLOT_NAME, - PostgresSourceConfig::generateReplicaSlot)); + props.setProperty( + "slot.name", + userProps.getOrCompute( + ConnectorConfig.PG_SLOT_NAME, PostgresSourceConfig::generateReplicaSlot)); // Sending heartbeat messages enables the connector to send the latest retrieved LSN to the // database, which allows the database to reclaim disk space being @@ -68,7 +72,6 @@ public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig u Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString()); - String tableFilter = schema + "." + table; props.setProperty("table.include.list", tableFilter); props.setProperty("database.server.name", DB_SERVER_NAME_PREFIX + tableFilter); @@ -79,7 +82,11 @@ public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig u + ":" + userProps.getNonNull(ConnectorConfig.PORT) + ":" - + dbName + "." + schema + "." + table; + + dbName + + "." + + schema + + "." + + table; props.setProperty("name", sourceName); // pass through debezium properties if any From b61f24ad2506e038117464f327af3f52f0b840a6 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 9 Mar 2023 17:55:41 +0800 Subject: [PATCH 05/12] Revert "auto generate slot for PG-cdc" This reverts commit 8da3a377b3e96eda18227b45852a56a8aa16f79d. --- .../connector/api/source/ConnectorConfig.java | 9 --------- .../sourcenode/postgres/PostgresSourceConfig.java | 11 +---------- 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java index 43f51a6505b8..7657f96c51ba 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java @@ -2,7 +2,6 @@ import java.util.HashMap; import java.util.Map; -import java.util.function.Supplier; public class ConnectorConfig extends HashMap { @@ -20,14 +19,6 @@ public String getNonNull(String key) { return value; } - public String getOrCompute(String key, Supplier fn) { - String value = super.get(key); - if (value == null) { - return fn.get(); - } - return value; - } - /* Common configs */ public static final String HOST = "hostname"; public static final String PORT = "port"; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java index 325247f8dc63..884cf3e6fba4 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java @@ -8,7 +8,6 @@ import io.debezium.heartbeat.Heartbeat; import java.time.Duration; import java.util.Properties; -import java.util.UUID; /** Postgres Source Config */ public class PostgresSourceConfig implements SourceConfig { @@ -58,10 +57,7 @@ public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig u // Slot names must conform to PostgreSQL replication slot naming rules, // which state: "Each replication slot has a name, which can contain lower-case letters, // numbers, and the underscore character." - props.setProperty( - "slot.name", - userProps.getOrCompute( - ConnectorConfig.PG_SLOT_NAME, PostgresSourceConfig::generateReplicaSlot)); + props.setProperty("slot.name", userProps.get(ConnectorConfig.PG_SLOT_NAME)); // Sending heartbeat messages enables the connector to send the latest retrieved LSN to the // database, which allows the database to reclaim disk space being @@ -113,9 +109,4 @@ public SourceTypeE getSourceType() { public Properties getProperties() { return props; } - - private static String generateReplicaSlot() { - // Example: rw_cdc_f9a3567e6dd54bf5900444c8b1c03815 - return "rw_cdc_" + UUID.randomUUID().toString().replace("-", ""); - } } From 1e7ff00a5fd5319989725ee2b845ea8ca5e04eea Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 9 Mar 2023 18:33:51 +0800 Subject: [PATCH 06/12] revert default schema --- .../com/risingwave/connector/api/source/ConnectorConfig.java | 2 -- .../risingwave/sourcenode/postgres/PostgresSourceConfig.java | 4 +--- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java index 7657f96c51ba..686caf5ef046 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java @@ -35,8 +35,6 @@ public String getNonNull(String key) { public static final String PG_SLOT_NAME = "slot.name"; public static final String PG_SCHEMA_NAME = "schema.name"; - public static final String PG_DEFAULT_SCHEMA = "public"; - public static Map extractDebeziumProperties(Map properties) { // retain only debezium properties if any var userProps = new HashMap<>(properties); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java index 884cf3e6fba4..8c2451d86c69 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java @@ -32,9 +32,7 @@ public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig u } String dbName = userProps.getNonNull(ConnectorConfig.DB_NAME); - String schema = - userProps.getOrDefault( - ConnectorConfig.PG_SCHEMA_NAME, ConnectorConfig.PG_DEFAULT_SCHEMA); + String schema = userProps.getNonNull(ConnectorConfig.PG_SCHEMA_NAME); String table = userProps.getNonNull(ConnectorConfig.TABLE_NAME); // Begin of connector configs From f3568aa97c00e244d9f7aca2137c7dacad084d20 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 9 Mar 2023 18:34:55 +0800 Subject: [PATCH 07/12] implement on Meta side --- Cargo.lock | 1 + src/connector/Cargo.toml | 1 + src/connector/src/source/base.rs | 24 ++++++++++++++++++------ 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1dea5f4876eb..46c0108e0b7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6041,6 +6041,7 @@ dependencies = [ "tracing", "url", "urlencoding", + "uuid", "wiremock", "workspace-hack", ] diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 3643f351d555..46a86548ae43 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -69,6 +69,7 @@ tonic = { version = "0.2", package = "madsim-tonic" } tracing = "0.1" url = "2" urlencoding = "2" +uuid = "1" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 6afd5df56c31..2f8094e52f50 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -217,7 +217,7 @@ pub enum ConnectorProperties { impl ConnectorProperties { fn new_cdc_properties( connector_name: &str, - properties: HashMap, + mut properties: HashMap, ) -> Result { match connector_name { MYSQL_CDC_CONNECTOR => Ok(Self::MySqlCdc(Box::new(CdcProperties { @@ -225,11 +225,23 @@ impl ConnectorProperties { source_type: "mysql".to_string(), ..Default::default() }))), - POSTGRES_CDC_CONNECTOR => Ok(Self::PostgresCdc(Box::new(CdcProperties { - props: properties, - source_type: "postgres".to_string(), - ..Default::default() - }))), + POSTGRES_CDC_CONNECTOR => { + if !properties.contains_key("slot.name") { + // Build a random slot name with UUID + // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815" + let uuid = uuid::Uuid::new_v4().to_string().replace("-", ""); + properties.insert("slot.name".into(), format!("rw_cdc_{}", uuid)); + } + if !properties.contains_key("schema.name") { + // Default schema name is "public" + properties.insert("schema.name".into(), "public".into()); + } + Ok(Self::PostgresCdc(Box::new(CdcProperties { + props: properties, + source_type: "postgres".to_string(), + ..Default::default() + }))) + } _ => Err(anyhow!("unexpected cdc connector '{}'", connector_name,)), } } From dc828029f6ec0c2dd60f2541a587551d1bfb5a30 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 10 Mar 2023 15:10:01 +0800 Subject: [PATCH 08/12] Revert "implement on Meta side" This reverts commit f3568aa97c00e244d9f7aca2137c7dacad084d20. --- Cargo.lock | 1 - src/connector/Cargo.toml | 1 - src/connector/src/source/base.rs | 24 ++++++------------------ 3 files changed, 6 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46c0108e0b7f..1dea5f4876eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6041,7 +6041,6 @@ dependencies = [ "tracing", "url", "urlencoding", - "uuid", "wiremock", "workspace-hack", ] diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 46a86548ae43..3643f351d555 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -69,7 +69,6 @@ tonic = { version = "0.2", package = "madsim-tonic" } tracing = "0.1" url = "2" urlencoding = "2" -uuid = "1" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 2f8094e52f50..6afd5df56c31 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -217,7 +217,7 @@ pub enum ConnectorProperties { impl ConnectorProperties { fn new_cdc_properties( connector_name: &str, - mut properties: HashMap, + properties: HashMap, ) -> Result { match connector_name { MYSQL_CDC_CONNECTOR => Ok(Self::MySqlCdc(Box::new(CdcProperties { @@ -225,23 +225,11 @@ impl ConnectorProperties { source_type: "mysql".to_string(), ..Default::default() }))), - POSTGRES_CDC_CONNECTOR => { - if !properties.contains_key("slot.name") { - // Build a random slot name with UUID - // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815" - let uuid = uuid::Uuid::new_v4().to_string().replace("-", ""); - properties.insert("slot.name".into(), format!("rw_cdc_{}", uuid)); - } - if !properties.contains_key("schema.name") { - // Default schema name is "public" - properties.insert("schema.name".into(), "public".into()); - } - Ok(Self::PostgresCdc(Box::new(CdcProperties { - props: properties, - source_type: "postgres".to_string(), - ..Default::default() - }))) - } + POSTGRES_CDC_CONNECTOR => Ok(Self::PostgresCdc(Box::new(CdcProperties { + props: properties, + source_type: "postgres".to_string(), + ..Default::default() + }))), _ => Err(anyhow!("unexpected cdc connector '{}'", connector_name,)), } } From a812d737c564844a348ced4678afcf44aa4ffba1 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 10 Mar 2023 15:10:10 +0800 Subject: [PATCH 09/12] reimpl in frontend --- src/frontend/src/handler/create_source.rs | 21 +++++++++++++++++---- src/frontend/src/handler/create_table.rs | 4 ++-- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index b5ed06085086..7d2edc27acb9 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -185,7 +185,7 @@ pub(crate) fn is_kafka_source(with_properties: &HashMap) -> bool pub(crate) async fn resolve_source_schema( source_schema: SourceSchema, columns: &mut Vec, - with_properties: &HashMap, + with_properties: &mut HashMap, row_id_index: &mut Option, pk_column_ids: &mut Vec, is_materialized: bool, @@ -525,7 +525,7 @@ fn source_shema_to_row_format(source_schema: &SourceSchema) -> RowFormatType { fn validate_compatibility( source_schema: &SourceSchema, - props: &HashMap, + props: &mut HashMap, ) -> Result<()> { let connector = get_connector(props); let row_format = source_shema_to_row_format(source_schema); @@ -561,6 +561,19 @@ fn validate_compatibility( connector, row_format )))); } + + if connector == POSTGRES_CDC_CONNECTOR { + if !props.contains_key("slot.name") { + // Build a random slot name with UUID + // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815" + let uuid = uuid::Uuid::new_v4().to_string().replace("-", ""); + props.insert("slot.name".into(), format!("rw_cdc_{}", uuid)); + } + if !props.contains_key("schema.name") { + // Default schema name is "public" + props.insert("schema.name".into(), "public".into()); + } + } Ok(()) } @@ -576,7 +589,7 @@ pub async fn handle_create_source( let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, stmt.source_name)?; let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; - let with_properties = handler_args + let mut with_properties = handler_args .with_options .inner() .clone() @@ -606,7 +619,7 @@ pub async fn handle_create_source( let source_info = resolve_source_schema( stmt.source_schema, &mut columns, - &with_properties, + &mut with_properties, &mut row_id_index, &mut pk_column_ids, false, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 5276ae8c8fe5..523671a9e864 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -292,7 +292,7 @@ pub(crate) async fn gen_create_table_plan_with_source( append_only: bool, ) -> Result<(PlanRef, Option, ProstTable)> { let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?; - let properties = context.with_options().inner().clone().into_iter().collect(); + let mut properties = context.with_options().inner().clone().into_iter().collect(); let (mut columns, mut pk_column_ids, mut row_id_index) = bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?; @@ -311,7 +311,7 @@ pub(crate) async fn gen_create_table_plan_with_source( let source_info = resolve_source_schema( source_schema, &mut columns, - &properties, + &mut properties, &mut row_id_index, &mut pk_column_ids, true, From 22925f07ec96db3d0b9e34c38fee5487de38d93f Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 10 Mar 2023 15:16:46 +0800 Subject: [PATCH 10/12] update test cases --- e2e_test/source/cdc/cdc.load.slt | 1 - e2e_test/source/cdc/cdc.validate.postgres.slt | 3 --- 2 files changed, 4 deletions(-) diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index 4258636e804a..48a5b9197927 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -57,7 +57,6 @@ create table shipments ( username = 'postgres', password = 'postgres', database.name = 'cdc_test', - schema.name = 'public', table.name = 'shipments', slot.name = 'shipments' ); diff --git a/e2e_test/source/cdc/cdc.validate.postgres.slt b/e2e_test/source/cdc/cdc.validate.postgres.slt index ddc46fe92c37..15fce0b554dc 100644 --- a/e2e_test/source/cdc/cdc.validate.postgres.slt +++ b/e2e_test/source/cdc/cdc.validate.postgres.slt @@ -16,7 +16,6 @@ create table shipments ( username = 'posres', password = 'postgres', database.name = 'cdc_test', - schema.name = 'public', table.name = 'shipments', slot.name = 'shipments' ); @@ -38,7 +37,6 @@ create table shipments ( username = 'postgres', password = 'otgres', database.name = 'cdc_test', - schema.name = 'public', table.name = 'shipments', slot.name = 'shipments' ); @@ -59,7 +57,6 @@ create table shipments ( username = 'postgres', password = 'postgres', database.name = 'cdc_test', - schema.name = 'public', table.name = 'shipment', slot.name = 'shipments' ); From c184bef2da40a44ff178cda508a39b77e0c7da99 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 10 Mar 2023 15:42:17 +0800 Subject: [PATCH 11/12] fix cargo clippy --- src/frontend/src/handler/create_source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 7d2edc27acb9..52fc880faf15 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -566,7 +566,7 @@ fn validate_compatibility( if !props.contains_key("slot.name") { // Build a random slot name with UUID // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815" - let uuid = uuid::Uuid::new_v4().to_string().replace("-", ""); + let uuid = uuid::Uuid::new_v4().to_string().replace('-', ""); props.insert("slot.name".into(), format!("rw_cdc_{}", uuid)); } if !props.contains_key("schema.name") { From 439df99ddfe5a2a6ba4b5684ac6c83dd5c46805d Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 10 Mar 2023 16:28:35 +0800 Subject: [PATCH 12/12] fuck --- src/frontend/src/handler/create_source.rs | 6 +----- src/frontend/src/handler/create_table.rs | 8 +++++++- src/frontend/src/handler/create_table_as.rs | 7 +++++++ 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 52fc880faf15..019c219bb477 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -213,11 +213,7 @@ pub(crate) async fn resolve_source_schema( columns_extend( columns, - extract_protobuf_table_schema( - protobuf_schema, - with_properties.clone().into_iter().collect(), - ) - .await?, + extract_protobuf_table_schema(protobuf_schema, with_properties.clone()).await?, ); StreamSourceInfo { diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 523671a9e864..571685f9a5e5 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -322,6 +322,7 @@ pub(crate) async fn gen_create_table_plan_with_source( context.into(), table_name, columns, + properties, pk_column_ids, row_id_index, Some(source_info), @@ -346,12 +347,14 @@ pub(crate) fn gen_create_table_plan( let definition = context.normalized_sql().to_owned(); let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?; + let properties = context.with_options().inner().clone().into_iter().collect(); gen_create_table_plan_without_bind( context, table_name, column_descs, pk_column_id_from_columns, constraints, + properties, definition, source_watermarks, append_only, @@ -366,6 +369,7 @@ pub(crate) fn gen_create_table_plan_without_bind( column_descs: Vec, pk_column_id_from_columns: Option, constraints: Vec, + properties: HashMap, definition: String, source_watermarks: Vec, append_only: bool, @@ -385,6 +389,7 @@ pub(crate) fn gen_create_table_plan_without_bind( context.into(), table_name, columns, + properties, pk_column_ids, row_id_index, None, @@ -400,6 +405,7 @@ fn gen_table_plan_inner( context: OptimizerContextRef, table_name: ObjectName, columns: Vec, + properties: HashMap, pk_column_ids: Vec, row_id_index: Option, source_info: Option, @@ -425,7 +431,7 @@ fn gen_table_plan_inner( .map(|column| column.to_protobuf()) .collect_vec(), pk_column_ids: pk_column_ids.iter().map(Into::into).collect_vec(), - properties: context.with_options().inner().clone().into_iter().collect(), + properties, info: Some(source_info), owner: session.user_id(), watermark_descs: watermark_descs.clone(), diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 57f6fc5eb02e..4db3dbd5cfa6 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -87,12 +87,19 @@ pub async fn handle_create_as( let (graph, source, table) = { let context = OptimizerContext::from_handler_args(handler_args.clone()); + let properties = handler_args + .with_options + .inner() + .clone() + .into_iter() + .collect(); let (plan, source, table) = gen_create_table_plan_without_bind( context, table_name.clone(), column_descs, None, vec![], + properties, "".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` vec![], // No watermark should be defined in for `CREATE TABLE AS` append_only,