diff --git a/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/converters/DatetimeTypeConverter.java b/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/converters/DatetimeTypeConverter.java index cf80a72147ed..ca63649d2736 100644 --- a/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/converters/DatetimeTypeConverter.java +++ b/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/converters/DatetimeTypeConverter.java @@ -26,7 +26,6 @@ public class DatetimeTypeConverter implements CustomConverter { private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE; - private static final String EPOCH_DAY = "1970-01-01"; @Override public void configure(Properties props) { @@ -40,7 +39,7 @@ public void converterFor( SchemaBuilder schemaBuilder = null; Converter converter = null; if ("DATE".equals(sqlType)) { - schemaBuilder = SchemaBuilder.string().name("risingwave.cdc.date.string"); + schemaBuilder = SchemaBuilder.string().name("rw.cdc.date.string"); converter = this::convertDate; } if (schemaBuilder != null) { @@ -50,7 +49,7 @@ public void converterFor( private String convertDate(Object input) { if (input == null) { - return EPOCH_DAY; + return null; } var epochDay = Date.toEpochDay(input, null); LocalDate date = LocalDate.ofEpochDay(epochDay); diff --git a/java/connector-node/risingwave-source-test/pom.xml b/java/connector-node/risingwave-source-test/pom.xml new file mode 100644 index 000000000000..6da046213366 --- /dev/null +++ b/java/connector-node/risingwave-source-test/pom.xml @@ -0,0 +1,79 @@ + + + + java-parent + com.risingwave.java + 1.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + risingwave-source-test + jar + risingwave-source-test + + + 1.17.6 + + + + + junit + junit + test + + + org.assertj + assertj-core + 3.24.2 + test + + + com.zaxxer + HikariCP + 5.0.1 + test + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + org.testcontainers + mysql + ${testcontainers.version} + test + + + org.testcontainers + postgresql + ${testcontainers.version} + test + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + test + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + test + + + + com.risingwave.java + risingwave-source-cdc + test + + + com.risingwave.java + risingwave-connector-service + test + + + diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/MySQLSourceTest.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/MySQLSourceTest.java new file mode 100644 index 000000000000..34f2bc24cd6e --- /dev/null +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/MySQLSourceTest.java @@ -0,0 +1,205 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector; + +import static org.assertj.core.api.Assertions.*; +import static org.junit.Assert.assertEquals; + +import com.risingwave.proto.ConnectorServiceProto.*; +import io.grpc.*; +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.*; +import javax.sql.DataSource; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.utility.MountableFile; + +public class MySQLSourceTest { + + static final Logger LOG = LoggerFactory.getLogger(MySQLSourceTest.class.getName()); + + private static final MySQLContainer mysql = + new MySQLContainer<>("mysql:8.0") + .withDatabaseName("test") + .withUsername("root") + .withCopyFileToContainer( + MountableFile.forClasspathResource("my.cnf"), "/etc/my.cnf"); + + public static Server connectorServer = + ServerBuilder.forPort(ConnectorService.DEFAULT_PORT) + .addService(new ConnectorServiceImpl()) + .build(); + + public static SourceTestClient testClient = + new SourceTestClient( + Grpc.newChannelBuilder( + "localhost:" + ConnectorService.DEFAULT_PORT, + InsecureChannelCredentials.create()) + .build()); + + private static DataSource mysqlDataSource; + + @BeforeClass + public static void init() { + // generate orders.tbl test data + SourceTestClient.genOrdersTable(10000); + // start connector server and mysql... + try { + connectorServer.start(); + LOG.info("connector service started"); + mysql.withCopyFileToContainer( + MountableFile.forClasspathResource("orders.tbl"), "/home/orders.tbl"); + mysql.start(); + mysqlDataSource = SourceTestClient.getDataSource(mysql); + LOG.info("mysql started"); + } catch (IOException e) { + fail("IO exception: ", e); + } + // check mysql configuration... + try { + Connection connection = SourceTestClient.connect(mysqlDataSource); + ResultSet resultSet = + SourceTestClient.performQuery( + connection, testClient.sqlStmts.getProperty("mysql.bin_log")); + assertThat(resultSet.getString("Value")).isEqualTo("ON").as("MySQL: bin_log ON"); + connection.close(); + } catch (SQLException e) { + fail("SQL exception: ", e); + } + } + + @AfterClass + public static void cleanup() { + connectorServer.shutdown(); + mysql.stop(); + } + + // create a TPC-H orders table in mysql + // insert 10,000 rows into orders + // check if the number of changes debezium captures is 10,000 + @Test + public void testLines() throws InterruptedException, SQLException { + ExecutorService executorService = Executors.newFixedThreadPool(1); + Connection connection = SourceTestClient.connect(mysqlDataSource); + String query = testClient.sqlStmts.getProperty("tpch.create.orders"); + SourceTestClient.performQuery(connection, query); + query = + "LOAD DATA INFILE '/home/orders.tbl' " + + "INTO TABLE orders " + + "CHARACTER SET UTF8 " + + "FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n';"; + SourceTestClient.performQuery(connection, query); + Iterator eventStream = + testClient.getEventStreamStart(mysql, SourceType.MYSQL, "test", "orders"); + Callable countTask = + () -> { + int count = 0; + while (eventStream.hasNext()) { + List messages = eventStream.next().getEventsList(); + for (CdcMessage ignored : messages) { + count++; + } + if (count == 10000) { + return count; + } + } + return count; + }; + Future countResult = executorService.submit(countTask); + try { + int count = countResult.get(); + LOG.info("number of cdc messages received: {}", count); + assertEquals(count, 10000); + } catch (ExecutionException e) { + fail("Execution exception: ", e); + } + connection.close(); + } + + // generates test cases for the risingwave debezium parser + @Ignore + @Test + public void getTestJson() throws InterruptedException, SQLException { + Connection connection = SourceTestClient.connect(mysqlDataSource); + String query = + "CREATE TABLE IF NOT EXISTS orders (" + + "O_KEY BIGINT NOT NULL, " + + "O_BOOL BOOLEAN, " + + "O_TINY TINYINT, " + + "O_INT INT, " + + "O_REAL REAL, " + + "O_DOUBLE DOUBLE, " + + "O_DECIMAL DECIMAL(15, 2), " + + "O_CHAR CHAR(15), " + + "O_DATE DATE, " + + "O_TIME TIME, " + + "O_DATETIME DATETIME, " + + "O_TIMESTAMP TIMESTAMP, " + + "O_JSON JSON, " + + "PRIMARY KEY (O_KEY))"; + SourceTestClient.performQuery(connection, query); + Iterator eventStream = + testClient.getEventStreamStart(mysql, SourceType.MYSQL, "test", "orders"); + Thread t1 = + new Thread( + () -> { + while (eventStream.hasNext()) { + List messages = eventStream.next().getEventsList(); + for (CdcMessage msg : messages) { + LOG.info("{}", msg.getPayload()); + } + } + }); + Thread.sleep(3000); + t1.start(); + Thread.sleep(3000); + // Q1: ordinary insert + query = + "INSERT INTO orders (O_KEY, O_BOOL, O_TINY, O_INT, O_REAL, O_DOUBLE, O_DECIMAL, O_CHAR, O_DATE, O_TIME, O_DATETIME, O_TIMESTAMP, O_JSON)" + + "VALUES(111, TRUE, -1, -1111, -11.11, -111.11111, -111.11, 'yes please', '1000-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:01.000000', '{\"k1\": \"v1\", \"k2\": 11}')"; + SourceTestClient.performQuery(connection, query); + // Q2: update value of Q1 (value -> new value) + query = + "UPDATE orders SET O_BOOL = FALSE, " + + "O_TINY = 3, " + + "O_INT = 3333, " + + "O_REAL = 33.33, " + + "O_DOUBLE = 333.33333, " + + "O_DECIMAL = 333.33, " + + "O_CHAR = 'no thanks', " + + "O_DATE = '9999-12-31', " + + "O_TIME = '23:59:59', " + + "O_DATETIME = '5138-11-16 09:46:39', " + + "O_TIMESTAMP = '2038-01-09 03:14:07', " + + "O_JSON = '{\"k1\": \"v1_updated\", \"k2\": 33}' " + + "WHERE orders.O_KEY = 111"; + SourceTestClient.performQuery(connection, query); + // Q3: delete value from Q1 + query = "DELETE FROM orders WHERE orders.O_KEY = 111"; + SourceTestClient.performQuery(connection, query); + Thread.sleep(5000); + connection.close(); + } +} diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java new file mode 100644 index 000000000000..fe0b35a15b04 --- /dev/null +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java @@ -0,0 +1,195 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.junit.Assert.assertEquals; + +import com.risingwave.proto.ConnectorServiceProto; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.*; +import javax.sql.DataSource; +import org.junit.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.MountableFile; + +public class PostgresSourceTest { + private static final Logger LOG = LoggerFactory.getLogger(PostgresSourceTest.class.getName()); + + private static final PostgreSQLContainer pg = + new PostgreSQLContainer<>("postgres:12.3-alpine") + .withDatabaseName("test") + .withUsername("postgres") + .withCommand("postgres -c wal_level=logical -c max_wal_senders=10"); + + public static Server connectorServer = + ServerBuilder.forPort(ConnectorService.DEFAULT_PORT) + .addService(new ConnectorServiceImpl()) + .build(); + + public static SourceTestClient testClient = + new SourceTestClient( + Grpc.newChannelBuilder( + "localhost:" + ConnectorService.DEFAULT_PORT, + InsecureChannelCredentials.create()) + .build()); + + private static DataSource pgDataSource; + + @BeforeClass + public static void init() { + // generate orders.tbl test data + SourceTestClient.genOrdersTable(10000); + // start connector server and postgres... + try { + connectorServer.start(); + LOG.info("connector service started"); + pg.withCopyFileToContainer( + MountableFile.forClasspathResource("orders.tbl"), "/home/orders.tbl"); + pg.start(); + pg.withUsername("postgres") + .execInContainer( + "sh", + "-c", + "echo 'host replication postgres 172.17.0.1/32 trust' >> /var/lib/postgresql/data/pg_hba.conf"); + pgDataSource = SourceTestClient.getDataSource(pg); + LOG.info("postgres started"); + } catch (IOException e) { + fail("IO exception: ", e); + } catch (InterruptedException e) { + fail("Interrupted exception", e); + } + // check pg configuration... + try { + Connection connection = SourceTestClient.connect(pgDataSource); + SourceTestClient.performQuery(connection, "SELECT pg_reload_conf()"); + ResultSet resultSet = + SourceTestClient.performQuery( + connection, testClient.sqlStmts.getProperty("postgres.wal")); + assertThat(resultSet.getString("wal_level")) + .isEqualTo("logical") + .as("pg: wal_level logical"); + connection.close(); + } catch (SQLException e) { + fail("SQL exception: ", e); + } + } + + @AfterClass + public static void cleanup() { + connectorServer.shutdown(); + pg.stop(); + } + + // create a TPC-H orders table in postgres + // insert 10,000 rows into orders + // check if the number of changes debezium captures is 10,000 + @Test + public void testLines() throws InterruptedException, SQLException { + ExecutorService executorService = Executors.newFixedThreadPool(1); + Connection connection = SourceTestClient.connect(pgDataSource); + String query = testClient.sqlStmts.getProperty("tpch.create.orders"); + SourceTestClient.performQuery(connection, query); + query = "COPY orders FROM '/home/orders.tbl' WITH DELIMITER '|'"; + SourceTestClient.performQuery(connection, query); + Iterator eventStream = + testClient.getEventStreamStart( + pg, ConnectorServiceProto.SourceType.POSTGRES, "test", "orders"); + Callable countTask = + () -> { + int count = 0; + while (eventStream.hasNext()) { + List messages = + eventStream.next().getEventsList(); + for (ConnectorServiceProto.CdcMessage ignored : messages) { + count++; + } + if (count == 10000) { + return count; + } + } + return count; + }; + Future countResult = executorService.submit(countTask); + try { + int count = countResult.get(); + LOG.info("number of cdc messages received: {}", count); + assertEquals(count, 10000); + } catch (ExecutionException e) { + fail("Execution exception: ", e); + } + connection.close(); + } + + // generates test cases for the risingwave debezium parser + @Ignore + @Test + public void getTestJson() throws InterruptedException, SQLException { + Connection connection = SourceTestClient.connect(pgDataSource); + String query = + "CREATE TABLE IF NOT EXISTS orders (" + + "O_KEY BIGINT NOT NULL, " + + "O_BOOL BOOLEAN, " + + "O_BITS BIT(3), " + + "O_TINY SMALLINT, " + + "O_INT INT, " + + "O_REAL REAL, " + + "O_DOUBLE DOUBLE PRECISION, " + + "O_DECIMAL DECIMAL(15, 2), " + + "O_CHAR CHAR(15), " + + "O_DATE DATE, " + + "O_TIME TIME, " + + "O_TIMESTAMP TIMESTAMP, " + + "O_JSON JSON, " + + "O_TEXT_ARR TEXT[][], " + + "PRIMARY KEY (O_KEY))"; + SourceTestClient.performQuery(connection, query); + Iterator eventStream = + testClient.getEventStreamStart( + pg, ConnectorServiceProto.SourceType.POSTGRES, "test", "orders"); + Thread t1 = + new Thread( + () -> { + while (eventStream.hasNext()) { + List messages = + eventStream.next().getEventsList(); + for (ConnectorServiceProto.CdcMessage msg : messages) { + LOG.info("{}", msg.getPayload()); + } + } + }); + // Q1: ordinary insert (read) + Thread.sleep(1000); + t1.start(); + query = + "INSERT INTO orders (O_KEY, O_BOOL, O_BITS, O_TINY, O_INT, O_REAL, O_DOUBLE, O_DECIMAL, O_CHAR, O_DATE, O_TIME, O_TIMESTAMP, O_JSON, O_TEXT_ARR)" + + "VALUES(111, TRUE, b'111', -1, -1111, -11.11, -111.11111, -111.11, 'yes please', '2011-11-11', '11:11:11', '2011-11-11 11:11:11.123456', '{\"k1\": \"v1\", \"k2\": 11}', ARRAY[['meeting', 'lunch'], ['training', 'presentation']])"; + SourceTestClient.performQuery(connection, query); + Thread.sleep(1000); + connection.close(); + } +} diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/SourceTestClient.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/SourceTestClient.java new file mode 100644 index 000000000000..ea0a601fbf69 --- /dev/null +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/SourceTestClient.java @@ -0,0 +1,176 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector; + +import static org.assertj.core.api.Assertions.fail; + +import com.risingwave.proto.ConnectorServiceGrpc; +import com.risingwave.proto.ConnectorServiceProto; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import io.grpc.Channel; +import io.grpc.StatusRuntimeException; +import java.io.*; +import java.net.URI; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Iterator; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.JdbcDatabaseContainer; + +public class SourceTestClient { + static final Logger LOG = LoggerFactory.getLogger(SourceTestClient.class.getName()); + private final ConnectorServiceGrpc.ConnectorServiceBlockingStub blockingStub; + + public Properties sqlStmts = new Properties(); + + public SourceTestClient(Channel channel) { + blockingStub = ConnectorServiceGrpc.newBlockingStub(channel); + try (InputStream input = + getClass().getClassLoader().getResourceAsStream("stored_queries.properties")) { + sqlStmts.load(input); + } catch (IOException e) { + fail("failed to load sql statements", e); + } + } + + protected static Connection connect(DataSource dataSource) { + Connection connection = null; + try { + connection = dataSource.getConnection(); + } catch (SQLException e) { + fail("SQL Exception: {}", e); + } + return connection; + } + + protected static ResultSet performQuery(Connection connection, String sql) { + ResultSet resultSet = null; + try { + Statement statement = connection.createStatement(); + if (statement.execute(sql)) { + resultSet = statement.getResultSet(); + resultSet.next(); + } else { + LOG.info("updated: " + statement.getUpdateCount()); + } + } catch (SQLException e) { + LOG.warn("SQL Exception: {}", e.getMessage()); + } + return resultSet; + } + + protected static DataSource getDataSource(JdbcDatabaseContainer container) { + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setJdbcUrl(container.getJdbcUrl()); + hikariConfig.setUsername(container.getUsername()); + hikariConfig.setPassword(container.getPassword()); + hikariConfig.setDriverClassName(container.getDriverClassName()); + return new HikariDataSource(hikariConfig); + } + + protected Iterator getEventStreamStart( + JdbcDatabaseContainer container, + ConnectorServiceProto.SourceType sourceType, + String databaseName, + String tableName) { + String port = String.valueOf(URI.create(container.getJdbcUrl().substring(5)).getPort()); + ConnectorServiceProto.GetEventStreamRequest req = + ConnectorServiceProto.GetEventStreamRequest.newBuilder() + .setStart( + ConnectorServiceProto.GetEventStreamRequest.StartSource.newBuilder() + .setSourceId(0) + .setSourceType(sourceType) + .setStartOffset("") + .putProperties("hostname", container.getHost()) + .putProperties("port", port) + .putProperties("username", container.getUsername()) + .putProperties("password", container.getPassword()) + .putProperties("database.name", databaseName) + .putProperties("table.name", tableName) + .putProperties("schema.name", "public") // pg only + .putProperties("slot.name", "orders") // pg only + .putProperties("server.id", "1")) // mysql only + .build(); + Iterator responses = null; + try { + responses = blockingStub.getEventStream(req); + } catch (StatusRuntimeException e) { + fail("RPC failed: {}", e.getStatus()); + } + return responses; + } + + // generates an orders.tbl in class path using random data + // if file does not contain 10000 lines + static void genOrdersTable(int numRows) { + String[] orderStatusArr = {"O", "F"}; + String[] orderPriorityArr = {"1-URGENT", "2-HIGH", "3-MEDIUM", "4-NOT SPECIFIED", "5-LOW"}; + String path = + SourceTestClient.class.getProtectionDomain().getCodeSource().getLocation().getFile() + + "orders.tbl"; + try (BufferedReader reader = new BufferedReader(new FileReader(path))) { + int lines = 0; + while (reader.readLine() != null) { + lines++; + } + if (lines == 10000) { + LOG.info("orders.tbl contains 10000 lines, skipping data generation"); + return; + } + } catch (Exception e) { + fail("Runtime Exception: {}", e); + } + Random rand = new Random(); + try (PrintWriter writer = new PrintWriter(path, "UTF-8")) { + for (int i = 1; i <= numRows; i++) { + String custKey = String.valueOf(Math.abs(rand.nextLong())); + String orderStatus = orderStatusArr[rand.nextInt(orderStatusArr.length)]; + String totalPrice = rand.nextInt(1000000) + "." + rand.nextInt(9) + rand.nextInt(9); + String orderDate = + (rand.nextInt(60) + 1970) + + "-" + + String.format("%02d", rand.nextInt(12) + 1) + + "-" + + String.format("%02d", rand.nextInt(28) + 1); + String orderPriority = orderPriorityArr[rand.nextInt(orderPriorityArr.length)]; + String clerk = "Clerk#" + String.format("%09d", rand.nextInt(1024)); + String shipPriority = "0"; + String comment = UUID.randomUUID() + " " + UUID.randomUUID(); + writer.printf( + "%s|%s|%s|%s|%s|%s|%s|%s|%s\n", + i, + custKey, + orderStatus, + totalPrice, + orderDate, + orderPriority, + clerk, + shipPriority, + comment); + } + } catch (Exception e) { + fail("Runtime Exception: {}", e); + } + LOG.info("10000 lines written to orders.tbl"); + } +} diff --git a/java/connector-node/risingwave-source-test/src/test/resources/my.cnf b/java/connector-node/risingwave-source-test/src/test/resources/my.cnf new file mode 100644 index 000000000000..a6a3175e7ef3 --- /dev/null +++ b/java/connector-node/risingwave-source-test/src/test/resources/my.cnf @@ -0,0 +1,7 @@ +[mysqld] +server-id = 223344 +log_bin = mysql-bin +binlog_format = ROW +binlog_row_image = FULL +expire_logs_days = 10 +secure-file-priv = '/home' diff --git a/java/connector-node/risingwave-source-test/src/test/resources/orders.tbl b/java/connector-node/risingwave-source-test/src/test/resources/orders.tbl new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/java/connector-node/risingwave-source-test/src/test/resources/stored_queries.properties b/java/connector-node/risingwave-source-test/src/test/resources/stored_queries.properties new file mode 100644 index 000000000000..20acafedebae --- /dev/null +++ b/java/connector-node/risingwave-source-test/src/test/resources/stored_queries.properties @@ -0,0 +1,3 @@ +mysql.bin_log=show variables like 'log_bin' +postgres.wal=show wal_level +tpch.create.orders=CREATE TABLE IF NOT EXISTS orders (O_ORDERKEY BIGINT NOT NULL, O_CUSTKEY BIGINT NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15, 2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY BIGINT NOT NULL, O_COMMENT VARCHAR(79) NOT NULL, PRIMARY KEY (O_ORDERKEY)) \ No newline at end of file diff --git a/java/pom.xml b/java/pom.xml index f78c968c8d61..85cc80a998c3 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -16,6 +16,7 @@ connector-node/risingwave-sink-deltalake connector-node/risingwave-sink-jdbc connector-node/risingwave-source-cdc + connector-node/risingwave-source-test connector-node/risingwave-connector-service connector-node/assembly diff --git a/src/common/src/types/chrono_wrapper.rs b/src/common/src/types/chrono_wrapper.rs index 7ec81c640e6f..2dd08c3a2dc3 100644 --- a/src/common/src/types/chrono_wrapper.rs +++ b/src/common/src/types/chrono_wrapper.rs @@ -16,7 +16,7 @@ use std::hash::Hash; use std::io::Write; use bytes::{Bytes, BytesMut}; -use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday}; +use chrono::{Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday}; use postgres_types::{ToSql, Type}; use thiserror::Error; @@ -190,6 +190,15 @@ impl NaiveDateWrapper { )) } + pub fn with_days_since_unix_epoch(days: i32) -> Result { + Ok(NaiveDateWrapper::new( + NaiveDate::from_num_days_from_ce_opt(days) + .ok_or_else(|| InvalidParamsError::date(days))? + .checked_add_days(Days::new(UNIX_EPOCH_DAYS as u64)) + .ok_or_else(|| InvalidParamsError::date(days))?, + )) + } + pub fn to_protobuf(self, output: &mut T) -> ArrayResult { output .write(&(self.0.num_days_from_ce()).to_be_bytes()) @@ -246,6 +255,12 @@ impl NaiveTimeWrapper { Self::with_secs_nano(secs, nano).map_err(Into::into) } + pub fn with_milli(milli: u32) -> Result { + let secs = milli / 1_000; + let nano = (milli % 1_000) * 1_000_000; + Self::with_secs_nano(secs, nano) + } + pub fn from_hms_uncheck(hour: u32, min: u32, sec: u32) -> Self { Self::from_hms_nano_uncheck(hour, min, sec, 0) } diff --git a/src/connector/src/parser/common.rs b/src/connector/src/parser/common.rs index ccc696509761..30c710744c26 100644 --- a/src/connector/src/parser/common.rs +++ b/src/connector/src/parser/common.rs @@ -15,7 +15,9 @@ use anyhow::{anyhow, Result}; use num_traits::FromPrimitive; use risingwave_common::array::{ListValue, StructValue}; -use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl}; +use risingwave_common::types::{ + DataType, Datum, Decimal, NaiveDateWrapper, NaiveTimeWrapper, ScalarImpl, +}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::vector_op::cast::{ i64_to_timestamp, i64_to_timestamptz, str_to_date, str_to_time, str_to_timestamp, @@ -24,42 +26,67 @@ use risingwave_expr::vector_op::cast::{ use simd_json::value::StaticNode; use simd_json::{BorrowedValue, ValueAccess}; -use crate::{ensure_int, ensure_str}; +use crate::{ensure_i16, ensure_i32, ensure_i64, ensure_str, simd_json_ensure_float}; fn do_parse_simd_json_value(dtype: &DataType, v: &BorrowedValue<'_>) -> Result { - use crate::simd_json_ensure_float; - let v = match dtype { - DataType::Boolean => v.as_bool().ok_or_else(|| anyhow!("expect bool"))?.into(), - DataType::Int16 => ScalarImpl::Int16( - ensure_int!(v, i16) - .try_into() - .map_err(|e| anyhow!("expect i16: {}", e))?, - ), - DataType::Int32 => ScalarImpl::Int32( - ensure_int!(v, i32) - .try_into() - .map_err(|e| anyhow!("expect i32: {}", e))?, - ), - DataType::Int64 => ensure_int!(v, i64).into(), - DataType::Float32 => ScalarImpl::Float32((simd_json_ensure_float!(v, f32) as f32).into()), - DataType::Float64 => ScalarImpl::Float64((simd_json_ensure_float!(v, f64)).into()), + DataType::Boolean => match v { + BorrowedValue::Static(StaticNode::Bool(b)) => (*b).into(), + // debezium converts bool to int, false -> 0, true -> 1, for mysql and postgres + BorrowedValue::Static(v) => match v.as_i64() { + Some(0i64) => ScalarImpl::Bool(false), + Some(1i64) => ScalarImpl::Bool(true), + _ => anyhow::bail!("expect bool, but found {v}"), + }, + _ => anyhow::bail!("expect bool, but found {v}"), + }, + DataType::Int16 => ensure_i16!(v, i16).into(), + DataType::Int32 => ensure_i32!(v, i32).into(), + DataType::Int64 => ensure_i64!(v, i64).into(), + // when f32 overflows, the value is converted to `inf` which is inappropriate + DataType::Float32 => { + let scalar_val = ScalarImpl::Float32((simd_json_ensure_float!(v, f32) as f32).into()); + if let ScalarImpl::Float32(f) = scalar_val { + if f.is_infinite() { + anyhow::bail!("{v} is out of range for type f32"); + } + } + scalar_val + } + DataType::Float64 => simd_json_ensure_float!(v, f64).into(), // FIXME: decimal should have more precision than f64 DataType::Decimal => Decimal::from_f64(simd_json_ensure_float!(v, Decimal)) .ok_or_else(|| anyhow!("expect decimal"))? .into(), DataType::Varchar => ensure_str!(v, "varchar").to_string().into(), DataType::Bytea => ensure_str!(v, "bytea").to_string().into(), - DataType::Date => str_to_date(ensure_str!(v, "date"))?.into(), - DataType::Time => str_to_time(ensure_str!(v, "time"))?.into(), + // debezium converts date to i32 for mysql and postgres + DataType::Date => match v { + BorrowedValue::String(s) => str_to_date(s)?.into(), + BorrowedValue::Static(_) => { + NaiveDateWrapper::with_days_since_unix_epoch(ensure_i32!(v, i32))?.into() + } + _ => anyhow::bail!("expect date, but found {v}"), + }, + // debezium converts time to i64 for mysql and postgres + DataType::Time => match v { + BorrowedValue::String(s) => str_to_time(s)?.into(), + BorrowedValue::Static(_) => NaiveTimeWrapper::with_milli( + ensure_i64!(v, i64) + .try_into() + .map_err(|_| anyhow!("cannot cast i64 to time, value out of range"))?, + )? + .into(), + _ => anyhow::bail!("expect time, but found {v}"), + }, DataType::Timestamp => match v { BorrowedValue::String(s) => str_to_timestamp(s)?.into(), - BorrowedValue::Static(_) => i64_to_timestamp(ensure_int!(v, i64))?.into(), + BorrowedValue::Static(_) => i64_to_timestamp(ensure_i64!(v, i64))?.into(), _ => anyhow::bail!("expect timestamp, but found {v}"), }, DataType::Timestamptz => match v { BorrowedValue::String(s) => str_with_time_zone_to_timestamptz(s)?.into(), - BorrowedValue::Static(_) => i64_to_timestamptz(ensure_int!(v, i64))?.into(), + BorrowedValue::Static(_) => i64_to_timestamptz(ensure_i64!(v, i64))?.into(), _ => anyhow::bail!("expect timestamptz, but found {v}"), }, DataType::Jsonb => { diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index f6aa59f2c2f0..8a0c5718bd51 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -156,53 +156,63 @@ mod tests { use std::convert::TryInto; + use chrono::{NaiveDate, NaiveTime}; use risingwave_common::array::Op; use risingwave_common::catalog::ColumnId; use risingwave_common::row::{OwnedRow, Row}; - use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::types::{ + DataType, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper, Scalar, ScalarImpl, + }; + use serde_json::Value; use super::*; use crate::parser::{SourceColumnDesc, SourceStreamChunkBuilder}; - fn get_test_columns() -> Vec { + fn get_test1_columns() -> Vec { let descs = vec![ - SourceColumnDesc { - name: "id".to_string(), - data_type: DataType::Int32, - column_id: ColumnId::from(0), - is_row_id: false, - is_meta: false, - fields: vec![], - }, - SourceColumnDesc { - name: "name".to_string(), - data_type: DataType::Varchar, - column_id: ColumnId::from(1), - is_row_id: false, - is_meta: false, - fields: vec![], - }, - SourceColumnDesc { - name: "description".to_string(), - data_type: DataType::Varchar, - column_id: ColumnId::from(2), - is_row_id: false, - is_meta: false, - fields: vec![], - }, - SourceColumnDesc { - name: "weight".to_string(), - data_type: DataType::Float64, - column_id: ColumnId::from(3), - is_row_id: false, - is_meta: false, - fields: vec![], - }, + SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)), + SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(1)), + SourceColumnDesc::simple("description", DataType::Varchar, ColumnId::from(2)), + SourceColumnDesc::simple("weight", DataType::Float64, ColumnId::from(3)), ]; descs } + // test2 generated by mysql + fn get_test2_columns() -> Vec { + let descs = vec![ + SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)), + SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)), + SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)), + SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)), + SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)), + SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)), + SourceColumnDesc::simple("O_DECIMAL", DataType::Decimal, ColumnId::from(6)), + SourceColumnDesc::simple("O_CHAR", DataType::Varchar, ColumnId::from(7)), + SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)), + SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)), + SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)), + SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamp, ColumnId::from(11)), + SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)), + ]; + + descs + } + + fn assert_json_eq(parse_result: &Option, json_str: &str) { + if let Some(ScalarImpl::Jsonb(json_val)) = parse_result { + let mut json_string = String::new(); + json_val + .as_scalar_ref() + .force_str(&mut json_string) + .unwrap(); + let val1: Value = serde_json::from_str(json_string.as_str()).unwrap(); + let val2: Value = serde_json::from_str(json_str).unwrap(); + assert_eq!(val1, val2); + } + } + async fn parse_one( parser: DebeziumJsonParser, columns: Vec, @@ -221,7 +231,261 @@ mod tests { } #[tokio::test] - async fn test_debezium_json_parser_read() { + async fn test2_debezium_json_parser_read() { + let data = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"},{"type":"double","optional":true,"field":"O_DECIMAL"},{"type":"string","optional":true,"field":"O_CHAR"},{"type":"string","optional":true,"name":"risingwave.cdc.date.string","field":"O_DATE"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Time","version":1,"field":"O_TIME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"O_DATETIME"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"O_TIMESTAMP"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"O_JSON"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"},{"type":"double","optional":true,"field":"O_DECIMAL"},{"type":"string","optional":true,"field":"O_CHAR"},{"type":"string","optional":true,"name":"risingwave.cdc.date.string","field":"O_DATE"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Time","version":1,"field":"O_TIME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"O_DATETIME"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"O_TIMESTAMP"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"O_JSON"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"RW_CDC_test.orders.test.orders.Envelope"},"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090651000,"snapshot":"last","db":"test","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":951,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1678090651640,"transaction":null}}"#; + + let columns = get_test2_columns(); + + let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + + let [(_op, row)]: [_; 1] = parse_one(parser, columns, data).await.try_into().unwrap(); + + assert!(row[0].eq(&Some(ScalarImpl::Int64(111)))); + assert!(row[1].eq(&Some(ScalarImpl::Bool(true)))); + assert!(row[2].eq(&Some(ScalarImpl::Int16(-1)))); + assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111)))); + assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into())))); + assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into())))); + assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap())))); + assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into())))); + assert!(row[8].eq(&Some(ScalarImpl::NaiveDate(NaiveDateWrapper::new( + NaiveDate::from_ymd_opt(1000, 1, 1).unwrap() + ))))); + assert!(row[9].eq(&Some(ScalarImpl::NaiveTime(NaiveTimeWrapper::new( + NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap() + ))))); + assert!( + row[10].eq(&Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + "1970-01-01T00:00:00".parse().unwrap() + )))) + ); + assert!( + row[11].eq(&Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + "1970-01-01T00:00:01".parse().unwrap() + )))) + ); + assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}"); + } + + #[tokio::test] + async fn test2_debezium_json_parser_insert() { + let data = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"},{"type":"double","optional":true,"field":"O_DECIMAL"},{"type":"string","optional":true,"field":"O_CHAR"},{"type":"string","optional":true,"name":"risingwave.cdc.date.string","field":"O_DATE"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Time","version":1,"field":"O_TIME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"O_DATETIME"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"O_TIMESTAMP"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"O_JSON"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"},{"type":"double","optional":true,"field":"O_DECIMAL"},{"type":"string","optional":true,"field":"O_CHAR"},{"type":"string","optional":true,"name":"risingwave.cdc.date.string","field":"O_DATE"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Time","version":1,"field":"O_TIME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"O_DATETIME"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"O_TIMESTAMP"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"O_JSON"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"RW_CDC_test.orders.test.orders.Envelope"},"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#; + + let columns = get_test2_columns(); + let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let [(op, row)]: [_; 1] = parse_one(parser, columns, data).await.try_into().unwrap(); + assert_eq!(op, Op::Insert); + + assert!(row[0].eq(&Some(ScalarImpl::Int64(111)))); + assert!(row[1].eq(&Some(ScalarImpl::Bool(true)))); + assert!(row[2].eq(&Some(ScalarImpl::Int16(-1)))); + assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111)))); + assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into())))); + assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into())))); + assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap())))); + assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into())))); + assert!(row[8].eq(&Some(ScalarImpl::NaiveDate(NaiveDateWrapper::new( + NaiveDate::from_ymd_opt(1000, 1, 1).unwrap() + ))))); + assert!(row[9].eq(&Some(ScalarImpl::NaiveTime(NaiveTimeWrapper::new( + NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap() + ))))); + assert!( + row[10].eq(&Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + "1970-01-01T00:00:00".parse().unwrap() + )))) + ); + assert!( + row[11].eq(&Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + "1970-01-01T00:00:01".parse().unwrap() + )))) + ); + assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}"); + } + + #[tokio::test] + async fn test2_debezium_json_parser_delete() { + let data = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"},{"type":"double","optional":true,"field":"O_DECIMAL"},{"type":"string","optional":true,"field":"O_CHAR"},{"type":"string","optional":true,"name":"risingwave.cdc.date.string","field":"O_DATE"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Time","version":1,"field":"O_TIME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"O_DATETIME"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"O_TIMESTAMP"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"O_JSON"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"},{"type":"double","optional":true,"field":"O_DECIMAL"},{"type":"string","optional":true,"field":"O_CHAR"},{"type":"string","optional":true,"name":"risingwave.cdc.date.string","field":"O_DATE"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Time","version":1,"field":"O_TIME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"O_DATETIME"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"O_TIMESTAMP"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"O_JSON"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"RW_CDC_test.orders.test.orders.Envelope"},"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#; + + let columns = get_test2_columns(); + let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let [(op, row)]: [_; 1] = parse_one(parser, columns, data).await.try_into().unwrap(); + + assert_eq!(op, Op::Delete); + + assert!(row[0].eq(&Some(ScalarImpl::Int64(111)))); + assert!(row[1].eq(&Some(ScalarImpl::Bool(false)))); + assert!(row[2].eq(&Some(ScalarImpl::Int16(3)))); + assert!(row[3].eq(&Some(ScalarImpl::Int32(3333)))); + assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into())))); + assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into())))); + assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap())))); + assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into())))); + assert!(row[8].eq(&Some(ScalarImpl::NaiveDate(NaiveDateWrapper::new( + NaiveDate::from_ymd_opt(9999, 12, 31).unwrap() + ))))); + assert!(row[9].eq(&Some(ScalarImpl::NaiveTime(NaiveTimeWrapper::new( + NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap() + ))))); + assert!( + row[10].eq(&Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + "5138-11-16T09:46:39".parse().unwrap() + )))) + ); + assert!( + row[11].eq(&Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + "2038-01-09T03:14:07".parse().unwrap() + )))) + ); + assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}"); + } + + #[tokio::test] + async fn test2_debezium_json_parser_update() { + let data = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"},{"type":"double","optional":true,"field":"O_DECIMAL"},{"type":"string","optional":true,"field":"O_CHAR"},{"type":"string","optional":true,"name":"risingwave.cdc.date.string","field":"O_DATE"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Time","version":1,"field":"O_TIME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"O_DATETIME"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"O_TIMESTAMP"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"O_JSON"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"},{"type":"double","optional":true,"field":"O_DECIMAL"},{"type":"string","optional":true,"field":"O_CHAR"},{"type":"string","optional":true,"name":"risingwave.cdc.date.string","field":"O_DATE"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Time","version":1,"field":"O_TIME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"O_DATETIME"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"O_TIMESTAMP"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"O_JSON"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"RW_CDC_test.orders.test.orders.Envelope"},"payload":{"before":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"after":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\": \"v1_updated\", \"k2\": 33}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678089331000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1168,"row":0,"thread":4,"query":null},"op":"u","ts_ms":1678089331464,"transaction":null}}"#; + + let columns = get_test2_columns(); + + let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let [(op1, row1), (op2, row2)]: [_; 2] = + parse_one(parser, columns, data).await.try_into().unwrap(); + + assert_eq!(op1, Op::UpdateDelete); + assert_eq!(op2, Op::UpdateInsert); + + assert!(row1[0].eq(&Some(ScalarImpl::Int64(111)))); + assert!(row1[1].eq(&Some(ScalarImpl::Bool(true)))); + assert!(row1[2].eq(&Some(ScalarImpl::Int16(-1)))); + assert!(row1[3].eq(&Some(ScalarImpl::Int32(-1111)))); + assert!(row1[4].eq(&Some(ScalarImpl::Float32((-11.11).into())))); + assert!(row1[5].eq(&Some(ScalarImpl::Float64((-111.11111).into())))); + assert!(row1[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap())))); + assert!(row1[7].eq(&Some(ScalarImpl::Utf8("yes please".into())))); + assert!( + row1[8].eq(&Some(ScalarImpl::NaiveDate(NaiveDateWrapper::new( + NaiveDate::from_ymd_opt(1000, 1, 1).unwrap() + )))) + ); + assert!( + row1[9].eq(&Some(ScalarImpl::NaiveTime(NaiveTimeWrapper::new( + NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap() + )))) + ); + assert!( + row1[10].eq(&Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + "1970-01-01T00:00:00".parse().unwrap() + )))) + ); + assert!( + row1[11].eq(&Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + "1970-01-01T00:00:01".parse().unwrap() + )))) + ); + assert_json_eq(&row1[12], "{\"k1\": \"v1\", \"k2\": 11}"); + + assert!(row2[0].eq(&Some(ScalarImpl::Int64(111)))); + assert!(row2[1].eq(&Some(ScalarImpl::Bool(false)))); + assert!(row2[2].eq(&Some(ScalarImpl::Int16(3)))); + assert!(row2[3].eq(&Some(ScalarImpl::Int32(3333)))); + assert!(row2[4].eq(&Some(ScalarImpl::Float32((33.33).into())))); + assert!(row2[5].eq(&Some(ScalarImpl::Float64((333.33333).into())))); + assert!(row2[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap())))); + assert!(row2[7].eq(&Some(ScalarImpl::Utf8("no thanks".into())))); + assert!( + row2[8].eq(&Some(ScalarImpl::NaiveDate(NaiveDateWrapper::new( + NaiveDate::from_ymd_opt(9999, 12, 31).unwrap() + )))) + ); + assert!( + row2[9].eq(&Some(ScalarImpl::NaiveTime(NaiveTimeWrapper::new( + NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap() + )))) + ); + assert!( + row2[10].eq(&Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + "5138-11-16T09:46:39".parse().unwrap() + )))) + ); + assert!( + row2[11].eq(&Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + "2038-01-09T03:14:07".parse().unwrap() + )))) + ); + assert_json_eq(&row2[12], "{\"k1\": \"v1_updated\", \"k2\": 33}"); + } + + #[tokio::test] + async fn test2_debezium_json_parser_overflow() { + let columns = vec![ + SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)), + SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)), + SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)), + SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)), + SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)), + SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)), + ]; + let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + + let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); + // i64 overflow + let data0 = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"RW_CDC_test.orders.test.orders.Envelope"},"payload":{"before":null,"after":{"O_KEY":9223372036854775808,"O_BOOL":1,"O_TINY":33,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; + if let Err(e) = parser.parse_inner(data0, builder.row_writer()).await { + println!("{:?}", e.to_string()); + } else { + panic!("the test case is expected fail"); + } + // bool incorrect value + let data1 = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"RW_CDC_test.orders.test.orders.Envelope"},"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":2,"O_TINY":33,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; + if let Err(e) = parser.parse_inner(data1, builder.row_writer()).await { + println!("{:?}", e.to_string()); + } else { + panic!("the test case is expected failed"); + } + // i16 overflow + let data2 = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"RW_CDC_test.orders.test.orders.Envelope"},"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":32768,"O_INT":444,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; + if let Err(e) = parser.parse_inner(data2, builder.row_writer()).await { + println!("{:?}", e.to_string()); + } else { + panic!("the test case is expected to fail"); + } + // i32 overflow + let data3 = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"RW_CDC_test.orders.test.orders.Envelope"},"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":33,"O_INT":2147483648,"O_REAL":555.0,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; + if let Err(e) = parser.parse_inner(data3, builder.row_writer()).await { + println!("{:?}", e.to_string()); + } else { + panic!("the test case is expected to fail"); + } + // float32 overflow + let data4 = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_KEY"},{"type":"int16","optional":true,"field":"O_BOOL"},{"type":"int16","optional":true,"field":"O_TINY"},{"type":"int32","optional":true,"field":"O_INT"},{"type":"double","optional":true,"field":"O_REAL"},{"type":"double","optional":true,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"RW_CDC_test.orders.test.orders.Envelope"},"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":33,"O_INT":444,"O_REAL":3.80282347E38,"O_DOUBLE":666.0},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678158055464,"transaction":null}}"#; + if let Err(e) = parser.parse_inner(data4, builder.row_writer()).await { + println!("{:?}", e.to_string()); + } else { + panic!("the test case is expected to fail"); + } + // float64 will cause debezium simd_json_parser to panic, therefore included in the next + // test case below + } + + #[tokio::test] + #[should_panic] + async fn test2_debezium_json_parser_overflow_f64() { + let columns = vec![SourceColumnDesc::simple( + "O_DOUBLE", + DataType::Float64, + ColumnId::from(0), + )]; + let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); + let data = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"O_DOUBLE"}],"optional":true,"name":"RW_CDC_test.orders.test.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"RW_CDC_test.orders.test.orders.Envelope"},"payload":{"before":null,"after":{"O_DOUBLE":1.797695E308},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678174483000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":563,"row":0,"thread":3,"query":null},"op":"c","ts_ms":1678174483866,"transaction":null}}"#; + if let Err(e) = parser.parse_inner(data, builder.row_writer()).await { + println!("{:?}", e.to_string()); + } else { + panic!("the test case is expected to fail"); + } + } + + #[tokio::test] + async fn test1_debezium_json_parser_read() { // "before": null, // "after": { // "id": 101, @@ -231,7 +495,7 @@ mod tests { // }, let data = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}}"#; - let columns = get_test_columns(); + let columns = get_test1_columns(); let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); @@ -244,7 +508,7 @@ mod tests { } #[tokio::test] - async fn test_debezium_json_parser_insert() { + async fn test1_debezium_json_parser_insert() { // "before": null, // "after": { // "id": 102, @@ -254,7 +518,7 @@ mod tests { // }, let data = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}}"#; - let columns = get_test_columns(); + let columns = get_test1_columns(); let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); let [(op, row)]: [_; 1] = parse_one(parser, columns, data).await.try_into().unwrap(); assert_eq!(op, Op::Insert); @@ -266,7 +530,7 @@ mod tests { } #[tokio::test] - async fn test_debezium_json_parser_delete() { + async fn test1_debezium_json_parser_delete() { // "before": { // "id": 101, // "name": "scooter", @@ -276,7 +540,7 @@ mod tests { // "after": null, let data = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}}"#; - let columns = get_test_columns(); + let columns = get_test1_columns(); let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); let [(op, row)]: [_; 1] = parse_one(parser, columns, data).await.try_into().unwrap(); @@ -289,7 +553,7 @@ mod tests { } #[tokio::test] - async fn test_debezium_json_parser_update() { + async fn test1_debezium_json_parser_update() { // "before": { // "id": 102, // "name": "car battery", @@ -304,7 +568,7 @@ mod tests { // }, let data = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}}"#; - let columns = get_test_columns(); + let columns = get_test1_columns(); let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); let [(op1, row1), (op2, row2)]: [_; 2] = @@ -325,7 +589,7 @@ mod tests { } #[tokio::test] - async fn test_update_with_before_null() { + async fn test1_update_with_before_null() { // the test case it identical with test_debezium_json_parser_insert but op is 'u' // "before": null, // "after": { @@ -336,7 +600,7 @@ mod tests { // }, let data = br#"{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551564960,"transaction":null}}"#; - let columns = get_test_columns(); + let columns = get_test1_columns(); let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); diff --git a/src/connector/src/parser/macros.rs b/src/connector/src/parser/macros.rs index 10058f0bd8aa..bd46b85f47b4 100644 --- a/src/connector/src/parser/macros.rs +++ b/src/connector/src/parser/macros.rs @@ -35,7 +35,23 @@ macro_rules! simd_json_ensure_float { } #[macro_export] -macro_rules! ensure_int { +macro_rules! ensure_i16 { + ($v:ident, $t:ty) => { + $v.as_i16() + .ok_or_else(|| anyhow!(concat!("expect ", stringify!($t), ", but found {}"), $v))? + }; +} + +#[macro_export] +macro_rules! ensure_i32 { + ($v:ident, $t:ty) => { + $v.as_i32() + .ok_or_else(|| anyhow!(concat!("expect ", stringify!($t), ", but found {}"), $v))? + }; +} + +#[macro_export] +macro_rules! ensure_i64 { ($v:ident, $t:ty) => { $v.as_i64() .ok_or_else(|| anyhow!(concat!("expect ", stringify!($t), ", but found {}"), $v))?