"""Tools for running jobflow locally.

Jobs and flows are executed sequentially in the current Python process.
"""

from __future__ import annotations

import logging
import typing

if typing.TYPE_CHECKING:
    # Imported only for static type checking (TYPE_CHECKING is False at runtime).
    # Because annotations are postponed (``from __future__ import annotations``),
    # these names can appear in annotations without being imported at runtime.
    from pathlib import Path

    import jobflow

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def run_locally(
    flow: jobflow.Flow | jobflow.Job | list[jobflow.Job],
    log: bool = True,
    store: jobflow.JobStore | None = None,
    create_folders: bool = False,
    root_dir: str | Path | None = None,
    ensure_success: bool = False,
    allow_external_references: bool = False,
    raise_immediately: bool = False,
) -> dict[str, dict[int, jobflow.Response]]:
    """
    Run a :obj:`Job` or :obj:`Flow` locally.

    Jobs are executed sequentially in the current process, in the order given by
    the flow's ``iterflow()``. Any replace, detour or addition flows returned in a
    job's :obj:`Response` are run immediately after that job, in that order.

    Parameters
    ----------
    flow : Flow | Job | list[Job]
        A job or flow.
    log : bool
        Whether to print log messages.
    store : JobStore
        A job store. If a job store is not specified then
        :obj:`JobflowSettings.JOB_STORE` will be used. By default this is a maggma
        ``MemoryStore`` but can be customised by setting the jobflow configuration
        file. The store is connected before any job is run.
    create_folders : bool
        Whether to run each job in a new folder.
    root_dir : str | Path | None
        The root directory to run the jobs in or where to create new subfolders if
        ``create_folders`` is True. If None then the current working
        directory will be used. The directory, including any missing parent
        directories, is created if it does not exist.
    ensure_success : bool
        Raise an error if the flow was not executed successfully.
    allow_external_references : bool
        If False all the references to other outputs should be from other Jobs
        of the Flow.
    raise_immediately : bool
        If True, a job's exception is re-raised immediately. If False, the
        failure is logged, the flow keeps running, and an error is raised at the
        end only if ``ensure_success`` is True.

    Returns
    -------
    dict[str, dict[int, Response]]
        The responses of the jobs, as a dict of ``{uuid: {index: response}}``.
        Each stored response has ``job_dir`` set to the directory the job ran in.

    Raises
    ------
    RuntimeError
        If ``ensure_success`` is True and the flow did not finish successfully,
        i.e. a job failed, was skipped, had a diversion flow that did not succeed,
        or requested ``stop_jobflow``.
    """
    import traceback
    from collections import defaultdict
    from datetime import datetime, timezone
    from pathlib import Path
    from random import randint

    from monty.os import cd

    from jobflow import SETTINGS, initialize_logger
    from jobflow.core.flow import get_flow
    from jobflow.core.reference import OnMissing

    if store is None:
        store = SETTINGS.JOB_STORE

    # Use an absolute path so that it stays valid after ``cd``-ing into job
    # directories (nested diversion flows build their folders while inside one).
    root_dir = Path.cwd() if root_dir is None else Path(root_dir).resolve()
    root_dir.mkdir(parents=True, exist_ok=True)

    store.connect()

    if log:
        initialize_logger()

    flow = get_flow(flow, allow_external_references=allow_external_references)

    # uuids of jobs whose descendants must be skipped (stop_children, propagated)
    stopped_parents: set[str] = set()
    # uuids of jobs that failed, or were skipped because a parent failed
    errored: set[str] = set()
    responses: dict[str, dict[int, jobflow.Response]] = defaultdict(dict)
    # set once any job requests stop_jobflow; every later job is then skipped
    stop_jobflow = False

    def _run_job(job: jobflow.Job, parents, job_dir: Path):
        """
        Run one job (and any flows it spawns) in the current working directory.

        Returns a ``(response, jobflow_stopped)`` tuple. ``response`` is None when
        the job was skipped, failed, or one of its diversion flows did not succeed.
        """
        nonlocal stop_jobflow

        if stop_jobflow:
            return None, True

        if not stopped_parents.isdisjoint(parents):
            # stop children has been called for one of the jobs' parents
            logger.info(
                f"{job.name} is a child of a job with stop_children=True, skipping..."
            )
            stopped_parents.add(job.uuid)
            return None, False

        if (
            not errored.isdisjoint(parents)
            and job.config.on_missing_references == OnMissing.ERROR
        ):
            # a parent failed and this job does not tolerate missing references
            errored.add(job.uuid)
            return None, False

        if raise_immediately:
            response = job.run(store=store)
        else:
            try:
                response = job.run(store=store)
            except Exception:
                logger.info(
                    f"{job.name} failed with exception:\n{traceback.format_exc()}"
                )
                errored.add(job.uuid)
                return None, False

        responses[job.uuid][job.index] = response
        # Record where the job ran even if the flow is stopped below or a
        # diversion fails; previously job_dir was only set on full success.
        response.job_dir = job_dir

        if response.stored_data is not None:
            logger.warning("Response.stored_data is not supported with local manager.")

        if response.stop_children:
            stopped_parents.add(job.uuid)

        if response.stop_jobflow:
            stop_jobflow = True
            return None, True

        # Run new flows in order: replacements first, then detours, then
        # additions. Every present diversion is run, even if an earlier one failed.
        diversion_responses = [
            _run(diversion)
            for diversion in (response.replace, response.detour, response.addition)
            if diversion is not None
        ]
        if not all(diversion_responses):
            return None, False
        return response, False

    def _get_job_dir() -> Path:
        """Return the directory for the next job, creating a unique one if needed."""
        if not create_folders:
            return root_dir

        for _ in range(100):
            time_now = datetime.now(tz=timezone.utc).strftime(SETTINGS.DIRECTORY_FORMAT)
            job_dir = root_dir / f"job_{time_now}-{randint(10000, 99999)}"
            try:
                job_dir.mkdir()
            except FileExistsError:
                # Name clash (plausible if DIRECTORY_FORMAT has a coarse time
                # resolution); pick a new timestamp/suffix and try again.
                continue
            return job_dir
        raise FileExistsError(f"Could not create a unique job directory in {root_dir}")

    def _run(root_flow) -> bool:
        """Run every job of ``root_flow``; return True only if all of them succeeded."""
        encountered_bad_response = False
        for job, parents in root_flow.iterflow():
            job_dir = _get_job_dir()
            with cd(job_dir):
                response, jobflow_stopped = _run_job(job, parents, job_dir)

            encountered_bad_response = encountered_bad_response or response is None
            if jobflow_stopped:
                return False

        return not encountered_bad_response

    logger.info("Started executing jobs locally")
    finished_successfully = _run(flow)
    logger.info("Finished executing jobs locally")

    if ensure_success and not finished_successfully:
        raise RuntimeError("Flow did not finish running successfully")

    return dict(responses)