From e822d58e3677daec9ed46b535b3bb4e7cf151bfe Mon Sep 17 00:00:00 2001 From: aceforeverd Date: Fri, 10 May 2024 16:09:30 +0000 Subject: [PATCH] feat(#3916): support @@execute_mode = 'request' --- hybridse/include/vm/engine.h | 4 ++++ hybridse/src/vm/engine.cc | 40 +++++++++++++++++++++---------- hybridse/src/vm/engine_context.cc | 3 ++- src/client/tablet_client.cc | 3 ++- src/client/tablet_client.h | 2 +- src/sdk/sql_cluster_router.cc | 37 ++++++++++++++++++++-------- src/sdk/sql_cluster_router.h | 5 ++-- src/sdk/sql_router.h | 2 +- src/tablet/tablet_impl.cc | 8 ++++++- 9 files changed, 75 insertions(+), 29 deletions(-) diff --git a/hybridse/include/vm/engine.h b/hybridse/include/vm/engine.h index 09586a8b03d..7e183d43c33 100644 --- a/hybridse/include/vm/engine.h +++ b/hybridse/include/vm/engine.h @@ -429,6 +429,10 @@ class Engine { /// request row info exists in 'values' option, as a format of: /// 1. [(col1_expr, col2_expr, ... ), (...), ...] /// 2. (col1_expr, col2_expr, ... ) + // + // This function only check on request/batchrequest mode, for batch mode it does nothing. + // As for old-fashioned usage, request row does not need to appear in SQL, so it won't report + // error even request rows is empty, instead checks should performed at the very beginning of Compute. static absl::Status ExtractRequestRowsInSQL(SqlContext* ctx); std::shared_ptr GetCacheLocked(const std::string& db, diff --git a/hybridse/src/vm/engine.cc b/hybridse/src/vm/engine.cc index 5c179fb79b6..4adc497bada 100644 --- a/hybridse/src/vm/engine.cc +++ b/hybridse/src/vm/engine.cc @@ -462,17 +462,23 @@ int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, Row* o if (!sql_request_rows.empty()) { row = sql_request_rows.at(0); } - auto task = std::dynamic_pointer_cast(compile_info_) - ->get_sql_context() - .cluster_job->GetTask(task_id) - .GetRoot(); + + auto info = std::dynamic_pointer_cast(compile_info_); + if (!info->GetRequestSchema().empty() && row.empty()) { + // a non-empty request row required but it not + LOG(WARNING) << "request SQL requires a non-empty request row, but empty row received"; + // TODO(someone): use status + return common::StatusCode::kRunSessionError; + } + + auto task = info->get_sql_context().cluster_job->GetTask(task_id).GetRoot(); + if (nullptr == task) { LOG(WARNING) << "fail to run request plan: taskid" << task_id << " not exist!"; return -2; } DLOG(INFO) << "Request Row Run with task_id " << task_id; - RunnerContext ctx(std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job, row, - sp_name_, is_debug_); + RunnerContext ctx(info->get_sql_context().cluster_job, row, sp_name_, is_debug_); auto output = task->RunWithCache(ctx); if (!output) { LOG(WARNING) << "Run request plan output is null"; @@ -491,13 +497,23 @@ int32_t BatchRequestRunSession::Run(const std::vector& request_batch, std:: } int32_t BatchRequestRunSession::Run(const uint32_t id, const std::vector& request_batch, std::vector& output) { - std::vector<::hybridse::codec::Row>& sql_request_rows = - std::dynamic_pointer_cast(GetCompileInfo())->get_sql_context().request_rows; + auto info = std::dynamic_pointer_cast(GetCompileInfo()); + std::vector<::hybridse::codec::Row>& sql_request_rows = info->get_sql_context().request_rows; + + std::vector<::hybridse::codec::Row> rows = sql_request_rows; + if (rows.empty()) { + rows = request_batch; + } + + if (!info->GetRequestSchema().empty() && rows.empty()) { + // a non-empty request row list required but it not + LOG(WARNING) << "batchrequest SQL requires a non-empty request row list, but empty row list received"; + // TODO(someone): use status + return common::StatusCode::kRunSessionError; + } - RunnerContext ctx(std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job, - sql_request_rows.empty() ? request_batch : sql_request_rows, sp_name_, is_debug_); - auto task = - std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job->GetTask(id).GetRoot(); + RunnerContext ctx(info->get_sql_context().cluster_job, rows, sp_name_, is_debug_); + auto task = info->get_sql_context().cluster_job->GetTask(id).GetRoot(); if (nullptr == task) { LOG(WARNING) << "Fail to run request plan: taskid" << id << " not exist!"; return -2; diff --git a/hybridse/src/vm/engine_context.cc b/hybridse/src/vm/engine_context.cc index 570726aa0eb..47ff39437d6 100644 --- a/hybridse/src/vm/engine_context.cc +++ b/hybridse/src/vm/engine_context.cc @@ -17,6 +17,7 @@ #include "vm/engine_context.h" #include "absl/container/flat_hash_map.h" +#include "absl/strings/ascii.h" namespace hybridse { namespace vm { @@ -61,7 +62,7 @@ std::string EngineModeName(EngineMode mode) { absl::StatusOr UnparseEngineMode(absl::string_view str) { auto& m = getModeMap(); - auto it = m.find(str); + auto it = m.find(absl::AsciiStrToLower(str)); if (it != m.end()) { return it->second; } diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index 09e7bdcbd96..d1c2ed6d018 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -72,6 +72,7 @@ bool TabletClient::Query(const std::string& db, const std::string& sql, const st } bool TabletClient::Query(const std::string& db, const std::string& sql, + hybridse::vm::EngineMode default_mode, const std::vector& parameter_types, const std::string& parameter_row, brpc::Controller* cntl, ::openmldb::api::QueryResponse* response, const bool is_debug) { @@ -79,7 +80,7 @@ bool TabletClient::Query(const std::string& db, const std::string& sql, ::openmldb::api::QueryRequest request; request.set_sql(sql); request.set_db(db); - request.set_is_batch(true); + request.set_is_batch(default_mode == hybridse::vm::kBatchMode); request.set_is_debug(is_debug); request.set_parameter_row_size(parameter_row.size()); request.set_parameter_row_slices(1); diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h index 66155c968d7..953e38b2288 100644 --- a/src/client/tablet_client.h +++ b/src/client/tablet_client.h @@ -64,7 +64,7 @@ class TabletClient : public Client { const openmldb::common::VersionPair& pair, std::string& msg); // NOLINT - bool Query(const std::string& db, const std::string& sql, + bool Query(const std::string& db, const std::string& sql, hybridse::vm::EngineMode default_mode, const std::vector& parameter_types, const std::string& parameter_row, brpc::Controller* cntl, ::openmldb::api::QueryResponse* response, const bool is_debug = false); diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 705fbd62400..4a08b27802b 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -55,6 +55,7 @@ #include "sdk/base.h" #include "sdk/base_impl.h" #include "sdk/batch_request_result_set_sql.h" +#include "sdk/internal/system_variable.h" #include "sdk/job_table_helper.h" #include "sdk/node_adapter.h" #include "sdk/query_future_impl.h" @@ -1214,8 +1215,8 @@ std::shared_ptr<::hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQLParamete cntl->set_timeout_ms(options_->request_timeout); DLOG(INFO) << "send query to tablet " << client->GetEndpoint(); auto response = std::make_shared<::openmldb::api::QueryResponse>(); - if (!client->Query(db, sql, parameter_types, parameter ? parameter->GetRow() : "", cntl.get(), response.get(), - options_->enable_debug)) { + if (!client->Query(db, sql, GetDefaultEngineMode(), parameter_types, parameter ? parameter->GetRow() : "", + cntl.get(), response.get(), options_->enable_debug)) { // rpc error is in cntl or response RPC_STATUS_AND_WARN(status, cntl, response, "Query rpc failed"); return {}; @@ -1994,7 +1995,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h case hybridse::node::kCmdShowGlobalVariables: { std::string db = openmldb::nameserver::INFORMATION_SCHEMA_DB; std::string table = openmldb::nameserver::GLOBAL_VARIABLES; - std::string sql = "select * from " + table; + std::string sql = "select * from " + table + " CONFIG (execute_mode = 'online')"; ::hybridse::sdk::Status status; auto rs = ExecuteSQLParameterized(db, sql, std::shared_ptr(), &status); if (status.code != 0) { @@ -2863,9 +2864,7 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL( } case hybridse::node::kPlanTypeFuncDef: case hybridse::node::kPlanTypeQuery: { - ::hybridse::vm::EngineMode default_mode = (!cluster_sdk_->IsClusterMode() || is_online_mode) - ? ::hybridse::vm::EngineMode::kBatchMode - : ::hybridse::vm::EngineMode::kOffline; + ::hybridse::vm::EngineMode default_mode = GetDefaultEngineMode(); // execute_mode in query config clause takes precedence auto mode = ::hybridse::vm::Engine::TryDetermineEngineMode(sql, default_mode); if (mode != ::hybridse::vm::EngineMode::kOffline) { @@ -3170,10 +3169,27 @@ std::shared_ptr SQLClusterRouter::ExecuteOfflineQuery( } } -bool SQLClusterRouter::IsOnlineMode() { +::hybridse::vm::EngineMode SQLClusterRouter::GetDefaultEngineMode() const { std::lock_guard<::openmldb::base::SpinMutex> lock(mu_); auto it = session_variables_.find("execute_mode"); - if (it != session_variables_.end() && it->second == "online") { + if (it != session_variables_.end()) { + // 1. infer from system variable + auto m = hybridse::vm::UnparseEngineMode(it->second).value_or(hybridse::vm::EngineMode::kBatchMode); + + // 2. standalone mode do not have offline + if (!cluster_sdk_->IsClusterMode() && m == hybridse::vm::kOffline) { + return hybridse::vm::kBatchMode; + } + return m; + } + + return hybridse::vm::EngineMode::kBatchMode; +} + +bool SQLClusterRouter::IsOnlineMode() const { + std::lock_guard<::openmldb::base::SpinMutex> lock(mu_); + auto it = session_variables_.find("execute_mode"); + if (it != session_variables_.end() && (it->second == "online" || it->second == "request")) { return true; } return false; @@ -3240,8 +3256,9 @@ ::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNod std::transform(value.begin(), value.end(), value.begin(), ::tolower); // TODO(hw): validation can be simpler if (key == "execute_mode") { - if (value != "online" && value != "offline") { - return {StatusCode::kCmdError, "the value of execute_mode must be online|offline"}; + auto s = sdk::internal::CheckSystemVariableSet(key, value); + if (!s.ok()) { + return {hybridse::common::kCmdError, s.ToString()}; } } else if (key == "enable_trace" || key == "sync_job") { if (value != "true" && value != "false") { diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h index 3d13cafa240..df23391e2f9 100644 --- a/src/sdk/sql_cluster_router.h +++ b/src/sdk/sql_cluster_router.h @@ -274,7 +274,8 @@ class SQLClusterRouter : public SQLRouter { bool NotifyTableChange() override; - bool IsOnlineMode() override; + ::hybridse::vm::EngineMode GetDefaultEngineMode() const; + bool IsOnlineMode() const override; bool IsEnableTrace(); std::string GetDatabase() override; @@ -452,7 +453,7 @@ class SQLClusterRouter : public SQLRouter { DBSDK* cluster_sdk_; std::map>>> input_lru_cache_; - ::openmldb::base::SpinMutex mu_; + mutable ::openmldb::base::SpinMutex mu_; ::openmldb::base::Random rand_; std::atomic insert_memory_usage_limit_ = 0; // [0-100], the default value 0 means unlimited }; diff --git a/src/sdk/sql_router.h b/src/sdk/sql_router.h index f68d7d39a1c..55fd72b6b5e 100644 --- a/src/sdk/sql_router.h +++ b/src/sdk/sql_router.h @@ -220,7 +220,7 @@ class SQLRouter { virtual bool NotifyTableChange() = 0; - virtual bool IsOnlineMode() = 0; + virtual bool IsOnlineMode() const = 0; virtual std::string GetDatabase() = 0; diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index 8c59a4f9184..bec227a39b2 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -1690,6 +1690,7 @@ void TabletImpl::ProcessQuery(bool is_sub, RpcController* ctrl, const openmldb:: auto mode = hybridse::vm::Engine::TryDetermineEngineMode(request->sql(), default_mode); ::hybridse::base::Status status; + // FIXME(someone): it does not handles batchrequest if (mode == hybridse::vm::EngineMode::kBatchMode) { // convert repeated openmldb:type::DataType into hybridse::codec::Schema hybridse::codec::Schema parameter_schema; @@ -5425,7 +5426,12 @@ void TabletImpl::RunRequestQuery(RpcController* ctrl, const openmldb::api::Query } if (ret != 0) { response.set_code(::openmldb::base::kSQLRunError); - response.set_msg("fail to run sql"); + if (ret == hybridse::common::StatusCode::kRunSessionError) { + // special handling + response.set_msg("request SQL requires a non-empty request row, but empty row received"); + } else { + response.set_msg("fail to run sql"); + } return; } else if (row.GetRowPtrCnt() != 1) { response.set_code(::openmldb::base::kSQLRunError);