Source code for jobflow.managers.fireworks

"""Tools for running :obj:`Flow` and :obj:`Job` objects using the FireWorks package."""

from __future__ import annotations

import typing

from fireworks import FiretaskBase, Firework, FWAction, Workflow, explicit_serialize
from fireworks.utilities.fw_serializers import recursive_serialize, serialize_fw
from monty.json import jsanitize

if typing.TYPE_CHECKING:
    from collections.abc import Sequence

    import jobflow
    from jobflow.core.job import Job


[docs] def flow_to_workflow( flow: jobflow.Flow | jobflow.Job | list[jobflow.Job], store: jobflow.JobStore = None, allow_external_references: bool = False, **kwargs, ) -> Workflow: """ Convert a :obj:`Flow` or a :obj:`Job` to a FireWorks :obj:`Workflow` object. Each firework spec is updated with the contents of the :obj:`Job.config.manager_config` dictionary. Accordingly, a :obj:`.JobConfig` object can be used to configure FireWork options such as metadata and the fireworker. Parameters ---------- flow A flow or job. store A job store. Alternatively, if set to None, :obj:`JobflowSettings.JOB_STORE` will be used. Note, this could be different on the computer that submits the workflow and the computer which runs the workflow. The value of ``JOB_STORE`` on the computer that runs the workflow will be used. allow_external_references If False all the references to other outputs should be from other Jobs of the Flow. **kwargs Keyword arguments passed to Workflow init method. Returns ------- Workflow The job or flow as a workflow. """ from fireworks.core.firework import Workflow from jobflow.core.flow import get_flow parent_mapping: dict[str, Firework] = {} fireworks = [] flow = get_flow(flow, allow_external_references=allow_external_references) for job, parents in flow.iterflow(): fw = job_to_firework(job, store, parents=parents, parent_mapping=parent_mapping) fireworks.append(fw) return Workflow(fireworks, name=kwargs.pop("name", flow.name), **kwargs)
[docs] def job_to_firework( job: jobflow.Job, store: jobflow.JobStore = None, parents: Sequence[str] = None, parent_mapping: dict[str, Firework] = None, **kwargs, ) -> Firework: """ Convert a :obj:`Job` to a :obj:`.Firework`. The firework spec is updated with the contents of the :obj:`Job.config.manager_config` dictionary. Accordingly, a :obj:`.JobConfig` object can be used to configure FireWork options such as metadata and the fireworker. Parameters ---------- job A job. store A job store. Alternatively, if set to None, :obj:`JobflowSettings.JOB_STORE` will be used. Note, this could be different on the computer that submits the workflow and the computer which runs the workflow. The value of ``JOB_STORE`` on the computer that runs the workflow will be used. parents The parent uuids of the job. parent_mapping A dictionary mapping job uuids to Firework objects, as ``{uuid: Firework}``. **kwargs Keyword arguments passed to the Firework constructor. Returns ------- Firework A firework that will run the job. """ from fireworks.core.firework import Firework from jobflow.core.reference import OnMissing if (parents is None) is not (parent_mapping is None): raise ValueError("Both or neither of parents and parent_mapping must be set.") task = JobFiretask(job=job, store=store) job_parents = None if parents is not None and parent_mapping is not None: job_parents = ( [parent_mapping[parent] for parent in parents] if parents else None ) spec = {"_add_launchpad_and_fw_id": True} # this allows the job to know the fw_id if job.config.on_missing_references != OnMissing.ERROR: spec["_allow_fizzled_parents"] = True spec.update(job.config.manager_config) spec.update(job.metadata) # add metadata to spec fw = Firework([task], spec=spec, name=job.name, parents=job_parents, **kwargs) if parent_mapping is not None: parent_mapping[job.uuid] = fw return fw
[docs] @explicit_serialize class JobFiretask(FiretaskBase): """ A firetask that will run any job. Other Parameters ---------------- job : Dict A serialized job. store : JobStore A job store. Alternatively, if set to None, :obj:`JobflowSettings.JOB_STORE` will be used. Note, this could be different on the computer that submits the workflow and the computer which runs the workflow. The value of ``JOB_STORE`` on the computer that runs the workflow will be used. """ required_params = ("job", "store")
[docs] def run_task(self, fw_spec): """Run the job and handle any dynamic firework submissions.""" from jobflow import SETTINGS, initialize_logger job: Job = self.get("job") store = self.get("store") if store is None: store = SETTINGS.JOB_STORE store.connect() # Add the metadata from the fw_spec fw_tags = fw_spec.get("tags", None) if fw_tags is not None: if "tags" in job.metadata: if isinstance(job.metadata["tags"], list): job.metadata["tags"].extend(fw_tags) else: # tags is not a list, make it one job.metadata["tags"] = [job.metadata["tags"], *fw_tags] job.metadata["tags"] = list(dict.fromkeys(job.metadata["tags"])) else: job.metadata.update({"tags": fw_tags}) if hasattr(self, "fw_id"): job.metadata.update({"fw_id": self.fw_id}) initialize_logger() response = job.run(store=store) detours = None additions = None if response.replace is not None: # create a workflow from the new additions; be sure to use original store detours = [flow_to_workflow(response.replace, self.get("store"))] if response.addition is not None: additions = [flow_to_workflow(response.addition, self.get("store"))] if response.detour is not None: detour_wf = flow_to_workflow(response.detour, self.get("store")) if detours is not None: detours.append(detour_wf) else: detours = [detour_wf] return FWAction( stored_data=response.stored_data, detours=detours, additions=additions, defuse_workflow=response.stop_jobflow, defuse_children=response.stop_children, )
@serialize_fw @recursive_serialize def to_dict(self) -> dict: """ Serialize version of the FireTask. Overrides the original method to explicitly jsanitize the Job to handle cases not properly handled by fireworks, like a Callable. """ d = dict(self) d["job"] = jsanitize(d["job"].as_dict()) return d