Manage workflows with WFEngine
Before you start, please have a look at the Glossary at the end of this page for an explanation of some terms.
WFEngine (workflow engine) is a Python class that makes it easy to manage the full life cycle of scientific workflows. The WFEngine class is developed as a middleware of a Virtual Research Environment to implement available use cases as Jupyter notebooks. This middleware acts as an interface to existing workflow management systems that can be switched as transparent backends.
Configuration
Currently, one backend workflow management system, FireWorks, is supported. However, the long-term intention of this middleware is to integrate further backend systems in such a way that they appear transparent to the end-user. To get started with WFEngine, the backend system must be configured. For the computing resources provided by NHR@KIT or bwUniCluster we provide ready-to-use default configuration files and access to MongoDB. In the following, we first describe the steps required for creating a WFEngine object.
Create a launchpad
The LaunchPad object (launchpad below) for the FireWorks database is created as described here.
Create a workflow query
If you intend to create a new engine with workflows that already exist on the launchpad, you can define a query to select the desired workflows. For more details on how to define a query we refer to this section.
If you want to create an empty engine with no workflows, set wf_query = None. Workflows can be added to the engine at any time later.
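To illustrate what a workflow query selects, here is a minimal sketch that evaluates the two query shapes used on this page (an exact match on a field such as name, and an {'$in': [...]} match on the list of node IDs) against a hypothetical in-memory list of workflow documents. The function match_query, the helper select and the sample documents are hypothetical; on a real launchpad the query is evaluated by MongoDB, which supports a much richer syntax.

```python
def match_query(doc, query):
    """Return True if a workflow document matches a query (toy MongoDB subset).

    Only exact field matches and the '$in' operator are supported.
    As for wf_query, None selects nothing and {} selects everything.
    """
    if query is None:
        return False
    for field, condition in query.items():
        value = doc.get(field)
        if isinstance(condition, dict) and '$in' in condition:
            # For a list-valued field such as 'nodes', one listed element suffices
            items = value if isinstance(value, list) else [value]
            if not any(item in condition['$in'] for item in items):
                return False
        elif value != condition:
            return False
    return True


# Hypothetical workflow documents as they might be stored on a launchpad
workflows = [
    {'name': 'my workflow', 'nodes': [12345, 12346]},
    {'name': 'other workflow', 'nodes': [20001]},
]


def select(query):
    """Return the names of the sample workflows matched by query."""
    return [wf['name'] for wf in workflows if match_query(wf, query)]


print(select({'name': 'my workflow'}))      # ['my workflow']
print(select({'nodes': {'$in': [20001]}}))  # ['other workflow']
print(select({}))                           # ['my workflow', 'other workflow']
print(select(None))                         # []
```

The last two calls mirror the wf_query semantics described for the WFEngine parameters: an empty query matches every workflow, while None matches none.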
Create resource configuration
The resource configuration (resconfig) must be completed before creating instances of WFEngine.
Create a WFEngine object
To execute workflows on the launchpad, you need to create a WFEngine instance. If you intend to use resources on a remote machine or a remote computing cluster to execute workflows, create a WFEngineRemote instance instead and continue with this documentation from the Execute workflows section.
The WFEngine object is created with this code:
from virtmat.middleware.engine.wfengine import WFEngine
wfe = WFEngine(launchpad=launchpad, wf_query=wf_query)
Here is a more detailed explanation of all parameters accepted by the WFEngine class:
- launchpad (object): A LaunchPad object that must be created as described here.
- wf_query (dict): A dictionary containing a workflow query. If the query is empty, i.e. {}, all workflows on the launchpad will be added to the engine (not recommended). If the query is None or not specified, then no workflows will be added to the engine.
- qadapter (object): An object of class CommonAdapter. The qadapter object enables execution of workflow nodes via a batch queue. If the qadapter is None or not specified, then a default qadapter will be created automatically from the resource configuration. The qadapter construction is described here.
- name (str): An optional worker name. If name is not specified or is None, then the name of the default worker from the resource configuration will be used. If name is specified, then a worker with this name must be in the list of configured workers in resconfig. If a workflow is created outside of the engine and added using the add_workflow() method, then the name should be specified and should match the _fworker keyword in the node specifications of the workflow.
- launchdir (str): An optional path to the directory where interactive and batch jobs are started; defaults to the launchdir in the resource configuration if specified, otherwise to the current working directory.
- unique_launchdir (bool): Create an individual directory with a unique name for every single launch of interactive nodes. Disable with False if the code executed in the nodes has no I/O in the current working directory. Default is True.
- sleep_time (int): The launcher thread wakes up every sleep_time seconds to launch nodes that are ready for execution. Default is 30 seconds.
Execute workflows
Launcher thread
The WFEngine class completely automates the execution of the workflows configured in the engine. The class has a start() method to spawn a background launcher thread as well as a stop() method to safely stop the thread, i.e.:
wfe.start()
... # do some work, the thread is running in background
wfe.stop()
This thread periodically checks whether any nodes in the workflow can be executed. The thread launches the selected workflow nodes for execution either directly as interactive jobs or as batch jobs via a batch system, such as SLURM. As the thread is running in the background, the user can perform other management tasks on the workflows configured in the engine, such as querying workflows and nodes, modifying and rerunning nodes, adding new nodes, and adding new workflows to the engine.
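The periodic launch cycle can be pictured with the following minimal sketch. It is not the WFEngine implementation: the class ToyLauncher and its launch_ready callable are hypothetical, and the sketch only assumes a stop flag that is checked between cycles. The sleep_time argument plays the same role as the sleep_time parameter of WFEngine.

```python
import threading
import time


class ToyLauncher:
    """Hypothetical stand-in for a background launcher thread (not the WFEngine code)."""

    def __init__(self, launch_ready, sleep_time=30):
        self.launch_ready = launch_ready  # callable that launches nodes that are ready
        self.sleep_time = sleep_time      # seconds between two polling cycles
        self._stop_event = threading.Event()
        self._thread = None

    def _run(self):
        while not self._stop_event.is_set():
            self.launch_ready()
            # Sleep, but wake up immediately if stop() sets the event
            self._stop_event.wait(self.sleep_time)

    def start(self):
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()  # the calling thread waits until the loop has finished
            self._thread = None

    def is_running(self):
        return self._thread is not None and self._thread.is_alive()


cycles = []
launcher = ToyLauncher(lambda: cycles.append(time.monotonic()), sleep_time=0.05)
launcher.start()
time.sleep(0.2)  # the main thread is free to do other work here
launcher.stop()
print(len(cycles) >= 1, launcher.is_running())  # True False
```

Waiting on a threading.Event instead of calling time.sleep() lets stop() take effect without waiting for the full sleep_time. Because the thread in this sketch is not a daemon thread, the interpreter will not exit while it is still running, which is why the launcher must be stopped explicitly.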
The WFEngine class can also be used non-interactively to execute workflows. For example, this script runs for one hour and executes the workflows configured in the engine:
from time import sleep
from virtmat.middleware.engine.wfengine import WFEngine
wfe = WFEngine.from_file('engine.yaml')
wfe.start()
sleep(3600)
wfe.stop()
NOTE: The launcher thread cannot be started if FireWorks has been set up with MongoMock because the multi-threaded access to the database file will not be reliable.
The function status_summary displays the status of the launcher thread:
wfe.status_summary()
The launcher thread must be shut down cleanly before exiting the main thread. To stop the launcher thread automatically, one can create this special context:
import contextlib
from virtmat.middleware.engine.wfengine import WFEngine
with contextlib.ExitStack() as cm:
    wfe = WFEngine(...)
    cm.callback(WFEngine.stop, wfe)  # called when the context is left
    wfe.start()
    # main REPL
When the main thread leaves the context, the callback function is called and the background thread is stopped safely. The launcher thread is then joined by the main thread.
Failing to call wfe.stop(), either explicitly or via a callback, before exiting creates a deadlock condition, i.e. the main thread waits for the background thread to finish but the background thread is still running. In this case, the deadlock can be resolved by pressing Ctrl+C or by sending the SIGTERM signal. In both cases the thread will still be shut down safely. Sending the SIGKILL signal will interrupt possible operations in the launcher thread and is not recommended.
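The ExitStack context shown above can also be packaged as a small reusable context manager built with contextlib.contextmanager. In this sketch, the helper running and the class DummyEngine are hypothetical; DummyEngine only records calls so that the example runs without a launchpad. The try/finally block guarantees that stop() is called on normal exit, on exceptions and on KeyboardInterrupt (Ctrl+C).

```python
import contextlib


@contextlib.contextmanager
def running(engine):
    """Start the launcher on entry and always stop it on exit (hypothetical helper)."""
    engine.start()
    try:
        yield engine
    finally:
        # Runs on normal exit, on exceptions and on KeyboardInterrupt (Ctrl+C)
        engine.stop()


class DummyEngine:
    """Records calls instead of launching anything; stands in for a WFEngine."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append('start')

    def stop(self):
        self.calls.append('stop')


engine = DummyEngine()
try:
    with running(engine):
        raise RuntimeError('simulated error in the main REPL')
except RuntimeError:
    pass
print(engine.calls)  # ['start', 'stop']
```

With a real engine, the same helper would presumably be used as `with running(WFEngine(...)) as wfe:`, provided the engine exposes the start() and stop() methods described above.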
Execute on demand
The set of nodes to execute can be further restricted with the wfe.nodes_torun attribute, e.g. to execute only the nodes with fw_ids 12 and 13:
wfe.nodes_torun = [12, 13]
Setting wfe.nodes_torun to an empty list prevents executing any nodes. Setting wfe.nodes_torun to None (the default) executes all nodes configured in the engine.
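The three cases of nodes_torun can be summarized in a small filter function. This is a sketch under the assumption that the restriction is applied to the nodes that are already ready for execution; the function nodes_to_launch is hypothetical and not part of WFEngine.

```python
def nodes_to_launch(ready_fw_ids, nodes_torun=None):
    """Return the ready nodes the launcher may execute (hypothetical helper).

    None -> no restriction, all ready nodes are launched
    []   -> nothing is launched
    list -> only ready nodes whose fw_id is in the list
    """
    if nodes_torun is None:
        return list(ready_fw_ids)
    allowed = set(nodes_torun)
    return [fw_id for fw_id in ready_fw_ids if fw_id in allowed]


ready = [11, 12, 13, 14]
print(nodes_to_launch(ready))            # [11, 12, 13, 14]
print(nodes_to_launch(ready, [12, 13]))  # [12, 13]
print(nodes_to_launch(ready, []))        # []
```

Note that None and [] behave in opposite ways, so in this sketch an explicit identity check (`is None`) is used rather than a truthiness test.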
Add and remove workflows
There are two ways to add workflows to the engine:
1. Add a new workflow. A new workflow object wflow, either constructed from scratch or loaded from a file, can be added to the engine, for example:
from fireworks import Workflow
wflow = Workflow.from_file('workflow.yaml')
wfe.add_workflow(workflow=wflow)
NOTE: If a workflow is added using this method, one has to make sure that the _category and _fworker keys are properly set in the spec dictionaries of the workflow nodes. The _category must be either interactive or batch. The _fworker must be the same as the worker name configured in the engine, which can be displayed with wfe.name.
2. Add an existing workflow. A workflow that already exists on the launchpad can be added to the engine using a fw_id:
wfe.add_workflow(fw_id=id_of_the_fw)
where id_of_the_fw refers to the id of a firework that is part of the workflow one wishes to add to the engine. For example, to add a workflow that contains a firework with fw_id 123, use wfe.add_workflow(fw_id=123).
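Before adding a workflow created outside of the engine, the two spec keys from the NOTE above can be checked with a few lines of plain Python. The function check_node_specs is a hypothetical helper, not part of WFEngine; it takes a mapping from fw_id to spec dictionary and the engine's worker name (wfe.name).

```python
VALID_CATEGORIES = ('interactive', 'batch')


def check_node_specs(specs, worker_name):
    """Return a list of problems in node spec dictionaries (hypothetical helper).

    specs maps fw_id -> spec dict; worker_name is the engine's worker name (wfe.name).
    """
    problems = []
    for fw_id, spec in specs.items():
        category = spec.get('_category')
        if category not in VALID_CATEGORIES:
            problems.append(f'node {fw_id}: _category is {category!r}')
        fworker = spec.get('_fworker')
        if fworker != worker_name:
            problems.append(f'node {fw_id}: _fworker is {fworker!r}, expected {worker_name!r}')
    return problems


specs = {
    1: {'_category': 'interactive', '_fworker': 'my_worker'},
    2: {'_category': 'batch'},  # _fworker is missing
}
for problem in check_node_specs(specs, 'my_worker'):
    print(problem)
# node 2: _fworker is None, expected 'my_worker'
```

For a FireWorks Workflow object, such a mapping could be built with `{fw.fw_id: fw.spec for fw in wflow.fws}` and passed together with wfe.name.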
A workflow can be removed from the engine using wfe.remove_workflow(fw_id). The fw_id can be found by using the WFQuery class or in the output of wfe.status_summary(). Note that calling remove_workflow() does not delete the workflow from the launchpad. The workflow is only de-registered from the engine and, if needed, can be added again later using wfe.add_workflow(fw_id=fw_id).
Monitor workflows
The call wfe.status_summary() provides an overview of all workflows and the nodes they contain, with their IDs (fw_ids), current states and modification times. More detailed information about specific nodes can be obtained with the status_detail() function. For example, to see the details of the nodes with fw_ids 1234 and 4321, call wfe.status_detail(1234, 4321). The status_detail() function is particularly useful to get the error message of a failed node execution.
Modify and rerun nodes
To modify a node of a workflow, use the update_node() function. For example, to change the parameter temperature in the node with fw_id 1234:
wfe.update_node(1234, {'temperature': 298.15})
The update_node() function cannot be applied to nodes in COMPLETED, RESERVED or RUNNING state: COMPLETED nodes must be rerun and RESERVED/RUNNING nodes must be cancelled first. For example, to rerun a COMPLETED node with fw_id 4321, the rerun_node() function can be called as follows:
wfe.rerun_node(4321)
The rerun_node() function can also be used to rerun nodes in FIZZLED, DEFUSED and PAUSED states. After applying rerun_node(), the node is in WAITING state, or in READY state if all parents of the node are COMPLETED.
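The state rule above can be written as a small function. This is a toy model, not the library code: state_after_rerun is hypothetical, and rejecting states other than the four named on this page is an assumption of the sketch.

```python
RERUNNABLE = ('COMPLETED', 'FIZZLED', 'DEFUSED', 'PAUSED')


def state_after_rerun(state, parent_states):
    """Toy model of the node state after rerun_node() (not the library code)."""
    if state not in RERUNNABLE:
        # Assumption: this sketch only covers the states named on this page
        raise ValueError(f'rerun of a {state} node is not covered by this sketch')
    # A node whose parents are all COMPLETED can be launched right away.
    # A node without parents also qualifies, because all() of an empty list is True.
    if all(parent == 'COMPLETED' for parent in parent_states):
        return 'READY'
    return 'WAITING'


print(state_after_rerun('FIZZLED', ['COMPLETED', 'COMPLETED']))  # READY
print(state_after_rerun('COMPLETED', ['COMPLETED', 'RUNNING']))  # WAITING
```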
In many cases, the update_node() and rerun_node() functions are called together. For example, for a node in FIZZLED state, we first call update_node() to correct an error and then rerun_node() to change its state to READY or WAITING. In another case, we want to correct an error in a COMPLETED node. Here we call rerun_node() first to bring the node into a writable state (such as READY or WAITING) and then call update_node(). In these cases, it is recommended to use the update_rerun_node() function, which combines the two functions in the proper order:
wfe.update_rerun_node(4321, {'temperature': 273.15})
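The ordering that update_rerun_node() takes care of can be sketched as a function that returns the sequence of calls for a given node state. The function update_rerun_plan is hypothetical, and treating every remaining state like FIZZLED is an assumption; the library may handle READY or WAITING nodes differently.

```python
def update_rerun_plan(state):
    """Toy model of the call order used when updating and rerunning a node."""
    if state in ('RESERVED', 'RUNNING'):
        raise ValueError('cancel the job first, e.g. with cancel_job()')
    if state == 'COMPLETED':
        # COMPLETED nodes cannot be updated: make the node writable first
        return ['rerun_node', 'update_node']
    # e.g. FIZZLED: correct the error first, then rerun
    # (assumption: all remaining states are treated like FIZZLED)
    return ['update_node', 'rerun_node']


print(update_rerun_plan('COMPLETED'))  # ['rerun_node', 'update_node']
print(update_rerun_plan('FIZZLED'))    # ['update_node', 'rerun_node']
```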
Cancel node execution
A node in RESERVED or RUNNING state can be cancelled. This is performed with the cancel_job() function. For example, to cancel the evaluation of a node with fw_id 5432 and put the node into WAITING state, regardless of the original state, one can use this statement:
wfe.cancel_job(5432, restart=True)
To cancel the evaluation but put the node into DEFUSED or PAUSED state:
wfe.cancel_job(5432, deactivate=True)
If the original state is RESERVED then the final state will be PAUSED. If the original state is RUNNING then the final state will be DEFUSED. The node can be left permanently in one of these states but can also be updated and/or rerun using the update_node() and rerun_node() functions.
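The final states after cancel_job() can be summarized in a small lookup function. This is a toy model, not the library code: state_after_cancel is hypothetical, and rejecting calls with neither or both of restart and deactivate set is an assumption, since this page does not describe those combinations.

```python
def state_after_cancel(state, restart=False, deactivate=False):
    """Toy model of the node state after cancel_job() (not the library code)."""
    if state not in ('RESERVED', 'RUNNING'):
        raise ValueError('only RESERVED or RUNNING nodes can be cancelled')
    if restart == deactivate:
        # Assumption: combinations not described on this page are rejected here
        raise ValueError('set exactly one of restart or deactivate')
    if restart:
        return 'WAITING'  # regardless of the original state
    return 'PAUSED' if state == 'RESERVED' else 'DEFUSED'


print(state_after_cancel('RUNNING', restart=True))      # WAITING
print(state_after_cancel('RESERVED', deactivate=True))  # PAUSED
print(state_after_cancel('RUNNING', deactivate=True))   # DEFUSED
```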
The cancel_job() operation can be applied to both interactive nodes and batch nodes.
Add a workflow node
The WFEngine class provides a method to add a Python function as a new node to an existing workflow. Consider a workflow containing a node with fw_id 1234 that has an output named x. (The actual value of x depends on the inputs of node 1234 and the function processing these inputs.) Suppose we want to compute y = x**a with a = 5. This can be done with the library function math.pow as follows:
wfe.add_node(func='math.pow', inputs=[(1234, 'x', None), (None, 'a', 5)], outputs=['y'])
The func parameter is the fully qualified name of a function available in the system path.
The inputs parameter in the call is a list of positional arguments for the provided Python function. Every input is described by a tuple (fw_id, name, value) with:
- fw_id: the fw_id of an existing node providing the input, in this case 1234; if the input is provided as a constant value, then None should be specified;
- name: the name of the input; for an input from another node, this is the name as provided in the list of outputs of that node, here x of node 1234;
- value: the value of the input; if output data from another node is used as input, then this should be set to None.
The outputs parameter describes the names of the outputs as a list of strings.
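To make the meaning of the (fw_id, name, value) tuples concrete, the following sketch evaluates a node described with the same arguments as add_node(). The functions resolve_function and evaluate_node are hypothetical; the dictionary upstream stands in for the outputs of already completed nodes, which the real engine presumably retrieves from the launchpad. Mapping several output names onto a returned tuple is an assumption of this sketch.

```python
import importlib


def resolve_function(qualified_name):
    """Import 'package.module.function' and return the function object."""
    module_name, _, func_name = qualified_name.rpartition('.')
    return getattr(importlib.import_module(module_name), func_name)


def evaluate_node(func, inputs, outputs, upstream, kwargs=None):
    """Toy evaluation of a node described like the add_node() arguments.

    upstream maps (fw_id, name) -> value for outputs of already completed nodes.
    """
    args = []
    for fw_id, name, value in inputs:
        if fw_id is None:
            args.append(value)                    # constant input
        else:
            args.append(upstream[(fw_id, name)])  # output of another node
    result = resolve_function(func)(*args, **(kwargs or {}))
    if len(outputs) == 1:
        return {outputs[0]: result}
    # Assumption: a function with several outputs returns them as a tuple
    return dict(zip(outputs, result))


upstream = {(1234, 'x'): 2.0}  # pretend node 1234 has produced x = 2.0
print(evaluate_node('math.pow', [(1234, 'x', None), (None, 'a', 5)], ['y'], upstream))
# {'y': 32.0}
```

The order of the tuples in inputs fixes the order of the positional arguments, so here math.pow receives x as the base and a as the exponent.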
Further optional parameters for add_node()
- kwargs (dict): keyword arguments to pass to the Python function func; default is an empty dictionary {}.
- category (str): the node launch category; must be set either to interactive or batch; default is interactive.
- fworker (str): the name of the worker to launch the node; default is wfe.name.
- qadapter (CommonAdapter): a custom qadapter, overriding the default qadapter in the engine, that is relevant if category is set to batch; default is None, i.e. no custom qadapter is written to the node.
NOTE: Nodes of batch category will not be added if FireWorks has been set up with MongoMock because launching them will not be reliable.
Save and load a WFEngine object
Sometimes it is helpful to save the engine to a file and to continue with the same engine later in another Jupyter session. For this purpose, the to_file() and from_file() methods are provided. These methods support the JSON and YAML formats. For example, an engine wfe can be saved to a YAML file using
wfe.to_file('engine.yaml')
and later loaded from the file using:
from virtmat.middleware.engine.wfengine import WFEngine
wfe = WFEngine.from_file('engine.yaml')
Set and get the workflow IDs or the workflow query
The workflow IDs and the workflow query can be set at any time during the lifetime of the WFEngine object, as demonstrated in this example:
wfe.wf_ids = [12345]
print(wfe.wf_query) # will print {'nodes': {'$in': [12345]}}
wfe.wf_query = {'name': 'my workflow'}
print(wfe.wf_ids) # will be [] if the query had no match
Glossary
Scientific workflow is the coordinated execution of repeatable actions accounting for dependencies and concurrency. The actions can be computation, measurement, pre/post processing, data management and analysis, visualization. Other (imprecise) names for a workflow: protocol, recipe, job chain, task sequence, pipeline, procedure. The scientific workflow focuses on how to organize these actions, i.e. on answering these questions: What order? What inputs? What outputs? What metadata? What resources?
Workflow Management System helps in managing scientific workflows, i.e. authoring, validating, editing, persisting, running, and querying workflows and workflow nodes. A workflow management system offers at least some of the following benefits: scalability, concurrency, distributed / heterogeneous computing, data and code reuse, provenance, reproducibility, modular validation, automation, resilience (fault tolerance), rapid prototyping.
Workflow node is an action from the workflow executed separately (in space and/or time) on a given computing resource. For example two nodes can be executed at the same time on different resources or at different moments of time, including a time gap between their execution times. A workflow node can be executed either directly in the interactive Jupyter session (interactive category) or submitted for execution as a batch job to a Batch Queuing System (batch category).
Batch Queuing System helps in managing jobs on computing clusters, in particular in managing the computing resources and scheduling jobs to resources according to their specific requirements.
Computing resource requirement can be, for example, the number of processor cores, the size of memory and an estimate of the job running time.
Workflow graph is a directed acyclic graph representing the workflow nodes as vertices and the dependencies between workflow nodes as edges.
Workflow query is a query that retrieves and/or changes information about the workflow. The query can include, for example, workflow name, workflow ID, time of creation, time of last change, the set of nodes and the set of node dependencies identified by their IDs, names and metadata.
Node query is a query that retrieves and/or changes information about a specific workflow node. This query can include description of the action with the list of tasks, the input and output data usually described by metadata (such as location of storage and name in the workflow), input parameters, and the resource requirements.
States of workflows and workflow nodes can be interpreted using this reference, keeping in mind that a firework is equivalent to a workflow node.