Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(connector): use config file to initiate a debezium source connector #8539

Merged
merged 7 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ cargo make pre-start-dev
cargo make link-all-in-one-binaries

echo "--- starting risingwave cluster with connector node"
mkdir -p .risingwave/log
./connector-node/start-service.sh -p 50051 > .risingwave/log/connector-sink.log 2>&1 &
cargo make ci-start ci-iceberg-test
sleep 1
Expand Down
8 changes: 5 additions & 3 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ psql -h db -U postgres -d test -c "CREATE TABLE t_remote (id serial PRIMARY KEY,

node_port=50051
node_timeout=10
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-source.log 2>&1 &

echo "--- starting risingwave cluster with connector node"
cargo make ci-start ci-1cn-1fe
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 &

echo "waiting for connector node to start"
start_time=$(date +%s)
while :
Expand All @@ -83,8 +87,6 @@ do
sleep 0.1
done

echo "--- starting risingwave cluster with connector node"
cargo make ci-start ci-1cn-1fe

echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
Expand Down
10 changes: 5 additions & 5 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set -euo pipefail
source ci/scripts/common.env.sh

# prepare environment
export CONNECTOR_RPC_ENDPOINT="localhost:60061"
export CONNECTOR_RPC_ENDPOINT="localhost:50051"

while getopts 'p:' opt; do
case ${opt} in
Expand Down Expand Up @@ -65,7 +65,10 @@ psql -h db -U postgres -d cdc_test < ./e2e_test/source/cdc/postgres_cdc.sql

node_port=50051
node_timeout=10
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-source.log 2>&1 &

echo "--- starting risingwave cluster with connector node"
cargo make ci-start ci-1cn-1fe-with-recovery
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 &

echo "waiting for connector node to start"
start_time=$(date +%s)
Expand All @@ -84,9 +87,6 @@ do
fi
sleep 0.1
done

# start risingwave cluster
cargo make ci-start ci-1cn-1fe-with-recovery
sleep 2

echo "---- mysql & postgres cdc validate test"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

/** Handler for RPC request */
public interface SourceHandler {
void handle(
void startSource(
ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse>
responseObserver);
}
9 changes: 8 additions & 1 deletion java/connector-node/risingwave-connector-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

package com.risingwave.sourcenode;

import com.risingwave.connector.api.source.ConnectorConfig;
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data.DataType;
import com.risingwave.sourcenode.common.DbzConnectorConfig;
import com.risingwave.sourcenode.core.SourceHandlerFactory;
import io.grpc.Status;
import io.grpc.StatusException;
Expand Down Expand Up @@ -46,19 +46,19 @@ public void handle(ConnectorServiceProto.GetEventStreamRequest request) {
break;
case START:
var startRequest = request.getStart();
var handler =
SourceHandlerFactory.createSourceHandler(
SourceTypeE.valueOf(startRequest.getSourceType()),
startRequest.getSourceId(),
startRequest.getStartOffset(),
new ConnectorConfig(startRequest.getPropertiesMap()));
if (handler == null) {
LOG.error("failed to create source handler");
responseObserver.onCompleted();
} else {
handler.handle(
try {
var handler =
SourceHandlerFactory.createSourceHandler(
SourceTypeE.valueOf(startRequest.getSourceType()),
startRequest.getSourceId(),
startRequest.getStartOffset(),
startRequest.getPropertiesMap());
handler.startSource(
(ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse>)
responseObserver);
} catch (Throwable t) {
LOG.error("failed to start source", t);
responseObserver.onError(t);
}
break;
case REQUEST_NOT_SET:
Expand All @@ -75,11 +75,11 @@ private void validateDbProperties(
String jdbcUrl =
toJdbcPrefix(validate.getSourceType())
+ "://"
+ props.get(ConnectorConfig.HOST)
+ props.get(DbzConnectorConfig.HOST)
+ ":"
+ props.get(ConnectorConfig.PORT)
+ props.get(DbzConnectorConfig.PORT)
+ "/"
+ props.get(ConnectorConfig.DB_NAME);
+ props.get(DbzConnectorConfig.DB_NAME);
LOG.debug("validate jdbc url: {}", jdbcUrl);

var sqlStmts = new Properties();
Expand All @@ -93,8 +93,8 @@ private void validateDbProperties(
try (var conn =
DriverManager.getConnection(
jdbcUrl,
props.get(ConnectorConfig.USER),
props.get(ConnectorConfig.PASSWORD))) {
props.get(DbzConnectorConfig.USER),
props.get(DbzConnectorConfig.PASSWORD))) {
// usernamed and password are correct
var dbMeta = conn.getMetaData();

Expand Down Expand Up @@ -138,8 +138,8 @@ private void validateDbProperties(
}
// check whether table exist
try (var stmt = conn.prepareStatement(sqlStmts.getProperty("mysql.table"))) {
stmt.setString(1, props.get(ConnectorConfig.DB_NAME));
stmt.setString(2, props.get(ConnectorConfig.TABLE_NAME));
stmt.setString(1, props.get(DbzConnectorConfig.DB_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
var res = stmt.executeQuery();
while (res.next()) {
var ret = res.getInt(1);
Expand All @@ -152,8 +152,8 @@ private void validateDbProperties(
try (var stmt =
conn.prepareStatement(sqlStmts.getProperty("mysql.table_schema"))) {
var sourceSchema = validate.getTableSchema();
stmt.setString(1, props.get(ConnectorConfig.DB_NAME));
stmt.setString(2, props.get(ConnectorConfig.TABLE_NAME));
stmt.setString(1, props.get(DbzConnectorConfig.DB_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
var res = stmt.executeQuery();
var pkFields = new HashSet<String>();
int index = 0;
Expand Down Expand Up @@ -203,8 +203,8 @@ private void validateDbProperties(
}
// check schema name and table name
try (var stmt = conn.prepareStatement(sqlStmts.getProperty("postgres.table"))) {
stmt.setString(1, props.get(ConnectorConfig.PG_SCHEMA_NAME));
stmt.setString(2, props.get(ConnectorConfig.TABLE_NAME));
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
var res = stmt.executeQuery();
while (res.next()) {
var ret = res.getString(1);
Expand All @@ -219,9 +219,9 @@ private void validateDbProperties(
try (var stmt = conn.prepareStatement(sqlStmts.getProperty("postgres.pk"))) {
stmt.setString(
1,
props.get(ConnectorConfig.PG_SCHEMA_NAME)
props.get(DbzConnectorConfig.PG_SCHEMA_NAME)
+ "."
+ props.get(ConnectorConfig.TABLE_NAME));
+ props.get(DbzConnectorConfig.TABLE_NAME));

var res = stmt.executeQuery();
var pkFields = new HashSet<String>();
Expand All @@ -237,8 +237,8 @@ private void validateDbProperties(
// check whether source schema match table schema on upstream
try (var stmt =
conn.prepareStatement(sqlStmts.getProperty("postgres.table_schema"))) {
stmt.setString(1, props.get(ConnectorConfig.PG_SCHEMA_NAME));
stmt.setString(2, props.get(ConnectorConfig.TABLE_NAME));
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
var res = stmt.executeQuery();
var sourceSchema = validate.getTableSchema();
int index = 0;
Expand Down
Loading