diff --git a/colmena/models.py b/colmena/models.py index fc20702..f5b0345 100644 --- a/colmena/models.py +++ b/colmena/models.py @@ -168,23 +168,34 @@ def serialize(self) -> float: _inputs = self.inputs def _serialize_and_proxy(value): + """Helper function for serializing and proxying""" key = str(id(value)) + # Serialized object before proxying to compare size of serialized + # object to value server threshold value = SerializationMethod.serialize(self.serialization_method, value) + if ( self.value_server_threshold is not None and sys.getsizeof(value) >= self.value_server_threshold and not isinstance(value, colmena.value_server.ObjectProxy) ): + # Proxy the value value = colmena.value_server.to_proxy( value, key=key, is_serialized=True, serialization_method=self.serialization_method) + # Serialize the proxy. This is efficient since the proxy is + # just a reference + metadata about the value value = SerializationMethod.serialize(self.serialization_method, value) + return value try: + # Each value in *args and **kwargs is serialized independently args = tuple(map(_serialize_and_proxy, _inputs[0])) kwargs = {k: _serialize_and_proxy(v) for k, v in _inputs[1].items()} self.inputs = (args, kwargs) + + # The entire result is serialized as one object if _value is not None: self.value = _serialize_and_proxy(_value) @@ -215,12 +226,12 @@ def _deserialize(value): _inputs = SerializationMethod.deserialize(self.serialization_method, _inputs) try: - try: - args = tuple(map(_deserialize, _inputs[0])) - except: - raise ValueError('deserialize failed on {}'.format(self)) + # Deserialize each value in *args and **kwargs + args = tuple(map(_deserialize, _inputs[0])) kwargs = {k: _deserialize(v) for k, v in _inputs[1].items()} self.inputs = (args, kwargs) + + # Deserialize result if it exists. If result was proxied, deproxy. if _value is not None: _value = _deserialize(_value) if isinstance(_value, colmena.value_server.ObjectProxy): @@ -231,6 +242,7 @@ def _deserialize(value): _value = _value.deproxy() colmena.value_server.server.evict(key) self.value = _value + return perf_counter() - start_time except Exception as e: # Put the original values back diff --git a/colmena/value_server/proxy.py b/colmena/value_server/proxy.py index e78eb0b..2f03450 100644 --- a/colmena/value_server/proxy.py +++ b/colmena/value_server/proxy.py @@ -106,27 +106,20 @@ def __reduce_ex__(self, protocol): def async_resolve(self) -> None: """Asynchronously resolve the proxy""" - # TODO(gpauloski): use __resolved__ instead - try: - object.__getattribute__(self, '__target__') - except AttributeError: - # TODO(gpauloski): there is an edge case here where is async_resolve() - # is called twice, both cases fail to get attribute __target__ and then - # call __factory__.async_resolve() twice. If, between the two calls - # to async_resolve(), the cache entry for this object is removed, then - # there will be a thread created to resolve the object but the result - # of the thread is never used. In practice this has no consequences - # beyond the performance hit of starting a thread to query the value - # server. Generally async_resolve() should not be called twice :) + if not self.is_resolved(): self.__factory__.async_resolve() def deproxy(self) -> Any: """Get underlying object from proxy""" return self.__wrapped__ + def is_resolved(self) -> bool: + """Return True if proxy is resolved""" + return self.__resolved__ + def reset_proxy(self) -> None: """Reset wrapped object so that the factory is called on next access""" - if self.__resolved__: + if self.is_resolved(): object.__delattr__(self, '__target__') diff --git a/colmena/value_server/tests/test_proxy.py b/colmena/value_server/tests/test_proxy.py index aba2e3a..e01f706 100644 --- a/colmena/value_server/tests/test_proxy.py +++ b/colmena/value_server/tests/test_proxy.py @@ -33,8 +33,14 @@ def init() -> None: def test_proxy() -> None: """Test proxy object behaves like wrapped object""" x = to_proxy(1) + assert not x.is_resolved() + assert isinstance(x, ObjectProxy) + assert not x.is_resolved() + assert isinstance(x, int) + assert x.is_resolved() + assert x == 1 x += 1 assert x == 2 @@ -100,9 +106,11 @@ def test_async_resolve() -> None: assert x.__factory__.async_get_future is None assert not value_server.server.is_cached(key) x.async_resolve() + assert not x.is_resolved() assert x.__factory__.async_get_future is not None assert isinstance(x, list) assert x.__factory__.async_get_future is None + assert x.is_resolved() # x is already resolved so this should be a no-op x.async_resolve() assert x.__factory__.async_get_future is None diff --git a/environment.yml b/environment.yml index 92adf44..d12b15c 100644 --- a/environment.yml +++ b/environment.yml @@ -19,4 +19,4 @@ dependencies: - -e . - parsl>=1.* - pydantic - - lazy-object-proxy>=1.* + - lazy-object-proxy>=1.6.*