Bye bye mlflow 👋
lauraporta committed Nov 28, 2024
1 parent dc886dd commit f7eb891
Showing 3 changed files with 17 additions and 91 deletions.
calcium_imaging_automation/core/app.py (4 changes: 2 additions & 2 deletions)
@@ -1,7 +1,7 @@
 import argparse
 from pathlib import Path

-from calcium_imaging_automation.core.pipeline import mlflow_orchestrator
+from calcium_imaging_automation.core.pipeline import orchestrator

 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
@@ -40,7 +40,7 @@

     args = parser.parse_args()

-    mlflow_orchestrator(
+    orchestrator(
         args.raw_data_path,
         args.output_path,
         args.folder_read_pattern,
calcium_imaging_automation/core/pipeline.py (103 changes: 15 additions & 88 deletions)
@@ -4,16 +4,15 @@
 from pathlib import Path
 from typing import Callable, List

-import mlflow
-import setuptools_scm
+import pandas as pd
 import submitit
 from submitit import AutoExecutor

 from calcium_imaging_automation.core.reader import ReadAquiredData
 from calcium_imaging_automation.core.writer import DatashuttleWrapper


-def mlflow_orchestrator(
+def orchestrator(
     raw_data_path: Path,
     output_path: Path,
     folder_read_pattern: str,
@@ -24,7 +23,6 @@ def mlflow_orchestrator(
 ):
     # --- Setup logging and MLflow ---
     logging_setup(output_path)
-    mlflow_setup(output_path)

     # mkdir for submitit logs submitit / timestamp
     (output_path / "submitit").mkdir(exist_ok=True)
@@ -55,28 +53,11 @@
         compute_metric=compute_metric,
     )

-    # --- Log all results with MLflow ---
-    for dataset, result, error in zip(reader.dataset_names, results, errors):
-        mlflow_set_experiment(experiment_name, dataset, 0)
-
-        with mlflow.start_run():
-            mlflow_parent_run_logs(
-                dataset,
-                0,
-                raw_data_path,
-                output_path,
-                folder_read_pattern,
-                file_read_pattern,
-            )
-
-            # log error if any
-            if error:
-                mlflow.log_param("error", error)
-
-            if result:
-                mlflow.log_metric("stability", result)
-
-        mlflow.end_run()
+    # save the results and errors as csv
+    results_df = pd.DataFrame(results)
+    results_df.to_csv(output_path / "results.csv")
+    errors_df = pd.DataFrame(errors)
+    errors_df.to_csv(output_path / "errors.csv")

     logging.info("Pipeline finished.")

@@ -132,8 +113,14 @@ def analysis_pipeline(
     os.system("module load miniconda")
     os.system("source activate /nfs/nhome/live/lporta/.conda/envs/cimat")
     output_path_dataset = output_path_dataset / "ses-0/funcimg/"
-    data = preprocessing_function(dataset, output_path_dataset)
-    metric_measured = compute_metric(data)
+    try:
+        data = preprocessing_function(dataset, output_path_dataset)
+        metric_measured = compute_metric(data)
+        with open(output_path_dataset / "metric.txt", "w") as f:
+            f.write(str(metric_measured))
+    except Exception as e:
+        with open(output_path_dataset / "error.txt", "w") as f:
+            f.write(str(e.args))
     return metric_measured


@@ -149,63 +136,3 @@ def logging_setup(output_path: Path):
         level=logging.INFO,
         format="%(asctime)s - %(message)s",
     )
-
-
-def mlflow_setup(output_path: Path):
-    # --- Setup MLflow tracking ---
-    mlflow_tracking_dir = output_path / "mlflow"
-    mlflow.set_tracking_uri(str(mlflow_tracking_dir))
-
-
-def mlflow_set_experiment(
-    experiment_name: str, dataset_name: str, session: int
-):
-    # Start a new MLflow experiment for each dataset and session
-    mlflow.set_experiment(
-        f"{experiment_name}/{dataset_name}/session_{session}"
-    )
-
-
-def mlflow_parent_run_logs(
-    dataset_name: str,
-    session: int,
-    raw_data_path: Path,
-    output_path: Path,
-    folder_read_pattern: str,
-    file_read_pattern: List[str],
-):
-    # give specific name to the parent run
-    mlflow.set_tag("mlflow.runName", f"{dataset_name}_session_{session}")
-
-    # Log session-specific parameters
-    mlflow.log_param("mlflow.Dataset", dataset_name)
-    mlflow.log_param("session_number", session)
-    mlflow.log_param("raw_data_path", str(raw_data_path))
-    mlflow.log_param("output_path", str(output_path))
-    mlflow.log_param("folder_read_pattern", folder_read_pattern)
-    mlflow.log_param("file_read_pattern", file_read_pattern)
-    mlflow.log_param("local_changes_hash", setuptools_scm.get_version())
-
-
-def mlflow_log_run(
-    i: int,
-    dataset_name: str,
-    session: int,
-    metric_measured: float,
-    # image_path: Path,
-):
-    # give specific name to the run
-    mlflow.set_tag("mlflow.runName", f"param_{i}")
-
-    # Log parameters and metrics specific to this run
-    mlflow.log_param("data_size", f"{i * 10}x100")
-    mlflow.log_param("run_iteration", i)
-    mlflow.log_param("run_id", mlflow.active_run().info.run_id)
-    mlflow.log_metric("stability", metric_measured)
-
-    # mlflow.log_artifact(
-    #     # where I am storing the image according to Neuroblueprint
-    #     # I think it gets copied in the mlflow data structure
-    #     image_path,
-    #     artifact_path=f"{dataset_name}/session_{session}/run_{i}",
-    # )
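
With the MLflow tracking removed, run outcomes end up in plain files instead: results.csv and errors.csv at the top of the output path, plus metric.txt or error.txt inside each dataset's ses-0/funcimg/ folder. A minimal sketch of inspecting a finished run with pandas (the output path below is a hypothetical example, not taken from this commit):

import pandas as pd
from pathlib import Path

# hypothetical: whatever was passed as output_path to orchestrator()
output_path = Path("/path/to/output")

# orchestrator() now writes the collected results and errors straight to CSV
results = pd.read_csv(output_path / "results.csv", index_col=0)
errors = pd.read_csv(output_path / "errors.csv", index_col=0)

print(results)  # presumably one row per dataset with the metric returned by analysis_pipeline
print(errors)   # captured error messages, empty if all jobs succeeded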
pyproject.toml (1 change: 0 additions & 1 deletion)
Expand Up @@ -9,7 +9,6 @@ dynamic = ["version"]
dependencies = [
"datashuttle",
"setuptools_scm",
"mlflow",
"numpy",
"submitit",
]
