"""Define NEB VASP jobs."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from glob import glob
from os import mkdir
from pathlib import Path
from typing import TYPE_CHECKING
from emmet.core.neb import NebIntermediateImagesDoc, NebTaskDoc
from jobflow import Flow, Maker, Response, job
from monty.serialization import dumpfn
from pymatgen.io.vasp import Kpoints
from atomate2 import SETTINGS
from atomate2.common.files import gzip_output_folder
from atomate2.common.jobs.neb import NebInterpolation, get_images_from_endpoints
from atomate2.utils.path import strip_hostname
from atomate2.vasp.files import copy_vasp_outputs, write_vasp_input_set
from atomate2.vasp.jobs.base import (
_DATA_OBJECTS,
_FILES_TO_ZIP,
BaseVaspMaker,
get_vasp_task_document,
)
from atomate2.vasp.run import JobType, run_vasp, should_stop_children
from atomate2.vasp.sets.core import NebSetGenerator
if TYPE_CHECKING:
from collections.abc import Callable, Sequence
from pymatgen.core import Structure
from atomate2.vasp.sets.base import VaspInputGenerator
logger = logging.getLogger(__name__)
def vasp_neb_job(method: Callable) -> job:
    """
    Decorate the ``make`` method of VASP NEB job makers.

    This is a thin wrapper around :obj:`~jobflow.core.job.Job` that applies the
    configuration shared by VASP NEB jobs: large data objects (band structures,
    density of states, LOCPOT, CHGCAR, etc.) are routed to the atomate2 data
    store, and the job's output schema is set to
    :obj:`.NebIntermediateImagesDoc`.

    Parameters
    ----------
    method : callable
        A BaseVaspMaker.make method. This should not be specified directly and is
        implied by the decorator.

    Returns
    -------
    callable
        A decorated version of the make function that will generate VASP NEB jobs.
    """
    return job(method, data=_DATA_OBJECTS, schema=NebIntermediateImagesDoc)
@job
def collect_neb_output(
    endpoint_dirs: list[str | Path] | None, neb_head_dir: str | Path, **neb_doc_kwargs
) -> NebTaskDoc:
    """
    Parse NEB output from image and optionally endpoint relaxations.

    Parameters
    ----------
    endpoint_dirs : list of str or Path, or None
        Directories holding the two endpoint relaxations, if they were
        performed. When this is None (or does not have exactly two entries),
        only the NEB image calculations are parsed.
    neb_head_dir : str or Path
        The "head" NEB directory containing the image subdirectories.
    **neb_doc_kwargs
        Keyword arguments passed through to the ``NebTaskDoc`` constructors.

    Returns
    -------
    NebTaskDoc
        The parsed NEB task document.
    """
    # Strip any hostname prefix (e.g. from remote launches) up front so it is
    # applied consistently in both branches — previously the fallback branch
    # parsed the un-stripped path.
    head_dir = strip_hostname(neb_head_dir)
    if endpoint_dirs is not None and len(endpoint_dirs) == 2:
        return NebTaskDoc.from_directories(
            [strip_hostname(endpoint_path) for endpoint_path in endpoint_dirs],
            head_dir,
            **neb_doc_kwargs,
        )
    return NebTaskDoc.from_directory(head_dir, **neb_doc_kwargs)
@dataclass
class NebFromImagesMaker(BaseVaspMaker):
    """
    Maker to create VASP NEB jobs from a set of images.

    Note on KPOINTS / VASP 6:
    -------------------------
    There's a bug in VASP 6 compiled with HDF5 support:
    https://www.vasp.at/forum/viewtopic.php?f=3&t=18721&p=23430&hilit=neb+hdf5+images#p23430
    VASP performs a validation check of whether the KPOINTS file
    used in the "head" directory (which contains 00, 01, ..., 0N
    subdirectories) is the same as in each image subdirectory.
    If KSPACING is used, this check isn't performed.
    However, a "kludge" to get around this issue is to simply
    copy the KPOINTS file used in the head directory
    to each image directory.

    Parameters
    ----------
    kpoints_kludge: Kpoints or bool. Default is True.
        See "Note on KPOINTS / VASP 6" above.
        If True (default), the job checks for the
        existence of a KPOINTS file and copies it (if it exists)
        to each image subdirectory.
        The user can override this with a Kpoints object of their choice.
        If kpoints_kludge is set to False, the KPOINTS file will not be
        copied to each image.
    name : str
        The job name.
    input_set_generator : .VaspInputGenerator
        A generator used to make the input set.
    write_input_set_kwargs : dict
        Keyword arguments that will get passed to :obj:`.write_vasp_input_set`.
    copy_vasp_kwargs : dict
        Keyword arguments that will get passed to :obj:`.copy_vasp_outputs`.
    run_vasp_kwargs : dict
        Keyword arguments that will get passed to :obj:`.run_vasp`.
    task_document_kwargs : dict
        Keyword arguments that will get passed to :obj:`.TaskDoc.from_directory`.
    stop_children_kwargs : dict
        Keyword arguments that will get passed to :obj:`.should_stop_children`.
    write_additional_data : dict
        Additional data to write to the current directory. Given as a dict of
        {filename: data}. Note that if using FireWorks, dictionary keys cannot contain
        the "." character which is typically used to denote file extensions. To avoid
        this, use the ":" character, which will automatically be converted to ".". E.g.
        ``{"my_file:txt": "contents of the file"}``.
    """

    name: str = "NEB"
    # Input set generator tailored to NEB calculations.
    input_set_generator: VaspInputGenerator = field(default_factory=NebSetGenerator)
    # By default run VASP in NEB mode, with stdout/stderr redirected to the
    # conventional atomate2 file names.
    run_vasp_kwargs: dict = field(
        default_factory=lambda: {
            "job_type": JobType.NEB,
            "vasp_job_kwargs": {
                "output_file": "vasp.out",
                "stderr_file": "std_err.txt",
            },
        }
    )
    # See "Note on KPOINTS / VASP 6" in the class docstring; a bool enables or
    # disables the workaround, while a Kpoints object is copied verbatim.
    kpoints_kludge: Kpoints | bool = True
    @vasp_neb_job
    def make(
        self,
        images: list[Structure],
        prev_dir: str | Path | None = None,
    ) -> Response:
        """
        Make an NEB job from a list of images.

        Parameters
        ----------
        images : list[Structure]
            A list of NEB images, including the two endpoints as the first and
            last entries.
        prev_dir : str or Path or None (default)
            A previous directory to copy outputs from.

        Returns
        -------
        Response
            A jobflow Response whose output is the parsed NEB task document.
        """
        num_frames = len(images)
        # VASP's image count excludes the two fixed endpoints.
        num_images = num_frames - 2
        self.input_set_generator.num_images = num_images
        # Copy outputs from a previous calculation, if one was supplied.
        from_prev = prev_dir is not None
        if prev_dir is not None:
            copy_vasp_outputs(prev_dir, **self.copy_vasp_kwargs)
        self.write_input_set_kwargs.setdefault("from_prev", from_prev)
        # Write the VASP input files to the head directory, using the first
        # image as the reference structure.
        write_vasp_input_set(
            images[0], self.input_set_generator, **self.write_input_set_kwargs
        )
        # If the kludge is enabled (bool True) and a KPOINTS file was written,
        # load it so it can be replicated into each image subdirectory
        # (see "Note on KPOINTS / VASP 6" in the class docstring).
        if (
            Path("KPOINTS").exists()
            and isinstance(self.kpoints_kludge, bool)
            and self.kpoints_kludge
        ):
            self.kpoints_kludge = Kpoints.from_file("KPOINTS")
        # Create the 00, 01, ..., 0N image subdirectories, each with its own
        # POSCAR (and a copy of KPOINTS when the kludge applies).
        for iimage in range(num_frames):
            image_dir = f"{iimage:02}"
            mkdir(image_dir)
            images[iimage].to(f"{image_dir}/POSCAR")
            if isinstance(self.kpoints_kludge, Kpoints):
                self.kpoints_kludge.write_file(f"{image_dir}/KPOINTS")
        # Write any additional data; ":" in keys maps to "." in file names.
        for filename, data in self.write_additional_data.items():
            dumpfn(data, filename.replace(":", "."))
        # Run VASP.
        run_vasp(**self.run_vasp_kwargs)
        # Parse the NEB outputs from the current (head) directory.
        task_doc = get_vasp_task_document(
            Path.cwd(), is_neb=True, **self.task_document_kwargs
        )
        task_doc.task_label = self.name
        # Decide whether child jobs should proceed.
        stop_children = should_stop_children(task_doc, **self.stop_children_kwargs)
        # Gzip the head directory and each two-digit image subdirectory.
        gzip_output_folder(
            directory=Path.cwd(),
            setting=SETTINGS.VASP_ZIP_FILES,
            files_list=_FILES_TO_ZIP,
        )
        for image_dir in glob(str(Path.cwd() / "[0-9][0-9]")):
            gzip_output_folder(
                directory=image_dir,
                setting=SETTINGS.VASP_ZIP_FILES,
                files_list=_FILES_TO_ZIP,
            )
        return Response(
            stop_children=stop_children,
            stored_data={"custodian": task_doc.custodian},
            output=task_doc,
        )
@dataclass
class NebFromEndpointsMaker(Maker):
    """
    Maker to create VASP NEB jobs from two endpoints.

    Optionally relax the two endpoints and return a full NEB hop analysis.
    If a maker to relax the endpoints is not specified, this job
    interpolates the provided endpoints and performs an NEB on the
    interpolated images, returning an NebTaskDoc.

    Parameters
    ----------
    endpoint_relax_maker : BaseVaspMaker or None (default)
        Optional maker to initially relax the endpoints.
    images_maker : NebFromImagesMaker
        Required maker to perform NEB on interpolated images.
    """

    # When None, the endpoints are interpolated as provided (no relaxation).
    endpoint_relax_maker: BaseVaspMaker | None = None
    images_maker: NebFromImagesMaker = field(default_factory=NebFromImagesMaker)
    def make(
        self,
        endpoints: tuple[Structure, Structure] | list[Structure],
        num_images: int,
        prev_dir: str | Path | None = None,
        interpolation_method: NebInterpolation = NebInterpolation.LINEAR,
        **interpolation_kwargs,
    ) -> Flow:
        """
        Make an NEB job from a set of endpoints.

        Parameters
        ----------
        endpoints : tuple[Structure,Structure] or list[Structure]
            A set of two endpoints to interpolate NEB images from.
        num_images : int
            The number of images to include in the interpolation.
        prev_dir : str or Path or None (default)
            A previous directory to copy outputs from.
        interpolation_method : .NebInterpolation
            The method to use to interpolate between images.
        **interpolation_kwargs
            kwargs to pass to the interpolation function.

        Returns
        -------
        Flow
            A flow of (optional) endpoint relaxations, image interpolation,
            the NEB run on the images, and a final collation job whose output
            is the flow output.

        Raises
        ------
        ValueError
            If ``endpoints`` does not contain exactly two structures.
        """
        if len(endpoints) != 2:
            raise ValueError("Please specify exactly two endpoint structures.")
        endpoint_jobs = []
        endpoint_dirs: Sequence[str | Path] | None = None
        if self.endpoint_relax_maker is not None:
            # Relax each endpoint first; downstream jobs then reference the
            # relaxed structures and the relaxation output directories.
            endpoint_jobs += [
                self.endpoint_relax_maker.make(endpoint, prev_dir=prev_dir)
                for endpoint in endpoints
            ]
            for idx in range(2):
                endpoint_jobs[idx].append_name(f" endpoint {idx + 1}")
            endpoints = [relax_job.output.structure for relax_job in endpoint_jobs]
            endpoint_dirs = [
                endpoint_job.output.dir_name for endpoint_job in endpoint_jobs
            ]
        # Interpolate between the (possibly relaxed) endpoints.
        get_images = get_images_from_endpoints(
            endpoints,
            num_images,
            interpolation_method=interpolation_method,
            **interpolation_kwargs,
        )
        # Run the NEB on the interpolated images, then collate everything
        # into a single task document.
        image_relax_job = self.images_maker.make(get_images.output)  # type: ignore[attr-defined]
        collate_job = collect_neb_output(
            endpoint_dirs, image_relax_job.output.dir_name, store_calculations=False
        )
        return Flow(
            [*endpoint_jobs, get_images, image_relax_job, collate_job],
            output=collate_job.output,
        )