Source code for atomate2.cp2k.sets.base
"""Module defining base CP2K input set and generator."""
from __future__ import annotations
import os
from copy import deepcopy
from dataclasses import dataclass, field
from importlib.resources import files as get_mod_path
from pathlib import Path
from typing import Any
import numpy as np
from monty.io import zopen
from monty.serialization import loadfn
from pymatgen.core.structure import Molecule, Structure
from pymatgen.io.core import InputGenerator, InputSet
from pymatgen.io.cp2k.inputs import (
BasisFile,
Cp2kInput,
GaussianTypeOrbitalBasisSet,
GthPotential,
PotentialFile,
)
from pymatgen.io.cp2k.outputs import Cp2kOutput
from pymatgen.io.cp2k.sets import DftSet
from pymatgen.io.vasp import Kpoints # TODO Currently uses the VASP implementation
from pymatgen.symmetry.analyzer import SpacegroupAnalyzer
from pymatgen.symmetry.bandstructure import HighSymmKpath
from atomate2 import SETTINGS
_BASE_CP2K_SET = loadfn(get_mod_path("atomate2.cp2k.sets") / "BaseCp2kSet.yaml")
_BASE_GAPW_SET = loadfn(get_mod_path("atomate2.cp2k.sets") / "BaseAllSet.yaml")
[docs]
class Cp2kInputSet(InputSet):
"""A class to represent a set of CP2K inputs."""
def __init__(
self,
cp2k_input: Cp2kInput,
optional_files: dict | None = None,
) -> None:
"""Initialize the set.
Parameters
----------
cp2k_input
Cp2kInput object for representing the main cp2k input file
optional_files
Any additional files needed for running the calculations. Most common
use is to make data files available which are not guaranteed to be available
at runtime.
Format pseudocode:
{
"name of this optional data": {
"filename": filename to write to,
"object": object with a __str__ method to write the file
}
}
Some examples uses:
(1) Cp2kInputGenerator below will try to put the basis and potential
info into their own optional files. This allows them to run when the
cp2k executable cannot find this info due to version mismatch,
custom data, etc.
(2) Include files. CP2K preprocessor link input sections like the
structure definition to an external file in order to keep the main
input file neat. This use case requires "@include" parameters (see
pymatgen.io.cp2k or the cp2k manual)
(3) Other custom data files like vdw kernel tables, truncated coulomb.
"""
self.cp2k_input = cp2k_input
self.optional_files = optional_files or {}
[docs]
def write_input(
self,
directory: str | Path,
make_dir: bool = True,
overwrite: bool = True,
) -> None:
"""Write Cp2k input file to a directory.
Parameters
----------
directory
Directory to write input files to.
make_dir
Whether to create the directory if it does not already exist.
overwrite
Whether to overwrite an input file if it already exists.
"""
directory = Path(directory)
if make_dir:
os.makedirs(directory, exist_ok=True)
inputs = {
"input": {"filename": "cp2k.inp", "object": self.cp2k_input},
}
inputs.update(self.optional_files)
for key, val in inputs.items():
filename = val.get("filename")
obj = val.get("object")
if val is not None and (overwrite or not (directory / key).exists()):
with zopen(directory / filename, "wt") as file:
file.write(str(obj))
elif not overwrite and (directory / filename).exists():
raise FileExistsError(f"{directory / filename} already exists.")
[docs]
@staticmethod
def from_directory(
directory: str | Path, optional_files: dict = None
) -> Cp2kInputSet:
"""Load a set of CP2K inputs from a directory.
Parameters
----------
directory
Directory to read CP2K inputs from.
optional_files
Optional files to read in as well as a dict of {filename: Object class}.
Object class must have a static/class method from_file.
"""
directory = Path(directory)
if (directory / "cp2k.inp").exists():
cp2k_input = Cp2kInput.from_file(directory / "cp2k.inp")
else:
raise FileNotFoundError
optional_files = optional_files or {}
optional = {}
for filename, obj in optional_files.items():
optional[filename] = {
"filename": filename,
"object": obj.from_file(filename),
}
return Cp2kInputSet(cp2k_input=cp2k_input, optional_files=optional)
# TODO Validation
@property
def is_valid(self) -> bool:
"""Whether the input set is valid."""
return True
[docs]
@dataclass
class Cp2kInputGenerator(InputGenerator):
"""
A class to generate Cp2k input sets.
Parameters
----------
user_input_settings:
Updates to the inputs in the base config dict.
user_kpoints_settings:
Updates to the kpoint settings in the base config dict
sort_structure
Whether to sort the structure (using the default sort order of
electronegativity) before generating input files. Defaults to True, the behavior
you would want most of the time. This ensures that similar atomic species are
grouped together.
symprec
Tolerance for symmetry finding, used for line mode band structure k-points.
config_dict
The config dictionary to use containing the base input set settings.
"""
user_input_settings: dict = field(default_factory=dict)
user_kpoints_settings: dict | Kpoints = field(default_factory=dict)
auto_kspacing: bool = True
use_structure_charge: bool = False
sort_structure: bool = True
symprec: float = SETTINGS.SYMPREC
force_gamma: bool = False
config_dict: dict = field(default_factory=lambda: _BASE_CP2K_SET)
[docs]
def get_input_set(
self,
structure: Structure | Molecule = None,
prev_dir: str | Path = None,
optional_files: dict | None = None,
) -> Cp2kInputSet:
"""Get a CP2K input set.
Note, if both ``structure`` and ``prev_dir`` are set, then the structure
specified will be preferred over the final structure from the last CP2K run.
Parameters
----------
structure
A structure.
prev_dir
A previous directory to generate the input set from.
optional_files
Additional files (e.g. vdw kernel file) to be included in the input set.
Returns
-------
Cp2kInput
A Cp2k input set.
"""
structure, prev_input, _cp2k_output = self._get_previous(structure, prev_dir)
input_updates = self.get_input_updates(
structure,
prev_input=prev_input,
)
if isinstance(structure, Structure):
kpoints_updates = self.get_kpoints_updates(
structure,
prev_input=prev_input,
)
kpoints = self._get_kpoints(structure, kpoints_updates)
else:
kpoints = None
cp2k_input = self._get_input(
structure,
kpoints,
prev_input,
input_updates,
)
optional_files = optional_files or {}
optional_files["basis"] = {
"filename": "BASIS",
"object": self._get_basis_file(cp2k_input=cp2k_input),
}
optional_files["potential"] = {
"filename": "POTENTIAL",
"object": self._get_potential_file(cp2k_input=cp2k_input),
}
return Cp2kInputSet(cp2k_input=cp2k_input, optional_files=optional_files)
[docs]
def get_input_updates(self, structure: Structure, prev_input: Cp2kInput) -> dict:
"""Get updates to the cp2k input for this calculation type.
Parameters
----------
structure
A structure.
prev_input
A Cp2kInput from a previous calculation.
bandgap
The band gap.
cp2k_output
A Cp2kOutput from a previous calculation.
Returns
-------
dict
A dictionary of updates to apply.
"""
raise NotImplementedError
[docs]
def get_kpoints_updates(
self, structure: Structure, prev_input: Cp2kInput = None
) -> dict:
"""Get updates to the kpoints configuration for this calculation type.
Note, these updates will be ignored if the user has set user_kpoint_settings.
Parameters
----------
structure
A structure.
prev_input
A Cp2kInput from a previous calculation.
bandgap
The band gap.
cp2k_output
A Cp2kOutput from a previous calculation.
Returns
-------
dict
A dictionary of updates to apply to the KPOINTS config.
"""
return {}
def _get_previous(
self, structure: Structure = None, prev_dir: str | Path = None
) -> tuple[Structure, Cp2kInput, Cp2kOutput]:
"""Load previous calculation outputs and decide which structure to use."""
if structure is None and prev_dir is None:
raise ValueError("Either structure or prev_dir must be set")
prev_input = {}
prev_structure = None
cp2k_output = None
if prev_dir:
cp2k_output = Cp2kOutput(Path(prev_dir) / "cp2k.out")
prev_input = cp2k_output.input
prev_structure = cp2k_output.final_structure
structure = structure if structure is not None else prev_structure
structure = self._get_structure(structure)
return structure, prev_input, cp2k_output
def _get_structure(self, structure: Structure) -> Structure:
"""Get the standardized structure."""
if self.sort_structure and hasattr(structure, "get_sorted_structure"):
structure = structure.get_sorted_structure()
return structure
def _get_input(
self,
structure: Structure | Molecule,
kpoints: Kpoints | None = None,
previous_input: Cp2kInput = None,
input_updates: dict = None,
) -> Cp2kInput:
"""Get the input."""
previous_input = previous_input or {}
input_updates = input_updates or {}
input_settings = dict(self.config_dict["cp2k_input"])
# Generate base input but override with user input settings
input_settings = recursive_update(input_settings, input_updates)
input_settings = recursive_update(input_settings, self.user_input_settings)
overrides = (
input_settings.pop("override_default_params")
if "override_default_params" in input_settings
else {}
)
cp2k_input = DftSet(structure=structure, kpoints=kpoints, **input_settings)
for setting in input_settings:
if (
hasattr(cp2k_input, setting)
and input_settings[setting]
and callable(getattr(cp2k_input, setting))
):
sub_settings = input_settings.get(setting)
getattr(cp2k_input, setting)(
**sub_settings if isinstance(sub_settings, dict) else {}
)
cp2k_input.update(overrides)
return cp2k_input
def _get_basis_file(self, cp2k_input: Cp2kInput) -> BasisFile:
"""Get input object's basis sets and convert them to a basis file object.
Allows calculation to execute if the basis sets are not available on the
execution resource.
"""
basis_sets = []
for el in cp2k_input.structure.symbol_set:
for data in cp2k_input.basis_and_potential[el].values():
if isinstance(data, GaussianTypeOrbitalBasisSet):
basis_sets.append(data) # noqa: PERF401
if not basis_sets:
return None
cp2k_input.safeset({"force_eval": {"dft": {"BASIS_SET_FILE_NAME": "BASIS"}}})
return BasisFile(objects=basis_sets)
def _get_potential_file(self, cp2k_input: Cp2kInput) -> PotentialFile:
"""Get the potentials and convert them to a potential file object.
Allows calculation to execute if the potentials are not available on the
execution resource.
"""
potentials = []
for el in cp2k_input.structure.symbol_set:
for data in cp2k_input.basis_and_potential[el].values():
if isinstance(data, GthPotential):
potentials.append(data) # noqa: PERF401
if not potentials:
return None
cp2k_input.safeset(
{"force_eval": {"dft": {"POTENTIAL_FILE_NAME": "POTENTIAL"}}}
)
return PotentialFile(objects=potentials)
def _get_kpoints(
self,
structure: Structure,
kpoints_updates: dict[str, Any] | None,
) -> Kpoints | None:
"""Get the kpoints object."""
kpoints_updates = kpoints_updates or {}
# use user setting if set otherwise default to base config settings
if self.user_kpoints_settings != {}:
kpt_config = deepcopy(self.user_kpoints_settings)
else:
# apply updates to k-points config
kpt_config = deepcopy(self.config_dict.get("KPOINTS", {}))
kpt_config.update(kpoints_updates)
if isinstance(kpt_config, Kpoints):
return kpt_config
explicit = (
kpt_config.get("explicit")
or len(kpt_config.get("added_kpoints", [])) > 0
or "zero_weighted_reciprocal_density" in kpt_config
or "zero_weighted_line_density" in kpt_config
)
# handle length generation first as this doesn't support any additional options
if kpt_config.get("length"):
if explicit:
raise ValueError(
"length option cannot be used with explicit k-point generation, "
"added_kpoints, or zero weighted k-points."
)
# If length is in kpoints settings use Kpoints.automatic
return Kpoints.automatic(kpt_config["length"])
base_kpoints = None
if kpt_config.get("line_density"):
# handle line density generation
kpath = HighSymmKpath(structure, **kpt_config.get("kpath_kwargs", {}))
frac_k_points, k_points_labels = kpath.get_kpoints(
line_density=kpt_config["line_density"], coords_are_cartesian=False
)
base_kpoints = Kpoints(
comment="Non SCF run along symmetry lines",
style=Kpoints.supported_modes.Line_mode,
num_kpts=len(frac_k_points),
kpts=frac_k_points,
labels=k_points_labels,
kpts_weights=[1] * len(frac_k_points),
)
elif kpt_config.get("grid_density") or kpt_config.get("reciprocal_density"):
# handle regular weighted k-point grid generation
if kpt_config.get("grid_density"):
base_kpoints = Kpoints.automatic_density(
structure, int(kpt_config["grid_density"]), self.force_gamma
)
if kpt_config.get("reciprocal_density"):
base_kpoints = Kpoints.automatic_density_by_vol(
structure, kpt_config["reciprocal_density"], self.force_gamma
)
if explicit:
sga = SpacegroupAnalyzer(structure, symprec=self.symprec)
mesh = sga.get_ir_reciprocal_mesh(base_kpoints.kpts[0])
base_kpoints = Kpoints(
comment="Uniform grid",
style=Kpoints.supported_modes.Reciprocal,
num_kpts=len(mesh),
kpts=[i[0] for i in mesh],
kpts_weights=[i[1] for i in mesh],
)
else:
# if not explicit that means no other options have been specified
# so we can return the k-points as is
return base_kpoints
zero_weighted_kpoints = None
if kpt_config.get("zero_weighted_line_density"):
# zero_weighted k-points along line mode path
kpath = HighSymmKpath(structure)
frac_k_points, k_points_labels = kpath.get_kpoints(
line_density=kpt_config["zero_weighted_line_density"],
coords_are_cartesian=False,
)
zero_weighted_kpoints = Kpoints(
comment="Hybrid run along symmetry lines",
style=Kpoints.supported_modes.Reciprocal,
num_kpts=len(frac_k_points),
kpts=frac_k_points,
labels=k_points_labels,
kpts_weights=[0] * len(frac_k_points),
)
elif kpt_config.get("zero_weighted_reciprocal_density"):
zero_weighted_kpoints = Kpoints.automatic_density_by_vol(
structure,
kpt_config["zero_weighted_reciprocal_density"],
self.force_gamma,
)
sga = SpacegroupAnalyzer(structure, symprec=self.symprec)
mesh = sga.get_ir_reciprocal_mesh(zero_weighted_kpoints.kpts[0])
zero_weighted_kpoints = Kpoints(
comment="Uniform grid",
style=Kpoints.supported_modes.Reciprocal,
num_kpts=len(mesh),
kpts=[i[0] for i in mesh],
kpts_weights=[0 for i in mesh],
)
added_kpoints = None
if kpt_config.get("added_kpoints"):
added_kpoints = Kpoints(
comment="Specified k-points only",
style=Kpoints.supported_modes.Reciprocal,
num_kpts=len(kpt_config.get("added_kpoints")),
kpts=kpt_config.get("added_kpoints"),
labels=["user-defined"] * len(kpt_config.get("added_kpoints")),
kpts_weights=[0] * len(kpt_config.get("added_kpoints")),
)
if base_kpoints and not (added_kpoints or zero_weighted_kpoints):
return base_kpoints
if added_kpoints and not (base_kpoints or zero_weighted_kpoints):
return added_kpoints
# do some sanity checking
if "line_density" in kpt_config and zero_weighted_kpoints:
raise ValueError(
"Cannot combined line_density and zero weighted k-points options"
)
if zero_weighted_kpoints and not base_kpoints:
raise ValueError(
"Zero weighted k-points must be used with reciprocal_density or "
"grid_density options"
)
if not (base_kpoints or zero_weighted_kpoints or added_kpoints):
return None
return _combine_kpoints(base_kpoints, zero_weighted_kpoints, added_kpoints)
# TODO From `atomate2.vasp.sets.base`. Should possibly go in common.
# only reservation is if, eventually, CP2K gets it own kpoint object version
# instead of using the vasp kpoint objects.
def _combine_kpoints(*kpoints_objects: Kpoints) -> Kpoints:
"""Combine k-points files together."""
labels = []
kpoints = []
weights = []
for kpoints_object in filter(None, kpoints_objects):
if kpoints_object.style != Kpoints.supported_modes.Reciprocal:
raise ValueError(
"Can only combine kpoints with style=Kpoints.supported_modes.Reciprocal"
)
if kpoints_object.labels is None:
labels.append([""] * len(kpoints_object.kpts))
else:
labels.append(kpoints_object.labels)
weights.append(kpoints_object.kpts_weights)
kpoints.append(kpoints_object.kpts)
labels = np.concatenate(labels).tolist()
weights = np.concatenate(weights).tolist()
kpoints = np.concatenate(kpoints)
return Kpoints(
comment="Combined k-points",
style=Kpoints.supported_modes.Reciprocal,
num_kpts=len(kpoints),
kpts=kpoints,
labels=labels,
kpts_weights=weights,
)
[docs]
@dataclass
class Cp2kAllElectronInputGenerator(Cp2kInputGenerator):
"""
A class to generate Cp2k input sets for all electron calculations.
Parameters
----------
user_input_settings:
Updates to the inputs in the base config dict.
sort_structure
Whether to sort the structure (using the default sort order of
electronegativity) before generating input files. Defaults to True, the behavior
you would want most of the time. This ensures that similar atomic species are
grouped together.
symprec
Tolerance for symmetry finding, used for line mode band structure k-points.
config_dict
The config dictionary to use containing the base input set settings.
"""
user_input_settings: dict = field(default_factory=dict)
sort_structure: bool = True
symprec: float = SETTINGS.SYMPREC
config_dict: dict = field(default_factory=lambda: _BASE_GAPW_SET)
def _get_kpoints(
self, structure: Structure, kpoints_updates: dict[str, Any] | None
) -> Kpoints | None:
"""No Kpoints possible."""
return None
[docs]
def recursive_update(d: dict, u: dict) -> dict:
"""
Update a dictionary recursively and return it.
Parameters
----------
d: Dict
Input dictionary to modify
u: Dict
Dictionary of updates to apply
Returns
-------
Dict
The updated dictionary.
Example
----------
d = {'activate_hybrid': {"hybrid_functional": "HSE06"}}
u = {'activate_hybrid': {"cutoff_radius": 8}}
yields {'activate_hybrid': {"hybrid_functional": "HSE06", "cutoff_radius": 8}}}
"""
for k, v in u.items():
if isinstance(v, dict):
d[k] = recursive_update(d.get(k, {}), v)
else:
d[k] = v
return d