Commit b580fbc
WIP: nested runs, 🐛 on artifacts saving
lauraporta committed Nov 12, 2024
1 parent 3893ef7 commit b580fbc
Showing 2 changed files with 56 additions and 48 deletions.
3 changes: 1 addition & 2 deletions calcium_imaging_automation/core/writer.py
@@ -43,15 +43,14 @@ def get_dataset_path(self, dataset_name: str) -> Path:
     def save_image(
         self,
         image: np.ndarray,
-        run_id: int,
         dataset_name: str,
         session_number: int,
         filename: str,
     ) -> Path:
         path = self.get_dataset_path(dataset_name)
         image = Image.fromarray(image).convert("L")
         image_path = (
-            path / f"ses-{session_number}" / f"{filename}-{run_id}.png"
+            path / f"ses-{session_number}" / f"{filename}.png"
         )
         image.save(
             image_path,
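Note: with `run_id` removed from `save_image`, any per-run disambiguation now has to be baked into `filename` by the caller (see `image_run_{i}` in examples/example_usage.py below). A minimal sketch of a call under the new signature; the `writer` object, dataset name, and session number are placeholders, not values from this commit:

    import numpy as np

    # Hypothetical call mirroring the new signature.
    frame = (np.random.rand(100, 100) * 255).astype(np.uint8)
    image_path = writer.save_image(
        image=frame,
        dataset_name="sub-001",      # placeholder dataset
        session_number=0,
        filename="image_run_1",      # run index folded into the filename
    )
    # -> <dataset>/ses-0/image_run_1.png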
101 changes: 55 additions & 46 deletions examples/example_usage.py
@@ -3,6 +3,7 @@
 import logging
 from pathlib import Path
 from typing import List
+import setuptools_scm
 
 import mlflow
 import numpy as np
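Note: `setuptools_scm.get_version()`, used below for the new `local_changes_hash` parameter, derives a version string from the state of the enclosing git repository (nearest tag, commit distance, commit hash, and a date suffix when the tree is dirty), which makes it a cheap provenance marker for a run. A standalone sketch; the printed value is illustrative:

    import setuptools_scm

    # Reads the surrounding git repo; prints something like
    # "0.1.dev12+g3893ef7.d20241112" (illustrative output).
    print(setuptools_scm.get_version())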
@@ -16,6 +17,7 @@ def main(
     output_path: Path,
     folder_read_pattern: str,
     file_read_pattern: List[str],
+    experiment_name: str = "pipeline_test",
 ):
     # --- Setup experiment-wide logging to file ---
     (output_path / "logs").mkdir(exist_ok=True)
@@ -30,9 +32,9 @@
     )
 
     # --- Setup MLflow tracking ---
-    mlflow_tracking_dir = output_path / "derivatives" / "mlflow"
+    mlflow_tracking_dir = output_path / "mlflow"
     mlflow.set_tracking_uri(str(mlflow_tracking_dir))
-    mlflow.set_experiment("calcium_imaging_pipeline")
+    mlflow.set_experiment(experiment_name)
 
     # --- Read folders and files ---
     reader = ReadAquiredData(
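Note: a bare directory path passed to `mlflow.set_tracking_uri` is treated as a local file store, and `mlflow.set_experiment` creates the experiment on first use. A minimal sketch with a hypothetical location:

    import mlflow

    # Equivalent forms: "/tmp/mlflow" or the explicit "file:///tmp/mlflow".
    mlflow.set_tracking_uri("file:///tmp/mlflow")  # hypothetical path
    mlflow.set_experiment("pipeline_test")  # created if it does not exist yet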
@@ -53,57 +55,61 @@
     for dataset in reader.datasets_paths:
         dataset_name = dataset.stem
         for session in range(0, number_of_tiffs):
-            # Start a new MLflow run for each dataset-session
-            with mlflow.start_run():
+            # Generate mock data
+            data = np.random.rand(100, 100)
+
+            # Start a new MLflow experiment for each dataset-session
+            with mlflow.start_run() as parent_run:
                 # Log session-specific parameters
                 mlflow.log_param("dataset_name", dataset_name)
                 mlflow.log_param("session_number", session)
                 mlflow.log_param("raw_data_path", str(raw_data_path))
                 mlflow.log_param("output_path", str(output_path))
                 mlflow.log_param("folder_read_pattern", folder_read_pattern)
                 mlflow.log_param("file_read_pattern", file_read_pattern)
+                mlflow.log_param("local_changes_hash", setuptools_scm.get_version())
 
                 logging.info(
-                    f"Processing dataset {dataset_name} session {session}..."
+                    f"Starting MLflow experiment for dataset {dataset_name} session {session}..."
                 )
 
-                # Mock processing
-                data = np.random.rand(100, 100)
-                metric_measured = np.random.rand()
-
-                # Log metric with MLflow
-                mlflow.log_metric("metric_measured", metric_measured)
-
-                # Save image in session folder
-                image_path = writer.save_image(
-                    image=data,
-                    run_id=session,
-                    dataset_name=dataset_name,
-                    session_number=session,
-                    filename="image",
-                )
-
-                # Log the image as an artifact in MLflow
-                mlflow.log_artifact(
-                    image_path,
-                    artifact_path=f"{dataset_name}/session_{session}",
-                )
+                # Mock processing for different runs within the experiment
+                for i in range(1, 11):  # 10 runs with varying parameters
+                    # Start a child run under the main dataset-session run
+                    with mlflow.start_run(nested=True):
+                        # Mock metric calculation
+                        metric_measured = np.mean(data) * i
+
+                        # Log parameters and metrics specific to this run
+                        mlflow.log_param("data_size", f"{i * 10}x100")
+                        mlflow.log_param("run_iteration", i)
+                        mlflow.log_param("run_id", mlflow.active_run().info.run_id)
+                        mlflow.log_metric("metric_measured", metric_measured)
+
+                        # Log the generated data as an artifact if desired
+                        # Here, simulate an image or data file save path
+                        image_path = writer.save_image(
+                            image=data,
+                            dataset_name=dataset_name,
+                            session_number=session,
+                            filename=f"image_run_{i}",
+                        )
+
+                        mlflow.log_artifact(
+                            image_path,
+                            artifact_path=f"{dataset_name}/session_{session}/run_{i}",
+                        )
+
+                        logging.info(
+                            f"Completed MLflow run iteration {i} for dataset {dataset_name} session {session}"
+                        )
 
-                logging.info(
-                    f"MLflow run_id: {mlflow.active_run().info.run_id}"
-                )
-                logging.info(
-                    "MLflow experiment_id: "
-                    + f"{mlflow.active_run().info.experiment_id}"
-                )
-                logging.info(
-                    f"MLflow tracking_uri: {mlflow.get_tracking_uri()}"
-                )
                 logging.info(
-                    f"Completed MLflow run for dataset {dataset_name} "
-                    + f"session {session}"
+                    f"Completed MLflow experiment for dataset {dataset_name} session {session}"
                 )
 
     logging.info("Pipeline finished.")
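Note: a self-contained sketch of the parent/child pattern this hunk introduces; the experiment name and values are placeholders, not the project's API. One detail relevant to the 🐛 in the commit title: logging calls such as `mlflow.log_artifact` attach to the innermost active run, so artifacts logged inside `start_run(nested=True)` land in the child run's artifact store, not the parent's:

    import mlflow

    mlflow.set_experiment("nested_demo")  # placeholder name

    with mlflow.start_run() as parent_run:
        mlflow.log_param("dataset_name", "sub-001")  # parent-level metadata
        for i in range(1, 4):
            # Each child gets its own run_id, linked to the parent run
            with mlflow.start_run(nested=True):
                mlflow.log_param("run_iteration", i)
                mlflow.log_metric("metric_measured", 0.1 * i)
                # mlflow.log_artifact("image.png")  # would go to the CHILD run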


@@ -130,16 +136,19 @@
         help="List of glob patterns for reading files.",
         action="append",
     )
+    parser.add_argument(
+        "--experiment_name",
+        type=str,
+        help="Name of the experiment.",
+        default="pipeline_test",
+    )
 
     args = parser.parse_args()
-    raw_data_path = args.raw_data_path
-    output_path = args.output_path
-    folder_read_pattern = args.folder_read_pattern
-    file_read_pattern = args.file_read_pattern
 
     main(
-        raw_data_path,
-        output_path,
-        folder_read_pattern,
-        file_read_pattern,
+        args.raw_data_path,
+        args.output_path,
+        args.folder_read_pattern,
+        args.file_read_pattern,
+        args.experiment_name,
     )
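Note: a hypothetical direct call to `main()`, equivalent to what the argparse block wires up; all paths and patterns are placeholders:

    from pathlib import Path

    main(
        Path("/data/raw"),   # raw_data_path (placeholder)
        Path("/data/out"),   # output_path (placeholder)
        "sub-*",             # folder_read_pattern (placeholder)
        ["*.tif"],           # file_read_pattern (placeholder list)
        "my_experiment",     # experiment_name
    )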
