Query workflows and workflow nodes

The WFQuery class

The WFQuery class helps to perform arbitrary workflow queries.

Note: Currently, the WFQuery class is limited to FireWorks workflows using MongoDB queries written in Python (see PyMongo).

In this simple example, we select all workflows that have ‘copper’ or ‘Copper’ in their name and include in the resulting documents only those nodes that are named ‘Adsorption energy’:

from virtmat.middleware.query.wfquery import WFQuery
wfq = WFQuery(launchpad, wf_query={'name': {'$regex': '[Cc]opper'}},
              fw_query={'name': 'Adsorption energy'})

The launchpad object can be constructed as described here. The constructed wfq object is a list of workflow documents (here as dictionaries) with embedded information about nodes (fireworks) and launches (via aggregation). The returned documents match both the wf_query and the fw_query. The wf_query describes a query for the workflows collection, while the fw_query is a query for the nodes (fireworks) collection.
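
MongoDB's `$regex` operator matches the pattern anywhere in the field unless the pattern is anchored, so `[Cc]opper` selects any workflow whose name contains either spelling. The following sketch uses Python's `re.search` as a stand-in to illustrate which names such a query would select. The workflow names are made up for illustration, and the sketch does not contact a database.

```python
import re

# Hypothetical workflow names, for illustration only
names = ['Copper surface', 'CO on copper(111)', 'COPPER bulk', 'Silver surface']

# Like an unanchored $regex, re.search matches anywhere in the string
pattern = re.compile('[Cc]opper')
matching = [name for name in names if pattern.search(name)]
print(matching)
```

Note that a name written entirely in upper case is not matched by this pattern. For a case-insensitive match, MongoDB accepts the `$options` modifier, for example `{'name': {'$regex': 'copper', '$options': 'i'}}`.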

Note: Empty queries, i.e. {}, match all documents in the corresponding collection of the database.

Note: The wfq object is the result of a completed database query, i.e. it is a snapshot that is not updated automatically when the database changes after the object has been created. To see changes in the database, a new object of the WFQuery class must be created.

Functions (methods) of the WFQuery class

| Function name       | Arguments      | Returns | Purpose                                                         |
|---------------------|----------------|---------|-----------------------------------------------------------------|
| get_wf_info         | fw_ids         | None    | Display a summary of workflows including specific nodes         |
| get_fw_info         | fw_ids         | [dict]  | Provide detailed information about specific nodes               |
| get_task_info       | fw_ids         | [dict]  | Return the dataflow from PyTask firetasks in selected fireworks |
| get_wf_ids          |                | [int]   | Return the node IDs, one for each workflow                      |
| get_fw_ids          |                | [int]   | Return the node IDs, complete list                              |
| get_i_names         | fw_ids         | {str}   | Return input data names in selected fireworks                   |
| get_o_names         | fw_ids         | {str}   | Return output data names in selected fireworks                  |
| get_data            | dname, io_kind | [dict]  | Return the input/output data for a given data name              |
| get_i_data          | dname          | [dict]  | Return the input data for a given data name                     |
| get_o_data          | dname          | [dict]  | Return the output data for a given data name                    |
| get_nodes_providing | dname, match   | [int]   | Return the IDs of nodes providing a specified output            |

Note: In all functions, fw_ids is an optional argument. If not specified, then the node IDs returned by get_fw_ids() are processed.

Q&A

Why do I need workflow queries?

Here are some specific situations in which we need to perform queries:

  1. To see the status of the workflow and/or of the individual nodes in it. If the execution of a node has failed, we need to see the reason for the failure (the error message). This is done with a query. The WFEngine class provides monitoring functions for this use case.

  2. To see the inputs, outputs and metadata of a specific node in a specific workflow. This is typically used in developing a workflow.

    Example:

    from virtmat.middleware.query.wfquery import WFQuery
    wfq = WFQuery(launchpad, wf_query={}, fw_query={'fw_id': 12345})
    print(wfq.get_fw_info())
    print(wfq.get_task_info())
    
  3. To get the data from all workflows with nodes matching certain criteria, for example where some input parameter or some result has a specific value. Typical use cases are statistical data analysis and high throughput computing.

    Example:

    wf_query = {'name': {'$regex': 'MnOx'}, 'metadata.constrained spin': False}
    fw_query = {'state': 'COMPLETED', 'name': 'Relax structure *OOH',
                'spec.calculator.parameters.encut': 450,
                '$and': [{'spec.calculator.parameters.ediffg': {'$gte': -0.01}},
                         {'spec.calculator.parameters.ediffg': {'$lt': 0.0}}]}
    wfq = WFQuery(launchpad, wf_query, fw_query)
    print(wfq.get_wf_info()) # info about the matching workflows
    print(wfq.get_fw_info()) # info about all matching nodes
    i_names = wfq.get_i_names() # names of all inputs in the matching nodes
    o_names = wfq.get_o_names() # names of all outputs in the matching nodes
    initial_structures = wfq.get_i_data('initial structure') # 'initial structure' is in i_names
    final_structures = wfq.get_o_data('relaxed structure') # 'relaxed structure' is in o_names
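
The `$and` operator in the fw_query above is needed because a Python dictionary cannot hold the same key twice: if the key `spec.calculator.parameters.ediffg` is simply repeated, only the last condition survives. The sketch below illustrates this with plain dictionaries and shows two ways to express the range, either with `$and` or by placing both comparison operators in one sub-document, which MongoDB also accepts. It only builds the query dictionaries and does not contact a database.

```python
field = 'spec.calculator.parameters.ediffg'

# Repeating a key in a dict literal silently keeps only the last value,
# so the lower bound ($gte) is lost here
naive = {'spec.calculator.parameters.ediffg': {'$gte': -0.01},
         'spec.calculator.parameters.ediffg': {'$lt': 0.0}}
print(naive)

# Option 1: one condition per dictionary inside a $and list
with_and = {'$and': [{field: {'$gte': -0.01}}, {field: {'$lt': 0.0}}]}

# Option 2: both comparison operators on the same field
combined = {field: {'$gte': -0.01, '$lt': 0.0}}
print(combined)
```

Either option can be merged with the remaining conditions of the fw_query, such as the state and the name.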
    

How can I use the WFQuery class in combination with the WFEngine class?

We can use the WFQuery class to construct a new engine with existing workflows matching a query. Because WFEngine accepts a wf_query keyword, we do not need the WFQuery class if fw_query is an empty query. But if we want to select workflows with additional filters on their nodes, we can use the WFQuery class.

Example:

Choose all workflows that have MnOx in their names and constrained spin == True, and that contain nodes with the name Relax structure *O and the parameter nupdown == 1.

wfe = WFEngine(launchpad) # create an engine with no workflows
wf_query = {'name': {'$regex': 'MnOx'}, 'metadata.constrained spin': True}
fw_query = {'name': 'Relax structure *O', 'spec.calculator.parameters.nupdown': 1}
wf_ids = WFQuery(launchpad, wf_query, fw_query).get_wf_ids()
for wf_id in wf_ids: # add the chosen workflows one by one
    wfe.add_workflow(fw_id=wf_id)
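
The dotted keys in these queries, such as `spec.calculator.parameters.nupdown`, use MongoDB dot notation to address fields in nested documents. The sketch below mimics this lookup for plain nested dictionaries. The helper `resolve_dotted` and the node document are hypothetical and serve only as an illustration; MongoDB's own matching also handles arrays, which this sketch ignores.

```python
def resolve_dotted(document, dotted_key):
    """Follow a dotted key through nested dictionaries (hypothetical helper)."""
    value = document
    for key in dotted_key.split('.'):
        value = value[key]
    return value

# Made-up excerpt of a node (firework) document
node = {'name': 'Relax structure *O',
        'spec': {'calculator': {'parameters': {'nupdown': 1, 'encut': 450}}}}

nupdown = resolve_dotted(node, 'spec.calculator.parameters.nupdown')
print(nupdown)
```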

Another use case is to query the nodes in an existing WFEngine object.

Example 1: Find the completed root nodes (root nodes are nodes with no parent nodes):

wfq = WFQuery(wfe.launchpad, wf_query=wfe.wf_query, fw_query={'state': 'COMPLETED'})
root_nodes = [n['id'] for n in wfq.get_fw_info() if not n['parents']]

Example 2: Find the nodes that have the data field charges in their outputs.

wfq = WFQuery(wfe.launchpad, wf_query=wfe.wf_query)
nodes = [n['id'] for n in wfq.get_fw_info() if 'charges' in n['outputs']]
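
The two filters above can be tried without a database on a hand-written list of node summaries. In the sketch below, the list `fw_info` is made-up sample data that stands in for the return value of get_fw_info(); it assumes that each dictionary has the keys id, parents and outputs used in the examples, with parents as a list of node IDs and outputs as a list of data names.

```python
# Made-up node summaries, standing in for the result of wfq.get_fw_info()
fw_info = [
    {'id': 1, 'parents': [], 'outputs': ['structure']},
    {'id': 2, 'parents': [1], 'outputs': ['charges', 'energy']},
    {'id': 3, 'parents': [], 'outputs': ['charges']},
]

# Root nodes have an empty list of parents
root_nodes = [n['id'] for n in fw_info if not n['parents']]

# Nodes that provide the data field 'charges' in their outputs
charge_nodes = [n['id'] for n in fw_info if 'charges' in n['outputs']]

print(root_nodes)    # [1, 3]
print(charge_nodes)  # [2, 3]
```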