Source code for jobflow.managers.local

"""Tools for running jobflow locally."""

from __future__ import annotations

import logging
import typing

if typing.TYPE_CHECKING:
    from pathlib import Path

    import jobflow

logger = logging.getLogger(__name__)


def run_locally(
    flow: jobflow.Flow | jobflow.Job | list[jobflow.Job],
    log: bool = True,
    store: jobflow.JobStore | None = None,
    create_folders: bool = False,
    root_dir: str | Path | None = None,
    ensure_success: bool = False,
    allow_external_references: bool = False,
    raise_immediately: bool = False,
) -> dict[str, dict[int, jobflow.Response]]:
    """
    Run a :obj:`Job` or :obj:`Flow` locally.

    Jobs are executed one after another in the order given by
    ``Flow.iterflow()``. Any ``replace``, ``detour`` or ``addition`` returned
    by a job is run (in that order) immediately after the job itself.

    Parameters
    ----------
    flow : Flow | Job | list[Job]
        A job or flow.
    log : bool
        Whether to print log messages.
    store : JobStore
        A job store. If a job store is not specified then
        :obj:`JobflowSettings.JOB_STORE` will be used. By default this is a maggma
        ``MemoryStore`` but can be customised by setting the jobflow configuration
        file.
    create_folders : bool
        Whether to run each job in a new folder.
    root_dir : str | Path | None
        The root directory to run the jobs in or where to create new subfolders if
        ``create_folders`` is True. If None then the current working directory will
        be used. The directory (including any missing parent directories) is
        created if it does not exist.
    ensure_success : bool
        Raise an error if the flow was not executed successfully.
    allow_external_references : bool
        If False all the references to other outputs should be from other Jobs
        of the Flow.
    raise_immediately : bool
        If True, raise an exception immediately if a job fails. If False, continue
        running the flow and only raise an exception at the end if the flow did not
        finish running successfully.

    Returns
    -------
    dict[str, dict[int, Response]]
        The responses of the jobs, as a dict of ``{uuid: {index: response}}``.

    Raises
    ------
    RuntimeError
        If ``ensure_success`` is True and the flow did not finish running
        successfully (a job failed, was skipped, or the flow was stopped).
    Exception
        Whatever a job raises, if ``raise_immediately`` is True.
    """
    import traceback
    from collections import defaultdict
    from datetime import datetime, timezone
    from pathlib import Path
    from random import randint

    from monty.os import cd

    from jobflow import SETTINGS, initialize_logger
    from jobflow.core.flow import get_flow
    from jobflow.core.reference import OnMissing

    if store is None:
        store = SETTINGS.JOB_STORE

    root_dir = Path.cwd() if root_dir is None else Path(root_dir).resolve()
    # parents=True so that a nested, not-yet-existing root directory is created
    # instead of failing with FileNotFoundError.
    root_dir.mkdir(parents=True, exist_ok=True)

    store.connect()

    if log:
        initialize_logger()

    flow = get_flow(flow, allow_external_references=allow_external_references)

    # uuids of jobs whose children must be skipped (stop_children propagation)
    stopped_parents: set[str] = set()
    # uuids of jobs that failed, or were skipped because a parent failed
    errored: set[str] = set()
    responses: dict[str, dict[int, jobflow.Response]] = defaultdict(dict)
    stop_jobflow = False

    def _run_job(job: jobflow.Job, parents):
        """Run a single job and return ``(response, jobflow_stopped)``.

        ``response`` is None whenever the job was skipped, failed, requested
        the flow to stop, or one of its diversions did not finish successfully.
        """
        nonlocal stop_jobflow

        if stop_jobflow:
            return None, True

        parent_uuids = set(parents)

        if not parent_uuids.isdisjoint(stopped_parents):
            # stop children has been called for one of the jobs' parents
            logger.info(
                f"{job.name} is a child of a job with stop_children=True, skipping..."
            )
            # propagate the skip to this job's own children
            stopped_parents.add(job.uuid)
            return None, False

        if (
            not parent_uuids.isdisjoint(errored)
            and job.config.on_missing_references == OnMissing.ERROR
        ):
            # a parent failed and this job cannot tolerate missing references
            errored.add(job.uuid)
            return None, False

        if raise_immediately:
            response = job.run(store=store)
        else:
            try:
                response = job.run(store=store)
            except Exception:
                # record the failure and carry on with the rest of the flow
                logger.info(
                    f"{job.name} failed with exception:\n{traceback.format_exc()}"
                )
                errored.add(job.uuid)
                return None, False

        responses[job.uuid][job.index] = response

        if response.stored_data is not None:
            logger.warning("Response.stored_data is not supported with local manager.")

        if response.stop_children:
            stopped_parents.add(job.uuid)

        if response.stop_jobflow:
            stop_jobflow = True
            return None, True

        diversion_responses = []
        if response.replace is not None:
            # first run any restarts
            diversion_responses.append(_run(response.replace))

        if response.detour is not None:
            # next any detours
            diversion_responses.append(_run(response.detour))

        if response.addition is not None:
            # finally any additions
            diversion_responses.append(_run(response.addition))

        # all([]) is True, so a job without diversions passes this check
        if not all(diversion_responses):
            return None, False
        return response, False

    def _get_job_dir():
        """Return the directory a job runs in, creating a fresh one if requested."""
        if not create_folders:
            return root_dir

        time_now = datetime.now(tz=timezone.utc).strftime(SETTINGS.DIRECTORY_FORMAT)
        job_dir = root_dir / f"job_{time_now}-{randint(10000, 99999)}"
        job_dir.mkdir()
        return job_dir

    def _run(root_flow):
        """Run every job of ``root_flow``; return True only if all succeeded."""
        encountered_bad_response = False
        for job, parents in root_flow.iterflow():
            job_dir = _get_job_dir()
            with cd(job_dir):
                response, jobflow_stopped = _run_job(job, parents)

            if response is not None:
                response.job_dir = job_dir
            else:
                encountered_bad_response = True

            if jobflow_stopped:
                return False

        return not encountered_bad_response

    logger.info("Started executing jobs locally")
    finished_successfully = _run(flow)
    logger.info("Finished executing jobs locally")

    if ensure_success and not finished_successfully:
        raise RuntimeError("Flow did not finish running successfully")

    # hand back a plain dict so callers do not get defaultdict auto-creation
    return dict(responses)