Using the dataflow Firetasks

This group includes custom Firetasks to manage dataflow between Fireworks. The input data and output data are stored in the Firework spec and passed to the subsequent Firetasks and Fireworks via FWAction objects. The module includes:

  • CommandLineTask
  • ForeachTask
  • JoinDictTask
  • JoinListTask
  • ImportDataTask

To use a Python function in a dataflow context, see e.g. PyTask.

CommandLineTask

The CommandLineTask runs commands in a shell with command-line options, manages the inputs and outputs of those commands, receives file metadata from parent Fireworks, and passes file metadata to child Fireworks.

Required parameters

  • command_spec (dict): a dictionary specification of the command. It has the following structure:

    command_spec = {
        'command': (list), # mandatory, list of strings
        inputs[0]: (dict), # optional
        inputs[1]: (dict), # optional
        # ...
        outputs[0]: (dict), # optional
        outputs[1]: (dict), # optional
        # ...
    }
    

Note

When a str is found instead of dict for some input or output key, for example inputs[1]: 'string', then 'string' is automatically replaced with {spec['string']}.

The command key holds the command as a list of strings, in the form used with Python's subprocess module. The optional keys inputs[0], inputs[1], …, outputs[0], outputs[1], … are the actual keys listed in the inputs and outputs parameters. The dictionary stored under each of these keys has the following schema:

{
    'binding': {
        'prefix': str or None,
        'separator': str or None
    },
    'source': {
        'type': 'path' or 'data' or 'identifier'
                 or 'stdin' or 'stdout' or 'stderr' or None,
        'value': str or int or float
    },
    'target': {
        'type': 'path' or 'data' or 'identifier'
                 or 'stdin' or 'stdout' or 'stderr' or None,
        'value': str
    }
}

Note

If the type in the source field is data then value can be of types str, int and float.

Note

When a str is found instead of dict for some source, for example {'source': 'string'}, then string is replaced with spec['string'].
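The source shorthand described in the note above can be sketched in plain Python. This is only an illustration of the lookup rule, not the CommandLineTask implementation; the helper name resolve_source is hypothetical.

```python
def resolve_source(entry, spec):
    """Return a copy of one input/output entry with a string 'source'
    replaced by the spec item of that name (hypothetical helper)."""
    entry = dict(entry)
    source = entry.get('source')
    if isinstance(source, str):
        # A bare string is treated as a key into the Firework spec.
        entry['source'] = spec[source]
    return entry


# Values taken from the CommandLineTask example in this section.
spec = {'input string': {'type': 'data', 'value': 'Hello world!'}}
entry = {'source': 'input string'}

resolved = resolve_source(entry, spec)
print(resolved)
```

An entry whose source is already a dictionary passes through unchanged.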

Optional parameters

  • inputs (list): a list of keys, one for each input argument; default is empty.
  • outputs (list): a list of keys, one for each output argument; default is empty.
  • chunk_number (int): the serial number of the Firetask when it is part of a parallel group generated by a ForeachTask; default is None.

Example

The following workflow executes the command echo -n Hello world! in the command line:

fws:
- fw_id: 1
  name: Run a command and store the result
  spec:
    _tasks:
    - _fw_name: CommandLineTask
      command_spec:
        command: [echo, -n]
        input string: {source: input string}
        output file:
          source: {type: stdout}
          target: {type: path, value: /tmp}
      inputs: [input string]
      outputs: [output file]
    input string: {type: data, value: Hello world!}
links: {}
metadata: {}

The output written to STDOUT is collected and stored in a new file under /tmp. The full path of this file is stored under the key output file in the spec of the current Firework and of all child Fireworks.
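For orientation, the effect of this workflow can be approximated with the standard library alone. The sketch below is not how CommandLineTask is implemented: the temporary directory stands in for /tmp, and the way the output file is named is an assumption.

```python
import os
import subprocess
import tempfile

command = ['echo', '-n']      # the 'command' list from command_spec
argument = 'Hello world!'     # the 'data' value bound to 'input string'

# Run the command and capture what it writes to STDOUT.
completed = subprocess.run(
    command + [argument], capture_output=True, text=True, check=True
)

# Store the captured output in a new file (a temporary directory is used
# here in place of the /tmp target of the example).
target_dir = tempfile.mkdtemp()
fd, output_path = tempfile.mkstemp(dir=target_dir)
with os.fdopen(fd, 'w') as handle:
    handle.write(completed.stdout)

# This full path is what would be recorded under the key 'output file'.
print(output_path)
```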

ForeachTask

The purpose of ForeachTask is to dynamically branch the workflow between this Firework and its children by inserting a parallel section of child Fireworks. The number of spawned parallel Fireworks is determined by the length of the list specified by the split parameter or by the optional number of chunks parameter. Each child Firework contains a Firetask (a PyTask, a CommandLineTask, or any other Firetask with an inputs parameter) that processes one element (or one chunk) of this list. The output is passed to the spec of the Firework(s) right after the detour using a push method, i.e. the outputs of all parallel Fireworks are collected in a list stored under the key given in the outputs argument.

Note

The ordering of elements (or chunks) in the resulting outputs list can differ from that in the original list, depending on the execution order of the spawned Fireworks.

Required parameters

  • task (dict): a dictionary representation of the Firetask
  • split (str): a key in spec that contains the input data to be distributed over the parallel child Fireworks. This key must be present both in the inputs parameter of the Firetask and in the spec.

Optional parameters

  • number of chunks (int): if provided, the input list specified by split is divided into this number of sub-lists (chunks), and each chunk is processed by a separate child Firework. This parameter can be used to reduce the number of parallel Fireworks. The default is the length of the input list specified by split, i.e. one child Firework is created for each data element.
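The effect of number of chunks can be illustrated with a small standalone function. The text above does not say how chunk sizes are chosen, so the near-even split used here is an assumption, and split_into_chunks is a hypothetical name rather than part of FireWorks.

```python
def split_into_chunks(items, number_of_chunks=None):
    """Divide items into sub-lists, one per parallel child Firework.

    With no chunk count, every element gets its own chunk (the default
    behaviour described above). Chunk sizes differ by at most one.
    """
    if not items:
        return []
    if number_of_chunks is None:
        number_of_chunks = len(items)
    if number_of_chunks < 1:
        raise ValueError('number_of_chunks must be a positive integer')
    # Never create more chunks than there are elements.
    number_of_chunks = min(number_of_chunks, len(items))
    base, extra = divmod(len(items), number_of_chunks)
    chunks, start = [], 0
    for index in range(number_of_chunks):
        # The first `extra` chunks take one additional element.
        size = base + (1 if index < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


beans = ['arabica', 'robusta', 'liberica']
print(split_into_chunks(beans))      # one chunk per element
print(split_into_chunks(beans, 2))   # two chunks, so two child Fireworks
```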

Example

The following example demonstrates the use of ForeachTask in combination with PyTask:

fws:
- fw_id: 1
  name: Grind coffee
  spec:
    _tasks:
    - _fw_name: ForeachTask
      split: coffee beans
      task:
        _fw_name: PyTask
        function: auxiliary.printurn
        inputs: [coffee beans]
        outputs: coffee powder
    coffee beans: [arabica, robusta, liberica]
- fw_id: 2
  name: Brew coffee
  spec:
    _tasks:
    - _fw_name: ForeachTask
      split: coffee powder
      task:
        _fw_name: PyTask
        function: auxiliary.printurn
        inputs: [coffee powder, water]
        outputs: pure coffee
    water: workflowing water
- fw_id: 3
  name: Serve coffee
  spec:
    _tasks:
    - _fw_name: PyTask
      function: auxiliary.printurn
      inputs: [pure coffee]
links:
  '1': [2]
  '2': [3]
metadata: {}
name: Workflow for many sorts of coffee

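In this example, the function auxiliary.printurn prints and returns its arguments. Single-element lists are unwrapped, and a single argument is returned as a bare value rather than as a list: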

def printurn(*args):
    result = []
    for arg in args:
        if isinstance(arg, list) and len(arg) == 1:
            result.append(arg[0])
        else:
            result.append(arg)
    if len(result) == 1:
        result = result[0]
    print(result)
    return result

The module auxiliary, i.e. the file auxiliary.py, must be located in a directory listed in $PYTHONPATH.

JoinDictTask

This Firetask combines the specified items from spec into a new or existing dictionary in spec.

Required parameters

  • inputs (list): a list of spec keys
  • output (str): a spec key under which the joined items will be stored

Optional parameters

  • rename (dict): a dictionary that maps keys specified in inputs to new names in the joined dictionary, e.g. {'old name 1': 'new name 1', 'old name 2': 'new name 2'}
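The joining rule can be sketched with plain dictionaries. This is an illustration under assumptions, not the JoinDictTask implementation: join_dict is a hypothetical helper, and it returns the update instead of modifying spec.

```python
def join_dict(spec, inputs, output, rename=None):
    """Collect spec[key] for each key in inputs into one dictionary
    stored under `output`, applying optional key translations."""
    rename = rename or {}
    # Start from the existing dictionary under `output`, if there is one.
    joined = dict(spec.get(output, {}))
    for key in inputs:
        joined[rename.get(key, key)] = spec[key]
    return {output: joined}


# Illustrative spec; all keys and values are made up for this sketch.
spec = {
    'temperature': 93,
    'pressure': 9,
    'settings': {'grind': 'fine'},
}
update = join_dict(
    spec,
    inputs=['temperature', 'pressure'],
    output='settings',
    rename={'pressure': 'pressure (bar)'},
)
print(update)
```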

JoinListTask

This Firetask combines the items specified by spec keys into a new or existing list in spec.

Required parameters

  • inputs (list): a list of spec keys
  • output (str): a spec key under which the joined items will be stored

Optional parameters

None.
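A comparable sketch for lists follows. It assumes each item is appended as a single element, even when the item is itself a list; join_list is a hypothetical helper and not the JoinListTask implementation.

```python
def join_list(spec, inputs, output):
    """Append spec[key] for each key in inputs to the list stored
    under `output`, creating that list if it does not exist yet."""
    joined = list(spec.get(output, []))
    for key in inputs:
        joined.append(spec[key])
    return {output: joined}


# Illustrative spec; all keys and values are made up for this sketch.
spec = {'espresso': 'short', 'lungo': 'long', 'menu': ['ristretto']}
update = join_list(spec, inputs=['espresso', 'lungo'], output='menu')
print(update)
```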

ImportDataTask

This Firetask imports JSON or YAML data from a file into spec. The data is placed in the nested dictionary addressed by a map string (see below).

Required parameters

  • filename (str): a filename from which the data is imported. The name must end with either .json or .yaml.
  • mapstring (str): a map string in the format maplist[0]/maplist[1]/.... At least maplist[0] has to be defined because it is the key in spec used for the import. Each further level of nesting is specified by extending the mapstring; for example, if mapstring is maplist[0]/maplist[1], then the data is imported as spec[maplist[0]][maplist[1]].

Optional parameters

None.
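How a map string addresses a nested location can be sketched as follows. This is a simplified, JSON-only illustration, not the ImportDataTask implementation (YAML would need a third-party parser). The helper name import_data, the creation of missing intermediate keys, and the merge into an existing dictionary are assumptions.

```python
import json
import os
import tempfile


def import_data(spec, filename, mapstring):
    """Load JSON from filename into spec at the path given by mapstring."""
    with open(filename) as handle:
        data = json.load(handle)
    maplist = mapstring.split('/')
    # Walk down to the parent of the final key, creating levels as needed.
    target = spec
    for key in maplist[:-1]:
        target = target.setdefault(key, {})
    leaf = maplist[-1]
    if isinstance(data, dict) and isinstance(target.get(leaf), dict):
        target[leaf].update(data)   # merge into the existing dictionary
    else:
        target[leaf] = data         # otherwise store the data as-is
    return spec


# Write a small JSON file to import; the content is made up for this sketch.
path = os.path.join(tempfile.mkdtemp(), 'beans.json')
with open(path, 'w') as handle:
    json.dump({'roast': 'dark'}, handle)

spec = {'coffee': {'beans': {'origin': 'Brazil'}}}
import_data(spec, path, 'coffee/beans')
print(spec)
```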