Skip to content

Commit

Permalink
fix: avro timestamp_mills and timestamp_micros parse to timestamptz (r…
Browse files Browse the repository at this point in the history
…isingwavelabs#8730)

Signed-off-by: tabVersion <tabvision@bupt.icu>
  • Loading branch information
tabVersion committed Mar 23, 2023
1 parent 24ecd4d commit aeddef3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 52 deletions.
2 changes: 1 addition & 1 deletion e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ select id, first_name, last_name, email from s8;
query IITFFBTT
select id, sequence_id, name, score, avg_score, is_lasted, entrance_date, birthday, passed from s9;
----
32 64 str_value 32 64 t 1970-01-01 1970-01-01 00:00:00 1 mon 1 day 00:00:01
32 64 str_value 32 64 t 1970-01-01 1970-01-01 00:00:00+00:00 1 mon 1 day 00:00:01

query ITITT
select id, code, timestamp, xfas, contacts, sex from s10;
Expand Down
28 changes: 7 additions & 21 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,7 @@ mod test {
use risingwave_common::catalog::ColumnId;
use risingwave_common::error;
use risingwave_common::row::Row;
use risingwave_common::types::{
DataType, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, ScalarImpl,
};
use risingwave_common::types::{DataType, IntervalUnit, NaiveDateWrapper, ScalarImpl};
use url::Url;

use super::{
Expand Down Expand Up @@ -423,24 +421,12 @@ mod test {
assert_eq!(row[i], date);
}
Value::TimestampMillis(millis) => {
let datetime = Some(ScalarImpl::NaiveDateTime(
NaiveDateTimeWrapper::with_secs_nsecs(
millis / 1000,
(millis % 1000) as u32 * 1_000_000,
)
.unwrap(),
));
assert_eq!(row[i], datetime);
let millis = Some(ScalarImpl::Int64(millis * 1000));
assert_eq!(row[i], millis);
}
Value::TimestampMicros(micros) => {
let datetime = Some(ScalarImpl::NaiveDateTime(
NaiveDateTimeWrapper::with_secs_nsecs(
micros / 1_000_000,
(micros % 1_000_000) as u32 * 1_000,
)
.unwrap(),
));
assert_eq!(row[i], datetime);
let micros = Some(ScalarImpl::Int64(micros));
assert_eq!(row[i], micros);
}
Value::Duration(duration) => {
let months = u32::from(duration.months()) as i32;
Expand All @@ -467,8 +453,8 @@ mod test {
SourceColumnDesc::simple("avg_score", DataType::Float64, ColumnId::from(4)),
SourceColumnDesc::simple("is_lasted", DataType::Boolean, ColumnId::from(5)),
SourceColumnDesc::simple("entrance_date", DataType::Date, ColumnId::from(6)),
SourceColumnDesc::simple("birthday", DataType::Timestamp, ColumnId::from(7)),
SourceColumnDesc::simple("anniversary", DataType::Timestamp, ColumnId::from(8)),
SourceColumnDesc::simple("birthday", DataType::Timestamptz, ColumnId::from(7)),
SourceColumnDesc::simple("anniversary", DataType::Timestamptz, ColumnId::from(8)),
SourceColumnDesc::simple("passed", DataType::Interval, ColumnId::from(9)),
]
}
Expand Down
52 changes: 22 additions & 30 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{
DataType, Datum, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, OrderedF32, OrderedF64,
ScalarImpl,
DataType, Datum, IntervalUnit, NaiveDateWrapper, OrderedF32, OrderedF64, ScalarImpl,
};
use risingwave_pb::plan_common::ColumnDesc;

Expand Down Expand Up @@ -74,8 +73,8 @@ fn avro_type_mapping(schema: &Schema) -> Result<DataType> {
Schema::Double => DataType::Float64,
Schema::Decimal { .. } => DataType::Decimal,
Schema::Date => DataType::Date,
Schema::TimestampMillis => DataType::Timestamp,
Schema::TimestampMicros => DataType::Timestamp,
Schema::TimestampMillis => DataType::Timestamptz,
Schema::TimestampMicros => DataType::Timestamptz,
Schema::Duration => DataType::Interval,
Schema::Enum { .. } => DataType::Varchar,
Schema::Record { fields, .. } => {
Expand Down Expand Up @@ -281,32 +280,13 @@ pub(crate) fn from_avro_value(value: Value, value_schema: &Schema) -> Result<Dat
RwError::from(InternalError(err_msg))
})?,
),
Value::TimestampMillis(millis) => ScalarImpl::NaiveDateTime(
NaiveDateTimeWrapper::with_secs_nsecs(
millis / 1_000,
(millis % 1_000) as u32 * 1_000_000,
)
.map_err(|e| {
let err_msg = format!(
"avro parse error.wrong timestamp millis value {}, err {:?}",
millis, e
);
RwError::from(InternalError(err_msg))
})?,
),
Value::TimestampMicros(micros) => ScalarImpl::NaiveDateTime(
NaiveDateTimeWrapper::with_secs_nsecs(
micros / 1_000_000,
(micros % 1_000_000) as u32 * 1_000,
)
.map_err(|e| {
let err_msg = format!(
"avro parse error.wrong timestamp micros value {}, err {:?}",
micros, e
);
RwError::from(InternalError(err_msg))
})?,
),
Value::TimestampMicros(us) => ScalarImpl::Int64(us),
Value::TimestampMillis(ms) => ScalarImpl::Int64(ms.checked_mul(1000).ok_or_else(|| {
RwError::from(InternalError(format!(
"avro parse millis overflow, value: {}",
ms
)))
})?),
Value::Duration(duration) => {
let months = u32::from(duration.months()) as i32;
let days = u32::from(duration.days()) as i32;
Expand Down Expand Up @@ -364,4 +344,16 @@ mod tests {
let rust_decimal = avro_decimal_to_rust_decimal(avro_decimal, 28, 1).unwrap();
assert_eq!(rust_decimal, rust_decimal::Decimal::from_f32(28.1).unwrap());
}

#[test]
fn test_avro_timestamp_micros() {
let v1 = Value::TimestampMicros(1620000000000);
let v2 = Value::TimestampMillis(1620000000);
let value_schema1 = Schema::TimestampMicros;
let value_schema2 = Schema::TimestampMillis;
let datum1 = from_avro_value(v1, &value_schema1).unwrap();
let datum2 = from_avro_value(v2, &value_schema2).unwrap();
assert_eq!(datum1, Some(ScalarImpl::Int64(1620000000000)));
assert_eq!(datum2, Some(ScalarImpl::Int64(1620000000000)));
}
}

0 comments on commit aeddef3

Please sign in to comment.