diff --git a/CMakeLists.txt b/CMakeLists.txt
index bb90634cee..c9a0cd2413 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -481,6 +481,9 @@ set(MAPD_LIBRARIES Shared Catalog SqliteConnector Parser Analyzer Planner CsvImp
 
 if("${MAPD_EDITION_LOWER}" STREQUAL "ee")
   list(APPEND MAPD_LIBRARIES Distributed)
+  if(ENABLE_DISTRIBUTED_5_0)
+    list(APPEND MAPD_LIBRARIES StringDictionaryThread)
+  endif()
 endif()
 
 list(APPEND MAPD_LIBRARIES Calcite)
diff --git a/MapDServer.cpp b/MapDServer.cpp
index 0c50cc7237..3e708f4818 100644
--- a/MapDServer.cpp
+++ b/MapDServer.cpp
@@ -46,6 +46,12 @@
 #include 
 #include 
 #include 
+#include "MapDRelease.h"
+#include "Shared/Compressor.h"
+#include "Shared/MapDParameters.h"
+#include "Shared/file_delete.h"
+#include "Shared/mapd_shared_ptr.h"
+#include "Shared/scope.h"
 
 using namespace ::apache::thrift;
 using namespace ::apache::thrift::concurrency;
@@ -241,7 +247,8 @@
 namespace po = boost::program_options;
 
 class MapDProgramOptions {
  public:
-  MapDProgramOptions(char const* argv0) : log_options_(argv0) {
+  MapDProgramOptions(char const* argv0, bool dist_v5_ = false)
+      : log_options_(argv0), dist_v5_(dist_v5_) {
     fillOptions();
     fillAdvancedOptions();
   }
@@ -250,6 +257,7 @@ class MapDProgramOptions {
   std::string base_path;
   std::string config_file = {"mapd.conf"};
   std::string cluster_file = {"cluster.conf"};
+  std::string cluster_topology_file = {"cluster_topology.conf"};
   std::string license_path = {""};
   bool cpu_only = false;
   bool verbose_logging = false;
@@ -300,22 +308,39 @@
   int max_session_duration = kMinsPerMonth;
   std::string udf_file_name = {""};
 
- private:
   void fillOptions();
   void fillAdvancedOptions();
 
   po::options_description help_desc;
   po::options_description developer_desc;
   logger::LogOptions log_options_;
-  po::variables_map vm;
+  po::positional_options_description positional_options;
 
  public:
   std::vector<LeafHostInfo> db_leaves;
   std::vector<LeafHostInfo> string_leaves;
+  po::variables_map vm;
+  std::string clusterIds_arg;
+
+  std::string getNodeIds();
+  std::vector<std::string> getNodeIdsArray();
+  static const std::string nodeIds_token;
+  void overrideFrom(LeafHostInfo& host,
+                    const std::vector<LeafHostInfo>& hosts,
+                    rapidjson::Document& cluster_json);
 
   boost::optional<int> parse_command_line(int argc, char const* const* argv);
+  void validate();
+  void validate_base_path();
+  void init_logging();
+  const bool dist_v5_;
 };
 
+void MapDProgramOptions::init_logging() {
+  log_options_.set_base_path(base_path);
+  logger::init(log_options_);
+}
+
 void MapDProgramOptions::fillOptions() {
   help_desc.add_options()("help,h", "Show available options.");
   help_desc.add_options()(
@@ -338,10 +363,12 @@ void MapDProgramOptions::fillOptions() {
       po::value(&mapd_parameters.calcite_max_mem)
           ->default_value(mapd_parameters.calcite_max_mem),
       "Max memory available to calcite JVM.");
-  help_desc.add_options()("calcite-port",
-                          po::value(&mapd_parameters.calcite_port)
-                              ->default_value(mapd_parameters.calcite_port),
-                          "Calcite port number.");
+  if (!dist_v5_) {
+    help_desc.add_options()("calcite-port",
+                            po::value(&mapd_parameters.calcite_port)
+                                ->default_value(mapd_parameters.calcite_port),
+                            "Calcite port number.");
+  }
   help_desc.add_options()("config",
                           po::value(&config_file),
                           "Path to server configuration file.");
@@ -361,10 +388,13 @@ void MapDProgramOptions::fillOptions() {
       po::value(&mapd_parameters.cuda_grid_size)
           ->default_value(mapd_parameters.cuda_grid_size),
       "Size of grid to use on GPU.");
-  help_desc.add_options()(
-      "data",
-      po::value(&base_path)->required()->default_value("data"),
-      "Directory path to OmniSci data storage (catalogs, raw data, log files, etc).");
+  if (!dist_v5_) {
+    help_desc.add_options()(
+        "data",
+        po::value(&base_path)->required()->default_value("data"),
+        "Directory path to OmniSci data storage (catalogs, raw data, log files, etc).");
+    positional_options.add("data", 1);
+  }
   help_desc.add_options()("db-query-list",
                           po::value(&db_query_file),
                           "Path to file containing OmniSci warmup queries.");
@@ -394,12 +424,14 @@ void MapDProgramOptions::fillOptions() {
           ->implicit_value(true),
       "Enable the overlaps hash join framework allowing for range "
       "join (e.g. spatial overlaps) computation using a hash table.");
-  help_desc.add_options()(
-      "enable-string-dict-hash-cache",
-      po::value(&g_cache_string_hash)
-          ->default_value(g_cache_string_hash)
-          ->implicit_value(true),
-      "Cache string hash values in the string dictionary server during import.");
+  if (!dist_v5_) {
+    help_desc.add_options()(
+        "enable-string-dict-hash-cache",
+        po::value(&g_cache_string_hash)
+            ->default_value(g_cache_string_hash)
+            ->implicit_value(true),
+        "Cache string hash values in the string dictionary server during import.");
+  }
   help_desc.add_options()(
       "enable-thrift-logs",
       po::value(&g_enable_thrift_logs)
@@ -450,9 +482,11 @@ void MapDProgramOptions::fillOptions() {
           ->default_value(g_hll_precision_bits)
           ->implicit_value(g_hll_precision_bits),
       "Number of bits used from the hash value used to specify the bucket number.");
-  help_desc.add_options()("http-port",
-                          po::value(&http_port)->default_value(http_port),
-                          "HTTP port number.");
+  if (!dist_v5_) {
+    help_desc.add_options()("http-port",
+                            po::value(&http_port)->default_value(http_port),
+                            "HTTP port number.");
+  }
   help_desc.add_options()(
       "idle-session-duration",
       po::value(&idle_session_duration)->default_value(idle_session_duration),
@@ -483,10 +517,12 @@ void MapDProgramOptions::fillOptions() {
       po::value(&g_overlaps_max_table_size_bytes)
          ->default_value(g_overlaps_max_table_size_bytes),
       "The maximum size in bytes of the hash table for an overlaps hash join.");
-  help_desc.add_options()("port,p",
-                          po::value(&mapd_parameters.omnisci_server_port)
-                              ->default_value(mapd_parameters.omnisci_server_port),
-                          "TCP Port number.");
+  if (!dist_v5_) {
+    help_desc.add_options()("port,p",
+                            po::value(&mapd_parameters.omnisci_server_port)
+                                ->default_value(mapd_parameters.omnisci_server_port),
+                            "TCP Port number.");
+  }
   help_desc.add_options()("num-gpus",
                           po::value(&num_gpus)->default_value(num_gpus),
                           "Number of gpus to use.");
@@ -567,7 +603,8 @@ void MapDProgramOptions::fillAdvancedOptions() {
   developer_desc.add_options()(
       "jit-debug-ir",
       po::value(&jit_debug)->default_value(jit_debug)->implicit_value(true),
-      "Enable runtime debugger support for the JIT. Note that this flag is incompatible "
+      "Enable runtime debugger support for the JIT. Note that this flag is "
+      "incompatible "
       "with the `ENABLE_JIT_DEBUG` build flag. The generated code can be found at "
       "`/tmp/mapdquery`.");
   developer_desc.add_options()(
@@ -594,7 +631,8 @@ void MapDProgramOptions::fillAdvancedOptions() {
       po::value(&g_max_memory_allocation_size)
          ->default_value(g_max_memory_allocation_size),
       "Maximum allocation size for a fixed output buffer allocation for projection "
-      "queries with no pre-flight count. Default is the maximum slab size (sizes greater "
+      "queries with no pre-flight count. Default is the maximum slab size (sizes "
+      "greater "
       "than the maximum slab size have no affect). Requires bump allocator.");
   developer_desc.add_options()(
       "min-output-projection-allocation-bytes",
@@ -602,18 +640,22 @@ void MapDProgramOptions::fillAdvancedOptions() {
       po::value(&g_min_memory_allocation_size)
          ->default_value(g_min_memory_allocation_size),
       "Minimum allocation size for a fixed output buffer allocation for projection "
       "queries with no pre-flight count. If an allocation of this size cannot be "
-      "obtained, the query will be retried with different execution parameters and/or on "
+      "obtained, the query will be retried with different execution parameters and/or "
+      "on "
       "CPU (if allow-cpu-retry is enabled). Requires bump allocator.");
-  developer_desc.add_options()(
-      "enable-bump-allocator",
-      po::value(&g_enable_bump_allocator)
-          ->default_value(g_enable_bump_allocator)
-          ->implicit_value(true),
-      "Enable the bump allocator for projection queries on GPU. The bump allocator will "
-      "allocate a fixed size buffer for each query, track the number of rows passing the "
-      "kernel during query execution, and copy back only the rows that passed the kernel "
-      "to CPU after execution. When disabled, pre-flight count queries are used to size "
-      "the output buffer for projection queries.");
+  developer_desc.add_options()("enable-bump-allocator",
+                               po::value(&g_enable_bump_allocator)
+                                   ->default_value(g_enable_bump_allocator)
+                                   ->implicit_value(true),
+                               "Enable the bump allocator for projection queries on "
+                               "GPU. The bump allocator will "
+                               "allocate a fixed size buffer for each query, track the "
+                               "number of rows passing the "
+                               "kernel during query execution, and copy back only the "
+                               "rows that passed the kernel "
+                               "to CPU after execution. When disabled, pre-flight "
+                               "count queries are used to size "
+                               "the output buffer for projection queries.");
   developer_desc.add_options()("ssl-cert",
                                po::value(&mapd_parameters.ssl_cert_file)
                                    ->default_value(std::string("")),
@@ -647,7 +689,7 @@ void MapDProgramOptions::fillAdvancedOptions() {
       po::value(&udf_file_name),
       "Load user defined extension functions from this file at startup. The file is "
       "expected to be a C/C++ file with extension .cpp.");
-};
+}
 
 namespace {
 
@@ -677,11 +719,90 @@ bool trim_and_check_file_exists(std::string& filename, const std::string desc) {
 }
 
 }  // namespace
 
+void MapDProgramOptions::validate_base_path() {
+  boost::algorithm::trim_if(base_path, boost::is_any_of("\"'"));
+  if (!boost::filesystem::exists(base_path)) {
+    throw std::runtime_error("OmniSci base directory does not exist at " + base_path);
+  }
+}
+
+void MapDProgramOptions::validate() {
+  boost::algorithm::trim_if(base_path, boost::is_any_of("\"'"));
+  const auto data_path = boost::filesystem::path(base_path) / "mapd_data";
+  if (!boost::filesystem::exists(data_path)) {
+    throw std::runtime_error("OmniSci data directory does not exist at '" + base_path +
+                             "'");
+  }
+
+  {
+    const auto lock_file = boost::filesystem::path(base_path) / "omnisci_server_pid.lck";
+    auto pid = std::to_string(getpid());
+
+    int pid_fd = open(lock_file.c_str(), O_RDWR | O_CREAT, 0644);
+    if (pid_fd == -1) {
+      auto err = std::string("Failed to open PID file ") + lock_file.c_str() + ". " +
+                 strerror(errno) + ".";
+      throw std::runtime_error(err);
+    }
+    if (lockf(pid_fd, F_TLOCK, 0) == -1) {
+      close(pid_fd);
+      auto err = std::string("Another OmniSci Server is using data directory ") +
+                 base_path + ".";
+      throw std::runtime_error(err);
+    }
" + + strerror(errno) + "."; + throw std::runtime_error(err); + } + if (write(pid_fd, pid.c_str(), pid.length()) == -1) { + close(pid_fd); + auto err = std::string("Failed to write PID file ") + lock_file.c_str() + ". " + + strerror(errno) + "."; + throw std::runtime_error(err); + } + } + boost::algorithm::trim_if(db_query_file, boost::is_any_of("\"'")); + if (db_query_file.length() > 0 && !boost::filesystem::exists(db_query_file)) { + throw std::runtime_error("File containing DB queries " + db_query_file + + " does not exist."); + } + const auto db_file = + boost::filesystem::path(base_path) / "mapd_catalogs" / OMNISCI_SYSTEM_CATALOG; + if (!boost::filesystem::exists(db_file)) { + { // check old system catalog existsense + const auto db_file = boost::filesystem::path(base_path) / "mapd_catalogs/mapd"; + if (!boost::filesystem::exists(db_file)) { + throw std::runtime_error("OmniSci system catalog " + OMNISCI_SYSTEM_CATALOG + + " does not exist."); + } + } + } + if (license_path.length() == 0) { + license_path = base_path + "/omnisci.license"; + } + + // add all parameters to be displayed on startup + LOG(INFO) << "OmniSci started with data directory at '" << base_path << "'"; + if (vm.count("license-path")) { + LOG(INFO) << "License key path set to '" << license_path << "'"; + } + LOG(INFO) << " Watchdog is set to " << enable_watchdog; + LOG(INFO) << " Dynamic Watchdog is set to " << enable_dynamic_watchdog; + if (enable_dynamic_watchdog) { + LOG(INFO) << " Dynamic Watchdog timeout is set to " << dynamic_watchdog_time_limit; + } + + LOG(INFO) << " Debug Timer is set to " << g_enable_debug_timer; + + LOG(INFO) << " Maximum Idle session duration " << idle_session_duration; + + LOG(INFO) << " Maximum active session duration " << max_session_duration; +} + boost::optional MapDProgramOptions::parse_command_line(int argc, char const* const* argv) { - po::positional_options_description positional_options; - positional_options.add("data", 1); - po::options_description all_desc("All options"); all_desc.add(help_desc).add(developer_desc); @@ -947,13 +1068,7 @@ void heartbeat() { } } -int main(int argc, char** argv) { - MapDProgramOptions prog_config_opts(argv[0]); - - if (auto return_code = prog_config_opts.parse_command_line(argc, argv)) { - return *return_code; - } - +int startMapdServer(MapDProgramOptions& prog_config_opts) { // try to enforce an orderly shutdown even after a signal register_signal_handlers(); @@ -1093,4 +1208,31 @@ int main(int argc, char** argv) { } else { return signum; } -}; +} + +const std::string MapDProgramOptions::nodeIds_token = {"nodeIds"}; + +int main(int argc, char** argv) { + bool has_clust_topo = false; + + MapDProgramOptions prog_config_opts(argv[0], has_clust_topo); + + try { + if (auto return_code = prog_config_opts.parse_command_line(argc, argv)) { + return *return_code; + } + + if (!has_clust_topo) { + prog_config_opts.validate_base_path(); + prog_config_opts.validate(); + prog_config_opts.init_logging(); + return (startMapdServer(prog_config_opts)); + } + } catch (std::runtime_error& e) { + std::cerr << "Can't start: " << e.what() << std::endl; + return 1; + } catch (boost::program_options::error& e) { + std::cerr << "Usage Error: " << e.what() << std::endl; + return 1; + } +} diff --git a/OmniSciWebServer.go b/OmniSciWebServer.go index 1e3c4f1d6c..0d9a5466d1 100644 --- a/OmniSciWebServer.go +++ b/OmniSciWebServer.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "context" "crypto/rand" "crypto/sha256" "crypto/tls" @@ -34,6 +35,9 @@ import ( 
"github.com/rs/cors" log "github.com/sirupsen/logrus" "github.com/spf13/pflag" + "github.com/xeipuuv/gojsonschema" + "go.etcd.io/etcd/client" + "go.etcd.io/etcd/pkg/types" graceful "gopkg.in/tylerb/graceful.v1" ) @@ -109,6 +113,358 @@ const ( samlErrorPage = "/saml-error.html" ) +type stringDictionary struct { + NodeID string `json:"nodeId"` + Path string `json:"data"` + Port int `json:"port"` + HostName string `json:"hostname"` +} + +type leaf struct { + NodeID string `json:"nodeId"` + Path string `json:"data"` + Port int `json:"port"` + StringDictionary string `json:"stringDictionary"` + HostName string `json:"hostname"` +} + +type aggregator struct { + NodeID string `json:"nodeId"` + Path string `json:"data"` + Port int `json:"port"` + StringDictionary string `json:"stringDictionary"` + Leaves []string `json:"leaves"` + HostName string `json:"hostname"` + LeafConnTimeout int `json:"leaf_conn_timeout"` + LeafRecvTimeout int `json:"leaf_recv_timeout"` + LeafSendTimeout int `json:"leaf_send_timeout"` +} + +type cluster struct { + HostNames []string `json:"hostnames"` + StringDictionary *stringDictionary `json:"stringDictionary"` + Leaves []leaf `json:"leaves"` + Aggregator *aggregator `json:"aggregator"` +} + +// Keep this in synch with the duplicate in +// LeafHostInfo.cpp +const ( + clusterSchema = "{" + + " \"$id\": \"https://example.com/arrays.schema.json\"," + + " \"$schema\": \"http://json-schema.org/draft-07/schema#\"," + + " \"description\": \"The json schema for the cluster file\"," + + " \"type\": \"object\"," + + " \"additionalProperties\": false," + + " \"properties\": {" + + " \"hostnames\": {" + + " \"type\": \"array\"," + + " \"items\": {" + + " \"type\": \"string\"" + + " }" + + " }," + + " \"dynamic\": {" + + " \"type\": \"boolean\"" + + " }," + + " \"aggregator\": {" + + " \"type\": \"object\"," + + " \"$ref\": \"#/definitions/aggregator\"" + + " }," + + " \"stringDictionary\": {" + + " \"type\": \"object\"," + + " \"$ref\": \"#/definitions/stringDictionary\"" + + " }," + + " \"leaves\": {" + + " \"type\": \"array\"," + + " \"items\": {" + + " \"$ref\": \"#/definitions/leaf\"" + + " }," + + " \"description\": \"The leaves on this host\"" + + " }" + + " }," + + " \"definitions\": {" + + " \"stringDictionary\": {" + + " \"type\": \"object\"," + + " \"required\": [" + + " \"data\"," + + " \"port\"" + + " ]," + + " \"additionalProperties\": false," + + " \"properties\": {" + + " \"nodeId\": {" + + " \"type\": \"string\"," + + " \"description\": \"The nodeId on the cluster\"" + + " }," + + " \"data\": {" + + " \"type\": \"string\"," + + " \"description\": \"Base path for storage\"" + + " }," + + " \"port\": {" + + " \"type\": \"integer\"," + + " \"description\": \"Port number to bind to\"" + + " }," + + " \"hostname\": {" + + " \"type\": \"string\"," + + " \"description\": \"The host name where this node will run\"" + + " }," + + + " \"ssl_cert_file\": {" + + " \"type\": \"string\"," + + " \"description\": \"The ssl certificate file location\"" + + " }," + + " \"ssl_key_file\": {" + + " \"type\": \"string\"," + + " \"description\": \"The ssl key file location\"" + + " }," + + " \"cache-string-hash\": {" + + " \"type\": \"boolean\"," + + " \"description\": \"Enable cache to store hashes in string dictionary server\"" + + " }" + + " }" + + " }," + + " \"leaf\": {" + + " \"type\": \"object\"," + + " \"required\": [" + + " \"data\"," + + " \"port\"" + + " ]," + + " \"additionalProperties\": false," + + " \"properties\": {" + + " \"nodeId\": {" + + " \"type\": \"string\"," + + " 
\"description\": \"The nodeId on the cluster\"" + + " }," + + " \"data\": {" + + " \"type\": \"string\"," + + " \"description\": \"Base path for storage\"" + + " }," + + " \"port\": {" + + " \"type\": \"integer\"," + + " \"description\": \"Port number to bind to\"" + + " }," + + " \"hostname\": {" + + " \"type\": \"string\"," + + " \"description\": \"The host name where this node will run\"" + + " }" + + " }" + + " }," + + " \"aggregator\": {" + + " \"type\": \"object\"," + + " \"required\": [" + + " \"data\"," + + " \"port\"" + + " ]," + + " \"additionalProperties\": false," + + " \"properties\": {" + + " \"nodeId\": {" + + " \"type\": \"string\"," + + " \"description\": \"The nodeId on the cluster\"" + + " }," + + " \"port\": {" + + " \"type\": \"integer\"," + + " \"description\": \"Port number to bind to\"" + + " }," + + " \"data\": {" + + " \"type\": \"string\"," + + " \"description\": \"Base path for storage\"" + + " }," + + " \"hostname\": {" + + " \"type\": \"string\"," + + " \"description\": \"The host name where this node will run\"" + + " }," + + " \"leaf_conn_timeout\": {" + + " \"type\": \"integer\"," + + " \"description\": \"Leaf connect timeout, in milliseconds.\"" + + " }," + + " \"leaf_recv_timeout\": {" + + " \"type\": \"integer\"," + + " \"description\": \"Leaf receive timeout, in milliseconds.\"" + + " }," + + " \"leaf_send_timeout\": {" + + " \"type\": \"integer\"," + + " \"description\": \"Leaf send timeout, in milliseconds.\"" + + " }" + + " }" + + " }" + + " }" + + "}" + + "" +) + +// TODO etcd.go ends up using -2 from the base port +// Make this really independent of that code +func (that *cluster) getEndpointURL(nextHostName *int, nodeHostname string, port int) string { + var hostname string + if nodeHostname != "" { + hostname = nodeHostname + } else { + hostname = that.HostNames[*nextHostName] + *nextHostName = ((*nextHostName) + 1) % len(that.HostNames) + } + // FIXME this will break if encryption is enabled + var returns = "http://" + hostname + ":" + strconv.Itoa(port-2) + return returns +} + +// Preserve the same order as in Etcd::create_initial_urlmap in Cluster.cpp +func (that *cluster) getEndpoints() []string { + var returns []string + nextHostName := 0 + + // Preserve the same order as in LeafHostInfo::parseClusterConfigNew + sd := that.StringDictionary + var endURLStr = that.getEndpointURL(&nextHostName, sd.HostName, sd.Port) + returns = append(returns, endURLStr) + for _, item := range that.Leaves { + var endURLStr = that.getEndpointURL(&nextHostName, item.HostName, item.Port) + returns = append(returns, endURLStr) + } + agg := that.Aggregator + endURLStr = that.getEndpointURL(&nextHostName, agg.HostName, agg.Port) + returns = append(returns, endURLStr) + + return returns +} + +// TODO Pull the schema into the sources by hardcoding it before the release +func getCluster(clusterFile string) *cluster { + // schemaLoader := gojsonschema.NewReferenceLoader("file://./cluster.conf.schema.json") + schemaLoader := gojsonschema.NewStringLoader(clusterSchema) + documentLoader := gojsonschema.NewReferenceLoader("file://" + clusterFile) + + result, err := gojsonschema.Validate(schemaLoader, documentLoader) + if err != nil { + panic(err.Error()) + } + + var clusterConf = cluster{} + + if result.Valid() { + // Now it is validated parse it + jsonBytes, err := ioutil.ReadFile(clusterFile) + + if nil == err { + err = json.Unmarshal(jsonBytes, &clusterConf) + } + } else { + fmt.Printf("The document is not valid. 
see errors :\n") + for _, desc := range result.Errors() { + log.Printf("- %s\n", desc) + } + } + return &clusterConf +} + +func getEndpoints(initialPeerUrlsmap string) []string { + urlMap, _ := types.NewURLsMap(initialPeerUrlsmap) + returns := []string{} + + for _, v := range urlMap { + // fmt.Printf("key[%s] value[%s]\n", k, v) + var url = v[0] + urlStr := url.String() + returns = append(returns, urlStr) + } + + return returns +} + +func getClient(endPoints []string) (*client.Client, error) { + cfg := client.Config{ + Endpoints: endPoints, + Transport: client.DefaultTransport, + HeaderTimeoutPerRequest: time.Second, + } + var c client.Client + c, err := client.New(cfg) + return &c, err +} + +func handleCluster(clusterFile string) { + var cluster = getCluster(clusterFile) + var endPoints = cluster.getEndpoints() + + c, err := getClient(endPoints) + if err != nil { + log.Fatal(err) + } + + go notifyOnLeader(c, cluster) +} + +func isAggregator(leader *client.Member, clust *cluster) bool { + returns := false + + if clust.Aggregator != nil && clust.Aggregator.NodeID == leader.Name { + returns = true + } + + return returns +} + +func setPort(in *url.URL, portNum int) { + host, port, _ := net.SplitHostPort(in.Host) + port = strconv.Itoa(portNum) + in.Host = host + ":" + port +} + +func getBackendURL(leader *client.Member) (*url.URL, error) { + // TODO MapDServer.cpp ends up using +1 from the base port + // Make this really independent of that code + var returns *url.URL + var err error + + if len(leader.PeerURLs) > 0 { + + backendURLStr := leader.PeerURLs[0] + var backendURL *url.URL + backendURL, err = url.Parse(backendURLStr) + host, port, err := net.SplitHostPort(backendURL.Host) + var sPort = "" + if "" != port && nil == err { + var iport, _ = strconv.Atoi(port) + iport = iport + 4 + sPort = ":" + strconv.Itoa(iport) + } + scheme := "http" + if viper.IsSet("ssl-cert") && viper.IsSet("ssl-private-key") { + scheme = "https" + http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + } + backendURL.Scheme = scheme + backendURL.Host = host + sPort + returns = backendURL + } + + return returns, err +} + +func notifyOnLeader(cl *client.Client, clust *cluster) { + for { + members := client.NewMembersAPI(*cl) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + leader, _ := members.Leader(ctx) + cancel() + + if nil != leader { + if isAggregator(leader, clust) { + var err error + leaderBackendURL, err := getBackendURL(leader) + if nil == err && leaderBackendURL.String() != backendURL.String() { + var msg = "Aggregator is changed to " + leader.Name + " at " + leaderBackendURL.String() + fmt.Println(msg) + log.Info(msg) + + backendURL = leaderBackendURL + } + } + } + + time.Sleep(200 * time.Millisecond) + } +} + func getLogName(lvl string) string { n := filepath.Base(os.Args[0]) h, _ := os.Hostname() @@ -125,6 +481,7 @@ func init() { pflag.IntP("port", "p", 6273, "frontend server port") pflag.IntP("http-to-https-redirect-port", "", 6280, "frontend server port for http redirect, when https enabled") pflag.StringP("backend-url", "b", "", "url to http-port on omnisci_server [http://localhost:6278]") + pflag.StringP("cluster_topology", "", "", "The cluster configuration file") pflag.StringSliceP("reverse-proxy", "", nil, "additional endpoints to act as reverse proxies, format '/endpoint/:http://target.example.com'") pflag.StringP("frontend", "f", "frontend", "path to frontend directory") pflag.StringP("jupyter-url", "", "", "url for jupyter 
integration") @@ -191,6 +548,8 @@ func init() { viper.BindPFlag("verbose", pflag.CommandLine.Lookup("verbose")) viper.BindPFlag("version", pflag.CommandLine.Lookup("version")) + viper.BindPFlag("cluster_topology", pflag.CommandLine.Lookup("cluster_topology")) + viper.SetDefault("http-port", 6278) viper.SetEnvPrefix("MAPD") @@ -245,6 +604,11 @@ func init() { backendURLStr = s + "://localhost:" + strconv.Itoa(viper.GetInt("http-port")) } + var cluster = viper.GetString("cluster_topology") + if cluster != "" { + handleCluster(cluster) + } + backendURL, err = url.Parse(backendURLStr) if err != nil { log.Fatal(err) @@ -423,11 +787,6 @@ type ResponseMultiWriter struct { http.ResponseWriter } -func (w *ResponseMultiWriter) writeHeader(c int) { - w.ResponseWriter.Header().Del("Content-Length") - w.ResponseWriter.WriteHeader(c) -} - func (w *ResponseMultiWriter) Write(b []byte) (int, error) { h := w.ResponseWriter.Header() h.Del("Content-Length") @@ -1183,7 +1542,6 @@ func serversHandler(rw http.ResponseWriter, r *http.Request) { s.Username = "admin" s.Password = "HyperInteractive" s.Database = "omnisci" - h, p, _ := net.SplitHostPort(r.Host) s.Port, _ = net.LookupPort("tcp", p) s.Host = h diff --git a/Tests/CMakeLists.txt b/Tests/CMakeLists.txt index 50d9af630e..7327f3ed51 100644 --- a/Tests/CMakeLists.txt +++ b/Tests/CMakeLists.txt @@ -5,7 +5,7 @@ set(TEST_BASE_PATH "./tmp") add_definitions("-DBASE_PATH=\"${TEST_BASE_PATH}\"") add_executable(CodeGeneratorTest CodeGeneratorTest.cpp) -add_executable(ExecuteTest ExecuteTest.cpp) +add_executable(ExecuteTest ExecuteTest.cpp ClusterTester.cpp) add_executable(RunQueryLoop RunQueryLoop.cpp) add_executable(StringDictionaryTest StringDictionaryTest.cpp) add_executable(StringTransformTest StringTransformTest.cpp) diff --git a/Tests/ClusterTester.cpp b/Tests/ClusterTester.cpp new file mode 100644 index 0000000000..b71b59df38 --- /dev/null +++ b/Tests/ClusterTester.cpp @@ -0,0 +1,9 @@ +/* + * Copyright 2019 OmniSci Technologies, Inc. + * + */ + +/* + * @file Cluster.cpp + * Copyright (c) 2019 OmniSci Technologies, Inc. All rights reserved. + */ diff --git a/Tests/ClusterTester.h b/Tests/ClusterTester.h new file mode 100644 index 0000000000..b6e9cd2ea8 --- /dev/null +++ b/Tests/ClusterTester.h @@ -0,0 +1,16 @@ +/* + * Copyright 2019 OmniSci Technologies, Inc. + * + */ + +/* + * @file Cluster.h + * @author + * + * Copyright (c) 2019 OmniSci Technologies, Inc. All rights reserved. 
diff --git a/Tests/CMakeLists.txt b/Tests/CMakeLists.txt
index 50d9af630e..7327f3ed51 100644
--- a/Tests/CMakeLists.txt
+++ b/Tests/CMakeLists.txt
@@ -5,7 +5,7 @@ set(TEST_BASE_PATH "./tmp")
 add_definitions("-DBASE_PATH=\"${TEST_BASE_PATH}\"")
 
 add_executable(CodeGeneratorTest CodeGeneratorTest.cpp)
-add_executable(ExecuteTest ExecuteTest.cpp)
+add_executable(ExecuteTest ExecuteTest.cpp ClusterTester.cpp)
 add_executable(RunQueryLoop RunQueryLoop.cpp)
 add_executable(StringDictionaryTest StringDictionaryTest.cpp)
 add_executable(StringTransformTest StringTransformTest.cpp)
diff --git a/Tests/ClusterTester.cpp b/Tests/ClusterTester.cpp
new file mode 100644
index 0000000000..b71b59df38
--- /dev/null
+++ b/Tests/ClusterTester.cpp
@@ -0,0 +1,9 @@
+/*
+ * Copyright 2019 OmniSci Technologies, Inc.
+ *
+ */
+
+/*
+ * @file Cluster.cpp
+ * Copyright (c) 2019 OmniSci Technologies, Inc.  All rights reserved.
+ */
diff --git a/Tests/ClusterTester.h b/Tests/ClusterTester.h
new file mode 100644
index 0000000000..b6e9cd2ea8
--- /dev/null
+++ b/Tests/ClusterTester.h
@@ -0,0 +1,16 @@
+/*
+ * Copyright 2019 OmniSci Technologies, Inc.
+ *
+ */
+
+/*
+ * @file Cluster.h
+ * @author
+ *
+ * Copyright (c) 2019 OmniSci Technologies, Inc.  All rights reserved.
+ */
+
+#ifndef CLUSTER_TESTER_H
+#define CLUSTER_TESTER_H
+
+#endif
diff --git a/Tests/ExecuteTest.cpp b/Tests/ExecuteTest.cpp
index b121d482fc..2c162d3c39 100644
--- a/Tests/ExecuteTest.cpp
+++ b/Tests/ExecuteTest.cpp
@@ -27,6 +27,7 @@
 #include "../Shared/TimeGM.h"
 #include "../Shared/scope.h"
 #include "../SqliteConnector/SqliteConnector.h"
+#include "ClusterTester.h"
 #include "DistributedLoader.h"
 
 #include 
@@ -34,7 +35,6 @@
 #include 
 #include 
 #include 
-#include 
 
 #ifndef BASE_PATH
 #define BASE_PATH "./tmp"
@@ -512,6 +512,25 @@ void validate_storage_options(
 
 }  // namespace
 
+TEST(Distributed50, FailOver) {
+  run_ddl_statement("DROP TABLE IF EXISTS dist5;");
+  run_ddl_statement(
+      "create table dist5 (col1 TEXT ENCODING DICT) with (partitions='replicated');");
+
+  auto dt = ExecutorDeviceType::CPU;
+
+  EXPECT_NO_THROW(run_multiple_agg("insert into dist5 values('t1');", dt));
+  ASSERT_EQ(1, v<int64_t>(run_simple_agg("SELECT count(*) FROM dist5;", dt)));
+
+  EXPECT_NO_THROW(run_multiple_agg("insert into dist5 values('t2');", dt));
+  ASSERT_EQ(2, v<int64_t>(run_simple_agg("SELECT count(*) FROM dist5;", dt)));
+
+  EXPECT_NO_THROW(run_multiple_agg("insert into dist5 values('t3');", dt));
+  ASSERT_EQ(3, v<int64_t>(run_simple_agg("SELECT count(*) FROM dist5;", dt)));
+
+  run_ddl_statement("DROP TABLE IF EXISTS dist5;");
+}
+
 TEST(Create, StorageOptions) {
   for (auto dt : {ExecutorDeviceType::CPU, ExecutorDeviceType::GPU}) {
     SKIP_NO_GPU();
@@ -5366,7 +5385,7 @@ void import_geospatial_test() {
         gp900913 GEOMETRY(POINT,900913),
         gl4326none GEOMETRY(LINESTRING,4326) ENCODING NONE,
         gpoly4326 GEOMETRY(POLYGON,4326)
-      ) WITH (fragment_size=2); 
+      ) WITH (fragment_size=2);
   )";
   run_ddl_statement(create_ddl);
   TestHelpers::ValuesGenerator gen("geospatial_test");
@@ -15541,7 +15560,8 @@ int create_and_populate_window_func_table() {
     run_ddl_statement(drop_test_table);
     g_sqlite_comparator.query(drop_test_table);
     const std::string create_test_table{
-        "CREATE TABLE test_window_func(x INTEGER, y TEXT, t INTEGER, d DATE, f FLOAT, dd "
+        "CREATE TABLE test_window_func(x INTEGER, y TEXT, t INTEGER, d DATE, f FLOAT, "
+        "dd "
         "DOUBLE);"};
     run_ddl_statement(create_test_table);
     g_sqlite_comparator.query(create_test_table);
@@ -15663,7 +15683,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
     run_ddl_statement(drop_old_test);
     g_sqlite_comparator.query(drop_old_test);
     std::string columns_definition{
-        "x int not null, y int, xx smallint, str text encoding dict, dt DATE, dt32 DATE "
+        "x int not null, y int, xx smallint, str text encoding dict, dt DATE, dt32 "
+        "DATE "
         "ENCODING FIXED(32), dt16 DATE ENCODING FIXED(16), ts TIMESTAMP"};
     const auto create_test_inner =
         build_create_table_statement(columns_definition,
@@ -15675,7 +15696,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
                                      g_aggregator);
     run_ddl_statement(create_test_inner);
     g_sqlite_comparator.query(
-        "CREATE TABLE test_inner(x int not null, y int, xx smallint, str text, dt DATE, "
+        "CREATE TABLE test_inner(x int not null, y int, xx smallint, str text, dt "
+        "DATE, "
         "dt32 DATE, dt16 DATE, ts DATETIME);");
   } catch (...) {
     LOG(ERROR) << "Failed to (re-)create table 'test_inner'";
@@ -15947,10 +15969,12 @@ int create_and_populate_tables(bool with_delete_support = true) {
         "INSERT INTO test VALUES(7, 42, 101, 1001, 't', 1.1, 1.1, null, 2.2, null, "
         "'foo', null, 'foo', null, "
         "'real_foo', 'foo',"
-        "'2014-12-13 22:23:15', '2014-12-13 22:23:15.323', '1999-07-11 14:02:53.874533', "
+        "'2014-12-13 22:23:15', '2014-12-13 22:23:15.323', '1999-07-11 "
+        "14:02:53.874533', "
         "'2006-04-26 "
         "03:49:04.607435125', "
-        "'15:13:14', '1999-09-09', '1999-09-09', '1999-09-09', 9, 111.1, 111.1, 'fish', "
+        "'15:13:14', '1999-09-09', '1999-09-09', '1999-09-09', 9, 111.1, 111.1, "
+        "'fish', "
         "null, "
         "2147483647, -2147483648, null, -1, 32767);"};
     run_multiple_agg(insert_query, ExecutorDeviceType::CPU);
@@ -15960,7 +15984,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
     const std::string insert_query{
         "INSERT INTO test VALUES(8, 43, -78, 1002, 'f', 1.2, 101.2, -101.2, 2.4, "
         "-2002.4, 'bar', null, 'bar', null, "
-        "'real_bar', NULL, '2014-12-13 22:23:15', '2014-12-13 22:23:15.323', '2014-12-13 "
+        "'real_bar', NULL, '2014-12-13 22:23:15', '2014-12-13 22:23:15.323', "
+        "'2014-12-13 "
         "22:23:15.874533', "
         "'2014-12-13 22:23:15.607435763', '15:13:14', NULL, NULL, NULL, NULL, 222.2, "
         "222.2, "
@@ -16009,7 +16034,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
                                      g_aggregator);
     run_ddl_statement(create_test);
     g_sqlite_comparator.query(
-        "CREATE TABLE test_empty(x int not null, y int, z smallint, t bigint, b boolean, "
+        "CREATE TABLE test_empty(x int not null, y int, z smallint, t bigint, b "
+        "boolean, "
         "f "
         "float, ff float, fn float, d "
         "double, dn double, str varchar(10), null_str text, fixed_str text, "
@@ -16067,7 +16093,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
   }
   {
     const std::string insert_query{
-        "INSERT INTO test_one_row VALUES(8, 43, -78, 1002, 'f', 1.2, 101.2, -101.2, 2.4, "
+        "INSERT INTO test_one_row VALUES(8, 43, -78, 1002, 'f', 1.2, 101.2, -101.2, "
+        "2.4, "
        "-2002.4, 'bar', null, 'bar', null, "
         "'real_bar', NULL, '2014-12-13 22:23:15', "
         "'15:13:14', NULL, NULL, NULL, NULL, 222.2, 222.2, "
@@ -16113,7 +16140,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
         "text encoding none, m "
         "timestamp(0), n time(0), o date, o1 date encoding fixed(16), "
         "o2 date encoding fixed(32), fx int encoding fixed(16), dd decimal(10, 2), "
-        "dd_notnull decimal(10, 2) not null, ss text encoding dict, u int, ofd int, ufd "
+        "dd_notnull decimal(10, 2) not null, ss text encoding dict, u int, ofd int, "
+        "ufd "
         "int not null, ofq bigint, ufq "
         "bigint not null"};
     const std::string create_test =
@@ -16131,7 +16159,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
         "text, null_str text,"
         "fixed_str text, real_str text, m timestamp(0), n time(0), o date, o1 date, "
         "o2 date, fx int, dd decimal(10, 2), "
-        "dd_notnull decimal(10, 2) not null, ss text, u int, ofd int, ufd int not null, "
+        "dd_notnull decimal(10, 2) not null, ss text, u int, ofd int, ufd int not "
+        "null, "
         "ofq bigint, ufq bigint not "
         "null);");
   } catch (...) {
@@ -16145,7 +16174,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
         "'foo', null, 'foo', 'real_foo', "
         "'2014-12-13 "
         "22:23:15', "
-        "'15:13:14', '1999-09-09', '1999-09-09', '1999-09-09', 9, 111.1, 111.1, 'fish', "
+        "'15:13:14', '1999-09-09', '1999-09-09', '1999-09-09', 9, 111.1, 111.1, "
+        "'fish', "
         "null, "
         "2147483647, -2147483648, null, -1);"};
     run_multiple_agg(insert_query, ExecutorDeviceType::CPU);
@@ -16172,7 +16202,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
         "'real_baz', "
         "'2014-12-13 "
         "22:23:15', "
-        "'15:13:14', '1999-09-09', '1999-09-09', '1999-09-09', 11, 333.3, 333.3, 'boat', "
+        "'15:13:14', '1999-09-09', '1999-09-09', '1999-09-09', 11, 333.3, 333.3, "
+        "'boat', "
         "null, 1, -1, "
         "1, -9223372036854775808);"};
     run_multiple_agg(insert_query, ExecutorDeviceType::CPU);
@@ -16205,7 +16236,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
   const std::string drop_old_array_test{"DROP TABLE IF EXISTS array_test_inner;"};
   run_ddl_statement(drop_old_array_test);
   const std::string create_array_test{
-      "CREATE TABLE array_test_inner(x int, arr_i16 smallint[], arr_i32 int[], arr_i64 "
+      "CREATE TABLE array_test_inner(x int, arr_i16 smallint[], arr_i32 int[], "
+      "arr_i64 "
       "bigint[], arr_str text[] "
       "encoding "
       "dict, "
@@ -16221,8 +16253,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
   if (!g_aggregator) {
     try {
       size_t num_shards = choose_shard_count();
-      // check if the oversubscriptions to GPU for multiple Shard is correctly functional
-      // or not.
+      // check if the oversubscriptions to GPU for multiple Shard is correctly
+      // functional or not.
       const size_t num_oversubscription = 10;
 
       ShardInfo shard_info{(num_shards) ? "i" : "", num_shards};
@@ -16455,7 +16487,8 @@ int create_and_populate_tables(bool with_delete_support = true) {
 
 int create_views() {
   const std::string create_view_test{
-      "CREATE VIEW view_test AS SELECT test.*, test_inner.* FROM test, test_inner WHERE "
+      "CREATE VIEW view_test AS SELECT test.*, test_inner.* FROM test, test_inner "
+      "WHERE "
      "test.str = test_inner.str;"};
   const std::string drop_old_view{"DROP VIEW IF EXISTS view_test;"};
   const std::string create_join_view_test{
@@ -16516,7 +16549,8 @@ int create_as_select_empty() {
     run_ddl_statement(drop_ctas_test);
     g_sqlite_comparator.query(drop_ctas_test);
     const std::string create_ctas_test{
-        "CREATE TABLE empty_ctas_test AS SELECT x, f, d, str, fixed_str FROM test WHERE "
+        "CREATE TABLE empty_ctas_test AS SELECT x, f, d, str, fixed_str FROM test "
+        "WHERE "
         "x > 8;"};
     run_ddl_statement(create_ctas_test);
     g_sqlite_comparator.query(create_ctas_test);
@@ -16653,6 +16687,8 @@ void drop_views() {
 
 }  // namespace
 
 int main(int argc, char** argv) {
+  std::cout << "Starting ExecuteTest" << std::endl;
+
   testing::InitGoogleTest(&argc, argv);
 
   namespace po = boost::program_options;
@@ -16690,9 +16726,9 @@ int main(int argc, char** argv) {
           ->implicit_value(true),
       "Enable the bump allocator for projection queries on GPU.");
   desc.add_options()("keep-data", "Don't drop tables at the end of the tests");
-  desc.add_options()(
-      "use-existing-data",
-      "Don't create and drop tables and only run select tests (it implies --keep-data).");
+  desc.add_options()("use-existing-data",
+                     "Don't create and drop tables and only run select tests (it "
+                     "implies --keep-data).");
   desc.add_options()("dump-ir",
                      po::value<std::string>(),
                      "Dump IR for all executed queries to file. Currently only supports "
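The new `Distributed50.FailOver` test above encodes a per-step invariant: after every single-row insert into a replicated, dictionary-encoded table, the visible row count must advance by exactly one, even if a leaf fails over between steps. A hypothetical generalization of the same pattern, reusing the ExecuteTest harness helpers (`run_ddl_statement`, `run_multiple_agg`, `run_simple_agg`, `v<int64_t>`); this variant is a sketch, not part of the patch:

```cpp
// Hypothetical N-row variant of Distributed50.FailOver; assumes it lives in
// ExecuteTest.cpp so the harness helpers above are in scope.
TEST(Distributed50, FailOverManyRows) {
  run_ddl_statement("DROP TABLE IF EXISTS dist5;");
  run_ddl_statement(
      "create table dist5 (col1 TEXT ENCODING DICT) with (partitions='replicated');");

  auto dt = ExecutorDeviceType::CPU;
  for (int64_t i = 1; i <= 10; ++i) {
    // Each insert goes through the aggregator; the count must track it exactly.
    EXPECT_NO_THROW(run_multiple_agg(
        "insert into dist5 values('t" + std::to_string(i) + "');", dt));
    ASSERT_EQ(i, v<int64_t>(run_simple_agg("SELECT count(*) FROM dist5;", dt)));
  }

  run_ddl_statement("DROP TABLE IF EXISTS dist5;");
}
```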
diff --git a/ThirdParty/licenses/index.md b/ThirdParty/licenses/index.md
index 51d2c2eb23..13b737e0f2 100644
--- a/ThirdParty/licenses/index.md
+++ b/ThirdParty/licenses/index.md
@@ -28,7 +28,7 @@ SQLite | [Public Domain]() | Catalog mana
 Xorg | [MIT]() | OpenGL rendering |
 zlib | [zlib]() | PNG support |
 --- | --- | --- |
-glslang | [BSD BSD-like MIT](https://github.com/KhronosGroup/glslang/blob/master/LICENSE.txt) | Rendering support 
+glslang | [BSD BSD-like MIT](https://github.com/KhronosGroup/glslang/blob/master/LICENSE.txt) | Rendering support
 spirv-tools | [Apache-2.0](https://github.com/KhronosGroup/SPIRV-Tools/blob/master/LICENSE) | Rendering support
 spirv-cross | [Apache-2.0](https://github.com/KhronosGroup/SPIRV-Cross/blob/master/LICENSE) | Rendering support
 --- | --- | --- |
diff --git a/ThriftHandler/MapDHandler.cpp b/ThriftHandler/MapDHandler.cpp
index 660f86d369..72478da1fd 100644
--- a/ThriftHandler/MapDHandler.cpp
+++ b/ThriftHandler/MapDHandler.cpp
@@ -5598,8 +5598,8 @@ void MapDHandler::register_runtime_udf(
   }
 
   // TODO: add UDF registration permission scheme. Currently, UDFs are
-  // registered globally, that means that all users can use as well as overwrite UDFs that
-  // was created possibly by anoher user.
+  // registered globally, that means that all users can use as well as overwrite UDFs
+  // that were created possibly by another user.
   VLOG(1) << "Registering runtime UDF with signatures:\n" << signatures;
 
   /* Changing a UDF implementation (but not the signature) requires
diff --git a/ThriftHandler/MapDHandler.h b/ThriftHandler/MapDHandler.h
index fbf0821b19..885a0b380f 100644
--- a/ThriftHandler/MapDHandler.h
+++ b/ThriftHandler/MapDHandler.h
@@ -472,6 +472,7 @@ class MapDHandler : public MapDIf {
                       Catalog_Namespace::UserMetadata& user_meta,
                       std::shared_ptr<Catalog_Namespace::Catalog> cat,
                       LogSession&);
+  void disconnect_impl(const SessionMap::iterator& session_it);
   void check_table_load_privileges(const TSessionId& session,
                                    const std::string& table_name);
@@ -656,9 +657,10 @@ class MapDHandler : public MapDIf {
   static bool has_view_permission(const AccessPrivileges& privs,
                                   const TDBObjectPermissions& permissions);
 
-  // For the provided upper case column names `uc_column_names`, return the tables
-  // from `table_names` which contain at least one of them. Used to rank the TABLE
-  // auto-completion hints by the columns specified in the projection.
+  // For the provided upper case column names `uc_column_names`, return
+  // the tables from `table_names` which contain at least one of them.
+  // Used to rank the TABLE auto-completion hints by the columns
+  // specified in the projection.
   std::unordered_set<std::string> get_uc_compatible_table_names_by_column(
       const std::unordered_set<std::string>& uc_column_names,
       const std::vector<std::string>& table_names,
@@ -666,8 +668,9 @@ class MapDHandler : public MapDIf {
 
   SessionMap sessions_;
 
-  bool super_user_rights_;  // default is "false"; setting to "true" ignores passwd checks
-                            // in "connect(..)" method
+  bool super_user_rights_;           // default is "false"; setting to "true"
+                                     // ignores passwd checks in "connect(..)"
+                                     // method
   const int idle_session_duration_;  // max duration of idle session
   const int max_session_duration_;   // max duration of session
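On the two session-duration members that close the diff: `idle_session_duration_` bounds inactivity (refreshed on every call), while `max_session_duration_` is a hard cap on total session lifetime (never refreshed). A sketch of how such a check could look; the struct, fields, and function here are hypothetical, not MapDHandler's actual bookkeeping:

```cpp
// Illustrative expiry check combining an idle timeout with a lifetime cap.
#include <chrono>

struct SessionTimes {
  std::chrono::steady_clock::time_point started;    // set once at connect
  std::chrono::steady_clock::time_point last_used;  // refreshed on each request
};

// idle_minutes: how long a session may sit unused before it expires.
// max_minutes:  hard cap on total session lifetime, regardless of activity.
bool session_expired(const SessionTimes& s, int idle_minutes, int max_minutes) {
  const auto now = std::chrono::steady_clock::now();
  const bool idle_too_long = now - s.last_used > std::chrono::minutes(idle_minutes);
  const bool alive_too_long = now - s.started > std::chrono::minutes(max_minutes);
  return idle_too_long || alive_too_long;
}
```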