Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(connector): add test cases for debezium json test #8334

Merged
merged 20 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions java/connector-node/risingwave-source-test/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>java-parent</artifactId>
<groupId>com.risingwave.java</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>risingwave-source-test</name>
<packaging>jar</packaging>
<artifactId>risingwave-source-test</artifactId>

<properties>
<testcontainers.version>1.17.6</testcontainers.version>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.24.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-source-cdc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-connector-service</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package com.risingwave.connector;

import static org.assertj.core.api.Assertions.*;
import static org.junit.Assert.assertEquals;

import com.risingwave.proto.ConnectorServiceProto.*;
import io.grpc.*;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import javax.sql.DataSource;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.MountableFile;

public class MySQLSourceTest {

static final Logger LOG = LoggerFactory.getLogger(MySQLSourceTest.class.getName());

private static final MySQLContainer<?> mysql =
new MySQLContainer<>("mysql:8.0")
.withDatabaseName("test")
.withUsername("root")
.withCopyFileToContainer(
MountableFile.forClasspathResource("my.cnf"), "/etc/my.cnf");

public static Server connectorServer =
ServerBuilder.forPort(ConnectorService.DEFAULT_PORT)
.addService(new ConnectorServiceImpl())
.build();

public static SourceTestClient testClient =
new SourceTestClient(
Grpc.newChannelBuilder(
"localhost:" + ConnectorService.DEFAULT_PORT,
InsecureChannelCredentials.create())
.build());

private static DataSource mysqlDataSource;

@BeforeClass
public static void init() {
// generate orders.tbl test data
SourceTestClient.genOrdersTable(10000);
// start connector server and mysql...
try {
connectorServer.start();
LOG.info("connector service started");
mysql.withCopyFileToContainer(
MountableFile.forClasspathResource("orders.tbl"), "/home/orders.tbl");
mysql.start();
mysqlDataSource = SourceTestClient.getDataSource(mysql);
LOG.info("mysql started");
} catch (IOException e) {
fail("IO exception: ", e);
}
// check mysql configuration...
try {
Connection connection = SourceTestClient.connect(mysqlDataSource);
ResultSet resultSet =
SourceTestClient.performQuery(
connection, testClient.sqlStmts.getProperty("mysql.bin_log"));
assertThat(resultSet.getString("Value")).isEqualTo("ON").as("MySQL: bin_log ON");
connection.close();
} catch (SQLException e) {
fail("SQL exception: ", e);
}
}

@AfterClass
public static void cleanup() {
connectorServer.shutdown();
mysql.stop();
}

// create a TPC-H orders table in mysql
// insert 10,000 rows into orders
// check if the number of changes debezium captures is 10,000
@Test
public void testLines() throws InterruptedException, SQLException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Connection connection = SourceTestClient.connect(mysqlDataSource);
String query = testClient.sqlStmts.getProperty("tpch.create.orders");
SourceTestClient.performQuery(connection, query);
query =
"LOAD DATA INFILE '/home/orders.tbl' "
+ "INTO TABLE orders "
+ "CHARACTER SET UTF8 "
+ "FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n';";
SourceTestClient.performQuery(connection, query);
Iterator<GetEventStreamResponse> eventStream =
testClient.getEventStreamStart(mysql, SourceType.MYSQL, "test", "orders");
Callable<Integer> countTask =
() -> {
int count = 0;
while (eventStream.hasNext()) {
List<CdcMessage> messages = eventStream.next().getEventsList();
for (CdcMessage ignored : messages) {
count++;
}
if (count == 10000) {
return count;
}
}
return count;
};
Future<Integer> countResult = executorService.submit(countTask);
try {
int count = countResult.get();
LOG.info("number of cdc messages received: {}", count);
assertEquals(count, 10000);
} catch (ExecutionException e) {
fail("Execution exception: ", e);
}
connection.close();
}

// generates test cases for the risingwave debezium parser
@Ignore
@Test
public void getTestJson() throws InterruptedException, SQLException {
Connection connection = SourceTestClient.connect(mysqlDataSource);
String query =
"CREATE TABLE IF NOT EXISTS orders ("
+ "O_KEY BIGINT NOT NULL, "
+ "O_BOOL BOOLEAN, "
+ "O_TINY TINYINT, "
+ "O_INT INT, "
+ "O_REAL REAL, "
+ "O_DOUBLE DOUBLE, "
+ "O_DECIMAL DECIMAL(15, 2), "
+ "O_CHAR CHAR(15), "
+ "O_DATE DATE, "
+ "O_TIME TIME, "
+ "O_DATETIME DATETIME, "
+ "O_TIMESTAMP TIMESTAMP, "
+ "O_JSON JSON, "
+ "PRIMARY KEY (O_KEY))";
SourceTestClient.performQuery(connection, query);
Iterator<GetEventStreamResponse> eventStream =
testClient.getEventStreamStart(mysql, SourceType.MYSQL, "test", "orders");
Thread t1 =
new Thread(
() -> {
while (eventStream.hasNext()) {
List<CdcMessage> messages = eventStream.next().getEventsList();
for (CdcMessage msg : messages) {
LOG.info("{}", msg.getPayload());
}
}
});
Thread.sleep(3000);
t1.start();
Thread.sleep(3000);
// Q1: ordinary insert
query =
"INSERT INTO orders (O_KEY, O_BOOL, O_TINY, O_INT, O_REAL, O_DOUBLE, O_DECIMAL, O_CHAR, O_DATE, O_TIME, O_DATETIME, O_TIMESTAMP, O_JSON)"
+ "VALUES(111, TRUE, -1, -1111, -11.11, -111.11111, -111.11, 'yes please', '1000-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:01.000000', '{\"k1\": \"v1\", \"k2\": 11}')";
SourceTestClient.performQuery(connection, query);
// Q2: update value of Q1 (value -> new value)
query =
"UPDATE orders SET O_BOOL = FALSE, "
+ "O_TINY = 3, "
+ "O_INT = 3333, "
+ "O_REAL = 33.33, "
+ "O_DOUBLE = 333.33333, "
+ "O_DECIMAL = 333.33, "
+ "O_CHAR = 'no thanks', "
+ "O_DATE = '9999-12-31', "
+ "O_TIME = '23:59:59', "
+ "O_DATETIME = '5138-11-16 09:46:39', "
+ "O_TIMESTAMP = '2038-01-09 03:14:07', "
+ "O_JSON = '{\"k1\": \"v1_updated\", \"k2\": 33}' "
+ "WHERE orders.O_KEY = 111";
SourceTestClient.performQuery(connection, query);
// Q3: delete value from Q1
query = "DELETE FROM orders WHERE orders.O_KEY = 111";
SourceTestClient.performQuery(connection, query);
Thread.sleep(5000);
connection.close();
}
}
Loading