Skip to content

Commit

Permalink
Rework distributed node startup process
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Baden <[email protected]>
  • Loading branch information
kursatu authored and alexbaden committed Sep 6, 2019
1 parent 61358b3 commit a03a29c
Show file tree
Hide file tree
Showing 10 changed files with 653 additions and 86 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,9 @@ set(MAPD_LIBRARIES Shared Catalog SqliteConnector Parser Analyzer Planner CsvImp

if("${MAPD_EDITION_LOWER}" STREQUAL "ee")
list(APPEND MAPD_LIBRARIES Distributed)
if(ENABLE_DISTRIBUTED_5_0)
list(APPEND MAPD_LIBRARIES StringDictionaryThread)
endif()
endif()

list(APPEND MAPD_LIBRARIES Calcite)
Expand Down
240 changes: 191 additions & 49 deletions MapDServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
#include <sstream>
#include <thread>
#include <vector>
#include "MapDRelease.h"
#include "Shared/Compressor.h"
#include "Shared/MapDParameters.h"
#include "Shared/file_delete.h"
#include "Shared/mapd_shared_ptr.h"
#include "Shared/scope.h"

using namespace ::apache::thrift;
using namespace ::apache::thrift::concurrency;
Expand Down Expand Up @@ -241,7 +247,8 @@ namespace po = boost::program_options;

class MapDProgramOptions {
public:
MapDProgramOptions(char const* argv0) : log_options_(argv0) {
MapDProgramOptions(char const* argv0, bool dist_v5_ = false)
: log_options_(argv0), dist_v5_(dist_v5_) {
fillOptions();
fillAdvancedOptions();
}
Expand All @@ -250,6 +257,7 @@ class MapDProgramOptions {
std::string base_path;
std::string config_file = {"mapd.conf"};
std::string cluster_file = {"cluster.conf"};
std::string cluster_topology_file = {"cluster_topology.conf"};
std::string license_path = {""};
bool cpu_only = false;
bool verbose_logging = false;
Expand Down Expand Up @@ -300,22 +308,39 @@ class MapDProgramOptions {
int max_session_duration = kMinsPerMonth;
std::string udf_file_name = {""};

private:
void fillOptions();
void fillAdvancedOptions();

po::options_description help_desc;
po::options_description developer_desc;
logger::LogOptions log_options_;
po::variables_map vm;
po::positional_options_description positional_options;

public:
std::vector<LeafHostInfo> db_leaves;
std::vector<LeafHostInfo> string_leaves;
po::variables_map vm;
std::string clusterIds_arg;

std::string getNodeIds();
std::vector<std::string> getNodeIdsArray();
static const std::string nodeIds_token;
void overrideFrom(LeafHostInfo& host,
const std::vector<LeafHostInfo>& hosts,
rapidjson::Document& cluster_json);

boost::optional<int> parse_command_line(int argc, char const* const* argv);
void validate();
void validate_base_path();
void init_logging();
const bool dist_v5_;
};

// Passes base_path to the stored log options, then initializes the logger
// with those options.
void MapDProgramOptions::init_logging() {
log_options_.set_base_path(base_path);
logger::init(log_options_);
}

void MapDProgramOptions::fillOptions() {
help_desc.add_options()("help,h", "Show available options.");
help_desc.add_options()(
Expand All @@ -338,10 +363,12 @@ void MapDProgramOptions::fillOptions() {
po::value<size_t>(&mapd_parameters.calcite_max_mem)
->default_value(mapd_parameters.calcite_max_mem),
"Max memory available to calcite JVM.");
help_desc.add_options()("calcite-port",
po::value<int>(&mapd_parameters.calcite_port)
->default_value(mapd_parameters.calcite_port),
"Calcite port number.");
if (!dist_v5_) {
help_desc.add_options()("calcite-port",
po::value<int>(&mapd_parameters.calcite_port)
->default_value(mapd_parameters.calcite_port),
"Calcite port number.");
}
help_desc.add_options()("config",
po::value<std::string>(&config_file),
"Path to server configuration file.");
Expand All @@ -361,10 +388,13 @@ void MapDProgramOptions::fillOptions() {
po::value<size_t>(&mapd_parameters.cuda_grid_size)
->default_value(mapd_parameters.cuda_grid_size),
"Size of grid to use on GPU.");
help_desc.add_options()(
"data",
po::value<std::string>(&base_path)->required()->default_value("data"),
"Directory path to OmniSci data storage (catalogs, raw data, log files, etc).");
if (!dist_v5_) {
help_desc.add_options()(
"data",
po::value<std::string>(&base_path)->required()->default_value("data"),
"Directory path to OmniSci data storage (catalogs, raw data, log files, etc).");
positional_options.add("data", 1);
}
help_desc.add_options()("db-query-list",
po::value<std::string>(&db_query_file),
"Path to file containing OmniSci warmup queries.");
Expand Down Expand Up @@ -394,12 +424,14 @@ void MapDProgramOptions::fillOptions() {
->implicit_value(true),
"Enable the overlaps hash join framework allowing for range "
"join (e.g. spatial overlaps) computation using a hash table.");
help_desc.add_options()(
"enable-string-dict-hash-cache",
po::value<bool>(&g_cache_string_hash)
->default_value(g_cache_string_hash)
->implicit_value(true),
"Cache string hash values in the string dictionary server during import.");
if (!dist_v5_) {
help_desc.add_options()(
"enable-string-dict-hash-cache",
po::value<bool>(&g_cache_string_hash)
->default_value(g_cache_string_hash)
->implicit_value(true),
"Cache string hash values in the string dictionary server during import.");
}
help_desc.add_options()(
"enable-thrift-logs",
po::value<bool>(&g_enable_thrift_logs)
Expand Down Expand Up @@ -450,9 +482,11 @@ void MapDProgramOptions::fillOptions() {
->default_value(g_hll_precision_bits)
->implicit_value(g_hll_precision_bits),
"Number of bits used from the hash value used to specify the bucket number.");
help_desc.add_options()("http-port",
po::value<int>(&http_port)->default_value(http_port),
"HTTP port number.");
if (!dist_v5_) {
help_desc.add_options()("http-port",
po::value<int>(&http_port)->default_value(http_port),
"HTTP port number.");
}
help_desc.add_options()(
"idle-session-duration",
po::value<int>(&idle_session_duration)->default_value(idle_session_duration),
Expand Down Expand Up @@ -483,10 +517,12 @@ void MapDProgramOptions::fillOptions() {
po::value<size_t>(&g_overlaps_max_table_size_bytes)
->default_value(g_overlaps_max_table_size_bytes),
"The maximum size in bytes of the hash table for an overlaps hash join.");
help_desc.add_options()("port,p",
po::value<int>(&mapd_parameters.omnisci_server_port)
->default_value(mapd_parameters.omnisci_server_port),
"TCP Port number.");
if (!dist_v5_) {
help_desc.add_options()("port,p",
po::value<int>(&mapd_parameters.omnisci_server_port)
->default_value(mapd_parameters.omnisci_server_port),
"TCP Port number.");
}
help_desc.add_options()("num-gpus",
po::value<int>(&num_gpus)->default_value(num_gpus),
"Number of gpus to use.");
Expand Down Expand Up @@ -567,7 +603,8 @@ void MapDProgramOptions::fillAdvancedOptions() {
developer_desc.add_options()(
"jit-debug-ir",
po::value<bool>(&jit_debug)->default_value(jit_debug)->implicit_value(true),
"Enable runtime debugger support for the JIT. Note that this flag is incompatible "
"Enable runtime debugger support for the JIT. Note that this flag is "
"incompatible "
"with the `ENABLE_JIT_DEBUG` build flag. The generated code can be found at "
"`/tmp/mapdquery`.");
developer_desc.add_options()(
Expand All @@ -594,26 +631,31 @@ void MapDProgramOptions::fillAdvancedOptions() {
po::value<size_t>(&g_max_memory_allocation_size)
->default_value(g_max_memory_allocation_size),
"Maximum allocation size for a fixed output buffer allocation for projection "
"queries with no pre-flight count. Default is the maximum slab size (sizes greater "
"queries with no pre-flight count. Default is the maximum slab size (sizes "
"greater "
"than the maximum slab size have no affect). Requires bump allocator.");
developer_desc.add_options()(
"min-output-projection-allocation-bytes",
po::value<size_t>(&g_min_memory_allocation_size)
->default_value(g_min_memory_allocation_size),
"Minimum allocation size for a fixed output buffer allocation for projection "
"queries with no pre-flight count. If an allocation of this size cannot be "
"obtained, the query will be retried with different execution parameters and/or on "
"obtained, the query will be retried with different execution parameters and/or "
"on "
"CPU (if allow-cpu-retry is enabled). Requires bump allocator.");
developer_desc.add_options()(
"enable-bump-allocator",
po::value<bool>(&g_enable_bump_allocator)
->default_value(g_enable_bump_allocator)
->implicit_value(true),
"Enable the bump allocator for projection queries on GPU. The bump allocator will "
"allocate a fixed size buffer for each query, track the number of rows passing the "
"kernel during query execution, and copy back only the rows that passed the kernel "
"to CPU after execution. When disabled, pre-flight count queries are used to size "
"the output buffer for projection queries.");
developer_desc.add_options()("enable-bump-allocator",
po::value<bool>(&g_enable_bump_allocator)
->default_value(g_enable_bump_allocator)
->implicit_value(true),
"Enable the bump allocator for projection queries on "
"GPU. The bump allocator will "
"allocate a fixed size buffer for each query, track the "
"number of rows passing the "
"kernel during query execution, and copy back only the "
"rows that passed the kernel "
"to CPU after execution. When disabled, pre-flight "
"count queries are used to size "
"the output buffer for projection queries.");
developer_desc.add_options()("ssl-cert",
po::value<std::string>(&mapd_parameters.ssl_cert_file)
->default_value(std::string("")),
Expand Down Expand Up @@ -647,7 +689,7 @@ void MapDProgramOptions::fillAdvancedOptions() {
po::value<std::string>(&udf_file_name),
"Load user defined extension functions from this file at startup. The file is "
"expected to be a C/C++ file with extension .cpp.");
};
}

namespace {

Expand Down Expand Up @@ -677,11 +719,90 @@ bool trim_and_check_file_exists(std::string& filename, const std::string desc) {

} // namespace

// Strips surrounding quote characters (" and ') from base_path in place and
// verifies that the resulting path exists on disk.
//
// Throws std::runtime_error when nothing exists at base_path.
void MapDProgramOptions::validate_base_path() {
  boost::algorithm::trim_if(base_path, boost::is_any_of("\"'"));

  const bool base_path_present = boost::filesystem::exists(base_path);
  if (base_path_present) {
    return;
  }
  throw std::runtime_error("OmniSci base directory does not exist at " + base_path);
}

void MapDProgramOptions::validate() {
boost::algorithm::trim_if(base_path, boost::is_any_of("\"'"));
const auto data_path = boost::filesystem::path(base_path) / "mapd_data";
if (!boost::filesystem::exists(data_path)) {
throw std::runtime_error("OmniSci data directory does not exist at '" + base_path +
"'");
}

{
const auto lock_file = boost::filesystem::path(base_path) / "omnisci_server_pid.lck";
auto pid = std::to_string(getpid());

int pid_fd = open(lock_file.c_str(), O_RDWR | O_CREAT, 0644);
if (pid_fd == -1) {
auto err = std::string("Failed to open PID file ") + lock_file.c_str() + ". " +
strerror(errno) + ".";
throw std::runtime_error(err);
}
if (lockf(pid_fd, F_TLOCK, 0) == -1) {
close(pid_fd);
auto err = std::string("Another OmniSci Server is using data directory ") +
base_path + ".";
throw std::runtime_error(err);
}
if (ftruncate(pid_fd, 0) == -1) {
close(pid_fd);
auto err = std::string("Failed to truncate PID file ") + lock_file.c_str() + ". " +
strerror(errno) + ".";
throw std::runtime_error(err);
}
if (write(pid_fd, pid.c_str(), pid.length()) == -1) {
close(pid_fd);
auto err = std::string("Failed to write PID file ") + lock_file.c_str() + ". " +
strerror(errno) + ".";
throw std::runtime_error(err);
}
}
boost::algorithm::trim_if(db_query_file, boost::is_any_of("\"'"));
if (db_query_file.length() > 0 && !boost::filesystem::exists(db_query_file)) {
throw std::runtime_error("File containing DB queries " + db_query_file +
" does not exist.");
}
const auto db_file =
boost::filesystem::path(base_path) / "mapd_catalogs" / OMNISCI_SYSTEM_CATALOG;
if (!boost::filesystem::exists(db_file)) {
{ // check old system catalog existsense
const auto db_file = boost::filesystem::path(base_path) / "mapd_catalogs/mapd";
if (!boost::filesystem::exists(db_file)) {
throw std::runtime_error("OmniSci system catalog " + OMNISCI_SYSTEM_CATALOG +
" does not exist.");
}
}
}
if (license_path.length() == 0) {
license_path = base_path + "/omnisci.license";
}

// add all parameters to be displayed on startup
LOG(INFO) << "OmniSci started with data directory at '" << base_path << "'";
if (vm.count("license-path")) {
LOG(INFO) << "License key path set to '" << license_path << "'";
}
LOG(INFO) << " Watchdog is set to " << enable_watchdog;
LOG(INFO) << " Dynamic Watchdog is set to " << enable_dynamic_watchdog;
if (enable_dynamic_watchdog) {
LOG(INFO) << " Dynamic Watchdog timeout is set to " << dynamic_watchdog_time_limit;
}

LOG(INFO) << " Debug Timer is set to " << g_enable_debug_timer;

LOG(INFO) << " Maximum Idle session duration " << idle_session_duration;

LOG(INFO) << " Maximum active session duration " << max_session_duration;
}

boost::optional<int> MapDProgramOptions::parse_command_line(int argc,
char const* const* argv) {
po::positional_options_description positional_options;
positional_options.add("data", 1);

po::options_description all_desc("All options");
all_desc.add(help_desc).add(developer_desc);

Expand Down Expand Up @@ -947,13 +1068,7 @@ void heartbeat() {
}
}

int main(int argc, char** argv) {
MapDProgramOptions prog_config_opts(argv[0]);

if (auto return_code = prog_config_opts.parse_command_line(argc, argv)) {
return *return_code;
}

int startMapdServer(MapDProgramOptions& prog_config_opts) {
// try to enforce an orderly shutdown even after a signal
register_signal_handlers();

Expand Down Expand Up @@ -1093,4 +1208,31 @@ int main(int argc, char** argv) {
} else {
return signum;
}
};
}

// Out-of-class definition of the static nodeIds_token member declared in
// MapDProgramOptions.
const std::string MapDProgramOptions::nodeIds_token = {"nodeIds"};

int main(int argc, char** argv) {
bool has_clust_topo = false;

MapDProgramOptions prog_config_opts(argv[0], has_clust_topo);

try {
if (auto return_code = prog_config_opts.parse_command_line(argc, argv)) {
return *return_code;
}

if (!has_clust_topo) {
prog_config_opts.validate_base_path();
prog_config_opts.validate();
prog_config_opts.init_logging();
return (startMapdServer(prog_config_opts));
}
} catch (std::runtime_error& e) {
std::cerr << "Can't start: " << e.what() << std::endl;
return 1;
} catch (boost::program_options::error& e) {
std::cerr << "Usage Error: " << e.what() << std::endl;
return 1;
}
}
Loading

0 comments on commit a03a29c

Please sign in to comment.