Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve CDC connector param check #8450

Merged
merged 12 commits into from
Mar 12, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,32 @@

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class ConnectorConfig extends HashMap<String, String> {

public ConnectorConfig() {}

public ConnectorConfig(Map<? extends String, ? extends String> m) {
super(m);
}

public String getNonNull(String key) {
String value = super.get(key);
if (value == null) {
throw new RuntimeException(key + "cannot be null");
}
return value;
}

public String getOrCompute(String key, Supplier<String> fn) {
String value = super.get(key);
if (value == null) {
return fn.get();
}
return value;
}

public class ConnectorConfig {
/* Common configs */
public static final String HOST = "hostname";
public static final String PORT = "port";
Expand All @@ -20,6 +44,8 @@ public class ConnectorConfig {
public static final String PG_SLOT_NAME = "slot.name";
public static final String PG_SCHEMA_NAME = "schema.name";

public static final String PG_DEFAULT_SCHEMA = "public";

public static Map<String, String> extractDebeziumProperties(Map<String, String> properties) {
// retain only debezium properties if any
var userProps = new HashMap<>(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void handle(ConnectorServiceProto.GetEventStreamRequest request) {
SourceTypeE.valueOf(startRequest.getSourceType()),
startRequest.getSourceId(),
startRequest.getStartOffset(),
startRequest.getPropertiesMap());
new ConnectorConfig(startRequest.getPropertiesMap()));
if (handler == null) {
LOG.error("failed to create source handler");
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.risingwave.sourcenode.core;

import com.risingwave.connector.api.source.ConnectorConfig;
import com.risingwave.connector.api.source.SourceHandler;
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.sourcenode.mysql.MySqlSourceConfig;
Expand All @@ -12,7 +13,7 @@ public abstract class SourceHandlerFactory {
static final Logger LOG = LoggerFactory.getLogger(SourceHandlerFactory.class);

public static SourceHandler createSourceHandler(
SourceTypeE type, long sourceId, String startOffset, Map<String, String> userProps) {
SourceTypeE type, long sourceId, String startOffset, ConnectorConfig userProps) {
switch (type) {
case MYSQL:
return DefaultSourceHandler.newWithConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class MySqlSourceConfig implements SourceConfig {
private final long id;
private final String sourceName;

public MySqlSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
public MySqlSourceConfig(long sourceId, String startOffset, ConnectorConfig userProps) {
id = sourceId;
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty(
Expand All @@ -42,9 +42,9 @@ public MySqlSourceConfig(long sourceId, String startOffset, Map<String, String>
props.setProperty("database.include.list", userProps.get(ConnectorConfig.DB_NAME));
// only captures data of the specified table
String tableFilter =
userProps.get(ConnectorConfig.DB_NAME)
userProps.getNonNull(ConnectorConfig.DB_NAME)
+ "."
+ userProps.get(ConnectorConfig.TABLE_NAME);
+ userProps.getNonNull(ConnectorConfig.TABLE_NAME);
props.setProperty("table.include.list", tableFilter);

// disable schema change events for current stage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import com.risingwave.sourcenode.common.DebeziumCdcUtils;
import io.debezium.heartbeat.Heartbeat;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

/** Postgres Source Config */
public class PostgresSourceConfig implements SourceConfig {
Expand All @@ -18,7 +18,7 @@ public class PostgresSourceConfig implements SourceConfig {
private final String sourceName;
private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();

public PostgresSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig userProps) {
id = sourceId;
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
props.setProperty(
Expand All @@ -32,12 +32,16 @@ public PostgresSourceConfig(long sourceId, String startOffset, Map<String, Strin
props.setProperty(ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
}

String dbName = userProps.getNonNull(ConnectorConfig.DB_NAME);
String schema = userProps.getOrDefault(ConnectorConfig.PG_SCHEMA_NAME, ConnectorConfig.PG_DEFAULT_SCHEMA);
String table = userProps.getNonNull(ConnectorConfig.TABLE_NAME);

// Begin of connector configs
props.setProperty("database.hostname", userProps.get(ConnectorConfig.HOST));
props.setProperty("database.port", userProps.get(ConnectorConfig.PORT));
props.setProperty("database.user", userProps.get(ConnectorConfig.USER));
props.setProperty("database.password", userProps.get(ConnectorConfig.PASSWORD));
props.setProperty("database.dbname", userProps.get(ConnectorConfig.DB_NAME));
props.setProperty("database.dbname", dbName);
// The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server.
// Supported values are decoderbufs, and pgoutput.
// The wal2json plug-in is deprecated and scheduled for removal.
Expand All @@ -52,7 +56,8 @@ public PostgresSourceConfig(long sourceId, String startOffset, Map<String, Strin
// Slot names must conform to PostgreSQL replication slot naming rules,
// which state: "Each replication slot has a name, which can contain lower-case letters,
// numbers, and the underscore character."
props.setProperty("slot.name", userProps.get(ConnectorConfig.PG_SLOT_NAME));
props.setProperty("slot.name", userProps.getOrCompute(ConnectorConfig.PG_SLOT_NAME,
PostgresSourceConfig::generateReplicaSlot));

// Sending heartbeat messages enables the connector to send the latest retrieved LSN to the
// database, which allows the database to reclaim disk space being
Expand All @@ -63,24 +68,18 @@ public PostgresSourceConfig(long sourceId, String startOffset, Map<String, Strin
Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());

String tableFilter =
userProps.get(ConnectorConfig.PG_SCHEMA_NAME)
+ "."
+ userProps.get(ConnectorConfig.TABLE_NAME);

String tableFilter = schema + "." + table;
props.setProperty("table.include.list", tableFilter);
props.setProperty("database.server.name", DB_SERVER_NAME_PREFIX + tableFilter);

// host:port:database.schema.table
sourceName =
userProps.get(ConnectorConfig.HOST)
userProps.getNonNull(ConnectorConfig.HOST)
+ ":"
+ userProps.get(ConnectorConfig.PORT)
+ userProps.getNonNull(ConnectorConfig.PORT)
+ ":"
+ userProps.get(ConnectorConfig.DB_NAME)
+ "."
+ userProps.get(ConnectorConfig.PG_SCHEMA_NAME)
+ "."
+ userProps.get(ConnectorConfig.TABLE_NAME);
+ dbName + "." + schema + "." + table;
props.setProperty("name", sourceName);

// pass through debezium properties if any
Expand All @@ -107,4 +106,9 @@ public SourceTypeE getSourceType() {
public Properties getProperties() {
return props;
}

private static String generateReplicaSlot() {
fuyufjh marked this conversation as resolved.
Show resolved Hide resolved
// Example: rw_cdc_f9a3567e6dd54bf5900444c8b1c03815
return "rw_cdc_" + UUID.randomUUID().toString().replace("-", "");
}
}