fireworks.core package

Subpackages

Submodules

fireworks.core.firework module

This module contains some of the most central FireWorks classes.

  • A Workflow is a sequence of FireWorks as a DAG (directed acyclic graph).

  • A Firework defines a workflow step and contains one or more Firetasks along with its Launches.

  • A Launch describes the run of a Firework on a computing resource.

  • A FiretaskBase defines the contract for tasks that run within a Firework (Firetasks).

  • A FWAction encapsulates the output of a Firetask and tells FireWorks what to do next after a job completes.

class fireworks.core.firework.FWAction(stored_data=None, exit=False, update_spec=None, mod_spec=None, additions=None, detours=None, defuse_children=False, defuse_workflow=False, propagate=False)

Bases: FWSerializable

A FWAction encapsulates the output of a Firetask (it is returned by a Firetask after the Firetask completes). The FWAction allows a user to store rudimentary output data as well as return commands that alter the workflow.

__init__(stored_data=None, exit=False, update_spec=None, mod_spec=None, additions=None, detours=None, defuse_children=False, defuse_workflow=False, propagate=False) None
Parameters:
  • stored_data (dict) – data to store from the run. Does not affect the operation of FireWorks.

  • exit (bool) – if set to True, any remaining Firetasks within the same Firework are skipped.

  • update_spec (dict) – specifies how to update the child FW’s spec

  • mod_spec ([dict]) – update the child FW’s spec using the DictMod language (more flexible than update_spec)

  • additions ([Workflow]) – a list of WFs/FWs to add as children

  • detours ([Workflow]) – a list of WFs/FWs to add as children (they will inherit the current FW’s children)

  • defuse_children (bool) – defuse all the original children of this Firework

  • defuse_workflow (bool) – defuse all incomplete steps of this workflow

  • propagate (bool) – apply any update_spec and mod_spec modifications not only to direct children, but to all dependent FireWorks down to the Workflow’s leaves.

classmethod from_dict(*args, **kwargs)
property skip_remaining_tasks

If the FWAction gives any dynamic action, we skip the subsequent Firetasks.

Returns:

bool

to_dict(*args, **kwargs)
class fireworks.core.firework.FireTaskBase(*args, **kwargs)

Bases: FiretaskBase

class fireworks.core.firework.FiretaskBase(*args, **kwargs)

Bases: defaultdict, FWSerializable, ABC

FiretaskBase is used like an abstract class that defines a computing task (Firetask). All Firetasks should inherit from FiretaskBase.

You can set parameters of a Firetask like you’d use a dict.

__init__(*args, **kwargs) None
classmethod from_dict(*args, **kwargs)
optional_params = None
required_params = None
abstract run_task(fw_spec) NoReturn

This method gets called when the Firetask is run. It can take in a Firework spec, perform some task using that data, and then return an output in the form of a FWAction.

Parameters:

fw_spec (dict) – A Firework spec. This comes from the master spec. In addition, this spec contains a special “_fw_env” key that contains the env settings of the FWorker calling this method. This provides for abstracting out certain commands or settings. For example, “foo” may be named “foo1” in resource 1 and “foo2” in resource 2. The FWorker env can specify { “foo”: “foo1”}, which maps an abstract variable “foo” to the relevant “foo1” or “foo2”. You can then write a task that uses fw_spec[“_fw_env”][“foo”] that will work across all these multiple resources.

Returns:

(FWAction)

to_dict(*args, **kwargs)
class fireworks.core.firework.Firework(tasks, spec=None, name=None, launches=None, archived_launches=None, state='WAITING', created_on=None, fw_id=None, parents=None, updated_on=None)

Bases: FWSerializable

A Firework is a workflow step and might be contain several Firetasks.

STATE_RANKS = {'ARCHIVED': -2, 'COMPLETED': 5, 'DEFUSED': 0, 'FIZZLED': -1, 'PAUSED': 0, 'READY': 2, 'RESERVED': 3, 'RUNNING': 4, 'WAITING': 1}
__init__(tasks, spec=None, name=None, launches=None, archived_launches=None, state='WAITING', created_on=None, fw_id=None, parents=None, updated_on=None) None
Parameters:
  • tasks (Firetask or [Firetask]) – a list of Firetasks to run in sequence.

  • spec (dict) – specification of the job to run. Used by the Firetask.

  • launches ([Launch]) – a list of Launch objects of this Firework.

  • archived_launches ([Launch]) – a list of archived Launch objects of this Firework.

  • state (str) – the state of the FW (e.g. WAITING, RUNNING, COMPLETED, ARCHIVED)

  • created_on (datetime) – time of creation

  • fw_id (int) – an identification number for this Firework.

  • parents (Firework or [Firework]) – list of parent FWs this FW depends on.

  • updated_on (datetime) – last time the STATE was updated.

classmethod from_dict(*args, **kwargs)
property state

Returns: str: The current state of the Firework.

to_db_dict()

Return firework dict with updated launches and state.

to_dict(*args, **kwargs)
class fireworks.core.firework.Launch(state, launch_dir, fworker=None, host=None, ip=None, trackers=None, action=None, state_history=None, launch_id=None, fw_id=None)

Bases: FWSerializable

A Launch encapsulates data about a specific run of a Firework on a computing resource.

__init__(state, launch_dir, fworker=None, host=None, ip=None, trackers=None, action=None, state_history=None, launch_id=None, fw_id=None) None
Parameters:
  • state (str) – the state of the Launch (e.g. RUNNING, COMPLETED)

  • launch_dir (str) – the directory where the Launch takes place

  • fworker (FWorker) – The FireWorker running the Launch

  • host (str) – the hostname where the launch took place (set automatically if None)

  • ip (str) – the IP address where the launch took place (set automatically if None)

  • trackers ([Tracker]) – File Trackers for this Launch

  • action (FWAction) – the output of the Launch

  • state_history ([dict]) – a history of all states of the Launch and when they occurred

  • launch_id (int) – launch_id set by the LaunchPad

  • fw_id (int) – id of the Firework this Launch is running.

classmethod from_dict(*args, **kwargs)
property last_pinged

Returns: datetime: the time the Launch last pinged a heartbeat that it was still running.

property reservedtime_secs

Returns: int: number of seconds the Launch was stuck as RESERVED in a queue.

property runtime_secs

Returns: int: the number of seconds that the Launch ran for.

set_reservation_id(reservation_id) None

Adds the job_id to the reservation.

Parameters:

reservation_id (int or str) – the id of the reservation (e.g., queue reservation)

property state

Returns: str: The current state of the Launch.

property time_end

Returns: datetime: the time the Launch was COMPLETED or FIZZLED.

property time_reserved

Returns: datetime: the time the Launch was RESERVED in the queue.

property time_start

Returns: datetime: the time the Launch started RUNNING.

to_db_dict(*args, **kwargs)
to_dict(*args, **kwargs)
touch_history(update_time=None, checkpoint=None) None

Updates the update_on field of the state history of a Launch. Used to ping that a Launch is still alive.

Parameters:

update_time (datetime)

class fireworks.core.firework.Tracker(filename, nlines=25, content='', allow_zipped=False)

Bases: FWSerializable

A Tracker monitors a file and returns the last N lines for updating the Launch object.

MAX_TRACKER_LINES = 1000
__init__(filename, nlines=25, content='', allow_zipped=False) None
Parameters:
  • filename (str)

  • nlines (int) – number of lines to track

  • content (str) – tracked content

  • allow_zipped (bool) – if set, will look for zipped file.

classmethod from_dict(m_dict)
to_dict()
track_file(launch_dir=None)

Reads the monitored file and returns back the last N lines.

Parameters:

launch_dir (str) – directory where job was launched in case of relative filename

Returns:

the content(last N lines)

Return type:

str

class fireworks.core.firework.Workflow(fireworks: Sequence[Firework], links_dict: dict[int, list[int]] | None = None, name: str | None = None, metadata: dict[str, Any] | None = None, created_on: datetime | None = None, updated_on: datetime | None = None, fw_states: dict[int, str] | None = None)

Bases: FWSerializable

A Workflow connects a group of FireWorks in an execution order.

Bases: dict, FWSerializable

An inner class for storing the DAG links between FireWorks.

__init__(*args, **kwargs) None
classmethod from_dict(m_dict)
property nodes

Return list of all nodes.

Return a dict of child and its parents.

Note: if performance of parent_links becomes an issue, override delitem/setitem to update parent_links

to_db_dict()

Convert to str form for Mongo, which cannot have int keys .

Returns:

dict

to_dict()

Convert to str form for Mongo, which cannot have int keys.

Returns:

dict

__init__(fireworks: Sequence[Firework], links_dict: dict[int, list[int]] | None = None, name: str | None = None, metadata: dict[str, Any] | None = None, created_on: datetime | None = None, updated_on: datetime | None = None, fw_states: dict[int, str] | None = None) None
Parameters:
  • fireworks ([Firework]) – all FireWorks in this workflow.

  • links_dict (dict) – links between the FWs as (parent_id):[(child_id1, child_id2)]

  • name (str) – name of workflow.

  • metadata (dict) – metadata for this Workflow.

  • created_on (datetime) – time of creation

  • updated_on (datetime) – time of update

  • fw_states (dict) – leave this alone unless you are purposefully creating a Lazy-style WF.

append_wf(new_wf, fw_ids, detour=False, pull_spec_mods=False)

Method to add a workflow as a child to a Firework Note: detours must have children that have STATE_RANK that is WAITING or below.

Parameters:
  • new_wf (Workflow) – New Workflow to add.

  • fw_ids ([int]) – ids of the parent Fireworks on which to add the Workflow.

  • detour (bool) – add children of the current Firework to the Workflow’s leaves.

  • pull_spec_mods (bool) – pull spec mods of COMPLETED parents, refreshes the WF states.

Returns:

list of Firework ids that were updated or new.

Return type:

list[int]

apply_action(action: FWAction, fw_id: int) list[int]

Apply a FWAction on a Firework in the Workflow.

Parameters:
  • action (FWAction) – action to apply

  • fw_id (int) – id of Firework on which to apply the action

Returns:

list of Firework ids that were updated or new.

Return type:

list[int]

classmethod from_dict(m_dict: dict[str, Any]) Workflow

Return Workflow from its dict representation.

Parameters:

m_dict (dict) – either a Workflow dict or a Firework dict

Returns:

Workflow

classmethod from_firework(fw: Firework, name: str | None = None, metadata=None) Workflow

Return Workflow from the given Firework.

Parameters:
  • fw (Firework)

  • name (str) – New workflow’s name. if not provided, the firework name is used

  • metadata (dict) – New workflow’s metadata.

Returns:

Workflow

classmethod from_wflow(wflow: Workflow) Workflow

Create a fresh Workflow from an existing one.

Parameters:

wflow (Workflow)

Returns:

Workflow

property fws: list[Firework]

Return list of all fireworks.

property leaf_fw_ids: list[int]

Gets leaf FireWorks of this workflow (those with no children).

Returns:

Firework ids of leaf FWs.

Return type:

list[int]

refresh(fw_id, updated_ids=None)

Refreshes the state of a Firework and any affected children.

Parameters:
  • fw_id (int) – id of the Firework on which to perform the refresh

  • updated_ids ([int])

Returns:

list of Firework ids that were updated

Return type:

set(int)

remove_fws(fw_ids) None

Remove the fireworks corresponding to the input firework ids and update the workflow i.e the parents of the removed fireworks become the parents of the children fireworks (only if the children dont have any other parents).

Parameters:

fw_ids (list) – list of fw ids to remove.

rerun_fw(fw_id, updated_ids=None)

Archives the launches of a Firework so that it can be re-run.

Parameters:
  • fw_id (int) – id of firework to rerun

  • updated_ids (set(int)) – set of fireworks id to rerun

Returns:

list of Firework ids that were updated.

Return type:

list[int]

reset(reset_ids: bool = True) None

Reset the states of all Fireworks in this workflow to ‘WAITING’.

Parameters:

reset_ids (bool) – if True, give each Firework a new id.

property root_fw_ids: list[int]

Gets root FireWorks of this workflow (those with no parents).

Returns:

Firework ids of root FWs.

Return type:

list[int]

property state: str

Returns: state (str): state of workflow.

to_db_dict() dict[str, Any]
to_dict() dict[str, Any]
to_display_dict()

fireworks.core.fworker module

This module contains FireWorker, which encapsulates information about a computing resource.

class fireworks.core.fworker.FWorker(name='Automatically generated Worker', category='', query=None, env=None)

Bases: FWSerializable

__init__(name='Automatically generated Worker', category='', query=None, env=None) None
Parameters:
  • name (str) – the name of the resource, should be unique

  • category (str or [str]) – a String describing a specific category of job to pull, does not need to be unique. If the FWorker should pull jobs of multiple categories, use a list of str.

  • query (dict) – a dict query that restricts the type of Firework this resource will run

  • env (dict) – a dict of special environment variables for the resource. This env is passed to running Firetasks as a _fw_env in the fw_spec, which provides for abstraction of resource-specific commands or settings. See fireworks.core.firework.FiretaskBase for information on how to use this env variable in Firetasks.

classmethod auto_load()

Returns FWorker object from settings file(my_fworker.yaml).

classmethod from_dict(*args, **kwargs)
property query

Returns updated query dict.

to_dict(*args, **kwargs)

fireworks.core.launchpad module

The LaunchPad manages the FireWorks database.

class fireworks.core.launchpad.LaunchPad(host=None, port=None, name=None, username=None, password=None, logdir=None, strm_lvl=None, user_indices=None, wf_user_indices=None, authsource=None, uri_mode=False, mongoclient_kwargs=None)

Bases: FWSerializable

The LaunchPad manages the FireWorks database.

__init__(host=None, port=None, name=None, username=None, password=None, logdir=None, strm_lvl=None, user_indices=None, wf_user_indices=None, authsource=None, uri_mode=False, mongoclient_kwargs=None) None
Parameters:
  • host (str) – hostname. If uri_mode is True, a MongoDB connection string URI (https://docs.mongodb.com/manual/reference/connection-string/) can be used instead of the remaining options below.

  • port (int) – port number

  • name (str) – database name

  • username (str)

  • password (str)

  • logdir (str) – path to the log directory

  • strm_lvl (str) – the logger stream level

  • user_indices (list) – list of ‘fireworks’ collection indexes to be built

  • wf_user_indices (list) – list of ‘workflows’ collection indexes to be built

  • authsource (str) – authSource parameter for MongoDB authentication; defaults to “name” (i.e., db name) if not set

  • uri_mode (bool) – if set True, all Mongo connection parameters occur through a MongoDB URI string (set as the host).

  • mongoclient_kwargs (dict) – A list of any other custom keyword arguments to be passed into the MongoClient connection. Use these kwargs to specify SSL/TLS or serverSelectionTimeoutMS arguments. Note these arguments are different depending on the major pymongo version used; see pymongo documentation for more details.

add_offline_run(launch_id, fw_id, name) None

Add the launch and firework to the offline_run collection.

Parameters:
  • launch_id (int) – launch id.

  • fw_id (id) – firework id

  • name (str)

add_wf(wf, reassign_all=True)

Add workflow(or firework) to the launchpad. The firework ids will be reassigned.

Parameters:
  • wf (Workflow/Firework) – Workflow or Firework object

  • reassign_all (bool) – If True, the firework ids will be assigned starting from the next available id. Defaults to True.

Returns:

mapping between old and new Firework ids

Return type:

dict

append_wf(new_wf, fw_ids, detour=False, pull_spec_mods=True) None

Append a new workflow on top of an existing workflow.

Parameters:
  • new_wf (original children of the parent fw_ids to the) – The new workflow to append

  • fw_ids ([int]) – The parent fw_ids at which to append the workflow

  • detour (bool) – Whether to connect the new Workflow in a “detour” style, i.e., move

  • new_wf

  • pull_spec_mods (bool) – Whether the new Workflow should pull the FWActions of the parent fw_ids

archive_wf(fw_id) None

Archive the workflow containing the given firework id.

Parameters:

fw_id (int) – firework id

classmethod auto_load()
bulk_add_wfs(wfs) None

Adds a list of workflows to the fireworks database using insert_many for both the fws and wfs, is more efficient than adding them one at a time.

Parameters:

wfs ([Workflow]) – list of workflows or fireworks

Returns:

None

cancel_reservation(launch_id) None

Given the launch id, cancel the reservation and rerun the fireworks.

cancel_reservation_by_reservation_id(reservation_id) None

Given the reservation id, cancel the reservation and rerun the corresponding fireworks.

change_launch_dir(launch_id, launch_dir) None

Change the launch directory corresponding to the given launch id.

Parameters:
  • launch_id (int)

  • launch_dir (str) – path to the new launch directory.

checkout_fw(fworker, launch_dir, fw_id=None, host=None, ip=None, state='RUNNING')

Checkout the next ready firework, mark it with the given state(RESERVED or RUNNING) and return it to the caller. The caller is responsible for running the Firework.

Parameters:
  • fworker (FWorker) – A FWorker instance

  • launch_dir (str) – the dir the FW will be run in (for creating a Launch object)

  • fw_id (int) – Firework id

  • host (str) – the host making the request (for creating a Launch object)

  • ip (str) – the ip making the request (for creating a Launch object)

  • state (str) – RESERVED or RUNNING, the fetched firework’s state will be set to this value.

Returns:

firework and the new launch id.

Return type:

(Firework, int)

complete_launch(launch_id, action=None, state='COMPLETED')

Internal method used to mark a Firework’s Launch as completed.

Parameters:
  • launch_id (int)

  • action (FWAction) – the FWAction of what to do next

  • state (str) – COMPLETED or FIZZLED

Returns:

updated launch

Return type:

dict

defuse_fw(fw_id, rerun_duplicates=True)

Given the firework id, defuse the firework and refresh the workflow.

Parameters:
  • fw_id (int) – firework id

  • rerun_duplicates (bool) – if True, duplicate fireworks(ones with the same launch) are marked for rerun and then defused.

defuse_wf(fw_id, defuse_all_states=True) None

Defuse the workflow containing the given firework id.

Parameters:
  • fw_id (int) – firework id

  • defuse_all_states (bool)

delete_fws(fw_ids, delete_launch_dirs=False) None

Delete a set of fireworks identified by their fw_ids.

ATTENTION: This function serves maintenance purposes and will leave workflows untouched. Its use will thus result in a corrupted database. Use ‘delete_wf’ instead for consistently deleting workflows together with their fireworks.

Parameters:
  • fw_ids ([int]) – Firework ids

  • delete_launch_dirs (bool) – if True all the launch directories associated with the WF will be deleted as well, if possible.

delete_wf(fw_id, delete_launch_dirs=False) None

Delete the workflow containing firework with the given id.

Parameters:
  • fw_id (int) – Firework id

  • delete_launch_dirs (bool) – if True all the launch directories associated with the WF will be deleted as well, if possible.

delete_launch_dirs

detect_lostruns(expiration_secs=14400, fizzle=False, rerun=False, max_runtime=None, min_runtime=None, refresh=False, query=None, launch_query=None)

Detect lost runs i.e running fireworks that haven’t been updated within the specified time limit or running firework whose launch has been marked fizzed or completed.

Parameters:
  • expiration_secs (seconds) – expiration time in seconds

  • fizzle (bool) – if True, mark the lost runs fizzed

  • rerun (bool) – if True, mark the lost runs fizzed and rerun

  • max_runtime (seconds) – maximum run time

  • min_runtime (seconds) – minimum run time

  • refresh (bool) – if True, refresh the workflow with inconsistent fireworks.

  • query (dict) – restrict search to FWs matching this query

  • launch_query (dict) – restrict search to launches matching this query (e.g. host restriction)

Returns:

tuple of list of lost launch ids, lost firework ids and

inconsistent firework ids.

Return type:

([int], [int], [int])

detect_unreserved(expiration_secs=1209600, rerun=False)

Return the reserved launch ids that have not been updated for a while.

Parameters:
  • expiration_secs (seconds) – time limit

  • rerun (bool) – if True, the expired reservations are cancelled and the fireworks rerun.

Returns:

list of expired launch ids

Return type:

[int]

forget_offline(launchid_or_fwid, launch_mode=True) None

Unmark the offline run for the given launch or firework id.

Parameters:
  • launchid_or_fwid (int) – launch od or firework id

  • launch_mode (bool) – if True then launch id is given.

classmethod from_dict(d)
future_run_exists(fworker=None) bool

Check if database has any current OR future Fireworks available.

Returns:

True if database has any ready or waiting Fireworks.

Return type:

bool

get_fw_by_id(fw_id)

Given a Firework id, give back a Firework object.

Parameters:

fw_id (int) – Firework id.

Returns:

Firework object

get_fw_dict_by_id(fw_id)

Given firework id, return firework dict.

Parameters:

fw_id (int) – Firework id.

Returns:

dict

get_fw_ids(query=None, sort=None, limit=0, count_only=False, launches_mode=False)

Return all the fw ids that match a query.

Parameters:
  • query (dict) – representing a Mongo query

  • [ (sort) – sort argument in Pymongo format

  • limit (int) – limit the results

  • count_only (bool) – only return the count rather than explicit ids

  • launches_mode (bool) – query the launches collection instead of fireworks

Returns:

list of firework ids matching the query

Return type:

list

get_fw_ids_from_reservation_id(reservation_id)

Given the reservation id, return the list of firework ids.

Parameters:

reservation_id (int)

Returns:

list of firework ids.

Return type:

[int]

get_fw_ids_in_wfs(wf_query=None, fw_query=None, sort=None, limit=0, count_only=False, launches_mode=False)

Return all fw ids that match fw_query within workflows that match wf_query.

Parameters:
  • wf_query (dict) – representing a Mongo query on workflows

  • fw_query (dict) – representing a Mongo query on Fireworks

  • [ (sort) – sort argument in Pymongo format

  • limit (int) – limit the results

  • count_only (bool) – only return the count rather than explicit ids

  • launches_mode (bool) – query the launches collection instead of fireworks

Returns:

list of firework ids matching the query

Return type:

list

get_launch_by_id(launch_id)

Given a Launch id, return details of the Launch.

Parameters:

launch_id (int) – launch id.

Returns:

Launch object

get_launchdir(fw_id, launch_idx=-1)

Returns the directory of the most recent launch of a fw_id :param fw_id: (int) fw_id to get launch id for :param launch_idx: (int) index of the launch to get. Default is -1, which is most recent.

get_logdir()

Return the log directory.

AJ: This is needed for job packing due to Proxy objects not being fully featured…

get_new_fw_id(quantity=1)

Checkout the next Firework id.

Parameters:

quantity (int) – optionally ask for many ids, otherwise defaults to 1 this then returns the first fw_id in that range

get_new_launch_id()

Checkout the next Launch id.

get_recovery(fw_id, launch_id='last')

function to get recovery data for a given fw and launch :param fw_id: fw id to get recovery data for :type fw_id: int :param launch_id: launch_id to get recovery data for, if ‘last’

recovery data is generated from last launch.

get_reservation_id_from_fw_id(fw_id)

Given the firework id, return the reservation id.

get_tracker_data(fw_id)
Parameters:

fw_id (id) – firework id.

Returns:

list tracker dicts

Return type:

[dict]

get_wf_by_fw_id(fw_id)

Given a Firework id, give back the Workflow containing that Firework.

Parameters:

fw_id (int) – Firework id.

Returns:

A Workflow object

get_wf_by_fw_id_lzyfw(fw_id: int) Workflow

Given a FireWork id, give back the Workflow containing that FireWork.

Parameters:

fw_id (int) – FireWork id.

Returns:

A Workflow object

get_wf_ids(query=None, sort=None, limit=0, count_only=False)

Return one fw id for all workflows that match a query.

Parameters:
  • query (dict) – representing a Mongo query

  • [ (sort) – sort argument in Pymongo format

  • limit (int) – limit the results

  • count_only (bool) – only return the count rather than explicit ids

Returns:

list of firework ids

Return type:

list

get_wf_summary_dict(fw_id, mode='more')

A much faster way to get summary information about a Workflow by querying only for needed information.

Parameters:
  • fw_id (int) – A Firework id.

  • mode (str) – Choose between “more”, “less” and “all” in terms of quantity of information.

Returns:

information about Workflow.

Return type:

dict

log_message(level, message) None

Support for job packing.

Parameters:
maintain(infinite=True, maintain_interval=None) None

Perform launchpad maintenance: detect lost runs and unreserved RESERVE launches.

Parameters:
  • infinite (bool)

  • maintain_interval (seconds) – sleep time

mark_fizzled(launch_id) None

Mark the launch corresponding to the given id as FIZZLED.

Parameters:

launch_id (int) – launch id.

Returns:

updated launch

Return type:

dict

pause_fw(fw_id)

Given the firework id, pauses the firework and refresh the workflow.

Parameters:

fw_id (int) – firework id

pause_wf(fw_id) None

Pause the workflow containing the given firework id.

Parameters:

fw_id (int) – firework id

ping_launch(launch_id, ptime=None, checkpoint=None) None

Ping that a Launch is still alive: updates the ‘update_on ‘field of the state history of a Launch.

Parameters:
  • launch_id (int)

  • ptime (datetime)

recover_offline(launch_id, ignore_errors=False, print_errors=False)

Update the launch state using the offline data in FW_offline.json file.

Parameters:
  • launch_id (int) – launch id.

  • ignore_errors (bool)

  • print_errors (bool)

Returns:

firework id if the recovering fails otherwise None

reignite_fw(fw_id)

Given the firework id, re-ignite(set state=WAITING) the defused firework.

Parameters:

fw_id (int) – firework id

reignite_wf(fw_id) None

Reignite the workflow containing the given firework id.

Parameters:

fw_id (int) – firework id

rerun_fw(fw_id, rerun_duplicates=True, recover_launch=None, recover_mode=None)

Rerun the firework corresponding to the given id.

Parameters:
  • fw_id (int) – firework id

  • rerun_duplicates (bool) – flag for whether duplicates should be rerun

  • recover_launch ('last' or int) – launch_id for last recovery, if set to ‘last’ (default), recovery will find the last available launch. If it is an int, will recover that specific launch

  • recover_mode ('prev_dir' or 'cp') – flag to indicate whether to copy or run recovery fw in previous directory

Returns:

list of firework ids that were rerun

Return type:

[int]

reserve_fw(fworker, launch_dir, host=None, ip=None, fw_id=None)

Checkout the next ready firework and mark the launch reserved.

Parameters:
  • fworker (FWorker)

  • launch_dir (str) – path to the launch directory.

  • host (str) – hostname

  • ip (str) – ip address

  • fw_id (int) – fw_id to be reserved, if desired

Returns:

the checked out firework and the new launch id.

Return type:

(Firework, int)

reset(password, require_password=True, max_reset_wo_password=25) None

Create a new FireWorks database. This will overwrite the existing FireWorks database! To safeguard against accidentally erasing an existing database, a password must be entered.

Parameters:
  • password (str) – A String representing today’s date, e.g. ‘2012-12-31’

  • require_password (bool) – Whether a password is required to reset the DB. Setting to false is dangerous because running code unintentionally could clear your DB - use max_reset_wo_password to minimize risk.

  • max_reset_wo_password (int) – A failsafe; when require_password is set to False, FWS will not clear DBs that contain more workflows than this parameter

restore_backup_data(launch_id, fw_id) None

For the given launch id and firework id, restore the back up data.

resume_fw(fw_id)

Given the firework id, resume (set state=WAITING) the paused firework.

Parameters:

fw_id (int) – firework id

run_exists(fworker=None)

Checks to see if the database contains any FireWorks that are ready to run.

Returns:

True if the database contains any FireWorks that are ready to run.

Return type:

bool

set_priority(fw_id, priority) None

Set priority to the firework with the given id.

Parameters:
  • fw_id (int) – firework id

  • priority

set_reservation_id(launch_id, reservation_id) None

Set reservation id to the launch corresponding to the given launch id.

Parameters:
  • launch_id (int)

  • reservation_id (int)

to_dict()

Note: usernames/passwords are exported as unencrypted Strings!

tuneup(bkground=True) None

Database tuneup: build indexes.

update_spec(fw_ids, spec_document, mongo=False) None

Update fireworks with a spec. Sometimes you need to modify a firework in progress.

Parameters:
  • [int] (fw_ids) – All fw_ids to modify.

  • spec_document (dict) – The spec document. Note that only modifications to the spec key are allowed. So if you supply {“_tasks.1.parameter”: “hello”}, you are effectively modifying spec._tasks.1.parameter in the actual fireworks collection.

  • mongo (bool) – spec_document uses mongo syntax to directly update the spec

class fireworks.core.launchpad.LazyFirework(fw_id, fw_coll, launch_coll, fallback_fs)

Bases: object

A LazyFirework only has the fw_id, and retrieves other data just-in-time. This representation can speed up Workflow loading as only “important” FWs need to be fully loaded.

__init__(fw_id, fw_coll, launch_coll, fallback_fs) None
Parameters:
  • fw_id (int) – firework id

  • fw_coll (pymongo.collection) – fireworks collection

  • launch_coll (pymongo.collection) – launches collection.

property archived_launches
property created_on
db_fields = ('name', 'fw_id', 'spec', 'created_on', 'state')
db_launch_fields = ('launches', 'archived_launches')
property full_fw
property launches
property name
property parents
property partial_fw
property spec
property state
property tasks
to_db_dict()
to_dict()
property updated_on
exception fireworks.core.launchpad.LockedWorkflowError

Bases: ValueError

Error raised if the context manager WFLock can’t acquire the lock on the WF within the selected time interval (WFLOCK_EXPIRATION_SECS), if the killing of the lock is disabled (WFLOCK_EXPIRATION_KILL).

class fireworks.core.launchpad.WFLock(lp, fw_id, expire_secs=300, kill=False)

Bases: object

Lock a Workflow, i.e. for performing update operations Raises a LockedWorkflowError if the lock couldn’t be acquired within expire_secs and kill==False. Calling functions are responsible for handling the error in order to avoid database inconsistencies.

__init__(lp, fw_id, expire_secs=300, kill=False) None
Parameters:
  • lp (LaunchPad)

  • fw_id (int) – Firework id

  • expire_secs (int) – max waiting time in seconds.

  • kill (bool) – force lock acquisition or not.

fireworks.core.launchpad.get_action_from_gridfs(action_dict, fallback_fs)

Helper function to obtain the correct dictionary of the FWAction associated with a launch. If necessary retrieves the information from gridfs based on its identifier, otherwise simply returns the dictionary in input. Should be used when accessing a launch to ensure the presence of the correct action dictionary.

Parameters:
  • action_dict (dict) – the dictionary contained in the “action” key of a launch document.

  • fallback_fs (GridFS) – the GridFS with the actions exceeding the 16MB limit.

Returns:

the dictionary of the action.

Return type:

dict

fireworks.core.launchpad.sort_aggregation(sort)

Build sorting aggregation pipeline.

Parameters:

[ (sort) – sorting keys and directions as a list of (str, int) tuples, i.e. [(‘updated_on’, 1)]

fireworks.core.rocket module

A Rocket fetches a Firework from the database, runs the sequence of Firetasks inside, and then completes the Launch.

class fireworks.core.rocket.Rocket(launchpad: LaunchPad, fworker: FWorker, fw_id: int)

Bases: object

The Rocket fetches a workflow step from the FireWorks database and executes it.

__init__(launchpad: LaunchPad, fworker: FWorker, fw_id: int) None

Args: launchpad (LaunchPad): A LaunchPad object for interacting with the FW database.

If none, reads FireWorks from FW.json and writes to FWAction.json

fworker (FWorker): A FWorker object describing the computing resource fw_id (int): id of a specific Firework to run (quit if it cannot be found).

decorate_fwaction(fwaction: FWAction, my_spec: dict[str, any], m_fw: Firework, launch_dir: str) FWAction
run(pdb_on_exception: bool = False) bool

Run the rocket (check out a job from the database and execute it).

Parameters:

pdb_on_exception (bool) – whether to invoke the debugger on a caught exception. Default to False.

Returns:

True if the rocket ran successfully, False is if it failed or no job in the DB was ready to run.

Return type:

bool

static update_checkpoint(launchpad: LaunchPad, launch_dir: str, launch_id: int, checkpoint: dict[str, any]) None

Helper function to update checkpoint.

Parameters:
  • launchpad (LaunchPad) – LaunchPad to ping with checkpoint data

  • launch_dir (str) – directory in which FW_offline.json was created

  • launch_id (int) – launch id to update

  • checkpoint (dict) – checkpoint data

fireworks.core.rocket.background_task(btask, spec, stop_event, master_thread) None
fireworks.core.rocket.do_ping(launchpad: LaunchPad, launch_id: int) None
fireworks.core.rocket.ping_launch(launchpad: LaunchPad, launch_id: int, stop_event: Event, master_thread: Thread) None
fireworks.core.rocket.start_background_task(btask, spec)
fireworks.core.rocket.start_ping_launch(launchpad: LaunchPad, launch_id: int) Event | None
fireworks.core.rocket.stop_backgrounds(ping_stop, btask_stops) None

fireworks.core.rocket_launcher module

This module contains methods for launching Rockets, both singly and in rapid-fire mode.

fireworks.core.rocket_launcher.get_fworker(fworker)
fireworks.core.rocket_launcher.launch_rocket(launchpad, fworker=None, fw_id=None, strm_lvl='INFO', pdb_on_exception=False)

Run a single rocket in the current directory.

Parameters:
  • launchpad (LaunchPad)

  • fworker (FWorker)

  • fw_id (int) – if set, a particular Firework to run

  • strm_lvl (str) – level at which to output logs to stdout

  • pdb_on_exception (bool) – if True, Python will start the debugger on a firework exception

Returns:

bool

fireworks.core.rocket_launcher.rapidfire(launchpad: LaunchPad, fworker: FWorker = None, m_dir: str | None = None, nlaunches: int = 0, max_loops: int = -1, sleep_time: int | None = None, strm_lvl: str = 'INFO', timeout: int | None = None, local_redirect: bool = False, pdb_on_exception: bool = False) None

Keeps running Rockets in m_dir until we reach an error. Automatically creates subdirectories for each Rocket. Usually stops when we run out of FireWorks from the LaunchPad.

Parameters:
  • launchpad (LaunchPad)

  • fworker (FWorker object)

  • m_dir (str) – the directory in which to loop Rocket running

  • nlaunches (int) – 0 means ‘until completion’, -1 or “infinite” means to loop until max_loops

  • max_loops (int) – maximum number of loops (default -1 is infinite)

  • sleep_time (int) – secs to sleep between rapidfire loop iterations

  • strm_lvl (str) – level at which to output logs to stdout

  • timeout (int) – of seconds after which to stop the rapidfire process

  • local_redirect (bool) – redirect standard input and output to local file

  • pdb_on_exception (bool) – if True, python will start the debugger on a firework exception

Module contents