Skip to content

Commit

Permalink
Merge branch 'release-0.13.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
sphuber committed Apr 1, 2019
2 parents 19581d0 + 54cb978 commit ba77abf
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 15 deletions.
35 changes: 24 additions & 11 deletions plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,17 @@ def successful(self):
except AttributeError:
raise exceptions.InvalidStateError('process is not in the finished state')

@property
def is_successful(self):
"""Return whether the result of the process is considered successful.
:return: boolean, True if the process is in `Finished` state with `successful` attribute set to `True`
"""
try:
return self._state.successful
except AttributeError:
return False

def killed(self):
return self.state == process_states.ProcessState.KILLED

Expand Down Expand Up @@ -713,7 +724,7 @@ def on_finish(self, result, successful):
""" Entering the FINISHED state """
if successful:
try:
self._check_outputs()
self._check_outputs(self._outputs)
except ValueError:
raise StateEntryFailed(process_states.ProcessState.FINISHED, result, False)

Expand Down Expand Up @@ -1111,7 +1122,7 @@ def out(self, output_port, value):
:param output_port: the name of the output port, can be namespaced
:type output_port: str
:param value: the value for the output port
:raises: TypeError if the output value is not validated against the port
:raises: ValueError if the output value is not validated against the port
"""
self.on_output_emitting(output_port, value)

Expand All @@ -1136,11 +1147,15 @@ def out(self, output_port, value):
validation_error = port.validate_dynamic_ports({port_name: value})

if validation_error:
msg = "Error validating output '{}' for port '{}': {}".format(value, ".".join(validation_error.port),
msg = "Error validating output '{}' for port '{}': {}".format(value, validation_error.port,
validation_error.message)
raise TypeError(msg)
raise ValueError(msg)

output_namespace = self._outputs
for sub_space in namespace:
output_namespace = output_namespace.setdefault(sub_space, {})

self._outputs[output_port] = value
output_namespace[port_name] = value
self.on_output_emitted(output_port, value, dynamic)

@protected
Expand Down Expand Up @@ -1218,10 +1233,8 @@ def _check_inputs(self, inputs):
if validation_error:
raise ValueError(str(validation_error))

def _check_outputs(self):
def _check_outputs(self, outputs):
# Check that the necessary outputs have been emitted
wrapped = utils.wrap_dict(self._outputs, separator=self.spec().namespace_separator)
for name, port in self.spec().outputs.items():
validation_error = port.validate(wrapped.get(name, ports.UNSPECIFIED))
if validation_error:
raise ValueError(str(validation_error))
validation_error = self.spec().validate_outputs(outputs)
if validation_error:
raise ValueError(str(validation_error))
2 changes: 1 addition & 1 deletion plumpy/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = "0.13.0"
__version__ = "0.13.1"

__all__ = ['__version__']
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
exec(f.read(), about)

setup(
name="plumpy",
name='plumpy',
version=about['__version__'],
description='A python workflow library',
long_description=open('README.rst').read(),
Expand All @@ -32,7 +32,7 @@
install_requires=[
'frozendict',
'tornado>=4.1, <5.0',
'pika>=1.0.0b2',
'pika>=1.0.0',
'kiwipy[rmq]>=0.5.0, <0.6.0',
'enum34; python_version<"3.4"',
'backports.tempfile; python_version<"3.2"',
Expand Down
29 changes: 29 additions & 0 deletions test/test_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def validator(value):


class TestPortNamespace(TestCase):

BASE_PORT_NAME = 'port'
BASE_PORT_NAMESPACE_NAME = 'port'

Expand Down Expand Up @@ -160,3 +161,31 @@ def test_port_namespace_validate(self):
self.assertEqual(
validation_error.port,
self.port_namespace.NAMESPACE_SEPARATOR.join((self.BASE_PORT_NAMESPACE_NAME, 'sub', 'space', 'output')))

def test_port_namespace_required(self):
"""Verify that validation will fail if required port is not specified."""
port_namespace_sub = self.port_namespace.create_port_namespace('sub.space')
port_namespace_sub.valid_type = int

# Create a required port
self.port_namespace['required_port'] = OutputPort('required_port', valid_type=int, required=True)

# No port values at all should fail
port_values = {}
validation_error = self.port_namespace.validate(port_values)
self.assertIsNotNone(validation_error)

# Some port value, but still the required output is not defined, so should fail
port_values = {'sub': {'space': {'output': 5}}}
validation_error = self.port_namespace.validate(port_values)
self.assertIsNotNone(validation_error)

# Specifying the required port and some additional ones should be valid
port_values = {'sub': {'space': {'output': 5}}, 'required_port': 1}
validation_error = self.port_namespace.validate(port_values)
self.assertIsNone(validation_error)

# Also just the required port should be valid
port_values = {'required_port': 1}
validation_error = self.port_namespace.validate(port_values)
self.assertIsNone(validation_error)
73 changes: 72 additions & 1 deletion test/test_processes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Process tests"""
from __future__ import absolute_import
from tornado import gen, testing

import enum
import six
from six.moves import range
from six.moves import zip
Expand Down Expand Up @@ -414,7 +416,7 @@ def run(self):
self.out("invalid", 5)

proc = InvalidOutput()
with self.assertRaises(TypeError):
with self.assertRaises(ValueError):
proc.execute()

def test_missing_output(self):
Expand Down Expand Up @@ -768,6 +770,75 @@ def define(cls, spec):
# Make sure there are no other inputs
self.assertFalse(original_inputs)

def test_namespaced_process_outputs(self):
"""Test the output namespacing and validation."""
namespace = 'integer.namespace'

class OutputMode(enum.Enum):

NONE = 0
DYNAMIC_PORT_NAMESPACE = 1
SINGLE_REQUIRED_PORT = 2
BOTH_SINGLE_AND_NAMESPACE = 3

class DummyDynamicProcess(Process):

@classmethod
def define(cls, spec):
super(DummyDynamicProcess, cls).define(spec)
spec.input('output_mode', valid_type=OutputMode, default=OutputMode.NONE)
spec.output('required_bool', valid_type=bool)
spec.output_namespace(namespace, valid_type=int, dynamic=True)

def run(self):
if self.inputs.output_mode == OutputMode.NONE:
pass
elif self.inputs.output_mode == OutputMode.DYNAMIC_PORT_NAMESPACE:
self.out(namespace + '.one', 1)
self.out(namespace + '.two', 2)
elif self.inputs.output_mode == OutputMode.SINGLE_REQUIRED_PORT:
self.out('required_bool', False)
elif self.inputs.output_mode == OutputMode.BOTH_SINGLE_AND_NAMESPACE:
self.out('required_bool', False)
self.out(namespace + '.one', 1)
self.out(namespace + '.two', 2)

# Run the process in default mode which should not add any outputs and therefore fail
process = DummyDynamicProcess()
process.execute()

self.assertEqual(process.state, ProcessState.FINISHED)
self.assertFalse(process.is_successful)
self.assertDictEqual(process.outputs, {})

# Attaching only namespaced ports should fail, because the required port is not added
process = DummyDynamicProcess(inputs={'output_mode': OutputMode.DYNAMIC_PORT_NAMESPACE})
process.execute()

self.assertEqual(process.state, ProcessState.FINISHED)
self.assertFalse(process.is_successful)
self.assertEqual(process.outputs['integer']['namespace']['one'], 1)
self.assertEqual(process.outputs['integer']['namespace']['two'], 2)

# Attaching only the single required top-level port should be fine
process = DummyDynamicProcess(inputs={'output_mode': OutputMode.SINGLE_REQUIRED_PORT})
process.execute()

self.assertEqual(process.state, ProcessState.FINISHED)
self.assertTrue(process.is_successful)
self.assertEqual(process.outputs['required_bool'], False)

# Attaching both the required and namespaced ports should result in a successful termination
process = DummyDynamicProcess(inputs={'output_mode': OutputMode.BOTH_SINGLE_AND_NAMESPACE})
process.execute()

self.assertIsNotNone(process.outputs)
self.assertEqual(process.state, ProcessState.FINISHED)
self.assertTrue(process.is_successful)
self.assertEqual(process.outputs['required_bool'], False)
self.assertEqual(process.outputs['integer']['namespace']['one'], 1)
self.assertEqual(process.outputs['integer']['namespace']['two'], 2)


class TestProcessEvents(utils.AsyncTestCase):

Expand Down

0 comments on commit ba77abf

Please sign in to comment.