Skip to content

Commit

Permalink
Add the API to trim physical memory footprint (#1638)
Browse files Browse the repository at this point in the history
- Fixes #1630
- Fixes #1620

Signed-off-by: Tao He <[email protected]>
  • Loading branch information
sighingnow authored Dec 12, 2023
1 parent 2dc2a7e commit 067235c
Show file tree
Hide file tree
Showing 19 changed files with 269 additions and 39 deletions.
8 changes: 8 additions & 0 deletions python/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,14 @@ void bind_client(py::module& mod) {
.def(
"clear", [](ClientBase* self) { throw_on_error(self->Clear()); },
doc::ClientBase_clear)
.def(
"memory_trim",
[](ClientBase* self) -> bool {
bool trimmed = false;
throw_on_error(self->MemoryTrim(trimmed));
return trimmed;
},
doc::ClientBase_memory_trim)
.def(
"label",
[](ClientBase* self, ObjectID id, std::string const& key,
Expand Down
10 changes: 10 additions & 0 deletions python/pybind11_docs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,16 @@ const char* ClientBase_clear = R"doc(
Drop all objects that visible to the current instance in the vineyard cluster.
)doc";

const char* ClientBase_memory_trim = R"doc(
.. method:: memory_trim() -> bool
:noindex:
Trim the memory pool inside the shared memory allocator to return the unused
physical memory back to the OS kernel, like the `malloc_trim` API from glibc.
Returns True if it actually released any memory.
)doc";

const char* ClientBase_reset = R"doc(
.. method:: reset() -> None
:noindex:
Expand Down
1 change: 1 addition & 0 deletions python/pybind11_docs.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ extern const char* ClientBase_list_names;
extern const char* ClientBase_drop_name;
extern const char* ClientBase_sync_meta;
extern const char* ClientBase_clear;
extern const char* ClientBase_memory_trim;
extern const char* ClientBase_reset;
extern const char* ClientBase_connected;
extern const char* ClientBase_instance_id;
Expand Down
40 changes: 40 additions & 0 deletions python/vineyard/core/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,43 @@ def start_requests(rs, state, ipc_socket):
r, message = rs.get(block=True)
if not r:
pytest.fail(message)


def parse_shared_memory_usage():
'''Parse the shared memory usage from /proc/meminfo, in KB.'''
with open('/proc/meminfo', 'r', encoding='utf-8') as f:
lines = f.readlines()
for line in lines:
if line.startswith('Shmem:'):
parts = line.split()
return int(parts[1])


def test_memory_trim(vineyard_client):
data = np.ones((1000, 1000, 16))

# cleanup the instance
vineyard_client.clear()
vineyard_client.memory_trim()

original_memory_usage = parse_shared_memory_usage()

data = np.ones((1000, 1000, 16))
data_kbytes = data.nbytes / 1024

rs = []
for i in range(8):
r = vineyard_client.put(data)
rs.append(r)
current_memory_usage = parse_shared_memory_usage()
assert current_memory_usage >= original_memory_usage + i * data_kbytes

for r in rs:
vineyard_client.delete(r)

current_memory_usage = parse_shared_memory_usage()
assert current_memory_usage >= original_memory_usage + (8 - 1) * data_kbytes

vineyard_client.memory_trim()
# there might be some fragmentation overhead
assert parse_shared_memory_usage() <= original_memory_usage + 1 * data_kbytes
11 changes: 11 additions & 0 deletions src/client/client_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,17 @@ Status ClientBase::Clear() {
return Status::OK();
}

Status ClientBase::MemoryTrim(bool& trimmed) {
ENSURE_CONNECTED(this);
std::string message_out;
WriteMemoryTrimRequest(message_out);
RETURN_ON_ERROR(doWrite(message_out));
json message_in;
RETURN_ON_ERROR(doRead(message_in));
RETURN_ON_ERROR(ReadMemoryTrimReply(message_in, trimmed));
return Status::OK();
}

Status ClientBase::Label(const ObjectID object, std::string const& key,
std::string const& value) {
ENSURE_CONNECTED(this);
Expand Down
7 changes: 7 additions & 0 deletions src/client/client_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,13 @@ class ClientBase {
*/
Status Clear();

/**
* @brief Trim the memory pool inside the shared memory allocator to return
* the unused physical memory back to the OS kernel, like the
* `malloc_trim` API from glibc.
*/
Status MemoryTrim(bool& trimmed);

/**
* @brief Associate given labels to an existing object.
*
Expand Down
4 changes: 4 additions & 0 deletions src/common/memory/payload.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ struct Payload {

bool operator==(const Payload& other) const;

inline ObjectID id() const { return object_id; }

inline void Reset() { is_sealed = false, is_owner = true; }

inline void MarkAsSealed() { is_sealed = true; }
Expand Down Expand Up @@ -162,6 +164,8 @@ struct PlasmaPayload : public Payload {
(plasma_id == other.plasma_id) && (data_size == other.data_size));
}

inline PlasmaID id() const { return plasma_id; }

Payload ToNormalPayload() const {
return Payload(object_id, data_size, pointer, store_fd, arena_fd, map_size,
data_offset);
Expand Down
16 changes: 8 additions & 8 deletions src/common/util/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,14 @@ int64_t get_maximum_shared_memory() {
* @brief Return the memory size in human readable way.
*/
std::string prettyprint_memory_size(size_t nbytes) {
if (nbytes > (1L << 40)) {
return std::to_string(nbytes * 1.0 / (1L << 40)) + " TB";
} else if (nbytes > (1L << 30)) {
return std::to_string(nbytes * 1.0 / (1L << 30)) + " GB";
} else if (nbytes > (1L << 20)) {
return std::to_string(nbytes * 1.0 / (1L << 20)) + " MB";
} else if (nbytes > (1L << 10)) {
return std::to_string(nbytes * 1.0 / (1L << 10)) + " KB";
if (nbytes >= (1LL << 40)) {
return std::to_string(nbytes * 1.0 / (1LL << 40)) + " TB";
} else if (nbytes >= (1LL << 30)) {
return std::to_string(nbytes * 1.0 / (1LL << 30)) + " GB";
} else if (nbytes >= (1LL << 20)) {
return std::to_string(nbytes * 1.0 / (1LL << 20)) + " MB";
} else if (nbytes >= (1LL << 10)) {
return std::to_string(nbytes * 1.0 / (1LL << 10)) + " KB";
} else {
return std::to_string(nbytes) + " B";
}
Expand Down
29 changes: 29 additions & 0 deletions src/common/util/protocols.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ const std::string command_t::LABEL_REQUEST = "label_request";
const std::string command_t::LABEL_REPLY = "label_reply";
const std::string command_t::CLEAR_REQUEST = "clear_request";
const std::string command_t::CLEAR_REPLY = "clear_reply";
const std::string command_t::MEMORY_TRIM_REQUEST = "memory_trim_request";
const std::string command_t::MEMORY_TRIM_REPLY = "memory_trim_reply";

// Stream APIs
const std::string command_t::CREATE_STREAM_REQUEST = "create_stream_request";
Expand Down Expand Up @@ -1301,6 +1303,33 @@ Status ReadClearReply(const json& root) {
CHECK_IPC_ERROR(root, command_t::CLEAR_REPLY);
return Status::OK();
}

void WriteMemoryTrimRequest(std::string& msg) {
json root;
root["type"] = command_t::MEMORY_TRIM_REQUEST;

encode_msg(root, msg);
}

Status ReadMemoryTrimRequest(const json& root) {
CHECK_IPC_ERROR(root, command_t::MEMORY_TRIM_REQUEST);
return Status::OK();
}

void WriteMemoryTrimReply(const bool trimmed, std::string& msg) {
json root;
root["type"] = command_t::MEMORY_TRIM_REPLY;
root["trimmed"] = trimmed;

encode_msg(root, msg);
}

Status ReadMemoryTrimReply(const json& root, bool& trimmed) {
CHECK_IPC_ERROR(root, command_t::MEMORY_TRIM_REPLY);
trimmed = root.value("trimmed", false);
return Status::OK();
}

void WriteCreateStreamRequest(const ObjectID& object_id, std::string& msg) {
json root;
root["type"] = command_t::CREATE_STREAM_REQUEST;
Expand Down
10 changes: 10 additions & 0 deletions src/common/util/protocols.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ struct command_t {
static const std::string LABEL_REPLY;
static const std::string CLEAR_REQUEST;
static const std::string CLEAR_REPLY;
static const std::string MEMORY_TRIM_REQUEST;
static const std::string MEMORY_TRIM_REPLY;

// Stream APIs
static const std::string CREATE_STREAM_REQUEST;
Expand Down Expand Up @@ -516,6 +518,14 @@ void WriteClearReply(std::string& msg);

Status ReadClearReply(const json& root);

void WriteMemoryTrimRequest(std::string& msg);

Status ReadMemoryTrimRequest(const json& root);

void WriteMemoryTrimReply(const bool trimmed, std::string& msg);

Status ReadMemoryTrimReply(const json& root, bool& trimmed);

void WriteCreateStreamRequest(const ObjectID& object_id, std::string& msg);

Status ReadCreateStreamRequest(const json& root, ObjectID& object_id);
Expand Down
21 changes: 18 additions & 3 deletions src/common/util/uuid.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ inline ObjectID GenerateSignature() {
return 0x7FFFFFFFFFFFFFFFUL & detail::cycleclock::now();
}

inline bool IsBlob(ObjectID id) { return id & 0x8000000000000000UL; }

const std::string ObjectIDToString(const ObjectID id);

const std::string ObjectIDToString(const PlasmaID id);
Expand Down Expand Up @@ -267,10 +265,27 @@ inline ID GenerateBlobID(const void* ptr) {
}

template <typename ID = ObjectID>
ID EmptyBlobID() {
inline ID EmptyBlobID() {
return GenerateBlobID<ID>(0x8000000000000000UL);
}

template <typename ID = ObjectID>
inline ID PlaceholderBlobID() {
return GenerateBlobID<ID>(std::numeric_limits<uintptr_t>::max());
}

inline bool IsBlob(ObjectID id) { return id & 0x8000000000000000UL; }

template <typename ID = ObjectID>
inline bool IsEmptyBlobID(ID id) {
return EmptyBlobID<ID>() == id;
}

template <typename ID = ObjectID>
inline bool IsPlaceholderBlobID(ID id) {
return PlaceholderBlobID<ID>() == id;
}

template <typename ID>
std::string IDToString(ID id);

Expand Down
13 changes: 13 additions & 0 deletions src/server/async/socket_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ bool SocketConnection::processMessage(const std::string& message_in) {
return doLabelObject(root);
} else if (cmd == command_t::CLEAR_REQUEST) {
return doClear(root);
} else if (cmd == command_t::MEMORY_TRIM_REQUEST) {
return doMemoryTrim(root);
} else if (cmd == command_t::CREATE_STREAM_REQUEST) {
return doCreateStream(root);
} else if (cmd == command_t::OPEN_STREAM_REQUEST) {
Expand Down Expand Up @@ -1078,6 +1080,17 @@ bool SocketConnection::doClear(const json& root) {
return false;
}

bool SocketConnection::doMemoryTrim(const json& root) {
auto self(shared_from_this());
std::string message_out;

TRY_READ_REQUEST(ReadMemoryTrimRequest, root);
bool trimmed = bulk_store_->MemoryTrim();
WriteMemoryTrimReply(trimmed, message_out);
self->doWrite(message_out);
return false;
}

bool SocketConnection::doCreateStream(const json& root) {
auto self(shared_from_this());
ObjectID stream_id;
Expand Down
1 change: 1 addition & 0 deletions src/server/async/socket_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class SocketConnection : public std::enable_shared_from_this<SocketConnection> {
bool doIfPersist(json const& root);
bool doLabelObject(json const& root);
bool doClear(json const& root);
bool doMemoryTrim(json const& root);

bool doCreateStream(json const& root);
bool doOpenStream(json const& root);
Expand Down
1 change: 1 addition & 0 deletions src/server/memory/allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include "server/memory/allocator.h"

#include "common/util/logging.h"
#include "server/memory/dlmalloc.h"
#include "server/memory/mimalloc.h"

Expand Down
12 changes: 8 additions & 4 deletions src/server/memory/malloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,11 @@ void* mmap_buffer(int64_t size, bool* is_committed, bool* is_zero) {
size += kMmapRegionsGap;

int fd = create_buffer(size);
return mmap_buffer(fd, size, is_committed, is_zero);
return mmap_buffer(fd, size, true, is_committed, is_zero);
}

void* mmap_buffer(int fd, int64_t size, bool* is_committed, bool* is_zero) {
void* mmap_buffer(int fd, int64_t size, bool gap, bool* is_committed,
bool* is_zero) {
if (fd < 0) {
LOG(ERROR) << "failed to create buffer during mmap: " << strerror(errno);
return nullptr;
Expand Down Expand Up @@ -268,9 +269,12 @@ void* mmap_buffer(int fd, int64_t size, bool* is_committed, bool* is_zero) {
MmapRecord& record = mmap_records[pointer];
record.fd = fd;
record.size = size;
record.kind = MmapRecord::Kind::kMalloc;

// We lie to dlmalloc/mimalloc about where mapped memory actually lives.
pointer = pointer_advance(pointer, kMmapRegionsGap);
if (gap) {
// We lie to dlmalloc/mimalloc about where mapped memory actually lives.
pointer = pointer_advance(pointer, kMmapRegionsGap);
}
return pointer;
}

Expand Down
9 changes: 8 additions & 1 deletion src/server/memory/malloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ void GetMallocMapinfo(void* addr, int* fd, int64_t* map_length,
struct MmapRecord {
int fd = -1;
int64_t size = -1;
enum class Kind {
kMalloc = 0,
kAllocator = 1,
kDiskMMap = 2,
};
Kind kind = Kind::kMalloc;
};

/// Hashtable that contains one entry per segment that we got from the OS
Expand All @@ -68,7 +74,8 @@ int create_buffer(int64_t size, std::string const& path);
void* mmap_buffer(int64_t size, bool* is_committed, bool* is_zero);

// Create a buffer, and mmap the buffer as the shared memory space.
void* mmap_buffer(int fd, int64_t size, bool* is_committed, bool* is_zero);
void* mmap_buffer(int fd, int64_t size, bool gap, bool* is_committed,
bool* is_zero);

// Unmap the buffer.
int munmap_buffer(void* addr, int64_t size);
Expand Down
Loading

0 comments on commit 067235c

Please sign in to comment.