Manage workflows with WFEngine

The glossary below explains the terms used in this section.

WFEngine (workflow engine) is a Python class used to manage the full life cycle of scientific workflows. The WFEngine class is developed as middleware of a Virtual Research Environment to implement available use cases as Jupyter notebooks. This middleware acts as an interface to existing workflow management systems that can be switched as transparent backends.

Configuration

Currently, one backend workflow management system, FireWorks, is supported. However, the long-term intention of this middleware is to integrate further backend systems in such a way that they appear transparent to the end-user. To get started with WFEngine, the backend system must be configured. For the computing resources provided by NHR@KIT or bwUniCluster we provide ready-to-use default configuration files and access to MongoDB. In the following, we first describe the steps required for creating a WFEngine object.

Create a launchpad

The LaunchPad object (launchpad below) for the FireWorks database is created as described here.

Create a workflow query

If you intend to create a new engine with workflows that already exist on the launchpad, you can define a query to include the desired workflows. For more details on how to define a query we refer to this section.

If you wish to include no workflows and to create an empty engine, you can set wf_query = None. You can add workflows to the engine at any later time.

Create resource configuration

The resource configuration (resconfig) must be completed before creating instances of WFEngine.

Create a WFEngine object

In order to execute workflows on the launchpad, you need to create a WFEngine instance. If you intend to use resources on a remote machine or on a remote computing cluster to execute workflows you need to create a WFEngineRemote instance and continue with this documentation from the Execute workflows section.

The WFEngine object is created with this code:

from virtmat.middleware.engine.wfengine import WFEngine
wfe = WFEngine(launchpad=launchpad, wf_query=wf_query)

Here is a more detailed explanation of all parameters accepted by the WFEngine class:

  • launchpad (object): A LaunchPad object that must be created as described here.

  • wf_query (dict): A dictionary containing a workflow query. If the query is empty, i.e. {}, all workflows on the launchpad will be added to the engine (not recommended). If the query is None or not specified then no workflows will be added to the engine.

  • qadapter (object): An object of class CommonAdapter. The qadapter object enables execution of workflow nodes via a batch queue. If the qadapter is None or not specified then a default qadapter will be created automatically from the resource configuration. The qadapter construction is described here.

  • name (str): An optional worker name. If name is not specified or is None then the name of the default worker from the resource configuration will be used. If name is specified then a worker with this name must be in the list of configured workers in resconfig. If a workflow is created outside of the engine and added using the add_workflow() method then the name should be specified and should match the _fworker keyword in the node specifications of the workflow.

  • launchdir (str): An optional path to the directory where to start interactive and batch jobs; defaults to the launchdir in the resource configuration if specified, otherwise to the current working directory.

  • unique_launchdir (bool): Create an individual directory with unique name for every single launch of interactive nodes. Disable with False if the code executed in the nodes has no I/O in the current working directory. Default is True.

  • sleep_time (int): The launcher thread awakes every sleep_time seconds to launch nodes that are ready for execution. Default is 30 seconds.
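To illustrate what unique_launchdir=True implies, the following minimal sketch creates a fresh, uniquely named directory under a common launch directory for every launch. It is not the WFEngine implementation: the helper name make_launch_dir and the launch_ prefix are hypothetical, and the naming scheme used by the engine may differ.

```python
import os
import tempfile


def make_launch_dir(launchdir, unique=True):
    """Return the directory in which a node would be launched (hypothetical helper)."""
    os.makedirs(launchdir, exist_ok=True)
    if not unique:
        # all launches share the same directory
        return launchdir
    # mkdtemp creates a new directory whose name is unique within launchdir
    return tempfile.mkdtemp(prefix='launch_', dir=launchdir)


base = tempfile.mkdtemp()                      # stands in for the configured launchdir
first = make_launch_dir(base)                  # one directory per launch
second = make_launch_dir(base)
shared = make_launch_dir(base, unique=False)   # corresponds to unique_launchdir=False
```

Separate directories keep files written by different launches apart, which is why disabling the option is only suggested for code without I/O in the current working directory.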

Execute workflows

The WFEngine class completely automates the execution of the workflows configured in the engine. The class has a start() method to spawn a background launcher thread as well as a stop() method to safely stop the thread, i.e.:

wfe.start()
... # do some work, the thread is running in background
wfe.stop()

This thread periodically checks whether any nodes in the workflow can be executed. The thread launches the selected workflow nodes for execution either directly as interactive jobs or as batch jobs via a batch system, such as SLURM. As the thread is running in the background, the user can perform other management tasks on the workflows configured in the engine, such as query workflows and nodes, modify and rerun nodes, add new nodes, and add new workflows to the engine.
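The start/stop pattern described above can be sketched with the standard threading module. This is a toy stand-in under stated assumptions, not the WFEngine implementation: the class LauncherSketch and its wakeups counter are hypothetical, and the place where ready nodes would be launched is only marked by a comment.

```python
import threading
import time


class LauncherSketch:
    """Toy stand-in for a background launcher; not the WFEngine implementation."""

    def __init__(self, sleep_time=30):
        self.sleep_time = sleep_time
        self.wakeups = 0                  # how often the thread looked for ready nodes
        self._stop_event = threading.Event()
        self._thread = None

    def _loop(self):
        while not self._stop_event.is_set():
            self.wakeups += 1             # a real launcher would launch ready nodes here
            # wait() returns early as soon as stop() sets the event
            self._stop_event.wait(self.sleep_time)

    def start(self):
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop_event.set()            # ask the loop to finish
        self._thread.join()               # block until the thread has ended


launcher = LauncherSketch(sleep_time=0.05)
launcher.start()
time.sleep(0.2)                           # other work happens here in the meantime
launcher.stop()
```

Waiting on an event instead of calling time.sleep() inside the loop lets stop() return promptly even when sleep_time is long.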

The WFEngine class can also be used non-interactively to execute workflows. For example, this script runs for one hour and executes the workflows configured in the engine:

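from time import sleep
from virtmat.middleware.engine.wfengine import WFEngine
wfe = WFEngine.from_file('engine.yaml')  # load a previously saved engine
wfe.start()  # spawn the background launcher thread
try:
    sleep(3600)  # keep the script alive for one hour
finally:
    wfe.stop()  # stop the launcher thread even if the script is interrupted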

Add and remove workflows

There are two ways to add workflows to the engine:

  1. Add a new workflow. A new workflow object wflow, either constructed from scratch or loaded from a file, can be added to the engine, for example:

    from fireworks import Workflow
    wflow = Workflow.from_file('workflow.yaml')
    wfe.add_workflow(workflow=wflow)
    

    NOTE: If a workflow is added using this method, one has to make sure that the _category and _fworker keys are properly set in the spec dictionaries of the workflow nodes. The _category must be either interactive or batch. The _fworker must be the same as the worker name configured in the engine. The worker name configured in the engine can be displayed by using wfe.name.

  2. Add an existing workflow. A workflow that already exists on the launchpad can be added to the engine using a fw_id:

    wfe.add_workflow(fw_id=id_of_the_fw)

    where id_of_the_fw is the ID of any firework that belongs to the workflow to be added. For example, to add a workflow that contains a firework with fw_id 123, use wfe.add_workflow(fw_id=123).
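The requirement stated in the note of item 1 can be checked before a workflow is added. The following is a minimal sketch that works on plain spec dictionaries; the helper check_node_specs and the worker names are hypothetical and not part of WFEngine or FireWorks.

```python
def check_node_specs(specs, worker_name):
    """Return a list of problems found in node spec dictionaries (hypothetical helper)."""
    problems = []
    for index, spec in enumerate(specs):
        # _category must be present and be one of the two allowed values
        if spec.get('_category') not in ('interactive', 'batch'):
            problems.append(f'node {index}: _category must be interactive or batch')
        # _fworker must match the worker name configured in the engine
        if spec.get('_fworker') != worker_name:
            problems.append(f'node {index}: _fworker must be {worker_name!r}')
    return problems


specs = [
    {'_category': 'interactive', '_fworker': 'my_worker'},   # valid
    {'_category': 'batch', '_fworker': 'other_worker'},      # wrong worker
    {'_fworker': 'my_worker'},                               # missing category
]
problems = check_node_specs(specs, worker_name='my_worker')
```

With a FireWorks Workflow object the spec dictionaries could be collected as [fw.spec for fw in wflow.fws], and the worker name to compare against is wfe.name.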

A workflow can be removed from the engine using wfe.remove_workflow(fw_id). The fw_id can be found by using the WFQuery class or in the output of wfe.status_summary(). Note that remove_workflow() does not delete the workflow from the launchpad. It only de-registers the workflow from the engine, and the workflow can be added again later using wfe.add_workflow(fw_id).

Monitor workflows

The call wfe.status_summary() provides an overview of all workflows and the nodes they contain, with the corresponding IDs (fw_ids), current states and modification times. In addition, it displays the current status of the launcher thread running in the background. More detailed information about specific nodes can be obtained with the status_detail() function. For example, to see the details of the nodes with fw_ids 1234 and 4321, call wfe.status_detail(1234, 4321). The status_detail() function is particularly useful for retrieving the error message of a failed node execution.

Modify and rerun nodes

A node of a workflow can be modified with the update_node() function. For example, to change the parameter temperature in the node with fw_id 1234 we can use:

wfe.update_node(1234, {'temperature': 298.15})

The update_node() function cannot be applied to nodes that are in COMPLETED, RUNNING or FIZZLED state. A COMPLETED node must be rerun and a RUNNING node must be cancelled first. For example, to rerun a COMPLETED node with fw_id 4321, the rerun_node() function can be called as follows:

wfe.rerun_node(4321)

The rerun_node() function can also be used to rerun nodes in FIZZLED state.

In many cases, the update_node() and rerun_node() functions are called together. For example, for a node in FIZZLED state, we first call update_node() to correct an error and then rerun_node() to change its state to READY or WAITING. In another case we want to correct an error in a COMPLETED node. Here we call the rerun_node() function first to bring the node into a writable state (such as READY or WAITING) and then call update_node(). In these cases, it is recommended to use the update_rerun_node() function, which combines the two functions in the proper order:

wfe.update_rerun_node(4321, {'temperature': 273.15})

Cancel node execution

A node scheduled for batch execution (in RESERVED state) or a RUNNING node in batch execution can be cancelled. This is performed with the cancel_job() function:

wfe.cancel_job(5432, restart=True)

Interactive nodes, and batch nodes in states other than RESERVED and RUNNING, cannot be cancelled.
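The rule above can be written down as a small predicate. This is only a sketch of the stated condition; the helper can_cancel is hypothetical and not a WFEngine method.

```python
CANCELLABLE_STATES = {'RESERVED', 'RUNNING'}


def can_cancel(category, state):
    """Tell whether a node may be cancelled according to the rule above (hypothetical helper)."""
    # only batch nodes that are scheduled or running can be cancelled
    return category == 'batch' and state in CANCELLABLE_STATES


checks = {
    ('batch', 'RESERVED'): can_cancel('batch', 'RESERVED'),
    ('batch', 'RUNNING'): can_cancel('batch', 'RUNNING'),
    ('batch', 'READY'): can_cancel('batch', 'READY'),
    ('interactive', 'RUNNING'): can_cancel('interactive', 'RUNNING'),
}
```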

Add a workflow node

The WFEngine class provides a method to add a Python function as a new node to an existing workflow. Consider a node with fw_id 1234 that has an output named x. (The actual value of x depends on the inputs of node 1234 and the function processing these inputs.) Let us compute y = x**a with a = 5. This can be done with the library function math.pow as follows:

wfe.add_node(func='math.pow', inputs=[(1234, 'x', None), (None, 'a', 5)], outputs=['y'])

The func parameter is the fully qualified name of a function available in the system path.

The inputs parameter in the call is a list of positional arguments for the provided Python function. Every input is described by a tuple (fw_id, name, value) with

  1. the fw_id of an existing node providing the input, in this case 1234; if the input is provided as a constant value, then None should be specified;

  2. the name of the input as provided in the list of outputs of node 1234;

  3. the value of the input; if output data from another node is used as input, then this should be set to None.

The outputs parameter describes the names of the outputs as a list of strings.
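To make the meaning of func, inputs and outputs concrete, the following sketch shows how such a description could be turned into a function call. It is an illustration only and does not reproduce how WFEngine evaluates nodes: the helper call_node_function and the upstream dictionary (which stands in for data the engine would read from the launchpad) are hypothetical, the value x = 2.0 is assumed, and only the single-output case is handled.

```python
from importlib import import_module


def call_node_function(func, inputs, outputs, upstream):
    """Resolve func by name, assemble positional arguments and label the result.

    upstream maps (fw_id, name) to output values of existing nodes.
    """
    module_name, func_name = func.rsplit('.', 1)
    function = getattr(import_module(module_name), func_name)
    args = []
    for fw_id, name, value in inputs:
        # constants carry their value; node outputs are looked up by (fw_id, name)
        args.append(value if fw_id is None else upstream[(fw_id, name)])
    # single-output case: the return value is stored under the first output name
    return {outputs[0]: function(*args)}


upstream = {(1234, 'x'): 2.0}     # assumption: node 1234 produced x = 2.0
result = call_node_function(
    func='math.pow',
    inputs=[(1234, 'x', None), (None, 'a', 5)],
    outputs=['y'],
    upstream=upstream,
)
print(result)
```

The order of the tuples in inputs matters because they are passed as positional arguments: here x is the base and a the exponent.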

Further optional parameters for add_node()

  • kwargs (dict): keyword arguments to pass to the python function func; default is empty dictionary {}.

  • category (str): the node launch category; must be set either to interactive or batch; default is interactive.

  • fworker (str): the name of the worker to launch the node; default is wfe.name.

  • qadapter (CommonAdapter): a custom qadapter that overrides the default qadapter of the engine; it is relevant only if category is set to batch; default is None, i.e. no custom qadapter is written to the node.

Save and load a WFEngine object

Sometimes it is helpful to save the engine to a file and to continue later with the same engine in another Jupyter session. To this end, the to_file() and from_file() methods are provided. These methods support the JSON and YAML formats. For example, an engine wfe can be saved to a YAML file using

wfe.to_file('engine.yaml')

and later loaded from the file using:

from virtmat.middleware.engine.wfengine import WFEngine
wfe = WFEngine.from_file('engine.yaml')

Set and get the workflow IDs or the workflow query

The workflow IDs and the workflow query can be set during the whole lifetime of the WFEngine object as demonstrated in this example:

wfe.wf_ids = [12345]
print(wfe.wf_query) # will print {'nodes': {'$in': [12345]}}
wfe.wf_query = {'name': 'my workflow'}
print(wfe.wf_ids) # will be [] if the query had no match
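The two queries used above follow the MongoDB query style: a plain value tests for equality and $in tests whether any of the listed values occurs. The toy matcher below illustrates these semantics on in-memory dictionaries, together with the meaning of None and {} described for the wf_query parameter. It is a sketch under stated assumptions: the functions matches and select and the example documents are hypothetical, the real queries are evaluated by the database, and how the engine derives wf_ids from the matching workflows is not covered here.

```python
def matches(document, query):
    """Toy matcher for two MongoDB-style conditions: equality and $in."""
    for key, condition in query.items():
        value = document.get(key)
        if isinstance(condition, dict) and '$in' in condition:
            # for list fields, one common element is sufficient
            candidates = value if isinstance(value, list) else [value]
            if not any(item in condition['$in'] for item in candidates):
                return False
        elif value != condition:
            return False
    return True


def select(documents, query):
    """None selects nothing, {} selects everything, otherwise filter."""
    if query is None:
        return []
    return [doc for doc in documents if matches(doc, query)]


workflows = [
    {'name': 'my workflow', 'nodes': [12345, 12346]},
    {'name': 'another workflow', 'nodes': [20001]},
]
by_node = select(workflows, {'nodes': {'$in': [12345]}})
by_name = select(workflows, {'name': 'no such workflow'})
```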

Glossary

  • Scientific workflow is the coordinated execution of repeatable actions accounting for dependencies and concurrency. The actions can be computation, measurement, pre/post processing, data management and analysis, visualization. Other (imprecise) names for a workflow: protocol, recipe, job chain, task sequence, pipeline, procedure. The scientific workflow focusses on how to organize these actions, i.e. on answering these questions: What order? What inputs? What outputs? What metadata? What resources?

  • Workflow Management System helps managing scientific workflows, i.e. authoring, validating, editing, persisting, running, and querying workflows and workflow nodes. A workflow management system offers at least some of the following benefits: scalability, concurrency, distributed / heterogeneous computing, data and code reuse, provenance, reproducibility, modular validation, automation, resilience (fault tolerance), rapid prototyping.

  • Workflow node is an action from the workflow executed separately (in space and/or time) on a given computing resource. For example two nodes can be executed at the same time on different resources or at different moments of time, including a time gap between their execution times. A workflow node can be executed either directly in the interactive Jupyter session (interactive category) or submitted for execution as a batch job to a Batch Queuing System (batch category).

  • Batch Queuing System helps managing jobs on computing clusters, in particular to manage the computing resources and to schedule jobs to resources according to their specific requirements.

  • Computing resource requirement can be, for example, the number of processors, the size of memory and an estimate of the job running time.

  • Workflow graph is a directed acyclic graph representing the workflow nodes as vertices and the dependencies between workflow nodes as edges.

  • Workflow query is a query that retrieves and/or changes information about the workflow. The query can include, for example, workflow name, workflow ID, time of creation, time of last change, the set of nodes and the set of node dependencies identified by their IDs, names and metadata.

  • Node query is a query that retrieves and/or changes information about a specific workflow node. This query can include description of the action with the list of tasks, the input and output data usually described by metadata (such as location of storage and name in the workflow), input parameters, and the resource requirements.

  • State of workflow and workflow nodes can be interpreted using this reference, keeping in mind that a firework is equivalent to a workflow node.
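The workflow graph entry of the glossary can be illustrated with the standard graphlib module (Python 3.9 or later), which computes an execution order that respects the dependencies of a directed acyclic graph. The node names below are hypothetical and the sketch is independent of WFEngine and FireWorks.

```python
from graphlib import TopologicalSorter

# each key is a node; the set holds the nodes it depends on (hypothetical names)
dependencies = {
    'relax': {'setup'},
    'band_structure': {'relax'},
    'dos': {'relax'},
    'report': {'band_structure', 'dos'},
}

sorter = TopologicalSorter(dependencies)
order = list(sorter.static_order())   # every node appears after its dependencies
print(order)
```

The nodes band_structure and dos do not depend on each other, so they may appear in either order and could be executed concurrently.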