diff --git a/doc/workflows.rst b/doc/workflows.rst
index a60576822..a614dd58c 100644
--- a/doc/workflows.rst
+++ b/doc/workflows.rst
@@ -563,9 +563,9 @@ be absolute because Nextflow does not resolve environment variables like
 ``$SCRATCH`` in paths.
 
 .. warning::
-    Running the workflow on Sherlock sets a 2 hour limit on each job in the
+    Running the workflow on Sherlock sets a 1 hour limit on each job in the
     workflow, including analyses. Analysis scripts that take more than
-    2 hours to run should be excluded from workflow configurations and manually
+    1 hour to run should be excluded from workflow configurations and manually
     run using :py:mod:`runscripts.analysis` afterwards.
 
 .. _sherlock-interactive:
@@ -665,7 +665,7 @@ is a list of workflow behaviors enabled in our model to handle unexpected errors.
   are automatically retried up to a maximum of 3 tries. For the resource
   limit error code (140), Nextflow will automatically request more RAM and
   a higher runtime limit with each attempt: ``4 * {attempt num}``
-  GB of memory and ``2 * {attempt num}`` hours of runtime. See the
+  GB of memory and ``1 * {attempt num}`` hours of runtime. See the
   ``sherlock`` profile in ``runscripts/nextflow/config.template``.
 - Additionally, some jobs may fail on Sherlock due to issues submitting
   them to the SLURM scheduler. Nextflow was configured to limit the rate
diff --git a/ecoli/analysis/multigeneration/new_gene_counts.py b/ecoli/analysis/multigeneration/new_gene_counts.py
index 40a8bb051..90e3ec32d 100644
--- a/ecoli/analysis/multigeneration/new_gene_counts.py
+++ b/ecoli/analysis/multigeneration/new_gene_counts.py
@@ -94,7 +94,7 @@ def plot(
     )
 
     # mRNA counts
-    mrna_plot = new_gene_data.hvplot.line( # type: ignore[attr-defined]
+    mrna_plot = new_gene_data.hvplot.line(  # type: ignore[attr-defined]
         x="Time (min)",
         y=new_gene_mRNA_ids,
         ylabel="mRNA Counts",
@@ -102,7 +102,7 @@ def plot(
     )
 
     # Protein counts
-    protein_plot = new_gene_data.hvplot.line( # type: ignore[attr-defined]
+    protein_plot = new_gene_data.hvplot.line(  # type: ignore[attr-defined]
         x="Time (min)",
         y=new_gene_monomer_ids,
         ylabel="Protein Counts",
diff --git a/ecoli/analysis/single/mass_fraction_summary.py b/ecoli/analysis/single/mass_fraction_summary.py
index a85c135bb..eea59ec43 100644
--- a/ecoli/analysis/single/mass_fraction_summary.py
+++ b/ecoli/analysis/single/mass_fraction_summary.py
@@ -63,7 +63,7 @@ def plot(
         },
     }
     mass_fold_change = pl.DataFrame(new_columns)
-    plot_namespace = mass_fold_change.hvplot # type: ignore[attr-defined]
+    plot_namespace = mass_fold_change.hvplot  # type: ignore[attr-defined]
     # hvplot.output(backend='matplotlib')
     plotted_data = plot_namespace.line(
         x="Time (min)",
diff --git a/ecoli/processes/chromosome_structure.py b/ecoli/processes/chromosome_structure.py
index c388a9d75..eb49aefaf 100644
--- a/ecoli/processes/chromosome_structure.py
+++ b/ecoli/processes/chromosome_structure.py
@@ -50,8 +50,7 @@
     "promoters": ("unique", "promoter"),
     "DnaA_boxes": ("unique", "DnaA_box"),
     "genes": ("unique", "gene"),
-    # TODO(vivarium): Only include if superhelical density flag is passed
-    # "chromosomal_segments": ("unique", "chromosomal_segment")
+    "chromosomal_segments": ("unique", "chromosomal_segment"),
    "global_time": ("global_time",),
    "timestep": ("timestep",),
    "next_update_time": ("next_update_time", "chromosome_structure"),
@@ -185,6 +184,9 @@ def ports_schema(self):
             "DnaA_boxes": numpy_schema(
                 "DnaA_boxes", emit=self.parameters["emit_unique"]
             ),
+            "chromosomal_segments": numpy_schema(
+                "chromosomal_segments", emit=self.parameters["emit_unique"]
+            ),
             "genes": numpy_schema("genes", emit=self.parameters["emit_unique"]),
             "global_time": {"_default": 0.0},
             "timestep": {"_default": self.parameters["time_step"]},
@@ -195,21 +197,6 @@ def ports_schema(self):
             },
         }
 
-        # TODO: Work on this functionality
-        if self.calculate_superhelical_densities:
-            ports["chromosomal_segments"] = {
-                "*": {
-                    "boundary_molecule_indexes": {
-                        "_default": np.empty((0, 2), dtype=np.int64)
-                    },
-                    "boundary_coordinates": {
-                        "_default": np.empty((0, 2), dtype=np.int64)
-                    },
-                    "domain_index": {"_default": 0},
-                    "linking_number": {"_default": 0},
-                }
-            }
-
         return ports
 
     def update_condition(self, timestep, states):
@@ -408,6 +395,7 @@ def get_removed_molecules_mask(domain_indexes, coordinates):
             "RNAs": {},
             "active_ribosome": {},
             "full_chromosomes": {},
+            "chromosomal_segments": {},
             "promoters": {},
             "genes": {},
             "DnaA_boxes": {},
@@ -420,6 +408,7 @@ def get_removed_molecules_mask(domain_indexes, coordinates):
             (
                 boundary_molecule_indexes,
                 boundary_coordinates,
                 segment_domain_indexes,
                 linking_numbers,
+                chromosomal_segment_indexes,
             ) = attrs(
                 states["chromosomal_segments"],
                 [
@@ -427,6 +416,7 @@ def get_removed_molecules_mask(domain_indexes, coordinates):
                     "boundary_coordinates",
                     "domain_index",
                     "linking_number",
+                    "unique_index",
                 ],
             )
 
@@ -557,18 +547,7 @@ def get_removed_molecules_mask(domain_indexes, coordinates):
 
             # Add new chromosomal segments
             n_segments = len(all_new_linking_numbers)
-            if "chromosomal_segments" in states and states["chromosomal_segments"]:
-                self.chromosome_segment_index = (
-                    int(
-                        max(
-                            [
-                                int(index)
-                                for index in list(states["chromosomal_segments"].keys())
-                            ]
-                        )
-                    )
-                    + 1
-                )
+            self.chromosome_segment_index = chromosomal_segment_indexes.max() + 1
 
             update["chromosomal_segments"].update(
                 {
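Note on ecoli/processes/chromosome_structure.py: the new bookkeeping reads the
"unique_index" column through attrs and takes max() + 1, instead of parsing
structure-store dictionary keys. A minimal sketch of the pattern in isolation,
assuming the indexes arrive as a NumPy integer array; the empty-array guard is
illustrative and not part of this patch, but worth noting because
ndarray.max() raises ValueError on an empty array:

    import numpy as np

    def next_segment_index(segment_indexes: np.ndarray) -> int:
        """Return one past the largest existing unique index."""
        if segment_indexes.size == 0:
            # max() on an empty array raises ValueError, so start from 0
            return 0
        return int(segment_indexes.max()) + 1

    print(next_segment_index(np.array([3, 7, 5], dtype=np.int64)))  # 8
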
"build_runtime_image": true, "jenkins": true - }, - "parca_options": { - "cpus": 4 } } diff --git a/runscripts/jenkins/configs/ecoli-no-operons.json b/runscripts/jenkins/configs/ecoli-no-operons.json index a38b2758c..dfbda0562 100644 --- a/runscripts/jenkins/configs/ecoli-no-operons.json +++ b/runscripts/jenkins/configs/ecoli-no-operons.json @@ -9,15 +9,13 @@ "out_dir": "/scratch/groups/mcovert/vecoli" }, "parca_options": { - "operons": false, - "cpus": 4 + "operons": false }, "analysis_options": { "single": {"mass_fraction_summary": {}} }, "sherlock": { "runtime_image_name": "runtime-image", - "build_runtime_image": true, "jenkins": true } } diff --git a/runscripts/jenkins/configs/ecoli-superhelical-density.json b/runscripts/jenkins/configs/ecoli-superhelical-density.json index 1d099fc41..ad1be06dc 100644 --- a/runscripts/jenkins/configs/ecoli-superhelical-density.json +++ b/runscripts/jenkins/configs/ecoli-superhelical-density.json @@ -14,10 +14,6 @@ }, "sherlock": { "runtime_image_name": "runtime-image", - "build_runtime_image": true, "jenkins": true - }, - "parca_options": { - "cpus": 4 } } diff --git a/runscripts/jenkins/configs/ecoli-with-aa.json b/runscripts/jenkins/configs/ecoli-with-aa.json index 5d3a12d2f..de7abd429 100644 --- a/runscripts/jenkins/configs/ecoli-with-aa.json +++ b/runscripts/jenkins/configs/ecoli-with-aa.json @@ -19,8 +19,5 @@ "runtime_image_name": "runtime-image", "build_runtime_image": true, "jenkins": true - }, - "parca_options": { - "cpus": 4 } } diff --git a/runscripts/nextflow/analysis.nf b/runscripts/nextflow/analysis.nf index 53e10a4c7..6c5735d7b 100644 --- a/runscripts/nextflow/analysis.nf +++ b/runscripts/nextflow/analysis.nf @@ -3,6 +3,8 @@ process analysisSingle { tag "variant=${variant}/lineage_seed=${lineage_seed}/generation=${generation}/agent_id=${agent_id}" + label "short" + input: path config path kb @@ -51,6 +53,8 @@ process analysisMultiDaughter { tag "variant=${variant}/lineage_seed=${lineage_seed}/generation=${generation}" + label "short" + input: path config path kb @@ -97,6 +101,8 @@ process analysisMultiGeneration { tag "variant=${variant}/lineage_seed=${lineage_seed}" + label "short" + input: path config path kb @@ -141,6 +147,8 @@ process analysisMultiSeed { tag "variant=${variant}" + label "short" + input: path config path kb @@ -181,6 +189,8 @@ process analysisMultiSeed { process analysisMultiVariant { publishDir "${params.publishDir}/${params.experimentId}/analyses", mode: "move" + label "short" + input: path config path kb diff --git a/runscripts/nextflow/config.template b/runscripts/nextflow/config.template index cdbd2f426..31417a82f 100644 --- a/runscripts/nextflow/config.template +++ b/runscripts/nextflow/config.template @@ -2,6 +2,9 @@ params { experimentId = 'EXPERIMENT_ID' config = 'CONFIG_FILE' + parca_cpus = PARCA_CPUS + publishDir = 'PUBLISH_DIR' + container_image = 'IMAGE_NAME' } trace { @@ -16,7 +19,15 @@ profiles { // Using single core is slightly slower but much cheaper process.cpus = 1 process.executor = 'google-batch' - process.container = 'IMAGE_NAME' + process.container = params.container_image + // Necessary otherwise symlinks to other files in bucket can break + process.containerOptions = '--volume /mnt/disks/BUCKET:/mnt/disks/BUCKET' + process { + withLabel: parca { + cpus = params.parca_cpus + memory = params.parca_cpus * 2.GB + } + } process.errorStrategy = { // Codes: 137 (out-of-memory), 50001 - 50006 (Google Batch task fail: // https://cloud.google.com/batch/docs/troubleshooting#reserved-exit-codes) @@ -43,7 
diff --git a/runscripts/nextflow/config.template b/runscripts/nextflow/config.template
index cdbd2f426..31417a82f 100644
--- a/runscripts/nextflow/config.template
+++ b/runscripts/nextflow/config.template
@@ -2,6 +2,9 @@
 params {
     experimentId = 'EXPERIMENT_ID'
     config = 'CONFIG_FILE'
+    parca_cpus = PARCA_CPUS
+    publishDir = 'PUBLISH_DIR'
+    container_image = 'IMAGE_NAME'
 }
 
 trace {
@@ -16,7 +19,15 @@ profiles {
         // Using single core is slightly slower but much cheaper
         process.cpus = 1
         process.executor = 'google-batch'
-        process.container = 'IMAGE_NAME'
+        process.container = params.container_image
+        // Necessary; otherwise symlinks to other files in the bucket can break
+        process.containerOptions = '--volume /mnt/disks/BUCKET:/mnt/disks/BUCKET'
+        process {
+            withLabel: parca {
+                cpus = params.parca_cpus
+                memory = params.parca_cpus * 2.GB
+            }
+        }
         process.errorStrategy = {
             // Codes: 137 (out-of-memory), 50001 - 50006 (Google Batch task fail:
             // https://cloud.google.com/batch/docs/troubleshooting#reserved-exit-codes)
@@ -43,7 +54,6 @@ profiles {
         google.batch.subnetwork = "regions/${google.location}/subnetworks/default"
         docker.enabled = true
        params.projectRoot = '/vEcoli'
-        params.publishDir = "PUBLISH_DIR"
         process.maxRetries = 1
         // Check Google Cloud latest spot pricing / performance
         process.machineType = {
@@ -74,19 +84,31 @@ profiles {
         // queue times and is less damaging to future job priority
         process.cpus = 1
         process.executor = 'slurm'
-        process.queue = 'owners'
-        process.container = 'IMAGE_NAME'
+        process.queue = 'mcovert,owners'
+        process.container = params.container_image
         apptainer.enabled = true
+        process {
+            // Run analyses, create variants, and run ParCa locally with
+            // the job used to launch the workflow to avoid long queue times
+            withLabel: short {
+                executor = 'local'
+            }
+            // ParCa takes ~15 min with 4 CPUs and ~30 min with 1 CPU, so 1 CPU is acceptable
+            withLabel: parca {
+                executor = 'local'
+                cpus = 1
+                memory = 2.GB
+            }
+        }
         process.time = {
             if ( task.exitStatus == 140 ) {
-                2.h * task.attempt
+                1.h * task.attempt
             } else {
-                2.h
+                1.h
             }
         }
         process.maxRetries = 3
         params.projectRoot = "${launchDir}"
-        params.publishDir = "PUBLISH_DIR"
         // Avoid getting queue status too frequently (can cause job status mixups)
         executor.queueStatInterval = '2 min'
         // Check for terminated jobs and submit new ones fairly frequently
@@ -107,8 +129,13 @@ profiles {
     standard {
         process.executor = 'local'
         params.projectRoot = "${launchDir}"
-        params.publishDir = "PUBLISH_DIR"
         workflow.failOnIgnore = true
         process.errorStrategy = 'ignore'
+        process {
+            withLabel: parca {
+                cpus = params.parca_cpus
+                memory = params.parca_cpus * 2.GB
+            }
+        }
     }
 }
diff --git a/runscripts/nextflow/template.nf b/runscripts/nextflow/template.nf
index 825d17d85..7118ba0d0 100644
--- a/runscripts/nextflow/template.nf
+++ b/runscripts/nextflow/template.nf
@@ -2,7 +2,7 @@ process runParca {
     // Run ParCa using parca_options from config JSON
     publishDir "${params.publishDir}/${params.experimentId}/parca", mode: "copy"
 
-    cpus PARCA_CPUS
+    label "parca"
 
     input:
     path config
@@ -28,6 +28,8 @@
 process analysisParca {
     publishDir "${params.publishDir}/${params.experimentId}/parca/analysis", mode: "move"
 
+    label "short"
+
     input:
     path config
     path kb
@@ -55,6 +57,8 @@ process createVariants {
     // Parse variants in config JSON to generate variants
     publishDir "${params.publishDir}/${params.experimentId}/variant_sim_data", mode: "copy"
 
+    label "short"
+
     input:
     path config
     path kb
- """ - sbatch_command = ["sbatch"] - if sbatch_options: - sbatch_command.extend(sbatch_options) - sbatch_command.extend(["--wrap", cmd]) - - try: - result = subprocess.run( - sbatch_command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True, - text=True, - ) - # Extract job ID from sbatch output - output = result.stdout.strip() - # Assuming job ID is the last word in the output - job_id = int(output.split()[-1]) - print(f"Job submitted with ID: {job_id}") - return job_id - except subprocess.CalledProcessError as e: - print(f"Error submitting job: {e.stderr.strip()}") - raise - - -def wait_for_job(job_id: int, poll_interval: int = 10): - """ - Waits for a SLURM job to finish. - - Args: - job_id: SLURM job ID. - poll_interval: Time in seconds between job status checks. - """ - job_id = str(job_id) - while True: - try: - # Check job status with squeue - result = subprocess.run( - ["squeue", "--job", job_id], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - if job_id not in result.stdout: - break - except Exception as e: - print(f"Error checking job status: {e}") - raise - time.sleep(poll_interval) - - -def check_job_status(job_id: int) -> bool: - """ - Checks the exit status of a SLURM job using sacct. - - Args: - job_id: SLURM job ID. - - Returns: - True if the job succeeded (exit code 0), False otherwise. - """ - try: - # Query job status with sacct - result = subprocess.run( - ["sacct", "-j", str(job_id), "--format=JobID,State,ExitCode", "--noheader"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - output = result.stdout.strip() - - for line in output.splitlines(): - fields = line.split() - # Match the job ID - if str(job_id) in fields[0]: - state = fields[1] - # Extract the numeric exit code - exit_code = fields[2].split(":")[0] - print(f"Job {job_id} - State: {state}, Exit Code: {exit_code}") - return state == "COMPLETED" and exit_code == "0" - - print(f"Job {job_id} status not found in sacct output.") - return False - except Exception as e: - print(f"Error checking job status: {e}") - raise - - def generate_colony(seeds: int): """ Create strings to import and compose Nextflow processes for colony sims. 
@@ -358,25 +258,8 @@ def build_runtime_image(image_name, apptainer=False):
         )
     cmd = [build_script, "-r", image_name]
     if apptainer:
-        print("Submitting job to build runtime image.")
         cmd.append("-a")
-        # On Sherlock, submit job to build runtime image
-        job_id = submit_job(
-            " ".join(cmd),
-            sbatch_options=[
-                "--time=01:00:00",
-                "--mem=4G",
-                "--cpus-per-task=1",
-                "--partition=mcovert",
-            ],
-        )
-        wait_for_job(job_id, 30)
-        if check_job_status(job_id):
-            print("Done building runtime image.")
-        else:
-            raise RuntimeError("Job to build runtime image failed.")
-    else:
-        subprocess.run([build_script, "-r", image_name], check=True)
+    subprocess.run(cmd, check=True)
 
 
 def build_wcm_image(image_name, runtime_image_name):
@@ -461,11 +344,13 @@ def main():
             f" != {parse.quote_plus(experiment_id)}"
         )
     # Resolve output directory
+    out_bucket = ""
     if "out_uri" not in config["emitter_arg"]:
         out_uri = os.path.abspath(config["emitter_arg"]["out_dir"])
         config["emitter_arg"]["out_dir"] = out_uri
     else:
         out_uri = config["emitter_arg"]["out_uri"]
+        out_bucket = out_uri.split("://")[1].split("/")[0]
     # Resolve sim_data_path if provided
     if config["sim_data_path"] is not None:
         config["sim_data_path"] = os.path.abspath(config["sim_data_path"])
@@ -478,6 +363,7 @@ def main():
         filesystem.create_dir(outdir)
     temp_config_path = f"{local_outdir}/workflow_config.json"
     final_config_path = os.path.join(outdir, "workflow_config.json")
+    final_config_uri = os.path.join(out_uri, "workflow_config.json")
     with open(temp_config_path, "w") as f:
         json.dump(config, f)
     if not args.resume:
@@ -488,10 +374,14 @@ def main():
         nf_config = f.readlines()
     nf_config = "".join(nf_config)
     nf_config = nf_config.replace("EXPERIMENT_ID", experiment_id)
-    nf_config = nf_config.replace("CONFIG_FILE", temp_config_path)
+    nf_config = nf_config.replace("CONFIG_FILE", final_config_uri)
+    nf_config = nf_config.replace("BUCKET", out_bucket)
     nf_config = nf_config.replace(
         "PUBLISH_DIR", os.path.dirname(os.path.dirname(out_uri))
     )
+    nf_config = nf_config.replace(
+        "PARCA_CPUS", str(config["parca_options"]["cpus"])
+    )
 
     # By default, assume running on local device
     nf_profile = "standard"
@@ -568,9 +458,6 @@ def main():
     nf_template = nf_template.replace("RUN_PARCA", run_parca)
     nf_template = nf_template.replace("IMPORTS", sim_imports)
     nf_template = nf_template.replace("WORKFLOW", sim_workflow)
-    nf_template = nf_template.replace(
-        "PARCA_CPUS", str(config["parca_options"]["cpus"])
-    )
     local_workflow = os.path.join(local_outdir, "main.nf")
     with open(local_workflow, "w") as f:
         f.writelines(nf_template)
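Note on the template substitution: the placeholders in
runscripts/nextflow/config.template (EXPERIMENT_ID, CONFIG_FILE, BUCKET,
PUBLISH_DIR, PARCA_CPUS) are now all filled by chained str.replace calls in
runscripts/workflow.py. A minimal sketch of that flow; the helper name and the
substitution values below are hypothetical stand-ins:

    # Illustrative values; the real ones come from the workflow config JSON
    SUBSTITUTIONS = {
        "EXPERIMENT_ID": "my_experiment",
        "CONFIG_FILE": "gs://my-bucket/out/my_experiment/workflow_config.json",
        "BUCKET": "my-bucket",
        "PUBLISH_DIR": "gs://my-bucket/out",
        "PARCA_CPUS": "4",
    }

    def render_template(template: str, substitutions: dict[str, str]) -> str:
        """Mirror the chained nf_config.replace(...) calls in workflow.py."""
        for placeholder, value in substitutions.items():
            template = template.replace(placeholder, value)
        return template

Plain string replacement is safe here only because no placeholder name is a
substring of another, which the template's distinct all-caps names guarantee.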