diff --git a/.gitignore b/.gitignore index 7b1fb812..3eb3fbc9 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,9 @@ cmake-build-debug *.pyc *.bin *.whl +*.tar.gz +*.zip +*.xz # hive sql work directory /java/hive/docker/dependency/mysql/conf/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b9dc0058..657d0c7b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,7 +5,3 @@ repos: - id: gitleaks args: - '--verbose' - - repo: https://github.com/crate-ci/typos - rev: v1.18.2 - hooks: - - id: typos diff --git a/modules/graph/writer/util.cc b/modules/graph/writer/util.cc index 9c333645..0a7f71d1 100644 --- a/modules/graph/writer/util.cc +++ b/modules/graph/writer/util.cc @@ -14,6 +14,10 @@ * limitations under the License. */ +#include "graph/writer/util.h" + +#ifdef ENABLE_GAR + #include #include @@ -21,8 +25,6 @@ #include "gar/util/data_type.h" #include "gar/util/file_type.h" -#include "graph/writer/util.h" - namespace GAR = GraphArchive; namespace vineyard { @@ -71,3 +73,5 @@ std::shared_ptr generate_graph_info_with_schema( nullptr, extra_info); } } // namespace vineyard + +#endif diff --git a/modules/graph/writer/util.h b/modules/graph/writer/util.h index 37142841..f47a7f15 100644 --- a/modules/graph/writer/util.h +++ b/modules/graph/writer/util.h @@ -16,6 +16,8 @@ #ifndef MODULES_GRAPH_WRITER_UTIL_H_ #define MODULES_GRAPH_WRITER_UTIL_H_ +#ifdef ENABLE_GAR + #include #include @@ -37,4 +39,6 @@ std::shared_ptr generate_graph_info_with_schema( } // namespace vineyard +#endif + #endif // MODULES_GRAPH_WRITER_UTIL_H_ diff --git a/python/client.cc b/python/client.cc index 523f93c3..9cdf5997 100644 --- a/python/client.cc +++ b/python/client.cc @@ -257,36 +257,42 @@ void bind_client(py::module& mod) { .def( "delete", [](ClientBase* self, const ObjectIDWrapper object_id, - const bool force, const bool deep) { - throw_on_error(self->DelData(object_id, force, deep)); + const bool force, const bool deep, const bool memory_trim) { + throw_on_error(self->DelData(object_id, force, deep, memory_trim)); }, "object_id"_a, py::arg("force") = false, py::arg("deep") = true, - doc::ClientBase_delete) + py::arg("memory_trim") = false, doc::ClientBase_delete) .def( "delete", [](ClientBase* self, const std::vector& object_ids, - const bool force, const bool deep) { + const bool force, const bool deep, const bool memory_trim) { std::vector unwrapped_object_ids(object_ids.size()); for (size_t idx = 0; idx < object_ids.size(); ++idx) { unwrapped_object_ids[idx] = object_ids[idx]; } - throw_on_error(self->DelData(unwrapped_object_ids, force, deep)); + throw_on_error( + self->DelData(unwrapped_object_ids, force, deep, memory_trim)); }, - "object_ids"_a, py::arg("force") = false, py::arg("deep") = true) + "object_ids"_a, py::arg("force") = false, py::arg("deep") = true, + py::arg("memory_trim") = false, doc::ClientBase_delete) .def( "delete", [](ClientBase* self, const ObjectMeta& meta, const bool force, - const bool deep) { - throw_on_error(self->DelData(meta.GetId(), force, deep)); + const bool deep, const bool memory_trim) { + throw_on_error( + self->DelData(meta.GetId(), force, deep, memory_trim)); }, - "object_meta"_a, py::arg("force") = false, py::arg("deep") = true) + "object_meta"_a, py::arg("force") = false, py::arg("deep") = true, + py::arg("memory_trim") = false, doc::ClientBase_delete) .def( "delete", [](ClientBase* self, const Object* object, const bool force, - const bool deep) { - throw_on_error(self->DelData(object->id(), force, deep)); + const bool deep, const bool memory_trim) { + throw_on_error( + self->DelData(object->id(), force, deep, memory_trim)); }, - "object"_a, py::arg("force") = false, py::arg("deep") = true) + "object"_a, py::arg("force") = false, py::arg("deep") = true, + py::arg("memory_trim") = false, doc::ClientBase_delete) .def( "create_stream", [](ClientBase* self, ObjectID const id) { diff --git a/python/pybind11_docs.cc b/python/pybind11_docs.cc index 0bbb725c..72495b78 100644 --- a/python/pybind11_docs.cc +++ b/python/pybind11_docs.cc @@ -571,7 +571,7 @@ Create metadata in vineyardd with a specified instance id. const char* ClientBase_delete = R"doc( .. method:: delete(object_id: ObjectID or List[ObjectID], force: bool = false, - deep: bool = true) -> None + deep: bool = true, memory_trim: bool = False) -> None :noindex: Delete the specific vineyard object. @@ -590,9 +590,16 @@ Delete the specific vineyard object. Note that when deleting objects which have *direct* blob members, the processing on those blobs yields a "deep" behavior. + memory_trim: bool + Whether to trim the memory pool inside the shared memory allocator to + return the unused physical memory back to the OS kernel, like the + `malloc_trim` API from glibc. Default value is False. -.. method:: delete(object_meta: ObjectMeta, force: bool = false, deep: bool = true) - -> None + Note that the memory trimming operation is a best-effort operation and + may not release any memory at all. + +.. method:: delete(object_meta: ObjectMeta, force: bool = false, deep: bool = true, + memory_trim: bool = False) -> None :noindex: Delete the specific vineyard object. @@ -600,7 +607,8 @@ Delete the specific vineyard object. Parameters: object_meta: The corresponding object meta to delete. -.. method:: delete(object: Object, force: bool = false, deep: bool = true) -> None +.. method:: delete(object: Object, force: bool = false, deep: bool = true, + memory_trim: bool = False) -> None :noindex: Delete the specific vineyard object. diff --git a/python/vineyard/_C.pyi b/python/vineyard/_C.pyi index 96c6c670..c7425f7f 100644 --- a/python/vineyard/_C.pyi +++ b/python/vineyard/_C.pyi @@ -234,6 +234,7 @@ class ClientBase: object: Union[ObjectID, Object, ObjectMeta, List[ObjectID]], force: bool = False, deep: bool = True, + memory_trim: bool = False, ) -> None: ... def create_stream(self, id: ObjectID) -> None: ... def open_stream(self, id: ObjectID, mode: str) -> None: ... diff --git a/python/vineyard/core/client.py b/python/vineyard/core/client.py index e884ed3b..dafc60bc 100644 --- a/python/vineyard/core/client.py +++ b/python/vineyard/core/client.py @@ -265,8 +265,9 @@ def delete( object: Union[ObjectID, Object, ObjectMeta, List[ObjectID]], force: bool = False, deep: bool = True, + memory_trim: bool = False, ) -> None: - return self.default_client().delete(object, force, deep) + return self.default_client().delete(object, force, deep, memory_trim) @_apply_docstring(IPCClient.create_stream) def create_stream(self, id: ObjectID) -> None: diff --git a/python/vineyard/core/tests/test_client.py b/python/vineyard/core/tests/test_client.py index f129ce03..ee38eabc 100644 --- a/python/vineyard/core/tests/test_client.py +++ b/python/vineyard/core/tests/test_client.py @@ -313,11 +313,7 @@ def test_memory_trim(vineyard_client): assert current_memory_usage >= original_memory_usage + i * data_kbytes for r in rs: - vineyard_client.delete(r) + vineyard_client.delete(r, memory_trim=True) - current_memory_usage = parse_shared_memory_usage() - assert current_memory_usage >= original_memory_usage + (8 - 1) * data_kbytes - - vineyard_client.memory_trim() # there might be some fragmentation overhead assert parse_shared_memory_usage() <= original_memory_usage + 2 * data_kbytes diff --git a/src/client/client.cc b/src/client/client.cc index 67dcc258..cc0cbfd3 100644 --- a/src/client/client.cc +++ b/src/client/client.cc @@ -918,19 +918,29 @@ Status Client::Release(ObjectID const& id) { } Status Client::DelData(const ObjectID id, const bool force, const bool deep) { - return this->DelData(std::vector{id}, force, deep); + return this->DelData(id, force, deep, false); +} + +Status Client::DelData(const ObjectID id, const bool force, const bool deep, + const bool memory_trim) { + return this->DelData(std::vector{id}, force, deep, memory_trim); } Status Client::DelData(const std::vector& ids, const bool force, const bool deep) { + return this->DelData(ids, force, deep, false); +} + +Status Client::DelData(const std::vector& ids, const bool force, + const bool deep, const bool memory_trim) { ENSURE_CONNECTED(this); for (auto id : ids) { // May contain duplicated blob ids. VINEYARD_DISCARD(Release(id)); } std::string message_out; - WriteDelDataWithFeedbacksRequest(ids, force, deep, /*fastpath=*/false, - message_out); + WriteDelDataWithFeedbacksRequest(ids, force, deep, memory_trim, + /*fastpath=*/false, message_out); RETURN_ON_ERROR(doWrite(message_out)); json message_in; std::vector deleted_bids; diff --git a/src/client/client.h b/src/client/client.h index e6f13b72..310ca80a 100644 --- a/src/client/client.h +++ b/src/client/client.h @@ -781,6 +781,10 @@ class Client final : public BasicIPCClient, */ Status DelData(const ObjectID id, const bool force = false, const bool deep = true); + + Status DelData(const ObjectID id, const bool force, const bool deep, + const bool memory_trim); + /** * @brief Delete multiple metadatas in vineyard. * @@ -797,6 +801,9 @@ class Client final : public BasicIPCClient, Status DelData(const std::vector& ids, const bool force = false, const bool deep = true); + Status DelData(const std::vector& ids, const bool force, + const bool deep, const bool memory_trim); + /** * @brief Create a GPU buffer on vineyard server. See also `CreateBuffer`. * diff --git a/src/client/client_base.cc b/src/client/client_base.cc index 9abd8196..0c2d7113 100644 --- a/src/client/client_base.cc +++ b/src/client/client_base.cc @@ -198,14 +198,24 @@ Status ClientBase::SyncMetaData() { Status ClientBase::DelData(const ObjectID id, const bool force, const bool deep) { - return DelData(std::vector{id}, force, deep); + return DelData(id, force, deep, false); +} + +Status ClientBase::DelData(const ObjectID id, const bool force, const bool deep, + const bool memory_trim) { + return DelData(std::vector{id}, force, deep, memory_trim); } Status ClientBase::DelData(const std::vector& ids, const bool force, const bool deep) { + return DelData(ids, force, deep, false); +} + +Status ClientBase::DelData(const std::vector& ids, const bool force, + const bool deep, const bool memory_trim) { ENSURE_CONNECTED(this); std::string message_out; - WriteDelDataRequest(ids, force, deep, false, message_out); + WriteDelDataRequest(ids, force, deep, memory_trim, false, message_out); RETURN_ON_ERROR(doWrite(message_out)); json message_in; RETURN_ON_ERROR(doRead(message_in)); diff --git a/src/client/client_base.h b/src/client/client_base.h index a3c28eda..306b6182 100644 --- a/src/client/client_base.h +++ b/src/client/client_base.h @@ -170,11 +170,17 @@ class ClientBase { * @param deep Whether to delete the member of this object. Default is true. * Note that when deleting object which has *direct* blob members, the * processing on those blobs yields a "deep" behavior. + * @param memory_trim Whether to trim the memory pool inside the shared memory + * allocator to return the unused physical memory back to the OS. * * @return Status that indicates whether the delete action has succeeded. */ Status DelData(const ObjectID id, const bool force = false, const bool deep = true); + + Status DelData(const ObjectID id, const bool force, const bool deep, + const bool memory_trim); + /** * @brief Delete multiple metadatas in vineyard. * @@ -185,12 +191,17 @@ class ClientBase { * @param deep Whether to delete the member of this object. Default is true. * Note that when deleting objects which have *direct* blob members, * the processing on those blobs yields a "deep" behavior. + * @param memory_trim Whether to trim the memory pool inside the shared memory + * allocator to return the unused physical memory back to the OS. * * @return Status that indicates whether the delete action has succeeded. */ Status DelData(const std::vector& ids, const bool force = false, const bool deep = true); + Status DelData(const std::vector& ids, const bool force, + const bool deep, const bool memory_trim); + /** * @brief List objectmetas in vineyard, using the given typename patterns. * diff --git a/src/common/util/protocols.cc b/src/common/util/protocols.cc index 0d16c68a..d4ff6c59 100644 --- a/src/common/util/protocols.cc +++ b/src/common/util/protocols.cc @@ -858,12 +858,14 @@ Status ReadReleaseReply(json const& root) { void WriteDelDataWithFeedbacksRequest(const std::vector& id, const bool force, const bool deep, + const bool memory_trim, const bool fastpath, std::string& msg) { json root; root["type"] = command_t::DEL_DATA_WITH_FEEDBACKS_REQUEST; root["id"] = std::vector{id}; root["force"] = force; root["deep"] = deep; + root["memory_trim"] = memory_trim; root["fastpath"] = fastpath; encode_msg(root, msg); @@ -871,11 +873,13 @@ void WriteDelDataWithFeedbacksRequest(const std::vector& id, Status ReadDelDataWithFeedbacksRequest(json const& root, std::vector& ids, bool& force, - bool& deep, bool& fastpath) { + bool& deep, bool& memory_trim, + bool& fastpath) { CHECK_IPC_ERROR(root, command_t::DEL_DATA_WITH_FEEDBACKS_REQUEST); root["id"].get_to(ids); force = root.value("force", false); deep = root.value("deep", false); + memory_trim = root.value("memory_trim", false); fastpath = root.value("fastpath", false); return Status::OK(); } @@ -1213,37 +1217,42 @@ Status ReadListDataRequest(const json& root, std::string& pattern, bool& regex, } void WriteDelDataRequest(const ObjectID id, const bool force, const bool deep, - const bool fastpath, std::string& msg) { + const bool memory_trim, const bool fastpath, + std::string& msg) { json root; root["type"] = command_t::DELETE_DATA_REQUEST; root["id"] = std::vector{id}; root["force"] = force; root["deep"] = deep; root["fastpath"] = fastpath; + root["memory_trim"] = memory_trim; encode_msg(root, msg); } void WriteDelDataRequest(const std::vector& ids, const bool force, - const bool deep, const bool fastpath, - std::string& msg) { + const bool deep, const bool memory_trim, + const bool fastpath, std::string& msg) { json root; root["type"] = command_t::DELETE_DATA_REQUEST; root["id"] = ids; root["force"] = force; root["deep"] = deep; root["fastpath"] = fastpath; + root["memory_trim"] = memory_trim; encode_msg(root, msg); } Status ReadDelDataRequest(const json& root, std::vector& ids, - bool& force, bool& deep, bool& fastpath) { + bool& force, bool& deep, bool& memory_trim, + bool& fastpath) { CHECK_IPC_ERROR(root, command_t::DELETE_DATA_REQUEST); root["id"].get_to(ids); force = root.value("force", false); deep = root.value("deep", false); fastpath = root.value("fastpath", false); + memory_trim = root.value("memory_trim", false); return Status::OK(); } diff --git a/src/common/util/protocols.h b/src/common/util/protocols.h index d8a89bea..87728a8e 100644 --- a/src/common/util/protocols.h +++ b/src/common/util/protocols.h @@ -376,11 +376,13 @@ Status ReadReleaseReply(json const& root); void WriteDelDataWithFeedbacksRequest(const std::vector& id, const bool force, const bool deep, + const bool memory_trim, const bool fastpath, std::string& msg); Status ReadDelDataWithFeedbacksRequest(json const& root, std::vector& id, bool& force, - bool& deep, bool& fastpath); + bool& deep, bool& memory_trim, + bool& fastpath); void WriteDelDataWithFeedbacksReply(const std::vector& deleted_bids, std::string& msg); @@ -494,14 +496,16 @@ Status ReadListDataRequest(const json& root, std::string& pattern, bool& regex, size_t& limit); void WriteDelDataRequest(const ObjectID id, const bool force, const bool deep, - const bool fastpath, std::string& msg); + const bool memory_trim, const bool fastpath, + std::string& msg); void WriteDelDataRequest(const std::vector& id, const bool force, - const bool deep, const bool fastpath, - std::string& msg); + const bool deep, const bool memory_trim, + const bool fastpath, std::string& msg); Status ReadDelDataRequest(const json& root, std::vector& id, - bool& force, bool& deep, bool& fastpath); + bool& force, bool& deep, bool& memory_trim, + bool& fastpath); void WriteDelDataReply(std::string& msg); diff --git a/src/server/async/socket_server.cc b/src/server/async/socket_server.cc index a0664a78..39acbca6 100644 --- a/src/server/async/socket_server.cc +++ b/src/server/async/socket_server.cc @@ -804,23 +804,23 @@ bool SocketConnection::doRelease(json const& root) { bool SocketConnection::doDelDataWithFeedbacks(json const& root) { auto self(shared_from_this()); std::vector ids; - bool force, deep, fastpath; + bool force, deep, memory_trim, fastpath; double startTime = GetCurrentTime(); TRY_READ_REQUEST(ReadDelDataWithFeedbacksRequest, root, ids, force, deep, - fastpath); + memory_trim, fastpath); RESPONSE_ON_ERROR(server_ptr_->DelData( - ids, force, deep, fastpath, + ids, force, deep, memory_trim, fastpath, [self, startTime](const Status& status, std::vector const& delete_ids) { std::string message_out; if (status.ok()) { - std::vector deleted_bids; + std::vector deleted_blob_ids; for (auto id : delete_ids) { if (IsBlob(id)) { - deleted_bids.emplace_back(id); + deleted_blob_ids.emplace_back(id); } } - WriteDelDataWithFeedbacksReply(deleted_bids, message_out); + WriteDelDataWithFeedbacksReply(deleted_blob_ids, message_out); } else { VLOG(100) << "Error: " << status.ToString(); WriteErrorReply(status, message_out); @@ -1058,11 +1058,13 @@ bool SocketConnection::doListData(const json& root) { bool SocketConnection::doDelData(const json& root) { auto self(shared_from_this()); std::vector ids; - bool force, deep, fastpath; + bool force, deep, memory_trim, fastpath; double startTime = GetCurrentTime(); - TRY_READ_REQUEST(ReadDelDataRequest, root, ids, force, deep, fastpath); + TRY_READ_REQUEST(ReadDelDataRequest, root, ids, force, deep, memory_trim, + fastpath); RESPONSE_ON_ERROR(server_ptr_->DelData( - ids, force, deep, fastpath, [self, startTime](const Status& status) { + ids, force, deep, memory_trim, fastpath, + [self, startTime](const Status& status) { std::string message_out; if (status.ok()) { WriteDelDataReply(message_out); @@ -1177,7 +1179,7 @@ bool SocketConnection::doClear(const json& root) { [self](const Status& status, const std::vector& objects) { if (status.ok()) { auto s = self->server_ptr_->DelData( - objects, true, true, false, [self](const Status& status) { + objects, true, true, true, false, [self](const Status& status) { std::string message_out; if (status.ok()) { WriteClearReply(message_out); @@ -1210,7 +1212,8 @@ bool SocketConnection::doMemoryTrim(const json& root) { std::string message_out; TRY_READ_REQUEST(ReadMemoryTrimRequest, root); - bool trimmed = bulk_store_->MemoryTrim(); + // deprecated, use `DelData(memory_trim=true)` instead. + bool trimmed = false; WriteMemoryTrimReply(trimmed, message_out); self->doWrite(message_out); return false; diff --git a/src/server/memory/memory.cc b/src/server/memory/memory.cc index 0da0d796..7f2e11e0 100644 --- a/src/server/memory/memory.cc +++ b/src/server/memory/memory.cc @@ -302,7 +302,8 @@ Status BulkStoreBase::GetUnsafe( } template -Status BulkStoreBase::Delete(ID const& object_id) { +Status BulkStoreBase::Delete(ID const& object_id, + const bool memory_trim) { if (object_id == EmptyBlobID() || object_id == GenerateBlobID(std::numeric_limits::max())) { return Status::OK(); @@ -334,14 +335,15 @@ Status BulkStoreBase::Delete(ID const& object_id) { if (target->arena_fd == -1) { // release the memory auto buff_size = target->data_size; - switch (target->kind) { - case Payload::Kind::kMalloc: { + if (target->kind == Payload::Kind::kMalloc) { BulkAllocator::Free(target->pointer, buff_size); DVLOG(10) << "after free: " << IDToString(object_id) << ": " << Footprint() << "(" << FootprintLimit() << ")"; } - default: { - } + if (memory_trim && (target->data_size > 0)) { + memory::recycle_resident_memory( + reinterpret_cast(target->pointer), 0, target->data_size, + true); } } else { // release the span on allocator's arena to release the physical memory @@ -663,9 +665,9 @@ Status BulkStore::FetchAndModify(const ObjectID& id, int64_t& ref_cnt, return Status::OK(); } -Status BulkStore::OnDelete(ObjectID const& id) { +Status BulkStore::OnDelete(ObjectID const& id, const bool memory_trim) { RETURN_ON_ERROR(this->RemoveFromColdList(id, true)); - return Delete(id); + return Delete(id, memory_trim); } Status BulkStore::Shrink(ObjectID const& id, size_t const& size) { diff --git a/src/server/memory/memory.h b/src/server/memory/memory.h index 3ceacece..b34adff6 100644 --- a/src/server/memory/memory.h +++ b/src/server/memory/memory.h @@ -76,7 +76,7 @@ class BulkStoreBase { Status Seal(ID const& object_id); - Status Delete(ID const& object_id); + Status Delete(ID const& object_id, const bool memory_trim = false); Status DeleteGPU(ID const& object_id); @@ -192,7 +192,7 @@ class BulkStore * @brief Required by `ColdObjectTracker`. Currently, the deletion does not * respect the reference count. */ - Status OnDelete(ObjectID const& id); + Status OnDelete(ObjectID const& id, const bool memory_trim = false); /** * @brief Shrink the blob to the given size. diff --git a/src/server/memory/stream_store.cc b/src/server/memory/stream_store.cc index dfe1f5a0..dfce3912 100644 --- a/src/server/memory/stream_store.cc +++ b/src/server/memory/stream_store.cc @@ -170,7 +170,7 @@ Status StreamStore::Pull(ObjectID const stream_id, status = store_->Delete(target); } else { status = server_->DelData( - {target}, false, true, false, [](Status const& status) { + {target}, false, true, false, false, [](Status const& status) { if (!status.ok()) { LOG(WARNING) << "failed to delete the stream chunk: " << status.ToString(); @@ -308,7 +308,7 @@ Status StreamStore::Drop(ObjectID const stream_id) { status = store_->Delete(target); } else { status = server_->DelData( - {target}, false, true, false, [](Status const& status) { + {target}, false, false, true, false, [](Status const& status) { if (!status.ok()) { LOG(WARNING) << "failed to delete the stream chunk: " << status.ToString(); diff --git a/src/server/server/vineyard_server.cc b/src/server/server/vineyard_server.cc index 043c30b7..35ec4f95 100644 --- a/src/server/server/vineyard_server.cc +++ b/src/server/server/vineyard_server.cc @@ -747,8 +747,9 @@ Status VineyardServer::ShallowCopy(const ObjectID id, Status VineyardServer::DelData(const std::vector& ids, const bool force, const bool deep, - const bool fastpath, callback_t<> callback) { - return DelData(ids, force, deep, fastpath, + const bool memory_trim, const bool fastpath, + callback_t<> callback) { + return DelData(ids, force, deep, memory_trim, fastpath, [callback](Status const& status, std::vector const& deleted_ids) { return callback(status); @@ -757,7 +758,8 @@ Status VineyardServer::DelData(const std::vector& ids, Status VineyardServer::DelData( const std::vector& ids, const bool force, const bool deep, - const bool fastpath, callback_t const&> callback) { + const bool memory_trim, const bool fastpath, + callback_t const&> callback) { ENSURE_VINEYARDD_READY(); auto self(shared_from_this()); if (fastpath) { @@ -766,16 +768,16 @@ Status VineyardServer::DelData( RETURN_ON_ASSERT(IsBlob(id), "Fastpath deletion can only be applied to blobs"); } - context_.post([this, ids, callback] { + context_.post([this, memory_trim, ids, callback] { for (auto const id : ids) { - VINEYARD_DISCARD(bulk_store_->OnDelete(id)); + VINEYARD_DISCARD(bulk_store_->OnDelete(id, memory_trim)); } VINEYARD_DISCARD(callback(Status::OK(), ids)); }); return Status::OK(); } meta_service_ptr_->RequestToDelete( - ids, force, deep, + ids, force, deep, memory_trim, [self](const Status& status, const json& meta, std::vector const& ids_to_delete, std::vector& ops, bool& sync_remote) { @@ -817,9 +819,10 @@ Status VineyardServer::DelData( return Status::OK(); } -Status VineyardServer::DeleteBlobBatch(const std::set& ids) { +Status VineyardServer::DeleteBlobBatch(const std::set& ids, + const bool memory_trim) { for (auto object_id : ids) { - VINEYARD_SUPPRESS(this->bulk_store_->OnDelete(object_id)); + VINEYARD_SUPPRESS(this->bulk_store_->OnDelete(object_id, memory_trim)); } return Status::OK(); } @@ -832,7 +835,7 @@ Status VineyardServer::DeleteAllAt(const json& meta, meta, status, meta_tree::FilterAtInstance(meta, instance_id, objects_to_cleanup)); RETURN_ON_ERROR(status); - return DelData(objects_to_cleanup, true, true, false /* fastpath */, + return DelData(objects_to_cleanup, true, true, true, false /* fastpath */, [](Status const& status) -> Status { if (!status.ok()) { VLOG(100) << "Error: failed during cleanup: " diff --git a/src/server/server/vineyard_server.h b/src/server/server/vineyard_server.h index c9e93663..c46aeb46 100644 --- a/src/server/server/vineyard_server.h +++ b/src/server/server/vineyard_server.h @@ -150,13 +150,15 @@ class VineyardServer : public std::enable_shared_from_this { callback_t callback); Status DelData(const std::vector& id, const bool force, - const bool deep, const bool fastpath, callback_t<> callback); + const bool deep, const bool memory_trim, const bool fastpath, + callback_t<> callback); Status DelData(const std::vector& id, const bool force, - const bool deep, const bool fastpath, + const bool deep, const bool memory_trim, const bool fastpath, callback_t const&> callback); - Status DeleteBlobBatch(const std::set& blobs); + Status DeleteBlobBatch(const std::set& blobs, + const bool memory_trim = false); Status DeleteAllAt(const json& meta, InstanceID const instance_id); diff --git a/src/server/services/meta_service.cc b/src/server/services/meta_service.cc index 6d983c81..e828c557 100644 --- a/src/server/services/meta_service.cc +++ b/src/server/services/meta_service.cc @@ -280,13 +280,14 @@ void IMetaService::RequestToGetData(const bool sync_remote, void IMetaService::RequestToDelete( const std::vector& object_ids, const bool force, const bool deep, + const bool memory_trim, callback_t const&, std::vector&, bool&> callback_after_ready, callback_t const&> callback_after_finish) { auto self(shared_from_this()); server_ptr_->GetMetaContext().post([self, object_ids, force, deep, - callback_after_ready, + memory_trim, callback_after_ready, callback_after_finish]() { if (self->stopped_.load()) { VINEYARD_DISCARD(callback_after_finish( @@ -316,7 +317,7 @@ void IMetaService::RequestToDelete( } // apply changes locally (before committing to etcd) - self->metaUpdate(ops, false); + self->metaUpdate(ops, false, memory_trim); if (!sync_remote) { VINEYARD_DISCARD(callback_after_finish(s, processed_delete_set)); @@ -373,7 +374,8 @@ void IMetaService::RequestToShallowCopy( auto status = callback_after_ready(Status::OK(), meta, ops, transient); if (status.ok()) { if (transient) { - self->metaUpdate(ops, false); + // Already trim physical memory for remote deletion events + self->metaUpdate(ops, false, true); return callback_after_finish(Status::OK()); } else { self->RequestToPersist( @@ -1015,7 +1017,8 @@ void IMetaService::delVal(ObjectID const& target, std::set& blobs) { } template -void IMetaService::metaUpdate(const RangeT& ops, bool const from_remote) { +void IMetaService::metaUpdate(const RangeT& ops, const bool from_remote, + const bool memory_trim) { std::set blobs_to_delete; std::vector add_sigs, drop_sigs; @@ -1157,7 +1160,7 @@ void IMetaService::metaUpdate(const RangeT& ops, bool const from_remote) { } #endif - VINEYARD_SUPPRESS(server_ptr_->DeleteBlobBatch(blobs_to_delete)); + VINEYARD_SUPPRESS(server_ptr_->DeleteBlobBatch(blobs_to_delete, memory_trim)); VINEYARD_SUPPRESS(server_ptr_->ProcessDeferred(meta_)); } diff --git a/src/server/services/meta_service.h b/src/server/services/meta_service.h index 294bcb35..3a6599c3 100644 --- a/src/server/services/meta_service.h +++ b/src/server/services/meta_service.h @@ -125,7 +125,7 @@ class IMetaService : public std::enable_shared_from_this { void RequestToDelete( const std::vector& object_ids, const bool force, - const bool deep, + const bool deep, const bool memory_trim, callback_t const&, std::vector&, bool&> callback_after_ready, @@ -226,7 +226,8 @@ class IMetaService : public std::enable_shared_from_this { void delVal(ObjectID const& target, std::set& blobs); template - void metaUpdate(const RangeT& ops, bool const from_remote); + void metaUpdate(const RangeT& ops, bool const from_remote, + const bool memory_trim = false); void instanceUpdate(const op_t& op, const bool from_remote = true);