Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve error of building key encoder #18563

Merged
merged 4 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ profile:
# - use: kafka
# persist-data: true

# To enable Confluent schema registry, uncomment the following line
# - use: schema-registry

default-v6:
steps:
- use: meta-node
Expand Down
22 changes: 11 additions & 11 deletions src/connector/src/sink/encoder/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ mod tests {
}
]
}"#,
"encode q error: avro name ref unsupported yet",
"encode 'q' error: avro name ref unsupported yet",
);

test_err(
Expand All @@ -663,7 +663,7 @@ mod tests {
i64::MAX,
))),
r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
"encode error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
"encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
);

let avro_schema = AvroSchema::parse_str(
Expand Down Expand Up @@ -738,7 +738,7 @@ mod tests {
};
assert_eq!(
err.to_string(),
"Encode error: encode req error: field not present but required"
"Encode error: encode 'req' error: field not present but required"
);

let schema = Schema::new(vec![
Expand All @@ -751,7 +751,7 @@ mod tests {
};
assert_eq!(
err.to_string(),
"Encode error: encode extra error: field not in avro"
"Encode error: encode 'extra' error: field not in avro"
);

let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
Expand All @@ -761,14 +761,14 @@ mod tests {
};
assert_eq!(
err.to_string(),
r#"Encode error: encode error: expect avro record but got ["null","long"]"#
r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
);

test_err(
&DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
(),
r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
"encode f0 error: cannot encode boolean column as \"int\" field",
"encode 'f0' error: cannot encode boolean column as \"int\" field",
);
}

Expand All @@ -790,7 +790,7 @@ mod tests {
&DataType::List(DataType::Int32.into()),
Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
avro_schema,
"encode error: found null but required",
"encode '' error: found null but required",
);

test_ok(
Expand Down Expand Up @@ -829,7 +829,7 @@ mod tests {
&DataType::List(DataType::Boolean.into()),
(),
r#"{"type": "array", "items": "int"}"#,
"encode error: cannot encode boolean column as \"int\" field",
"encode '' error: cannot encode boolean column as \"int\" field",
);
}

Expand Down Expand Up @@ -863,14 +863,14 @@ mod tests {
t,
datum.to_datum_ref(),
both,
r#"encode error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
);

test_err(
t,
datum.to_datum_ref(),
empty,
"encode error: cannot encode timestamp with time zone column as [] field",
"encode '' error: cannot encode timestamp with time zone column as [] field",
);

test_ok(
Expand All @@ -879,7 +879,7 @@ mod tests {
one,
Value::Union(0, Value::TimestampMillis(1).into()),
);
test_err(t, None, one, "encode error: found null but required");
test_err(t, None, one, "encode '' error: found null but required");

test_ok(
t,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl std::fmt::Display for FieldEncodeError {

write!(
f,
"encode {} error: {}",
"encode '{}' error: {}",
self.rev_path.iter().rev().join("."),
self.message
)
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/sink/encoder/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode repeated_int_field error: cannot encode integer[] column as int32 field"
"encode 'repeated_int_field' error: cannot encode integer[] column as int32 field"
);

let schema = Schema::new(vec![Field::with_name(
Expand All @@ -554,7 +554,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode repeated_int_field error: array containing null not allowed as repeated field"
"encode 'repeated_int_field' error: array containing null not allowed as repeated field"
);
}

Expand All @@ -573,7 +573,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode not_exists error: field not in proto"
"encode 'not_exists' error: field not in proto"
);

let err = validate_fields(
Expand All @@ -583,7 +583,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode map_field error: field not in proto"
"encode 'map_field' error: field not in proto"
);
}
}
10 changes: 7 additions & 3 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use risingwave_common::array::StreamChunk;

use crate::sink::{Result, SinkError};
Expand Down Expand Up @@ -279,8 +279,12 @@ impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<

impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
async fn build(b: FormatterParams<'_>) -> Result<Self> {
let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)).await?;
let val_encoder = VE::build(b.builder, None).await?;
let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices))
.await
.with_context(|| "Failed to build key encoder")?;
let val_encoder = VE::build(b.builder, None)
.await
.with_context(|| "Failed to build value encoder")?;
Ok(UpsertFormatter::new(key_encoder, val_encoder))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ pub enum SinkError {
#[backtrace]
anyhow::Error,
),
#[error("Internal error: {0}")]
#[error(transparent)]
Internal(
#[from]
#[backtrace]
Expand Down
Loading