Source code for

"""Makers to perform MD with forcefields."""

from __future__ import annotations

from import Sequence
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

import numpy as np
from ase import units
from import Andersen
from import Langevin
from import MolecularDynamics
from import NPT
from import NPTBerendsen
from import NVTBerendsen
from import (
from import VelocityVerlet
from jobflow import Maker
from import AseAtomsAdaptor
from scipy.interpolate import interp1d
from scipy.linalg import schur

from atomate2.forcefields import MLFF
from import forcefield_job
from atomate2.forcefields.schemas import ForcefieldResult, ForceFieldTaskDocument
from atomate2.forcefields.utils import (

    from pathlib import Path
    from typing import Literal

    from ase.calculators.calculator import Calculator
    from pymatgen.core.structure import Structure

_valid_dynamics: dict[str, tuple[str, ...]] = {
    "nve": ("velocityverlet",),
    "nvt": ("nose-hoover", "langevin", "andersen", "berendsen"),
    "npt": ("nose-hoover", "berendsen"),

_preset_dynamics: dict = {
    "nve_velocityverlet": VelocityVerlet,
    "nvt_andersen": Andersen,
    "nvt_berendsen": NVTBerendsen,
    "nvt_langevin": Langevin,
    "nvt_nose-hoover": NPT,
    "npt_berendsen": NPTBerendsen,
    "npt_nose-hoover": NPT,

[docs] @dataclass class ForceFieldMDMaker(Maker): """ Perform MD with a force field. Note the the following units are consistent with the VASP MD implementation: - `temperature` in Kelvin (TEBEG and TEEND) - `time_step` in femtoseconds (POTIM) - `pressure` in kB (PSTRESS) The default dynamics is Langevin NVT consistent with VASP MD, with the friction coefficient set to 10 ps^-1 (LANGEVIN_GAMMA). For the rest of preset dynamics (`_valid_dynamics`) and custom dynamics inherited from ASE (`MolecularDynamics`), the user can specify the dynamics as a string or an ASE class into the `dynamics` attribute. In this case, please consult the ASE documentation for the parameters and units to pass into the ASE MD function through `ase_md_kwargs`. Parameters ---------- name : str The name of the MD Maker force_field_name : str The name of the forcefield (for provenance) time_step : float | None = None. The timestep of the MD run in fs. If `None`, defaults to 0.5 fs if a structure contains an isotope of hydrogen and 2 fs otherwise. n_steps : int = 1000 The number of MD steps to run ensemble : str = "nvt" The ensemble to use. Valid ensembles are nve, nvt, or npt temperature: float | Sequence | np.ndarray | None. The temperature in Kelvin. If a sequence or 1D array, the temperature schedule will be interpolated linearly between the given values. If a float, the temperature will be constant throughout the run. pressure: float | Sequence | None = None The pressure in kilobar. If a sequence or 1D array, the pressure schedule will be interpolated linearly between the given values. If a float, the pressure will be constant throughout the run. dynamics : str | ASE .MolecularDynamics = "langevin" The dynamical thermostat to use. If dynamics is an ASE .MolecularDynamics object, this uses the option specified explicitly by the user. See _valid_dynamics for a list of pre-defined options when specifying dynamics as a string. ase_md_kwargs : dict | None = None Options except for temperature and pressure to pass into the ASE MD function calculator_kwargs : dict kwargs to pass to the ASE calculator class traj_file : str | Path | None = None If a str or Path, the name of the file to save the MD trajectory to. If None, the trajectory is not written to disk traj_file_fmt : Literal["ase","pmg"] The format of the trajectory file to write. If "ase", writes an ASE trajectory, if "pmg", writes a Pymatgen trajectory. traj_interval : int The step interval for saving the trajectories. mb_velocity_seed : int | None = None If an int, a random number seed for generating initial velocities from a Maxwell-Boltzmann distribution. zero_linear_momentum : bool = False Whether to initialize the atomic velocities with zero linear momentum zero_angular_momentum : bool = False Whether to initialize the atomic velocities with zero angular momentum task_document_kwargs: dict Options to pass to the TaskDoc. Default choice {"store_trajectory": "partial", "ionic_step_data": ("energy",),} is consistent with """ name: str = "Forcefield MD" force_field_name: str = f"{MLFF.Forcefield}" time_step: float | None = None n_steps: int = 1000 ensemble: Literal["nve", "nvt", "npt"] = "nvt" dynamics: str | MolecularDynamics = "langevin" temperature: float | Sequence | np.ndarray | None = 300.0 pressure: float | Sequence | np.ndarray | None = None ase_md_kwargs: dict | None = None calculator_kwargs: dict = field(default_factory=dict) traj_file: str | Path | None = None traj_file_fmt: Literal["pmg", "ase"] = "ase" traj_interval: int = 1 mb_velocity_seed: int | None = None zero_linear_momentum: bool = False zero_angular_momentum: bool = False task_document_kwargs: dict = field( default_factory=lambda: { "store_trajectory": "partial", "ionic_step_data": ("energy",), # energy is required in ionic steps } ) @staticmethod def _interpolate_quantity(values: Sequence | np.ndarray, n_pts: int) -> np.ndarray: """Interpolate temperature / pressure on a schedule.""" n_vals = len(values) return np.interp( np.linspace(0, n_vals - 1, n_pts + 1), np.linspace(0, n_vals - 1, n_vals), values, ) def _get_ensemble_schedule(self) -> None: if self.ensemble == "nve": # Disable thermostat and barostat self.temperature = np.nan self.pressure = np.nan self.t_schedule = np.full(self.n_steps + 1, self.temperature) self.p_schedule = np.full(self.n_steps + 1, self.pressure) return if isinstance(self.temperature, Sequence) or ( isinstance(self.temperature, np.ndarray) and self.temperature.ndim == 1 ): self.t_schedule = self._interpolate_quantity(self.temperature, self.n_steps) # NOTE: In ASE Langevin dynamics, the temperature are normally # scalars, but in principle one quantity per atom could be specified by giving # an array. This is not implemented yet here. else: self.t_schedule = np.full(self.n_steps + 1, self.temperature) if self.ensemble == "nvt": self.pressure = np.nan self.p_schedule = np.full(self.n_steps + 1, self.pressure) return if isinstance(self.pressure, Sequence) or ( isinstance(self.pressure, np.ndarray) and self.pressure.ndim == 1 ): self.p_schedule = self._interpolate_quantity(self.pressure, self.n_steps) elif isinstance(self.pressure, np.ndarray) and self.pressure.ndim == 4: self.p_schedule = interp1d( np.arange(self.n_steps + 1), self.pressure, kind="linear" ) else: self.p_schedule = np.full(self.n_steps + 1, self.pressure) def _get_ensemble_defaults(self) -> None: """Update ASE MD kwargs with defaults consistent with VASP MD.""" self.ase_md_kwargs = self.ase_md_kwargs or {} if self.ensemble == "nve": self.ase_md_kwargs.pop("temperature", None) self.ase_md_kwargs.pop("temperature_K", None) self.ase_md_kwargs.pop("externalstress", None) elif self.ensemble == "nvt": self.ase_md_kwargs["temperature_K"] = self.t_schedule[0] self.ase_md_kwargs.pop("externalstress", None) elif self.ensemble == "npt": self.ase_md_kwargs["temperature_K"] = self.t_schedule[0] self.ase_md_kwargs["externalstress"] = self.p_schedule[0] * 1e3 * if isinstance(self.dynamics, str) and self.dynamics.lower() == "langevin": self.ase_md_kwargs["friction"] = self.ase_md_kwargs.get( "friction", 10.0 * 1e-3 / units.fs, # Same default as in VASP: 10 ps^-1 )
[docs] @forcefield_job def make( self, structure: Structure, prev_dir: str | Path | None = None, ) -> ForceFieldTaskDocument: """ Perform MD on a structure using a force field. Parameters ---------- structure: .Structure pymatgen structure. prev_dir : str or Path or None A previous calculation directory to copy output files from. Unused, just added to match the method signature of other makers. """ self._get_ensemble_schedule() self._get_ensemble_defaults() if self.time_step is None: # If a structure contains an isotope of hydrogen, set default `time_step` # to 0.5 fs, and 2 fs otherwise. has_h_isotope = any(element.Z == 1 for element in structure.composition) self.time_step = 0.5 if has_h_isotope else 2.0 initial_velocities = structure.site_properties.get("velocities") if isinstance(self.dynamics, str): # Use known dynamics if `self.dynamics` is a str self.dynamics = self.dynamics.lower() if self.dynamics not in _valid_dynamics[self.ensemble]: raise ValueError( f"{self.dynamics} thermostat not available for {self.ensemble}." f"Available {self.ensemble} thermostats are:" " ".join(_valid_dynamics[self.ensemble]) ) if self.ensemble == "nve" and self.dynamics is None: self.dynamics = "velocityverlet" md_func = _preset_dynamics[f"{self.ensemble}_{self.dynamics}"] elif issubclass(self.dynamics, MolecularDynamics): # Allow user to explicitly run ASE Dynamics class md_func = self.dynamics atoms = structure.to_ase_atoms() if md_func is NPT: # Note that until md_func is instantiated, isinstance(md_func,NPT) is False # ASE NPT implementation requires upper triangular cell schur_decomp, _ = schur(atoms.get_cell(complete=True), output="complex") atoms.set_cell(schur_decomp.real, scale_atoms=True) if initial_velocities: atoms.set_velocities(initial_velocities) elif not np.isnan(self.t_schedule).any(): MaxwellBoltzmannDistribution( atoms=atoms, temperature_K=self.t_schedule[0], rng=np.random.default_rng(seed=self.mb_velocity_seed), ) if self.zero_linear_momentum: Stationary(atoms) if self.zero_angular_momentum: ZeroRotation(atoms) with revert_default_dtype(): atoms.calc = self.calculator md_observer = TrajectoryObserver(atoms, store_md_outputs=True) md_runner = md_func( atoms=atoms, timestep=self.time_step * units.fs, **self.ase_md_kwargs ) md_runner.attach(md_observer, interval=self.traj_interval) def _callback(dyn: MolecularDynamics = md_runner) -> None: if self.ensemble == "nve": return dyn.set_temperature(temperature_K=self.t_schedule[dyn.nsteps]) if self.ensemble == "nvt": return dyn.set_stress(self.p_schedule[dyn.nsteps] * 1e3 * md_runner.attach(_callback, interval=1) if self.traj_file is not None:, fmt=self.traj_file_fmt) structure = AseAtomsAdaptor.get_structure(atoms) self.task_document_kwargs = self.task_document_kwargs or {} return ForceFieldTaskDocument.from_ase_compatible_result( self.force_field_name, ForcefieldResult( final_structure=structure, trajectory=md_observer.to_pymatgen_trajectory(None), ), relax_cell=(self.ensemble == "npt"), steps=self.n_steps, relax_kwargs=None, optimizer_kwargs=None, **self.task_document_kwargs, )
@property def calculator(self) -> Calculator: """ASE calculator, can be overwritten by user.""" return ase_calculator(self.force_field_name, **self.calculator_kwargs)
[docs] @dataclass class NEPMDMaker(ForceFieldMDMaker): """Perform an MD run with NEP.""" name: str = f"{MLFF.NEP} MD" force_field_name: str = f"{MLFF.NEP}" calculator_kwargs: dict = field( default_factory=lambda: {"model_filename": "nep.txt"} )
[docs] @dataclass class MACEMDMaker(ForceFieldMDMaker): """Perform an MD run with MACE.""" name: str = f"{MLFF.MACE} MD" force_field_name: str = f"{MLFF.MACE}" calculator_kwargs: dict = field( default_factory=lambda: {"default_dtype": "float32"} )
[docs] @dataclass class M3GNetMDMaker(ForceFieldMDMaker): """Perform an MD run with M3GNet.""" name: str = f"{MLFF.M3GNet} MD" force_field_name: str = f"{MLFF.M3GNet}"
[docs] @dataclass class CHGNetMDMaker(ForceFieldMDMaker): """Perform an MD run with CHGNet.""" name: str = f"{MLFF.CHGNet} MD" force_field_name: str = f"{MLFF.CHGNet}"
[docs] @dataclass class GAPMDMaker(ForceFieldMDMaker): """Perform an MD run with GAP.""" name: str = f"{MLFF.GAP} MD" force_field_name: str = f"{MLFF.GAP}" calculator_kwargs: dict = field( default_factory=lambda: {"args_str": "IP GAP", "param_filename": "gap.xml"} )
[docs] @dataclass class NequipMDMaker(ForceFieldMDMaker): """Perform an MD run with nequip.""" name: str = f"{MLFF.Nequip} MD" force_field_name: str = f"{MLFF.Nequip}"