diff --git a/src/common/src/types/datetime.rs b/src/common/src/types/datetime.rs index bc18858d678ec..af6b54b057c82 100644 --- a/src/common/src/types/datetime.rs +++ b/src/common/src/types/datetime.rs @@ -536,6 +536,12 @@ impl Timestamp { self.0.timestamp_nanos_opt().unwrap() } + pub fn with_millis(timestamp_millis: i64) -> Result { + let secs = timestamp_millis.div_euclid(1_000); + let nsecs = timestamp_millis.rem_euclid(1_000) * 1_000_000; + Self::with_secs_nsecs(secs, nsecs as u32) + } + pub fn with_micros(timestamp_micros: i64) -> Result { let secs = timestamp_micros.div_euclid(1_000_000); let nsecs = timestamp_micros.rem_euclid(1_000_000) * 1000; diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index 7cc9cf14c1f84..a6c5c6fbef5d1 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -103,6 +103,8 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result { DataType::Decimal } Schema::Date => DataType::Date, + Schema::LocalTimestampMillis => DataType::Timestamp, + Schema::LocalTimestampMicros => DataType::Timestamp, Schema::TimestampMillis => DataType::Timestamptz, Schema::TimestampMicros => DataType::Timestamptz, Schema::Duration => DataType::Interval, diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs index 02cdcb4de4ca5..af5658331270d 100644 --- a/src/connector/src/parser/unified/avro.rs +++ b/src/connector/src/parser/unified/avro.rs @@ -23,10 +23,11 @@ use chrono::Datelike; use itertools::Itertools; use num_bigint::{BigInt, Sign}; use risingwave_common::array::{ListValue, StructValue}; -use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz}; use risingwave_common::error::Result as RwResult; use risingwave_common::log::LogSuppresser; -use risingwave_common::types::{DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time}; +use risingwave_common::types::{ + DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, +}; use risingwave_common::util::iter_util::ZipEqFast; use super::{Access, AccessError, AccessResult}; @@ -181,19 +182,27 @@ impl<'a> AvroParseOptions<'a> { } (Some(DataType::Varchar) | None, Value::String(s)) => s.clone().into_boxed_str().into(), // ---- Timestamp ----- - (Some(DataType::Timestamp) | None, Value::TimestampMillis(ms)) => { - i64_to_timestamp(*ms).map_err(|_| create_error())?.into() + (Some(DataType::Timestamp) | None, Value::LocalTimestampMillis(ms)) => { + Timestamp::with_millis(*ms) + .map_err(|_| create_error())? + .into() } - (Some(DataType::Timestamp) | None, Value::TimestampMicros(us)) => { - i64_to_timestamp(*us).map_err(|_| create_error())?.into() + (Some(DataType::Timestamp) | None, Value::LocalTimestampMicros(us)) => { + Timestamp::with_micros(*us) + .map_err(|_| create_error())? + .into() } // ---- TimestampTz ----- - (Some(DataType::Timestamptz), Value::TimestampMillis(ms)) => { - i64_to_timestamptz(*ms).map_err(|_| create_error())?.into() + (Some(DataType::Timestamptz) | None, Value::TimestampMillis(ms)) => { + Timestamptz::from_millis(*ms) + .ok_or(AccessError::Other(anyhow!( + "timestamptz with milliseconds {ms} * 1000 is out of range", + )))? + .into() } - (Some(DataType::Timestamptz), Value::TimestampMicros(us)) => { - i64_to_timestamptz(*us).map_err(|_| create_error())?.into() + (Some(DataType::Timestamptz) | None, Value::TimestampMicros(us)) => { + Timestamptz::from_micros(*us).into() } // ---- Interval ----- @@ -424,7 +433,7 @@ pub(crate) fn unix_epoch_days() -> i32 { mod tests { use apache_avro::Decimal as AvroDecimal; use risingwave_common::error::{ErrorCode, RwError}; - use risingwave_common::types::{Decimal, Timestamp}; + use risingwave_common::types::{Decimal, Timestamptz}; use super::*; @@ -486,24 +495,24 @@ mod tests { } #[test] - fn test_avro_timestamp_micros() { - let v1 = Value::TimestampMicros(1620000000000); - let v2 = Value::TimestampMillis(1620000000); + fn test_avro_timestamptz_micros() { + let v1 = Value::TimestampMicros(1620000000000000); + let v2 = Value::TimestampMillis(1620000000000); let value_schema1 = Schema::TimestampMicros; let value_schema2 = Schema::TimestampMillis; - let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamp).unwrap(); - let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamp).unwrap(); + let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamptz).unwrap(); + let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamptz).unwrap(); assert_eq!( datum1, - Some(ScalarImpl::Timestamp(Timestamp::new( - "2021-05-03T00:00:00".parse().unwrap() - ))) + Some(ScalarImpl::Timestamptz( + Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap() + )) ); assert_eq!( datum2, - Some(ScalarImpl::Timestamp(Timestamp::new( - "2021-05-03T00:00:00".parse().unwrap() - ))) + Some(ScalarImpl::Timestamptz( + Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap() + )) ); }