-
Notifications
You must be signed in to change notification settings - Fork 564
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test(connector): add test cases for debezium json test #8334
Merged
Merged
Changes from 14 commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
1182879
test: connector source test client
WillyKidd 987f3df
test: pg, mysql containers for testing debezium
WillyKidd 20b3951
test: more cases for debezium json parser
WillyKidd 640159f
fix: use condition variable instread of sleep
WillyKidd 21478a3
fix: format errors and ci
WillyKidd b5e618f
fix: generate test data instead of using tbl file
WillyKidd f8d42bf
test: add date time and corner cases
WillyKidd 000a6a6
fix: better java code
WillyKidd c392031
test: fix bool conversion/float overflow
WillyKidd ba30d2b
fix: add comments
WillyKidd a97f049
fix: more comments
WillyKidd 6feeb6b
fix: refine parser code
WillyKidd 28053a1
fix: refine once more
WillyKidd 920fb1b
Merge branch 'main' into weili/json-test
WillyKidd 8b42462
test: add ci, fix mysql version
WillyKidd d7fb4f9
fix: remove ci for now
WillyKidd 6f31da6
fix: minor fixes for parser
WillyKidd d55c85f
fix: add license
WillyKidd f81acdf
Merge branch 'main' into weili/json-test
WillyKidd 54651ce
pg-source: fix null value of DATE
StrikeW File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>java-parent</artifactId> | ||
<groupId>com.risingwave.java</groupId> | ||
<version>1.0-SNAPSHOT</version> | ||
<relativePath>../../pom.xml</relativePath> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
<name>risingwave-source-test</name> | ||
<packaging>jar</packaging> | ||
<artifactId>risingwave-source-test</artifactId> | ||
|
||
<properties> | ||
<testcontainers.version>1.17.6</testcontainers.version> | ||
</properties> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.assertj</groupId> | ||
<artifactId>assertj-core</artifactId> | ||
<version>3.24.2</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.zaxxer</groupId> | ||
<artifactId>HikariCP</artifactId> | ||
<version>5.0.1</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.testcontainers</groupId> | ||
<artifactId>testcontainers</artifactId> | ||
<version>${testcontainers.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.testcontainers</groupId> | ||
<artifactId>mysql</artifactId> | ||
<version>${testcontainers.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.testcontainers</groupId> | ||
<artifactId>postgresql</artifactId> | ||
<version>${testcontainers.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-databind</artifactId> | ||
<version>${jackson.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-core</artifactId> | ||
<version>${jackson.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.risingwave.java</groupId> | ||
<artifactId>risingwave-source-cdc</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.risingwave.java</groupId> | ||
<artifactId>risingwave-connector-service</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
</project> |
191 changes: 191 additions & 0 deletions
191
...r-node/risingwave-source-test/src/test/java/com/risingwave/connector/MySQLSourceTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
package com.risingwave.connector; | ||
|
||
import static org.assertj.core.api.Assertions.*; | ||
import static org.junit.Assert.assertEquals; | ||
|
||
import com.risingwave.proto.ConnectorServiceProto.*; | ||
import io.grpc.*; | ||
import java.io.IOException; | ||
import java.sql.Connection; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.concurrent.*; | ||
import javax.sql.DataSource; | ||
import org.junit.AfterClass; | ||
import org.junit.BeforeClass; | ||
import org.junit.Ignore; | ||
import org.junit.Test; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.testcontainers.containers.MySQLContainer; | ||
import org.testcontainers.utility.MountableFile; | ||
|
||
public class MySQLSourceTest { | ||
|
||
static final Logger LOG = LoggerFactory.getLogger(MySQLSourceTest.class.getName()); | ||
|
||
private static final MySQLContainer<?> mysql = | ||
new MySQLContainer<>("mysql:5.7.34") | ||
.withDatabaseName("test") | ||
.withUsername("root") | ||
.withCopyFileToContainer( | ||
MountableFile.forClasspathResource("my.cnf"), "/etc/my.cnf"); | ||
|
||
public static Server connectorServer = | ||
ServerBuilder.forPort(ConnectorService.DEFAULT_PORT) | ||
.addService(new ConnectorServiceImpl()) | ||
.build(); | ||
|
||
public static SourceTestClient testClient = | ||
new SourceTestClient( | ||
Grpc.newChannelBuilder( | ||
"localhost:" + ConnectorService.DEFAULT_PORT, | ||
InsecureChannelCredentials.create()) | ||
.build()); | ||
|
||
private static DataSource mysqlDataSource; | ||
|
||
@BeforeClass | ||
public static void init() { | ||
// generate orders.tbl test data | ||
SourceTestClient.genOrdersTable(10000); | ||
// start connector server and mysql... | ||
try { | ||
connectorServer.start(); | ||
LOG.info("connector service started"); | ||
mysql.withCopyFileToContainer( | ||
MountableFile.forClasspathResource("orders.tbl"), "/home/orders.tbl"); | ||
mysql.start(); | ||
mysqlDataSource = SourceTestClient.getDataSource(mysql); | ||
LOG.info("mysql started"); | ||
} catch (IOException e) { | ||
fail("IO exception: ", e); | ||
} | ||
// check mysql configuration... | ||
try { | ||
Connection connection = SourceTestClient.connect(mysqlDataSource); | ||
ResultSet resultSet = | ||
SourceTestClient.performQuery( | ||
connection, testClient.sqlStmts.getProperty("mysql.bin_log")); | ||
assertThat(resultSet.getString("Value")).isEqualTo("ON").as("MySQL: bin_log ON"); | ||
connection.close(); | ||
} catch (SQLException e) { | ||
fail("SQL exception: ", e); | ||
} | ||
} | ||
|
||
@AfterClass | ||
public static void cleanup() { | ||
connectorServer.shutdown(); | ||
mysql.stop(); | ||
} | ||
|
||
// create a TPC-H orders table in mysql | ||
// insert 10,000 rows into orders | ||
// check if the number of changes debezium captures is 10,000 | ||
@Test | ||
public void testLines() throws InterruptedException, SQLException { | ||
ExecutorService executorService = Executors.newFixedThreadPool(1); | ||
Connection connection = SourceTestClient.connect(mysqlDataSource); | ||
String query = testClient.sqlStmts.getProperty("tpch.create.orders"); | ||
SourceTestClient.performQuery(connection, query); | ||
query = | ||
"LOAD DATA INFILE '/home/orders.tbl' " | ||
+ "INTO TABLE orders " | ||
+ "CHARACTER SET UTF8 " | ||
+ "FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n';"; | ||
SourceTestClient.performQuery(connection, query); | ||
Iterator<GetEventStreamResponse> eventStream = | ||
testClient.getEventStreamStart(mysql, SourceType.MYSQL, "test", "orders"); | ||
Callable<Integer> countTask = | ||
() -> { | ||
int count = 0; | ||
while (eventStream.hasNext()) { | ||
List<CdcMessage> messages = eventStream.next().getEventsList(); | ||
for (CdcMessage ignored : messages) { | ||
count++; | ||
} | ||
if (count == 10000) { | ||
return count; | ||
} | ||
} | ||
return count; | ||
}; | ||
Future<Integer> countResult = executorService.submit(countTask); | ||
try { | ||
int count = countResult.get(); | ||
LOG.info("number of cdc messages received: {}", count); | ||
assertEquals(count, 10000); | ||
} catch (ExecutionException e) { | ||
fail("Execution exception: ", e); | ||
} | ||
connection.close(); | ||
} | ||
|
||
// generates test cases for the risingwave debezium parser | ||
@Ignore | ||
@Test | ||
public void getTestJson() throws InterruptedException, SQLException { | ||
Connection connection = SourceTestClient.connect(mysqlDataSource); | ||
String query = | ||
"CREATE TABLE IF NOT EXISTS orders (" | ||
+ "O_KEY BIGINT NOT NULL, " | ||
+ "O_BOOL BOOLEAN, " | ||
+ "O_TINY TINYINT, " | ||
+ "O_INT INT, " | ||
+ "O_REAL REAL, " | ||
+ "O_DOUBLE DOUBLE, " | ||
+ "O_DECIMAL DECIMAL(15, 2), " | ||
+ "O_CHAR CHAR(15), " | ||
+ "O_DATE DATE, " | ||
+ "O_TIME TIME, " | ||
+ "O_DATETIME DATETIME, " | ||
+ "O_TIMESTAMP TIMESTAMP, " | ||
+ "O_JSON JSON, " | ||
+ "PRIMARY KEY (O_KEY))"; | ||
SourceTestClient.performQuery(connection, query); | ||
Iterator<GetEventStreamResponse> eventStream = | ||
testClient.getEventStreamStart(mysql, SourceType.MYSQL, "test", "orders"); | ||
Thread t1 = | ||
new Thread( | ||
() -> { | ||
while (eventStream.hasNext()) { | ||
List<CdcMessage> messages = eventStream.next().getEventsList(); | ||
for (CdcMessage msg : messages) { | ||
LOG.info("{}", msg.getPayload()); | ||
} | ||
} | ||
}); | ||
Thread.sleep(3000); | ||
t1.start(); | ||
Thread.sleep(3000); | ||
// Q1: ordinary insert | ||
query = | ||
"INSERT INTO orders (O_KEY, O_BOOL, O_TINY, O_INT, O_REAL, O_DOUBLE, O_DECIMAL, O_CHAR, O_DATE, O_TIME, O_DATETIME, O_TIMESTAMP, O_JSON)" | ||
+ "VALUES(111, TRUE, -1, -1111, -11.11, -111.11111, -111.11, 'yes please', '1000-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:01.000000', '{\"k1\": \"v1\", \"k2\": 11}')"; | ||
SourceTestClient.performQuery(connection, query); | ||
// Q2: update value of Q1 (value -> new value) | ||
query = | ||
"UPDATE orders SET O_BOOL = FALSE, " | ||
+ "O_TINY = 3, " | ||
+ "O_INT = 3333, " | ||
+ "O_REAL = 33.33, " | ||
+ "O_DOUBLE = 333.33333, " | ||
+ "O_DECIMAL = 333.33, " | ||
+ "O_CHAR = 'no thanks', " | ||
+ "O_DATE = '9999-12-31', " | ||
+ "O_TIME = '23:59:59', " | ||
+ "O_DATETIME = '5138-11-16 09:46:39', " | ||
+ "O_TIMESTAMP = '2038-01-09 03:14:07', " | ||
+ "O_JSON = '{\"k1\": \"v1_updated\", \"k2\": 33}' " | ||
+ "WHERE orders.O_KEY = 111"; | ||
SourceTestClient.performQuery(connection, query); | ||
// Q3: delete value from Q1 | ||
query = "DELETE FROM orders WHERE orders.O_KEY = 111"; | ||
SourceTestClient.performQuery(connection, query); | ||
Thread.sleep(5000); | ||
connection.close(); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried testcontainers sometime ago, but not using Java. I used Rust. Just to remind you that mysql:5.7 may not have an official arm-built docker image. So the test may run successfully on your x86 PC, but may fail on M1/M2 mac.