"""Makers to perform MD with forcefields."""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
import numpy as np
from ase import units
from ase.md.andersen import Andersen
from ase.md.langevin import Langevin
from ase.md.md import MolecularDynamics
from ase.md.npt import NPT
from ase.md.nptberendsen import NPTBerendsen
from ase.md.nvtberendsen import NVTBerendsen
from ase.md.velocitydistribution import (
MaxwellBoltzmannDistribution,
Stationary,
ZeroRotation,
)
from ase.md.verlet import VelocityVerlet
from jobflow import Maker
from pymatgen.io.ase import AseAtomsAdaptor
from scipy.interpolate import interp1d
from scipy.linalg import schur
from atomate2.forcefields import MLFF
from atomate2.forcefields.jobs import forcefield_job
from atomate2.forcefields.schemas import ForcefieldResult, ForceFieldTaskDocument
from atomate2.forcefields.utils import (
TrajectoryObserver,
ase_calculator,
revert_default_dtype,
)
if TYPE_CHECKING:
from pathlib import Path
from typing import Literal
from ase.calculators.calculator import Calculator
from pymatgen.core.structure import Structure
_valid_dynamics: dict[str, tuple[str, ...]] = {
"nve": ("velocityverlet",),
"nvt": ("nose-hoover", "langevin", "andersen", "berendsen"),
"npt": ("nose-hoover", "berendsen"),
}
_preset_dynamics: dict = {
"nve_velocityverlet": VelocityVerlet,
"nvt_andersen": Andersen,
"nvt_berendsen": NVTBerendsen,
"nvt_langevin": Langevin,
"nvt_nose-hoover": NPT,
"npt_berendsen": NPTBerendsen,
"npt_nose-hoover": NPT,
}
[docs]
@dataclass
class ForceFieldMDMaker(Maker):
"""
Perform MD with a force field.
Note the the following units are consistent with the VASP MD implementation:
- `temperature` in Kelvin (TEBEG and TEEND)
- `time_step` in femtoseconds (POTIM)
- `pressure` in kB (PSTRESS)
The default dynamics is Langevin NVT consistent with VASP MD, with the friction
coefficient set to 10 ps^-1 (LANGEVIN_GAMMA).
For the rest of preset dynamics (`_valid_dynamics`) and custom dynamics inherited
from ASE (`MolecularDynamics`), the user can specify the dynamics as a string or an
ASE class into the `dynamics` attribute. In this case, please consult the ASE
documentation for the parameters and units to pass into the ASE MD function through
`ase_md_kwargs`.
Parameters
----------
name : str
The name of the MD Maker
force_field_name : str
The name of the forcefield (for provenance)
time_step : float | None = None.
The timestep of the MD run in fs.
If `None`, defaults to 0.5 fs if a structure contains an isotope of
hydrogen and 2 fs otherwise.
n_steps : int = 1000
The number of MD steps to run
ensemble : str = "nvt"
The ensemble to use. Valid ensembles are nve, nvt, or npt
temperature: float | Sequence | np.ndarray | None.
The temperature in Kelvin. If a sequence or 1D array, the temperature
schedule will be interpolated linearly between the given values. If a
float, the temperature will be constant throughout the run.
pressure: float | Sequence | None = None
The pressure in kilobar. If a sequence or 1D array, the pressure
schedule will be interpolated linearly between the given values. If a
float, the pressure will be constant throughout the run.
dynamics : str | ASE .MolecularDynamics = "langevin"
The dynamical thermostat to use. If dynamics is an ASE .MolecularDynamics
object, this uses the option specified explicitly by the user.
See _valid_dynamics for a list of pre-defined options when
specifying dynamics as a string.
ase_md_kwargs : dict | None = None
Options except for temperature and pressure to pass into the ASE MD function
calculator_kwargs : dict
kwargs to pass to the ASE calculator class
traj_file : str | Path | None = None
If a str or Path, the name of the file to save the MD trajectory to.
If None, the trajectory is not written to disk
traj_file_fmt : Literal["ase","pmg"]
The format of the trajectory file to write. If "ase", writes an
ASE trajectory, if "pmg", writes a Pymatgen trajectory.
traj_interval : int
The step interval for saving the trajectories.
mb_velocity_seed : int | None = None
If an int, a random number seed for generating initial velocities
from a Maxwell-Boltzmann distribution.
zero_linear_momentum : bool = False
Whether to initialize the atomic velocities with zero linear momentum
zero_angular_momentum : bool = False
Whether to initialize the atomic velocities with zero angular momentum
task_document_kwargs: dict
Options to pass to the TaskDoc. Default choice
{"store_trajectory": "partial", "ionic_step_data": ("energy",),}
is consistent with atomate2.vasp.md.MDMaker
"""
name: str = "Forcefield MD"
force_field_name: str = f"{MLFF.Forcefield}"
time_step: float | None = None
n_steps: int = 1000
ensemble: Literal["nve", "nvt", "npt"] = "nvt"
dynamics: str | MolecularDynamics = "langevin"
temperature: float | Sequence | np.ndarray | None = 300.0
pressure: float | Sequence | np.ndarray | None = None
ase_md_kwargs: dict | None = None
calculator_kwargs: dict = field(default_factory=dict)
traj_file: str | Path | None = None
traj_file_fmt: Literal["pmg", "ase"] = "ase"
traj_interval: int = 1
mb_velocity_seed: int | None = None
zero_linear_momentum: bool = False
zero_angular_momentum: bool = False
task_document_kwargs: dict = field(
default_factory=lambda: {
"store_trajectory": "partial",
"ionic_step_data": ("energy",), # energy is required in ionic steps
}
)
@staticmethod
def _interpolate_quantity(values: Sequence | np.ndarray, n_pts: int) -> np.ndarray:
"""Interpolate temperature / pressure on a schedule."""
n_vals = len(values)
return np.interp(
np.linspace(0, n_vals - 1, n_pts + 1),
np.linspace(0, n_vals - 1, n_vals),
values,
)
def _get_ensemble_schedule(self) -> None:
if self.ensemble == "nve":
# Disable thermostat and barostat
self.temperature = np.nan
self.pressure = np.nan
self.t_schedule = np.full(self.n_steps + 1, self.temperature)
self.p_schedule = np.full(self.n_steps + 1, self.pressure)
return
if isinstance(self.temperature, Sequence) or (
isinstance(self.temperature, np.ndarray) and self.temperature.ndim == 1
):
self.t_schedule = self._interpolate_quantity(self.temperature, self.n_steps)
# NOTE: In ASE Langevin dynamics, the temperature are normally
# scalars, but in principle one quantity per atom could be specified by giving
# an array. This is not implemented yet here.
else:
self.t_schedule = np.full(self.n_steps + 1, self.temperature)
if self.ensemble == "nvt":
self.pressure = np.nan
self.p_schedule = np.full(self.n_steps + 1, self.pressure)
return
if isinstance(self.pressure, Sequence) or (
isinstance(self.pressure, np.ndarray) and self.pressure.ndim == 1
):
self.p_schedule = self._interpolate_quantity(self.pressure, self.n_steps)
elif isinstance(self.pressure, np.ndarray) and self.pressure.ndim == 4:
self.p_schedule = interp1d(
np.arange(self.n_steps + 1), self.pressure, kind="linear"
)
else:
self.p_schedule = np.full(self.n_steps + 1, self.pressure)
def _get_ensemble_defaults(self) -> None:
"""Update ASE MD kwargs with defaults consistent with VASP MD."""
self.ase_md_kwargs = self.ase_md_kwargs or {}
if self.ensemble == "nve":
self.ase_md_kwargs.pop("temperature", None)
self.ase_md_kwargs.pop("temperature_K", None)
self.ase_md_kwargs.pop("externalstress", None)
elif self.ensemble == "nvt":
self.ase_md_kwargs["temperature_K"] = self.t_schedule[0]
self.ase_md_kwargs.pop("externalstress", None)
elif self.ensemble == "npt":
self.ase_md_kwargs["temperature_K"] = self.t_schedule[0]
self.ase_md_kwargs["externalstress"] = self.p_schedule[0] * 1e3 * units.bar
if isinstance(self.dynamics, str) and self.dynamics.lower() == "langevin":
self.ase_md_kwargs["friction"] = self.ase_md_kwargs.get(
"friction",
10.0 * 1e-3 / units.fs, # Same default as in VASP: 10 ps^-1
)
[docs]
@forcefield_job
def make(
self,
structure: Structure,
prev_dir: str | Path | None = None,
) -> ForceFieldTaskDocument:
"""
Perform MD on a structure using a force field.
Parameters
----------
structure: .Structure
pymatgen structure.
prev_dir : str or Path or None
A previous calculation directory to copy output files from. Unused, just
added to match the method signature of other makers.
"""
self._get_ensemble_schedule()
self._get_ensemble_defaults()
if self.time_step is None:
# If a structure contains an isotope of hydrogen, set default `time_step`
# to 0.5 fs, and 2 fs otherwise.
has_h_isotope = any(element.Z == 1 for element in structure.composition)
self.time_step = 0.5 if has_h_isotope else 2.0
initial_velocities = structure.site_properties.get("velocities")
if isinstance(self.dynamics, str):
# Use known dynamics if `self.dynamics` is a str
self.dynamics = self.dynamics.lower()
if self.dynamics not in _valid_dynamics[self.ensemble]:
raise ValueError(
f"{self.dynamics} thermostat not available for {self.ensemble}."
f"Available {self.ensemble} thermostats are:"
" ".join(_valid_dynamics[self.ensemble])
)
if self.ensemble == "nve" and self.dynamics is None:
self.dynamics = "velocityverlet"
md_func = _preset_dynamics[f"{self.ensemble}_{self.dynamics}"]
elif issubclass(self.dynamics, MolecularDynamics):
# Allow user to explicitly run ASE Dynamics class
md_func = self.dynamics
atoms = structure.to_ase_atoms()
if md_func is NPT:
# Note that until md_func is instantiated, isinstance(md_func,NPT) is False
# ASE NPT implementation requires upper triangular cell
schur_decomp, _ = schur(atoms.get_cell(complete=True), output="complex")
atoms.set_cell(schur_decomp.real, scale_atoms=True)
if initial_velocities:
atoms.set_velocities(initial_velocities)
elif not np.isnan(self.t_schedule).any():
MaxwellBoltzmannDistribution(
atoms=atoms,
temperature_K=self.t_schedule[0],
rng=np.random.default_rng(seed=self.mb_velocity_seed),
)
if self.zero_linear_momentum:
Stationary(atoms)
if self.zero_angular_momentum:
ZeroRotation(atoms)
with revert_default_dtype():
atoms.calc = self.calculator
md_observer = TrajectoryObserver(atoms, store_md_outputs=True)
md_runner = md_func(
atoms=atoms, timestep=self.time_step * units.fs, **self.ase_md_kwargs
)
md_runner.attach(md_observer, interval=self.traj_interval)
def _callback(dyn: MolecularDynamics = md_runner) -> None:
if self.ensemble == "nve":
return
dyn.set_temperature(temperature_K=self.t_schedule[dyn.nsteps])
if self.ensemble == "nvt":
return
dyn.set_stress(self.p_schedule[dyn.nsteps] * 1e3 * units.bar)
md_runner.attach(_callback, interval=1)
md_runner.run(steps=self.n_steps)
if self.traj_file is not None:
md_observer.save(filename=self.traj_file, fmt=self.traj_file_fmt)
structure = AseAtomsAdaptor.get_structure(atoms)
self.task_document_kwargs = self.task_document_kwargs or {}
return ForceFieldTaskDocument.from_ase_compatible_result(
self.force_field_name,
ForcefieldResult(
final_structure=structure,
trajectory=md_observer.to_pymatgen_trajectory(None),
),
relax_cell=(self.ensemble == "npt"),
steps=self.n_steps,
relax_kwargs=None,
optimizer_kwargs=None,
**self.task_document_kwargs,
)
@property
def calculator(self) -> Calculator:
"""ASE calculator, can be overwritten by user."""
return ase_calculator(self.force_field_name, **self.calculator_kwargs)
[docs]
@dataclass
class NEPMDMaker(ForceFieldMDMaker):
"""Perform an MD run with NEP."""
name: str = f"{MLFF.NEP} MD"
force_field_name: str = f"{MLFF.NEP}"
calculator_kwargs: dict = field(
default_factory=lambda: {"model_filename": "nep.txt"}
)
[docs]
@dataclass
class MACEMDMaker(ForceFieldMDMaker):
"""Perform an MD run with MACE."""
name: str = f"{MLFF.MACE} MD"
force_field_name: str = f"{MLFF.MACE}"
calculator_kwargs: dict = field(
default_factory=lambda: {"default_dtype": "float32"}
)
[docs]
@dataclass
class M3GNetMDMaker(ForceFieldMDMaker):
"""Perform an MD run with M3GNet."""
name: str = f"{MLFF.M3GNet} MD"
force_field_name: str = f"{MLFF.M3GNet}"
[docs]
@dataclass
class CHGNetMDMaker(ForceFieldMDMaker):
"""Perform an MD run with CHGNet."""
name: str = f"{MLFF.CHGNet} MD"
force_field_name: str = f"{MLFF.CHGNet}"
[docs]
@dataclass
class GAPMDMaker(ForceFieldMDMaker):
"""Perform an MD run with GAP."""
name: str = f"{MLFF.GAP} MD"
force_field_name: str = f"{MLFF.GAP}"
calculator_kwargs: dict = field(
default_factory=lambda: {"args_str": "IP GAP", "param_filename": "gap.xml"}
)
[docs]
@dataclass
class NequipMDMaker(ForceFieldMDMaker):
"""Perform an MD run with nequip."""
name: str = f"{MLFF.Nequip} MD"
force_field_name: str = f"{MLFF.Nequip}"