Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error decoding bytes type with logicalType decimal - Avro #60

Open
arhantpanda opened this issue Nov 1, 2019 · 1 comment
Open

Error decoding bytes type with logicalType decimal - Avro #60

arhantpanda opened this issue Nov 1, 2019 · 1 comment

Comments

@arhantpanda
Copy link

I have the following Avro schema for a field that is a decimal logicalType.

{
  "name": "amount",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 10,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2",
        "connect.decimal.precision": "10"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}

The value from processor is a dict with string and bytes values
The field value in bytes: b'\x00\x97\xf4'

I am unable to get the decimal value to proceed further. I've tried the following which did not work.

  1. data.decode() & data.decode('latin-1')
Fails with error
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x97 in position 1: invalid start byte
  1. BinaryDecoder
decoder = BinaryDecoder(io.BytesIO(data))
decoder.read_double()

Fails with error
  File "/IdeaProjects/virtualenv/event-venv/lib/python3.6/site-packages/avro/io.py", line 240, in read_double
    ((ord(self.read(1)) & 0xff) << 48) |
  File "/IdeaProjects/virtualenv/event-venv/lib/python3.6/site-packages/avro/io.py", line 180, in read
    assert (len(input_bytes) == n), input_bytes
AssertionError: b''

How can I extract data from byte fields in Avro?

@ryananguiano
Copy link

ryananguiano commented Jun 14, 2021

You can use this to decode the byte string into decimal.

def decode_decimal(value: bytes, num_places: int) -> Decimal:
    value_size = len(value)
    for fmt in ('>b', '>h', '>l', '>q'):
        fmt_size = struct.calcsize(fmt)
        if fmt_size >= value_size:
            padding = b'\x00' * (fmt_size - value_size)
            int_value = struct.unpack(fmt, padding + value)[0]
            scale = Decimal('1') / (10 ** num_places)
            return Decimal(int_value) * scale
    raise ValueError('Could not unpack value')

Ex:

>>> decode_decimal(b'\x00\x97\xf4', 2)
Decimal('389.00')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants