Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(connector): add test cases for debezium json test #8334

Merged
merged 20 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions connector_node/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<module>risingwave-sink-deltalake</module>
<module>risingwave-sink-jdbc</module>
<module>risingwave-source-cdc</module>
<module>risingwave-source-test</module>
<module>risingwave-connector-service</module>
<module>assembly</module>
</modules>
Expand Down
77 changes: 77 additions & 0 deletions connector_node/risingwave-source-test/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>connector-parent</artifactId>
<groupId>com.risingwave.connector</groupId>
<version>1.0-SNAPSHOT</version>
</parent>

<artifactId>risingwave-source-test</artifactId>
<version>1.0-SNAPSHOT</version>

<name>risingwave-source-test</name>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.24.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.17.6</version>
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>1.17.6</version>
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.17.6</version>
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>

<!-- Connector-specific dependencies are managed by the assembly plugin; refer to assembly/pom.xml for more details -->
<dependency>
<groupId>com.risingwave.connector</groupId>
<artifactId>risingwave-source-cdc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.risingwave.connector</groupId>
<artifactId>risingwave-connector-service</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package com.risingwave.connector;

import static org.assertj.core.api.Assertions.*;

import com.risingwave.proto.ConnectorServiceProto.*;
import io.grpc.*;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.MountableFile;

public class MySQLSourceTest {

// Logger shared by all tests in this class.
private static final Logger logger = LoggerFactory.getLogger(MySQLSourceTest.class.getName());

// MySQL 5.7.34 test container using database "test" and user "root". The classpath
// resource my.cnf is copied to /etc/my.cnf inside the container; presumably it enables
// the binlog that init() verifies to be ON -- confirm against the resource file.
private static final MySQLContainer<?> mysql =
new MySQLContainer<>("mysql:5.7.34")
.withDatabaseName("test")
.withUsername("root")
.withCopyFileToContainer(
MountableFile.forClasspathResource("my.cnf"), "/etc/my.cnf");

// gRPC server hosting ConnectorServiceImpl on ConnectorService.DEFAULT_PORT.
// It is only built here; init() starts it and cleanup() shuts it down.
public static Server connectorServer =
ServerBuilder.forPort(ConnectorService.DEFAULT_PORT)
.addService(new ConnectorServiceImpl())
.build();

// Test client talking to the server above over a localhost channel created with
// insecure channel credentials.
public static SourceTestClient testClient =
new SourceTestClient(
Grpc.newChannelBuilder(
"localhost:" + ConnectorService.DEFAULT_PORT,
InsecureChannelCredentials.create())
.build());

/**
 * One-time fixture setup: generates the orders test data, starts the connector gRPC server
 * and the MySQL container, then checks that the MySQL binlog is enabled.
 *
 * <p>Any {@link IOException} or {@link SQLException} raised during setup is converted into a
 * test failure via AssertJ's {@code fail}.
 */
@BeforeClass
public static void init() {
    // generate orders.tbl test data
    SourceTestClient.genOrdersTable(10000);
    // start connector server and mysql...
    try {
        connectorServer.start();
        logger.info("connector service started");
        // The data file must be registered before the container starts so that it is
        // copied into the container on startup.
        mysql.withCopyFileToContainer(
                MountableFile.forClasspathResource("orders.tbl"), "/home/orders.tbl");
        mysql.start();
        logger.info("mysql started");
    } catch (IOException e) {
        fail("IO exception: ", e);
    }
    // check mysql configuration...
    // try-with-resources closes the connection even when the assertion below fails.
    try (Connection connection = SourceTestClient.connect(mysql)) {
        ResultSet resultSet =
                SourceTestClient.performQuery(
                        connection, testClient.sqlStmts.getProperty("mysql.bin_log"));
        // as(...) must precede the assertion; a description set afterwards is ignored.
        assertThat(resultSet.getString("Value")).as("MySQL: bin_log ON").isEqualTo("ON");
    } catch (SQLException e) {
        fail("SQL exception: ", e);
    }
}

/**
 * One-time fixture teardown: shuts down the connector gRPC server and stops the MySQL
 * container.
 */
@AfterClass
public static void cleanup() {
    try {
        connectorServer.shutdown();
    } finally {
        // Always stop the container, even if shutting down the server throws, so that a
        // failed teardown does not leave a MySQL container running.
        mysql.stop();
    }
}

@Test
public void testLines() throws InterruptedException, SQLException {
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
Lock lock = new ReentrantLock();
Condition done = lock.newCondition();
Connection connection = SourceTestClient.connect(mysql);
String query = testClient.sqlStmts.getProperty("tpch.create.orders");
SourceTestClient.performQuery(connection, query);
query =
"LOAD DATA INFILE '/home/orders.tbl' "
+ "INTO TABLE orders "
+ "CHARACTER SET UTF8 "
+ "FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n';";
SourceTestClient.performQuery(connection, query);
Iterator<GetEventStreamResponse> eventStream =
testClient.getEventStreamStart(mysql, SourceType.MYSQL, "test", "orders");
AtomicInteger count = new AtomicInteger();
Thread t1 =
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
new Thread(
() -> {
while (eventStream.hasNext()) {
List<CdcMessage> messages = eventStream.next().getEventsList();
for (CdcMessage ignored : messages) {
count.getAndIncrement();
}
if (count.get() == 10000) {
lock.lock();
try {
done.signal();
} finally {
lock.unlock();
}
}
}
});
t1.start();
lock.lock();
try {
done.await();
} finally {
lock.unlock();
}
logger.info("count: {}", count.get());
assertThat(count.get()).isEqualTo(10000);
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
connection.close();
}

@Ignore
@Test
public void getTestJson() throws InterruptedException, SQLException {
Connection connection = SourceTestClient.connect(mysql);
String query =
"CREATE TABLE IF NOT EXISTS orders ("
+ "O_KEY BIGINT NOT NULL, "
+ "O_BOOL BOOLEAN, "
+ "O_TINY TINYINT, "
+ "O_INT INT, "
+ "O_REAL REAL, "
+ "O_DOUBLE DOUBLE, "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please cover overflow cases

Copy link
Contributor Author

@WillyKidd WillyKidd Mar 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as for the overflow cases...
when inserting data into mysql, if there is overflow, mysql will throw an error:

Data truncation: Out of range value for column 'O_INT' at row 1

so it seems unlikely that Debezium will generate overflowing values.

I can also try to modify the json generated by debezium and see if the parser catches the error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debezium is unlikely to deliver corrupted data, but we may still encounter it when the data comes from Kafka.
Just mock some test cases and check whether RisingWave catches the error.

+ "O_DECIMAL DECIMAL(15, 2), "
+ "O_CHAR CHAR(15), "
+ "O_DATE DATE, "
+ "O_TIME TIME, "
+ "O_TIMESTAMP TIMESTAMP, "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please cover corner cases
for date: 1000-01-01 and 9999-12-31
for time: -838:59:59 and 838:59:59
for datetime: 1000-01-01 00:00:00 and 9999-12-31 23:59:59
for timestamp: '1970-01-01 00:00:01' UTC and '2038-01-19 03:14:07' UTC (and other timezones)

Copy link
Contributor Author

@WillyKidd WillyKidd Mar 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your comments, Bohan! I am wondering what the expected behavior is when converting MySQL TIME values to PostgreSQL TIME values (since RisingWave is PostgreSQL-compatible).

https://support.huaweicloud.com/intl/en-us/productdesc-drs/drs_01_0324.html

TIME values supported by MySQL range from '-838:59:59' to '838:59:59'. For details, see the official MySQL documentation. For PostgreSQL, the minimum value of the TIME type is 00:00:00 and the maximum value is 24:00:00. In MySQL, if a value of the TIME type is less than 00:00:00 or greater than 24:00:00, DRS will convert it to 00:00:00.

I guess we can either let it overflow or convert it to 00:00:00 as described above.

After discussing this with @xiangjinwu, we can do one of the following:

  • throw a parsing error (skip this line)
  • convert to 00:00:00
  • convert to null if schema allows

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the value is out of range for RisingWave's type, the data may have been corrupted upstream. In that case we can report an out-of-range (e.g. underflow, overflow) error to the log and skip this row.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I mean we should cover them in test cases and document the behavior.

+ "O_JSON JSON, "
+ "PRIMARY KEY (O_KEY))";
SourceTestClient.performQuery(connection, query);
Iterator<GetEventStreamResponse> eventStream =
testClient.getEventStreamStart(mysql, SourceType.MYSQL, "test", "orders");
AtomicInteger count = new AtomicInteger();
Thread t1 =
new Thread(
() -> {
while (eventStream.hasNext()) {
List<CdcMessage> messages = eventStream.next().getEventsList();
for (CdcMessage msg : messages) {
count.getAndIncrement();
logger.info("{}", msg.getPayload());
}
}
});
Thread.sleep(3000);
t1.start();
Thread.sleep(3000);
// Q1: ordinary insert (read)
query =
"INSERT INTO orders (O_KEY, O_BOOL, O_TINY, O_INT, O_REAL, O_DOUBLE, O_DECIMAL, O_CHAR, O_DATE, O_TIME, O_TIMESTAMP, O_JSON)"
+ "VALUES(111, TRUE, -1, -1111, -11.11, -111.11111, -111.11, 'yes please', '2011-11-11', '11:11:11', '2011-11-11 11:11:11.123456', '{\"k1\":\"v1\",\"k2\":11}')";
SourceTestClient.performQuery(connection, query);
// Q2: update value of Q1 (value -> new value)
query =
"UPDATE orders SET O_BOOL = FALSE, "
+ "O_TINY = 3, "
+ "O_INT = 3333, "
+ "O_REAL = 33.33, "
+ "O_DOUBLE = 333.33333, "
+ "O_DECIMAL = 333.33, "
+ "O_CHAR = 'no thanks', "
+ "O_DATE = '2012-12-12', "
+ "O_TIME = '12:12:12', "
+ "O_TIMESTAMP = '2011-12-12 12:12:12.121212', "
+ "O_JSON = '{\"k1\":\"v1_updated\",\"k2\":33}' "
+ "WHERE orders.O_KEY = 111";
SourceTestClient.performQuery(connection, query);
// Q3: delete value from Q1
query = "DELETE FROM orders WHERE orders.O_KEY = 111";
SourceTestClient.performQuery(connection, query);
Thread.sleep(5000);
connection.close();
}
}
Loading