virtmat.middleware.engine package

Submodules

virtmat.middleware.engine.wfengine module

A simple workflow engine

class virtmat.middleware.engine.wfengine.WFEngine(launchpad, qadapter=None, wf_query=None, name=None, launchdir=None, unique_launchdir=True, sleep_time=30)

Bases: FWSerializable

A simple engine to manage workflows

add_node(func, inputs, outputs=None, name=None, kwargs=None, category=None, fworker=None, qadapter=None)

Add a python function node to an existing workflow

Parameters:
  • func (str) – a function name with an optional module name in the format ‘module.function’

  • inputs ([tuple]) – a list of positional arguments for the provided function. Every input is described by a tuple (fw_id, name, value) with the following elements:

      • fw_id (int) – the fw_id of a parent node providing the input; if the input is provided as a constant value, then None should be specified
      • name (str) – the name of the input as provided in the list of outputs of the parent node
      • value – the value of the input; if output data from a parent node is used as input, then this should be set to None

  • outputs ([str]) – names of the outputs

  • name (str, None) – name of the node

  • kwargs (dict, None) – a dictionary of keyword arguments for func

  • category (str, None) – job category, either ‘batch’ or ‘interactive’

  • fworker (FWorker, None) – fworker for executing the batch jobs

  • qadapter (CommonAdapter, None) – qadapter for submitting batch jobs
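
The (fw_id, name, value) convention for inputs can be sketched as follows. This is a minimal illustration, not code from the package: the helpers from_parent and constant, the function name mymodule.scale, the fw_id 7 and the output names are all hypothetical. The label passed as name for a constant input is an assumption as well, since the description above only fixes fw_id to None in that case.

```python
def from_parent(fw_id, name):
    """Input taken from the output `name` of the parent node `fw_id`."""
    return (fw_id, name, None)


def constant(value, name):
    """Input given as a constant: there is no parent node, so fw_id is None."""
    return (None, name, value)


# Hypothetical node calling mymodule.scale(data, 2.5), where `data` is an
# output of the parent node with fw_id 7 (all names are made up).
inputs = [from_parent(7, "data"), constant(2.5, name="factor")]

node_spec = {
    "func": "mymodule.scale",
    "inputs": inputs,
    "outputs": ["scaled"],
    "category": "interactive",
}

# With an existing engine object (not constructed here) the node would be
# added with: engine.add_node(**node_spec)
```

Keeping the specification in a dictionary makes it easy to inspect the tuples before the node is written to the launchpad.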

add_workflow(workflow=None, fw_id=None)

Add a workflow to the engine. Either a workflow object or a fw_id must be defined.

Parameters:
  • workflow (Workflow, None) – a workflow object

  • fw_id (int, None) – a fw_id of a workflow existing on the launchpad

append_wf_id(wf_id)

append a workflow id (wf_id) to the list of wf_ids

cancel_job(fw_id, restart=False, deactivate=False)

Cancel the execution of a node in RESERVED or RUNNING state. Either restart or deactivate can be set to True if required.

Parameters:
  • fw_id (int) – the fw_id of the node to cancel

  • restart (bool) – restart node after cancelling

  • deactivate (bool) – deactivate node after cancelling

classmethod from_dict(*args, **kwargs)

property fw_ids

get the current firework ids of the engine

get_failed()

Get failed job ids

Returns:

a list of fw_ids of failed jobs

Return type:

([int])

get_lost_jobs(time=14400, fizzle=False)

Detect nodes that have been launched but not updated within the specified time.

Parameters:
  • time (int) – minimum time in seconds since the most recent update

  • fizzle (bool) – set state of detected nodes to FIZZLED

Returns:

a list of fw_ids of the lost runs

Return type:

lost_fw_ids ([int])
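
The time argument is given in seconds, so the default of 14400 corresponds to four hours. A small sketch of how a threshold might be computed in a readable way; the helper name seconds is hypothetical and the engine calls are shown only as comments because they need a configured launchpad.

```python
from datetime import timedelta


def seconds(**kwargs):
    """Convert a duration given as timedelta keywords to whole seconds."""
    return int(timedelta(**kwargs).total_seconds())


default_threshold = seconds(hours=4)  # the documented default, 14400
one_hour = seconds(hours=1)

# With an existing engine object (not constructed here):
# lost = engine.get_lost_jobs(time=one_hour)               # report only
# lost = engine.get_lost_jobs(time=one_hour, fizzle=True)  # also set FIZZLED
```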

get_unreserved_nodes(time=1209600)

Detect nodes that are in the ‘RESERVED’ state within FireWorks but have not been updated for at least the specified time. SLURM job states that are inconsistent with a reservation are ‘CANCELLED’, ‘FAILED’, ‘COMPLETED’, ‘OUT_OF_MEMORY’, ‘BOOT_FAIL’, ‘TIMEOUT’ and ‘DEADLINE’.

Parameters:

time (int) – minimum time in seconds since the most recent update

Returns:

a list of dictionaries containing the fw_ids, the reservation ids, the SLURM states and the launch directories of such reserved nodes

Return type:

([dict])
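
One possible way to post-process the returned list is to group the fw_ids by SLURM state. The dictionary keys are not specified above, so the key names fw_id and slurm_state used here are assumptions, and the records are made-up stand-ins for a real return value.

```python
from collections import defaultdict

# SLURM states listed above as inconsistent with a RESERVED node
INCONSISTENT_SLURM_STATES = {
    "CANCELLED", "FAILED", "COMPLETED", "OUT_OF_MEMORY",
    "BOOT_FAIL", "TIMEOUT", "DEADLINE",
}


def group_by_slurm_state(nodes, state_key="slurm_state", id_key="fw_id"):
    """Group fw_ids by SLURM state; both key names are assumptions."""
    groups = defaultdict(list)
    for node in nodes:
        groups[node[state_key]].append(node[id_key])
    return dict(groups)


# Made-up records standing in for the result of get_unreserved_nodes()
example = [
    {"fw_id": 3, "slurm_state": "TIMEOUT"},
    {"fw_id": 5, "slurm_state": "FAILED"},
    {"fw_id": 8, "slurm_state": "TIMEOUT"},
]
grouped = group_by_slurm_state(example)
```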

logger = <Logger virtmat.middleware.engine.wfengine (DEBUG)>

property name

get the name of the engine

qlaunch(fw_id)

Launch a batch node by submitting a job to the queuing system

Parameters:

fw_id (int) – the fw_id of the node to launch

Returns:

the reservation ID in case of successful submission, False or None in case of error during submission

Return type:

(int|bool|None)

Raises:

ConfigurationException – when fw_id is invalid, qadapter not defined, fworker not defined, or attempting to use with mongomock
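
Because a failed submission is signalled by False or None rather than by an exception, the return value has to be checked explicitly. The following is a sketch under stated assumptions: submit is a hypothetical helper, and FakeEngine is a stand-in that only mimics the documented return values so that the example runs without a launchpad or queuing system.

```python
def submit(engine, fw_id):
    """Submit a batch node; return the reservation ID, or None on failure."""
    res_id = engine.qlaunch(fw_id)
    # Compare by identity: a plain truth test would also reject an ID of 0
    if res_id is None or res_id is False:
        return None
    return res_id


class FakeEngine:
    """Stand-in for WFEngine that returns canned qlaunch results."""

    def __init__(self, results):
        self.results = results

    def qlaunch(self, fw_id):
        return self.results[fw_id]


fake = FakeEngine({1: 4711, 2: False, 3: None})
outcomes = [submit(fake, fw_id) for fw_id in (1, 2, 3)]
```

A ConfigurationException is raised rather than returned, so it would need a separate try/except around the call.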

remove_wf_id(wf_id)

remove a workflow id (wf_id) from the list of wf_ids

remove_workflow(fw_id)

Remove a workflow from the engine. The workflow is not deleted from the launchpad.

Parameters:

fw_id (int) – a fw_id of a node in the workflow to remove

rerun_node(fw_id)

Rerun a workflow node. Nodes in FIZZLED and COMPLETED states will be directly rerun. Nodes in PAUSED and DEFUSED states are resumed and reignited, respectively. In all cases the target node state is WAITING. The final state can be READY if all parents are COMPLETED.

Parameters:

fw_id (int) – the fw_id of the node to rerun
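
The state handling described for rerun_node can be summarized as a small lookup table. This is only a restatement of the text above, not the package's implementation; the names RERUN_ACTIONS and expected_state_after_rerun are hypothetical.

```python
# Action taken for each starting state, as described for rerun_node
RERUN_ACTIONS = {
    "FIZZLED": "rerun",
    "COMPLETED": "rerun",
    "PAUSED": "resume",
    "DEFUSED": "reignite",
}


def expected_state_after_rerun(state, parents_completed):
    """Return the state a node is expected to reach after rerun_node."""
    if state not in RERUN_ACTIONS:
        raise ValueError(f"rerun is not described for state {state!r}")
    # The target state is WAITING; READY is possible once all parents
    # are COMPLETED.
    return "READY" if parents_completed else "WAITING"
```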

rlaunch(fw_id)

Launch an interactive node

Parameters:

fw_id (int) – a fw_id of the node to launch

Raises:

ConfigurationException – if fw_id is invalid

show_launcher_status()

Check whether a launcher thread is running

show_nodes_status()

Display the status summary of the nodes

show_wf_status(add_io_info=True)

Display the status summary of the workflows

start(raise_exception=False)

Start a launcher thread

status_detail(*fw_ids)

Print a detailed status of specified nodes

Parameters:

fw_ids ([int]) – One or more fw_ids of the nodes

Returns:

a list of dictionaries describing the nodes

status_summary()

Display a status summary of workflows and nodes

stop(join=False)

Gracefully stop the launcher thread if it is running

to_dict(*args, **kwargs)

update_node(fw_id, update_dict)

Update (modify) a workflow node. Only nodes in FIZZLED, DEFUSED, PAUSED, WAITING, and READY states can be modified.

Parameters:
  • fw_id (int) – the fw_id of the node to modify

  • update_dict (dict) – a dictionary with the updates to perform

update_rerun_node(fw_id, update_dict)

Update (modify) and rerun a workflow node combined in one function. Only nodes in FIZZLED, DEFUSED, PAUSED, and COMPLETED states can be processed.

Parameters:
  • fw_id (int) – the fw_id of the node to process

  • update_dict (dict) – a dictionary with the updates to perform
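
update_node and update_rerun_node accept different sets of node states. A short sketch that encodes the two lists given above and reports which method applies to a given state; the names UPDATABLE, UPDATE_RERUNNABLE and allowed_methods are hypothetical.

```python
# States accepted by update_node and update_rerun_node, as documented
UPDATABLE = {"FIZZLED", "DEFUSED", "PAUSED", "WAITING", "READY"}
UPDATE_RERUNNABLE = {"FIZZLED", "DEFUSED", "PAUSED", "COMPLETED"}


def allowed_methods(state):
    """List the documented update methods that accept a node in `state`."""
    methods = []
    if state in UPDATABLE:
        methods.append("update_node")
    if state in UPDATE_RERUNNABLE:
        methods.append("update_rerun_node")
    return methods
```

For example, a COMPLETED node can only be processed with update_rerun_node, while a WAITING or READY node can only be modified with update_node.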

property wf_ids

get the workflow ids of the engine

property wf_query

get the query for the engine

virtmat.middleware.engine.wfengine_jupyter module

A graphical user interface for WFEngine based on ipywidgets

class virtmat.middleware.engine.wfengine_jupyter.WFEnginejupyter

Bases: object

A class for constructing a GUI for FireWorks

add_node_button_clicked(bvar)

add node button is clicked

add_nodes_button_clicked(bvar)

add nodes button is clicked

add_workflow_button_clicked(bvar)

add workflows from a query or a file

cancel_launch_button_clicked(bvar)

cancel launched (reserved or running) nodes

commit_remove_workflow_button_clicked(bvar)

commit workflows removal from engine

create_wf_id_select()

creates a new selector with updated workflow ids

dump_engine_button_clicked(bvar)

dump the engine to file

func_name = None

job_category = None

jqadapter = None

logger = <Logger virtmat.middleware.engine.wfengine_jupyter (DEBUG)>

lpad_button_clicked(bvar)

load user defined launchpad

manage_nodes_button_clicked(bvar)

Manage nodes button is clicked

manage_workflows_button_clicked(bvar)

manage workflows button is clicked

new_engine_button_clicked(bvar)

create new engine button is clicked

node_id_select = None

nodes_status_button_clicked(bvar)

nodes status summary

qadapter_button_clicked(bvar)

load user defined qadapter

remove_workflow_button_clicked(bvar)

remove workflows from engine

rerun_node_button_clicked(bvar)

rerun selected nodes and print their new status

resume_engine_button_clicked(bvar)

resume engine button is clicked

rows_inputs = None

rows_outputs = None

size_inp = None

size_out = None

start_launcher_clicked(bvar)

start launcher button clicked

status_button_clicked(bvar)

workflow status summary

status_detailed_button_clicked(bvar)

status details about selected nodes

stop_launcher_clicked(bvar)

stop launcher button clicked

update_node_button_clicked(bvar)

update selected nodes

update_rerun_node_button_clicked(bvar)

update and rerun selected nodes

wf_id_select = None

wfe = None

virtmat.middleware.engine.wfengine_jupyter.add_workflow_method_changed(bvar)

select the method to add workflows from radio buttons

virtmat.middleware.engine.wfengine_jupyter.clear_button_outputs()

Clear top buttons outputs

virtmat.middleware.engine.wfengine_jupyter.clear_consoleoutput()

Clear outputs

virtmat.middleware.engine.wfengine_jupyter.configure_button_clicked(bvar)

Configure button is clicked

virtmat.middleware.engine.wfengine_jupyter.configure_engine_method_changed(bvar)

select engine configuration method from radio buttons

virtmat.middleware.engine.wfengine_jupyter.manage_launcher_button_clicked(bvar)

manage launcher button is clicked

virtmat.middleware.engine.wfengine_jupyter.new_workflow_button_clicked(bvar)

new workflow button is clicked

virtmat.middleware.engine.wfengine_jupyter.remote_cluster_changed(bvar)

toggle the remote cluster checkbox

virtmat.middleware.engine.wfengine_jupyter.resconfig_button_clicked(bvar)

resconfig button is clicked

virtmat.middleware.engine.wfengine_remote module

Launch workflow nodes on remote resources

class virtmat.middleware.engine.wfengine_remote.WFEngineRemote(launchpad, qadapter, wf_query, host=None, user=None, conf='', **kwargs)

Bases: WFEngine

A subclass of WFEngine to manage remote workers

Parameters:
  • host – hostname of the remote resource

  • user – username on the remote resource

  • conf – configuration command to set up the remote environment

Passwordless SSH connection to the remote system must be enabled. Otherwise the following error is raised: PasswordRequiredException: private key file is encrypted
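
Whether a passwordless login works can be checked beforehand with the OpenSSH client. The sketch below only builds the command; the helper name ssh_check_command, the host and the user are hypothetical. It is an assumption that the engine uses the same keys and agent as the ssh command line client, so a successful check is a hint rather than a guarantee.

```python
def ssh_check_command(host, user=None):
    """Build an ssh command that fails instead of prompting for a password."""
    target = f"{user}@{host}" if user else host
    # BatchMode=yes disables password and passphrase prompts
    return ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=10",
            target, "true"]


cmd = ssh_check_command("cluster.example.org", user="alice")

# To actually run the check:
# import subprocess
# ok = subprocess.run(cmd, check=False).returncode == 0
```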

check_jobcancel(res_id)

Execute the SLURM sacct command remotely

exec_cancel(res_id)

Execute the SLURM cancel command remotely

classmethod from_dict(*args, **kwargs)

launcher(stop_event)

Awake every sleep_time seconds and launch all READY nodes
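
The general pattern of a periodic launcher controlled by a stop event can be sketched with the standard library alone. This is not the implementation of WFEngineRemote.launcher: the function below takes extra arguments, and launch_ready is a hypothetical stand-in for launching all READY nodes.

```python
import threading


def launcher(stop_event, launch_ready, sleep_time):
    """Call launch_ready() once per cycle until stop_event is set."""
    while not stop_event.is_set():
        launch_ready()
        # wait() returns early as soon as the event is set, so a stop
        # request does not have to wait for a full sleep_time
        stop_event.wait(sleep_time)


stop_event = threading.Event()
cycles = []


def launch_ready():
    """Stand-in for launching READY nodes; requests a stop after 3 cycles."""
    cycles.append(len(cycles) + 1)
    if len(cycles) == 3:
        stop_event.set()


thread = threading.Thread(target=launcher,
                          args=(stop_event, launch_ready, 0.01))
thread.start()
thread.join(timeout=5)
```

Using Event.wait instead of time.sleep is what allows a graceful stop: setting the event from another thread interrupts the pause immediately.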

logger = <Logger virtmat.middleware.engine.wfengine_remote (DEBUG)>

setup_remote_configuration()

Create remote launch directory and copy all configuration files

setup_remote_fworker()

Create configuration for remote worker

setup_remote_launchpad()

Create launchpad file for remote worker

setup_remote_qadapter()

Create qadapter file for remote worker

slaunch(fw_id)

Launch a batch node on a remote resource

to_dict(*args, **kwargs)

Module contents