Skip to content

Commit

Permalink
feat(flex): Configure the memory strategy with memory-level. (#3511)
Browse files Browse the repository at this point in the history
Passing parameter "--memory-level=n" to rt_server to configure memory
allocation strategy.
- n = 0: memory allocation will be mmap to files in SHARED mode.
Suitable for scenarios with severe memory constraints.
- n = 1: memory allocation will be mmap to files in PRIVATE mode. The
modified parts in the storage cannot be swapped to disk; if the modified
portions become too large, it may lead to a crash due to insufficient
memory.
- n = 2: the storage of certain data will utilize hugepages, which can
enhance the system's throughput. However, the portion using hugepages
will be fixed in memory usage, and hugepages require static allocation.
- n = 3: the storage of all data will endeavor to make use of hugepages
as much as possible.
  • Loading branch information
luoxiaojian authored Jan 26, 2024
1 parent d7a0d93 commit 9a17e5c
Show file tree
Hide file tree
Showing 15 changed files with 89 additions and 187 deletions.
7 changes: 0 additions & 7 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ option(BUILD_TEST "Whether to build test" ON)
option(BUILD_DOC "Whether to build doc" ON)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON)
option(MONITOR_SESSIONS "Whether monitor sessions" OFF)
option(ENABLE_HUGEPAGE "Whether to use hugepages when open mmap array in memory" OFF)

#print options
message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}")
Expand Down Expand Up @@ -49,12 +48,6 @@ if (MONITOR_SESSIONS)
add_definitions(-DMONITOR_SESSIONS)
endif ()


if (ENABLE_HUGEPAGE)
message("Hugepage is enabled")
add_definitions(-DHUGEPAGE)
endif ()

execute_process(COMMAND uname -r OUTPUT_VARIABLE LINUX_KERNEL_VERSION)
string(STRIP ${LINUX_KERNEL_VERSION} LINUX_KERNEL_VERSION)
message(${LINUX_KERNEL_VERSION})
Expand Down
18 changes: 4 additions & 14 deletions flex/bin/rt_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ int main(int argc, char** argv) {
"graph schema config file")(
"data-path,d", bpo::value<std::string>(), "data directory path")(
"warmup,w", bpo::value<bool>()->default_value(false),
"warmup graph data")("memory-only,m",
bpo::value<bool>()->default_value(true));
"warmup graph data")("memory-level,m",
bpo::value<int>()->default_value(1));
google::InitGoogleLogging(argv[0]);
FLAGS_logtostderr = true;

Expand All @@ -58,7 +58,7 @@ int main(int argc, char** argv) {

bool enable_dpdk = false;
bool warmup = vm["warmup"].as<bool>();
bool memory_only = vm["memory-only"].as<bool>();
int memory_level = vm["memory-level"].as<int>();
uint32_t shard_num = vm["shard-num"].as<uint32_t>();
uint16_t http_port = vm["http-port"].as<uint16_t>();

Expand All @@ -84,17 +84,7 @@ int main(int argc, char** argv) {

auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
gs::GraphDBConfig config(schema, data_path, shard_num);
#ifdef HUGEPAGE
config.allocator_strategy = gs::MemoryStrategy::kHugepagePrefered;
config.vertex_map_strategy = gs::MemoryStrategy::kHugepagePrefered;
config.vertex_table_strategy = gs::MemoryStrategy::kHugepagePrefered;
config.topology_strategy = gs::MemoryStrategy::kHugepagePrefered;
#else
config.allocator_strategy = gs::MemoryStrategy::kMemoryOnly;
config.vertex_map_strategy = gs::MemoryStrategy::kMemoryOnly;
config.vertex_table_strategy = gs::MemoryStrategy::kMemoryOnly;
config.topology_strategy = gs::MemoryStrategy::kMemoryOnly;
#endif
config.memory_level = memory_level;
config.enable_auto_compaction = true;
config.service_port = http_port;
db.Open(config);
Expand Down
22 changes: 11 additions & 11 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,9 @@ Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
GraphDBConfig config(schema, data_dir, thread_num);
config.warmup = warmup;
if (memory_only) {
config.allocator_strategy = MemoryStrategy::kMemoryOnly;
config.topology_strategy = MemoryStrategy::kMemoryOnly;
config.vertex_map_strategy = MemoryStrategy::kMemoryOnly;
config.vertex_table_strategy = MemoryStrategy::kMemoryOnly;
config.memory_level = 1;
} else {
config.allocator_strategy = MemoryStrategy::kSyncToFile;
config.topology_strategy = MemoryStrategy::kSyncToFile;
config.vertex_map_strategy = MemoryStrategy::kSyncToFile;
config.vertex_table_strategy = MemoryStrategy::kSyncToFile;
config.memory_level = 0;
}
config.enable_auto_compaction = enable_auto_compaction;
config.service_port = port;
Expand All @@ -97,8 +91,7 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
work_dir_ = data_dir;
thread_num_ = config.thread_num;
try {
graph_.Open(data_dir, config.vertex_map_strategy,
config.vertex_table_strategy, config.topology_strategy);
graph_.Open(data_dir, config.memory_level);
} catch (std::exception& e) {
LOG(ERROR) << "Exception: " << e.what();
return Result<bool>(StatusCode::InternalError,
Expand Down Expand Up @@ -129,7 +122,14 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
mutable_schema.EmplacePlugins(plugin_paths);

last_compaction_ts_ = 0;
openWalAndCreateContexts(data_dir, config.allocator_strategy);
MemoryStrategy allocator_strategy = MemoryStrategy::kMemoryOnly;
if (config.memory_level == 0) {
allocator_strategy = MemoryStrategy::kSyncToFile;
} else if (config.memory_level >= 2) {
allocator_strategy = MemoryStrategy::kHugepagePrefered;
}

openWalAndCreateContexts(data_dir, allocator_strategy);

if ((!create_empty_graph) && config.warmup) {
graph_.Warmup(thread_num_);
Expand Down
17 changes: 9 additions & 8 deletions flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,22 @@ struct GraphDBConfig {
warmup(false),
enable_auto_compaction(false),
service_port(-1),
vertex_map_strategy(MemoryStrategy::kMemoryOnly),
vertex_table_strategy(MemoryStrategy::kMemoryOnly),
topology_strategy(MemoryStrategy::kMemoryOnly),
allocator_strategy(MemoryStrategy::kMemoryOnly) {}
memory_level(1) {}

Schema schema;
std::string data_dir;
int thread_num;
bool warmup;
bool enable_auto_compaction;
int service_port;
MemoryStrategy vertex_map_strategy;
MemoryStrategy vertex_table_strategy;
MemoryStrategy topology_strategy;
MemoryStrategy allocator_strategy;

/*
0 - sync with disk;
1 - mmap virtual memory;
2 - prefering hugepages;
3 - force hugepages;
*/
int memory_level;
};

class GraphDB {
Expand Down
6 changes: 0 additions & 6 deletions flex/storages/rt_mutable_graph/dual_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@ class DualCsrBase {
const std::string& edata_name,
const std::string& snapshot_dir,
size_t src_vertex_cap, size_t dst_vertex_cap) = 0;
#ifdef HUGEPAGE
virtual void OpenWithHugepages(const std::string& oe_name,
const std::string& ie_name,
const std::string& edata_name,
const std::string& snapshot_dir,
size_t src_vertex_cap,
size_t dst_vertex_cap) = 0;
#endif
virtual void Dump(const std::string& oe_name, const std::string& ie_name,
const std::string& edata_name,
const std::string& new_snapshot_dir) = 0;
Expand Down Expand Up @@ -120,15 +118,13 @@ class DualCsr : public DualCsrBase {
out_csr_->open_in_memory(snapshot_dir + "/" + oe_name, src_vertex_cap);
}

#ifdef HUGEPAGE
void OpenWithHugepages(const std::string& oe_name, const std::string& ie_name,
const std::string& edata_name,
const std::string& snapshot_dir, size_t src_vertex_cap,
size_t dst_vertex_cap) override {
in_csr_->open_with_hugepages(snapshot_dir + "/" + ie_name, dst_vertex_cap);
out_csr_->open_with_hugepages(snapshot_dir + "/" + oe_name, src_vertex_cap);
}
#endif

void Dump(const std::string& oe_name, const std::string& ie_name,
const std::string& edata_name,
Expand Down Expand Up @@ -261,14 +257,12 @@ class DualCsr<std::string_view> : public DualCsrBase {
column_.resize(std::max(column_.size() + (column_.size() + 4) / 5, 4096ul));
}

#ifdef HUGEPAGE
void OpenWithHugepages(const std::string& oe_name, const std::string& ie_name,
const std::string& edata_name,
const std::string& snapshot_dir, size_t src_vertex_cap,
size_t dst_vertex_cap) override {
LOG(FATAL) << "not supported...";
}
#endif

void Dump(const std::string& oe_name, const std::string& ie_name,
const std::string& edata_name,
Expand Down
8 changes: 0 additions & 8 deletions flex/storages/rt_mutable_graph/mutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,10 @@ class MutableCsrBase {
const std::string& work_dir) = 0;

virtual void open_in_memory(const std::string& prefix, size_t v_cap = 0) = 0;
#ifdef HUGEPAGE
virtual void open_with_hugepages(const std::string& prefix,
size_t v_cap = 0) {
LOG(FATAL) << "not supported...";
}
#endif

virtual void dump(const std::string& name,
const std::string& new_spanshot_dir) = 0;
Expand Down Expand Up @@ -768,7 +766,6 @@ class MutableCsr : public TypedMutableCsrBase<EDATA_T> {
}
}

#ifdef HUGEPAGE
void open_with_hugepages(const std::string& prefix, size_t v_cap) override {
mmap_array<int> degree_list;
degree_list.open(prefix + ".deg", false);
Expand Down Expand Up @@ -802,7 +799,6 @@ class MutableCsr : public TypedMutableCsrBase<EDATA_T> {
delete cap_list;
}
}
#endif

void warmup(int thread_num) const override {
size_t vnum = adj_lists_.size();
Expand Down Expand Up @@ -1282,7 +1278,6 @@ class SingleMutableCsr : public TypedMutableCsrBase<EDATA_T> {
}
}

#ifdef HUGEPAGE
void open_with_hugepages(const std::string& prefix, size_t v_cap) override {
nbr_list_.open_with_hugepages(prefix + ".snbr", v_cap);
size_t old_size = nbr_list_.size();
Expand All @@ -1293,7 +1288,6 @@ class SingleMutableCsr : public TypedMutableCsrBase<EDATA_T> {
}
}
}
#endif

void dump(const std::string& name,
const std::string& new_snapshot_dir) override {
Expand Down Expand Up @@ -1640,9 +1634,7 @@ class EmptyCsr : public TypedMutableCsrBase<EDATA_T> {

void open_in_memory(const std::string& prefix, size_t v_cap) override {}

#ifdef HUGEPAGE
void open_with_hugepages(const std::string& prefix, size_t v_cap) override {}
#endif

void dump(const std::string& name,
const std::string& new_spanshot_dir) override {}
Expand Down
77 changes: 32 additions & 45 deletions flex/storages/rt_mutable_graph/mutable_property_fragment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,7 @@ inline DualCsrBase* create_csr(EdgeStrategy oes, EdgeStrategy ies,
}

void MutablePropertyFragment::Open(const std::string& work_dir,
bool memory_only) {
Open(work_dir, MemoryStrategy::kMemoryOnly, MemoryStrategy::kMemoryOnly,
MemoryStrategy::kMemoryOnly);
}

void MutablePropertyFragment::Open(const std::string& work_dir,
MemoryStrategy vertex_map_strategy,
MemoryStrategy vertex_table_strategy,
MemoryStrategy topology_strategy) {
int memory_level) {
std::string schema_file = schema_path(work_dir);
std::string snapshot_dir{};
bool build_empty_graph = false;
Expand Down Expand Up @@ -153,46 +145,44 @@ void MutablePropertyFragment::Open(const std::string& work_dir,
for (size_t i = 0; i < vertex_label_num_; ++i) {
std::string v_label_name = schema_.get_vertex_label_name(i);

if (vertex_map_strategy == MemoryStrategy::kMemoryOnly) {
lf_indexers_[i].open_in_memory(snapshot_dir + "/" +
vertex_map_prefix(v_label_name));
#ifdef HUGEPAGE
} else if (vertex_map_strategy == MemoryStrategy::kHugepagePrefered) {
lf_indexers_[i].open_with_hugepages(snapshot_dir + "/" +
vertex_map_prefix(v_label_name));
#endif
} else {
assert(vertex_map_strategy == MemoryStrategy::kSyncToFile);
if (memory_level == 0) {
lf_indexers_[i].open(vertex_map_prefix(v_label_name), snapshot_dir,
tmp_dir_path);
}

if (vertex_table_strategy == MemoryStrategy::kMemoryOnly) {
vertex_data_[i].open(vertex_table_prefix(v_label_name), snapshot_dir,
tmp_dir_path, schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name));
if (!build_empty_graph) {
vertex_data_[i].copy_to_tmp(vertex_table_prefix(v_label_name),
snapshot_dir, tmp_dir_path);
}
} else if (memory_level == 1) {
lf_indexers_[i].open_in_memory(snapshot_dir + "/" +
vertex_map_prefix(v_label_name));
vertex_data_[i].open_in_memory(
vertex_table_prefix(v_label_name), snapshot_dir,
schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name));
#ifdef HUGEPAGE
} else if (vertex_table_strategy == MemoryStrategy::kHugepagePrefered) {
} else if (memory_level == 2) {
lf_indexers_[i].open_with_hugepages(
snapshot_dir + "/" + vertex_map_prefix(v_label_name), false);
vertex_data_[i].open_with_hugepages(
vertex_table_prefix(v_label_name), snapshot_dir,
schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name));
#endif
schema_.get_vertex_storage_strategies(v_label_name), false);
} else {
assert(vertex_table_strategy == MemoryStrategy::kSyncToFile);
vertex_data_[i].open(vertex_table_prefix(v_label_name), snapshot_dir,
tmp_dir_path, schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name));
}
if (!build_empty_graph &&
(vertex_table_strategy == MemoryStrategy::kSyncToFile)) {
vertex_data_[i].copy_to_tmp(vertex_table_prefix(v_label_name),
snapshot_dir, tmp_dir_path);
assert(memory_level == 3);
lf_indexers_[i].open_with_hugepages(
snapshot_dir + "/" + vertex_map_prefix(v_label_name), true);
vertex_data_[i].open_with_hugepages(
vertex_table_prefix(v_label_name), snapshot_dir,
schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name), true);
}

size_t vertex_capacity =
schema_.get_max_vnum(v_label_name); // lf_indexers_[i].capacity();
if (build_empty_graph) {
Expand Down Expand Up @@ -236,27 +226,24 @@ void MutablePropertyFragment::Open(const std::string& work_dir,
create_csr(oe_strategy, ie_strategy, properties);
ie_[index] = dual_csr_list_[index]->GetInCsr();
oe_[index] = dual_csr_list_[index]->GetOutCsr();
if (topology_strategy == MemoryStrategy::kMemoryOnly) {
dual_csr_list_[index]->OpenInMemory(
if (memory_level == 0) {
dual_csr_list_[index]->Open(
oe_prefix(src_label, dst_label, edge_label),
ie_prefix(src_label, dst_label, edge_label),
edata_prefix(src_label, dst_label, edge_label), snapshot_dir,
vertex_capacities[src_label_i], vertex_capacities[dst_label_i]);
#ifdef HUGEPAGE
} else if (topology_strategy == MemoryStrategy::kHugepagePrefered) {
tmp_dir_path);
} else if (memory_level >= 2) {
dual_csr_list_[index]->OpenWithHugepages(
oe_prefix(src_label, dst_label, edge_label),
ie_prefix(src_label, dst_label, edge_label),
edata_prefix(src_label, dst_label, edge_label), snapshot_dir,
vertex_capacities[src_label_i], vertex_capacities[dst_label_i]);
#endif
} else {
assert(topology_strategy == MemoryStrategy::kSyncToFile);
dual_csr_list_[index]->Open(
dual_csr_list_[index]->OpenInMemory(
oe_prefix(src_label, dst_label, edge_label),
ie_prefix(src_label, dst_label, edge_label),
edata_prefix(src_label, dst_label, edge_label), snapshot_dir,
tmp_dir_path);
vertex_capacities[src_label_i], vertex_capacities[dst_label_i]);
}
ie_[index]->resize(vertex_capacities[dst_label_i]);
oe_[index]->resize(vertex_capacities[src_label_i]);
Expand Down
6 changes: 1 addition & 5 deletions flex/storages/rt_mutable_graph/mutable_property_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ class MutablePropertyFragment {
vid_t dst_lid, label_t edge_label, timestamp_t ts,
const Any& arc, Allocator& alloc);

void Open(const std::string& work_dir, bool memory_only);

void Open(const std::string& work_dir, MemoryStrategy vertex_map_strategy,
MemoryStrategy vertex_table_strategy,
MemoryStrategy topology_strategy);
void Open(const std::string& work_dir, int memory_level);

void Compact(uint32_t version);

Expand Down
Loading

0 comments on commit 9a17e5c

Please sign in to comment.