Skip to content

Commit

Permalink
Add is_resolved() to ObjectProxy and update Lazy Object Proxy version
Browse files Browse the repository at this point in the history
- Lazy-Object-Proxy added a __resolved__ attribute in version 1.6 that is a boolean flag indicating if a proxy is resolved. Using this flag, some of the internal logic of the ObjectProxy and unit tests were cleaned up.
- Updated `environment.yml` to use Lazy-Object-Proxy>=1.6.*
- Added some helpful comments to the serialize/deserialization methods
of `Result`.
  • Loading branch information
gpauloski committed Mar 28, 2021
1 parent f7c5474 commit 414a303
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 18 deletions.
20 changes: 16 additions & 4 deletions colmena/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,23 +168,34 @@ def serialize(self) -> float:
_inputs = self.inputs

def _serialize_and_proxy(value):
"""Helper function for serializing and proxying"""
key = str(id(value))
# Serialized object before proxying to compare size of serialized
# object to value server threshold
value = SerializationMethod.serialize(self.serialization_method, value)

if (
self.value_server_threshold is not None and
sys.getsizeof(value) >= self.value_server_threshold and
not isinstance(value, colmena.value_server.ObjectProxy)
):
# Proxy the value
value = colmena.value_server.to_proxy(
value, key=key, is_serialized=True,
serialization_method=self.serialization_method)
# Serialize the proxy. This is efficient since the proxy is
# just a reference + metadata about the value
value = SerializationMethod.serialize(self.serialization_method, value)

return value

try:
# Each value in *args and **kwargs is serialized independently
args = tuple(map(_serialize_and_proxy, _inputs[0]))
kwargs = {k: _serialize_and_proxy(v) for k, v in _inputs[1].items()}
self.inputs = (args, kwargs)

# The entire result is serialized as one object
if _value is not None:
self.value = _serialize_and_proxy(_value)

Expand Down Expand Up @@ -215,12 +226,12 @@ def _deserialize(value):
_inputs = SerializationMethod.deserialize(self.serialization_method, _inputs)

try:
try:
args = tuple(map(_deserialize, _inputs[0]))
except:
raise ValueError('deserialize failed on {}'.format(self))
# Deserialize each value in *args and **kwargs
args = tuple(map(_deserialize, _inputs[0]))
kwargs = {k: _deserialize(v) for k, v in _inputs[1].items()}
self.inputs = (args, kwargs)

# Deserialize result if it exists. If result was proxied, deproxy.
if _value is not None:
_value = _deserialize(_value)
if isinstance(_value, colmena.value_server.ObjectProxy):
Expand All @@ -231,6 +242,7 @@ def _deserialize(value):
_value = _value.deproxy()
colmena.value_server.server.evict(key)
self.value = _value

return perf_counter() - start_time
except Exception as e:
# Put the original values back
Expand Down
19 changes: 6 additions & 13 deletions colmena/value_server/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,20 @@ def __reduce_ex__(self, protocol):

def async_resolve(self) -> None:
"""Asynchronously resolve the proxy"""
# TODO(gpauloski): use __resolved__ instead
try:
object.__getattribute__(self, '__target__')
except AttributeError:
# TODO(gpauloski): there is an edge case here where is async_resolve()
# is called twice, both cases fail to get attribute __target__ and then
# call __factory__.async_resolve() twice. If, between the two calls
# to async_resolve(), the cache entry for this object is removed, then
# there will be a thread created to resolve the object but the result
# of the thread is never used. In practice this has no consequences
# beyond the performance hit of starting a thread to query the value
# server. Generally async_resolve() should not be called twice :)
if not self.is_resolved():
self.__factory__.async_resolve()

def deproxy(self) -> Any:
"""Get underlying object from proxy"""
return self.__wrapped__

def is_resolved(self) -> bool:
"""Return True if proxy is resolved"""
return self.__resolved__

def reset_proxy(self) -> None:
"""Reset wrapped object so that the factory is called on next access"""
if self.__resolved__:
if self.is_resolved():
object.__delattr__(self, '__target__')


Expand Down
8 changes: 8 additions & 0 deletions colmena/value_server/tests/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ def init() -> None:
def test_proxy() -> None:
"""Test proxy object behaves like wrapped object"""
x = to_proxy(1)
assert not x.is_resolved()

assert isinstance(x, ObjectProxy)
assert not x.is_resolved()

assert isinstance(x, int)
assert x.is_resolved()

assert x == 1
x += 1
assert x == 2
Expand Down Expand Up @@ -100,9 +106,11 @@ def test_async_resolve() -> None:
assert x.__factory__.async_get_future is None
assert not value_server.server.is_cached(key)
x.async_resolve()
assert not x.is_resolved()
assert x.__factory__.async_get_future is not None
assert isinstance(x, list)
assert x.__factory__.async_get_future is None
assert x.is_resolved()
# x is already resolved so this should be a no-op
x.async_resolve()
assert x.__factory__.async_get_future is None
Expand Down
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ dependencies:
- -e .
- parsl>=1.*
- pydantic
- lazy-object-proxy>=1.*
- lazy-object-proxy>=1.6.*

0 comments on commit 414a303

Please sign in to comment.