Skip to content

Commit

Permalink
Merge pull request #28779 from taosdata/fix/3_liaohj
Browse files Browse the repository at this point in the history
fix(stream): update the msg encoder.
  • Loading branch information
guanshengliang authored Nov 19, 2024
2 parents 46e9382 + 4b47c4c commit 686c20c
Show file tree
Hide file tree
Showing 25 changed files with 790 additions and 396 deletions.
34 changes: 26 additions & 8 deletions include/libs/stream/streamMsg.h → include/common/streamMsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,23 @@
#define TDENGINE_STREAMMSG_H

#include "tmsg.h"
#include "trpc.h"
//#include "trpc.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef struct SStreamRetrieveReq SStreamRetrieveReq;
typedef struct SStreamDispatchReq SStreamDispatchReq;
typedef struct STokenBucket STokenBucket;
typedef struct SMetaHbInfo SMetaHbInfo;

typedef struct SNodeUpdateInfo {
int32_t nodeId;
SEpSet prevEp;
SEpSet newEp;
} SNodeUpdateInfo;

typedef struct SStreamUpstreamEpInfo {
int32_t nodeId;
int32_t childId;
Expand Down Expand Up @@ -170,15 +181,18 @@ typedef struct SStreamHbMsg {
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
} SStreamHbMsg;

int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq);
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);

typedef struct {
SMsgHead head;
int32_t msgId;
} SMStreamHbRspMsg;

int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp);
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp);

typedef struct SRetrieveChkptTriggerReq {
SMsgHead head;
int64_t streamId;
Expand All @@ -189,6 +203,9 @@ typedef struct SRetrieveChkptTriggerReq {
int64_t downstreamTaskId;
} SRetrieveChkptTriggerReq;

int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq);
int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq);

typedef struct SCheckpointTriggerRsp {
int64_t streamId;
int64_t checkpointId;
Expand All @@ -198,6 +215,9 @@ typedef struct SCheckpointTriggerRsp {
int32_t rspCode;
} SCheckpointTriggerRsp;

int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp);
int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp);

typedef struct SCheckpointReport {
int64_t streamId;
int32_t taskId;
Expand All @@ -222,7 +242,7 @@ typedef struct SRestoreCheckpointInfo {
int32_t nodeId;
} SRestoreCheckpointInfo;

int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);

typedef struct {
Expand All @@ -232,10 +252,8 @@ typedef struct {
int32_t reqType;
} SStreamTaskRunReq;

typedef struct SCheckpointConsensusEntry {
SRestoreCheckpointInfo req;
int64_t ts;
} SCheckpointConsensusEntry;
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq);
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq);

#ifdef __cplusplus
}
Expand Down
9 changes: 8 additions & 1 deletion include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -3798,7 +3798,14 @@ typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
} SVPauseStreamTaskReq;

typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
int64_t chkptId;
} SVResetStreamTaskReq;

typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
Expand Down
21 changes: 11 additions & 10 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,6 @@ typedef struct SSTaskBasicInfo {
SInterval interval;
} SSTaskBasicInfo;

typedef struct SStreamRetrieveReq SStreamRetrieveReq;
typedef struct SStreamDispatchReq SStreamDispatchReq;
typedef struct STokenBucket STokenBucket;
typedef struct SMetaHbInfo SMetaHbInfo;

typedef struct SDispatchMsgInfo {
SStreamDispatchReq* pData; // current dispatch data

Expand Down Expand Up @@ -626,11 +621,11 @@ typedef struct STaskStatusEntry {
STaskCkptInfo checkpointInfo;
} STaskStatusEntry;

typedef struct SNodeUpdateInfo {
int32_t nodeId;
SEpSet prevEp;
SEpSet newEp;
} SNodeUpdateInfo;
//typedef struct SNodeUpdateInfo {
// int32_t nodeId;
// SEpSet prevEp;
// SEpSet newEp;
//} SNodeUpdateInfo;

typedef struct SStreamTaskState {
ETaskStatus state;
Expand All @@ -643,6 +638,11 @@ typedef struct SCheckpointConsensusInfo {
int64_t streamId;
} SCheckpointConsensusInfo;

typedef struct SCheckpointConsensusEntry {
SRestoreCheckpointInfo req;
int64_t ts;
} SCheckpointConsensusEntry;

void streamSetupScheduleTrigger(SStreamTask* pTask);

// dispatch related
Expand Down Expand Up @@ -718,6 +718,7 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pInfo, int32_t code);
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId);

int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue);
Expand Down
1 change: 1 addition & 0 deletions include/util/taoserror.h
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106)
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107)
#define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108)
#define TSDB_CODE_STREAM_INVLD_CHKPT TAOS_DEF_ERROR_CODE(0, 0x4109)

// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
Expand Down
3 changes: 3 additions & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
aux_source_directory(src COMMON_SRC)
aux_source_directory(src/msg COMMON_MSG_SRC)

LIST(APPEND COMMON_SRC ${COMMON_MSG_SRC})

if(TD_ENTERPRISE)
LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c)
Expand Down
Loading

0 comments on commit 686c20c

Please sign in to comment.