Skip to content

Commit

Permalink
Switch Buffers to memoryviews
Browse files Browse the repository at this point in the history
When this was written in the code, Python's Buffer Protocol support was
inconsistent across Python versions (specifically on Python 2.7). Since
Python 2.7 reached EOL and it was dropped from Numcodecs, the Python
Buffer Protocol support has become more consistent.

At this stage the `memoryview` object, which Cython also supports, does
all the same things that `Buffer` would do for us. Plus it is builtin to
the Python standard library. It behaves similarly in a lot of ways.

Given this, switch the code over to `memoryview`s internally and drop
`Buffer`.
  • Loading branch information
jakirkham committed Nov 22, 2024
1 parent 86f5ce4 commit 2a942ad
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 363 deletions.
253 changes: 129 additions & 124 deletions numcodecs/blosc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import multiprocessing
import os


from cpython.buffer cimport PyBUF_ANY_CONTIGUOUS, PyBUF_WRITEABLE
from cpython.buffer cimport PyBuffer_IsContiguous
from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AS_STRING
from cpython.memoryview cimport PyMemoryView_GET_BUFFER


from .compat_ext cimport Buffer
from .compat_ext import Buffer
from .compat import ensure_contiguous_ndarray
from .abc import Codec

Expand Down Expand Up @@ -146,34 +145,36 @@ def cbuffer_sizes(source):
"""
cdef:
Buffer buffer
memoryview source_mv
const Py_buffer* source_pb
size_t nbytes, cbytes, blocksize

# obtain buffer
buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b'A'):
raise BufferError("`source` must contain contiguous memory")

# determine buffer size
blosc_cbuffer_sizes(buffer.ptr, &nbytes, &cbytes, &blocksize)

# release buffers
buffer.release()
blosc_cbuffer_sizes(source_pb.buf, &nbytes, &cbytes, &blocksize)

return nbytes, cbytes, blocksize


def cbuffer_complib(source):
"""Return the name of the compression library used to compress `source`."""
cdef:
Buffer buffer
memoryview source_mv
const Py_buffer* source_pb

# obtain buffer
buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b'A'):
raise BufferError("`source` must contain contiguous memory")

# determine buffer size
complib = blosc_cbuffer_complib(buffer.ptr)

# release buffers
buffer.release()
complib = blosc_cbuffer_complib(source_pb.buf)

complib = complib.decode('ascii')

Expand All @@ -193,18 +194,19 @@ def cbuffer_metainfo(source):
"""
cdef:
Buffer buffer
memoryview source_mv
const Py_buffer* source_pb
size_t typesize
int flags

# obtain buffer
buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b'A'):
raise BufferError("`source` must contain contiguous memory")

# determine buffer size
blosc_cbuffer_metainfo(buffer.ptr, &typesize, &flags)

# release buffers
buffer.release()
blosc_cbuffer_metainfo(source_pb.buf, &typesize, &flags)

# decompose flags
if flags & BLOSC_DOSHUFFLE:
Expand Down Expand Up @@ -252,23 +254,29 @@ def compress(source, char* cname, int clevel, int shuffle=SHUFFLE,
"""

cdef:
char *source_ptr
char *dest_ptr
Buffer source_buffer
memoryview source_mv
const Py_buffer* source_pb
const char* source_ptr
size_t nbytes, itemsize
int cbytes
bytes dest
char* dest_ptr

# check valid cname early
cname_str = cname.decode('ascii')
if cname_str not in list_compressors():
err_bad_cname(cname_str)

# setup source buffer
source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
source_ptr = source_buffer.ptr
nbytes = source_buffer.nbytes
itemsize = source_buffer.itemsize
# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b'A'):
raise BufferError("`source` must contain contiguous memory")

# extract metadata
source_ptr = <const char*>source_pb.buf
nbytes = source_pb.len
itemsize = source_pb.itemsize

# determine shuffle
if shuffle == AUTOSHUFFLE:
Expand All @@ -280,46 +288,40 @@ def compress(source, char* cname, int clevel, int shuffle=SHUFFLE,
raise ValueError('invalid shuffle argument; expected -1, 0, 1 or 2, found %r' %
shuffle)

try:

# setup destination
dest = PyBytes_FromStringAndSize(NULL, nbytes + BLOSC_MAX_OVERHEAD)
dest_ptr = PyBytes_AS_STRING(dest)

# perform compression
if _get_use_threads():
# allow blosc to use threads internally
# setup destination
dest = PyBytes_FromStringAndSize(NULL, nbytes + BLOSC_MAX_OVERHEAD)
dest_ptr = PyBytes_AS_STRING(dest)

# N.B., we are using blosc's global context, and so we need to use a lock
# to ensure no-one else can modify the global context while we're setting it
# up and using it.
with get_mutex():
# perform compression
if _get_use_threads():
# allow blosc to use threads internally

# set compressor
compressor_set = blosc_set_compressor(cname)
if compressor_set < 0:
# shouldn't happen if we checked against list of compressors
# already, but just in case
err_bad_cname(cname_str)
# N.B., we are using blosc's global context, and so we need to use a lock
# to ensure no-one else can modify the global context while we're setting it
# up and using it.
with get_mutex():

# set blocksize
blosc_set_blocksize(blocksize)
# set compressor
compressor_set = blosc_set_compressor(cname)
if compressor_set < 0:
# shouldn't happen if we checked against list of compressors
# already, but just in case
err_bad_cname(cname_str)

# perform compression
with nogil:
cbytes = blosc_compress(clevel, shuffle, itemsize, nbytes, source_ptr,
dest_ptr, nbytes + BLOSC_MAX_OVERHEAD)
# set blocksize
blosc_set_blocksize(blocksize)

else:
# perform compression
with nogil:
cbytes = blosc_compress_ctx(clevel, shuffle, itemsize, nbytes, source_ptr,
dest_ptr, nbytes + BLOSC_MAX_OVERHEAD,
cname, blocksize, 1)
cbytes = blosc_compress(clevel, shuffle, itemsize, nbytes, source_ptr,
dest_ptr, nbytes + BLOSC_MAX_OVERHEAD)

finally:
else:
with nogil:
cbytes = blosc_compress_ctx(clevel, shuffle, itemsize, nbytes, source_ptr,
dest_ptr, nbytes + BLOSC_MAX_OVERHEAD,
cname, blocksize, 1)

# release buffers
source_buffer.release()

# check compression was successful
if cbytes <= 0:
Expand Down Expand Up @@ -350,15 +352,22 @@ def decompress(source, dest=None):
"""
cdef:
int ret
char *source_ptr
char *dest_ptr
Buffer source_buffer
Buffer dest_buffer = None
memoryview source_mv
const Py_buffer* source_pb
const char* source_ptr
memoryview dest_mv
Py_buffer* dest_pb
char* dest_ptr
size_t nbytes, cbytes, blocksize

# setup source buffer
source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
source_ptr = source_buffer.ptr
# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b'A'):
raise BufferError("`source` must contain contiguous memory")

# get source pointer
source_ptr = <const char*>source_pb.buf

# determine buffer size
blosc_cbuffer_sizes(source_ptr, &nbytes, &cbytes, &blocksize)
Expand All @@ -367,36 +376,28 @@ def decompress(source, dest=None):
if dest is None:
# allocate memory
dest = PyBytes_FromStringAndSize(NULL, nbytes)
dest_ptr = PyBytes_AS_STRING(dest)
dest_nbytes = nbytes
else:
arr = ensure_contiguous_ndarray(dest)
dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE)
dest_ptr = dest_buffer.ptr
dest_nbytes = dest_buffer.nbytes

try:

# guard condition
if dest_nbytes < nbytes:
raise ValueError('destination buffer too small; expected at least %s, '
'got %s' % (nbytes, dest_nbytes))

# perform decompression
if _get_use_threads():
# allow blosc to use threads internally
with nogil:
ret = blosc_decompress(source_ptr, dest_ptr, nbytes)
else:
with nogil:
ret = blosc_decompress_ctx(source_ptr, dest_ptr, nbytes, 1)

finally:

# release buffers
source_buffer.release()
if dest_buffer is not None:
dest_buffer.release()
dest = ensure_contiguous_ndarray(dest)

# obtain dest memoryview
dest_mv = memoryview(dest)
dest_pb = PyMemoryView_GET_BUFFER(dest_mv)
dest_ptr = <char*>dest_pb.buf
dest_nbytes = dest_pb.len

# guard condition
if dest_nbytes < nbytes:
raise ValueError('destination buffer too small; expected at least %s, '
'got %s' % (nbytes, dest_nbytes))

# perform decompression
if _get_use_threads():
# allow blosc to use threads internally
with nogil:
ret = blosc_decompress(source_ptr, dest_ptr, nbytes)
else:
with nogil:
ret = blosc_decompress_ctx(source_ptr, dest_ptr, nbytes, 1)

# handle errors
if ret <= 0:
Expand Down Expand Up @@ -433,14 +434,22 @@ def decompress_partial(source, start, nitems, dest=None):
int encoding_size
int nitems_bytes
int start_bytes
char *source_ptr
char *dest_ptr
Buffer source_buffer
Buffer dest_buffer = None

# setup source buffer
source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
source_ptr = source_buffer.ptr
const char* source_ptr
memoryview source_mv
const Py_buffer* source_pb
memoryview dest_mv
Py_buffer* dest_pb
char* dest_ptr
size_t dest_nbytes

# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b"A"):
raise BufferError("`source` must contain contiguous memory")

# setup source pointer
source_ptr = <const char*>source_pb.buf

# get encoding size from source buffer header
encoding_size = source[3]
Expand All @@ -451,26 +460,22 @@ def decompress_partial(source, start, nitems, dest=None):

# setup destination buffer
if dest is None:
# allocate memory
dest = PyBytes_FromStringAndSize(NULL, nitems_bytes)
dest_ptr = PyBytes_AS_STRING(dest)
dest_nbytes = nitems_bytes
else:
arr = ensure_contiguous_ndarray(dest)
dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE)
dest_ptr = dest_buffer.ptr
dest_nbytes = dest_buffer.nbytes
dest = ensure_contiguous_ndarray(dest)

# obtain dest memoryview
dest_mv = memoryview(dest)
dest_pb = PyMemoryView_GET_BUFFER(dest_mv)
dest_ptr = <char*>dest_pb.buf
dest_nbytes = dest_pb.len

# try decompression
try:
if dest_nbytes < nitems_bytes:
raise ValueError('destination buffer too small; expected at least %s, '
'got %s' % (nitems_bytes, dest_nbytes))
ret = blosc_getitem(source_ptr, start, nitems, dest_ptr)

finally:
source_buffer.release()
if dest_buffer is not None:
dest_buffer.release()
if dest_nbytes < nitems_bytes:
raise ValueError('destination buffer too small; expected at least %s, '
'got %s' % (nitems_bytes, dest_nbytes))
ret = blosc_getitem(source_ptr, start, nitems, dest_ptr)

# ret refers to the number of bytes returned from blosc_getitem.
if ret <= 0:
Expand Down
12 changes: 0 additions & 12 deletions numcodecs/compat_ext.pxd

This file was deleted.

Loading

0 comments on commit 2a942ad

Please sign in to comment.