diff --git a/src/mattersim/cli/mattersim_app.py b/src/mattersim/cli/mattersim_app.py index e7413a8..56ced86 100644 --- a/src/mattersim/cli/mattersim_app.py +++ b/src/mattersim/cli/mattersim_app.py @@ -5,18 +5,20 @@ from datetime import datetime from typing import List, Union -# import numpy as np +import numpy as np import pandas as pd +import yaml from ase import Atoms from ase.constraints import Filter from ase.io import read as ase_read from ase.optimize.optimize import Optimizer from ase.units import GPa from loguru import logger +from pymatgen.core.structure import Structure from pymatgen.io.ase import AseAtomsAdaptor from tqdm import tqdm -# from mattersim.applications.phonon import PhononWorkflow +from mattersim.applications.phonon import PhononWorkflow from mattersim.applications.relax import Relaxer from mattersim.forcefield import MatterSimCalculator @@ -54,7 +56,7 @@ def singlepoint( logger.info(f"Saving the results to {os.path.join(work_dir, save_csv)}") df = pd.DataFrame(predicted_properties) - df.to_csv(os.path.join(work_dir, save_csv), index=False) + df.to_csv(os.path.join(work_dir, save_csv), index=False, mode="a") return predicted_properties @@ -72,7 +74,12 @@ def singlepoint_cli(args: argparse.Namespace) -> dict: atoms_list = parse_atoms_list( args.structure_file, args.mattersim_model, args.device ) - return singlepoint(atoms_list, **vars(args)) + singlepoint_args = { + k: v + for k, v in vars(args).items() + if k not in ["structure_file", "mattersim_model", "device"] + } + return singlepoint(atoms_list, **singlepoint_args) def relax( @@ -154,7 +161,7 @@ def relax( logger.info(f"Saving the results to {os.path.join(work_dir, save_csv)}") df = pd.DataFrame(relaxed_results) - df.to_csv(os.path.join(work_dir, save_csv), index=False) + df.to_csv(os.path.join(work_dir, save_csv), index=False, mode="a") return relaxed_results @@ -171,11 +178,159 @@ def relax_cli(args: argparse.Namespace) -> dict: atoms_list = parse_atoms_list( args.structure_file, args.mattersim_model, args.device ) - return relax(atoms_list, **vars(args)) + relax_args = { + k: v + for k, v in vars(args).items() + if k not in ["structure_file", "mattersim_model", "device"] + } + return relax(atoms_list, **relax_args) -def phonon(): - pass +def phonon( + atoms_list: List[Atoms], + *, + find_prim: bool = False, + work_dir: str = str(uuid.uuid4()), + save_csv: str = "results.csv.gz", + amplitude: float = 0.01, + supercell_matrix: np.ndarray = None, + qpoints_mesh: np.ndarray = None, + max_atoms: int = None, + enable_relax: bool = False, + **kwargs, +) -> dict: + """ + Predict phonon properties for a list of atoms. + + Args: + atoms_list (List[Atoms]): List of ASE Atoms objects. + find_prim (bool, optional): If find the primitive cell and use it + to calculate phonon. Default to False. + work_dir (str, optional): workplace path to contain phonon result. + Defaults to data + chemical_symbols + 'phonon' + amplitude (float, optional): Magnitude of the finite difference to + displace in force constant calculation, in Angstrom. Defaults + to 0.01 Angstrom. + supercell_matrix (nd.array, optional): Supercell matrix for constr + -uct supercell, priority over than max_atoms. Defaults to None. + qpoints_mesh (nd.array, optional): Qpoint mesh for IBZ integral, + priority over than max_atoms. Defaults to None. + max_atoms (int, optional): Maximum atoms number limitation for the + supercell generation. If not set, will automatic generate super + -cell based on symmetry. Defaults to None. + enable_relax (bool, optional): Whether to relax the structure before + predicting phonon properties. Defaults to False. + """ + phonon_results = defaultdict(list) + + for atoms in tqdm( + atoms_list, total=len(atoms_list), desc="Predicting phonon properties" + ): + if enable_relax: + relaxed_results = relax( + [atoms], + constrain_symmetry=True, + work_dir=work_dir, + save_csv=save_csv.replace(".csv", "_relax.csv"), + ) + structure = Structure.from_str(relaxed_results["structure"][0], fmt="json") + _atoms = AseAtomsAdaptor.get_atoms(structure) + _atoms.calc = atoms.calc + atoms = _atoms + ph = PhononWorkflow( + atoms=atoms, + find_prim=find_prim, + work_dir=work_dir, + amplitude=amplitude, + supercell_matrix=supercell_matrix, + qpoints_mesh=qpoints_mesh, + max_atoms=max_atoms, + ) + has_imaginary, phonon = ph.run() + phonon_results["has_imaginary"].append(has_imaginary) + # phonon_results["phonon"].append(phonon) + phonon_results["phonon_band_plot"].append( + os.path.join(os.path.abspath(work_dir), f"{atoms.symbols}_phonon_band.png") + ) + phonon_results["phonon_dos_plot"].append( + os.path.join(os.path.abspath(work_dir), f"{atoms.symbols}_phonon_dos.png") + ) + os.rename( + os.path.join(os.path.abspath(work_dir), "band.yaml"), + os.path.join(os.path.abspath(work_dir), f"{atoms.symbols}_band.yaml"), + ) + os.rename( + os.path.join(os.path.abspath(work_dir), "phonopy_params.yaml"), + os.path.join( + os.path.abspath(work_dir), f"{atoms.symbols}_phonopy_params.yaml" + ), + ) + os.rename( + os.path.join(os.path.abspath(work_dir), "total_dos.dat"), + os.path.join(os.path.abspath(work_dir), f"{atoms.symbols}_total_dos.dat"), + ) + phonon_results["phonon_band"].append( + yaml.safe_load( + open( + os.path.join( + os.path.abspath(work_dir), f"{atoms.symbols}_band.yaml" + ), + "r", + ) + ) + ) + phonon_results["phonopy_params"].append( + yaml.safe_load( + open( + os.path.join( + os.path.abspath(work_dir), + f"{atoms.symbols}_phonopy_params.yaml", + ), + "r", + ) + ) + ) + phonon_results["total_dos"].append( + np.loadtxt( + os.path.join( + os.path.abspath(work_dir), f"{atoms.symbols}_total_dos.dat" + ), + comments="#", + ) + ) + + if not os.path.exists(work_dir): + os.makedirs(work_dir) + + logger.info(f"Saving the results to {os.path.join(work_dir, save_csv)}") + df = pd.DataFrame(phonon_results) + df.to_csv( + os.path.join(work_dir, save_csv.replace(".csv", "_phonon.csv")), + index=False, + mode="a", + ) + return phonon_results + + +def phonon_cli(args: argparse.Namespace) -> dict: + """ + CLI wrapper for phonon function. + + Args: + args (argparse.Namespace): Command line arguments. + + Returns: + dict: Dictionary containing the phonon properties. + """ + atoms_list = parse_atoms_list( + args.structure_file, args.mattersim_model, args.device + ) + phonon_args = { + k: v + for k, v in vars(args).items() + if k not in ["structure_file", "mattersim_model", "device"] + } + return phonon(atoms_list, **phonon_args) def moldyn(): @@ -219,6 +374,91 @@ def add_common_args(parser: argparse.ArgumentParser): ) +def add_relax_common_args(parser: argparse.ArgumentParser): + parser.add_argument( + "--optimizer", + type=str, + default="FIRE", + help="The optimizer to use. Default is FIRE.", + ) + parser.add_argument( + "--filter", + type=str, + default=None, + help="The filter to use.", + ) + parser.add_argument( + "--constrain-symmetry", + action="store_true", + help="Constrain symmetry.", + ) + parser.add_argument( + "--fix-axis", + type=bool, + default=False, + nargs="+", + help="Fix the axis.", + ) + parser.add_argument( + "--pressure-in-GPa", + type=float, + default=None, + help="Pressure in GPa to use for relaxation.", + ) + parser.add_argument( + "--fmax", + type=float, + default=0.01, + help="Maximum force tolerance for relaxation.", + ) + parser.add_argument( + "--steps", + type=int, + default=500, + help="Maximum number of steps for relaxation.", + ) + + +def add_phonon_common_args(parser: argparse.ArgumentParser): + parser.add_argument( + "--find-prim", + action="store_true", + help="If find the primitive cell and use it to calculate phonon.", + ) + parser.add_argument( + "--amplitude", + type=float, + default=0.01, + help="Magnitude of the finite difference to displace in " + "force constant calculation, in Angstrom.", + ) + parser.add_argument( + "--supercell-matrix", + type=int, + nargs=3, + default=None, + help="Supercell matrix for construct supercell, must be a list of 3 integers.", + ) + parser.add_argument( + "--qpoints-mesh", + type=int, + nargs=3, + default=None, + help="Qpoint mesh for IBZ integral, must be a list of 3 integers.", + ) + parser.add_argument( + "--max-atoms", + type=int, + default=None, + help="Maximum atoms number limitation for the supercell generation.", + ) + parser.add_argument( + "--enable-relax", + action="store_true", + help="Whether to relax the structure before predicting phonon properties.", + ) + + def parse_atoms_list( structure_file_list: Union[str, List[str]], mattersim_model: str, @@ -256,50 +496,19 @@ def main(): "relax", help="Relax a list of atoms structures." ) add_common_args(relax_parser) - relax_parser.add_argument( - "--optimizer", - type=str, - default="FIRE", - help="The optimizer to use. Default is FIRE.", - ) - relax_parser.add_argument( - "--filter", - type=str, - default=None, - help="The filter to use.", - ) - relax_parser.add_argument( - "--constrain-symmetry", - action="store_true", - help="Constrain symmetry.", - ) - relax_parser.add_argument( - "--fix-axis", - type=bool, - default=False, - nargs="+", - help="Fix the axis.", - ) - relax_parser.add_argument( - "--pressure-in-GPa", - type=float, - default=None, - help="Pressure in GPa to use for relaxation.", - ) - relax_parser.add_argument( - "--fmax", - type=float, - default=0.01, - help="Maximum force tolerance for relaxation.", - ) - relax_parser.add_argument( - "--steps", - type=int, - default=500, - help="Maximum number of steps for relaxation.", - ) + add_relax_common_args(relax_parser) relax_parser.set_defaults(func=relax_cli) + # Sub-command for phonon + phonon_parser = subparsers.add_parser( + "phonon", + help="Predict phonon properties for a list of structures.", + ) + add_common_args(phonon_parser) + add_relax_common_args(phonon_parser) + add_phonon_common_args(phonon_parser) + phonon_parser.set_defaults(func=phonon_cli) + # Parse arguments args = argparser.parse_args() print(args)