From 8cc9a55c85d137554f464a00aca4be1137c468c7 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 13 Sep 2024 15:10:33 +0800 Subject: [PATCH] fix Signed-off-by: xxchan --- .../codec/src/decoder/protobuf/parser.rs | 12 +++-- .../codec/tests/integration_tests/protobuf.rs | 53 ++++++++++++++++++- .../codec/tests/test_data/all-types.proto | 3 ++ 3 files changed, 63 insertions(+), 5 deletions(-) diff --git a/src/connector/codec/src/decoder/protobuf/parser.rs b/src/connector/codec/src/decoder/protobuf/parser.rs index 3464797877ac..852fa9cca48d 100644 --- a/src/connector/codec/src/decoder/protobuf/parser.rs +++ b/src/connector/codec/src/decoder/protobuf/parser.rs @@ -192,10 +192,10 @@ pub fn from_protobuf_value<'a>( let DataType::Map(map_type) = type_expected else { return Err(err()); }; - let map_desc = kind.as_message().ok_or_else(err)?; - if !map_desc.is_map_entry() { + if !field_desc.is_map() { return Err(err()); } + let map_desc = kind.as_message().ok_or_else(err)?; let mut key_builder = map_type.key().create_array_builder(map.len()); let mut value_builder = map_type.value().create_array_builder(map.len()); @@ -205,11 +205,15 @@ pub fn from_protobuf_value<'a>( // in the future. for (key, value) in map.iter().sorted_by_key(|(k, _v)| *k) { key_builder.append(from_protobuf_value( - field_desc, + &map_desc.map_entry_key_field(), &key.clone().into(), map_type.key(), )?); - value_builder.append(from_protobuf_value(field_desc, value, map_type.value())?); + value_builder.append(from_protobuf_value( + &map_desc.map_entry_value_field(), + value, + map_type.value(), + )?); } let keys = key_builder.finish(); let values = value_builder.finish(); diff --git a/src/connector/codec/tests/integration_tests/protobuf.rs b/src/connector/codec/tests/integration_tests/protobuf.rs index be71ece4e9a6..9a70ef5e5c7a 100644 --- a/src/connector/codec/tests/integration_tests/protobuf.rs +++ b/src/connector/codec/tests/integration_tests/protobuf.rs @@ -537,6 +537,26 @@ fn test_all_types() -> anyhow::Result<()> { int32_value_field: Some(42), string_value_field: Some("Hello, Wrapper!".to_string()), example_oneof: Some(ExampleOneof::OneofInt32(123)), + map_struct_field: HashMap::from_iter([ + ( + "key1".to_string(), + NestedMessage { + id: 1, + name: "A".to_string(), + }, + ), + ( + "key2".to_string(), + NestedMessage { + id: 2, + name: "B".to_string(), + }, + ), + ]), + map_enum_field: HashMap::from_iter([ + (1, EnumType::Option1 as i32), + (2, EnumType::Option2 as i32), + ]), } }; let mut data_bytes = Vec::new(); @@ -583,6 +603,11 @@ fn test_all_types() -> anyhow::Result<()> { any_field(#35): Jsonb, type_name: google.protobuf.Any, field_descs: [type_url(#33): Varchar, value(#34): Bytea], int32_value_field(#37): Struct { value: Int32 }, type_name: google.protobuf.Int32Value, field_descs: [value(#36): Int32], string_value_field(#39): Struct { value: Varchar }, type_name: google.protobuf.StringValue, field_descs: [value(#38): Varchar], + map_struct_field(#44): Map(Varchar,Struct { id: Int32, name: Varchar }), type_name: all_types.AllTypes.MapStructFieldEntry, field_descs: [key(#40): Varchar, value(#43): Struct { + id: Int32, + name: Varchar, + }, type_name: all_types.AllTypes.NestedMessage, field_descs: [id(#41): Int32, name(#42): Varchar]], + map_enum_field(#47): Map(Int32,Varchar), type_name: all_types.AllTypes.MapEnumFieldEntry, field_descs: [key(#45): Int32, value(#46): Varchar], ]"#]], expect![[r#" Owned(Float64(OrderedFloat(1.2345))) @@ -641,7 +666,33 @@ fn test_all_types() -> anyhow::Result<()> { Error at column `any_field`: Fail to convert protobuf Any into jsonb: message 'my_custom_type' not found ~~~~ Owned(StructValue(Int32(42))) - Owned(StructValue(Utf8("Hello, Wrapper!")))"#]], + Owned(StructValue(Utf8("Hello, Wrapper!"))) + Owned([ + StructValue( + Utf8("key1"), + StructValue( + Int32(1), + Utf8("A"), + ), + ), + StructValue( + Utf8("key2"), + StructValue( + Int32(2), + Utf8("B"), + ), + ), + ]) + Owned([ + StructValue( + Int32(1), + Utf8("OPTION1"), + ), + StructValue( + Int32(2), + Utf8("OPTION2"), + ), + ])"#]], ); Ok(()) diff --git a/src/connector/codec/tests/test_data/all-types.proto b/src/connector/codec/tests/test_data/all-types.proto index 3d019a70167d..5070328dbf5f 100644 --- a/src/connector/codec/tests/test_data/all-types.proto +++ b/src/connector/codec/tests/test_data/all-types.proto @@ -73,4 +73,7 @@ message AllTypes { // wrapper types google.protobuf.Int32Value int32_value_field = 27; google.protobuf.StringValue string_value_field = 28; + + map map_struct_field = 29; + map map_enum_field = 30; }