Skip to content

Commit

Permalink
feat(#3916): support @@execute_mode = 'request'
Browse files Browse the repository at this point in the history
  • Loading branch information
aceforeverd committed May 11, 2024
1 parent 6569b42 commit e822d58
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 29 deletions.
4 changes: 4 additions & 0 deletions hybridse/include/vm/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,10 @@ class Engine {
/// request row info exists in 'values' option, as a format of:
/// 1. [(col1_expr, col2_expr, ... ), (...), ...]
/// 2. (col1_expr, col2_expr, ... )
//
// This function only check on request/batchrequest mode, for batch mode it does nothing.
// As for old-fashioned usage, request row does not need to appear in SQL, so it won't report
// error even request rows is empty, instead checks should performed at the very beginning of Compute.
static absl::Status ExtractRequestRowsInSQL(SqlContext* ctx);

std::shared_ptr<CompileInfo> GetCacheLocked(const std::string& db,
Expand Down
40 changes: 28 additions & 12 deletions hybridse/src/vm/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -462,17 +462,23 @@ int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, Row* o
if (!sql_request_rows.empty()) {
row = sql_request_rows.at(0);
}
auto task = std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)
->get_sql_context()
.cluster_job->GetTask(task_id)
.GetRoot();

auto info = std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_);
if (!info->GetRequestSchema().empty() && row.empty()) {
// a non-empty request row required but it not
LOG(WARNING) << "request SQL requires a non-empty request row, but empty row received";
// TODO(someone): use status
return common::StatusCode::kRunSessionError;
}

auto task = info->get_sql_context().cluster_job->GetTask(task_id).GetRoot();

if (nullptr == task) {
LOG(WARNING) << "fail to run request plan: taskid" << task_id << " not exist!";
return -2;
}
DLOG(INFO) << "Request Row Run with task_id " << task_id;
RunnerContext ctx(std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)->get_sql_context().cluster_job, row,
sp_name_, is_debug_);
RunnerContext ctx(info->get_sql_context().cluster_job, row, sp_name_, is_debug_);
auto output = task->RunWithCache(ctx);
if (!output) {
LOG(WARNING) << "Run request plan output is null";
Expand All @@ -491,13 +497,23 @@ int32_t BatchRequestRunSession::Run(const std::vector<Row>& request_batch, std::
}
int32_t BatchRequestRunSession::Run(const uint32_t id, const std::vector<Row>& request_batch,
std::vector<Row>& output) {
std::vector<::hybridse::codec::Row>& sql_request_rows =
std::dynamic_pointer_cast<SqlCompileInfo>(GetCompileInfo())->get_sql_context().request_rows;
auto info = std::dynamic_pointer_cast<SqlCompileInfo>(GetCompileInfo());
std::vector<::hybridse::codec::Row>& sql_request_rows = info->get_sql_context().request_rows;

std::vector<::hybridse::codec::Row> rows = sql_request_rows;
if (rows.empty()) {
rows = request_batch;
}

if (!info->GetRequestSchema().empty() && rows.empty()) {
// a non-empty request row list required but it not
LOG(WARNING) << "batchrequest SQL requires a non-empty request row list, but empty row list received";
// TODO(someone): use status
return common::StatusCode::kRunSessionError;
}

RunnerContext ctx(std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)->get_sql_context().cluster_job,
sql_request_rows.empty() ? request_batch : sql_request_rows, sp_name_, is_debug_);
auto task =
std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)->get_sql_context().cluster_job->GetTask(id).GetRoot();
RunnerContext ctx(info->get_sql_context().cluster_job, rows, sp_name_, is_debug_);
auto task = info->get_sql_context().cluster_job->GetTask(id).GetRoot();
if (nullptr == task) {
LOG(WARNING) << "Fail to run request plan: taskid" << id << " not exist!";
return -2;
Expand Down
3 changes: 2 additions & 1 deletion hybridse/src/vm/engine_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "vm/engine_context.h"

#include "absl/container/flat_hash_map.h"
#include "absl/strings/ascii.h"

namespace hybridse {
namespace vm {
Expand Down Expand Up @@ -61,7 +62,7 @@ std::string EngineModeName(EngineMode mode) {

absl::StatusOr<EngineMode> UnparseEngineMode(absl::string_view str) {
auto& m = getModeMap();
auto it = m.find(str);
auto it = m.find(absl::AsciiStrToLower(str));
if (it != m.end()) {
return it->second;
}
Expand Down
3 changes: 2 additions & 1 deletion src/client/tablet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ bool TabletClient::Query(const std::string& db, const std::string& sql, const st
}

bool TabletClient::Query(const std::string& db, const std::string& sql,
hybridse::vm::EngineMode default_mode,
const std::vector<openmldb::type::DataType>& parameter_types,
const std::string& parameter_row,
brpc::Controller* cntl, ::openmldb::api::QueryResponse* response, const bool is_debug) {
if (cntl == NULL || response == NULL) return false;
::openmldb::api::QueryRequest request;
request.set_sql(sql);
request.set_db(db);
request.set_is_batch(true);
request.set_is_batch(default_mode == hybridse::vm::kBatchMode);
request.set_is_debug(is_debug);
request.set_parameter_row_size(parameter_row.size());
request.set_parameter_row_slices(1);
Expand Down
2 changes: 1 addition & 1 deletion src/client/tablet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class TabletClient : public Client {
const openmldb::common::VersionPair& pair,
std::string& msg); // NOLINT

bool Query(const std::string& db, const std::string& sql,
bool Query(const std::string& db, const std::string& sql, hybridse::vm::EngineMode default_mode,
const std::vector<openmldb::type::DataType>& parameter_types, const std::string& parameter_row,
brpc::Controller* cntl, ::openmldb::api::QueryResponse* response, const bool is_debug = false);

Expand Down
37 changes: 27 additions & 10 deletions src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "sdk/base.h"
#include "sdk/base_impl.h"
#include "sdk/batch_request_result_set_sql.h"
#include "sdk/internal/system_variable.h"
#include "sdk/job_table_helper.h"
#include "sdk/node_adapter.h"
#include "sdk/query_future_impl.h"
Expand Down Expand Up @@ -1214,8 +1215,8 @@ std::shared_ptr<::hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQLParamete
cntl->set_timeout_ms(options_->request_timeout);
DLOG(INFO) << "send query to tablet " << client->GetEndpoint();
auto response = std::make_shared<::openmldb::api::QueryResponse>();
if (!client->Query(db, sql, parameter_types, parameter ? parameter->GetRow() : "", cntl.get(), response.get(),
options_->enable_debug)) {
if (!client->Query(db, sql, GetDefaultEngineMode(), parameter_types, parameter ? parameter->GetRow() : "",
cntl.get(), response.get(), options_->enable_debug)) {
// rpc error is in cntl or response
RPC_STATUS_AND_WARN(status, cntl, response, "Query rpc failed");
return {};
Expand Down Expand Up @@ -1994,7 +1995,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::HandleSQLCmd(const h
case hybridse::node::kCmdShowGlobalVariables: {
std::string db = openmldb::nameserver::INFORMATION_SCHEMA_DB;
std::string table = openmldb::nameserver::GLOBAL_VARIABLES;
std::string sql = "select * from " + table;
std::string sql = "select * from " + table + " CONFIG (execute_mode = 'online')";
::hybridse::sdk::Status status;
auto rs = ExecuteSQLParameterized(db, sql, std::shared_ptr<openmldb::sdk::SQLRequestRow>(), &status);
if (status.code != 0) {
Expand Down Expand Up @@ -2863,9 +2864,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(
}
case hybridse::node::kPlanTypeFuncDef:
case hybridse::node::kPlanTypeQuery: {
::hybridse::vm::EngineMode default_mode = (!cluster_sdk_->IsClusterMode() || is_online_mode)
? ::hybridse::vm::EngineMode::kBatchMode
: ::hybridse::vm::EngineMode::kOffline;
::hybridse::vm::EngineMode default_mode = GetDefaultEngineMode();
// execute_mode in query config clause takes precedence
auto mode = ::hybridse::vm::Engine::TryDetermineEngineMode(sql, default_mode);
if (mode != ::hybridse::vm::EngineMode::kOffline) {
Expand Down Expand Up @@ -3170,10 +3169,27 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteOfflineQuery(
}
}

bool SQLClusterRouter::IsOnlineMode() {
::hybridse::vm::EngineMode SQLClusterRouter::GetDefaultEngineMode() const {
std::lock_guard<::openmldb::base::SpinMutex> lock(mu_);
auto it = session_variables_.find("execute_mode");
if (it != session_variables_.end() && it->second == "online") {
if (it != session_variables_.end()) {
// 1. infer from system variable
auto m = hybridse::vm::UnparseEngineMode(it->second).value_or(hybridse::vm::EngineMode::kBatchMode);

// 2. standalone mode do not have offline
if (!cluster_sdk_->IsClusterMode() && m == hybridse::vm::kOffline) {
return hybridse::vm::kBatchMode;
}
return m;
}

return hybridse::vm::EngineMode::kBatchMode;
}

bool SQLClusterRouter::IsOnlineMode() const {
std::lock_guard<::openmldb::base::SpinMutex> lock(mu_);
auto it = session_variables_.find("execute_mode");
if (it != session_variables_.end() && (it->second == "online" || it->second == "request")) {
return true;
}
return false;
Expand Down Expand Up @@ -3240,8 +3256,9 @@ ::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNod
std::transform(value.begin(), value.end(), value.begin(), ::tolower);
// TODO(hw): validation can be simpler
if (key == "execute_mode") {
if (value != "online" && value != "offline") {
return {StatusCode::kCmdError, "the value of execute_mode must be online|offline"};
auto s = sdk::internal::CheckSystemVariableSet(key, value);
if (!s.ok()) {
return {hybridse::common::kCmdError, s.ToString()};
}
} else if (key == "enable_trace" || key == "sync_job") {
if (value != "true" && value != "false") {
Expand Down
5 changes: 3 additions & 2 deletions src/sdk/sql_cluster_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ class SQLClusterRouter : public SQLRouter {

bool NotifyTableChange() override;

bool IsOnlineMode() override;
::hybridse::vm::EngineMode GetDefaultEngineMode() const;
bool IsOnlineMode() const override;
bool IsEnableTrace();

std::string GetDatabase() override;
Expand Down Expand Up @@ -452,7 +453,7 @@ class SQLClusterRouter : public SQLRouter {
DBSDK* cluster_sdk_;
std::map<std::string, std::map<hybridse::vm::EngineMode, base::lru_cache<std::string, std::shared_ptr<SQLCache>>>>
input_lru_cache_;
::openmldb::base::SpinMutex mu_;
mutable ::openmldb::base::SpinMutex mu_;
::openmldb::base::Random rand_;
std::atomic<uint32_t> insert_memory_usage_limit_ = 0; // [0-100], the default value 0 means unlimited
};
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/sql_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ class SQLRouter {

virtual bool NotifyTableChange() = 0;

virtual bool IsOnlineMode() = 0;
virtual bool IsOnlineMode() const = 0;

virtual std::string GetDatabase() = 0;

Expand Down
8 changes: 7 additions & 1 deletion src/tablet/tablet_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,7 @@ void TabletImpl::ProcessQuery(bool is_sub, RpcController* ctrl, const openmldb::
auto mode = hybridse::vm::Engine::TryDetermineEngineMode(request->sql(), default_mode);

::hybridse::base::Status status;
// FIXME(someone): it does not handles batchrequest
if (mode == hybridse::vm::EngineMode::kBatchMode) {
// convert repeated openmldb:type::DataType into hybridse::codec::Schema
hybridse::codec::Schema parameter_schema;
Expand Down Expand Up @@ -5425,7 +5426,12 @@ void TabletImpl::RunRequestQuery(RpcController* ctrl, const openmldb::api::Query
}
if (ret != 0) {
response.set_code(::openmldb::base::kSQLRunError);
response.set_msg("fail to run sql");
if (ret == hybridse::common::StatusCode::kRunSessionError) {
// special handling
response.set_msg("request SQL requires a non-empty request row, but empty row received");
} else {
response.set_msg("fail to run sql");
}
return;
} else if (row.GetRowPtrCnt() != 1) {
response.set_code(::openmldb::base::kSQLRunError);
Expand Down

0 comments on commit e822d58

Please sign in to comment.