workflow package
Usage Summary
- Define delayed functions:
>>> import time
>>> from cloudmesh_workflow.workflow import delayed, evaluate
>>> @delayed()
... def A(foo):
...     time.sleep(0.05)
...     print foo
...
>>> @delayed()
... def B():
...     print 'Boo!'
...
>>> @delayed()
... def C(x, y):
...     return x ** y
- Compose the functions using | and & for parallel and sequential evaluation, respectively:
>>> root_node = (A('hello world!') | B()) & C(4, 2)
>>> root_node
<cloudmesh_workflow.workflow.AndNode object at ...>
- Evaluate the resulting graph:
>>> evaluate(root_node.graph)
Boo!
hello world!
Description
This module provides an API for building a workflow graph of labeled functions, which can then be evaluated. Nodes connected with a desired ordering are run sequentially; all others can be run in parallel.
The syntax is inspired by the parallel (||) and sequential (;) operators. For example:
(A || B) ; (C || D)
means that A and B can be evaluated in parallel, and likewise C and D, but both A and B must be completed before C or D may begin.
The Python implementation overloads the bitwise OR (|) and AND (&) operators to provide a similar syntactic feel. The example above is written as:
(A() | B()) & (C() | D())
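To make the semantics concrete without the workflow package, the sketch below evaluates (A || B) ; (C || D) by hand with the standard library's concurrent.futures: each parallel group is submitted together, and wait() acts as the sequential barrier between groups. The task function and the log list are hypothetical stand-ins for real work, not part of the package.

```python
from concurrent.futures import ThreadPoolExecutor, wait

log = []  # records the order in which the hypothetical tasks run


def task(name):
    log.append(name)  # stand-in for real work
    return name


with ThreadPoolExecutor(max_workers=2) as pool:
    # (A || B): submit both, then block until both are done
    wait([pool.submit(task, "A"), pool.submit(task, "B")])
    # ; (C || D): submitted only after A and B have completed
    wait([pool.submit(task, "C"), pool.submit(task, "D")])

print(log)  # A and B (in either order), then C and D (in either order)
```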
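Note
The Python operator precedence for | and & is unchanged: & has higher precedence than |. For example, A() | B() & C() groups as A() | (B() & C()), so use parentheses to make the intended grouping explicit.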
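The grouping can be checked with a toy class that overloads the same two operators and records how Python parenthesizes an expression. Expr is a hypothetical stand-in for Node and is not part of the workflow package; it only demonstrates the language's precedence rules.

```python
class Expr:
    """Hypothetical stand-in for Node that records how | and & group."""

    def __init__(self, text):
        self.text = text

    def __or__(self, other):
        return Expr("(%s | %s)" % (self.text, other.text))

    def __and__(self, other):
        return Expr("(%s & %s)" % (self.text, other.text))


A, B, C = Expr("A"), Expr("B"), Expr("C")
print((A | B & C).text)    # (A | (B & C)): & binds tighter than |
print(((A | B) & C).text)  # ((A | B) & C): parentheses group A and B first
```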
Usage
The first step is to mark top-level functions as delayed(). The @delayed() decorator wraps the function so that calling it inserts a Node into the call Graph without applying the arguments. Access the graph property of any node to get the current call graph.
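The deferral itself can be illustrated without the package. The sketch below is a hypothetical, stripped-down decorator (delayed_sketch, with a Deferred class standing in for Node) that records the function and its arguments when called and only runs them on request. The real delayed also inserts the node into a Graph, which is omitted here.

```python
import functools


class Deferred:
    """Hypothetical stand-in for Node: stores (f, args, kws) without calling f."""

    def __init__(self, f, args, kws):
        self.f, self.args, self.kws = f, args, kws
        self.name = f.__name__

    def run(self):
        return self.f(*self.args, **self.kws)


def delayed_sketch():
    """Decorator factory, used as @delayed_sketch() just like @delayed()."""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kws):
            return Deferred(f, args, kws)  # record the call; evaluate nothing
        return wrapper
    return decorator


calls = []


@delayed_sketch()
def A(x):
    calls.append(x)
    return x * 2


node = A(24)
print(calls)                  # [] because A has not run yet
print(node.name, node.run())  # A 48
```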
For example, define two delayed functions A and B:
>>> from cloudmesh_workflow.workflow import delayed, evaluate
>>> @delayed()
... def A(x):
...     return x * 2
>>> @delayed()
... def B(x, y):
...     return x ** y
Compose A and B to run in parallel:
>>> node = A(24) | B(40, 2)
Evaluate the graph:
>>> evaluate(node.graph)
Print the results:
>>> for _, data in node.graph.nodes(data=True):
...     n = data['node']
...     print n.name, n.result.result()
| None
A 48
B 1600
Cloudmesh Workflow Example
Warning
This is a proposed usage example and hasn’t been tested yet.
from cloudmesh_base import Shell
from cloudmesh_workflow.workflow import delayed, evaluate

@delayed()
def FutureSystems():
    "Start a VM on FutureSystems OpenStack Kilo"
    Shell.cm('boot', 'kilo')

@delayed()
def Cybera():
    "Start a VM on Cybera cloud"
    Shell.cm('boot', 'cybera')

@delayed()
def Rackspace():
    "Start a VM on Rackspace"
    Shell.cm('boot', 'rackspace')

def main():
    "Boot machines in parallel"
    node = FutureSystems() | Cybera() | Rackspace()
    evaluate(node.graph)

if __name__ == '__main__':
    main()
Concepts
Deferring function evaluation
A delayed is intended to be used as a decorator that lifts arbitrary functions to have delayed semantics. The evaluation semantics of delayed objects are:
Composing Nodes for parallel/sequential semantics
A Node captures the evaluation state of a delayed function. It provides several important attributes:
- graph: the evaluation graph in which the function is located.
- f: the function to evaluate.
- name: the name of the node. Typically captured from f, but may be a shorthand representation of an OpNode.
- result: the status and result of the evaluation.
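The examples in this document read a node's value with n.result.result(), which matches the interface of concurrent.futures.Future. Assuming result is such a future, its status and value can be inspected as below; pow(40, 2) stands in for the function of B(40, 2), and the executor is an assumption of this sketch.

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as pool:
    # assumption: starting a node submits f(*args, **kws) to an executor
    result = pool.submit(pow, 40, 2)
    value = result.result()  # blocks until finished, then returns the value

print(result.done(), value)  # True 1600
```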
Nodes are created by calling delayed functions and then composed using & and |. Each composition returns a new Node in the graph.
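Composition builds a tree: each & or | creates a new proxy node whose ordered children are the two operands. The hypothetical miniature below (its own Node, AndNode, and OrNode classes, with a show helper) reproduces only that shape; the real classes also manage a shared Graph and evaluation state.

```python
class Node:
    """Hypothetical miniature of Node: a name plus ordered children."""

    def __init__(self, name):
        self.name = name
        self.children = []

    def compose(self, other, make_op):
        op = make_op()               # the proxy node for this composition
        op.children = [self, other]  # operands become its ordered children
        return op

    def __and__(self, other):
        return self.compose(other, AndNode)

    def __or__(self, other):
        return self.compose(other, OrNode)


class AndNode(Node):
    def __init__(self):
        super().__init__("&")


class OrNode(Node):
    def __init__(self):
        super().__init__("|")


def show(node):
    """Render the tree, parenthesizing every composed node."""
    if not node.children:
        return node.name
    sep = " %s " % node.name
    return "(" + sep.join(show(c) for c in node.children) + ")"


root = (Node("A") | Node("B")) & Node("C")
print(type(root).__name__, show(root))  # AndNode ((A | B) & C)
```

Naming each proxy after its operator mirrors the "| None" line in the earlier output, where the OrNode's name is its shorthand operator.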
Evaluation of a delayed function
Once Nodes have been composed to achieve the desired parallelism, evaluate the graph by calling evaluate() on the graph attribute of the composed node.
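The evaluation order described here can be sketched as a recursive walk: & children run one after another, while | children run concurrently and are all awaited before the walk continues. The tuple encoding and the run and step helpers are hypothetical; the actual evaluate() works on the networkx-based Graph and may differ in detail.

```python
from concurrent.futures import ThreadPoolExecutor


def run(node):
    """Evaluate a hypothetical tree: a zero-argument callable or (op, [children])."""
    if callable(node):
        return node()
    op, children = node
    if op == "&":
        # sequential: each child finishes before the next one starts
        return [run(child) for child in children]
    # "|": parallel; a fresh pool per group so nested groups cannot starve each other
    with ThreadPoolExecutor(max_workers=len(children)) as pool:
        futures = [pool.submit(run, child) for child in children]
        return [f.result() for f in futures]


order = []  # order in which the leaves actually ran


def step(name):
    def leaf():
        order.append(name)
        return name
    return leaf


tree = ("&", [("|", [step("A"), step("B")]), ("|", [step("C"), step("D")])])
print(run(tree))  # [['A', 'B'], ['C', 'D']]
```

Results are collected in submission order even though the leaves inside each | group may finish in either order.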
API
- class cloudmesh_workflow.workflow.delayed(graph=None, **kws)
  Bases: object
  A delayed is a decorator that delays evaluation of a function until it is explicitly requested with evaluate().
  Intended usage: decorate a function so that calling it (__call__()) returns a Node instance, which can be combined with other Node instances using the bitwise __and__() (&) and __or__() (|) operators to create a workflow.
  Example:
  >>> @delayed()
  ... def foo(*args):
  ...     for a in args:
  ...         print a
  >>> type(foo)
  <type 'function'>
  >>> node = foo(1, 2) & foo(3, 4)
  >>> print node
  <cloudmesh_workflow.workflow.AndNode object at ...>
  >>> evaluate(node.graph)
  1
  2
  3
  4
  kws will be passed to the Node constructor.
  Parameters: graph (Graph or None) – If graph is not None, it explicitly specifies the graph into which the Node will be inserted.
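The two parameters can be pictured with a hypothetical mirror, delayed_sketch(graph=None, **kws), in which graphs are plain lists and nodes are dicts. Passing an explicit graph collects every node in that one graph, while omitting it gives each call a fresh graph; the extra keywords are simply carried along, as delayed forwards them to the Node constructor (timeout=5 is an assumed example, since timeout is one of Node's documented arguments).

```python
import functools


def delayed_sketch(graph=None, **kws):
    """Hypothetical mirror of delayed(graph=None, **kws); graphs are plain lists."""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **fkws):
            # an explicit graph collects every node; otherwise each call gets a fresh one
            target = graph if graph is not None else []
            node = {"f": f, "args": args, "kws": fkws,
                    "node_options": kws, "graph": target}
            target.append(node)
            return node
        return wrapper
    return decorator


shared = []


@delayed_sketch(graph=shared, timeout=5)
def foo(x):
    return x


a, b = foo(1), foo(2)
print(len(shared), a["graph"] is b["graph"], a["node_options"])  # 2 True {'timeout': 5}
```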
- cloudmesh_workflow.workflow.evaluate(Graph) → None
  Graph -> IO ()
  Starting from the root node, evaluate the branches. The graph nodes are updated in place.
  Example:
  >>> @delayed()
  ... def foo(a):
  ...     return a
  >>> node = foo(42) & foo(24)
  >>> print evaluate(node.graph)
  None
  >>> for _, data in node.graph.nodes(data=True):
  ...     n = data['node']
  ...     print n.name, n.result.result()
  & None
  foo 42
  foo 24
- class cloudmesh_workflow.workflow.Node(f_args_kws, graph=None, executor=None, timeout=None)
  Bases: traits.has_traits.HasTraits
  A node in the Graph and its associated state. Nodes can be composed using the bitwise __and__() and __or__() operators to denote sequential or parallel evaluation order, respectively.
  For example, given functions A, B, and C that have been lifted to a Node type (e.g. through the delayed decorator @delayed()), the following builds a workflow that evaluates A and B in parallel, then C:
  G = ( (A(argA0, argA1) | B()) & C(argC) ).graph
  This creates the call Graph G. To evaluate G:
  evaluate(G)
  Create a Node to evaluate a function f in some graph using a given executor.
  Parameters:
  - f_args_kws – a 3-tuple (f, args, kws) of the function to evaluate (any callable) along with its positional and keyword arguments.
  - graph – the Graph in which to insert the node upon composition with others. A value of None creates a new graph. When the node is composed with another node in a different Node.graph, the two graphs are merged.
  - executor – a futures.Executor instance.
  - timeout – seconds (float or int) to wait.
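The constructor arguments map naturally onto the standard library. Assuming executor is a concurrent.futures.Executor (the futures package is its Python 2 backport) and timeout is passed to Future.result(), a node's work amounts to the following sketch; the variable names are illustrative only.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

f_args_kws = (pow, (40, 2), {})  # (f, args, kws) for the call pow(40, 2)
f, args, kws = f_args_kws

with ThreadPoolExecutor(max_workers=2) as executor:
    value = executor.submit(f, *args, **kws).result(timeout=1.0)
    slow = executor.submit(time.sleep, 0.5)
    try:
        slow.result(timeout=0.1)  # timeout: seconds to wait, float or int
        timed_out = False
    except TimeoutError:
        timed_out = True

print(value, timed_out)  # 1600 True
```

A timeout only stops the wait; the submitted call keeps running, which is why leaving the with block still blocks until the sleep finishes.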
  - children
    [Node] The children of this node. See Node.children_iter().
    Return type: list of Node
  - children_iter
    Generator of Nodes. This yields all the child Nodes of this node.
    Returns: Child nodes of this node.
    Return type: generator of Node
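The relationship between the two members is the usual generator/list pairing: children_iter yields lazily and children materializes the same sequence. The sketch below uses a hypothetical adjacency dict in place of the workflow Graph; the real methods presumably read the node's successors from the graph.

```python
# Hypothetical adjacency standing in for the workflow Graph:
# each node name maps to its child names, in insertion order.
adjacency = {"&": ["|", "C"], "|": ["A", "B"], "A": [], "B": [], "C": []}


def children_iter(name):
    """Lazily yield the children of `name`, one at a time."""
    for child in adjacency[name]:
        yield child


def children(name):
    """Eager variant: materialize the generator into a list."""
    return list(children_iter(name))


print(children("&"), children("|"), children("A"))  # ['|', 'C'] ['A', 'B'] []
```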
  - compose(other, callable(graph=Graph)) → OpNode
    Compose this Node with another Node.
    Two Nodes are composed using a proxy OpNode. The OpNode defines the evaluation semantics of its child nodes (e.g. sequential or parallel).
    Parameters:
    - other – a Node
    - MkOpNode – a callable that takes a graph keyword argument and constructs the proxy node
    Returns: A new Node with self and other as children.
    Return type: OpNode
  - eval() → None
    Start evaluating this node and wait for it to finish.
  - start() → None
    Start evaluating this node's function self.f if it hasn't already started.
  - wait() → None
    Wait for this node to finish evaluating. This may time out if timeout is specified.
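The start/wait/eval contract can be modeled on a futures executor. NodeSketch below is hypothetical: start() submits self.f at most once, wait() blocks on the resulting future with the optional timeout, and eval() does both, each returning None as documented.

```python
from concurrent.futures import ThreadPoolExecutor


class NodeSketch:
    """Hypothetical model of Node.start()/wait()/eval() on a futures executor."""

    def __init__(self, f_args_kws, executor, timeout=None):
        self.f, self.args, self.kws = f_args_kws
        self.executor = executor
        self.timeout = timeout
        self.result = None  # becomes a Future once started

    def start(self):
        # start evaluating self.f only if it hasn't already started
        if self.result is None:
            self.result = self.executor.submit(self.f, *self.args, **self.kws)

    def wait(self):
        # raises concurrent.futures.TimeoutError if timeout seconds elapse first
        self.result.result(timeout=self.timeout)

    def eval(self):
        # start, then wait for the node to finish
        self.start()
        self.wait()


with ThreadPoolExecutor(max_workers=1) as pool:
    node = NodeSketch((pow, (2, 10), {}), pool, timeout=1.0)
    node.eval()
    first = node.result
    node.start()  # no-op: already started
    print(node.result is first, node.result.result())  # True 1024
```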
- class cloudmesh_workflow.workflow.OpNode(**kwargs)
  Bases: cloudmesh_workflow.workflow.Node
  A proxy node defining the evaluation semantics of its child Nodes.
  Intended usage: this class is not intended to be instantiated directly. Rather, classes should inherit from OpNode to define the desired semantics.
- class cloudmesh_workflow.workflow.AndNode(**kwargs)
  Bases: cloudmesh_workflow.workflow.OpNode
  Sequential evaluation semantics.
  Children of AndNode will be evaluated in the order in which they were added as children of this node.
  Example:
  >>> @delayed()
  ... def foo(a): return a
  >>> foo(42) & foo(24)
  <cloudmesh_workflow.workflow.AndNode object at ...>
  - start()
  - wait()
- class cloudmesh_workflow.workflow.OrNode(**kwargs)
  Bases: cloudmesh_workflow.workflow.OpNode
  Parallel evaluation semantics.
  Children of OrNode will be evaluated in parallel, sparked in the order in which they were added as children of this node.
  Example:
  >>> @delayed()
  ... def foo(a): return a
  >>> foo(42) | foo(24)
  <cloudmesh_workflow.workflow.OrNode object at ...>
  - start()
  - wait()
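The difference between the two OpNode subclasses lies in how start() and wait() treat the children. The hypothetical sketch below uses plain threads: AndSketch runs its children one after another in a background thread, while OrSketch sparks every child in insertion order and then waits for all of them. Keeping start() non-blocking for every node type lets a sequential group nested inside a parallel one still run concurrently. This models the documented semantics only; it is not the package's implementation.

```python
import threading


class Leaf:
    """Hypothetical leaf node: appends its name to a shared log when run."""

    def __init__(self, name, log):
        self.name, self.log = name, log
        self.thread = None

    def start(self):
        if self.thread is None:
            self.thread = threading.Thread(target=self.log.append, args=(self.name,))
            self.thread.start()

    def wait(self):
        self.thread.join()


class AndSketch:
    """Sequential: child i+1 starts only after child i has finished."""

    def __init__(self, *children):
        self.children = children
        self.thread = None

    def _run(self):
        for child in self.children:
            child.start()
            child.wait()

    def start(self):
        if self.thread is None:  # run the sequence in the background
            self.thread = threading.Thread(target=self._run)
            self.thread.start()

    def wait(self):
        self.thread.join()


class OrSketch:
    """Parallel: spark every child in insertion order, then wait for all."""

    def __init__(self, *children):
        self.children = children

    def start(self):
        for child in self.children:
            child.start()

    def wait(self):
        for child in self.children:
            child.wait()


log = []
root = AndSketch(OrSketch(Leaf("A", log), Leaf("B", log)),
                 OrSketch(Leaf("C", log), Leaf("D", log)))
root.start()
root.wait()
print(log)  # A and B (either order), then C and D (either order)
```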
- class cloudmesh_workflow.workflow.Graph(data=None, **attr)
  Bases: networkx.classes.digraph.DiGraph
  A NetworkX networkx.DiGraph() in which the ordering of edges and nodes is preserved.
  Initialize a graph with edges, name, and graph attributes.
  Parameters:
  - data : input graph
    Data to initialize the graph. If data=None (default), an empty graph is created. The data can be an edge list or any NetworkX graph object. If the corresponding optional Python packages are installed, the data can also be a NumPy matrix or 2d ndarray, a SciPy sparse matrix, or a PyGraphviz graph.
  - name : string, optional (default='')
    An optional name for the graph.
  - attr : keyword arguments, optional (default= no attributes)
    Attributes to add to the graph as key=value pairs.
  See also: convert
  Examples:
  >>> import networkx as nx
  >>> G = nx.Graph()  # or DiGraph, MultiGraph, MultiDiGraph, etc
  >>> G = nx.Graph(name='my graph')
  >>> e = [(1, 2), (2, 3), (3, 4)]  # list of edges
  >>> G = nx.Graph(e)
  Arbitrary graph attribute pairs (key=value) may be assigned:
  >>> G = nx.Graph(e, day="Friday")
  >>> G.graph
  {'day': 'Friday'}
  - adjlist_dict_factory
    alias of OrderedDict
  - node_dict_factory
    alias of OrderedDict
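The two OrderedDict factories are what make Graph order-preserving: on the Python 2 interpreters this package targets, plain dicts iterate in arbitrary order, while AndNode and OrNode presumably depend on children coming back in the order they were added. The hypothetical OrderedDigraph below imitates the factory-attribute pattern without a networkx dependency. (Since Python 3.7, plain dicts also preserve insertion order.)

```python
from collections import OrderedDict


class OrderedDigraph:
    """Hypothetical miniature of a digraph whose storage dicts come from factories,
    imitating the node_dict_factory / adjlist_dict_factory overrides on Graph."""

    node_dict_factory = OrderedDict
    adjlist_dict_factory = OrderedDict

    def __init__(self):
        self.node = self.node_dict_factory()  # node -> attribute dict
        self.succ = self.node_dict_factory()  # node -> {child: edge attributes}

    def add_node(self, n, **attr):
        self.node.setdefault(n, {}).update(attr)
        self.succ.setdefault(n, self.adjlist_dict_factory())

    def add_edge(self, u, v):
        self.add_node(u)
        self.add_node(v)
        self.succ[u][v] = {}

    def successors(self, n):
        return list(self.succ[n])  # iteration follows insertion order


G = OrderedDigraph()
G.add_edge("&", "|")
G.add_edge("&", "C")
G.add_edge("|", "B")
G.add_edge("|", "A")
print(list(G.node), G.successors("|"))  # ['&', '|', 'C', 'B', 'A'] ['B', 'A']
```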