cherry-pick: feat(source): support ingesting protobuf map #18558

Merged · 3 commits · Sep 23, 2024
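For context, this cherry-pick teaches the source side to decode protobuf `map<K, V>` fields. A minimal sketch of the resulting usage, borrowing the connector syntax and descriptor path from the tests in this diff — the table name, topic name, and `map_field` are hypothetical, not taken from this PR:

statement ok
create table map_demo with (
    connector = 'kafka',
    topic = 'example-protobuf-map',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest')
format plain encode protobuf (
    message = 'all_types.AllTypes',
    schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb');

# Assuming all_types.AllTypes declares e.g. `map<string, int32> map_field`,
# that field now ingests as a map-typed column instead of being rejected.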
12 changes: 9 additions & 3 deletions Cargo.lock

(Generated file; diff not rendered.)

13 changes: 0 additions & 13 deletions ci/scripts/e2e-kafka-sink-test.sh
@@ -150,20 +150,7 @@ echo "preparing confluent schema registry"
python3 -m pip install --break-system-packages requests confluent-kafka

echo "testing protobuf"
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
rpk topic create test-rw-sink-append-only-protobuf
rpk topic create test-rw-sink-append-only-protobuf-csr-a
rpk topic create test-rw-sink-append-only-protobuf-csr-hi
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
rpk topic delete test-rw-sink-append-only-protobuf
rpk topic delete test-rw-sink-append-only-protobuf-csr-a
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

echo "testing avro"
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
rpk topic create test-rw-sink-upsert-avro
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
rpk topic delete test-rw-sink-upsert-avro
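Worth noting: the setup and teardown deleted here is not gone — it moves into the .slt files changed below as sqllogictest `system ok` blocks, which run a shell command and expect a zero exit status, so each test now provisions and cleans up its own topics and schemas. The pattern, with a hypothetical topic name:

system ok
rpk topic create demo-topic

# ... statements under test ...

system ok
rpk topic delete demo-topic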
1 change: 0 additions & 1 deletion ci/scripts/e2e-source-test.sh
@@ -45,7 +45,6 @@ risedev ci-kill
echo "--- Prepare data"
cp src/connector/src/test_data/simple-schema.avsc ./avro-simple-schema.avsc
cp src/connector/src/test_data/complex-schema.avsc ./avro-complex-schema.avsc
cp src/connector/src/test_data/complex-schema ./proto-complex-schema
cp src/connector/src/test_data/complex-schema.json ./json-complex-schema


12 changes: 12 additions & 0 deletions e2e_test/sink/kafka/avro.slt
@@ -1,6 +1,15 @@
statement ok
set sink_decouple = false;

system ok
rpk topic create test-rw-sink-upsert-avro

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'

statement ok
create table from_kafka ( *, gen_i32_field int as int32_field + 2, primary key (some_key) )
include key as some_key
@@ -232,3 +241,6 @@ drop table into_kafka;

statement ok
drop table from_kafka;

system ok
rpk topic delete test-rw-sink-upsert-avro
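The registry subjects above follow Confluent's default TopicNameStrategy (`<topic>-value` and `<topic>-key`), which is how `encode avro (schema.registry = ...)` resolves schemas for the topic; the trailing 'string_field,int32_field' argument to register_schema.py appears to select the key fields for the key schema.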
50 changes: 37 additions & 13 deletions e2e_test/sink/kafka/protobuf.slt
@@ -1,14 +1,23 @@
statement ok
set sink_decouple = false;

system ok
rpk topic create test-rw-sink-append-only-protobuf

statement ok
create table from_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes');

system ok
rpk topic create test-rw-sink-append-only-protobuf-csr-a

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto

statement ok
create table from_kafka_csr_trivial with (
@@ -19,6 +28,12 @@ format plain encode protobuf (
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageA');

system ok
rpk topic create test-rw-sink-append-only-protobuf-csr-hi

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto

statement ok
create table from_kafka_csr_nested with (
connector = 'kafka',
@@ -73,8 +88,8 @@ create sink sink0 from into_kafka with (
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes');

statement ok
create sink sink_csr_trivial as select string_field as field_a from into_kafka with (
@@ -103,8 +118,8 @@ create sink sink_upsert from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'string_field')
format upsert encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes');
----
db error: ERROR: Failed to run the query

@@ -122,8 +137,8 @@ create sink sink_upsert from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'string_field')
format upsert encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes')
schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes')
key encode text;

# Shall be ignored by force_append_only sinks but processed by upsert sinks.
@@ -178,7 +193,7 @@ create sink sink_err from into_kafka with (
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursiv',
message = 'recursive.AllTypes');
message = 'all_types.AllTypes');

statement error field not in proto
create sink sink_err as select 1 as extra_column with (
@@ -187,8 +202,8 @@ create sink sink_err as select 1 as extra_column with (
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes');

statement error s3 URL not supported yet
create sink sink_err from into_kafka with (
@@ -197,8 +212,8 @@ create sink sink_err from into_kafka with (
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 's3:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
schema.location = 's3:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes');

statement ok
drop table from_kafka cascade;
@@ -215,5 +230,14 @@ drop table from_kafka_raw cascade;
statement ok
drop table into_kafka cascade;

system ok
rpk topic delete test-rw-sink-append-only-protobuf

system ok
rpk topic delete test-rw-sink-append-only-protobuf-csr-a

system ok
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

system ok
rpk topic delete test-rw-sink-upsert-protobuf
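A side note on the new fixture path: `all-types.pb` is a compiled protobuf descriptor set rather than a copied source file, so the tests no longer depend on a file staged by the CI script. Descriptor sets like this are conventionally produced with protoc — a sketch with hypothetical paths, not a command from this PR:

system ok
protoc --include_imports --descriptor_set_out=all-types.pb all-types.proto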
36 changes: 0 additions & 36 deletions e2e_test/source/basic/kafka.slt
@@ -187,17 +187,6 @@ create table s10 with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc', with_deprecated_file_header = true);

statement ok
create table s11 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest')
FORMAT PLAIN ENCODE PROTOBUF (
message = 'test.User',
schema.location = 'file:///risingwave/proto-complex-schema'
);

statement ok
CREATE TABLE s12(
id int,
@@ -273,17 +262,6 @@ create table s16 (v1 int, v2 varchar) with (
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON

statement ok
create source s17 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest')
FORMAT PLAIN ENCODE PROTOBUF (
message = 'test.User',
schema.location = 'file:///risingwave/proto-complex-schema'
);

statement ok
create source s18 with (
connector = 'kafka',
@@ -669,11 +647,6 @@ select id, code, timestamp, xfas, contacts, sex from s10;
----
100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE

query ITITT
select id, code, timestamp, xfas, contacts, sex from s11;
----
0 abc 1473305798 {"(0,200,127.0.0.1)","(1,400,127.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE

query ITITT
select id, code, timestamp, xfas, contacts, jsonb from s12;
----
@@ -703,9 +676,6 @@ select count(*) from s16
statement error Not supported: alter source with schema registry
alter source s18 add column v10 int;

statement error Not supported: alter source with schema registry
alter source s17 add column v10 int;

query III rowsort
select * from s21;
----
@@ -848,9 +818,6 @@ drop table s9
statement ok
drop table s10

statement ok
drop table s11

statement ok
drop table s12

@@ -866,9 +833,6 @@ drop table s15
statement ok
drop table s16

statement ok
drop source s17

statement ok
drop source s18

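(As far as this diff shows, the dropped s11/s17 cases were the only consumers of the proto-complex-schema descriptor, which is why the `cp ... ./proto-complex-schema` line disappears from ci/scripts/e2e-source-test.sh above; the old_row_format_syntax file below removes the same tables under the deprecated syntax.)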
27 changes: 0 additions & 27 deletions e2e_test/source/basic/old_row_format_syntax/kafka.slt
@@ -171,14 +171,6 @@ create table s10 with (
scan.startup.mode = 'earliest'
) row format avro row schema location 'file:///risingwave/avro-complex-schema.avsc'

statement ok
create table s11 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema'

statement ok
CREATE TABLE s12(
id int,
@@ -254,14 +246,6 @@ create table s16 (v1 int, v2 varchar) with (
scan.startup.mode = 'latest'
) ROW FORMAT JSON

statement ok
create source s17 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema'

statement error without schema registry
create source s18 with (
connector = 'kafka',
@@ -570,11 +554,6 @@ select id, first_name, last_name, email from s8_no_schema_field;
# ----
# 100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE

query ITITT
select id, code, timestamp, xfas, contacts, sex from s11;
----
0 abc 1473305798 {"(0,200,127.0.0.1)","(1,400,127.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE

query ITITT
select id, code, timestamp, xfas, contacts, jsonb from s12;
----
@@ -712,9 +691,6 @@ drop table s8_no_schema_field
# statement ok
# drop table s10

statement ok
drop table s11

statement ok
drop table s12

@@ -730,9 +706,6 @@ drop table s15
statement ok
drop table s16

statement ok
drop source s17

# statement ok
# drop source s18

6 changes: 1 addition & 5 deletions src/connector/Cargo.toml
@@ -103,7 +103,6 @@ prometheus = { version = "0.13", features = ["process"] }
prost = { workspace = true, features = ["no-recursion-limit"] }
prost-reflect = { version = "0.14", features = ["serde"] }
prost-types = "0.13"
protobuf-native = "0.2.2"
pulsar = { version = "6.3", default-features = false, features = [
"tokio-runtime",
"telemetry",
@@ -192,6 +191,7 @@ assert_matches = "1"
criterion = { workspace = true, features = ["async_tokio", "async"] }
deltalake = { workspace = true, features = ["datafusion"] }
expect-test = "1"
fs-err = "2"
paste = "1"
pretty_assertions = "1"
quote = "1"
@@ -204,10 +204,6 @@ tracing-subscriber = "0.3"
tracing-test = "0.2"
walkdir = "2"

[build-dependencies]
prost-build = "0.12"
protobuf-src = "1"

[[bench]]
name = "debezium_json_parser"
harness = false