Skip to content

Commit

Permalink
fix: select from JOB_INFO should always in online mode (#3963)
Browse files Browse the repository at this point in the history
* fix: select from JOB_INFO should always in online mode

Fix error when user set default `execute_mode` to offline:

```sql
set global execute_mode = 'offline';
select 1;
```

* fix: query mode on user & pre_agg tables
  • Loading branch information
aceforeverd authored Jul 23, 2024
1 parent 60e4fb5 commit 4138c1b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ object JobInfoManager {
}

def getAllJobs(): List[JobInfo] = {
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME"
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME CONFIG (execute_mode = 'online')"
val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql)
// TODO: Reorder in output, use orderby desc if SQL supported
resultSetToJobs(rs).sortWith(_.getId > _.getId)
Expand All @@ -82,7 +82,7 @@ object JobInfoManager {
def getUnfinishedJobs(): List[JobInfo] = {
// TODO: Now we can not add index for `state` and run sql with
// s"SELECT * FROM $tableName WHERE state NOT IN (${JobInfo.FINAL_STATE.mkString(",")})"
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME"
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME CONFIG (execute_mode = 'online')"
val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql)

val jobs = mutable.ArrayBuffer[JobInfo]()
Expand All @@ -99,7 +99,7 @@ object JobInfoManager {
}

def stopJob(jobId: Int): JobInfo = {
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId"
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId CONFIG (execute_mode = 'online')"
val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql)

val jobInfo = if (rs.getFetchSize == 0) {
Expand Down Expand Up @@ -131,7 +131,7 @@ object JobInfoManager {

def getJob(jobId: Int): Option[JobInfo] = {
// TODO: Require to get only one row, https://github.com/4paradigm/OpenMLDB/issues/704
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId"
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId CONFIG (execute_mode = 'online')"
val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql)

if (rs.getFetchSize == 0) {
Expand Down
10 changes: 5 additions & 5 deletions src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ bool SQLClusterRouter::DropTable(const std::string& db, const std::string& table
std::string meta_table = openmldb::nameserver::PRE_AGG_META_NAME;
std::string select_aggr_info =
absl::StrCat("select aggr_db, aggr_table from ", meta_db, ".", meta_table, " where base_table = '",
table_info->name(), "' and base_db='", table_info->db(), "';");
table_info->name(), "' and base_db='", table_info->db(), "' CONFIG (execute_mode = 'online');");
auto rs = ExecuteSQL("", select_aggr_info, true, true, 0, status);
WARN_NOT_OK_AND_RET(status, "get aggr info failed", false);
if (rs->Size() > 0) {
Expand Down Expand Up @@ -5143,7 +5143,7 @@ void SQLClusterRouter::ReadSparkConfFromFile(std::string conf_file_path, std::ma
std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetJobResultSet(int job_id,
::hybridse::sdk::Status* status) {
std::string db = openmldb::nameserver::INTERNAL_DB;
std::string sql = "SELECT * FROM JOB_INFO WHERE id = " + std::to_string(job_id);
std::string sql = absl::Substitute("SELECT * FROM JOB_INFO WHERE id = $0 CONFIG (execute_mode = 'online')", job_id);

auto rs = ExecuteSQLParameterized(db, sql, {}, status);
if (!status->IsOK()) {
Expand All @@ -5164,7 +5164,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetJobResultSet(int

std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetJobResultSet(::hybridse::sdk::Status* status) {
std::string db = openmldb::nameserver::INTERNAL_DB;
std::string sql = "SELECT * FROM JOB_INFO";
std::string sql = "SELECT * FROM JOB_INFO CONFIG (execute_mode = 'online')";
auto rs = ExecuteSQLParameterized(db, sql, std::shared_ptr<openmldb::sdk::SQLRequestRow>(), status);
if (!status->IsOK()) {
return {};
Expand All @@ -5187,7 +5187,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetTaskManagerJobRes
return this->GetJobResultSet(job_id, status);
}
std::string db = openmldb::nameserver::INTERNAL_DB;
std::string sql = "SELECT * FROM JOB_INFO;";
std::string sql = "SELECT * FROM JOB_INFO CONFIG (execute_mode = 'online');";
auto rs = ExecuteSQLParameterized(db, sql, {}, status);
if (!status->IsOK()) {
return {};
Expand Down Expand Up @@ -5226,7 +5226,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetNameServerJobResu
}

absl::StatusOr<bool> SQLClusterRouter::GetUser(const std::string& name, UserInfo* user_info) {
std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME);
std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME, " CONFIG (execute_mode = 'online')");
hybridse::sdk::Status status;
auto rs =
ExecuteSQLParameterized(nameserver::INTERNAL_DB, sql, std::shared_ptr<openmldb::sdk::SQLRequestRow>(), &status);
Expand Down

0 comments on commit 4138c1b

Please sign in to comment.