Refactor, implement __contains__
nikosgavalas committed Sep 16, 2023
1 parent e6c3f1f commit 772f3c2
Showing 6 changed files with 63 additions and 60 deletions.
32 changes: 16 additions & 16 deletions kevo/engines/appendlog.py
@@ -41,9 +41,9 @@ def __init__(self,
if self.remote:
self.restore()
else:
-            self.rebuild_indices()
+            self._rebuild_indices()

-    def rebuild_indices(self):
+    def _rebuild_indices(self):
self.levels.clear()
self.rfds.clear()

@@ -81,7 +81,7 @@ def rebuild_indices(self):
self.rfds[0].append((self.data_dir / f'L{0}.{self.levels[0]}.{self.global_version}.run').open('rb'))

def close(self):
-        self.close_run()
+        self._close_run()
self.wfd.close()
for rfds in self.rfds:
for rfd in rfds:
@@ -117,10 +117,10 @@ def _set(self, key, value=KVStore.EMPTY):
self.counter += len(key) + len(value)

if self.counter >= self.threshold:
-            self.close_run()
-            self.open_new_files()
+            self._close_run()
+            self._open_new_files()

-    def close_run(self):
+    def _close_run(self):
if self.counter == 0:
return

@@ -134,19 +134,19 @@ def close_run(self):
self.wfd.close()

if self.compaction_enabled:
-            self.compaction(self.levels[flush_level])
+            self._compaction(self.levels[flush_level])

self.levels[flush_level] += 1
if self.levels[flush_level] >= self.max_runs_per_level:
-            self.merge(flush_level)
+            self._merge(flush_level)

-    def open_new_files(self):
+    def _open_new_files(self):
flush_level = 0
self.wfd.close()
self.wfd = (self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('ab')
self.rfds[flush_level].append((self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('rb'))

-    def compaction(self, run):
+    def _compaction(self, run):
log_path = (self.data_dir / f'L0.{run}.{self.global_version}.run')
compacted_log_path = log_path.with_suffix('.tmp')
# NOTE i can copy the index here and keep the old one for as long as the compaction is running to enable reads
@@ -170,7 +170,7 @@ def compaction(self, run):
# get a new read fd
self.rfds[0][run] = log_path.open('rb')

-    def merge(self, level: int):
+    def _merge(self, level: int):
next_level = level + 1
if next_level >= len(self.levels):
self.levels.append(0)
@@ -212,25 +212,25 @@ def merge(self, level: int):

# merge recursively
if self.levels[next_level] >= self.max_runs_per_level:
-            self.merge(next_level)
+            self._merge(next_level)

def snapshot(self, id: int):
-        self.close_run()
+        self._close_run()
if self.remote:
runs = discover_run_files(self.data_dir)
self.remote.push_deltas(runs, id)
-        self.open_new_files()
+        self._open_new_files()

def restore(self, version=None):
-        self.close_run()
+        self._close_run()
if self.remote:
self.global_version = self.remote.restore(version=version)
if self.wfd is not None:
self.wfd.close()
for rfds in self.rfds:
for rfd in rfds:
rfd.close()
-        self.rebuild_indices()
+        self._rebuild_indices()

def __sizeof__(self):
return getsizeof(self.hash_index)
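The renames in this file follow Python's single-underscore convention: user-facing operations keep plain names, while lifecycle steps such as flushing, merging, and compaction move behind underscore-prefixed names to mark them as internal. A minimal sketch of the pattern with hypothetical names, not code from this repo:

    class Engine:
        def set(self, key: bytes, value: bytes) -> None:
            # public entry point keeps a plain name
            self._store(key, value)
            self._maybe_flush()  # internal housekeeping behind an underscore

        def _store(self, key: bytes, value: bytes) -> None:
            pass  # engine-specific write path

        def _maybe_flush(self) -> None:
            pass  # flush/merge/compaction details stay private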
28 changes: 14 additions & 14 deletions kevo/engines/hybridlog.py
@@ -63,9 +63,9 @@ def __init__(self,
if self.remote:
self.restore()
else:
-            self.rebuild_indices()
+            self._rebuild_indices()

-    def rebuild_indices(self):
+    def _rebuild_indices(self):
self.levels.clear()
self.rfds.clear()

@@ -115,7 +115,7 @@ def rebuild_indices(self):
self.memory.set_tail_offset(self.tail_offset)

def close(self):
-        self.flush(self.tail_offset)  # flush everything
+        self._flush(self.tail_offset)  # flush everything
self.wfd.close()
for rfds in self.rfds:
for rfd in rfds:
@@ -144,7 +144,7 @@ def _get(self, key):

def _set(self, key, value=KVStore.EMPTY):
if self.memory.is_full():
-            self.flush(self.ro_offset)
+            self._flush(self.ro_offset)

if key in self.hash_index:
offset = self.hash_index[key]
@@ -161,10 +161,10 @@ def _set(self, key, value=KVStore.EMPTY):
self.ro_offset += 1

if self.ro_offset - self.head_offset > self.flush_interval:
-            self.flush(self.ro_offset)
-            self.open_new_files()
+            self._flush(self.ro_offset)
+            self._open_new_files()

-    def flush(self, offset: int):
+    def _flush(self, offset: int):
if self.memory.is_empty():
return

@@ -188,13 +188,13 @@ def flush(self, offset: int):

self.levels[flush_level] += 1
if self.levels[flush_level] >= self.max_runs_per_level:
-            self.merge(flush_level)
+            self._merge(flush_level)

# open a new file after merging
# self.wfd = (self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('ab')
# self.rfds[flush_level].append((self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('rb'))

-    def merge(self, level: int):
+    def _merge(self, level: int):
next_level = level + 1
if next_level >= len(self.levels):
self.levels.append(0)
@@ -237,21 +237,21 @@ def merge(self, level: int):

# merge recursively
if self.levels[next_level] >= self.max_runs_per_level:
-            self.merge(next_level)
+            self._merge(next_level)

-    def open_new_files(self):
+    def _open_new_files(self):
flush_level = 0
self.wfd.close()
self.wfd = (self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('ab')
self.rfds[flush_level].append((self.data_dir / f'L{flush_level}.{self.levels[flush_level]}.{self.global_version}.run').open('rb'))

def snapshot(self, id: int):
-        self.flush(self.tail_offset)
+        self._flush(self.tail_offset)
self.ro_offset = self.tail_offset
if self.remote:
runs = discover_run_files(self.data_dir)
self.remote.push_deltas(runs, id)
-        self.open_new_files()
+        self._open_new_files()

def restore(self, version=None):
if self.remote:
@@ -262,7 +262,7 @@
for rfds in self.rfds:
for rfd in rfds:
rfd.close()
-        self.rebuild_indices()
+        self._rebuild_indices()

def __sizeof__(self):
return getsizeof(self.hash_index) + getsizeof(self.la_to_file_offset) + getsizeof(self.memory)
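A note on the two _flush call sites above: close() and snapshot() pass tail_offset to persist everything held in memory, while _set() passes ro_offset to persist only the stable read-only region. An illustration of the assumed offset ordering, with hypothetical positions rather than values from the repo:

    head_offset, ro_offset, tail_offset = 0, 40, 100  # hypothetical log positions
    assert head_offset <= ro_offset <= tail_offset
    # _flush(ro_offset) persists only the stable read-only prefix of the log
    # _flush(tail_offset) persists everything, as close() and snapshot() do here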
12 changes: 12 additions & 0 deletions kevo/engines/kvstore.py
@@ -112,3 +112,15 @@ def __sizeof__(self):

def close(self):
raise NotImplementedError('')

+    def snapshot(self, id: int):
+        raise NotImplementedError('')
+
+    def restore(self, version=None):
+        raise NotImplementedError('')
+
+    def _rebuild_indices(self):
+        raise NotImplementedError('')
+
+    def __contains__(self, key):
+        return self.get(key) != b''
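With __contains__ defined on the KVStore base, every engine now supports the in operator. It treats the empty value b'' as absent, matching the convention that get() returns KVStore.EMPTY for missing keys and that deletes are writes of the empty value, so a deleted key tests as not present. A minimal usage sketch, assuming the import path shown in this diff and hypothetical constructor arguments:

    from kevo.engines.appendlog import AppendLog  # import path as in this commit

    db = AppendLog('./data')    # constructor arguments assumed for illustration
    db.set(b'key', b'value')
    assert b'key' in db         # dispatches to KVStore.__contains__
    assert b'absent' not in db  # get() returns b'', so membership is False
    db.set(b'key')              # writing the empty value acts as a delete
    assert b'key' not in db
    db.close()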
20 changes: 10 additions & 10 deletions kevo/engines/lsmtree.py
@@ -68,9 +68,9 @@ def __init__(self,
# restore calls rebuild_indices, so this way we avoid rebuilding twice
self.restore()
else:
-            self.rebuild_indices()
+            self._rebuild_indices()

-    def rebuild_indices(self):
+    def _rebuild_indices(self):
# TODO set the global version to the max of the discovered ones
self.levels.clear()
self.rfds.clear()
@@ -145,12 +145,12 @@ def _set(self, key, value=KVStore.EMPTY):
if self.memtable_bytes_count > self.memtable_bytes_limit:
# normally I would allocate a new memtable here so that writes can continue there
# and then give the flushing of the old memtable to a background thread
-            self.flush()
+            self._flush()
else:
# write to wal
self._write_kv_pair(self.wal_file, key, value)

-    def merge(self, level_idx: int):
+    def _merge(self, level_idx: int):
level = self.levels[level_idx]
if level_idx + 1 >= len(self.levels):
self.levels.append([])
@@ -239,9 +239,9 @@ def merge(self, level_idx: int):

# cascade the merging recursively
if len(next_level) >= self.max_runs_per_level:
-            self.merge(level_idx + 1)
+            self._merge(level_idx + 1)

-    def flush(self):
+    def _flush(self):
if len(self.memtable) == 0:
return
fence_pointers = FencePointers(self.density_factor)
@@ -277,24 +277,24 @@ def flush(self):

# trigger merge if exceeding the runs per level
if len(self.levels[flush_level]) >= self.max_runs_per_level:
-            self.merge(flush_level)
+            self._merge(flush_level)

def snapshot(self, id: int):
-        self.flush()
+        self._flush()
if self.remote:
runs = discover_run_files(self.data_dir)
self.remote.push_deltas(runs, id)

def restore(self, version=None):
# flush first to empty the memtable
-        self.flush()
+        self._flush()
if self.remote:
self.global_version = self.remote.restore(version=version)
# close open file descriptors first
for rfds in self.rfds:
for rfd in rfds:
rfd.close()
-        self.rebuild_indices()
+        self._rebuild_indices()

def __sizeof__(self):
memtable_size = sum((getsizeof(k) + getsizeof(v) for k, v in self.memtable.items()))
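LSMTree's _set notes that a production system would normally allocate a new memtable so writes can continue, and hand the frozen one to a background thread. A minimal illustration of that idea, hypothetical and not part of this commit:

    import threading

    class MemtableSwapSketch:
        def __init__(self):
            self.memtable = {}

        def on_memtable_full(self):
            # freeze the current table; new writes land in a fresh one
            frozen, self.memtable = self.memtable, {}
            threading.Thread(target=self._flush_frozen, args=(frozen,)).start()

        def _flush_frozen(self, frozen):
            pass  # write a sorted run to disk, as LSMTree._flush does synchronously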
29 changes: 9 additions & 20 deletions kevo/engines/memonly.py
@@ -23,9 +23,9 @@ def __init__(self,
if self.remote:
self.restore()
else:
-            self.rebuild_indices()
+            self._rebuild_indices()

-    def rebuild_indices(self):
+    def _rebuild_indices(self):
self.hash_index.clear()

runs_discovered = self.discover_runs()
@@ -48,24 +48,13 @@ def __getitem__(self, key):
def __setitem__(self, key, value):
return self.set(key, value)

-    def get(self, key: bytes):
-        assert type(key) is bytes
-        assert 0 < len(key) <= self.max_key_len
-
-        ret = KVStore.EMPTY
-        try:
-            ret = self.hash_index[key]
-        except KeyError:
-            pass
-        return ret
-
-    def set(self, key: bytes, value: bytes = KVStore.EMPTY):
-        assert type(key) is bytes and type(value) is bytes
-        assert 0 < len(key) <= self.max_key_len and len(value) <= self.max_value_len
+    def _get(self, key):
+        return self.hash_index.get(key, b'')
+
+    def _set(self, key, value=KVStore.EMPTY):
         self.hash_index[key] = value

-    def flush(self):
+    def _flush(self):
if len(self.hash_index) == 0:
return

@@ -80,16 +69,16 @@ def flush(self):
self.global_version += 1

def snapshot(self, id: int):
-        self.flush()
+        self._flush()
if self.remote:
runs = discover_run_files(self.data_dir)
self.remote.push_deltas(runs, id)

def restore(self, version=None):
-        self.flush()
+        self._flush()
if self.remote:
self.global_version = self.remote.restore(version=version)
-        self.rebuild_indices()
+        self._rebuild_indices()

def __sizeof__(self):
return getsizeof(self.hash_index)
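The MemOnly change drops the argument checks from get()/set() and implements only _get()/_set(), relying on the base class to validate once and delegate. A simplified sketch of that template-method split, assumed from this diff rather than copied from kvstore.py:

    class KVStoreSketch:
        EMPTY = b''
        max_key_len = 256  # hypothetical limit

        def get(self, key: bytes) -> bytes:
            # the base class validates arguments once...
            assert type(key) is bytes and 0 < len(key) <= self.max_key_len
            return self._get(key)  # ...and delegates to the engine's _get

        def _get(self, key):
            raise NotImplementedError('')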
2 changes: 2 additions & 0 deletions tests/test_appendlog.py
@@ -41,6 +41,8 @@ def test_basic(self):
self.assertEqual(l.get(b'to be deleted'), b'')
self.assertEqual(l.get(256, serializer=lambda i, length: i.to_bytes(length=length), serializer_args={'length': 2}), b'\x01\x01')
self.assertEqual(l.get(b'\x01'), b'\x02')
+        self.assertTrue(b'd' in l)
+        self.assertTrue(b'not in' not in l)

l.close()

