# Manage workflows with `WFEngine`

First of all, please have a look at [the glossary](#glossary) below for an explanation of some terms.

`WFEngine` (workflow engine) is a Python class used to manage the full life cycle of [*scientific workflows*](https://en.wikipedia.org/wiki/Scientific_workflow_system#Scientific_workflows) easily. The `WFEngine` class is developed as a middleware of a [Virtual Research Environment](https://en.wikipedia.org/wiki/Virtual_research_environment) to implement available use cases as Jupyter notebooks. This middleware acts as an interface to existing *workflow management systems* that can be switched as transparent backends.

## Configuration

Currently, one backend *workflow management system*, [FireWorks](https://materialsproject.github.io/fireworks/), is supported. However, the long-term intention of this middleware is to integrate further backend systems in such a way that they appear transparent to the end-user.

To get started with `WFEngine`, the backend system must be configured. For the computing resources provided by [NHR@KIT](https://www.nhr.kit.edu/) or [bwUniCluster](https://wiki.bwhpc.de/e/Category:BwUniCluster_2.0) we provide ready-to-use default configuration files and access to MongoDB. In the following, we first describe the steps required for creating a `WFEngine` object.

### Create a launchpad

The `LaunchPad` object (`launchpad` below) for the FireWorks database is created as described [here](launchpad.md).

### Create a workflow query

If you intend to create a new engine with workflows already existing on the launchpad, you can define a query to include the desired workflows. For more details on how to define a query we refer to [this section](wfquery.md). If you wish to include no workflows and to create an empty engine, you can set `wf_query = None`. You can add workflows to the engine at any later time.

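
As a rough illustration, the workflow query is either `None` or an ordinary Python dictionary. The two non-empty queries below reuse the forms shown in the section on setting and getting the workflow query further down; the variable names `query_by_name` and `query_by_nodes` are only illustrative, and the full query syntax is described in [wfquery.md](wfquery.md).

```python
# Empty engine: no workflows are included; they can be added later.
wf_query = None

# Illustrative alternatives (the variable names are hypothetical):
# select workflows by their name ...
query_by_name = {'name': 'my workflow'}
# ... or workflows that contain at least one of the listed node IDs.
query_by_nodes = {'nodes': {'$in': [12345]}}
```

Any of these values can be passed as `wf_query` when the engine is created.
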
### Create resource configuration

The [resource configuration (resconfig)](resconfig.md) must be completed before creating instances of `WFEngine`.

## Create a `WFEngine` object

In order to execute workflows on the launchpad, you need to create a `WFEngine` instance. If you intend to use resources on a remote machine or on a remote computing cluster to execute workflows, you need to create a [WFEngineRemote](wfengine_remote.md) instance instead and continue with this documentation from the [Execute workflows](#execute-workflows) section.

The `WFEngine` object is created with this code:

```python
from virtmat.middleware.engine.wfengine import WFEngine

wfe = WFEngine(launchpad=launchpad, wf_query=wf_query)
```

Here is a more detailed explanation of all parameters accepted by the `WFEngine` class:

- `launchpad` (object): A `LaunchPad` object that must be created as described [here](launchpad.md).
- `wf_query` (dict): A dictionary containing a [workflow query](#create-a-workflow-query). If the query is empty, i.e. `{}`, all workflows on the launchpad will be added to the engine (not recommended). If the query is `None` or not specified, then no workflows will be added to the engine.
- `qadapter` (object): An object of class `CommonAdapter`. The qadapter object enables execution of workflow nodes via a batch queue. If the qadapter is `None` or not specified, then a default qadapter will be created automatically from the [resource configuration](resconfig.md). The qadapter construction is described [here](qadapter.md).
- `name` (str): An optional worker name. If `name` is not specified or is `None`, then the name of the default worker from the [resource configuration](resconfig.md) will be used. If `name` is specified, then a worker with this name must be in the list of configured workers in resconfig.
  If a workflow is created outside of the engine and added using the `add_workflow()` method, then the name should be specified and should match the `_fworker` keyword in the node specifications of the workflow.
- `launchdir` (str): An optional path to the directory in which interactive and batch jobs are started; defaults to the launchdir in the [resource configuration](resconfig.md) if specified, otherwise to the current working directory.
- `unique_launchdir` (bool): Create an individual directory with a unique name for every single launch of interactive nodes. Disable with `False` if the code executed in the nodes has no I/O in the current working directory. Default is `True`.
- `sleep_time` (int): The launcher thread wakes up every `sleep_time` seconds to launch nodes that are ready for execution. Default is 30 seconds.

## Execute workflows

The `WFEngine` class completely automates the execution of the workflows configured in the engine. The class has a `start()` method to spawn a background launcher thread as well as a `stop()` method to safely stop the thread, i.e.:

```python
wfe.start()
...  # do some work, the thread is running in background
wfe.stop()
```

This thread periodically checks whether any nodes in the workflows can be executed. The thread launches the selected workflow nodes for execution either directly as *interactive* jobs or as *batch* jobs via a batch system, such as [SLURM](https://slurm.schedmd.com/overview.html). As the thread is running in the background, the user can perform other management tasks on the workflows configured in the engine, such as querying workflows and nodes, modifying and rerunning nodes, adding new nodes, and adding new workflows to the engine.

The `WFEngine` class can also be used non-interactively to execute workflows.
For example, this script will run for one hour and execute the workflows configured in the engine:

```python
from time import sleep
from virtmat.middleware.engine.wfengine import WFEngine

wfe = WFEngine.from_file('engine.yaml')
wfe.start()
sleep(3600)
wfe.stop()
```

## Add and remove workflows

There are two ways to add workflows to the engine:

1. Add a new workflow from a file. A new workflow object `wflow` can be either constructed from scratch or loaded from a file and then added to the engine, for example:

   ```python
   from fireworks import Workflow

   wflow = Workflow.from_file('workflow.yaml')
   wfe.add_workflow(workflow=wflow)
   ```

   **NOTE: If a workflow is added using this method, one has to make sure that the `_category` and `_fworker` keys are properly set in the `spec` dictionaries of the workflow nodes. The `_category` must be either `interactive` or `batch`. The `_fworker` must be the same as the worker name configured in the engine. The worker name configured in the engine can be displayed by using `wfe.name`.**

2. Add an existing workflow. A workflow that already exists on the launchpad can be added to the engine using a `fw_id`:

   ```python
   wfe.add_workflow(fw_id=id_of_the_fw)
   ```

   where `id_of_the_fw` refers to the ID of a firework that is part of the workflow one wishes to add to the engine. For example, to add a workflow that contains a firework with `fw_id` 123, one should use `wfe.add_workflow(fw_id=123)`.

A workflow can be removed from the engine using `wfe.remove_workflow(fw_id)`. The `fw_id` can be found by using the [`WFQuery` class](wfquery.md) or in the output of `wfe.status_summary()`. Note that calling the `remove_workflow()` function does not delete the workflow from the launchpad. The workflow is only de-registered from the engine and, if needed, can be added again later using `wfe.add_workflow(fw_id=fw_id)`.

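
The note under the first option requires `_category` and `_fworker` in the `spec` of every node. A minimal sketch of how these keys could be set is given below. It works on the dictionary form of a workflow and assumes that this dictionary has a list `fws` whose entries each carry a `spec` dictionary, as produced by FireWorks' `Workflow.to_dict()`. The helper name `set_launch_keys` and the sample data are hypothetical and not part of the middleware.

```python
def set_launch_keys(wf_dict, fworker, category='interactive'):
    """Set `_fworker` and `_category` in the spec of every node of a workflow dict."""
    if category not in ('interactive', 'batch'):
        raise ValueError("category must be 'interactive' or 'batch'")
    for node in wf_dict['fws']:
        # create the spec dictionary if the node does not have one yet
        spec = node.setdefault('spec', {})
        spec['_category'] = category
        spec['_fworker'] = fworker
    return wf_dict


# Hand-written stand-in for the dictionary form of a workflow (assumed layout)
wf_dict = {
    'name': 'demo',
    'fws': [{'fw_id': -1, 'spec': {}}, {'fw_id': -2, 'spec': {}}],
    'links': {'-1': [-2]},
}
set_launch_keys(wf_dict, fworker='my_worker', category='batch')
```

With a real workflow, one would presumably start from `wflow.to_dict()`, pass `wfe.name` as `fworker`, and rebuild the object with `Workflow.from_dict()` before calling `wfe.add_workflow(workflow=...)`.
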
## Monitor workflows

The call `wfe.status_summary()` provides an overview of all workflows and the nodes they contain, with their corresponding IDs (`fw_id`s), current states and modification times. In addition, this function displays the current status of the launcher thread running in the background.

More detailed information about specific nodes can be obtained with the `status_detail()` function. For example, to see the details of the nodes with `fw_id`s 1234 and 4321 the call is `wfe.status_detail(1234, 4321)`. The `status_detail()` function is particularly useful to get the error message of a failed node execution.

## Modify and rerun nodes

If for some reason we want to modify a node of a workflow, the `update_node()` function can be used. For example, to change the parameter `temperature` in the node with `fw_id` 1234 we can use:

```python
wfe.update_node(1234, {'temperature': 298.15})
```

The `update_node()` function cannot be applied to nodes that are in COMPLETED, RUNNING or FIZZLED state. COMPLETED nodes must be rerun and RUNNING nodes must be [cancelled](#cancel-node-execution) first. For example, to rerun a COMPLETED node with `fw_id` 4321 the `rerun_node()` function can be called as follows:

```python
wfe.rerun_node(4321)
```

The `rerun_node()` function can also be used to rerun nodes in FIZZLED state.

In many cases, the `update_node()` and `rerun_node()` functions are called together. For example, for a node in FIZZLED state, we first call `update_node()` to correct an error and then `rerun_node()` to change its state to READY or WAITING. In another case we want to correct an error in a COMPLETED node. In this case we call the `rerun_node()` function first to get the node into a writable state (such as READY or WAITING) and then call `update_node()`.
In these cases, it is recommended to use the `update_rerun_node()` function that combines the two functions in the proper order:

```python
wfe.update_rerun_node(4321, {'temperature': 273.15})
```

## Cancel node execution

A node scheduled for batch execution (in RESERVED state) or a RUNNING node in batch execution can be cancelled. This is performed with the `cancel_job()` function:

```python
wfe.cancel_job(5432, restart=True)
```

Interactive nodes, and batch nodes in states other than RESERVED and RUNNING, cannot be cancelled.

## Add a workflow node

The `WFEngine` class provides a method to add a Python function as a new node to an existing workflow. Suppose a workflow has a node with `fw_id` 1234 that has an output named `x`. (The actual value of `x` depends on the inputs of node 1234 and the function processing these inputs.) Let us compute `y = x**a` with `a = 5`. This can be done with the library function `math.pow` as follows:

```python
wfe.add_node(func='math.pow', inputs=[(1234, 'x', None), (None, 'a', 5)], outputs=['y'])
```

The `func` parameter is the fully qualified name of a function available in the system path. The `inputs` parameter in the call is a list of positional arguments for the provided Python function. Every input is described by a tuple `(fw_id, name, value)` with

1. the `fw_id` of an existing node providing the input, in this case 1234; if the input is provided as a constant value, then `None` should be specified;
2. the name of the input as provided in the list of outputs of node 1234;
3. the value of the input; if output data from another node is used as input, then this should be set to `None`.

The `outputs` parameter describes the names of the outputs as a list of strings.

## Further optional parameters for `add_node()`

* `kwargs` (dict): keyword arguments to pass to the Python function `func`; default is an empty dictionary `{}`.
* `category` (str): the node launch category; must be set either to `interactive` or `batch`; default is `interactive`.
* `fworker` (str): the name of the worker to launch the node; default is `wfe.name`.
* `qadapter` (`CommonAdapter`): a custom qadapter that overrides the default qadapter in the engine; relevant only if `category` is set to `batch`; default is `None`, i.e. no custom qadapter is written to the node.

## Save and load a WFEngine object

Sometimes it is helpful to save the engine to a file and to continue later with the same engine in another Jupyter session. To this end, the `to_file()` and `from_file()` methods are provided. These methods support the JSON and YAML formats. For example, an engine `wfe` can be saved to a YAML file using

```python
wfe.to_file('engine.yaml')
```

and later loaded from the file using:

```python
from virtmat.middleware.engine.wfengine import WFEngine

wfe = WFEngine.from_file('engine.yaml')
```

## Set and get the workflow IDs or the workflow query

The workflow IDs and the workflow query can be set during the whole lifetime of the `WFEngine` object as demonstrated in this example:

```python
wfe.wf_ids = [12345]
print(wfe.wf_query)  # will print {'nodes': {'$in': [12345]}}
wfe.wf_query = {'name': 'my workflow'}
print(wfe.wf_ids)  # will be [] if the query had no match
```

# Glossary

* **Scientific [workflow](https://en.wikipedia.org/wiki/Workflow)** is the coordinated execution of repeatable *actions* accounting for dependencies and concurrency. The *actions* can be computation, measurement, pre/post processing, data management and analysis, visualization. Other (imprecise) names for a workflow: protocol, recipe, job chain, task sequence, pipeline, procedure. The scientific workflow focusses on how to *organize* these actions, i.e. on keeping track of the answers to these questions: What order? What inputs? What outputs? What metadata? What resources?
* [**Workflow Management System**](https://en.wikipedia.org/wiki/Scientific_workflow_system) helps to manage scientific workflows, i.e. authoring, validating, editing, persisting, running, and querying workflows and workflow nodes. A workflow management system offers at least some of the following benefits: scalability, concurrency, distributed / heterogeneous computing, data and code reuse, provenance, reproducibility, modular validation, automation, resilience (fault tolerance), rapid prototyping.
* **Workflow node** is an action from the workflow executed separately (in space and/or time) on a given computing resource. For example, two nodes can be executed at the same time on different resources, or at different times with a gap between their executions. A workflow node can be executed either directly in the interactive Jupyter session (*interactive* category) or submitted for execution as a batch job to a Batch Queuing System (*batch* category).
* [**Batch Queuing System**](https://en.wikipedia.org/wiki/Job_scheduler) helps to manage jobs on computing clusters, in particular to manage the computing resources and to schedule jobs to resources according to their specific requirements.
* [**Computing resource**](https://en.wikipedia.org/wiki/System_resource) requirement can be, for example, the number of processors, the size of memory and an estimate of the job running time.
* **Workflow graph** is a [directed acyclic graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) representing the workflow nodes as vertices and the dependencies between workflow nodes as edges.
* **Workflow query** is a [query](https://en.wikipedia.org/wiki/Query#Computing_and_technology) that retrieves and/or changes information about the workflow. The query can include, for example, workflow name, workflow ID, time of creation, time of last change, the set of nodes and the set of node dependencies identified by their IDs, names and metadata.
* **Node query** is a [query](https://en.wikipedia.org/wiki/Query#Computing_and_technology) that retrieves and/or changes information about a specific workflow node. This query can include a description of the action with the list of tasks, the input and output data usually described by metadata (such as location of storage and name in the workflow), input parameters, and the resource requirements.
* **State** of workflows and workflow nodes can be interpreted using [this reference](https://materialsproject.github.io/fireworks/reference.html#interpretation-of-state-of-fws-and-wfs), keeping in mind that a *firework* is equivalent to a *workflow node*.