From dd2ab5b361fbc8e53acae34a6a60fe25d3de06e4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 7 Nov 2024 16:46:18 +0800 Subject: [PATCH] fix: ut issues --- include/libs/qworker/qworker.h | 5 +++++ source/dnode/mgmt/exe/dmMain.c | 1 + source/libs/qworker/src/qwMem.c | 2 +- source/util/src/tmempool.c | 4 ++-- source/util/test/memPoolTest.cpp | 27 ++++++++++++++------------- 5 files changed, 23 insertions(+), 16 deletions(-) diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 83daf0376c08..0924c2ca0f9a 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -113,6 +113,11 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 int32_t qWorkerDbgEnableDebug(char *option); +void qWorkerRetireJob(uint64_t jobId, int32_t errCode); + +void qWorkerRetireJobs(int64_t retireSize, int32_t errCode); + + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index 3ea1ecd7b42c..cd4bdc68e667 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -25,6 +25,7 @@ #endif #include "dmUtil.h" #include "tcs.h" +#include "qworker.h" #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #include "cus_name.h" diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index e15027cf9b7d..e508e63e67f6 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -115,7 +115,7 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) { char id[sizeof(tId) + sizeof(eId)] = {0}; QW_SET_TEID(id, tId, eId); - QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pJob->memInfo)); + QW_ERR_JRET(taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo)); code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index c1b034042bc4..6d275c9b851c 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -168,7 +168,7 @@ int32_t mpUpdateCfg(SMemPool* pPool) { } uDebug("memPool %s cfg updated, reserveSize:%dMB, jobQuota:%dMB, threadNum:%d", - pPool->name, pPool->cfg.reserveSize, pPool->cfg.jobQuota, pPool->cfg.threadNum); + pPool->name, *pPool->cfg.reserveSize, *pPool->cfg.jobQuota, pPool->cfg.threadNum); return TSDB_CODE_SUCCESS; } @@ -313,7 +313,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) if (atomic_load_64(&tsCurrentAvailMemorySize) <= ((atomic_load_32(pPool->cfg.reserveSize) * 1048576UL) + size)) { code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %dMB", - pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize); + pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, *pPool->cfg.reserveSize); pPool->cfg.cb.reachFp(pJob->job.jobId, code); (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); MP_RET(code); diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index dc22c4ee1146..f9bc0d67830e 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -37,6 +37,7 @@ #include "tdef.h" #include "tvariant.h" #include "stub.h" +#include "../inc/tmempoolInt.h" namespace { @@ -155,9 +156,10 @@ typedef struct { } SMPTestJobCtx; typedef struct { - int64_t jobQuota; + int32_t jobQuota; bool reserveMode; int64_t upperLimitSize; + int32_t reserveSize; int32_t threadNum; int32_t randTask; } SMPTestParam; @@ -487,7 +489,7 @@ int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) { return TSDB_CODE_SUCCESS; } -void mptRetireJobsCb(void* pHandle, int64_t retireSize, int32_t errCode) { +void mptRetireJobsCb(int64_t retireSize, int32_t errCode) { SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL); uint64_t jobId = 0; int64_t retiredSize = 0; @@ -514,37 +516,36 @@ void mptRetireJobsCb(void* pHandle, int64_t retireSize, int32_t errCode) { } -void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) { - SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &mpJob->jobId, sizeof(mpJob->jobId)); +void mptRetireJobCb(uint64_t jobId, int32_t errCode) { + SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &jobId, sizeof(jobId)); if (NULL == pJob) { - uError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId); + uError("QID:0x%" PRIx64 " fail to get job from job hash", jobId); return; } if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); + uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); } else { - uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); + uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); } } void mptInitPool(void) { SMemPoolCfg cfg = {0}; - cfg.reserveMode = mptCtx.param.reserveMode; if (!mptCtx.param.reserveMode) { - cfg.upperLimitSize = mptCtx.param.upperLimitSize; + //cfg.upperLimitSize = mptCtx.param.upperLimitSize; } else { int64_t memSize = 0; ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize)); - cfg.reserveSize = memSize / 1048576UL * MP_DEFAULT_RESERVE_MEM_PERCENT / 100; + cfg.reserveSize = &mptCtx.param.reserveSize; } cfg.threadNum = 10; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO cfg.chunkSize = 1048576; - cfg.jobQuota = mptCtx.param.jobQuota; - cfg.cb.retireJobsFp = mptRetireJobsCb; - cfg.cb.retireJobFp = mptRetireJobCb; + cfg.jobQuota = &mptCtx.param.jobQuota; + cfg.cb.failFp = mptRetireJobsCb; + cfg.cb.reachFp = mptRetireJobCb; ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle)); }