diff --git a/kevo/common/fencepointers.py b/kevo/common/fencepointers.py index d32c835..eb7bc83 100644 --- a/kevo/common/fencepointers.py +++ b/kevo/common/fencepointers.py @@ -1,12 +1,5 @@ -''' -This now is implemented as a sorted dictionary (because I need the bisect_left/right) with base64/json-based ser/der. -A better implementation would be: two arrays (one for keys one for values) so that I can binary-search on the keys, and binary encoding for ser/der. -TODO rebuilding from string could be done linearly if the serialization is sorted, right now the sorteddict is being rebuilt from scratch so that should be fixed -''' - from sys import getsizeof -import json -from base64 import b64encode, b64decode +import struct from sortedcontainers import SortedDict @@ -17,38 +10,44 @@ def __init__(self, density_factor=20, from_str: str | None = None): self.density_factor = density_factor self.counter = 0 - - if type(from_str) is str: - data = json.loads(from_str) - for k, v in data['pointers'].items(): - self.pointers[b64decode(k)] = v - self.density_factor = data['density_factor'] - self.counter = data['counter'] + self.incr = 0 def add(self, key: bytes, offset: int): - if self.counter % self.density_factor == 0: + if self.incr % self.density_factor == 0: self.pointers[key] = offset + self.counter += 1 + + self.incr += 1 - self.counter += 1 - def bisect(self, key: bytes): return self.pointers.bisect(key) def peekitem(self, idx): return self.pointers.peekitem(idx) - def serialize(self): - pointers = {} + def to_file_as_blob(self, fd, enc_len): + fd.write(struct.pack(' 0: + self._read_kv_pair(fd, enc_len) + cnt -= 1 def __len__(self): - return self.counter + return self.incr def __str__(self) -> str: return self.serialize() diff --git a/kevo/engines/lsmtree.py b/kevo/engines/lsmtree.py index 2ef67de..c6c5b36 100644 --- a/kevo/engines/lsmtree.py +++ b/kevo/engines/lsmtree.py @@ -13,15 +13,6 @@ Run = namedtuple('Run', ['filter', 'pointers', 'nr_records']) -def append_indices(file_descriptor, fence_pointers, bloom_filter, nr_records): - pointers_offset = file_descriptor.tell() - file_descriptor.write(fence_pointers.serialize().encode()) - bloom_offset = file_descriptor.tell() - file_descriptor.write(bloom_filter.serialize().encode()) - # pack two 8 byte unsigned ints for the offsets of the pointers and the bloom filter - file_descriptor.write(struct.pack('= len(self.rfds): self.rfds.append([]) @@ -268,7 +259,7 @@ def _flush(self): self._write_kv_pair(run_file, k, v) bloom_filter.add(k) nr_records += 1 - append_indices(run_file, fence_pointers, bloom_filter, nr_records) + self._append_indices(run_file, fence_pointers, bloom_filter, nr_records) self.memtable_bytes_count = 0 @@ -301,6 +292,14 @@ def restore(self, version=None): rfd.close() self._rebuild_indices() + def _append_indices(self, file_descriptor, fence_pointers, bloom_filter, nr_records): + pointers_offset = file_descriptor.tell() + fence_pointers.to_file_as_blob(file_descriptor, self.key_enc_len) + bloom_offset = file_descriptor.tell() + file_descriptor.write(bloom_filter.serialize().encode()) + # pack two 8 byte unsigned ints for the offsets of the pointers and the bloom filter + file_descriptor.write(struct.pack('