Skip to content

Commit

Permalink
fix: ut issues
Browse files Browse the repository at this point in the history
  • Loading branch information
dapan1121 committed Nov 7, 2024
1 parent c5c85ef commit dd2ab5b
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 16 deletions.
5 changes: 5 additions & 0 deletions include/libs/qworker/qworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64

int32_t qWorkerDbgEnableDebug(char *option);

void qWorkerRetireJob(uint64_t jobId, int32_t errCode);

void qWorkerRetireJobs(int64_t retireSize, int32_t errCode);


#ifdef __cplusplus
}
#endif
Expand Down
1 change: 1 addition & 0 deletions source/dnode/mgmt/exe/dmMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#endif
#include "dmUtil.h"
#include "tcs.h"
#include "qworker.h"

#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
#include "cus_name.h"
Expand Down
2 changes: 1 addition & 1 deletion source/libs/qworker/src/qwMem.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) {
char id[sizeof(tId) + sizeof(eId)] = {0};
QW_SET_TEID(id, tId, eId);

QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pJob->memInfo));
QW_ERR_JRET(taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo));

code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
Expand Down
4 changes: 2 additions & 2 deletions source/util/src/tmempool.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ int32_t mpUpdateCfg(SMemPool* pPool) {
}

uDebug("memPool %s cfg updated, reserveSize:%dMB, jobQuota:%dMB, threadNum:%d",
pPool->name, pPool->cfg.reserveSize, pPool->cfg.jobQuota, pPool->cfg.threadNum);
pPool->name, *pPool->cfg.reserveSize, *pPool->cfg.jobQuota, pPool->cfg.threadNum);

return TSDB_CODE_SUCCESS;
}
Expand Down Expand Up @@ -313,7 +313,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size)
if (atomic_load_64(&tsCurrentAvailMemorySize) <= ((atomic_load_32(pPool->cfg.reserveSize) * 1048576UL) + size)) {
code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %dMB",
pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize);
pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, *pPool->cfg.reserveSize);
pPool->cfg.cb.reachFp(pJob->job.jobId, code);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
MP_RET(code);
Expand Down
27 changes: 14 additions & 13 deletions source/util/test/memPoolTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "tdef.h"
#include "tvariant.h"
#include "stub.h"
#include "../inc/tmempoolInt.h"


namespace {
Expand Down Expand Up @@ -155,9 +156,10 @@ typedef struct {
} SMPTestJobCtx;

typedef struct {
int64_t jobQuota;
int32_t jobQuota;
bool reserveMode;
int64_t upperLimitSize;
int32_t reserveSize;
int32_t threadNum;
int32_t randTask;
} SMPTestParam;
Expand Down Expand Up @@ -487,7 +489,7 @@ int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
return TSDB_CODE_SUCCESS;
}

void mptRetireJobsCb(void* pHandle, int64_t retireSize, int32_t errCode) {
void mptRetireJobsCb(int64_t retireSize, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
uint64_t jobId = 0;
int64_t retiredSize = 0;
Expand All @@ -514,37 +516,36 @@ void mptRetireJobsCb(void* pHandle, int64_t retireSize, int32_t errCode) {
}


void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &mpJob->jobId, sizeof(mpJob->jobId));
void mptRetireJobCb(uint64_t jobId, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &jobId, sizeof(jobId));
if (NULL == pJob) {
uError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId);
uError("QID:0x%" PRIx64 " fail to get job from job hash", jobId);
return;
}

if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
} else {
uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
}
}

void mptInitPool(void) {
SMemPoolCfg cfg = {0};

cfg.reserveMode = mptCtx.param.reserveMode;
if (!mptCtx.param.reserveMode) {
cfg.upperLimitSize = mptCtx.param.upperLimitSize;
//cfg.upperLimitSize = mptCtx.param.upperLimitSize;
} else {
int64_t memSize = 0;
ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize));
cfg.reserveSize = memSize / 1048576UL * MP_DEFAULT_RESERVE_MEM_PERCENT / 100;
cfg.reserveSize = &mptCtx.param.reserveSize;
}
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.chunkSize = 1048576;
cfg.jobQuota = mptCtx.param.jobQuota;
cfg.cb.retireJobsFp = mptRetireJobsCb;
cfg.cb.retireJobFp = mptRetireJobCb;
cfg.jobQuota = &mptCtx.param.jobQuota;
cfg.cb.failFp = mptRetireJobsCb;
cfg.cb.reachFp = mptRetireJobCb;

ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle));
}
Expand Down

0 comments on commit dd2ab5b

Please sign in to comment.