Skip to content

Commit

Permalink
Use threading Events instead of sleeps, and lock access to buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
joetoddsonos committed Apr 10, 2024
1 parent 949cf63 commit 3952eac
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
11 changes: 10 additions & 1 deletion examples/passthrough.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,16 @@ def __init__(self, args):
self.idx = 0
self.total_bytes = 0
self.queue = queue.SimpleQueue()
self.data, self.sr = sf.read(args.input_file, dtype='int16', always_2d=True)

info = sf.info(str(args.input_file))
if info.subtype == 'PCM_16':
dtype = 'int16'
elif info.subtype == 'PCM_32':
dtype = 'int32'
else:
raise ValueError(f'WAV input data type must be either PCM_16 or PCM_32: Got {info.subtype}')

self.data, self.sr = sf.read(args.input_file, dtype=dtype, always_2d=True)

self.encoder = pyflac.StreamEncoder(
write_callback=self.encoder_callback,
Expand Down
31 changes: 24 additions & 7 deletions pyflac/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def __init__(self,

self._done = False
self._buffer = deque()
self._event = threading.Event()
self._lock = threading.Lock()
self.write_callback = write_callback

rc = _lib.FLAC__stream_decoder_init_stream(
Expand Down Expand Up @@ -201,7 +203,10 @@ def process(self, data: bytes):
Args:
data (bytes): Bytes of FLAC data
"""
self._lock.acquire()
self._buffer.append(data)
self._lock.release()
self._event.set()

def finish(self):
"""
Expand All @@ -226,6 +231,7 @@ def finish(self):
# Instruct the decoder to finish up and wait until it is done
# --------------------------------------------------------------
self._done = True
self._event.set()
self._thread.join(timeout=3)
super().finish()
if self._error:
Expand Down Expand Up @@ -316,13 +322,11 @@ def _read_callback(_decoder,
"""
decoder = _ffi.from_handle(client_data)

while len(decoder._buffer) == 0 and not (decoder._error or decoder._done):
# ----------------------------------------------------------
# Wait until there is something in the buffer, or an error
# occurs, or the end of the stream is reached.
# ----------------------------------------------------------
time.sleep(0.01)

# ----------------------------------------------------------
# Wait until there is something in the buffer, or an error
# occurs, or the end of the stream is reached.
# ----------------------------------------------------------
decoder._event.wait()
if decoder._error:
# ----------------------------------------------------------
# If an error has been issued via the error callback, then
Expand All @@ -345,16 +349,28 @@ def _read_callback(_decoder,
data = bytes()
maximum_bytes = int(num_bytes[0])
if len(decoder._buffer[0]) <= maximum_bytes:
decoder._lock.acquire()
data = decoder._buffer.popleft()
decoder._lock.release()
maximum_bytes -= len(data)

if len(decoder._buffer) > 0 and len(decoder._buffer[0]) > maximum_bytes:
decoder._lock.acquire()
data += decoder._buffer[0][0:maximum_bytes]
decoder._buffer[0] = decoder._buffer[0][maximum_bytes:]
decoder._lock.release()

actual_bytes = len(data)
num_bytes[0] = actual_bytes
_ffi.memmove(byte_buffer, data, actual_bytes)

# --------------------------------------------------------------
# If there is no more data to process from the buffer, then
# clear the event, the thread will await more data to process.
# --------------------------------------------------------------
if len(decoder._buffer) == 0 or (len(decoder._buffer) > 0 and len(decoder._buffer[0]) == 0):
decoder._event.clear()

return _lib.FLAC__STREAM_DECODER_READ_STATUS_CONTINUE


Expand Down Expand Up @@ -452,3 +468,4 @@ def _error_callback(_decoder,
_lib.FLAC__StreamDecoderErrorStatusString[status]).decode()
decoder.logger.error(f'Error in libFLAC decoder: {message}')
decoder._error = message
decoder._event.set()

0 comments on commit 3952eac

Please sign in to comment.