diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index f0cb474eb48df..b09cc24ea59e1 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -524,9 +524,9 @@ fn protobuf_type_mapping( Ok(t) } -/// A port from the implementation of confluent's Variant Zig-zag deserialization. -/// See `ReadVariant` in -fn decode_variant_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> { +/// A port from the implementation of confluent's Varint Zig-zag deserialization. +/// See `ReadVarint` in +fn decode_varint_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> { // We expect the decoded number to be 4 bytes. let mut value = 0u32; let mut shift = 0; @@ -534,14 +534,14 @@ fn decode_variant_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> { for &byte in buffer { len += 1; - // The Variant encoding is limited to 5 bytes. + // The Varint encoding is limited to 5 bytes. if len > 5 { break; } // The byte is cast to u32 to avoid shifting overflow. let byte_ext = byte as u32; - // In Variant encoding, the lowest 7 bits are used to represent number, - // while the highest zero bit indicates the end of the number with Variant encoding. + // In Varint encoding, the lowest 7 bits are used to represent number, + // while the highest zero bit indicates the end of the number with Varint encoding. value |= (byte_ext & 0x7F) << shift; if byte_ext & 0x80 == 0 { return Ok((((value >> 1) as i32) ^ -((value & 1) as i32), len)); @@ -567,9 +567,9 @@ pub(crate) fn resolve_pb_header(payload: &[u8]) -> ConnectorResult<&[u8]> { match remained.first() { Some(0) => Ok(&remained[1..]), Some(_) => { - let (index_len, mut offset) = decode_variant_zigzag(remained)?; + let (index_len, mut offset) = decode_varint_zigzag(remained)?; for _ in 0..index_len { - offset += decode_variant_zigzag(&remained[offset..])?.1; + offset += decode_varint_zigzag(&remained[offset..])?.1; } Ok(&remained[offset..]) } @@ -1142,46 +1142,52 @@ mod test { } #[test] - fn test_decode_variant_zigzag() { + fn test_decode_varint_zigzag() { // 1. Positive number let buffer = vec![0x02]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, 1); assert_eq!(len, 1); // 2. Negative number let buffer = vec![0x01]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, -1); assert_eq!(len, 1); // 3. Larger positive number let buffer = vec![0x9E, 0x03]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, 207); assert_eq!(len, 2); // 4. Larger negative number let buffer = vec![0xBF, 0x07]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, -480); assert_eq!(len, 2); // 5. Maximum positive number let buffer = vec![0xFE, 0xFF, 0xFF, 0xFF, 0x0F]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, i32::MAX); assert_eq!(len, 5); // 6. Maximum negative number let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x0F]; - let (value, len) = decode_variant_zigzag(&buffer).unwrap(); + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); assert_eq!(value, i32::MIN); assert_eq!(len, 5); - // 7. Invalid input (more than 5 bytes) + // 7. More than 32 bits + let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x7F]; + let (value, len) = decode_varint_zigzag(&buffer).unwrap(); + assert_eq!(value, i32::MIN); + assert_eq!(len, 5); + + // 8. Invalid input (more than 5 bytes) let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]; - let result = decode_variant_zigzag(&buffer); + let result = decode_varint_zigzag(&buffer); assert!(result.is_err()); } }