Skip to content

Commit

Permalink
fix fencepointers serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
nikosgavalas committed Sep 16, 2023
1 parent e7d1a2d commit f9dc12c
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 44 deletions.
53 changes: 26 additions & 27 deletions kevo/common/fencepointers.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
'''
This now is implemented as a sorted dictionary (because I need the bisect_left/right) with base64/json-based ser/der.
A better implementation would be: two arrays (one for keys one for values) so that I can binary-search on the keys, and binary encoding for ser/der.
TODO rebuilding from string could be done linearly if the serialization is sorted, right now the sorteddict is being rebuilt from scratch so that should be fixed
'''

from sys import getsizeof
import json
from base64 import b64encode, b64decode
import struct

from sortedcontainers import SortedDict

Expand All @@ -17,38 +10,44 @@ def __init__(self, density_factor=20, from_str: str | None = None):

self.density_factor = density_factor
self.counter = 0

if type(from_str) is str:
data = json.loads(from_str)
for k, v in data['pointers'].items():
self.pointers[b64decode(k)] = v
self.density_factor = data['density_factor']
self.counter = data['counter']
self.incr = 0

def add(self, key: bytes, offset: int):
    """Record a fence pointer candidate `key` -> `offset`.

    Only one out of every `density_factor` added keys is actually stored,
    keeping the index sparse.
    """
    # Sample every density_factor-th insertion as a stored fence pointer.
    if self.incr % self.density_factor == 0:
        self.pointers[key] = offset
        # counter tracks how many pointers are actually stored; the
        # serialized form relies on it to know how many entries follow.
        self.counter += 1

    # incr counts every add() call, stored or not.
    self.incr += 1

def bisect(self, key: bytes):
    """Return the insertion index of `key` among the stored pointer keys."""
    idx = self.pointers.bisect(key)
    return idx

def peekitem(self, idx):
    """Return the (key, offset) pair at position `idx` in sorted-key order."""
    item = self.pointers.peekitem(idx)
    return item

def to_file_as_blob(self, fd, enc_len):
    """Serialize the fence pointers to binary file `fd`.

    Layout: a 16-byte header of two little-endian u64s (density_factor,
    counter), then `counter` entries of [enc_len-byte key length | key
    bytes | 8-byte offset].
    """
    fd.write(struct.pack('<QQ', self.density_factor, self.counter))
    for k, v in self.pointers.items():
        # Length-prefix the key so variable-length keys can be read back.
        fd.write(len(k).to_bytes(enc_len, byteorder='little'))
        fd.write(k)
        fd.write(v.to_bytes(8, byteorder='little'))

def _read_kv_pair(self, fd, enc_len):
    """Read one length-prefixed key and its 8-byte offset from `fd` and
    store the pair in self.pointers."""
    length = int.from_bytes(fd.read(enc_len), byteorder='little')
    k = fd.read(length)
    self.pointers[k] = int.from_bytes(fd.read(8), byteorder='little')

def from_file_descriptor(self, fd, enc_len):
    """Rebuild the pointer table from `fd` (inverse of to_file_as_blob)."""
    header = fd.read(16)
    self.density_factor, self.counter = struct.unpack('<QQ', header)
    # `counter` entries of (length-prefixed key, u64 offset) follow.
    # NOTE(review): self.incr is not restored here, so __len__ (which
    # returns incr) reports 0 after loading -- confirm this is intended.
    for _ in range(self.counter):
        self._read_kv_pair(fd, enc_len)

def __len__(self) -> int:
    """Number of add() calls seen, NOT the number of stored pointers.

    incr counts every insertion; counter counts only the sparse subset of
    keys actually kept in self.pointers.
    """
    return self.incr

def __str__(self) -> str:
    """Human-readable summary of the index.

    The previous implementation returned self.serialize(), but serialize()
    was removed when the format became binary (to_file_as_blob), so it
    would raise AttributeError; build a summary string instead.
    """
    return (f'FencePointers(density_factor={self.density_factor}, '
            f'counter={self.counter}, stored={len(self.pointers)})')
Expand Down
25 changes: 12 additions & 13 deletions kevo/engines/lsmtree.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,6 @@
Run = namedtuple('Run', ['filter', 'pointers', 'nr_records'])


def append_indices(file_descriptor, fence_pointers, bloom_filter, nr_records):
    """Append the serialized fence pointers and bloom filter to the run
    file, then a footer of three little-endian u64s: the fence-pointer
    offset, the bloom-filter offset, and the record count."""
    offsets = []
    for index in (fence_pointers, bloom_filter):
        offsets.append(file_descriptor.tell())
        file_descriptor.write(index.serialize().encode())
    footer = struct.pack('<QQQ', offsets[0], offsets[1], nr_records)
    file_descriptor.write(footer)


class LSMTree(KVStore):
name = 'LSMTree'

Expand Down Expand Up @@ -74,7 +65,6 @@ def __init__(self,
self._rebuild_indices()

def _rebuild_indices(self):
# TODO set the global version to the max of the discovered ones
self.levels.clear()
self.rfds.clear()

Expand All @@ -92,7 +82,8 @@ def _rebuild_indices(self):
offsets = run_file.read()
pointers_offset, bloom_offset, nr_records = struct.unpack('<QQQ', offsets)
run_file.seek(pointers_offset)
pointers = FencePointers(from_str=run_file.read(bloom_offset - pointers_offset).decode())
pointers = FencePointers()
pointers.from_file_descriptor(run_file, self.key_enc_len)
run_file.seek(bloom_offset)
bloom_filter = BloomFilter(from_str=run_file.read(bloom_end_offset - bloom_offset).decode())

Expand Down Expand Up @@ -220,7 +211,7 @@ def _merge(self, level_idx: int):
keys[i], values[i] = self._read_kv_pair(fds[i])
counters[i] += 1

append_indices(run_file, fence_pointers, bloom_filter, nr_records)
self._append_indices(run_file, fence_pointers, bloom_filter, nr_records)

if level_idx + 1 >= len(self.rfds):
self.rfds.append([])
Expand Down Expand Up @@ -268,7 +259,7 @@ def _flush(self):
self._write_kv_pair(run_file, k, v)
bloom_filter.add(k)
nr_records += 1
append_indices(run_file, fence_pointers, bloom_filter, nr_records)
self._append_indices(run_file, fence_pointers, bloom_filter, nr_records)

self.memtable_bytes_count = 0

Expand Down Expand Up @@ -301,6 +292,14 @@ def restore(self, version=None):
rfd.close()
self._rebuild_indices()

def _append_indices(self, file_descriptor, fence_pointers, bloom_filter, nr_records):
    """Write the run's index blobs (fence pointers as binary, then the
    bloom filter), followed by a footer of three little-endian u64s:
    the two blob offsets and the number of records in the run."""
    fence_offset = file_descriptor.tell()
    fence_pointers.to_file_as_blob(file_descriptor, self.key_enc_len)
    filter_offset = file_descriptor.tell()
    file_descriptor.write(bloom_filter.serialize().encode())
    footer = struct.pack('<QQQ', fence_offset, filter_offset, nr_records)
    file_descriptor.write(footer)

def __sizeof__(self):
memtable_size = sum((getsizeof(k) + getsizeof(v) for k, v in self.memtable.items()))
bloom_filters_size = sum((getsizeof(run.filter) for level in self.levels for run in level))
Expand Down
25 changes: 21 additions & 4 deletions tests/test_fencepointers.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
import os.path
import unittest

from random import Random
from kevo.common import FencePointers
from tempfile import TemporaryFile


class TestFencePointers(unittest.TestCase):
tmpfile = 'tmpfile'

def tearDown(self):
    """Delete the temporary file if the test created it."""
    try:
        os.remove(self.tmpfile)
    except FileNotFoundError:
        pass

def test_ser_der(self):
    """Round-trip the fence pointers through the binary file format and
    check the rebuilt pointer table matches the original."""
    rng = Random(1)

    fp1 = FencePointers()
    for _ in range(1_000):
        # Empty keys and large offsets are included deliberately to
        # exercise the length-prefix and the full 8-byte offset encoding.
        key = rng.randbytes(rng.randint(0, 100))
        offset = rng.randint(0, 10_000_000_000)
        fp1.add(key, offset)

    # A 2-byte length prefix covers keys up to 100 bytes.
    with open(self.tmpfile, 'wb') as f:
        fp1.to_file_as_blob(f, 2)

    fp2 = FencePointers()
    with open(self.tmpfile, 'rb') as f:
        fp2.from_file_descriptor(f, 2)

    self.assertEqual(fp1.pointers, fp2.pointers)


Expand Down

0 comments on commit f9dc12c

Please sign in to comment.