"""Core definition of a CP2K task document."""
import logging
from collections import OrderedDict
from pathlib import Path
from typing import Any, Optional, Union
import numpy as np
from emmet.core.math import Matrix3D, Vector3D
from emmet.core.structure import MoleculeMetadata, StructureMetadata
from pydantic import BaseModel, Field
from pymatgen.core.structure import Molecule, Structure
from pymatgen.entries.computed_entries import ComputedEntry
from pymatgen.io.cp2k.inputs import Cp2kInput
from pymatgen.io.cp2k.utils import natural_keys
from typing_extensions import Self
from atomate2 import SETTINGS, __version__
from atomate2.common.utils import (
parse_additional_json,
parse_custodian,
parse_transformations,
)
from atomate2.cp2k.schemas.calculation import (
Calculation,
Cp2kObject,
RunStatistics,
Status,
)
from atomate2.utils.datetime import datetime_str
from atomate2.utils.path import get_uri
logger = logging.getLogger(__name__)
_VOLUMETRIC_FILES = ("v_hartree", "ELECTRON_DENSITY", "SPIN_DENSITY")
[docs]
class AnalysisSummary(BaseModel):
"""Calculation relaxation summary."""
delta_volume: float = Field(None, description="Absolute change in volume")
delta_volume_as_percent: float = Field(
None, description="Percentage change in volume"
)
max_force: float = Field(None, description="Maximum force on the atoms")
warnings: list[str] = Field(None, description="Warnings from the VASP drone")
errors: list[str] = Field(None, description="Errors from the VASP drone")
[docs]
@classmethod
def from_cp2k_calc_docs(cls, calc_docs: list[Calculation]) -> Self:
"""Create analysis summary from CP2K calculation documents.
Parameters
----------
calc_docs
VASP calculation documents.
Returns
-------
AnalysisSummary
The relaxation analysis.
"""
from atomate2.cp2k.schemas.calculation import Status
warnings = []
errors = []
if isinstance(calc_docs[0].input.structure, Structure):
initial_vol = calc_docs[0].input.structure.lattice.volume
final_vol = calc_docs[-1].output.structure.lattice.volume
delta_vol = final_vol - initial_vol
percent_delta_vol = 100 * delta_vol / initial_vol
if abs(percent_delta_vol) > SETTINGS.CP2K_VOLUME_CHANGE_WARNING_TOL * 100:
warnings.append(
f"Volume change > {SETTINGS.CP2K_VOLUME_CHANGE_WARNING_TOL * 100}%"
)
else:
delta_vol = None
percent_delta_vol = None
final_calc = calc_docs[-1]
max_force = None
if final_calc.has_cp2k_completed == Status.SUCCESS:
# max force and valid structure checks
structure = final_calc.output.structure
max_force = _get_max_force(final_calc)
if not structure.is_valid():
errors.append("Bad structure (atoms are too close!)")
return cls(
delta_volume=delta_vol,
delta_volume_as_percent=percent_delta_vol,
max_force=max_force,
warnings=warnings,
errors=errors,
)
[docs]
class AtomicKind(BaseModel):
"""A representation of the most important information about each type of species."""
element: str = Field(None, description="Element assigned to this atom kind")
basis: str = Field(None, description="Basis set for this atom kind")
potential: str = Field(
None, description="Name of pseudopotential for this atom kind"
)
auxiliary_basis: Optional[str] = Field(
None, description="Auxiliary basis for this (if any) for this atom kind"
)
ghost: bool = Field(None, description="Whether this atom kind is a ghost")
[docs]
class AtomicKindSummary(BaseModel):
"""A summary of pseudo-potential type and functional."""
atomic_kinds: dict[str, AtomicKind] = Field(
None, description="dictionary mapping atomic kind labels to their info"
)
[docs]
@classmethod
def from_atomic_kind_info(cls, atomic_kind_info: dict) -> Self:
"""Initialize from the atomic_kind_info dictionary."""
d: dict[str, dict[str, Any]] = {"atomic_kinds": {}}
for kind, info in atomic_kind_info.items():
d["atomic_kinds"][kind] = {
"element": info["element"],
"basis": info["orbital_basis_set"],
"potential": info["pseudo_potential"],
"auxiliary_basis": info["auxiliary_basis_set"][0]
if info["auxiliary_basis_set"]
else None,
"ghost": info["pseudo_potential"] == "NONE",
}
return cls(**d)
[docs]
class OutputSummary(BaseModel):
"""Summary of the outputs for a CP2K calculation."""
structure: Union[Structure, Molecule] = Field(
None, description="The output structure object"
)
energy: float = Field(
None, description="The final total DFT energy for the last calculation"
)
energy_per_atom: float = Field(
None, description="The final DFT energy per atom for the last calculation"
)
bandgap: Optional[float] = Field(
None, description="The DFT bandgap for the last calculation"
)
cbm: Optional[float] = Field(None, description="CBM for this calculation")
vbm: Optional[float] = Field(None, description="VBM for this calculation")
forces: list[Vector3D] = Field(
None, description="Forces on atoms from the last calculation"
)
stress: Optional[Matrix3D] = Field(
None, description="Stress on the unit cell from the last calculation"
)
[docs]
@classmethod
def from_cp2k_calc_doc(cls, calc_doc: Calculation) -> Self:
"""Make a summary of CP2K calculation outputs from a CP2K calculation document.
Parameters
----------
calc_doc
A CP2K calculation document.
Returns
-------
OutputSummary
The calculation output summary.
"""
if calc_doc.output.ionic_steps:
forces = calc_doc.output.ionic_steps[-1].get("forces")
stress = calc_doc.output.ionic_steps[-1].get("stress")
else:
forces = None
stress = None
return cls(
structure=calc_doc.output.structure,
energy=calc_doc.output.energy,
energy_per_atom=calc_doc.output.energy_per_atom,
bandgap=calc_doc.output.bandgap,
cbm=calc_doc.output.cbm,
vbm=calc_doc.output.vbm,
forces=forces,
stress=stress,
)
[docs]
class TaskDocument(StructureMetadata, MoleculeMetadata):
"""Definition of CP2K task document."""
dir_name: Optional[str] = Field(
None, description="The directory for this CP2K task"
)
last_updated: str = Field(
default_factory=datetime_str,
description="Timestamp for this task document was last updated",
)
completed_at: Optional[str] = Field(
None, description="Timestamp for when this task was completed"
)
input: Optional[InputSummary] = Field(
None, description="The input to the first calculation"
)
output: Optional[OutputSummary] = Field(
None, description="The output of the final calculation"
)
structure: Union[Structure, Molecule] = Field(
None, description="Final output structure from the task"
)
state: Optional[Status] = Field(None, description="State of this task")
included_objects: Optional[list[Cp2kObject]] = Field(
None, description="list of CP2K objects included with this task document"
)
cp2k_objects: Optional[dict[Cp2kObject, Any]] = Field(
None, description="CP2K objects associated with this task"
)
entry: Optional[ComputedEntry] = Field(
None, description="The ComputedEntry from the task doc"
)
analysis: Optional[AnalysisSummary] = Field(
None, description="Summary of structural relaxation and forces"
)
run_stats: Optional[dict[str, RunStatistics]] = Field(
None,
description="Summary of runtime statistics for each calculation in this task",
)
orig_inputs: Optional[dict[str, Cp2kInput]] = Field(
None, description="Summary of the original CP2K inputs written by custodian"
)
task_label: Optional[str] = Field(None, description="A description of the task")
tags: Optional[list[str]] = Field(
None, description="Metadata tags for this task document"
)
author: Optional[str] = Field(
None, description="Author extracted from transformations"
)
icsd_id: Optional[str] = Field(
None, description="International crystal structure database id of the structure"
)
calcs_reversed: Optional[list[Calculation]] = Field(
None, description="The inputs and outputs for all CP2K runs in this task."
)
transformations: Optional[dict[str, Any]] = Field(
None,
description="Information on the structural transformations, parsed from a "
"transformations.json file",
)
custodian: Any = Field(
None,
description="Information on the custodian settings used to run this "
"calculation, parsed from a custodian.json file",
)
additional_json: Optional[dict[str, Any]] = Field(
None, description="Additional json loaded from the calculation directory"
)
schema: str = Field(
__version__, description="Version of atomate2 used to create the document"
)
[docs]
@classmethod
def from_directory(
cls,
dir_name: Union[Path, str],
volumetric_files: tuple[str, ...] = _VOLUMETRIC_FILES,
store_additional_json: bool = SETTINGS.CP2K_STORE_ADDITIONAL_JSON,
additional_fields: dict[str, Any] = None,
**cp2k_calculation_kwargs,
) -> Self:
"""Create a task document from a directory containing CP2K files.
Parameters
----------
dir_name
The path to the folder containing the calculation outputs.
store_additional_json
Whether to store additional JSON files found in the calculation directory.
volumetric_files
Volumetric files to search for.
additional_fields
dictionary of additional fields to add to output document.
**cp2k_calculation_kwargs
Additional parsing options that will be passed to the
:obj:`.Calculation.from_cp2k_files` function.
Returns
-------
Cp2kTaskDoc
A task document for the calculation.
"""
logger.info(f"Getting task doc in: {dir_name}")
additional_fields = additional_fields or {}
dir_name = Path(dir_name)
task_files = _find_cp2k_files(dir_name, volumetric_files=volumetric_files)
if len(task_files) == 0:
raise FileNotFoundError("No CP2K files found!")
calcs_reversed = []
all_cp2k_objects = []
for task_name, files in task_files.items():
calc_doc, cp2k_objects = Calculation.from_cp2k_files(
dir_name, task_name, **files, **cp2k_calculation_kwargs
)
calcs_reversed.append(calc_doc)
all_cp2k_objects.append(cp2k_objects)
analysis = AnalysisSummary.from_cp2k_calc_docs(calcs_reversed)
transformations, icsd_id, tags, author = parse_transformations(dir_name)
if tags:
tags.extend(additional_fields.get("tags", []))
else:
tags = additional_fields.get("tags")
custodian = parse_custodian(dir_name)
orig_inputs = _parse_orig_inputs(dir_name)
additional_json = None
if store_additional_json:
additional_json = parse_additional_json(dir_name)
dir_name = get_uri(dir_name) # convert to full uri path
# only store objects from last calculation
# TODO: make this an option
cp2k_objects = all_cp2k_objects[-1]
included_objects = None
if cp2k_objects:
included_objects = list(cp2k_objects)
if isinstance(calcs_reversed[0].output.structure, Structure):
attr = "from_structure"
dat = {
"structure": calcs_reversed[0].output.structure,
"meta_structure": calcs_reversed[0].output.structure,
"include_structure": True,
}
elif isinstance(calcs_reversed[0].output.structure, Molecule):
attr = "from_molecule"
dat = {
"structure": calcs_reversed[0].output.structure,
"meta_structure": calcs_reversed[0].output.structure,
"molecule": calcs_reversed[0].output.structure,
"include_molecule": True,
}
doc = getattr(cls, attr)(**dat)
data = {
"structure": calcs_reversed[0].output.structure,
"meta_structure": calcs_reversed[0].output.structure,
"dir_name": dir_name,
"calcs_reversed": calcs_reversed,
"analysis": analysis,
"transformations": transformations,
"custodian": custodian,
"orig_inputs": orig_inputs,
"additional_json": additional_json,
"icsd_id": icsd_id,
"tags": tags,
"author": author,
"completed_at": calcs_reversed[0].completed_at,
"input": InputSummary.from_cp2k_calc_doc(calcs_reversed[0]),
"output": OutputSummary.from_cp2k_calc_doc(calcs_reversed[0]),
"state": _get_state(calcs_reversed, analysis),
"entry": cls.get_entry(calcs_reversed),
"run_stats": _get_run_stats(calcs_reversed),
"cp2k_objects": cp2k_objects,
"included_objects": included_objects,
}
doc = cls(**doc.dict())
doc = doc.copy(update=data)
return doc.copy(update=additional_fields)
[docs]
@staticmethod
def get_entry(
calc_docs: list[Calculation], job_id: Optional[str] = None
) -> ComputedEntry:
"""Get a computed entry from a list of CP2K calculation documents.
Parameters
----------
calc_docs
A list of CP2K calculation documents.
job_id
The job identifier.
Returns
-------
ComputedEntry
A computed entry.
"""
entry_dict = {
"correction": 0.0,
"entry_id": job_id,
"composition": calc_docs[-1].output.structure.composition,
"energy": calc_docs[-1].output.energy,
"parameters": {
# Required to be compatible with MontyEncoder for the ComputedEntry
"run_type": str(calc_docs[-1].run_type),
},
"data": {
"last_updated": datetime_str(),
},
}
return ComputedEntry.from_dict(entry_dict)
def _find_cp2k_files(
path: Union[str, Path],
volumetric_files: tuple[str, ...] = _VOLUMETRIC_FILES,
) -> dict[str, Any]:
"""
Find CP2K files in a directory.
Only files in folders with names matching a task name (or alternatively files
with the task name as an extension, e.g., vasprun.relax1.xml) will be returned.
CP2K files in the current directory will be given the task name "standard".
Parameters
----------
path
Path to a directory to search.
volumetric_files
Volumetric files to search for.
Returns
-------
dict[str, Any]
The filenames of the calculation outputs for each CP2K task, given as a ordered
dictionary of::
{
task_name: {
"cp2k_output_file": cp2k_output_filename,
"volumetric_files": [v_hartree file, e_density file, etc],
"""
task_names = ["precondition"] + [f"relax{i}" for i in range(9)]
path = Path(path)
task_files = OrderedDict()
def _get_task_files(files: list[Path], suffix: str = "") -> dict[str, Any]:
cp2k_files: dict[str, Any] = {}
vol_files = []
for file in files:
if file.match(f"*cp2k.out{suffix}*"):
cp2k_files["cp2k_output_file"] = Path(file).name
for vol in volumetric_files:
_files = [f.name for f in files if f.match(f"*{vol}*cube{suffix}*")]
_files.sort(key=natural_keys, reverse=True)
if _files:
vol_files.append(_files[0])
if len(vol_files) > 0:
# add volumetric files if some were found or other cp2k files were found
cp2k_files["volumetric_files"] = vol_files
return cp2k_files
for task_name in task_names:
subfolder_match = list(path.glob(f"{task_name}/*"))
suffix_match = list(path.glob(f"*.{task_name}*"))
if len(subfolder_match) > 0:
# subfolder match
task_files[task_name] = _get_task_files(subfolder_match)
elif len(suffix_match) > 0:
# try extension schema
task_files[task_name] = _get_task_files(
suffix_match, suffix=f".{task_name}"
)
if len(task_files) == 0:
# get any matching file from the root folder
standard_files = _get_task_files(list(path.glob("*")))
if len(standard_files) > 0:
task_files["standard"] = standard_files
return task_files
def _parse_orig_inputs(dir_name: Path) -> dict[str, Cp2kInput]:
"""
Parse original input files.
Calculations using custodian generate a *.orig file for the inputs. This is useful
to know how the calculation originally started.
Parameters
----------
dir_name
Path to calculation directory.
Returns
-------
dict[str, Cp2kInput]
The original data.
"""
orig_inputs = {}
input_mapping = {
"input": {
"filename": "cp2k.inp",
"object": Cp2kInput,
}
}
for filename in dir_name.glob("*.orig*"):
for name, cp2k_input in input_mapping.items():
fn = cp2k_input.get("filename")
obj = cp2k_input.get("object")
if f"{fn}.orig" in str(filename):
orig_inputs[name.lower()] = obj.from_file(filename)
return orig_inputs
def _get_max_force(calc_doc: Calculation) -> Optional[float]:
"""Get max force acting on atoms from a calculation document."""
forces = (
calc_doc.output.ionic_steps[-1].get("forces")
if calc_doc.output.ionic_steps
else None
)
structure = calc_doc.output.structure
if forces:
forces = np.array(forces)
sdyn = structure.site_properties.get("selective_dynamics")
if sdyn:
forces[np.logical_not(sdyn)] = 0
return max(np.linalg.norm(forces, axis=1))
return None
def _get_state(calc_docs: list[Calculation], analysis: AnalysisSummary) -> Status:
"""Get state from calculation documents and relaxation analysis."""
all_calcs_completed = all(c.has_cp2k_completed == Status.SUCCESS for c in calc_docs)
if len(analysis.errors) == 0 and all_calcs_completed:
return Status.SUCCESS # type: ignore[return-value]
return Status.FAILED # type: ignore[return-value]
def _get_run_stats(calc_docs: list[Calculation]) -> dict[str, RunStatistics]:
"""Get summary of runtime statistics for each calculation in this task."""
run_stats = {}
total = {
"total_time": 0.0,
}
for calc_doc in calc_docs:
stats = calc_doc.output.run_stats
run_stats[calc_doc.task_name] = stats
total["total_time"] += stats.total_time
run_stats["overall"] = RunStatistics(**total)
return run_stats