From 2497bff6f56c048a316f22d740171180ef68fc49 Mon Sep 17 00:00:00 2001 From: lsy1990 Date: Wed, 7 Sep 2022 13:36:00 +0800 Subject: [PATCH] add rocketmq plugin based on rocketmq console (#171) * add rocketmq plugin based on rocketmq console * refector rocketmq export * refector to rocketmq_diff * rename to rocketmq_offset --- agent/agent.go | 1 + conf/input.rocketmq_diff/rocketmq_diff.toml | 7 + inputs/rocketmq_offset/model.go | 91 ++++++ inputs/rocketmq_offset/rocketmq.go | 310 ++++++++++++++++++++ 4 files changed, 409 insertions(+) create mode 100644 conf/input.rocketmq_diff/rocketmq_diff.toml create mode 100644 inputs/rocketmq_offset/model.go create mode 100644 inputs/rocketmq_offset/rocketmq.go diff --git a/agent/agent.go b/agent/agent.go index b0e244ad..fd95341e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -48,6 +48,7 @@ import ( _ "flashcat.cloud/categraf/inputs/rabbitmq" _ "flashcat.cloud/categraf/inputs/redis" _ "flashcat.cloud/categraf/inputs/redis_sentinel" + _ "flashcat.cloud/categraf/inputs/rocketmq_offset" _ "flashcat.cloud/categraf/inputs/switch_legacy" _ "flashcat.cloud/categraf/inputs/system" _ "flashcat.cloud/categraf/inputs/tomcat" diff --git a/conf/input.rocketmq_diff/rocketmq_diff.toml b/conf/input.rocketmq_diff/rocketmq_diff.toml new file mode 100644 index 00000000..2be1caed --- /dev/null +++ b/conf/input.rocketmq_diff/rocketmq_diff.toml @@ -0,0 +1,7 @@ +# # collect interval +# interval = 15 + + [[instances]] +# rocketmq_console_ip_port= +# ignored_topics=[] + diff --git a/inputs/rocketmq_offset/model.go b/inputs/rocketmq_offset/model.go new file mode 100644 index 00000000..9468b8d3 --- /dev/null +++ b/inputs/rocketmq_offset/model.go @@ -0,0 +1,91 @@ +package rocketmq_offset + +type TopicListData struct { + TopicList []string `json:"topicList"` + BrokerAddr string `json:"brokerAddr"` +} + +type TopicList struct { + Status int `json:"status"` + Data TopicListData `json:"data"` + ErrMsg string `json:"errMsg"` +} + +//(2).get consumerList by topic + +type ConsumerListByTopic struct { + Status int `json:"status"` + ErrMsg string `json:"errMsg"` + Data map[string]TopicGroup `json:"data"` +} + +type TopicGroup struct { + Topic string `json:"topic"` + DiffTotal int `json:"diffTotal"` + LastTimestamp int64 `json:"lastTimestamp"` + QueueStatInfoList []QueueStatInfoList `json:"queueStatInfoList"` +} + +type QueueStatInfoList struct { + BrokerName string `json:"brokerName"` + QueueId int `json:"queueId"` + BrokerOffset int64 `json:"brokerOffset"` + ConsumerOffset int64 `json:"consumerOffset"` + ClientInfo string `json:"clientInfo"` + LastTimestamp int64 `json:"lasttimestamp"` +} + +//(3).mode for prometheus metrics + +type MsgDiff struct { + MsgDiffDetails []*MsgDiffDetail `json:"msg_diff_details"` + MsgDiffTopics map[string]*MsgDiffTopic `json:"msg_diff_topics"` + MsgDiffConsumerGroups map[string]*MsgDiffConsumerGroup `json:"msg_diff_consumergroups"` + MsgDiffTopics_ConsumerGroups map[string]*MsgDiffTopicConsumerGroup `json:"msg_diff_topics_consumergroups"` + MsgDiffBrokers map[string]*MsgDiffBroker `json:"msg_diff_brokers"` + MsgDiffQueues map[string]*MsgDiffQueue `json:"msg_diff_queues"` + MsgDiffClientInfos map[string]*MsgDiffClientInfo `json:"msg_diff_clientinfos"` +} + +type MsgDiffDetail struct { + Broker string `json:"broker"` + QueueId int `json:"queueId"` + ConsumerClientIP string `json:"consumerClientIP"` + ConsumerClientPID string `json:"consumerClientPID"` + Diff int `json:"diff"` + Topic string `json:"topic"` + ConsumerGroup string `json:"consumerGroup"` +} + +type MsgDiffTopic struct { + Diff int `json:"diff"` + Topic string `json:"topic"` +} + +type MsgDiffConsumerGroup struct { + Diff int `json:"diff"` + ConsumerGroup string `json:"consumerGroup"` +} + +type MsgDiffTopicConsumerGroup struct { + Diff int `json:"diff"` + Topic string `json:"topic"` + ConsumerGroup string `json:"consumerGroup"` +} + +type MsgDiffBroker struct { + Broker string `json:"broker"` + Diff int `json:"diff"` +} + +type MsgDiffQueue struct { + Broker string `json:"broker"` + QueueId int `json:"queueId"` + Diff int `json:"diff"` +} + +type MsgDiffClientInfo struct { + ConsumerClientIP string `json:"consumerClientIP"` + ConsumerClientPID string `json:"consumerClientPID"` + Diff int `json:"diff"` +} diff --git a/inputs/rocketmq_offset/rocketmq.go b/inputs/rocketmq_offset/rocketmq.go new file mode 100644 index 00000000..22953425 --- /dev/null +++ b/inputs/rocketmq_offset/rocketmq.go @@ -0,0 +1,310 @@ +package rocketmq_offset + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "strings" + + "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/inputs" + "flashcat.cloud/categraf/pkg/choice" + "flashcat.cloud/categraf/types" +) + +const inputName = "rocketmq_offset" + +const consoleSchema string = "http://" +const topicNameListPath string = "/topic/list.query" +const queryConsumerByTopicPath string = "/topic/queryConsumerByTopic.query?topic=" + +type RocketMQ struct { + config.PluginConfig + Instances []*Instance `toml:"instances"` +} + +func init() { + inputs.Add(inputName, func() inputs.Input { + return &RocketMQ{} + }) +} + +func (pt *RocketMQ) GetInstances() []inputs.Instance { + ret := make([]inputs.Instance, len(pt.Instances)) + for i := 0; i < len(pt.Instances); i++ { + ret[i] = pt.Instances[i] + } + return ret +} + +type Instance struct { + config.InstanceConfig + IgnoredTopics []string `toml:"ignored_topics"` + RocketMQConsoleIPAndPort string `toml:"rocketmq_console_ip_port"` +} + +func (ins *Instance) Gather(slist *types.SampleList) { + //获取rocketmq集群中的topicNameList + topicNameArray := GetTopicNameList(ins.RocketMQConsoleIPAndPort) + if topicNameArray == nil { + log.Println("E! fail to get topic,please check config!") + return + } + + //按照topic聚合msgDiff + var diff_Topic_Map = make(map[string]*MsgDiffTopic) + + //按照consumerGroup聚合msgDiff + //var diff_ConsumerGroup_Slice []model.MsgDiff_ConsumerGroup = []model.MsgDiff_ConsumerGroup{} + var diff_ConsumerGroup_Map = make(map[string]*MsgDiffConsumerGroup) + + //按照topic, consumeGroup聚合msgDiff + //var diff_Topic_ConsumerGroup_Slice []model.MsgDiff_Topics_ConsumerGroup = []model.MsgDiff_Topics_ConsumerGroup{} + var diff_Topic_ConsumerGroup_Map = make(map[string]*MsgDiffTopicConsumerGroup) + + //按照broker聚合msgDiff + //var diff_Broker_Slice []model.MsgDiff_Broker = []model.MsgDiff_Broker{} + var diff_Broker_Map = make(map[string]*MsgDiffBroker) + + //按照clientInfo聚合msgDiff + //var diff_Clientinfo_Slice []model.MsgDiff_ClientInfo = []model.MsgDiff_ClientInfo{} + var diff_Clientinfo_Map = make(map[string]*MsgDiffClientInfo) + + //按照queue聚合msgDiff + //var MsgDiff_Queue_Slice []model.MsgDiff_Queue = []model.MsgDiff_Queue{} + var diff_Queue_Map = make(map[string]*MsgDiffQueue) + + for i := range topicNameArray { + var topicName = topicNameArray[i] + isContain := choice.Contains(topicName, ins.IgnoredTopics) + if isContain { + continue + } + + var data *ConsumerListByTopic = GetConsumerListByTopic(ins.RocketMQConsoleIPAndPort, topicName) + + if data == nil { + continue + } + + topicConsumerGroups := data.Data + + for cgName, consumerInfo := range topicConsumerGroups { + topic := consumerInfo.Topic + + //获取当前consumer信息及对应的rocketmq-queue的信息 + queueStatInfoList := consumerInfo.QueueStatInfoList + + for i := range queueStatInfoList { + + queue := queueStatInfoList[i] + + brokerName := queue.BrokerName + queueId := queue.QueueId + + clientInfo := queue.ClientInfo + consumerClientIP := "" + consumerClientPID := "" + if &clientInfo != nil { + temp_array := strings.Split(clientInfo, "@") + if temp_array != nil { + if len(temp_array) == 1 { + consumerClientIP = temp_array[0] + } else if len(temp_array) == 2 { + consumerClientIP = temp_array[0] + consumerClientPID = temp_array[1] + } + } + } + + diff := int(queue.BrokerOffset) - int(queue.ConsumerOffset) + + tags := map[string]string{ + "BrokerName": brokerName, + "QueueId": fmt.Sprint(queueId), + "ConsumerClientIP": consumerClientIP, + "ConsumerClientPID": consumerClientPID, + "Topic": topic, + "ConsumerGroup": cgName, + } + slist.PushSample(inputName, "diffDetail", diff, tags) + + //按照topic进行msgDiff聚合 + if _, ok := diff_Topic_Map[topic]; ok { + //如果已经存在,计算diff + diff_Topic_Map[topic].Diff = diff_Topic_Map[topic].Diff + diff + } else { + var diffTopic *MsgDiffTopic = new(MsgDiffTopic) + + diffTopic.Diff = diff + diffTopic.Topic = topic + + diff_Topic_Map[topic] = diffTopic + } + + //按照consumerGroup进行msgDiff聚合 + if _, ok := diff_ConsumerGroup_Map[cgName]; ok { + diff_ConsumerGroup_Map[cgName].Diff = diff_ConsumerGroup_Map[cgName].Diff + diff + } else { + var diffConsumerGroup *MsgDiffConsumerGroup = new(MsgDiffConsumerGroup) + + diffConsumerGroup.ConsumerGroup = cgName + diffConsumerGroup.Diff = diff + + diff_ConsumerGroup_Map[cgName] = diffConsumerGroup + } + + //按照topic, consumerGroup进行msgDiff聚合 + topic_cgName := topic + ":" + cgName + if _, ok := diff_Topic_ConsumerGroup_Map[topic_cgName]; ok { + diff_Topic_ConsumerGroup_Map[topic_cgName].Diff = diff_Topic_ConsumerGroup_Map[topic_cgName].Diff + diff + + } else { + var diff_topic_cg *MsgDiffTopicConsumerGroup = new(MsgDiffTopicConsumerGroup) + + diff_topic_cg.ConsumerGroup = cgName + diff_topic_cg.Diff = diff + diff_topic_cg.Topic = topic + + diff_Topic_ConsumerGroup_Map[topic_cgName] = diff_topic_cg + + } + + //按照broker进行msgDiff聚合 + if _, ok := diff_Broker_Map[brokerName]; ok { + diff_Broker_Map[brokerName].Diff = diff_Broker_Map[brokerName].Diff + diff + } else { + var diff_Broker *MsgDiffBroker = new(MsgDiffBroker) + + diff_Broker.Broker = brokerName + diff_Broker.Diff = diff + + diff_Broker_Map[brokerName] = diff_Broker + } + + //按照queueId进行msgDiff聚合 + queuestr := brokerName + ":" + string(queueId) + if _, ok := diff_Queue_Map[string(queueId)]; ok { + diff_Queue_Map[queuestr].Diff = diff_Queue_Map[queuestr].Diff + diff + } else { + var diff_Queue *MsgDiffQueue = new(MsgDiffQueue) + + diff_Queue.Broker = brokerName + diff_Queue.Diff = diff + diff_Queue.QueueId = queueId + + diff_Queue_Map[queuestr] = diff_Queue + } + + //按照clientInfo进行msgDiff聚合 + + if _, ok := diff_Clientinfo_Map[clientInfo]; ok { + diff_Clientinfo_Map[clientInfo].Diff = diff_Clientinfo_Map[clientInfo].Diff + diff + } else { + var diff_ClientInfo *MsgDiffClientInfo = new(MsgDiffClientInfo) + + diff_ClientInfo.ConsumerClientIP = consumerClientIP + diff_ClientInfo.ConsumerClientPID = consumerClientPID + diff_ClientInfo.Diff = diff + + diff_Clientinfo_Map[clientInfo] = diff_ClientInfo + } + + } + } + + } + for topic, value := range diff_Topic_Map { + tags := map[string]string{ + "Topic": topic, + } + slist.PushSample(inputName, "diffTopic", value.Diff, tags) + } + for ConsumerGroup, value := range diff_ConsumerGroup_Map { + tags := map[string]string{ + "ConsumerGroup": ConsumerGroup, + } + slist.PushSample(inputName, "diffConsumerGroup", value.Diff, tags) + } + + for topic_cgName, value := range diff_Topic_ConsumerGroup_Map { + tags := map[string]string{ + "Topic": strings.Split(topic_cgName, ":")[0], + "ConsumerGroup": strings.Split(topic_cgName, ":")[1], + } + slist.PushSample(inputName, "diffTopicConsumerGroup", value.Diff, tags) + } + for broker, value := range diff_Broker_Map { + tags := map[string]string{ + "Broker": broker, + } + slist.PushSample(inputName, "diffBroker", value.Diff, tags) + } + for queuestr, value := range diff_Queue_Map { + tags := map[string]string{ + "Broker": strings.Split(queuestr, ":")[0], + "QueueId": strings.Split(queuestr, ":")[1], + } + slist.PushSample(inputName, "diffBrokerQueue", value.Diff, tags) + } + for _, value := range diff_Clientinfo_Map { + tags := map[string]string{ + "ConsumerClientIP": value.ConsumerClientIP, + "ConsumerClientPID": value.ConsumerClientPID, + } + slist.PushSample(inputName, "diffClientInfo", value.Diff, tags) + } + +} + +func GetTopicNameList(rocketmqConsoleIPAndPort string) []string { + var url = consoleSchema + rocketmqConsoleIPAndPort + topicNameListPath + var content = doRequest(url) + + var jsonData TopicList + err := json.Unmarshal([]byte(content), &jsonData) + + if err != nil { + log.Println("E! unable to decode topic name list", err) + return nil + } + + return jsonData.Data.TopicList +} + +func GetConsumerListByTopic(rocketmqConsoleIPAndPort string, topicName string) *ConsumerListByTopic { + var url = consoleSchema + rocketmqConsoleIPAndPort + queryConsumerByTopicPath + topicName + var content = doRequest(url) + + var jsonData *ConsumerListByTopic + err := json.Unmarshal([]byte(content), &jsonData) + + if err != nil { + return nil + } + + return jsonData +} + +func doRequest(url string) []byte { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil + } + + defer res.Body.Close() + body, err := ioutil.ReadAll(res.Body) + + if err != nil { + log.Println("E! fail to read request data", err) + return nil + } else { + return body + } +}