Skip to content

Commit

Permalink
test more than 32bits & rename to varint
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 committed Mar 5, 2024
1 parent 564b465 commit 68c74e6
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,24 +524,24 @@ fn protobuf_type_mapping(
Ok(t)
}

/// A port from the implementation of confluent's Variant Zig-zag deserialization.
/// See `ReadVariant` in <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java>
fn decode_variant_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> {
/// A port from the implementation of confluent's Varint Zig-zag deserialization.
/// See `ReadVarint` in <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java>
fn decode_varint_zigzag(buffer: &[u8]) -> ConnectorResult<(i32, usize)> {
// We expect the decoded number to be 4 bytes.
let mut value = 0u32;
let mut shift = 0;
let mut len = 0usize;

for &byte in buffer {
len += 1;
// The Variant encoding is limited to 5 bytes.
// The Varint encoding is limited to 5 bytes.
if len > 5 {
break;
}
// The byte is cast to u32 to avoid shifting overflow.
let byte_ext = byte as u32;
// In Variant encoding, the lowest 7 bits are used to represent number,
// while the highest zero bit indicates the end of the number with Variant encoding.
// In Varint encoding, the lowest 7 bits are used to represent number,
// while the highest zero bit indicates the end of the number with Varint encoding.
value |= (byte_ext & 0x7F) << shift;
if byte_ext & 0x80 == 0 {
return Ok((((value >> 1) as i32) ^ -((value & 1) as i32), len));
Expand All @@ -567,9 +567,9 @@ pub(crate) fn resolve_pb_header(payload: &[u8]) -> ConnectorResult<&[u8]> {
match remained.first() {
Some(0) => Ok(&remained[1..]),
Some(_) => {
let (index_len, mut offset) = decode_variant_zigzag(remained)?;
let (index_len, mut offset) = decode_varint_zigzag(remained)?;
for _ in 0..index_len {
offset += decode_variant_zigzag(&remained[offset..])?.1;
offset += decode_varint_zigzag(&remained[offset..])?.1;
}
Ok(&remained[offset..])
}
Expand Down Expand Up @@ -1142,46 +1142,52 @@ mod test {
}

#[test]
fn test_decode_variant_zigzag() {
fn test_decode_varint_zigzag() {
// 1. Positive number
let buffer = vec![0x02];
let (value, len) = decode_variant_zigzag(&buffer).unwrap();
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, 1);
assert_eq!(len, 1);

// 2. Negative number
let buffer = vec![0x01];
let (value, len) = decode_variant_zigzag(&buffer).unwrap();
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, -1);
assert_eq!(len, 1);

// 3. Larger positive number
let buffer = vec![0x9E, 0x03];
let (value, len) = decode_variant_zigzag(&buffer).unwrap();
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, 207);
assert_eq!(len, 2);

// 4. Larger negative number
let buffer = vec![0xBF, 0x07];
let (value, len) = decode_variant_zigzag(&buffer).unwrap();
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, -480);
assert_eq!(len, 2);

// 5. Maximum positive number
let buffer = vec![0xFE, 0xFF, 0xFF, 0xFF, 0x0F];
let (value, len) = decode_variant_zigzag(&buffer).unwrap();
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, i32::MAX);
assert_eq!(len, 5);

// 6. Maximum negative number
let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x0F];
let (value, len) = decode_variant_zigzag(&buffer).unwrap();
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, i32::MIN);
assert_eq!(len, 5);

// 7. Invalid input (more than 5 bytes)
// 7. More than 32 bits
let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0x7F];
let (value, len) = decode_varint_zigzag(&buffer).unwrap();
assert_eq!(value, i32::MIN);
assert_eq!(len, 5);

// 8. Invalid input (more than 5 bytes)
let buffer = vec![0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF];
let result = decode_variant_zigzag(&buffer);
let result = decode_varint_zigzag(&buffer);
assert!(result.is_err());
}
}

0 comments on commit 68c74e6

Please sign in to comment.