Skip to content

Commit

Permalink
fix(sink): fix connector node sink json payload serialization (#8461)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Mar 10, 2023
1 parent 2f626d9 commit 614b6c5
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 54 deletions.
13 changes: 10 additions & 3 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,18 @@ else
fi
ARTIFACT="risingwave-connector-1.0.0.tar.gz"
TARGET_PATH="${JAVA_DIR}/connector-node/assembly/target/${ARTIFACT}"
cd "${JAVA_DIR}"
"${MAVEN_PATH}" --batch-mode --update-snapshots clean package -Dmaven.test.skip
if [[ ! -f ${TARGET_PATH} ]] || [[ ! -z ${REBUILD_CONNECTOR_NODE} ]]; then
echo "Rebuild connector node"
cd "${JAVA_DIR}"
"${MAVEN_PATH}" --batch-mode --update-snapshots clean package -Dmaven.test.skip
else
echo "Connector node was built already. Skipped. Set REBUILD_CONNECTOR_NODE=1 to enable rebuild"
fi
rm -rf ${PREFIX_BIN}/connector-node
mkdir -p "${PREFIX_BIN}/connector-node"
tar xf "${JAVA_DIR}/connector-node/assembly/target/${ARTIFACT}" -C "${PREFIX_BIN}/connector-node"
tar xf ${TARGET_PATH} -C "${PREFIX_BIN}/connector-node"
'''


Expand Down
16 changes: 8 additions & 8 deletions ci/scripts/e2e-iceberg-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301 \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \
--S --e "CREATE TABLE demo.demo_db.demo_table(v1 int, v2 int) TBLPROPERTIES ('format-version'='2');"
--S --e "CREATE TABLE demo.demo_db.demo_table(v1 int, v2 bigint, v3 string) TBLPROPERTIES ('format-version'='2');"

echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/iceberg_sink.slt'
Expand All @@ -80,13 +80,13 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \

# check sink destination using shell
if cat ./spark-output/*.csv | sort | awk -F "," '{
if ($1 == 1 && $2 == 2) c1++;
if ($1 == 13 && $2 == 2) c2++;
if ($1 == 21 && $2 == 2) c3++;
if ($1 == 2 && $2 == 2) c4++;
if ($1 == 3 && $2 == 2) c5++;
if ($1 == 5 && $2 == 2) c6++;
if ($1 == 8 && $2 == 2) c7++; }
if ($1 == 1 && $2 == 2 && $3 == "1-2") c1++;
if ($1 == 13 && $2 == 2 && $3 == "13-2") c2++;
if ($1 == 21 && $2 == 2 && $3 == "21-2") c3++;
if ($1 == 2 && $2 == 2 && $3 == "2-2") c4++;
if ($1 == 3 && $2 == 2 && $3 == "3-2") c5++;
if ($1 == 5 && $2 == 2 && $3 == "5-2") c6++;
if ($1 == 8 && $2 == 2 && $3 == "8-2") c7++; }
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1 && c6 == 1 && c7 == 1); }'; then
echo "Iceberg sink check passed"
else
Expand Down
12 changes: 6 additions & 6 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 int);
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2 from mv6 WITH (
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg',
sink.mode='append-only',
location.type='minio',
Expand All @@ -15,7 +15,10 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2 from mv6 WITH (
);

statement ok
INSERT INTO t6 VALUES (1, 2), (2, 2), (3, 2), (5, 2), (8, 2), (13, 2), (21, 2);
INSERT INTO t6 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2'), (8, 2, '8-2'), (13, 2, '13-2'), (21, 2, '21-2');

statement ok
FLUSH;

statement ok
DROP SINK s6;
Expand All @@ -25,6 +28,3 @@ DROP MATERIALIZED VIEW mv6;

statement ok
DROP TABLE t6;

statement ok
FLUSH;
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,18 @@ public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchem
public List<String> getPrimaryKeys() {
return primaryKeys;
}

@Override
public String toString() {
    // Human-readable dump of the schema fields, intended for logging/debugging.
    StringBuilder sb = new StringBuilder("TableSchema{");
    sb.append("columnNames=").append(columnNames);
    sb.append(", columns=").append(columns);
    sb.append(", columnIndices=").append(columnIndices);
    sb.append(", primaryKeys=").append(primaryKeys);
    sb.append('}');
    return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,63 +49,82 @@ public Iterator<SinkRow> deserialize(Object payload) {
.iterator();
}

private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
if (value instanceof Double
&& (Double) value % 1 == 0
&& typeName != Data.DataType.TypeName.DOUBLE
&& typeName != Data.DataType.TypeName.FLOAT) {
return (int) (double) value;
private static Long castLong(Object value) {
    // Widens the numeric wrapper types a JSON decoder may produce into a Long.
    // Floating-point inputs are accepted only when they carry an integral value;
    // anything else is rejected with INVALID_ARGUMENT.
    if (value instanceof Long) {
        return (Long) value;
    }
    if (value instanceof Integer) {
        return Long.valueOf(((Integer) value).longValue());
    }
    if (value instanceof Short) {
        return Long.valueOf(((Short) value).longValue());
    }
    if (value instanceof Double) {
        double d = (Double) value;
        if (d % 1.0 != 0.0) {
            // Fractional part present: truncation would silently lose data.
            throw io.grpc.Status.INVALID_ARGUMENT
                    .withDescription(
                            "unable to cast into long from non-integer double value: " + d)
                    .asRuntimeException();
        }
        return (long) d;
    }
    if (value instanceof Float) {
        // Widen to double first; the message intentionally prints the widened value.
        double f = (Float) value;
        if (f % 1.0 != 0.0) {
            throw io.grpc.Status.INVALID_ARGUMENT
                    .withDescription(
                            "unable to cast into long from non-integer float value: " + f)
                    .asRuntimeException();
        }
        return (long) f;
    }
    throw io.grpc.Status.INVALID_ARGUMENT
            .withDescription("unable to cast into long from " + value.getClass())
            .asRuntimeException();
}

private static Double castDouble(Object value) {
    // Accepts Float or Double from the JSON decoder and widens to Double;
    // any other type is rejected with INVALID_ARGUMENT.
    if (value instanceof Float) {
        return Double.valueOf(((Float) value).doubleValue());
    }
    if (value instanceof Double) {
        return (Double) value;
    }
    throw io.grpc.Status.INVALID_ARGUMENT
            .withDescription("unable to cast into double from " + value.getClass())
            .asRuntimeException();
}

private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
    // Validates a JSON-decoded value against the expected column type and
    // coerces it to the Java representation of that type.
    //
    // NOTE(review): the extracted diff interleaved the pre-change validation
    // branches with the new cast-based ones, producing duplicate `case INT16:`
    // labels and an unreachable trailing `return value;` — this is the coherent
    // post-change version. DECIMAL is deliberately absent here (it falls through
    // to `default`), matching the added lines of the diff; confirm upstream if
    // DECIMAL payloads are expected.
    //
    // Throws io.grpc INVALID_ARGUMENT when the value cannot represent the type.
    switch (typeName) {
        case INT16:
            // castLong rejects fractional floating-point input, then narrow.
            return castLong(value).shortValue();
        case INT32:
            return castLong(value).intValue();
        case INT64:
            return castLong(value);
        case VARCHAR:
            if (!(value instanceof String)) {
                throw io.grpc.Status.INVALID_ARGUMENT
                        .withDescription("Expected string, got " + value.getClass())
                        .asRuntimeException();
            }
            return value;
        case DOUBLE:
            return castDouble(value);
        case FLOAT:
            // Widen via castDouble (accepts Float/Double) then narrow to float.
            return castDouble(value).floatValue();
        case BOOLEAN:
            if (!(value instanceof Boolean)) {
                throw io.grpc.Status.INVALID_ARGUMENT
                        .withDescription("Expected boolean, got " + value.getClass())
                        .asRuntimeException();
            }
            return value;
        default:
            throw io.grpc.Status.INVALID_ARGUMENT
                    .withDescription("unsupported type " + typeName)
                    .asRuntimeException();
    }
}
}
4 changes: 2 additions & 2 deletions src/risedevtool/connector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ description = "Download Maven"
script = '''
#!/usr/bin/env bash
if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ "11" ]]); then
echo "JDK 11 is not installed. Please install JDK 11 first."
if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ ^(11|17) ]]); then
echo "JDK 11+ is not installed. Please install JDK 11+ first."
exit 1
fi
Expand Down

0 comments on commit 614b6c5

Please sign in to comment.