============================
Using the dataflow Firetasks
============================

This group includes custom Firetasks to manage dataflow between Fireworks. The
input and output data are stored in the Firework **spec** and passed to the
subsequent Firetasks and Fireworks via *FWAction* objects. The module includes:

* CommandLineTask
* ForeachTask
* JoinDictTask
* JoinListTask
* ImportDataTask

To use a Python function in a dataflow context, see e.g. :doc:`PyTask `.

CommandLineTask
===============

The CommandLineTask runs shell commands with command-line options, manages
the inputs and outputs of these commands, receives file metadata from parent
Fireworks and passes file metadata to child Fireworks.

Required parameters
-------------------

* **command_spec** *(dict)*: a dictionary specification of the command.
  It has the following structure::

      command_spec = {
          'command': (list),   # mandatory, list of strings
          inputs[0]: (dict),   # optional
          inputs[1]: (dict),   # optional
          # ...
          outputs[0]: (dict),  # optional
          outputs[1]: (dict),  # optional
          # ...
      }

.. note:: When a ``str`` is found instead of a ``dict`` for some input or
    output key, for example ``inputs[1]: 'string'``, then ``'string'`` is
    automatically replaced with ``{spec['string']}``.

The ``command`` key holds the command as a list of strings, in the form
expected by the Python ``subprocess`` module. The optional keys ``inputs[0]``,
``inputs[1]``, ..., ``outputs[0]``, ``outputs[1]``, ..., are the actual keys
listed in the ``inputs`` and ``outputs`` parameters. The dictionary stored
under each of these keys has the following schema::

    {
        'binding': {
            'prefix': str or None,
            'separator': str or None
        },
        'source': {
            'type': 'path' or 'data' or 'identifier' or 'stdin' or 'stdout' or 'stderr' or None,
            'value': str or int or float
        },
        'target': {
            'type': 'path' or 'data' or 'identifier' or 'stdin' or 'stdout' or 'stderr' or None,
            'value': str
        }
    }

.. note:: If the ``type`` in the ``source`` field is ``data`` then ``value``
    can be of type ``str``, ``int`` or ``float``.

.. note:: When a ``str`` is found instead of a ``dict`` for some ``source``,
    for example ``{'source': 'string'}``, then ``string`` is replaced with
    ``spec['string']``.

Optional parameters
-------------------

* **inputs** *(list)*: a list of keys, one for each input argument; default is
  empty.
* **outputs** *(list)*: a list of keys, one for each output argument; default
  is empty.
* **chunk_number** *(int)*: the serial number of the Firetask when it is part
  of a parallel group generated by a ForeachTask; default is ``None``.

Example
-------

The following workflow executes the command ``echo -n Hello world!`` on the
command line::

    fws:
    - fw_id: 1
      name: Run a command and store the result
      spec:
        _tasks:
        - _fw_name: CommandLineTask
          command_spec:
            command: [echo, -n]
            input string: {source: input string}
            output file:
              source: {type: stdout}
              target: {type: path, value: /tmp}
          inputs: [input string]
          outputs: [output file]
        input string: {type: data, value: Hello world!}
    links: {}
    metadata: {}

The STDOUT output is collected and stored in a new file under ``/tmp``. The
full path of the file is stored under the key ``output file`` in the **spec**
of the current Firework and of all child Fireworks.

ForeachTask
===========

The purpose of ForeachTask is to dynamically branch the workflow between this
Firework and its children by inserting a parallel section of child Fireworks.
The number of spawned parallel Fireworks is determined by the length of the
list specified by the ``split`` parameter or by the optional
``number of chunks`` parameter. Each child Firework contains a Firetask (of
class PyTask, CommandLineTask or any Firetask with an ``inputs`` parameter)
which processes one element (or one chunk) of this list. The output is passed
to the **spec** of the Firework(s) right after the detour using a push method, i.e.
the outputs of all parallel Fireworks are collected in a list specified in
the ``outputs`` argument.

.. note:: The ordering of elements (or chunks) in the resulting ``outputs``
    list can differ from that in the original list, depending on the
    execution order of the spawned Fireworks.

Required parameters
-------------------

* **task** *(dict)*: a dictionary representation of the Firetask
* **split** *(str)*: a key in **spec** which contains the input data to be
  distributed over the parallel child Fireworks. This key must also be listed
  in the ``inputs`` parameter of the Firetask and be present in the **spec**.

Optional parameters
-------------------

* **number of chunks** *(int)*: if provided, the input list specified with
  ``split`` is divided into this number of sub-lists (chunks) and each chunk
  is processed by a separate child Firework. This parameter can be used to
  reduce the number of parallel Fireworks. Default is the length of the input
  data list specified in ``split``, i.e. one child Firework is created for
  each data element.

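
The effect of ``number of chunks`` can be illustrated with a small plain-Python
sketch. The helper ``split_into_chunks`` below is hypothetical and not part of
FireWorks; it assumes contiguous chunks of near-equal size, and the way
ForeachTask actually sizes its chunks may differ.

```python
def split_into_chunks(items, number_of_chunks=None):
    """Divide ``items`` into contiguous sub-lists of near-equal size.

    Hypothetical helper for illustration only; ForeachTask may distribute
    the elements differently.
    """
    if not items:
        return []
    # By default every element gets its own chunk (one child Firework each).
    if number_of_chunks is None:
        number_of_chunks = len(items)
    if number_of_chunks < 1:
        raise ValueError('number_of_chunks must be a positive integer')
    # There cannot be more non-empty chunks than elements.
    number_of_chunks = min(number_of_chunks, len(items))
    base, extra = divmod(len(items), number_of_chunks)
    chunks = []
    start = 0
    for index in range(number_of_chunks):
        # The first ``extra`` chunks receive one additional element.
        size = base + (1 if index < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


beans = ['arabica', 'robusta', 'liberica']
print(split_into_chunks(beans))
print(split_into_chunks(beans, number_of_chunks=2))
```

With the default, three single-element chunks are produced, which corresponds
to three parallel child Fireworks; with two chunks only two child Fireworks
would be needed.
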
Example
-------

The following example demonstrates the use of ForeachTask in combination with
PyTask::

    fws:
    - fw_id: 1
      name: Grind coffee
      spec:
        _tasks:
        - _fw_name: ForeachTask
          split: coffee beans
          task:
            _fw_name: PyTask
            func: auxiliary.printurn
            inputs: [coffee beans]
            outputs: [coffee powder]
        coffee beans: [arabica, robusta, liberica]
    - fw_id: 2
      name: Brew coffee
      spec:
        _tasks:
        - _fw_name: ForeachTask
          split: coffee powder
          task:
            _fw_name: PyTask
            func: auxiliary.printurn
            inputs: [coffee powder, water]
            outputs: [pure coffee]
        water: workflowing water
    - fw_id: 3
      name: Serve coffee
      spec:
        _tasks:
        - _fw_name: PyTask
          func: auxiliary.printurn
          inputs: [pure coffee]
    links:
      '1': [2]
      '2': [3]
    metadata: {}
    name: Workflow for many sorts of coffee

In this example the function ``auxiliary.printurn`` prints and returns all its
arguments::

    def printurn(*args):
        """Print and return the arguments, unwrapping one-element lists."""
        result = []
        for arg in args:
            # A list with a single element is replaced by that element
            if isinstance(arg, list) and len(arg) == 1:
                result.append(arg[0])
            else:
                result.append(arg)
        # A single argument is returned as is, not as a one-element list
        if len(result) == 1:
            result = result[0]
        print(result)
        return result

The module ``auxiliary``, i.e. the file ``auxiliary.py``, must be located in a
directory listed in ``$PYTHONPATH``.

JoinDictTask
============

This Firetask combines the specified items from **spec** into a new or
existing dictionary in **spec**.

Required parameters
-------------------

* **inputs** *(list)*: a list of **spec** keys
* **output** *(str)*: a **spec** key under which the joined items will be
  stored

Optional parameters
-------------------

* **rename** *(dict)*: a dictionary with translations for the keys specified
  in ``inputs``, e.g.
  ``{'old name 1': 'new name 1', 'old name 2': 'new name 2'}``

JoinListTask
============

This Firetask combines the items specified by **spec** keys into a new or
existing list in **spec**.

Required parameters
-------------------

* **inputs** *(list)*: a list of **spec** keys
* **output** *(str)*: a **spec** key under which the joined items will be
  stored

Optional parameters
-------------------

None.

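
The joining behaviour described for JoinDictTask and JoinListTask can be
sketched on a plain dictionary standing in for **spec**. The functions
``join_dict`` and ``join_list`` are hypothetical illustrations written for
this page, not the FireWorks implementation; the parameter names mirror the
Firetask parameters above.

```python
def join_dict(spec, inputs, output, rename=None):
    """Collect ``spec[key]`` for each key in ``inputs`` into ``spec[output]``.

    Illustrative stand-in for the behaviour described for JoinDictTask.
    """
    rename = rename or {}
    # Reuse an existing dictionary under ``output`` or start a new one.
    joined = spec.setdefault(output, {})
    for key in inputs:
        # Store the item under its translated name if one is given.
        joined[rename.get(key, key)] = spec[key]
    return spec


def join_list(spec, inputs, output):
    """Append ``spec[key]`` for each key in ``inputs`` to ``spec[output]``.

    Illustrative stand-in for the behaviour described for JoinListTask.
    """
    # Reuse an existing list under ``output`` or start a new one.
    joined = spec.setdefault(output, [])
    for key in inputs:
        joined.append(spec[key])
    return spec


spec = {'coffee': 'espresso', 'milk': 'oat', 'extras': ['sugar']}
join_dict(spec, ['coffee', 'milk'], 'order', rename={'milk': 'milk type'})
join_list(spec, ['coffee', 'milk'], 'extras')
print(spec['order'])   # {'coffee': 'espresso', 'milk type': 'oat'}
print(spec['extras'])  # ['sugar', 'espresso', 'oat']
```

In this sketch ``order`` is created as a new dictionary, while ``extras``
already exists and is extended.
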
ImportDataTask
==============

This Firetask imports JSON or YAML data from a file and uses it to update a
dictionary in **spec**. The target is a nested dictionary whose location is
specified by a map string (see below).

Required parameters
-------------------

* **filename** *(str)*: the name of the file from which the data is imported.
  The name must end with either ``.json`` or ``.yaml``.
* **mapstring** *(str)*: a map string in the format
  ``maplist[0]/maplist[1]/...``. At least ``maplist[0]`` has to be defined
  because this is the key in **spec** to be used for the import. Every further
  nesting level can be specified by extending the map string; for example, if
  ``mapstring`` is ``maplist[0]/maplist[1]`` then the data will be imported as
  ``spec[maplist[0]][maplist[1]]``.

Optional parameters
-------------------

None.
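
How a map string addresses a nested dictionary can be shown with a short
plain-Python sketch. The function ``import_data`` is a hypothetical helper,
not the ImportDataTask code; it handles JSON only, and creating missing
intermediate levels is an assumption made for this illustration.

```python
import json
import os
import tempfile


def import_data(spec, filename, mapstring):
    """Load JSON from ``filename`` and place it in ``spec`` at ``mapstring``.

    Hypothetical illustration of map-string addressing, JSON files only.
    """
    with open(filename) as handle:
        data = json.load(handle)
    maplist = mapstring.split('/')
    # Walk down to the parent of the last key.
    # Assumption: missing intermediate dictionaries are created on the way.
    leaf = spec
    for key in maplist[:-1]:
        leaf = leaf.setdefault(key, {})
    last = maplist[-1]
    if isinstance(data, dict) and isinstance(leaf.get(last), dict):
        # Update an existing dictionary with the imported data.
        leaf[last].update(data)
    else:
        leaf[last] = data
    return spec


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, 'beans.json')
    with open(path, 'w') as handle:
        json.dump({'origin': 'Ethiopia'}, handle)
    spec = {'coffee': {'arabica': {'roast': 'medium'}}}
    # 'coffee/arabica' addresses spec['coffee']['arabica']
    import_data(spec, path, 'coffee/arabica')

print(spec)
```

Here the map string ``coffee/arabica`` selects ``spec['coffee']['arabica']``,
which is updated with the contents of the file while its existing ``roast``
entry is kept.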