"""Core definitions of a CP2K calculation documents."""
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from shutil import which
from typing import Any, Union
try:
from emmet.core.types.enums import ValueEnum
except ImportError:
from emmet.core.utils import ValueEnum
from pydantic import BaseModel, Field, field_validator
from pymatgen.command_line.bader_caller import BaderAnalysis
from pymatgen.core.structure import Molecule, Structure
from pymatgen.core.trajectory import Trajectory
from pymatgen.core.units import Ha_to_eV
from pymatgen.electronic_structure.bandstructure import BandStructure
from pymatgen.electronic_structure.dos import Dos
from pymatgen.io.common import VolumetricData
from pymatgen.io.cp2k.inputs import BasisFile, Cp2kInput, DataFile, PotentialFile
from pymatgen.io.cp2k.outputs import Cp2kOutput, parse_energy_file
from typing_extensions import Self
from atomate2 import SETTINGS
from atomate2.cp2k.schemas.calc_types import (
CalcType,
RunType,
TaskType,
calc_type,
run_type,
task_type,
)
logger = logging.getLogger(__name__)
# Can be expanded if support for other volumetric files is added
__is_stored_in_Ha__ = ["v_hartree"]
_BADER_EXE_EXISTS = bool(which("bader") or which("bader.exe"))
[docs]
class Status(ValueEnum):
"""CP2K calculation state."""
SUCCESS = "successful"
FAILED = "failed"
[docs]
class Cp2kObject(ValueEnum):
"""Types of CP2K data objects."""
DOS = "dos"
BANDSTRUCTURE = "band_structure"
BASIS = "basis" # Basis file
POTENTIAL = "potential" # potential file
ELECTRON_DENSITY = "electron_density" # e_density
SPIN_DENSITY = "spin_density" # spin density
v_hartree = "v_hartree" # elec. potential
TRAJECTORY = "trajectory" # Trajectory
WFN = "wfn" # Wavefunction file
[docs]
class RunStatistics(BaseModel):
"""Summary of the run statistics for a CP2K calculation."""
total_time: float = Field(0, description="The total CPU time for this calculation")
[docs]
@classmethod
def from_cp2k_output(cls, output: Cp2kOutput) -> Self:
"""Create a run statistics document from an CP2K Output object.
Parameters
----------
output:
Cp2kOutput object
Returns
-------
RunStatistics
The run statistics.
"""
# rename these statistics
run_stats = {}
output.parse_timing()
run_stats["total_time"] = output.timing["CP2K"]["total_time"]["maximum"]
return cls(**run_stats)
[docs]
class CalculationOutput(BaseModel):
"""Document defining CP2K calculation outputs."""
energy: float = Field(
None, description="The final total DFT energy for the calculation"
)
energy_per_atom: float = Field(
None, description="The final DFT energy per atom for the calculation"
)
structure: Union[Structure, Molecule] = Field(
None, description="The final structure/molecule from the calculation"
)
efermi: float | None = Field(
None, description="The Fermi level from the calculation in eV"
)
is_metal: bool = Field(None, description="Whether the system is metallic")
bandgap: float | None = Field(
None, description="The band gap from the calculation in eV"
)
v_hartree: Union[dict[int, list[float]], None] = Field(
None, description="Plane averaged electrostatic potential"
)
cbm: float | None = Field(
None,
description="The conduction band minimum in eV (if system is not metallic)",
)
vbm: float | None = Field(
None, description="The valence band maximum in eV (if system is not metallic)"
)
ionic_steps: list[dict[str, Any]] = Field(
None, description="Energy, forces, and structure for each ionic step"
)
locpot: Union[dict[int, list[float]], None] = Field(
None, description="Average of the local potential along the crystal axes"
)
run_stats: RunStatistics = Field(
None, description="Summary of runtime statistics for this calculation"
)
scf: list | None = Field(None, description="SCF optimization steps")
[docs]
@classmethod
def from_cp2k_output(
cls,
output: Cp2kOutput, # Must use auto_load kwarg when passed
v_hartree: VolumetricData | None = None,
store_trajectory: bool = False,
store_scf: bool = False,
) -> Self:
"""Create a CP2K output document from CP2K outputs.
Parameters
----------
output
A Cp2kOutput object.
v_hartree
A VolumetricData object for the V_HARTREE data
Returns
-------
The CP2K calculation output document.
"""
v_hart_avg = None
if v_hartree:
v_hart_avg = {
i: v_hartree.get_average_along_axis(i).tolist() for i in range(3)
}
structure = output.final_structure
if output.band_structure:
bandgap_info = output.band_structure.get_band_gap()
electronic_output = {
"efermi": output.band_structure.efermi,
"vbm": output.band_structure.get_vbm()["energy"],
"cbm": output.band_structure.get_cbm()["energy"],
"bandgap": bandgap_info["energy"],
"is_gap_direct": bandgap_info["direct"],
"is_metal": output.band_structure.is_metal(),
"direct_gap": output.band_structure.get_direct_band_gap(),
"transition": bandgap_info["transition"],
}
else:
electronic_output = {
"efermi": output.efermi,
"vbm": output.vbm,
"cbm": output.cbm,
"bandgap": output.band_gap,
"is_metal": output.is_metal,
}
if store_scf:
output.parse_scf_opt()
scf = output.data.get("electronic_steps")
scf = [item for sublist in scf for item in sublist]
else:
scf = None
return cls(
structure=structure,
energy=output.final_energy,
energy_per_atom=output.final_energy / len(structure),
**electronic_output,
ionic_steps=None if store_trajectory else output.ionic_steps,
v_hartree=v_hart_avg,
run_stats=RunStatistics.from_cp2k_output(output),
scf=scf,
)
[docs]
class Calculation(BaseModel):
"""Full CP2K calculation inputs and outputs."""
dir_name: str = Field(None, description="The directory for this CP2K calculation")
cp2k_version: str = Field(
None, description="CP2K version used to perform the calculation"
)
has_cp2k_completed: Status = Field(
None, description="Whether CP2K completed the calculation successfully"
)
input: CalculationInput = Field(
None, description="CP2K input settings for the calculation"
)
output: CalculationOutput = Field(None, description="The CP2K calculation output")
completed_at: str = Field(
None, description="Timestamp for when the calculation was completed"
)
task_name: str = Field(
None, description="Name of task given by custodian (e.g., relax1, relax2)"
)
output_file_paths: dict[str, str] = Field(
None,
description="Paths (relative to dir_name) of the CP2K output files "
"associated with this calculation",
)
bader: dict | None = Field(None, description="Output from the bader software")
run_type: RunType = Field(
None, description="Calculation run type (e.g., HF, HSE06, PBE)"
)
task_type: TaskType = Field(
None, description="Calculation task type (e.g., Structure Optimization)."
)
calc_type: CalcType = Field(
None, description="Return calculation type (run type + task_type)."
)
[docs]
@classmethod
def from_cp2k_files(
cls,
dir_name: Union[Path, str],
task_name: str,
cp2k_output_file: Union[Path, str] = "cp2k.out",
volumetric_files: list[str] = None,
parse_dos: Union[str, bool] = False,
parse_bandstructure: Union[str, bool] = False,
average_v_hartree: bool = True,
run_bader: bool = (SETTINGS.CP2K_RUN_BADER and _BADER_EXE_EXISTS),
strip_bandstructure_projections: bool = False,
strip_dos_projections: bool = False,
store_trajectory: bool = False,
store_scf: bool = False,
store_volumetric_data: tuple[str] | None = SETTINGS.CP2K_STORE_VOLUMETRIC_DATA,
) -> tuple[Self, dict[Cp2kObject, dict]]:
"""Create a CP2K calculation document from a directory and file paths.
Parameters
----------
dir_name
The directory containing the calculation outputs.
task_name
The task name.
cp2k_output_file
Path to the main output of cp2k job, relative to dir_name.
volumetric_files
Path to volumetric (Cube) files, relative to dir_name.
parse_dos
Whether to parse the DOS. Can be:
- "auto": Only parse DOS if there are no ionic steps.
- True: Always parse DOS.
- False: Never parse DOS.
parse_bandstructure
How to parse the bandstructure. Can be:
- "auto": Parse the bandstructure with projections for NSCF calculations
and decide automatically if it's line or uniform mode.
- "line": Parse the bandstructure as a line mode calculation with
projections
- True: Parse the bandstructure as a uniform calculation with
projections .
- False: Parse the band structure without projects and just store
vbm, cbm, band_gap, is_metal and efermi rather than the full
band structure object.
average_v_hartree : bool = True
Whether to store the average of the V_HARTREE along the crystal axes.
run_bader : bool = False
Whether to run bader on the charge density.
strip_dos_projections : bool
Whether to strip the element and site projections from the density of
states. This can help reduce the size of DOS objects in systems with
many atoms.
strip_bandstructure_projections : bool
Whether to strip the element and site projections from the band structure.
This can help reduce the size of DOS objects in systems with many atoms.
store_trajectory : bool = False
Whether to store the ionic steps as a pmg trajectory object, which can be
pushed, to a bson data store, instead of as a list od dicts. Useful for
large trajectories.
store_scf : bool = False
Whether to store the SCF convergence data.
store_volumetric_data : tuple[str] | None = SETTINGS.CP2K_STORE_VOLUMETRIC_DATA
Which volumetric files to store.
Returns
-------
Calculation
A CP2K calculation document.
"""
dir_name = Path(dir_name)
cp2k_output_file = dir_name / cp2k_output_file
volumetric_files = [] if volumetric_files is None else volumetric_files
cp2k_output = Cp2kOutput(cp2k_output_file, auto_load=True)
completed_at = str(
datetime.fromtimestamp(os.stat(cp2k_output_file).st_mtime, tz=timezone.utc)
)
output_file_paths = _get_output_file_paths(volumetric_files)
cp2k_objects: dict[Cp2kObject, Any] = _get_volumetric_data(
dir_name, output_file_paths, store_volumetric_data
)
cp2k_objects.update(_get_basis_and_potential_files(dir_name))
dos = _parse_dos(parse_dos, cp2k_output)
if dos is not None:
if strip_dos_projections:
dos = Dos(dos.efermi, dos.energies, dos.densities)
cp2k_objects[Cp2kObject.DOS] = dos # type: ignore[index]
bandstructure = _parse_bandstructure(parse_bandstructure, cp2k_output)
if bandstructure is not None:
if strip_bandstructure_projections:
bandstructure.projections = {}
cp2k_objects[
Cp2kObject.BANDSTRUCTURE # type: ignore[index]
] = bandstructure
bader = None
if run_bader and Cp2kObject.ELECTRON_DENSITY in output_file_paths:
ba = BaderAnalysis(cube_filename=Cp2kObject.ELECTRON_DENSITY)
# TODO vasp version calls bader_analysis_from_path but cp2k
# cube files don't support that yet, do it manually
bader = {
"min_dist": [dct["min_dist"] for dct in ba.data],
"charge": [dct["charge"] for dct in ba.data],
"atomic_volume": [dct["atomic_vol"] for dct in ba.data],
"vacuum_charge": ba.vacuum_charge,
"vacuum_volume": ba.vacuum_volume,
"reference_used": bool(ba.chgref_filename),
"bader_version": ba.version,
}
v_hartree = None
if average_v_hartree:
if Cp2kObject.v_hartree in cp2k_objects:
v_hartree = cp2k_objects[Cp2kObject.v_hartree] # type: ignore[index]
elif Cp2kObject.v_hartree in output_file_paths:
v_hartree_file = output_file_paths[
Cp2kObject.v_hartree # type: ignore[index]
]
v_hartree = VolumetricData.from_cube(dir_name / v_hartree_file)
v_hartree.scale(Ha_to_eV)
input_doc = CalculationInput.from_cp2k_output(cp2k_output)
output_doc = CalculationOutput.from_cp2k_output(
cp2k_output, v_hartree=v_hartree, store_scf=store_scf
)
has_cp2k_completed = Status.SUCCESS if cp2k_output.completed else Status.FAILED
if store_trajectory:
traj = _parse_trajectory(cp2k_output=cp2k_output)
cp2k_objects[Cp2kObject.TRAJECTORY] = traj # type: ignore[index]
return (
cls(
dir_name=str(dir_name),
task_name=task_name,
cp2k_version=cp2k_output.cp2k_version,
has_cp2k_completed=has_cp2k_completed,
completed_at=completed_at,
input=input_doc,
output=output_doc,
output_file_paths={
k.name.lower(): v for k, v in output_file_paths.items()
},
bader=bader,
run_type=run_type(input_doc.dict()),
task_type=task_type(input_doc.dict()),
calc_type=calc_type(input_doc.dict()),
),
cp2k_objects,
)
def _get_output_file_paths(volumetric_files: list[str]) -> dict[Cp2kObject, str]:
"""
Get the output file paths for CP2K output files from the list of volumetric files.
Parameters
----------
volumetric_files
A list of volumetric files associated with the calculation.
Returns
-------
dict[Cp2kObject, str]
A mapping between the CP2K object type and the file path.
"""
output_file_paths = {}
for cp2k_object in Cp2kObject: # type: ignore[attr-defined]
for volumetric_file in volumetric_files:
if cp2k_object.name in str(volumetric_file):
output_file_paths[cp2k_object] = str(volumetric_file)
return output_file_paths
def _get_basis_and_potential_files(dir_name: Path) -> dict[Cp2kObject, DataFile]:
"""
Get the path of the basis and potential files.
This enables the exact info about the basis set /potential used can be retrieved.
Calculation summaries will only have metadata (i.e. the name) that matches to
the basis/potential contained in these files.
"""
data: dict[Cp2kObject, DataFile] = {}
for filename, cls, cp2k_object in (
(dir_name / "BASIS", BasisFile, Cp2kObject.BASIS),
(dir_name / "POTENTIAL", PotentialFile, Cp2kObject.POTENTIAL),
):
if filename.exists():
content = filename.read_text().strip()
if content not in ("None", ""): # ignore empty files
data[cp2k_object] = cls.from_str(content) # type: ignore[index]
return data
def _get_volumetric_data(
dir_name: Path,
output_file_paths: dict[Cp2kObject, str],
store_volumetric_data: tuple[str] | None,
) -> dict[Cp2kObject, VolumetricData]:
"""
Load volumetric data files from a directory.
Parameters
----------
dir_name
The directory containing the files.
output_file_paths
A dictionary mapping the data type to file path relative to dir_name.
store_volumetric_data
The volumetric data files to load. E.g., ("v_hartree", "e_density")
Returns
-------
dict[Cp2kObject, VolumetricData]
A dictionary mapping the CP2K object data type (`Cp2kObject.v_hartree`,
`Cp2kObject.electron_density`, etc) to the volumetric data object.
"""
if store_volumetric_data is None or len(store_volumetric_data) == 0:
return {}
volumetric_data = {}
for file_type, file in output_file_paths.items():
if file_type.name not in store_volumetric_data:
continue
try:
volumetric_data[file_type] = VolumetricData.from_cube(dir_name / file)
except Exception as err:
raise ValueError(f"Failed to parse {file_type} at {file}.") from err
for file_type, data in volumetric_data.items():
if file_type.name in __is_stored_in_Ha__:
data.scale(Ha_to_eV)
return volumetric_data
# TODO As written, this will only get the complete dos if it is available.
# cp2k can only generate the complete DOS for gamma-point only calculations
# and it has to be requested (not default). Should this method grab overall
# dos / elemental project dos if the complete dos is not available, or stick
# to grabbing the complete dos?
def _parse_dos(parse_dos: Union[str, bool], cp2k_output: Cp2kOutput) -> Dos | None:
"""
Parse DOS outputs from cp2k calculation.
Parameters
----------
parse_dos: Whether to parse the DOS. Can be:
- "auto": Only parse DOS if there are no ionic steps.
- True: Always parse DOS.
- False: Never parse DOS.
cp2k_output: Cp2kOutput object for the calculation being parsed.
Returns
-------
A Dos object if parse_dos is set accordingly.
"""
if parse_dos == "auto":
if len(cp2k_output.ionic_steps) == 0:
return cp2k_output.complete_dos
return None
if parse_dos:
return cp2k_output.complete_dos
return None
def _parse_bandstructure(
parse_bandstructure: Union[str, bool], cp2k_output: Cp2kOutput
) -> BandStructure | None:
"""
Get the band structure.
Parameters
----------
parse_bandstructure (bool): Whether to parse. Does not support the
auto/line distinction currently.
"""
if parse_bandstructure:
return cp2k_output.band_structure
return None
def _parse_trajectory(cp2k_output: Cp2kOutput) -> Trajectory | None:
"""
Grab a Trajectory object given a cp2k output object.
If an "ener" file is present containing MD results, it will be added as
frame data to the traj object
"""
ener = (
cp2k_output.filenames.get("ener")[-1]
if cp2k_output.filenames.get("ener")
else None
)
data = parse_energy_file(ener) or None
constant_lattice = all(
s.lattice == cp2k_output.initial_structure.lattice
for s in cp2k_output.structures
)
return Trajectory.from_structures(
cp2k_output.structures, constant_lattice=constant_lattice, frame_properties=data
)