From 7e99cc7567ee1b47699ea9ecf11b001aba85008d Mon Sep 17 00:00:00 2001 From: Nikos Gavalas Date: Thu, 1 Aug 2024 23:25:27 +0200 Subject: [PATCH] - --- kevo/engines/lsmtree.py | 3 +- kevo/remote.py | 70 ++++++++++++++++++++++------------------- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/kevo/engines/lsmtree.py b/kevo/engines/lsmtree.py index c6c5b36..0c87f6b 100644 --- a/kevo/engines/lsmtree.py +++ b/kevo/engines/lsmtree.py @@ -278,8 +278,7 @@ def _flush(self): def snapshot(self, id: int): self._flush() if self.remote: - runs = discover_run_files(self.data_dir) - self.remote.push_deltas(runs, id) + self.remote.push_deltas(id) def restore(self, version=None): # flush first to empty the memtable diff --git a/kevo/remote.py b/kevo/remote.py index 2f5d05a..144f288 100644 --- a/kevo/remote.py +++ b/kevo/remote.py @@ -17,14 +17,27 @@ def read_key(filename): # abstract class class Remote: def __init__(self): - self.src_dir_path = None + self.local_dir_path = None - def init(self, src_dir_path): - self.src_dir_path = src_dir_path + def init(self, local_dir_path): + self.local_dir_path = local_dir_path - def push_deltas(self, delta_map: list[Path]): + def _get_local_files(self): raise NotImplementedError + def _get_remote_files(self): + raise NotImplementedError + + def push_deltas(self, snapshot_version: int): + delta_map = self._get_local_files() - self._get_remote_files() + for file in delta_map: + self.put(file) + + version_file_name = f'version.{snapshot_version}.txt' + with open(os.path.join(self.local_dir_path.resolve(), version_file_name), 'w') as f: + f.write('-'.join(delta_map)) + self.put(version_file_name) + def put(self, filename: str): raise NotImplementedError @@ -51,25 +64,16 @@ def __init__(self, remote_dir_path: str, latency_per_byte: int = 0): os.makedirs(self.remote_dir_path, exist_ok=True) - def init(self, src_dir_path: Path): - super().init(src_dir_path) + def init(self, local_dir_path: Path): + super().init(local_dir_path) os.makedirs(self.remote_dir_path, exist_ok=True) - def push_deltas(self, delta_map: list[Path], snapshot_version: int = 0): - delta_map = {f.name for f in delta_map} - - discovered_files = discover_run_files(self.remote_dir_path) - remote_delta_map = {f.name for f in discovered_files} + def _get_remote_deltas(self): + return {f.name for f in discover_run_files(self.remote_dir_path)} - files_to_push = delta_map - remote_delta_map - for file in files_to_push: - self.put(file) - - version_file_name = f'version.{snapshot_version}.txt' - with open(os.path.join(self.src_dir_path.resolve(), version_file_name), 'w') as f: - f.write('-'.join(delta_map)) - self.put(version_file_name) + def _get_local_files(self): + return {f.name for f in discover_run_files(self.local_dir_path)} def _simulate_net_delay(self, filepath: str): filesize = os.path.getsize(filepath) @@ -78,13 +82,13 @@ def _simulate_net_delay(self, filepath: str): def put(self, filename: str): filename = os.path.basename(filename) - filepath = os.path.join(self.src_dir_path.resolve(), filename) + filepath = os.path.join(self.local_dir_path.resolve(), filename) self._simulate_net_delay(filepath) shutil.copy(filepath, self.remote_dir_path) def get(self, filename: str): filename = os.path.basename(filename) - shutil.copy(os.path.join(self.remote_dir_path, filename), self.src_dir_path.resolve()) + shutil.copy(os.path.join(self.remote_dir_path, filename), self.local_dir_path.resolve()) def gc(self): raise NotImplementedError @@ -99,12 +103,14 @@ def restore(self, version=None): # clean up the local tree first # TODO do this after a successful recovery, fetch all the files first and only afterwards delete the old ones. - shutil.rmtree(self.src_dir_path) - os.mkdir(self.src_dir_path) + p = [f.name for f in self.local_dir_path.glob("L*.run")] + + shutil.rmtree(self.local_dir_path) + os.mkdir(self.local_dir_path) version_file_name = f'version.{version}.txt' self.get(version_file_name) - with open(os.path.join(self.src_dir_path, version_file_name), 'r') as f: + with open(os.path.join(self.local_dir_path, version_file_name), 'r') as f: filenames = f.read().split('-') for f in filenames: @@ -140,8 +146,8 @@ def __init__(self, self.global_version = 0 - def init(self, src_dir_path: Path): - super().init(src_dir_path) + def init(self, local_dir_path: Path): + super().init(local_dir_path) if not self.client.bucket_exists(self.bucket): self.client.make_bucket(self.bucket) @@ -158,18 +164,18 @@ def push_deltas(self, delta_map: list[Path], snapshot_version: int = 0): self.put(file) version_file_name = f'version.{snapshot_version}.txt' - with open(os.path.join(self.src_dir_path.resolve(), version_file_name), 'w') as f: + with open(os.path.join(self.local_dir_path.resolve(), version_file_name), 'w') as f: f.write('-'.join(delta_map)) self.put(version_file_name) def put(self, filename: str): filename = os.path.basename(filename) - filepath = os.path.join(self.src_dir_path.resolve(), filename) + filepath = os.path.join(self.local_dir_path.resolve(), filename) self.client.fput_object(self.bucket, filename, filepath) def get(self, filename: str): filename = os.path.basename(filename) - self.client.fget_object(self.bucket, filename, os.path.join(self.src_dir_path.resolve(), filename)) + self.client.fget_object(self.bucket, filename, os.path.join(self.local_dir_path.resolve(), filename)) def gc(self): raise NotImplementedError @@ -185,12 +191,12 @@ def restore(self, version=None): # clean up the local tree first # TODO do this after a successful recovery, fetch all the files first and only afterwards delete the old ones. - shutil.rmtree(self.src_dir_path) - os.mkdir(self.src_dir_path) + shutil.rmtree(self.local_dir_path) + os.mkdir(self.local_dir_path) version_file_name = f'version.{version}.txt' self.get(version_file_name) - with open(os.path.join(self.src_dir_path, version_file_name), 'r') as f: + with open(os.path.join(self.local_dir_path, version_file_name), 'r') as f: filenames = f.read().split('-') for f in filenames: