Skip to content

Commit

Permalink
Merge pull request #28797 from taosdata/enh/TD-32907-3.0
Browse files Browse the repository at this point in the history
enh: add bypassFlag to facilitate performance testing
  • Loading branch information
guanshengliang authored Nov 19, 2024
2 parents 686c20c + ca49a3e commit a4b6d9d
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 19 deletions.
1 change: 1 addition & 0 deletions docs/zh/14-reference/01-components/01-taosd.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ charset 的有效值是 UTF-8。
|checkpointBackupDir | |内部参数,用于恢复 snode 数据|
|enableAuditDelete | |内部参数,用于测试审计功能|
|slowLogThresholdTest| |内部参数,用于测试慢日志|
|bypassFlag |3.3.4.5 后|内部参数,用于短路测试,默认值 0|

### 压缩参数
|参数名称|支持版本|参数含义|
Expand Down
1 change: 1 addition & 0 deletions docs/zh/14-reference/01-components/02-taosc.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ TDengine 客户端驱动提供了应用编程所需要的全部 API,并且在
|safetyCheckLevel |3.3.3.0 后|内部参数,用于随机失败测试|
|simdEnable |3.3.4.3 后|内部参数,用于测试 SIMD 加速|
|AVX512Enable |3.3.4.3 后|内部参数,用于测试 AVX512 加速|
|bypassFlag |3.3.4.5 后|内部参数,用于短路测试,缺省值:0|

### SHELL 相关
|参数名称|支持版本|参数含义|
Expand Down
1 change: 1 addition & 0 deletions include/common/tglobal.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ extern int64_t tsTickPerHour[3];
extern int32_t tsCountAlwaysReturnValue;
extern float tsSelectivityRatio;
extern int32_t tsTagFilterResCacheSize;
extern int32_t tsBypassFlag;

// queue & threads
extern int32_t tsNumOfRpcThreads;
Expand Down
10 changes: 10 additions & 0 deletions include/util/tdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,16 @@ enum {

enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 };

/**
* RB: return before
* RA: return after
* NR: not return, skip and go on following steps
*/
#define TSDB_BYPASS_RB_RPC_SEND_SUBMIT 0x01u
#define TSDB_BYPASS_RA_RPC_RECV_SUBMIT 0x02u
#define TSDB_BYPASS_RB_TSDB_WRITE_MEM 0x04u
#define TSDB_BYPASS_RB_TSDB_COMMIT 0x08u

#define DEFAULT_HANDLE 0
#define MNODE_HANDLE 1
#define QNODE_HANDLE -1
Expand Down
13 changes: 11 additions & 2 deletions source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ float tsSelectivityRatio = 1.0;
int32_t tsTagFilterResCacheSize = 1024 * 10;
char tsTagFilterCache = 0;

int32_t tsBypassFlag = 0;

// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
// 0 no query allowed, queries are disabled
Expand Down Expand Up @@ -612,6 +614,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "safetyCheckLevel", tsSafetyCheckLevel, 0, 5, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "bypassFlag", tsBypassFlag, 0, INT32_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));

tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS);
Expand Down Expand Up @@ -1303,6 +1306,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "safetyCheckLevel");
tsSafetyCheckLevel = pItem->i32;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "bypassFlag");
tsBypassFlag = pItem->i32;

TAOS_RETURN(TSDB_CODE_SUCCESS);
}

Expand Down Expand Up @@ -2046,7 +2053,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum},
{"safetyCheckLevel", &tsSafetyCheckLevel}};
{"safetyCheckLevel", &tsSafetyCheckLevel},
{"bypassFlag", &tsBypassFlag}};

if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);
Expand Down Expand Up @@ -2302,7 +2310,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags},
{"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay},
{"tsmaDataDeleteMark", &tsmaDataDeleteMark},
{"safetyCheckLevel", &tsSafetyCheckLevel}};
{"safetyCheckLevel", &tsSafetyCheckLevel},
{"bypassFlag", &tsBypassFlag}};

if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/tsdb/tsdbCommit2.c
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
int64_t nRow = imem->nRow;
int64_t nDel = imem->nDel;

if (nRow == 0 && nDel == 0) {
if ((nRow == 0 && nDel == 0) || (tsBypassFlag & TSDB_BYPASS_RB_TSDB_COMMIT)) {
(void)taosThreadMutexLock(&tsdb->mutex);
tsdb->imem = NULL;
(void)taosThreadMutexUnlock(&tsdb->mutex);
Expand Down
4 changes: 4 additions & 0 deletions source/dnode/vnode/src/tsdb/tsdbMemTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmi
tb_uid_t suid = pSubmitTbData->suid;
tb_uid_t uid = pSubmitTbData->uid;

if (tsBypassFlag & TSDB_BYPASS_RB_TSDB_WRITE_MEM) {
goto _err;
}

// create/get STbData to op
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
if (code) {
Expand Down
4 changes: 4 additions & 0 deletions source/dnode/vnode/src/vnd/vnodeSvr.c
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t lino = 0;

if (tsBypassFlag & TSDB_BYPASS_RA_RPC_RECV_SUBMIT) {
return TSDB_CODE_MSG_PREPROCESSED;
}

SDecoder *pCoder = &(SDecoder){0};

if (taosHton64(((SSubmitReq2Msg *)pMsg->pCont)->version) != 1) {
Expand Down
21 changes: 5 additions & 16 deletions source/libs/scheduler/src/schRemote.c
Original file line number Diff line number Diff line change
Expand Up @@ -1345,30 +1345,19 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}

#if 1
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;
SCH_ERR_JRET(code);

if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
}
#else
if (TDMT_VND_SUBMIT != msgType) {
if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (TDMT_VND_SUBMIT == msgType)) {
taosMemoryFree(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
} else {
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;
SCH_ERR_JRET(code);

if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
}
} else {
taosMemoryFree(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
}
#endif

return TSDB_CODE_SUCCESS;

Expand Down
66 changes: 66 additions & 0 deletions tests/army/alter/alterConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,70 @@ def alterSupportVnodes(self):
tdSql.query('show dnodes')
tdSql.checkData(0, 3, "64")

def checkKeyValue(self, res, key, value, ikey = 0, ival = 1):
result = False
for row in res:
if row[ikey] == key:
if row[ival] != value:
raise Exception(f"key:{key} value:{row[ival]} != {value}")
else:
tdLog.info(f"key:{key} value:{row[ival]} == {value}")
result = True
break
if not result:
raise Exception(f"key:{key} not found")

def alterBypassFlag(self):
"""Add test case for altering bypassFlag(TD-32907)
"""
tdSql.execute(f"drop database if exists db")
tdSql.execute(f"create database db")
tdSql.execute("use db")
self.checkKeyValue(tdSql.getResult("show local variables;"), "bypassFlag", "0")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "0", 1, 2)
tdSql.execute("alter local 'bypassFlag 1'")
self.checkKeyValue(tdSql.getResult("show local variables;"), "bypassFlag", "1")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "0", 1, 2)
tdSql.execute("create table stb0(ts timestamp, c0 int) tags(t0 int)")
tdSql.execute("create table ctb0 using stb0 tags(0)")
tdSql.execute("insert into ctb0 values(now, 1)")
tdSql.query("select * from stb0")
tdSql.checkRows(0)
tdSql.execute("alter local 'bypassFlag 0'")
tdSql.execute("alter all dnodes 'bypassFlag 2'")
self.checkKeyValue(tdSql.getResult("show local variables"), "bypassFlag", "0")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "2", 1, 2)
tdSql.execute("insert into ctb0 values(now, 2)")
tdSql.query("select * from stb0")
tdSql.checkRows(0)
tdSql.execute("alter all dnodes 'bypassFlag 4'")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "4", 1, 2)
tdSql.execute("insert into ctb0 values(now, 4)")
tdSql.execute("insert into ctb1 using stb0 tags(1) values(now, 10)")
tdSql.query("select * from stb0")
tdSql.checkRows(0)
tdSql.query("show db.tables")
tdSql.checkRows(2)
tdSql.execute("alter all dnodes 'bypassFlag 8'")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "8", 1, 2)
tdSql.execute("insert into ctb0 values(now, 8)")
tdSql.execute("insert into ctb1 values(now, 18)")
tdSql.query("select * from stb0")
tdSql.checkRows(2)
tdSql.execute("flush database db")
tdSql.query("select * from stb0")
tdSql.checkRows(0)
tdSql.execute("alter all dnodes 'bypassFlag 0'")
self.checkKeyValue(tdSql.getResult("show local variables"), "bypassFlag", "0")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "0", 1, 2)
tdSql.execute("insert into ctb0 values(now, 80)")
tdSql.execute("insert into ctb1 values(now, 180)")
tdSql.query("select * from stb0")
tdSql.checkRows(2)
tdSql.execute("flush database db")
tdSql.query("select * from stb0")
tdSql.checkRows(2)

# run
def run(self):
tdLog.debug(f"start to excute {__file__}")
Expand All @@ -110,6 +174,8 @@ def run(self):
self.alterTtlConfig()
# TS-5390
self.alterCachemodel()
# TD-32907
self.alterBypassFlag()

tdLog.success(f"{__file__} successfully executed")

Expand Down

0 comments on commit a4b6d9d

Please sign in to comment.