Skip to content

Commit

Permalink
-
Browse files Browse the repository at this point in the history
  • Loading branch information
nikosgavalas committed Aug 1, 2024
1 parent e779e6d commit 7e99cc7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 34 deletions.
3 changes: 1 addition & 2 deletions kevo/engines/lsmtree.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,7 @@ def _flush(self):
def snapshot(self, id: int):
self._flush()
if self.remote:
runs = discover_run_files(self.data_dir)
self.remote.push_deltas(runs, id)
self.remote.push_deltas(id)

def restore(self, version=None):
# flush first to empty the memtable
Expand Down
70 changes: 38 additions & 32 deletions kevo/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,27 @@ def read_key(filename):
# abstract class
class Remote:
def __init__(self):
self.src_dir_path = None
self.local_dir_path = None

def init(self, src_dir_path):
self.src_dir_path = src_dir_path
def init(self, local_dir_path):
self.local_dir_path = local_dir_path

def push_deltas(self, delta_map: list[Path]):
def _get_local_files(self):
raise NotImplementedError

def _get_remote_files(self):
raise NotImplementedError

def push_deltas(self, snapshot_version: int):
    """Upload the files the remote is missing and publish a version manifest.

    Args:
        snapshot_version: number used to name the manifest file,
            ``version.<snapshot_version>.txt``.

    The manifest is written into the local directory and then uploaded
    with put(). It lists every local file name, joined by '-'.
    """
    local_files = self._get_local_files()

    # Only transfer what the remote does not already hold.
    for file in sorted(local_files - self._get_remote_files()):
        self.put(file)

    # The manifest must describe the whole snapshot, not just the files
    # uploaded in this call: restore() empties the local directory and
    # re-fetches exactly the names listed here. Sorting keeps the manifest
    # content deterministic.
    version_file_name = f'version.{snapshot_version}.txt'
    with open(os.path.join(self.local_dir_path.resolve(), version_file_name), 'w') as f:
        f.write('-'.join(sorted(local_files)))
    self.put(version_file_name)

def put(self, filename: str):
    """Upload the file called ``filename`` to the remote.

    Abstract: this base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()

Expand All @@ -51,25 +64,16 @@ def __init__(self, remote_dir_path: str, latency_per_byte: int = 0):

os.makedirs(self.remote_dir_path, exist_ok=True)

def init(self, src_dir_path: Path):
super().init(src_dir_path)
def init(self, local_dir_path: Path):
super().init(local_dir_path)

os.makedirs(self.remote_dir_path, exist_ok=True)

def push_deltas(self, delta_map: list[Path], snapshot_version: int = 0):
delta_map = {f.name for f in delta_map}

discovered_files = discover_run_files(self.remote_dir_path)
remote_delta_map = {f.name for f in discovered_files}
def _get_remote_files(self):
    """Return the names of the run files found in the remote directory.

    This is the hook that push_deltas() calls as ``_get_remote_files``;
    defining it under any other name leaves the base-class stub in place,
    which raises NotImplementedError.
    """
    return {f.name for f in discover_run_files(self.remote_dir_path)}


# Backward-compatible alias for the name this hook was first given.
_get_remote_deltas = _get_remote_files

files_to_push = delta_map - remote_delta_map
for file in files_to_push:
self.put(file)

version_file_name = f'version.{snapshot_version}.txt'
with open(os.path.join(self.src_dir_path.resolve(), version_file_name), 'w') as f:
f.write('-'.join(delta_map))
self.put(version_file_name)
def _get_local_files(self):
    """Return the names of the run files found in the local directory."""
    local_runs = discover_run_files(self.local_dir_path)
    return {run.name for run in local_runs}

def _simulate_net_delay(self, filepath: str):
filesize = os.path.getsize(filepath)
Expand All @@ -78,13 +82,13 @@ def _simulate_net_delay(self, filepath: str):

def put(self, filename: str):
filename = os.path.basename(filename)
filepath = os.path.join(self.src_dir_path.resolve(), filename)
filepath = os.path.join(self.local_dir_path.resolve(), filename)
self._simulate_net_delay(filepath)
shutil.copy(filepath, self.remote_dir_path)

def get(self, filename: str):
filename = os.path.basename(filename)
shutil.copy(os.path.join(self.remote_dir_path, filename), self.src_dir_path.resolve())
shutil.copy(os.path.join(self.remote_dir_path, filename), self.local_dir_path.resolve())

def gc(self):
    """Garbage-collect remote files.

    Not implemented for this backend: always raises NotImplementedError.
    """
    raise NotImplementedError()
Expand All @@ -99,12 +103,14 @@ def restore(self, version=None):

# clean up the local tree first
# TODO do this after a successful recovery, fetch all the files first and only afterwards delete the old ones.
shutil.rmtree(self.src_dir_path)
os.mkdir(self.src_dir_path)
p = [f.name for f in self.local_dir_path.glob("L*.run")]

shutil.rmtree(self.local_dir_path)
os.mkdir(self.local_dir_path)

version_file_name = f'version.{version}.txt'
self.get(version_file_name)
with open(os.path.join(self.src_dir_path, version_file_name), 'r') as f:
with open(os.path.join(self.local_dir_path, version_file_name), 'r') as f:
filenames = f.read().split('-')

for f in filenames:
Expand Down Expand Up @@ -140,8 +146,8 @@ def __init__(self,

self.global_version = 0

def init(self, src_dir_path: Path):
super().init(src_dir_path)
def init(self, local_dir_path: Path):
super().init(local_dir_path)

if not self.client.bucket_exists(self.bucket):
self.client.make_bucket(self.bucket)
Expand All @@ -158,18 +164,18 @@ def push_deltas(self, delta_map: list[Path], snapshot_version: int = 0):
self.put(file)

version_file_name = f'version.{snapshot_version}.txt'
with open(os.path.join(self.src_dir_path.resolve(), version_file_name), 'w') as f:
with open(os.path.join(self.local_dir_path.resolve(), version_file_name), 'w') as f:
f.write('-'.join(delta_map))
self.put(version_file_name)

def put(self, filename: str):
filename = os.path.basename(filename)
filepath = os.path.join(self.src_dir_path.resolve(), filename)
filepath = os.path.join(self.local_dir_path.resolve(), filename)
self.client.fput_object(self.bucket, filename, filepath)

def get(self, filename: str):
filename = os.path.basename(filename)
self.client.fget_object(self.bucket, filename, os.path.join(self.src_dir_path.resolve(), filename))
self.client.fget_object(self.bucket, filename, os.path.join(self.local_dir_path.resolve(), filename))

def gc(self):
    """Garbage-collect objects stored on the remote.

    Not implemented for this backend: always raises NotImplementedError.
    """
    raise NotImplementedError()
Expand All @@ -185,12 +191,12 @@ def restore(self, version=None):

# clean up the local tree first
# TODO do this after a successful recovery, fetch all the files first and only afterwards delete the old ones.
shutil.rmtree(self.src_dir_path)
os.mkdir(self.src_dir_path)
shutil.rmtree(self.local_dir_path)
os.mkdir(self.local_dir_path)

version_file_name = f'version.{version}.txt'
self.get(version_file_name)
with open(os.path.join(self.src_dir_path, version_file_name), 'r') as f:
with open(os.path.join(self.local_dir_path, version_file_name), 'r') as f:
filenames = f.read().split('-')

for f in filenames:
Expand Down

0 comments on commit 7e99cc7

Please sign in to comment.