# Source code for jobflow.managers.local
"""Tools for running jobflow locally."""
from __future__ import annotations
import logging
import typing
if typing.TYPE_CHECKING:
pass
import jobflow
# Names exported by ``from jobflow.managers.local import *``.
__all__ = ["run_locally"]
# Module-level logger named after this module; used by ``run_locally`` for progress
# and failure messages.
logger = logging.getLogger(__name__)
def run_locally(
    flow: jobflow.Flow | jobflow.Job | list[jobflow.Job],
    log: bool = True,
    store: jobflow.JobStore | None = None,
    create_folders: bool = False,
    ensure_success: bool = False,
) -> dict[str, dict[int, jobflow.Response]]:
    """
    Run a :obj:`Job` or :obj:`Flow` locally.

    Jobs are executed one after another, in the order produced by the flow's
    ``iterflow()``. Any ``replace``, ``detour`` or ``addition`` flow returned in a
    job's response is executed (in that order) immediately after the job itself.

    Parameters
    ----------
    flow
        A job or flow.
    log
        Whether to print log messages.
    store
        A job store. If a job store is not specified then
        :obj:`JobflowSettings.JOB_STORE` will be used. By default this is a maggma
        ``MemoryStore`` but can be customised by setting the jobflow configuration file.
    create_folders
        Whether to run each job in a new folder.
    ensure_success
        Raise an error if the flow was not executed successfully.

    Returns
    -------
    Dict[str, Dict[int, Response]]
        The responses of the jobs, as a dict of ``{uuid: {index: response}}``.

    Raises
    ------
    RuntimeError
        If ``ensure_success`` is True and any job raised an exception, was
        skipped, or the flow was stopped before completing.
    """
    import traceback
    from collections import defaultdict
    from datetime import datetime
    from pathlib import Path
    from random import randint

    from monty.os import cd

    from jobflow import SETTINGS, initialize_logger
    from jobflow.core.flow import get_flow
    from jobflow.core.reference import OnMissing

    if store is None:
        store = SETTINGS.JOB_STORE

    store.connect()

    if log:
        initialize_logger()

    flow = get_flow(flow)

    # uuids of jobs whose children must be skipped (stop_children was requested,
    # or the job itself was skipped for that reason)
    stopped_parents: set[str] = set()
    # uuids of jobs that raised, or were skipped because a parent errored
    errored: set[str] = set()
    responses: dict[str, dict[int, jobflow.Response]] = defaultdict(dict)
    stop_jobflow = False

    # captured once so that job folders are always created relative to the
    # directory run_locally was called from, even while inside ``cd``
    root_dir = Path.cwd()

    def _run_job(job: jobflow.Job, parents):
        """
        Run a single job and any flows its response spawns.

        Returns ``False`` if the whole flow has been stopped, ``None`` if the job
        (or one of the flows it spawned) did not complete, else the response.
        """
        nonlocal stop_jobflow

        if stop_jobflow:
            return False

        if len(set(parents).intersection(stopped_parents)) > 0:
            # stop children has been called for one of the jobs' parents
            logger.info(
                f"{job.name} is a child of a job with stop_children=True, skipping..."
            )
            # propagate the skip to this job's own children
            stopped_parents.add(job.uuid)
            return None

        if (
            len(set(parents).intersection(errored)) > 0
            and job.config.on_missing_references == OnMissing.ERROR
        ):
            # a parent failed and this job cannot tolerate missing references
            errored.add(job.uuid)
            return None

        try:
            response = job.run(store=store)
        except Exception:
            # deliberate best-effort: record the failure and carry on with jobs
            # that do not depend on this one
            logger.info(f"{job.name} failed with exception:\n{traceback.format_exc()}")
            errored.add(job.uuid)
            return None

        responses[job.uuid][job.index] = response

        if response.stored_data is not None:
            logger.warning("Response.stored_data is not supported with local manager.")

        if response.stop_children:
            stopped_parents.add(job.uuid)

        if response.stop_jobflow:
            stop_jobflow = True
            return False

        # Run restarts first, then detours, then additions. Every spawned flow is
        # attempted, and the outcomes are combined so that a failure inside a
        # dynamically generated flow is not silently lost.
        spawned_ok = True
        for spawned in (response.replace, response.detour, response.addition):
            if spawned is not None:
                spawned_ok = _run(spawned) and spawned_ok

        if stop_jobflow:
            # a spawned job requested that the whole flow stops
            return False

        if not spawned_ok:
            return None

        return response

    def _get_job_dir():
        """Return the directory to run the next job in, creating it if needed."""
        if not create_folders:
            return root_dir

        time_now = datetime.utcnow().strftime(SETTINGS.DIRECTORY_FORMAT)
        # random suffix reduces the chance of a clash between jobs started within
        # the same timestamp resolution
        job_dir = root_dir / f"job_{time_now}-{randint(10000, 99999)}"
        job_dir.mkdir()
        return job_dir

    def _run(root_flow):
        """
        Run every job of ``root_flow``.

        Returns ``True`` only if every job produced a response and the flow was
        not stopped. A flow without jobs counts as successful.
        """
        all_succeeded = True

        job: jobflow.Job
        for job, parents in root_flow.iterflow():
            job_dir = _get_job_dir()
            with cd(job_dir):
                response = _run_job(job, parents)

            if response is False:
                # stop_jobflow was requested; abandon the remaining jobs
                return False

            if response is None:
                # remember the failure but keep running independent jobs
                all_succeeded = False

        return all_succeeded

    logger.info("Started executing jobs locally")
    finished_successfully = _run(flow)
    logger.info("Finished executing jobs locally")

    if ensure_success and not finished_successfully:
        raise RuntimeError("Flow did not finish running successfully")

    # hand back a plain dict so that looking up a missing uuid raises KeyError
    # instead of silently inserting an empty entry
    return dict(responses)