diff --git a/python/vineyard/core/builder.py b/python/vineyard/core/builder.py index 36cc46bb..b6e4353c 100644 --- a/python/vineyard/core/builder.py +++ b/python/vineyard/core/builder.py @@ -156,6 +156,7 @@ def put( builder: Optional[BuilderContext] = None, persist: bool = False, name: Optional[str] = None, + as_async: bool = False, **kwargs ): """Put python value to vineyard. @@ -185,16 +186,22 @@ def put( name: str, optional If given, the name will be automatically associated with the resulted object. Note that only take effect when the object is persisted. + as_async: bool, optional + If true, which means the object will be put to vineyard asynchronously. + Thus we need to use the passed builder. kw: User-specific argument that will be passed to the builder. Returns: ObjectID: The result object id will be returned. """ - if builder is not None: + if builder is not None and not as_async: return builder(client, value, **kwargs) - meta = get_current_builders().run(client, value, **kwargs) + if as_async: + meta = builder.run(client, value, **kwargs) + else: + meta = get_current_builders().run(client, value, **kwargs) # the builders is expected to return an :class:`ObjectMeta`, or an # :class:`Object` (in the `bytes_builder` and `memoryview` builder). diff --git a/python/vineyard/core/client.py b/python/vineyard/core/client.py index cd08edd3..9328585f 100644 --- a/python/vineyard/core/client.py +++ b/python/vineyard/core/client.py @@ -42,6 +42,7 @@ from vineyard._C import VineyardException from vineyard._C import _connect from vineyard.core.builder import BuilderContext +from vineyard.core.builder import get_current_builders from vineyard.core.builder import put from vineyard.core.resolver import ResolverContext from vineyard.core.resolver import get @@ -839,10 +840,11 @@ def _put_internal( builder: Optional[BuilderContext] = None, persist: bool = False, name: Optional[str] = None, + as_async: bool = False, **kwargs, ): try: - return put(self, value, builder, persist, name, **kwargs) + return put(self, value, builder, persist, name, as_async, **kwargs) except NotEnoughMemoryException as exec: with envvars( {'VINEYARD_RPC_SKIP_RETRY': '1', 'VINEYARD_IPC_SKIP_RETRY': '1'} @@ -868,7 +870,7 @@ def _put_internal( host, port = meta[instance_id]['rpc_endpoint'].split(':') self._rpc_client = _connect(host, port) self.compression = previous_compression_state - return put(self, value, builder, persist, name, **kwargs) + return put(self, value, builder, persist, name, as_async, **kwargs) @_apply_docstring(put) def put( @@ -881,16 +883,28 @@ def put( **kwargs, ): if as_async: + def _default_callback(future): try: result = future.result() - print(f"Successfully put object {result}", flush=True) + if isinstance(result, ObjectID): + print(f"Successfully put object {result}", flush=True) + elif isinstance(result, ObjectMeta): + print(f"Successfully put object {result.id}", flush=True) except Exception as e: print(f"Failed to put object: {e}", flush=True) + current_builder = builder or get_current_builders() + thread_pool = self.put_thread_pool result = thread_pool.submit( - self._put_internal, value, builder, persist, name, **kwargs + self._put_internal, + value, + current_builder, + persist, + name, + as_async=True, + **kwargs, ) result.add_done_callback(_default_callback) return result