From 391969d36e330d3ea1df3a13cf5a464b97de9c94 Mon Sep 17 00:00:00 2001 From: Torben Schiz <49746900+tjwsch@users.noreply.github.com> Date: Thu, 25 Jul 2024 09:18:07 +0200 Subject: [PATCH] Snapshot computation (#101) * Initial commit for snapshot computation * Snapshot: change import of module * Adapt SnapshotComputation further * Adapt Snapshot test * Snapshot: Formatting * Prepare Snapshot class further * First working version of snapshot computation * Add snapshot test, improve snapshot and restructure * Add snapshot test, improve snapshot and restructure part 2 * Improve logging, comments and minor structural improvements * Snapshot: extend setup and github actions * Move Snapshot into Micro Manager and update execution * Add post-processing script option and restructure snapshot example * Add post-processing to snapshot config file * Make snapshot executable and rename snapshot files * Restructure snapshot computation * Restructure config * Update snapshot computation still without chunking * Remove unintentional print statement * Update comments in tests * Read only the necessary input on each process * Clarify text and names * Add HDF5 dependency * Make h5py optional dependency * Fix h5py optional * Change snapshot case in main * Move dt to simulation parameters in Snapshot and snapshot specific dt * Fix init * Fix issue of using member variable in Config class * Fix tests * Apply suggestions from code review Co-authored-by: Ishaan Desai * Apply review suggestions * Remove output dir from gitignore * Extend CHANGELOG.md * Update CHANGELOG.md --------- Co-authored-by: Ishaan Desai --- .github/workflows/run-unit-tests.yml | 44 +++- CHANGELOG.md | 1 + examples/clean-example.sh | 2 + examples/parameter.hdf5 | Bin 0 -> 8432 bytes examples/snapshot-config.json | 14 ++ examples/snapshot_postprocessing.py | 24 ++ micro_manager/__init__.py | 23 +- micro_manager/config.py | 152 ++++++++++--- micro_manager/micro_manager.py | 10 +- micro_manager/micro_manager_base.py | 8 + micro_manager/snapshot/__init__.py | 0 micro_manager/snapshot/dataset.py | 230 +++++++++++++++++++ micro_manager/snapshot/snapshot.py | 273 +++++++++++++++++++++++ pyproject.toml | 3 +- tests/unit/.gitignore | 1 + tests/unit/hdf_files/test_parameter.hdf5 | Bin 0 -> 8432 bytes tests/unit/snapshot-config.json | 14 ++ tests/unit/test_hdf5_functionality.py | 171 ++++++++++++++ tests/unit/test_micro_manager.py | 2 +- tests/unit/test_parameter.hdf5 | Bin 0 -> 8432 bytes tests/unit/test_snapshot_computation.py | 157 +++++++++++++ 21 files changed, 1070 insertions(+), 59 deletions(-) create mode 100644 examples/parameter.hdf5 create mode 100644 examples/snapshot-config.json create mode 100644 examples/snapshot_postprocessing.py create mode 100644
micro_manager/snapshot/__init__.py create mode 100644 micro_manager/snapshot/dataset.py create mode 100644 micro_manager/snapshot/snapshot.py create mode 100644 tests/unit/hdf_files/test_parameter.hdf5 create mode 100644 tests/unit/snapshot-config.json create mode 100644 tests/unit/test_hdf5_functionality.py create mode 100644 tests/unit/test_parameter.hdf5 create mode 100644 tests/unit/test_snapshot_computation.py diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 23f5a978..f582228a 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -22,20 +22,42 @@ jobs: apt-get -qq install python3-dev python3-pip git python-is-python3 pkg-config pip3 install --upgrade pip - - name: Install Micro Manager and uninstall pyprecice - working-directory: micro-manager + - name: Install Micro Manager and run micro_manager unit test + working-directory: micro-manager/ + run: | + pip3 install --user . + pip3 uninstall -y pyprecice + cd tests/unit + python3 -m unittest test_micro_manager.py + + - name: Install Micro Manager and run interpolation unit test + working-directory: micro-manager/ run: | pip3 install --user .[sklearn] pip3 uninstall -y pyprecice + cd tests/unit + python3 -m unittest test_interpolation.py - - name: Run micro_manager unit test - working-directory: micro-manager/tests/unit - run: python3 -m unittest test_micro_manager.py + - name: Install Micro Manager and run micro simulation crash unit test + working-directory: micro-manager/ + run: | + pip3 install --user . + pip3 uninstall -y pyprecice + cd tests/unit + python3 -m unittest test_micro_simulation_crash_handling.py - - name: Run interpolation unit test - working-directory: micro-manager/tests/unit - run: python3 -m unittest test_interpolation.py + - name: Install Micro Manager and run HDF5 read and write unit tests + working-directory: micro-manager/ + run: | + pip3 install --user .[snapshot] + pip3 uninstall -y pyprecice + cd tests/unit + python3 -m unittest test_hdf5_functionality.py - - name: Run micro simulation crash unit test - working-directory: micro-manager/tests/unit - run: python3 -m unittest test_micro_simulation_crash_handling.py + - name: Install Micro Manager and run snapshot_computation unit tests + working-directory: micro-manager/ + run: | + pip3 install --user .[snapshot] + pip3 uninstall -y pyprecice + cd tests/unit + python3 -m unittest test_snapshot_computation.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 41563eac..3ed84273 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## latest +- Add `MicroManagerSnapshot` enabling snapshot computation and storage of microdata in HDF5 format https://github.com/precice/micro-manager/pull/101 - Make `sklearn` an optional dependency - Move the config variable `micro_dt` from the coupling parameters section to the simulation parameters section https://github.com/precice/micro-manager/pull/114 - Set time step of micro simulation in the configuration, and use it in the coupling https://github.com/precice/micro-manager/pull/112 diff --git a/examples/clean-example.sh b/examples/clean-example.sh index e4508edb..d1f7cb98 100755 --- a/examples/clean-example.sh +++ b/examples/clean-example.sh @@ -4,3 +4,5 @@ rm -rfv precice-profiling/ rm -fv *-events.json rm -fv cpp-dummy/micro-manager.log rm -fv cpp-dummy/micro_dummy.cpython-310-x86_64-linux-gnu.so +rm -r -fv snapshot-example/output +rm -fv snapshot-example/*.log diff --git a/examples/parameter.hdf5 b/examples/parameter.hdf5 new file 
mode 100644 index 0000000000000000000000000000000000000000..6f1576e2c913eefd1e3c06ff293265c1137812c6 GIT binary patch literal 8432 zcmeI0y>1gh6oqHk2`qtt!+)q;x{2hH5~LWp0t+Z8u>w*GP@)haN=noDFDY9d$3_;)zYreU_@2$LzSl4N{jhQ}KWe9i-d1cC3?$vC3Jm&Bhvf~wvAce3 zI=6LZ-SBaAbary^`Tf!P$=SiX(fP7>XWe#~Kr{>6)%uNxa)ov)N0!Gk z_s!ojW!=(mt=4<#Y_Aq>T)R&y=k}^sj_0j7b78-bR^IxuT{)`pavEFmU*qUsKE8VX z;xG$&=-7449$bDwcJZM@K`ys%3@!J&EbKS-JN8Mh5VKp6;ESuU>0XFWekSz^OYwPM zQ`;U3{9oIS>v?GO#Aj<6T-dJ^yXp8&#b;l4!#xq7-B!al6Q6Y(zJ>U1Hym+bDZV?7 zuWb`hzq^jFC%$`*Zz#U|j_*`_-gY+;AJ?hV&dkTSK0my5i$7_qK&n8hK&rrIR)D z3s1>co|3CPB~N)uj`EcJLPL9HKA)FiqFYLk0`Q#{gS2#HeJ`hfh zf*%VfN5RL!$x-mBaIO%u9}fcZHLq-~-|0DEP5(auj?loE!z8 z3b)$h?j)D(_lcV6r!pTwav2bz}d@7tA1)m$I&wj!E QFrGA3AXOk$;D4*YPaZXvtN;K2 literal 0 HcmV?d00001 diff --git a/examples/snapshot-config.json b/examples/snapshot-config.json new file mode 100644 index 00000000..6964122b --- /dev/null +++ b/examples/snapshot-config.json @@ -0,0 +1,14 @@ +{ + "micro_file_name": "python-dummy/micro_dummy", + "coupling_params": { + "parameter_file_name": "parameter.hdf5", + "read_data_names": {"macro-scalar-data": "scalar", "macro-vector-data": "vector"}, + "write_data_names": {"micro-scalar-data": "scalar", "micro-vector-data": "vector"} + }, + "simulation_params": { + "micro_dt": 1.0 + }, + "snapshot_params": { + "post_processing_file_name": "snapshot_postprocessing" + } +} diff --git a/examples/snapshot_postprocessing.py b/examples/snapshot_postprocessing.py new file mode 100644 index 00000000..248cb80f --- /dev/null +++ b/examples/snapshot_postprocessing.py @@ -0,0 +1,24 @@ +""" +Post-processing +In this script a post-processing step is defined. +A script like this can be used to post-process the simulation output before writing it to a file, +if this is not done in the micro simulation itself. +""" + + +class Postprocessing: + def postprocessing(sim_output): + """Post-process the simulation output. + + Parameters + ---------- + sim_output : dict + Raw simulation output. + + Returns + ------- + sim_output : dict + Post-processed simulation output. + """ + sim_output["micro-scalar-data"] = sim_output["micro-scalar-data"] + 20 + return sim_output diff --git a/micro_manager/__init__.py b/micro_manager/__init__.py index bb61d601..255f162b 100644 --- a/micro_manager/__init__.py +++ b/micro_manager/__init__.py @@ -4,19 +4,38 @@ from .config import Config from .micro_manager import MicroManagerCoupling +try: + from .snapshot.snapshot import MicroManagerSnapshot + + is_snapshot_possible = True +except ImportError: + is_snapshot_possible = False + + +def main(): -def main() -> None: parser = argparse.ArgumentParser(description=".") parser.add_argument( "config_file", type=str, help="Path to the JSON config file of the manager." ) + parser.add_argument( + "--snapshot", action="store_true", help="compute offline snapshot database" + ) args = parser.parse_args() config_file_path = args.config_file if not os.path.isabs(config_file_path): config_file_path = os.getcwd() + "/" + config_file_path - manager = MicroManagerCoupling(config_file_path) + if not args.snapshot: + manager = MicroManagerCoupling(config_file_path) + else: + if is_snapshot_possible: + manager = MicroManagerSnapshot(config_file_path) + else: + raise ImportError( + "The Micro Manager snapshot computation requires the h5py package." 
+ ) manager.initialize() diff --git a/micro_manager/config.py b/micro_manager/config.py index 561f516e..3cd4a3c3 100644 --- a/micro_manager/config.py +++ b/micro_manager/config.py @@ -50,37 +50,38 @@ def __init__(self, logger, config_filename): self._adaptivity_every_implicit_iteration = False self._adaptivity_similarity_measure = "L1" + # Snapshot information + self._parameter_file_name = None + self._postprocessing_file_name = None + + self._output_micro_sim_time = False + self.read_json(config_filename) def read_json(self, config_filename): """ - Reads JSON adapter configuration file and saves the data to the respective instance attributes. + Reads JSON configuration file. Parameters ---------- config_filename : string Name of the JSON configuration file """ - folder = os.path.dirname(os.path.join(os.getcwd(), config_filename)) - path = os.path.join(folder, os.path.basename(config_filename)) + self._folder = os.path.dirname(os.path.join(os.getcwd(), config_filename)) + path = os.path.join(self._folder, os.path.basename(config_filename)) with open(path, "r") as read_file: - data = json.load(read_file) + self._data = json.load(read_file) # convert paths to python-importable paths self._micro_file_name = ( - data["micro_file_name"] + self._data["micro_file_name"] .replace("/", ".") .replace("\\", ".") .replace(".py", "") ) - self._config_file_name = os.path.join( - folder, data["coupling_params"]["config_file_name"] - ) - self._macro_mesh_name = data["coupling_params"]["macro_mesh_name"] - try: - self._write_data_names = data["coupling_params"]["write_data_names"] + self._write_data_names = self._data["coupling_params"]["write_data_names"] assert isinstance( self._write_data_names, dict ), "Write data entry is not a dictionary" @@ -99,7 +100,7 @@ def read_json(self, config_filename): ) try: - self._read_data_names = data["coupling_params"]["read_data_names"] + self._read_data_names = self._data["coupling_params"]["read_data_names"] assert isinstance( self._read_data_names, dict ), "Read data entry is not a dictionary" @@ -117,27 +118,48 @@ def read_json(self, config_filename): "No read data names provided. Micro manager will only write data to preCICE." ) - self._micro_dt = data["simulation_params"]["micro_dt"] + self._micro_dt = self._data["simulation_params"]["micro_dt"] - self._macro_domain_bounds = data["simulation_params"]["macro_domain_bounds"] + try: + if self._data["diagnostics"]["output_micro_sim_solve_time"]: + self._output_micro_sim_time = True + self._write_data_names["micro_sim_time"] = False + except BaseException: + self._logger.info( + "Micro manager will not output time required to solve each micro simulation in each time step." + ) + + def read_json_micro_manager(self): + """ + Reads Micro Manager relevant information from JSON configuration file + and saves the data to the respective instance attributes. + """ + self._config_file_name = os.path.join( + self._folder, self._data["coupling_params"]["config_file_name"] + ) + self._macro_mesh_name = self._data["coupling_params"]["macro_mesh_name"] + + self._macro_domain_bounds = self._data["simulation_params"][ + "macro_domain_bounds" + ] try: - self._ranks_per_axis = data["simulation_params"]["decomposition"] + self._ranks_per_axis = self._data["simulation_params"]["decomposition"] except BaseException: self._logger.info( "Domain decomposition is not specified, so the Micro Manager will expect to be run in serial." 
) try: - if data["simulation_params"]["adaptivity"] == "True": + if self._data["simulation_params"]["adaptivity"] == "True": self._adaptivity = True - if not data["simulation_params"]["adaptivity_settings"]: + if not self._data["simulation_params"]["adaptivity_settings"]: raise Exception( "Adaptivity is turned on but no adaptivity settings are provided." ) else: self._adaptivity = False - if data["simulation_params"]["adaptivity_settings"]: + if self._data["simulation_params"]["adaptivity_settings"]: raise Exception( "Adaptivity settings are provided but adaptivity is turned off." ) @@ -147,15 +169,21 @@ def read_json(self, config_filename): ) if self._adaptivity: - if data["simulation_params"]["adaptivity_settings"]["type"] == "local": + if ( + self._data["simulation_params"]["adaptivity_settings"]["type"] + == "local" + ): self._adaptivity_type = "local" - elif data["simulation_params"]["adaptivity_settings"]["type"] == "global": + elif ( + self._data["simulation_params"]["adaptivity_settings"]["type"] + == "global" + ): self._adaptivity_type = "global" else: raise Exception("Adaptivity type can be either local or global.") exchange_data = {**self._read_data_names, **self._write_data_names} - for dname in data["simulation_params"]["adaptivity_settings"]["data"]: + for dname in self._data["simulation_params"]["adaptivity_settings"]["data"]: self._data_for_adaptivity[dname] = exchange_data[dname] if self._data_for_adaptivity.keys() == self._write_data_names.keys(): @@ -165,18 +193,21 @@ def read_json(self, config_filename): " please include macro simulation data as well." ) - self._adaptivity_history_param = data["simulation_params"][ + self._adaptivity_history_param = self._data["simulation_params"][ "adaptivity_settings" ]["history_param"] - self._adaptivity_coarsening_constant = data["simulation_params"][ + self._adaptivity_coarsening_constant = self._data["simulation_params"][ "adaptivity_settings" ]["coarsening_constant"] - self._adaptivity_refining_constant = data["simulation_params"][ + self._adaptivity_refining_constant = self._data["simulation_params"][ "adaptivity_settings" ]["refining_constant"] - if "similarity_measure" in data["simulation_params"]["adaptivity_settings"]: - self._adaptivity_similarity_measure = data["simulation_params"][ + if ( + "similarity_measure" + in self._data["simulation_params"]["adaptivity_settings"] + ): + self._adaptivity_similarity_measure = self._data["simulation_params"][ "adaptivity_settings" ]["similarity_measure"] else: @@ -185,7 +216,7 @@ def read_json(self, config_filename): ) self._adaptivity_similarity_measure = "L1" - adaptivity_every_implicit_iteration = data["simulation_params"][ + adaptivity_every_implicit_iteration = self._data["simulation_params"][ "adaptivity_settings" ]["every_implicit_iteration"] @@ -202,12 +233,12 @@ def read_json(self, config_filename): self._write_data_names["active_state"] = False self._write_data_names["active_steps"] = False - if "interpolate_crash" in data["simulation_params"]: - if data["simulation_params"]["interpolate_crash"] == "True": + if "interpolate_crash" in self._data["simulation_params"]: + if self._data["simulation_params"]["interpolate_crash"] == "True": self._interpolate_crash = True try: - diagnostics_data_names = data["diagnostics"]["data_from_micro_sims"] + diagnostics_data_names = self._data["diagnostics"]["data_from_micro_sims"] assert isinstance( diagnostics_data_names, dict ), "Diagnostics data is not a dictionary" @@ -226,20 +257,48 @@ def read_json(self, config_filename): ) try: - 
self._micro_output_n = data["diagnostics"]["micro_output_n"] + self._micro_output_n = self._data["diagnostics"]["micro_output_n"] except BaseException: self._logger.info( "Output interval of micro simulations not specified, if output is available then it will be called " "in every time window." ) + def read_json_snapshot(self): + self._parameter_file_name = os.path.join( + self._folder, self._data["coupling_params"]["parameter_file_name"] + ) + try: - if data["diagnostics"]["output_micro_sim_solve_time"]: - self._output_micro_sim_time = True - self._write_data_names["micro_sim_time"] = False + self._postprocessing_file_name = ( + self._data["snapshot_params"]["post_processing_file_name"] + .replace("/", ".") + .replace("\\", ".") + .replace(".py", "") + ) except BaseException: self._logger.info( - "Micro manager will not output time required to solve each micro simulation in each time step." + "No post-processing file name provided. Snapshot computation will not perform any post-processing." + ) + self._postprocessing_file_name = None + + try: + diagnostics_data_names = self._data["diagnostics"]["data_from_micro_sims"] + assert isinstance( + diagnostics_data_names, dict + ), "Diagnostics data is not a dictionary" + for key, value in diagnostics_data_names.items(): + if value == "scalar": + self._write_data_names[key] = False + elif value == "vector": + self._write_data_names[key] = True + else: + raise Exception( + "Diagnostics data dictionary has a value other than 'scalar' or 'vector'" + ) + except BaseException: + self._logger.info( + "No diagnostics data is defined. Snapshot computation will not output any diagnostics data." ) def get_config_file_name(self): @@ -452,6 +511,29 @@ def get_micro_dt(self): """ return self._micro_dt + def get_parameter_file_name(self): + """ + Get the name of the parameter file. + + Returns + ------- + parameter_file_name : string + Name of the HDF5 file containing the macro parameters. + """ + + return self._parameter_file_name + + def get_postprocessing_file_name(self): + """ + Get the name of the post-processing script. If a script is provided, the snapshot computation post-processes every micro simulation output before writing it to a file. + + Returns + ------- + postprocessing_file_name : str + Name of the post-processing script. + """ + return self._postprocessing_file_name + def interpolate_crashed_micro_sim(self): """ Check if user wants crashed micro simulations to be interpolated. diff --git a/micro_manager/micro_manager.py b/micro_manager/micro_manager.py index 2fffa516..ca9f12de 100644 --- a/micro_manager/micro_manager.py +++ b/micro_manager/micro_manager.py @@ -49,7 +49,7 @@ def __init__(self, config_file: str) -> None: Name of the JSON configuration file (provided by the user).
""" super().__init__(config_file) - + self._config.read_json_micro_manager() # Define the preCICE Participant self._participant = precice.Participant( "Micro-Manager", self._config.get_config_file_name(), self._rank, self._size @@ -57,19 +57,11 @@ def __init__(self, config_file: str) -> None: self._macro_mesh_name = self._config.get_macro_mesh_name() - # Data names of data written to preCICE - self._write_data_names = self._config.get_write_data_names() - - # Data names of data read from preCICE - self._read_data_names = self._config.get_read_data_names() - self._macro_bounds = self._config.get_macro_domain_bounds() if self._is_parallel: # Simulation is run in parallel self._ranks_per_axis = self._config.get_ranks_per_axis() - self._is_micro_solve_time_required = self._config.write_micro_solve_time() - # Parameter for interpolation in case of a simulation crash self._interpolate_crashed_sims = self._config.interpolate_crashed_micro_sim() if self._interpolate_crashed_sims: diff --git a/micro_manager/micro_manager_base.py b/micro_manager/micro_manager_base.py index 29dfdf86..9e8d54b4 100644 --- a/micro_manager/micro_manager_base.py +++ b/micro_manager/micro_manager_base.py @@ -76,8 +76,16 @@ def __init__(self, config_file): self._logger.info("Provided configuration file: {}".format(config_file)) self._config = Config(self._logger, config_file) + # Data names of data to output to the snapshot database + self._write_data_names = self._config.get_write_data_names() + + # Data names of data to read as input parameter to the simulations + self._read_data_names = self._config.get_read_data_names() + self._micro_dt = self._config.get_micro_dt() + self._is_micro_solve_time_required = self._config.write_micro_solve_time() + def initialize(self): """ Initialize micro simulations. Not implemented diff --git a/micro_manager/snapshot/__init__.py b/micro_manager/snapshot/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/micro_manager/snapshot/dataset.py b/micro_manager/snapshot/dataset.py new file mode 100644 index 00000000..36172e88 --- /dev/null +++ b/micro_manager/snapshot/dataset.py @@ -0,0 +1,230 @@ +from importlib import metadata +import os +from datetime import datetime + +import numpy as np + +try: + import h5py +except ImportError: + raise ImportError( + "The Micro Manager snapshot computation requires the h5py package." + ) + + +class ReadWriteHDF: + def __init__(self, logger) -> None: + self._logger = logger + self._has_datasets = False + + def create_file(self, file_path: str) -> None: + """ + Create an HDF5 file for a given file name and path. + + Parameters + ---------- + file_path : str + File name added to the path to the file. + + """ + f = h5py.File(file_path, "w") + f.attrs["status"] = "writing" + f.attrs["MicroManager_version"] = str(metadata.version("micro-manager-precice")) + f.attrs["date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + f.close() + + def collect_output_files( + self, dir_name: str, file_list: list, database_length: int + ) -> None: + """ + Iterate over a list of HDF5 files in a given directory and copy the content into a single file. + The files are deleted after the content is copied. + + Parameters + ---------- + dir_name : str + Path to directory containing the files. + file_list : list + List of files to be combined. + dataset_length : int + Global number of snapshots. 
+ """ + # Create a output file + main_file = h5py.File(os.path.join(dir_name, "snapshot_data.hdf5"), "w") + main_file.attrs["status"] = "writing" + main_file.attrs["MicroManager_version"] = str( + metadata.version("micro-manager-precice") + ) + main_file.attrs["date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # Create datasets in output file + with h5py.File(os.path.join(dir_name, file_list[0]), "r") as parameter_file: + for key in parameter_file.keys(): + if not key == "crashed_snapshots": + current_data = parameter_file[key][0] + main_file.create_dataset( + key, + shape=(database_length, *current_data.shape), + chunks=(1, *current_data.shape), + fillvalue=np.nan, + ) + # Loop over files + crashed_snapshots = [] + outer_position = 0 + for file in file_list: + parameter_file = h5py.File(os.path.join(dir_name, file), "r") + # Add all data sets to the main file. + for key in parameter_file.keys(): + inner_position = outer_position + for chunk in parameter_file[key].iter_chunks(): + current_data = parameter_file[key][chunk] + # If the key is "crashed_snapshots" add the indices to the list of crashed snapshots + # Otherwise write the data to the main file + if key == "crashed_snapshots": + crashed_snapshots.extend( + inner_position + parameter_file[key][:] + ) + else: + main_file[key][inner_position] = current_data + inner_position += 1 + outer_position = inner_position + parameter_file.close() + os.remove(os.path.join(dir_name, file)) + + # Write the indices of crashed snapshots to the main file + if len(crashed_snapshots) > 0: + main_file.create_dataset( + "crashed_snapshots", data=crashed_snapshots, dtype=int + ) + main_file.attrs["status"] = "finished" + main_file.close() + + def write_output_to_hdf( + self, + file_path: str, + macro_data: dict, + micro_data: dict | None, + idx: int, + length: int, + ) -> None: + """ + Write the output of a micro simulation to a HDF5 file. + + Parameters + ---------- + file_path : str + Path to file in which the data should be written. + macro_data : dict + Dict of macro simulation input. + micro_data : dict | None + Dict of micro simulation output. If None, only the macro data is written. + idx: int + Local index of the current snapshot. + length : int + Local number of snapshots. + """ + parameter_file = h5py.File(file_path, "a") + if micro_data is None: + input_data = macro_data + else: + input_data = macro_data | micro_data + + # If the datasets are not created yet, create them + if not self._has_datasets: + for key in input_data.keys(): + current_data = np.asarray(input_data[key]) + parameter_file.create_dataset( + key, + shape=(length, *current_data.shape), + chunks=(1, *current_data.shape), + fillvalue=np.nan, + ) + self._has_datasets = True + + # Iterate over macro and micro data sets and write current simulation data to the file + for key in input_data.keys(): + current_data = np.asarray(input_data[key]) + parameter_file[key][idx] = current_data + + parameter_file.close() + + def read_hdf(self, file_path: str, data_names: dict, start: int, end: int) -> list: + """ + Read data from an HDF5 file and return it as a list of dictionaries. + + Parameters + ---------- + file_path : str + Path of file to read data from. + data_names : dict + Names of parameters to read from the file. + start: int + Index of the first snapshot to read on process. + end: int + Index of the last snapshot to read on process. + + Returns + ------- + output: list + List of dicts where the keys are the names of the parameters and the values the corresponding data. 
+ """ + + parameter_file = h5py.File(file_path, "r") + parameter_data = dict() + output = [] + # Read data by iterating over the relevant datasets + for key in data_names.keys(): + parameter_data[key] = np.asarray(parameter_file[key][start:end]) + my_key = ( + key # Save one key to be able to iterate over the length of the data + ) + # Iterate over len of data. In each iteration write data from all macro data sets + # to a dictionary and append it to the output list of dicts. + for i in range(len(parameter_data[my_key])): + current_data = dict() + for key in data_names.keys(): + current_data[key] = parameter_data[key][i] + output.append(current_data) + return output + + def get_parameter_space_size(self, file_path: str) -> int: + """ + Get the length of the parameter space from the HDF5 file. + + Parameters + ---------- + file_path : str + Path of file to read data from. + + Returns + ------- + int + Size of Parameter Space + """ + with h5py.File(file_path, "r") as file: + return file[list(file.keys())[0]].len() + + def write_crashed_snapshots(self, file_path: str, crashed_input: list): + """ + Write indices of crashed snapshots to the HDF5 database. + + Parameters + ---------- + file_path : str + Path of file to read data from. + crashed_indices : list + list of indices of crashed simulations. + """ + with h5py.File(file_path, "a") as file: + file.create_dataset("crashed_snapshots", data=crashed_input, dtype=int) + + def set_status(self, file_path: str, status: str): + """ + Set the status of the file to "finished" to indicate that it is no longer accessed. + + Parameters + ---------- + file_path : str + Path of file to read data from. + """ + with h5py.File(file_path, "a") as file: + file.attrs["status"] = status diff --git a/micro_manager/snapshot/snapshot.py b/micro_manager/snapshot/snapshot.py new file mode 100644 index 00000000..7afc77eb --- /dev/null +++ b/micro_manager/snapshot/snapshot.py @@ -0,0 +1,273 @@ +#!/usr/bin/env python3 +""" +Snapshot Computation is a tool within the Micro Manager to initialize micro simulations and create a snapshot database of theirs outputs by running them with a set of prescribed input parameters. +This files the class SnapshotComputation which has the following callable public methods: + +- solve +- initialize + +Detailed documentation: https://precice.org/tooling-micro-manager-overview.html +""" + +import importlib +import os +import sys +import time + +import numpy as np + +from micro_manager.micro_manager import MicroManager +from .dataset import ReadWriteHDF +from micro_manager.micro_simulation import create_simulation_class + +sys.path.append(os.getcwd()) + + +class MicroManagerSnapshot(MicroManager): + def __init__(self, config_file: str) -> None: + """ + Constructor. + + Parameters + ---------- + config_file : string + Name of the JSON configuration file (provided by the user). + """ + super().__init__(config_file) + self._config.read_json_snapshot() + + # Path to the parameter file containing input parameters for micro simulations + self._parameter_file = self._config.get_parameter_file_name() + # Get name of pos-processing script + self._post_processing_file_name = self._config.get_postprocessing_file_name() + # Collect crashed indices + self._crashed_snapshots = [] # Declaration + + # ************** + # Public methods + # ************** + + def solve(self) -> None: + """ + Solve the problem by iterating over a set macro parameters. + - Create micro simulation object. + - Post-process micro output. + - Write output to database. 
+ - Merge output in parallel run. + """ + + # Loop over all macro parameters + for elems in range(self._local_number_of_sims): + # Create micro simulation object + self._micro_sims = create_simulation_class(self._micro_problem)( + self._global_ids_of_local_sims[elems] + ) + + micro_sims_input = self._macro_parameters[elems] + # Solve micro simulation + micro_sims_output = self._solve_micro_simulation(micro_sims_input) + + # Write output to file + if micro_sims_output is not None: + # Post-processing + if self._post_processing_file_name is not None: + # Attempt importing post-processing script + try: + post_processing = getattr( + importlib.import_module( + self._post_processing_file_name, "Postprocessing" + ), + "Postprocessing", + ) + if hasattr(post_processing, "postprocessing") and callable( + getattr(post_processing, "postprocessing") + ): + micro_sims_output = post_processing.postprocessing( + micro_sims_output + ) + else: + self._logger.info( + "No post-processing script with the provided path found. Skipping post-processing." + ) + self._post_processing_file_name = None + except Exception: + self._logger.info( + "No post-processing script with the provided path found. Skipping post-processing." + ) + self._post_processing_file_name = None + self._data_storage.write_output_to_hdf( + self._output_file_path, + micro_sims_input, + micro_sims_output, + elems, + len(self._macro_parameters), + ) + # Log error and write macro data to database if simulation has crashed + else: + self._logger.info("Skipping snapshot storage for crashed simulation.") + self._data_storage.write_output_to_hdf( + self._output_file_path, + micro_sims_input, + None, + elems, + len(self._macro_parameters), + ) + + self._crashed_snapshots.append(elems) + + # Write indices of crashed snapshots to the database + if len(self._crashed_snapshots) > 0: + self._data_storage.write_crashed_snapshots( + self._output_file_path, self._crashed_snapshots + ) + self._data_storage.set_status(self._output_file_path, "none") + + # Merge output files + if self._is_parallel: + self._logger.info( + "Snapshots have been computed and stored. Merging output files" + ) + self._data_storage.set_status(self._output_file_path, "reading/deleting") + list_of_output_files = self._comm.gather(self._file_name, 0) + if self._rank == 0: + self._data_storage.collect_output_files( + self._output_subdirectory, + list_of_output_files, + self._parameter_space_size, + ) + self._logger.info("Snapshot computation completed.") + + def initialize(self) -> None: + """ + Initialize the Snapshot Computation by performing the following tasks: + - Distribute the parameter data equally if the snapshot creation is executed in parallel. + - Read macro parameters from the parameter file. + - Create output subdirectory and file paths to store output. + - Import micro simulation.
+ """ + + # Create subdirectory to store output files in + directory = os.path.dirname(self._parameter_file) + self._output_subdirectory = os.path.join(directory, "output") + os.makedirs(self._output_subdirectory, exist_ok=True) + + # Create object responsible for reading parameters and writing simulation output + self._data_storage = ReadWriteHDF(self._logger) + + self._parameter_space_size = self._data_storage.get_parameter_space_size( + self._parameter_file + ) + # Read macro parameters from the parameter file + # Decompose parameters if the snapshot creation is executed in parallel + if self._is_parallel: + equal_partition = int(self._parameter_space_size / self._size) + rest = self._parameter_space_size % self._size + if self._rank < rest: + start = self._rank * (equal_partition + 1) + end = start + equal_partition + 1 + else: + start = self._rank * equal_partition + rest + end = start + equal_partition + self._macro_parameters = self._data_storage.read_hdf( + self._parameter_file, self._read_data_names, start, end + ) + else: + self._macro_parameters = self._data_storage.read_hdf( + self._parameter_file, + self._read_data_names, + 0, + self._parameter_space_size, + ) + + # Create database file to store output from a rank in + if self._is_parallel: + self._file_name = "snapshot_data_{}.hdf5".format(self._rank) + else: + self._file_name = "snapshot_data.hdf5" + self._output_file_path = os.path.join( + self._output_subdirectory, self._file_name + ) + self._data_storage.create_file(self._output_file_path) + self._logger.info("Output file created: {}".format(self._output_file_path)) + self._local_number_of_sims = len(self._macro_parameters) + self._logger.info( + "Number of local micro simulations = {}".format(self._local_number_of_sims) + ) + + if self._local_number_of_sims == 0: + if self._is_parallel: + self._logger.info( + "Rank {} has no micro simulations and hence will not do any computation.".format( + self._rank + ) + ) + self._is_rank_empty = True + else: + raise Exception("Snapshot has no micro simulations.") + + nms_all_ranks = np.zeros(self._size, dtype=np.int64) + # Gather number of micro simulations that each rank has, because this rank needs to know how many micro + # simulations have been created by previous ranks, so that it can set + # the correct global IDs + self._comm.Allgatherv(np.array(self._local_number_of_sims), nms_all_ranks) + + # Get global number of micro simulations + self._global_number_of_sims = np.sum(nms_all_ranks) + + # Create lists of local and global IDs + sim_id = np.sum(nms_all_ranks[: self._rank]) + self._global_ids_of_local_sims = [] # DECLARATION + for i in range(self._local_number_of_sims): + self._global_ids_of_local_sims.append(sim_id) + sim_id += 1 + self._micro_problem = getattr( + importlib.import_module( + self._config.get_micro_file_name(), "MicroSimulation" + ), + "MicroSimulation", + ) + + self._micro_sims_have_output = False + if hasattr(self._micro_problem, "output") and callable( + getattr(self._micro_problem, "output") + ): + self._micro_sims_have_output = True + + # *************** + # Private methods + # *************** + + def _solve_micro_simulation(self, micro_sims_input: dict) -> dict | None: + """ + Solve a single micro simulation. + + Parameters + ---------- + micro_sims_input : dict + Keys are names of data and the values are the data which are required inputs to + solve a micro simulation. 
+ + Returns + ------- + micro_sims_output : dict | None + Dict in which keys are names of data and the values are the data of the output of the micro + simulations. The return type is None if the simulation has crashed. + """ + try: + start_time = time.time() + micro_sims_output = self._micro_sims.solve(micro_sims_input, self._micro_dt) + end_time = time.time() + + if self._is_micro_solve_time_required: + micro_sims_output["micro_sim_time"] = end_time - start_time + + return micro_sims_output + # Handle simulation crash + except Exception as e: + self._logger.error( + "Micro simulation with input {} has crashed. See next entry on this rank for error message".format( + micro_sims_input + ) + ) + self._logger.error(e) + return None diff --git a/pyproject.toml b/pyproject.toml index 5d0c50f0..cee97d2a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ classifiers=[ ] [project.optional-dependencies] +snapshot = ["h5py"] sklearn = ["scikit-learn"] [project.urls] @@ -40,7 +41,7 @@ Repository = "https://github.com/precice/micro-manager" micro-manager-precice = "micro_manager:main" [tool.setuptools] -packages=["micro_manager", "micro_manager.adaptivity"] +packages=["micro_manager", "micro_manager.adaptivity", "micro_manager.snapshot"] [tool.setuptools-git-versioning] enabled = true diff --git a/tests/unit/.gitignore b/tests/unit/.gitignore index 397b4a76..4f54a829 100644 --- a/tests/unit/.gitignore +++ b/tests/unit/.gitignore @@ -1 +1,2 @@ *.log +output/ diff --git a/tests/unit/hdf_files/test_parameter.hdf5 b/tests/unit/hdf_files/test_parameter.hdf5 new file mode 100644 index 0000000000000000000000000000000000000000..18891bfe492dc57a0bde1a99267f8d98c4d071bd GIT binary patch literal 8432 zcmeI0u}Z{15Qb-y3xtD%vrxn;rKP2%!bPr9Q46~>L_6i+JjBvcJ0ByZrH|m__y(HY znIB9f2UdFCpM#y9e`km6$6Q$EX*@a@^tbxrpEQ-848s(|hdu9^D)6y>VH0Qe(QFmz z#EJijY?{7i`P*@4CwVS)B=W=BHreUvEQ3J1QUu2N{;@ysxt)5~?3>k@xpGsKv+3lv zEUt^$zP&a~>%UXTwHL;u z_Pid&nkltBaTMJrAtLCyCHT*({;~1&@Mz>Dva^X~@!}WEY`I!r9Jgw#{(I-YjHnyf zF9py*QVkle7EvvA#W$nrkpKyh011!)2`nQ4+%xg+@LShkEYp$z36KB@kN^q%0|MXs zP}|{DF4CI-?gMDYpq~K$=Y6U8bwj!N)IN>18Xi>r<*}=Of%DaJQNW4)lrD1A SNq_`MfCNZ@1V~^x348)GVsZ5V literal 0 HcmV?d00001 diff --git a/tests/unit/snapshot-config.json b/tests/unit/snapshot-config.json new file mode 100644 index 00000000..015d10b7 --- /dev/null +++ b/tests/unit/snapshot-config.json @@ -0,0 +1,14 @@ +{ + "micro_file_name": "test_snapshot_computation", + "coupling_params": { + "parameter_file_name": "test_parameter.hdf5", + "read_data_names": {"macro-scalar-data": "scalar", "macro-vector-data": "vector"}, + "write_data_names": {"micro-scalar-data": "scalar", "micro-vector-data": "vector"} + }, + "simulation_params": { + "micro_dt": 1.0 + }, + "snapshot_params": { + "post_processing_file_name": "snapshot_post_processing" + } +} diff --git a/tests/unit/test_hdf5_functionality.py b/tests/unit/test_hdf5_functionality.py new file mode 100644 index 00000000..67a23bcc --- /dev/null +++ b/tests/unit/test_hdf5_functionality.py @@ -0,0 +1,171 @@ +from unittest import TestCase +from unittest.mock import MagicMock + +import numpy as np +import os +import h5py + +from micro_manager.snapshot.dataset import ReadWriteHDF + + +class TestHDFFunctionalities(TestCase): + def test_create_file(self): + """ + Test if file creation works as expected. 
+ """ + path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "hdf_files") + file_name = "create_file.hdf5" + entire_path = os.path.join(path, file_name) + # Ensure output file does not exist + if os.path.isfile(entire_path): + os.remove(entire_path) + data_manager = ReadWriteHDF(MagicMock()) + data_manager.create_file(entire_path) + self.assertTrue(os.path.isfile(entire_path)) + with h5py.File(entire_path, "r") as f: + self.assertEqual(f.attrs["status"], "writing") + os.remove(entire_path) + + def test_collect_output_files(self): + """ + Test if collection of output files works as expected. + """ + dir_name = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "hdf_files" + ) + files = ["output_1.hdf5", "output_2.hdf5"] + # Create ouput files to merge + input_data = [ + { + "macro_vector_data": np.array([1, 2, 3]), + "macro_scalar_data": 1, + "micro_vector_data": np.array([-1, -2, -3]), + "micro_scalar_data": -1, + }, + { + "macro_vector_data": np.array([4, 5, 6]), + "macro_scalar_data": 2, + "micro_vector_data": np.array([-4, -5, -6]), + "micro_scalar_data": -2, + }, + ] + for data, file in zip(input_data, files): + with h5py.File(os.path.join(dir_name, file), "w") as f: + for key in data.keys(): + current_data = np.asarray(data[key]) + f.create_dataset( + key, + data=current_data, + shape=(1, *current_data.shape), + chunks=(1, *current_data.shape), + ) + # Ensure output file does not exist + if os.path.isfile(os.path.join(dir_name, "snapshot_data.hdf5")): + os.remove(os.path.join(dir_name, "snapshot_data.hdf5")) + length = 2 + data_manager = ReadWriteHDF(MagicMock()) + data_manager.collect_output_files(dir_name, files, length) + output = h5py.File(os.path.join(dir_name, "snapshot_data.hdf5"), "r") + + for i in range(length): + self.assertEqual( + output["macro_scalar_data"][i], input_data[i]["macro_scalar_data"] + ) + + self.assertListEqual( + output["macro_vector_data"][i].tolist(), + input_data[i]["macro_vector_data"].tolist(), + ) + self.assertEqual( + output["micro_scalar_data"][i], input_data[i]["micro_scalar_data"] + ) + + self.assertListEqual( + output["micro_vector_data"][i].tolist(), + input_data[i]["micro_vector_data"].tolist(), + ) + + output.close() + os.remove(os.path.join(dir_name, "snapshot_data.hdf5")) + + def test_simulation_output_to_hdf(self): + """ + Test if the write_output_to_hdf method correctly writes a dictionary to an HDF5 file. 
+ """ + file_name = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "hdf_files", + "write_output.hdf5", + ) + if os.path.isfile(file_name): + os.remove(file_name) + + # Create artificial output data + macro_data = { + "macro_vector_data": np.array([3, 1, 2]), + "macro_scalar_data": 2, + } + micro_data = { + "micro_vector_data": np.array([3, 2, 1]), + "micro_scalar_data": 1, + } + + expected_micro_vector_data = np.array([3, 2, 1]) + expected_micro_scalar_data = 1 + + expected_macro_vector_data = np.array([3, 1, 2]) + expected_macro_scalar_data = 2 + + data_manager = ReadWriteHDF(MagicMock()) + for i in range(2): + data_manager.write_output_to_hdf(file_name, macro_data, micro_data, i, 2) + + test_file = h5py.File(file_name, "r") + + self.assertEqual( + (test_file["micro_scalar_data"][0]), expected_micro_scalar_data + ) + self.assertListEqual( + (test_file["micro_vector_data"][0]).tolist(), + (expected_micro_vector_data).tolist(), + ) + self.assertEqual( + (test_file["macro_scalar_data"][0]), expected_macro_scalar_data + ) + self.assertListEqual( + (test_file["macro_vector_data"][0]).tolist(), + (expected_macro_vector_data).tolist(), + ) + os.remove(file_name) + + def test_hdf_to_dict(self): + """ + Test if read__hdf method correctly reads parameter data from an HDF5 file. + """ + expected_macro_scalar = 1 + expected_macro_vector = np.array([0, 1, 2]) + file_name = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "hdf_files", + "test_parameter.hdf5", + ) + read_data_names = {"macro_vector_data": True, "macro_scalar_data": False} + data_manager = ReadWriteHDF(MagicMock()) + read = data_manager.read_hdf(file_name, read_data_names, 0, -1) + for i in range(len(read)): + self.assertEqual(read[i]["macro_scalar_data"], expected_macro_scalar) + self.assertListEqual( + read[i]["macro_vector_data"].tolist(), expected_macro_vector.tolist() + ) + + def test_get_parameter_space_length(self): + """ + Test if reading the length of the parameter space works as expected. + """ + file_name = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "hdf_files", + "test_parameter.hdf5", + ) + data_manager = ReadWriteHDF(MagicMock()) + self.assertEqual(data_manager.get_parameter_space_size(file_name), 1) diff --git a/tests/unit/test_micro_manager.py b/tests/unit/test_micro_manager.py index ad8285e8..b43850cd 100644 --- a/tests/unit/test_micro_manager.py +++ b/tests/unit/test_micro_manager.py @@ -120,7 +120,7 @@ def test_config(self): Test if the functions in the Config class work. """ config = micro_manager.Config(MagicMock(), "micro-manager-config.json") - + config.read_json_micro_manager() self.assertEqual(config._config_file_name.split("/")[-1], "dummy-config.xml") self.assertEqual(config._micro_file_name, "test_micro_manager") self.assertEqual(config._macro_mesh_name, "dummy-macro-mesh") diff --git a/tests/unit/test_parameter.hdf5 b/tests/unit/test_parameter.hdf5 new file mode 100644 index 0000000000000000000000000000000000000000..0af159d5e7a08cecdb02564d6fdae2fc609857bf GIT binary patch literal 8432 zcmeI0u}Z{15Qb-y3xtD%vrxn;rOkn*rNTw7Qc(*_L$p&4&O)UI?wEjDVTzg?m zYR~IYteH~F6Gzc~5+Z`GTY~?r>K_|V50A!9B0HN%7B7Cm%$BS5#c`{)>c4mX%ZR#x z{Zar8B-NndY7x~^S9~*?9tn^D36KB@kiaq$z&#W14!?E%#WF1kkN^pg011%5KOpeE z549am