From 772f3c2bdb75b46147c8d347cee8f88572fd129c Mon Sep 17 00:00:00 2001 From: Nikos Gavalas Date: Sat, 16 Sep 2023 23:23:19 +0200 Subject: [PATCH] Refactor, implement __contains__ --- kevo/engines/appendlog.py | 32 ++++++++++++++++---------------- kevo/engines/hybridlog.py | 28 ++++++++++++++-------------- kevo/engines/kvstore.py | 12 ++++++++++++ kevo/engines/lsmtree.py | 20 ++++++++++---------- kevo/engines/memonly.py | 29 +++++++++-------------------- tests/test_appendlog.py | 2 ++ 6 files changed, 63 insertions(+), 60 deletions(-) diff --git a/kevo/engines/appendlog.py b/kevo/engines/appendlog.py index 761a915..5e65ce1 100644 --- a/kevo/engines/appendlog.py +++ b/kevo/engines/appendlog.py @@ -41,9 +41,9 @@ def __init__(self, if self.remote: self.restore() else: - self.rebuild_indices() + self._rebuild_indices() - def rebuild_indices(self): + def _rebuild_indices(self): self.levels.clear() self.rfds.clear() @@ -81,7 +81,7 @@ def rebuild_indices(self): self.rfds[0].append((self.data_dir / f'L{0}.{self.levels[0]}.{self.global_version}.run').open('rb')) def close(self): - self.close_run() + self._close_run() self.wfd.close() for rfds in self.rfds: for rfd in rfds: @@ -117,10 +117,10 @@ def _set(self, key, value=KVStore.EMPTY): self.counter += len(key) + len(value) if self.counter >= self.threshold: - self.close_run() - self.open_new_files() + self._close_run() + self._open_new_files() - def close_run(self): + def _close_run(self): if self.counter == 0: return @@ -134,19 +134,19 @@ def close_run(self): self.wfd.close() if self.compaction_enabled: - self.compaction(self.levels[flush_level]) + self._compaction(self.levels[flush_level]) self.levels[flush_level] += 1 if self.levels[flush_level] >= self.max_runs_per_level: - self.merge(flush_level) + self._merge(flush_level) - def open_new_files(self): + def _open_new_files(self): flush_level = 0 self.wfd.close() self.wfd = (self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('ab') self.rfds[flush_level].append((self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('rb')) - def compaction(self, run): + def _compaction(self, run): log_path = (self.data_dir / f'L0.{run}.{self.global_version}.run') compacted_log_path = log_path.with_suffix('.tmp') # NOTE i can copy the index here and keep the old one for as long as the compaction is running to enable reads @@ -170,7 +170,7 @@ def compaction(self, run): # get a new read fd self.rfds[0][run] = log_path.open('rb') - def merge(self, level: int): + def _merge(self, level: int): next_level = level + 1 if next_level >= len(self.levels): self.levels.append(0) @@ -212,17 +212,17 @@ def merge(self, level: int): # merge recursively if self.levels[next_level] >= self.max_runs_per_level: - self.merge(next_level) + self._merge(next_level) def snapshot(self, id: int): - self.close_run() + self._close_run() if self.remote: runs = discover_run_files(self.data_dir) self.remote.push_deltas(runs, id) - self.open_new_files() + self._open_new_files() def restore(self, version=None): - self.close_run() + self._close_run() if self.remote: self.global_version = self.remote.restore(version=version) if self.wfd is not None: @@ -230,7 +230,7 @@ def restore(self, version=None): for rfds in self.rfds: for rfd in rfds: rfd.close() - self.rebuild_indices() + self._rebuild_indices() def __sizeof__(self): return getsizeof(self.hash_index) diff --git a/kevo/engines/hybridlog.py b/kevo/engines/hybridlog.py index e2392e7..d5f237e 100644 --- a/kevo/engines/hybridlog.py +++ b/kevo/engines/hybridlog.py @@ -63,9 +63,9 @@ def __init__(self, if self.remote: self.restore() else: - self.rebuild_indices() + self._rebuild_indices() - def rebuild_indices(self): + def _rebuild_indices(self): self.levels.clear() self.rfds.clear() @@ -115,7 +115,7 @@ def rebuild_indices(self): self.memory.set_tail_offset(self.tail_offset) def close(self): - self.flush(self.tail_offset) # flush everything + self._flush(self.tail_offset) # flush everything self.wfd.close() for rfds in self.rfds: for rfd in rfds: @@ -144,7 +144,7 @@ def _get(self, key): def _set(self, key, value=KVStore.EMPTY): if self.memory.is_full(): - self.flush(self.ro_offset) + self._flush(self.ro_offset) if key in self.hash_index: offset = self.hash_index[key] @@ -161,10 +161,10 @@ def _set(self, key, value=KVStore.EMPTY): self.ro_offset += 1 if self.ro_offset - self.head_offset > self.flush_interval: - self.flush(self.ro_offset) - self.open_new_files() + self._flush(self.ro_offset) + self._open_new_files() - def flush(self, offset: int): + def _flush(self, offset: int): if self.memory.is_empty(): return @@ -188,13 +188,13 @@ def flush(self, offset: int): self.levels[flush_level] += 1 if self.levels[flush_level] >= self.max_runs_per_level: - self.merge(flush_level) + self._merge(flush_level) # open a new file after merging # self.wfd = (self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('ab') # self.rfds[flush_level].append((self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('rb')) - def merge(self, level: int): + def _merge(self, level: int): next_level = level + 1 if next_level >= len(self.levels): self.levels.append(0) @@ -237,21 +237,21 @@ def merge(self, level: int): # merge recursively if self.levels[next_level] >= self.max_runs_per_level: - self.merge(next_level) + self._merge(next_level) - def open_new_files(self): + def _open_new_files(self): flush_level = 0 self.wfd.close() self.wfd = (self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('ab') self.rfds[flush_level].append((self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('rb')) def snapshot(self, id: int): - self.flush(self.tail_offset) + self._flush(self.tail_offset) self.ro_offset = self.tail_offset if self.remote: runs = discover_run_files(self.data_dir) self.remote.push_deltas(runs, id) - self.open_new_files() + self._open_new_files() def restore(self, version=None): if self.remote: @@ -262,7 +262,7 @@ def restore(self, version=None): for rfds in self.rfds: for rfd in rfds: rfd.close() - self.rebuild_indices() + self._rebuild_indices() def __sizeof__(self): return getsizeof(self.hash_index) + getsizeof(self.la_to_file_offset) + getsizeof(self.memory) diff --git a/kevo/engines/kvstore.py b/kevo/engines/kvstore.py index 1fb988e..cb3d29b 100644 --- a/kevo/engines/kvstore.py +++ b/kevo/engines/kvstore.py @@ -112,3 +112,15 @@ def __sizeof__(self): def close(self): raise NotImplementedError('') + + def snapshot(self, id: int): + raise NotImplementedError('') + + def restore(self, version=None): + raise NotImplementedError('') + + def _rebuild_indices(self): + raise NotImplementedError('') + + def __contains__(self, key): + return self.get(key) != b'' diff --git a/kevo/engines/lsmtree.py b/kevo/engines/lsmtree.py index 2a98138..d2e78fe 100644 --- a/kevo/engines/lsmtree.py +++ b/kevo/engines/lsmtree.py @@ -68,9 +68,9 @@ def __init__(self, # restore calls rebuild_indices, so this way we avoid rebuilding twice self.restore() else: - self.rebuild_indices() + self._rebuild_indices() - def rebuild_indices(self): + def _rebuild_indices(self): # TODO set the global version to the max of the discovered ones self.levels.clear() self.rfds.clear() @@ -145,12 +145,12 @@ def _set(self, key, value=KVStore.EMPTY): if self.memtable_bytes_count > self.memtable_bytes_limit: # normally I would allocate a new memtable here so that writes can continue there # and then give the flushing of the old memtable to a background thread - self.flush() + self._flush() else: # write to wal self._write_kv_pair(self.wal_file, key, value) - def merge(self, level_idx: int): + def _merge(self, level_idx: int): level = self.levels[level_idx] if level_idx + 1 >= len(self.levels): self.levels.append([]) @@ -239,9 +239,9 @@ def merge(self, level_idx: int): # cascade the merging recursively if len(next_level) >= self.max_runs_per_level: - self.merge(level_idx + 1) + self._merge(level_idx + 1) - def flush(self): + def _flush(self): if len(self.memtable) == 0: return fence_pointers = FencePointers(self.density_factor) @@ -277,24 +277,24 @@ def flush(self): # trigger merge if exceeding the runs per level if len(self.levels[flush_level]) >= self.max_runs_per_level: - self.merge(flush_level) + self._merge(flush_level) def snapshot(self, id: int): - self.flush() + self._flush() if self.remote: runs = discover_run_files(self.data_dir) self.remote.push_deltas(runs, id) def restore(self, version=None): # flush first to empty the memtable - self.flush() + self._flush() if self.remote: self.global_version = self.remote.restore(version=version) # close open file descriptors first for rfds in self.rfds: for rfd in rfds: rfd.close() - self.rebuild_indices() + self._rebuild_indices() def __sizeof__(self): memtable_size = sum((getsizeof(k) + getsizeof(v) for k, v in self.memtable.items())) diff --git a/kevo/engines/memonly.py b/kevo/engines/memonly.py index 924f9ee..582db69 100644 --- a/kevo/engines/memonly.py +++ b/kevo/engines/memonly.py @@ -23,9 +23,9 @@ def __init__(self, if self.remote: self.restore() else: - self.rebuild_indices() + self._rebuild_indices() - def rebuild_indices(self): + def _rebuild_indices(self): self.hash_index.clear() runs_discovered = self.discover_runs() @@ -48,24 +48,13 @@ def __getitem__(self, key): def __setitem__(self, key, value): return self.set(key, value) - def get(self, key: bytes): - assert type(key) is bytes - assert 0 < len(key) <= self.max_key_len - - ret = KVStore.EMPTY - try: - ret = self.hash_index[key] - except KeyError: - pass - return ret - - def set(self, key: bytes, value: bytes = KVStore.EMPTY): - assert type(key) is bytes and type(value) is bytes - assert 0 < len(key) <= self.max_key_len and len(value) <= self.max_value_len + def _get(self, key): + return self.hash_index.get(key, b'') + def _set(self, key, value=KVStore.EMPTY): self.hash_index[key] = value - def flush(self): + def _flush(self): if len(self.hash_index) == 0: return @@ -80,16 +69,16 @@ def flush(self): self.global_version += 1 def snapshot(self, id: int): - self.flush() + self._flush() if self.remote: runs = discover_run_files(self.data_dir) self.remote.push_deltas(runs, id) def restore(self, version=None): - self.flush() + self._flush() if self.remote: self.global_version = self.remote.restore(version=version) - self.rebuild_indices() + self._rebuild_indices() def __sizeof__(self): return getsizeof(self.hash_index) diff --git a/tests/test_appendlog.py b/tests/test_appendlog.py index 0a77076..20868ef 100644 --- a/tests/test_appendlog.py +++ b/tests/test_appendlog.py @@ -41,6 +41,8 @@ def test_basic(self): self.assertEqual(l.get(b'to be deleted'), b'') self.assertEqual(l.get(256, serializer=lambda i, length: i.to_bytes(length=length), serializer_args={'length': 2}), b'\x01\x01') self.assertEqual(l.get(b'\x01'), b'\x02') + self.assertTrue(b'd' in l) + self.assertTrue(b'not in' not in l) l.close()