ADJUST io.py for logging mlflow; Credits: Blirona #1

Open · wants to merge 23 commits into main

Commits (23)
c0a51a2
ADJUST io.py for logging mlflow; Credits: Blirona
FeU-aKlos Sep 5, 2023
fe3737d
MADE adjustments as commented by PR
FeU-aKlos Sep 7, 2023
f884c88
ADD rclone utils file; ADJUST trainer, so that intermediate results a…
FeU-aKlos Sep 7, 2023
40bb9ca
ADJUST trainer file
FeU-aKlos Sep 7, 2023
5a85257
ADJUST trainer/io.py to have the imports at the beginning of the file
FeU-aKlos Sep 8, 2023
9a6e0e1
ADJUST files
FeU-aKlos Sep 8, 2023
8de4be8
ADJUST imports again (circular import issue)
FeU-aKlos Sep 8, 2023
098bc7e
ADJUST output_path name
FeU-aKlos Sep 8, 2023
bce3f88
DELETE intermediate saving of the results to lakefs <- otherwise, sto…
FeU-aKlos Sep 10, 2023
51df431
ADJUST rclone methods and trainer.py, so that intermediate results ar…
FeU-aKlos Sep 11, 2023
ee37597
FIX minor bug
FeU-aKlos Sep 11, 2023
bf41074
ADJUST something :D
FeU-aKlos Sep 11, 2023
36d5b18
ADJUST something :D
FeU-aKlos Sep 11, 2023
6958fda
ADJUST stuff
FeU-aKlos Sep 11, 2023
84999c2
ADJUST stuff
FeU-aKlos Sep 11, 2023
4904235
run_eval does not have any influence on the training anymore. eval is…
FeU-aKlos Sep 12, 2023
e1acc82
delete mlflow logging stuff. this will be done at the end of the trai…
FeU-aKlos Sep 12, 2023
e23a011
ADD clone dir from minio
FeU-aKlos Sep 14, 2023
8a1a751
SIMPLIFIED copy data from s3bucket function in trainer rclone.py
FeU-aKlos Sep 14, 2023
f1725a8
ADJUST copy method
FeU-aKlos Sep 14, 2023
6e69c68
ADJUST to only copy from lakefs ... sync2copy
FeU-aKlos Sep 14, 2023
a4118fb
FIX potential bug
FeU-aKlos Sep 20, 2023
a3d12c8
FIX bug loss value, if not float
FeU-aKlos Sep 20, 2023
trainer/io.py — 7 changes: 5 additions & 2 deletions
@@ -6,11 +6,12 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple, Union
from urllib.parse import urlparse

import mlflow
import fsspec
import torch
from coqpit import Coqpit


from trainer.logger import logger


@@ -161,10 +162,10 @@ def save_checkpoint(
save_func=save_func,
**kwargs,
)

if save_n_checkpoints is not None:
keep_n_checkpoints(output_folder, save_n_checkpoints)


def save_best_model(
current_loss,
best_loss,
@@ -180,6 +181,8 @@ def save_best_model(
save_func=None,
**kwargs,
):
print("current_loss: ",current_loss)
print("best_loss: ",best_loss)
if current_loss < best_loss:
best_model_name = f"best_model_{current_step}.pth"
checkpoint_path = os.path.join(out_path, best_model_name)
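Note on the io.py changes: the diff adds `import mlflow`, but no MLflow call survives in the final hunks — per commit e1acc82, the logging is deferred to the end of training. For orientation only, a minimal sketch of what logging a best checkpoint to MLflow could look like; the path and metric value are hypothetical and not taken from this PR:

```python
import mlflow

# Hypothetical illustration, not part of this diff: assumes an MLflow
# tracking server is configured and a checkpoint was already written
# by save_best_model().
checkpoint_path = "output/best_model_1000.pth"  # hypothetical path

with mlflow.start_run():
    mlflow.log_metric("best_loss", 0.42)   # hypothetical value
    mlflow.log_artifact(checkpoint_path)   # uploads the .pth file to the run
```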
trainer/trainer.py — 30 changes: 21 additions & 9 deletions
@@ -54,6 +54,7 @@
rank_zero_logger_info,
rank_zero_only,
)
from trainer.utils.rclone import sync_data2s3bucket

logger = logging.getLogger("trainer")

@@ -413,6 +414,8 @@ def __init__( # pylint: disable=dangerous-default-value
# create a new output folder name
output_path = get_experiment_folder_path(config.output_path, config.run_name)
os.makedirs(output_path, exist_ok=True)

self.experiment_output_path = output_path

# copy training assets to the output folder
copy_model_files(config, output_path, new_fields)
@@ -1484,6 +1487,7 @@ def train_epoch(self) -> None:
loader_start_time = time.time()
# TRAINING EPOCH -> iterate over the training samples
batch_num_steps = len(self.train_loader)
intermediate_eval = False
for cur_step, batch in enumerate(self.train_loader):
outputs, _ = self.train_step(batch, batch_num_steps, cur_step, loader_start_time)
if outputs is None:
@@ -1494,7 +1498,9 @@

# RUN EVAL -> run evaluation epoch in the middle of training. Useful for big datasets.
if self.config.run_eval_steps is not None and (self.total_steps_done % self.config.run_eval_steps == 0):
intermediate_eval = True
self.eval_epoch()
self.test_run()
if self.num_gpus > 1:
self.model.module.train()
else:
@@ -1521,6 +1527,8 @@
self.dashboard_logger.train_epoch_stats(self.total_steps_done, epoch_stats)
if self.config.model_param_stats:
self.dashboard_logger.model_weights(self.model, self.total_steps_done)
if intermediate_eval:
sync_data2s3bucket(os.getenv("VOICE_GENERATION_RESULTS_BUCKET"), os.getenv("OUTPUT_PATH"))
torch.cuda.empty_cache()

#######################
@@ -1612,9 +1620,12 @@ def eval_epoch(self) -> None:
self.eval_samples,
verbose=True,
)
if self.config.run_eval
else None
# if self.config.run_eval or not isinstance(self.config.run_eval_steps,type(None))
# else None
)

print("self.config.run_eval_steps",self.config.run_eval_steps)
print("self.config.run_eval",self.config.run_eval)

torch.set_grad_enabled(False)
self.model.eval()
@@ -1706,7 +1717,7 @@ def _restore_best_loss(self):
logger.info(" > Restoring best loss from %s ...", os.path.basename(self.args.best_path))
ch = load_fsspec(self.args.restore_path, map_location="cpu")
if "model_loss" in ch:
self.best_loss = ch["model_loss"]
self.best_loss = ch["model_loss"]["eval_loss"] if isinstance(ch["model_loss"],dict) else ch["model_loss"]
logger.info(" > Starting with loaded last best loss %f", self.best_loss)

def test(self, model=None, test_samples=None) -> None:
@@ -1753,24 +1764,25 @@ def _fit(self) -> None:
dist.barrier()
self.callbacks.on_epoch_start(self)
self.keep_avg_train = KeepAverage()
self.keep_avg_eval = KeepAverage() if self.config.run_eval else None
self.keep_avg_eval = KeepAverage() #if self.config.run_eval else None
self.epochs_done = epoch
self.c_logger.print_epoch_start(epoch, self.config.epochs, self.output_path)
if not self.skip_train_epoch and not self.start_with_eval:
self.train_epoch()
if self.config.run_eval:
self.eval_epoch()
if epoch >= self.config.test_delay_epochs and self.args.rank <= 0:
self.test_run()
# if self.config.run_eval:
# self.eval_epoch()
# if epoch >= self.config.test_delay_epochs and self.args.rank <= 0:
# self.test_run()

self.c_logger.print_epoch_end(
epoch,
self.keep_avg_eval.avg_values if self.config.run_eval else self.keep_avg_train.avg_values,
self.keep_avg_eval.avg_values #if self.config.run_eval else self.keep_avg_train.avg_values,
)
if self.args.rank in [None, 0]:
self.save_best_model()
self.callbacks.on_epoch_end(self)
self.start_with_eval = False


def fit_with_largest_batch_size(self, starting_batch_size=2048) -> None:
cuda_meminfo()
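Note on the train_epoch change: after an intermediate evaluation, the experiment output is now synced to object storage via two environment variables read with `os.getenv`. A minimal sketch of the setup this assumes — the variable names come from the diff, the values are hypothetical:

```python
import os

# Values are hypothetical; the names are the ones the diff reads.
os.environ["VOICE_GENERATION_RESULTS_BUCKET"] = "voice-generation-results"
os.environ["OUTPUT_PATH"] = "/workspace/output/my_run-September-12-2023"

# os.getenv() returns None for unset variables, which would make the
# rclone subprocess call fail, so validating up front is safer:
for name in ("VOICE_GENERATION_RESULTS_BUCKET", "OUTPUT_PATH"):
    if os.getenv(name) is None:
        raise RuntimeError(f"environment variable {name} must be set")
```

Note also that `OUTPUT_PATH` is read from the environment while the trainer separately stores `self.experiment_output_path`; keeping the two in sync is left to the caller.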
trainer/utils/rclone.py — 26 changes: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
import os
import subprocess

def get_data_from_lakefs(repo_name:str, branch_name:str, target_folder:str):
command = ["rclone", "copy",
f"lakefs:{repo_name}/{branch_name}", target_folder]
result = subprocess.run(command, capture_output=True)
print("Rclone stderr:", result.stderr)
assert result.returncode == 0

def sync_data2s3bucket(bucket_name:str, source_folder:str, name_from_config:str="lakefs"):
command = ["rclone", "sync", source_folder, f"{name_from_config}:{bucket_name}"]
result = subprocess.run(command, capture_output=True)
print("Rclone stderr:", result.stderr)
assert result.returncode == 0

def copy_data_from_s3bucket(bucket_uri:str, target_folder:str):
splitted_bucket_uri = [s for s in bucket_uri.split("/") if s != ""]
if len(splitted_bucket_uri) > 1:
#assumption: only one folder is specified in the bucket_uri
target_folder = os.path.join(target_folder,splitted_bucket_uri[-1])
command = ["rclone", "copy", bucket_uri, target_folder]
result = subprocess.run(command, capture_output=True)
print("Rclone stderr:", result.stderr)
assert result.returncode == 0
return target_folder
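A minimal usage sketch for the new helpers; the repo, branch, and bucket names are hypothetical, and all three functions assume an rclone remote named `lakefs` is configured (the `name_from_config` default) and that `rclone` is on the PATH:

```python
from trainer.utils.rclone import (
    copy_data_from_s3bucket,
    get_data_from_lakefs,
    sync_data2s3bucket,
)

# Pull a dataset snapshot from a lakeFS repository branch (names hypothetical).
get_data_from_lakefs("voice-datasets", "main", "./data")

# Copy pre-trained weights out of a bucket; the helper returns the resolved
# target folder when the URI contains a subfolder.
weights_dir = copy_data_from_s3bucket("lakefs:models/checkpoints", "./models")

# Push training results back to the results bucket after a run.
sync_data2s3bucket("voice-generation-results", "./output")
```

Design note: `rclone sync` mirrors the source (it deletes destination files that no longer exist at the source), while `rclone copy` only adds or overwrites — which is presumably why commit 6e69c68 switched the read path from sync to copy.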