Skip to content

Commit

Permalink
make the wal optional in the lsm tree
Browse files Browse the repository at this point in the history
  • Loading branch information
nikosgavalas committed Sep 16, 2023
1 parent 772f3c2 commit e7d1a2d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 23 deletions.
31 changes: 18 additions & 13 deletions kevo/engines/lsmtree.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self,
max_runs_per_level=3,
density_factor=20,
memtable_bytes_limit=1_000_000,
use_wal=False,
remote: Optional[Remote] = None):
self.type = 'lsmtree'
super().__init__(data_dir=data_dir, max_key_len=max_key_len, max_value_len=max_value_len, remote=remote)
Expand All @@ -42,20 +43,22 @@ def __init__(self,

self.max_runs_per_level = max_runs_per_level
self.density_factor = density_factor
self.use_wal = use_wal

self.memtable = SortedDict()
self.memtable_bytes_limit = memtable_bytes_limit
self.memtable_bytes_count = 0

self.wal_path = self.data_dir / 'wal'
if self.wal_path.is_file():
with self.wal_path.open('rb') as wal_file:
k, v = self._read_kv_pair(wal_file)
while k:
# write the value to the memtable directly, no checks for amount of bytes etc.
self.memtable[k] = v
if self.use_wal:
self.wal_path = self.data_dir / 'wal'
if self.wal_path.is_file():
with self.wal_path.open('rb') as wal_file:
k, v = self._read_kv_pair(wal_file)
self.wal_file = self.wal_path.open('ab')
while k:
# write the value to the memtable directly, no checks for amount of bytes etc.
self.memtable[k] = v
k, v = self._read_kv_pair(wal_file)
self.wal_file = self.wal_path.open('ab')

self.levels: list[list[Run]] = []
self.rfds: list[list[FileIO]] = []
Expand Down Expand Up @@ -102,7 +105,8 @@ def _rebuild_indices(self):
self.rfds[level_idx].append((self.data_dir / f'L{level_idx}.{run_idx}.{version}.run').open('rb'))

def close(self):
self.wal_file.close()
if self.use_wal:
self.wal_file.close()
for rfds in self.rfds:
for rfd in rfds:
rfd.close()
Expand Down Expand Up @@ -146,7 +150,7 @@ def _set(self, key, value=KVStore.EMPTY):
# normally I would allocate a new memtable here so that writes can continue there
# and then give the flushing of the old memtable to a background thread
self._flush()
else:
elif self.use_wal:
# write to wal
self._write_kv_pair(self.wal_file, key, value)

Expand Down Expand Up @@ -271,9 +275,10 @@ def _flush(self):
self.levels[flush_level].append(Run(bloom_filter, fence_pointers, nr_records))
self.rfds[flush_level].append((self.data_dir / f'L{flush_level}.{n_runs}.{self.global_version}.run').open('rb'))

# reset WAL
self.wal_file.close()
self.wal_file = self.wal_path.open('wb')
if self.use_wal:
# reset WAL
self.wal_file.close()
self.wal_file = self.wal_path.open('wb')

# trigger merge if exceeding the runs per level
if len(self.levels[flush_level]) >= self.max_runs_per_level:
Expand Down
26 changes: 16 additions & 10 deletions tests/test_lsmtree.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ def test_basic(self):
self.assertEqual(l.get(b'cc'), b'cici345')
self.assertEqual(l.get(b'\x01\x00'), b'\x01\x01')
self.assertEqual(l.get(b'\x01'), b'\x02')
self.assertEqual(l.get(256, serializer=lambda i, length: i.to_bytes(length=length), serializer_args={'length': 2}), b'\x01\x01')
self.assertEqual(
l.get(256, serializer=lambda i, length: i.to_bytes(length=length), serializer_args={'length': 2}),
b'\x01\x01')
self.assertEqual(l.get(b'\x01'), b'\x02')

l.close()
Expand All @@ -44,7 +46,8 @@ def test_fuzzy_granular(self):
n_items=100, n_iter=10_000, seeds=[1], test_recovery=False, test_remote=False)

def test_fuzzy_realistic(self):
self.fuzzy_test(LSMTree, args={'data_dir': self.dir.name, 'remote': None}, key_len_range=(1, 10),
self.fuzzy_test(LSMTree, args={'data_dir': self.dir.name, 'use_wal': True, 'remote': None},
key_len_range=(1, 10),
val_len_range=(0, 13), n_items=100, n_iter=1_000_000, seeds=[1], test_recovery=True,
test_remote=False)

Expand All @@ -56,36 +59,39 @@ def test_fuzzy_large_kvs(self):

def test_fuzzy_recovery(self):
self.fuzzy_test(LSMTree,
args={'data_dir': self.dir.name, 'memtable_bytes_limit': 100},
args={'data_dir': self.dir.name, 'memtable_bytes_limit': 100, 'use_wal': True},
key_len_range=(1, 10), val_len_range=(0, 13), n_items=10_000, n_iter=10_000, seeds=[1],
test_recovery=True, test_remote=False)

def test_fuzzy_remote(self):
self.fuzzy_test(LSMTree,
args={'data_dir': self.dir.name, 'memtable_bytes_limit': 1_000, 'remote': self.remote},
args={'data_dir': self.dir.name, 'memtable_bytes_limit': 1_000, 'use_wal': True,
'remote': self.remote},
key_len_range=(1, 10), val_len_range=(0, 13), n_items=10_000, n_iter=100_000, seeds=[1],
test_recovery=True, test_remote=True)

def test_fuzzy_snapshot(self):
self.fuzzy_test_snapshot(LSMTree,
args={'data_dir': self.dir.name, 'memtable_bytes_limit': 1000, 'remote': self.remote},
key_len_range=(1, 10), val_len_range=(0, 13), n_items=10_000, n_iter=10_000, seed=1)
args={'data_dir': self.dir.name, 'memtable_bytes_limit': 1000, 'remote': self.remote},
key_len_range=(1, 10), val_len_range=(0, 13), n_items=10_000, n_iter=10_000, seed=1)

def test_fuzzy_snapshot_continuous(self):
self.fuzzy_test_snapshot_continuous(LSMTree,
args={'data_dir': self.dir.name, 'memtable_bytes_limit': 1000, 'remote': self.remote},
key_len_range=(1, 10), val_len_range=(0, 13), n_items=10_000, n_iter=10_000, seed=1)
args={'data_dir': self.dir.name, 'memtable_bytes_limit': 1000,
'remote': self.remote},
key_len_range=(1, 10), val_len_range=(0, 13), n_items=10_000, n_iter=10_000,
seed=1)

def test_wal(self):
l1 = LSMTree(self.dir.name)
l1 = LSMTree(self.dir.name, use_wal=True)

l1.set(b'a', b'1')
l1.set(b'b', b'2')
l1.set(b'c', b'3')

l1.close()

l2 = LSMTree(self.dir.name)
l2 = LSMTree(self.dir.name, use_wal=True)

self.assertEqual(l2.get(b'a'), b'1')
self.assertEqual(l2.get(b'b'), b'2')
Expand Down

0 comments on commit e7d1a2d

Please sign in to comment.