From 564b465c71e389c06e27b733d32415c734a33f11 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 5 Mar 2024 13:46:54 +0800 Subject: [PATCH 1/2] fix pb header --- src/connector/src/parser/protobuf/parser.rs | 84 ++++++++++++++++++- .../src/schema/schema_registry/util.rs | 2 + 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 4248fa2b7470..f0cb474eb48d 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -36,7 +36,7 @@ use crate::parser::unified::{ use crate::parser::util::bytes_from_url; use crate::parser::{AccessBuilder, EncodingProperties}; use crate::schema::schema_registry::{ - extract_schema_id, get_subject_by_strategy, handle_sr_list, Client, + extract_schema_id, get_subject_by_strategy, handle_sr_list, Client, WireFormatError, }; #[derive(Debug)] @@ -524,6 +524,35 @@ fn protobuf_type_mapping( Ok(t) } +/// A port from the implementation of confluent's Variant Zig-zag deserialization. +/// See `ReadVariant` in +fn decode_variant_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> { + // We expect the decoded number to be 4 bytes. + let mut value = 0u32; + let mut shift = 0; + let mut len = 0usize; + + for &byte in buffer { + len += 1; + // The Variant encoding is limited to 5 bytes. + if len > 5 { + break; + } + // The byte is cast to u32 to avoid shifting overflow. + let byte_ext = byte as u32; + // In Variant encoding, the lowest 7 bits are used to represent number, + // while the highest zero bit indicates the end of the number with Variant encoding. + value |= (byte_ext & 0x7F) << shift; + if byte_ext & 0x80 == 0 { + return Ok((((value >> 1) as i32) ^ -((value & 1) as i32), len)); + } + + shift += 7; + } + + Err(WireFormatError::ParseMessageIndexes.into()) +} + /// Reference: /// Wire format for Confluent pb header is: /// | 0 | 1-4 | 5-x | x+1-end @@ -531,14 +560,19 @@ fn protobuf_type_mapping( pub(crate) fn resolve_pb_header(payload: &[u8]) -> ConnectorResult<&[u8]> { // there's a message index array at the front of payload // if it is the first message in proto def, the array is just and `0` - // TODO: support parsing more complex index array let (_, remained) = extract_schema_id(payload)?; // The message indexes are encoded as int using variable-length zig-zag encoding, // prefixed by the length of the array. // Note that if the first byte is 0, it is equivalent to (1, 0) as an optimization. match remained.first() { Some(0) => Ok(&remained[1..]), - Some(i) => Ok(&remained[(*i as usize)..]), + Some(_) => { + let (index_len, mut offset) = decode_variant_zigzag(remained)?; + for _ in 0..index_len { + offset += decode_variant_zigzag(&remained[offset..])?.1; + } + Ok(&remained[offset..]) + } None => bail!("The proto payload is empty"), } } @@ -1106,4 +1140,48 @@ mod test { Ok(()) } + + #[test] + fn test_decode_variant_zigzag() { + // 1. Positive number + let buffer = vec![0x02]; + let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + assert_eq!(value, 1); + assert_eq!(len, 1); + + // 2. Negative number + let buffer = vec![0x01]; + let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + assert_eq!(value, -1); + assert_eq!(len, 1); + + // 3. Larger positive number + let buffer = vec![0x9E, 0x03]; + let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + assert_eq!(value, 207); + assert_eq!(len, 2); + + // 4. Larger negative number + let buffer = vec![0xBF, 0x07]; + let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + assert_eq!(value, -480); + assert_eq!(len, 2); + + // 5. Maximum positive number + let buffer = vec![0xFE, 0xFF, 0xFF, 0xFF, 0x0F]; + let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + assert_eq!(value, i32::MAX); + assert_eq!(len, 5); + + // 6. Maximum negative number + let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x0F]; + let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + assert_eq!(value, i32::MIN); + assert_eq!(len, 5); + + // 7. Invalid input (more than 5 bytes) + let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]; + let result = decode_variant_zigzag(&buffer); + assert!(result.is_err()); + } } diff --git a/src/connector/src/schema/schema_registry/util.rs b/src/connector/src/schema/schema_registry/util.rs index 407534b1a567..0d43f33baa31 100644 --- a/src/connector/src/schema/schema_registry/util.rs +++ b/src/connector/src/schema/schema_registry/util.rs @@ -49,6 +49,8 @@ pub enum WireFormatError { NoMagic, #[error("fail to read 4-byte schema ID")] NoSchemaId, + #[error("failed to parse message indexes")] + ParseMessageIndexes, } /// extract the magic number and `schema_id` at the front of payload From 68c74e6e89733da6a6d33d99428d6176419387e9 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 5 Mar 2024 19:20:37 +0800 Subject: [PATCH 2/2] test more than 32bits & rename to varint --- src/connector/src/parser/protobuf/parser.rs | 40 ++++++++++++--------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index f0cb474eb48d..b09cc24ea59e 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -524,9 +524,9 @@ fn protobuf_type_mapping( Ok(t) } -/// A port from the implementation of confluent's Variant Zig-zag deserialization. -/// See `ReadVariant` in -fn decode_variant_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> { +/// A port from the implementation of confluent's Varint Zig-zag deserialization. +/// See `ReadVarint` in +fn decode_varint_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> { // We expect the decoded number to be 4 bytes. let mut value = 0u32; let mut shift = 0; @@ -534,14 +534,14 @@ fn decode_variant_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> { for &byte in buffer { len += 1; - // The Variant encoding is limited to 5 bytes. + // The Varint encoding is limited to 5 bytes. if len > 5 { break; } // The byte is cast to u32 to avoid shifting overflow. let byte_ext = byte as u32; - // In Variant encoding, the lowest 7 bits are used to represent number, - // while the highest zero bit indicates the end of the number with Variant encoding. + // In Varint encoding, the lowest 7 bits are used to represent number, + // while the highest zero bit indicates the end of the number with Varint encoding. value |= (byte_ext & 0x7F) << shift; if byte_ext & 0x80 == 0 { return Ok((((value >> 1) as i32) ^ -((value & 1) as i32), len)); @@ -567,9 +567,9 @@ pub(crate) fn resolve_pb_header(payload: &[u8]) -> ConnectorResult<&[u8]> { match remained.first() { Some(0) => Ok(&remained[1..]), Some(_) => { - let (index_len, mut offset) = decode_variant_zigzag(remained)?; + let (index_len, mut offset) = decode_varint_zigzag(remained)?; for _ in 0..index_len { - offset += decode_variant_zigzag(&remained[offset..])?.1; + offset += decode_varint_zigzag(&remained[offset..])?.1; } Ok(&remained[offset..]) } @@ -1142,46 +1142,52 @@ mod test { } #[test] - fn test_decode_variant_zigzag() { + fn test_decode_varint_zigzag() { // 1. Positive number let buffer = vec![0x02]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, 1); assert_eq!(len, 1); // 2. Negative number let buffer = vec![0x01]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, -1); assert_eq!(len, 1); // 3. Larger positive number let buffer = vec![0x9E, 0x03]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, 207); assert_eq!(len, 2); // 4. Larger negative number let buffer = vec![0xBF, 0x07]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, -480); assert_eq!(len, 2); // 5. Maximum positive number let buffer = vec![0xFE, 0xFF, 0xFF, 0xFF, 0x0F]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, i32::MAX); assert_eq!(len, 5); // 6. Maximum negative number let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x0F]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, i32::MIN); assert_eq!(len, 5); - // 7. Invalid input (more than 5 bytes) + // 7. More than 32 bits + let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x7F]; + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); + assert_eq!(value, i32::MIN); + assert_eq!(len, 5); + + // 8. Invalid input (more than 5 bytes) let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]; - let result = decode_variant_zigzag(&buffer); + let result = decode_varint_zigzag(&buffer); assert!(result.is_err()); } }