From d9568d109249a40d4562f02e1ce65a6768cdd4aa Mon Sep 17 00:00:00 2001 From: Prometheus-collab Date: Thu, 26 Sep 2024 17:49:31 +0800 Subject: [PATCH] Providing a Common MQTT Mapper Signed-off-by: Prometheus-collab Signed-off-by: fuchendou --- mappers/mqtt-mapper/config.yaml | 4 +- mappers/mqtt-mapper/driver/devicetype.go | 11 +- mappers/mqtt-mapper/driver/driver.go | 441 ++++++++++++------ .../resource/mqttdevice-instance.yaml | 49 ++ .../resource/mqttdevice-model.yaml | 15 + 5 files changed, 381 insertions(+), 139 deletions(-) create mode 100644 mappers/mqtt-mapper/resource/mqttdevice-instance.yaml create mode 100644 mappers/mqtt-mapper/resource/mqttdevice-model.yaml diff --git a/mappers/mqtt-mapper/config.yaml b/mappers/mqtt-mapper/config.yaml index d456caad..43b00da0 100644 --- a/mappers/mqtt-mapper/config.yaml +++ b/mappers/mqtt-mapper/config.yaml @@ -1,9 +1,9 @@ grpc_server: socket_path: /etc/kubeedge/kubeedge-v1.17.0.sock common: - name: Kubeedge-V1.17.0-mapper + name: github.com/kubeedge/mqtt-mapper version: v1.13.0 api_version: v1.0.0 - protocol: # TODO add your protocol name + protocol: mqtt address: 127.0.0.1 edgecore_sock: /etc/kubeedge/dmi.sock diff --git a/mappers/mqtt-mapper/driver/devicetype.go b/mappers/mqtt-mapper/driver/devicetype.go index 0d01e426..9925b5d3 100644 --- a/mappers/mqtt-mapper/driver/devicetype.go +++ b/mappers/mqtt-mapper/driver/devicetype.go @@ -17,8 +17,8 @@ type CustomizedClient struct { // TODO add some variables to help you better implement device drivers deviceMutex sync.Mutex ProtocolConfig - DeviceInfo string `json:"deviceInfo"` - ParsedDeviceInfo map[string]interface{} `json:"parsedDeviceInfo"` + TempMessage string `json:"tempMessage"` + DeviceConfigData interface{} `json:"deviceConfigData"` } type ProtocolConfig struct { @@ -36,7 +36,6 @@ type ConfigData struct { Password string `json:"password"` // Password for MQTT broker authentication ConnectionTTL time.Duration `json:"connectionTTL"` // Connection timeout duration LastMessage time.Time `json:"lastMessage"` // Timestamp of the last received message - IsData bool `json:"isData"` // Indicates if there is valid data } type VisitorConfig struct { @@ -58,7 +57,10 @@ type VisitorConfigData struct { type OperationInfoType uint const ( - UPDATE OperationInfoType = iota // revision + DEVICEINfO OperationInfoType = iota // set global device config data + UPDATE // update the device config data + SETSINGLEVALUE // find the most related setting value and update + GETSINGLEVALUE // find the most related setting value ) // SerializedFormatType defines the enumeration values for serialized types. @@ -68,5 +70,4 @@ const ( JSON SerializedFormatType = iota // json YAML // yaml XML // xml - JSONPATH ) diff --git a/mappers/mqtt-mapper/driver/driver.go b/mappers/mqtt-mapper/driver/driver.go index 078a14de..9187cc7b 100644 --- a/mappers/mqtt-mapper/driver/driver.go +++ b/mappers/mqtt-mapper/driver/driver.go @@ -5,67 +5,125 @@ import ( "encoding/xml" "errors" "fmt" + "gopkg.in/yaml.v3" "reflect" "strconv" "strings" "sync" + "time" "github.com/kubeedge/mapper-framework/pkg/common" - "gopkg.in/yaml.v3" ) func NewClient(protocol ProtocolConfig) (*CustomizedClient, error) { client := &CustomizedClient{ - ProtocolConfig: protocol, - deviceMutex: sync.Mutex{}, - DeviceInfo: "", - ParsedDeviceInfo: make(map[string]interface{}), + ProtocolConfig: protocol, + deviceMutex: sync.Mutex{}, + TempMessage: "", + DeviceConfigData: nil, } return client, nil } func (c *CustomizedClient) InitDevice() error { configData := &c.ProtocolConfig.ConfigData - _, _, format, _ := configData.SplitTopic() - c.DeviceInfo = c.ProtocolConfig.ConfigData.Message - c.ParsedDeviceInfo, _ = c.ParseMessage(format) + _, operationInfo, _, err := configData.SplitTopic() + if operationInfo != DEVICEINfO { + return errors.New("This is not a device config.") + } + if err != nil { + return err + } + c.TempMessage = configData.Message return nil } func (c *CustomizedClient) GetDeviceData(visitor *VisitorConfig) (interface{}, error) { + configData := &c.ProtocolConfig.ConfigData + _, operationInfo, _, err := configData.SplitTopic() + if operationInfo != DEVICEINfO { + return nil, errors.New("This is not a device config.") + } + if err != nil { + return nil, err + } + visitor.ProcessOperation(c.DeviceConfigData) + return c.DeviceConfigData, nil +} - return nil, nil +func (c *CustomizedClient) SetDeviceData(visitor *VisitorConfig) error { + configData := &c.ProtocolConfig.ConfigData + _, operationInfo, _, err := configData.SplitTopic() + if operationInfo == DEVICEINfO { + return errors.New("This is a device config, not to set device data.") + } + if err != nil { + return err + } + visitor.ProcessOperation(c.DeviceConfigData) + return nil } -func (c *CustomizedClient) SetDeviceData(data interface{}, visitor *VisitorConfig) error { - vPointer := visitor.VisitorConfigData - vPointer.ModifyVisitorConfigData(c.ParsedDeviceInfo) +func (c *CustomizedClient) StopDevice() error { + updateFieldsByTag(c.DeviceConfigData, map[string]interface{}{ + "status": common.DeviceStatusDisCONN, + "Status": common.DeviceStatusDisCONN, + }, "json") + updateFieldsByTag(c.DeviceConfigData, map[string]interface{}{ + "status": common.DeviceStatusDisCONN, + "Status": common.DeviceStatusDisCONN, + }, "yaml") + updateFieldsByTag(c.DeviceConfigData, map[string]interface{}{ + "status": common.DeviceStatusDisCONN, + "Status": common.DeviceStatusDisCONN, + }, "xml") return nil } -func (c *CustomizedClient) GetDeviceStates() (string, error) { - // TODO: GetDeviceStates - return common.DeviceStatusOK, nil +func (c *CustomizedClient) GetDeviceStates(visitor *VisitorConfig) (string, error) { + res, err := visitor.getFieldByTag(c.DeviceConfigData) + if err != nil { + return common.DeviceStatusOK, nil + } + return res, nil + } /* --------------------------------------------------------------------------------------- */ // The function NewConfigData is a constructor for ConfigData to initialize the structure. // It returns the ConfigData instance and an error value to handle the validity of the passed parameters. -func NewConfigData(clientID, topic, message string) (*ConfigData, error) { +func NewConfigData(clientID, brokerURL, topic, message, username, password string, connectionTTL time.Duration) (*ConfigData, error) { if clientID == "" { return nil, errors.New("clientID cannot be empty") } + if brokerURL == "" { + return nil, errors.New("borkerURL cannot be empty") + } if topic == "" { return nil, errors.New("topic cannot be empty") } if message == "" { - message = "default message" + return nil, errors.New("message cannot be empty") + } + if username == "" { + username = "defaultUser" + } + if password == "" { + password = "defaultPass" + } + if connectionTTL == 0 { + connectionTTL = 30 * time.Second // default timeout of 30 seconds } return &ConfigData{ - ClientID: clientID, - Topic: topic, - Message: message, + ClientID: clientID, + BrokerURL: brokerURL, + Topic: topic, + Message: message, + Username: username, + Password: password, + ConnectionTTL: connectionTTL, + LastMessage: time.Now(), // set last message time to current time }, nil } @@ -96,6 +154,9 @@ func (c *ConfigData) GetMessage() (string, error) { // OperationInfoType and SerializedFormatType mappings var operationTypeMap = map[string]OperationInfoType{ "update": UPDATE, + "deviceinfo": DEVICEINfO, + "setsinglevalue" : SETSINGLEVALUE, + "getsinglevalue" : GETSINGLEVALUE, } var serializedFormatMap = map[string]SerializedFormatType{ @@ -145,31 +206,13 @@ func (c *ConfigData) ParseMessage(parseType SerializedFormatType) (map[string]in switch parseType { case JSON: // json - return c.parseJSON() + return c.jsonParse() case YAML: // yaml - convertedMessage, err := convertYAMLToJSON(c.Message) - if err != nil { - return nil, err - } - c.Message = convertedMessage - return c.parseJSON() + return c.yamlParse() case XML: // xml - convertedMessage, err := convertXMLToMap(c.Message) - if err != nil { - return nil, err - } - // c.Message = convertedMessage - originalMap := convertedMessage - var mp map[string]interface{} - for _, value := range originalMap { - if nestedMap, ok := value.(map[string]interface{}); ok { - mp = nestedMap - break - } - } - return mp, err + return c.xmlParse() default: return nil, errors.New("unsupported parse type") @@ -177,67 +220,123 @@ func (c *ConfigData) ParseMessage(parseType SerializedFormatType) (map[string]in } // The function parseJSON parses the Message field of the ConfigData (assumed to be a JSON string). -func (c *ConfigData) parseJSON() (map[string]interface{}, error) { +func (c *ConfigData) jsonParse() (map[string]interface{}, error) { if c.Message == "" { return nil, errors.New("message is empty") } - var result map[string]interface{} - err := json.Unmarshal([]byte(c.Message), &result) + var jsonMsg map[string]interface{} + err := json.Unmarshal([]byte(c.Message), &jsonMsg) if err != nil { return nil, err } - return result, nil + return jsonMsg, nil } -// The function ValidateMessage checks if the message content is valid. -func (c *ConfigData) ValidateMessage() error { +// The function parseYAML parses the Message field of the ConfigData (assumed to be a YAML string). +func (c *ConfigData)yamlParse() (map[string]interface{}, error) { if c.Message == "" { - return errors.New("message is empty") + return nil, errors.New("message is empty") } - // Example: Check if the message is valid JSON (you can expand for other formats) - var temp map[string]interface{} - if err := json.Unmarshal([]byte(c.Message), &temp); err != nil { - return errors.New("message is not valid JSON") + var yamlMsg map[string]interface{} + err := yaml.Unmarshal([]byte(c.Message), &yamlMsg) + if err != nil { + return nil, err } - - return nil + return yamlMsg, nil } -// NewVisitorConfigData creates a new instance of VisitorConfigData using ConfigData pointer and the result of SplitTopic. -func (c *ConfigData) NewVisitorConfigData() (*VisitorConfigData, error) { - // get ClientID - clientID, err := c.GetClientID() - if err != nil { - return nil, err +// The function xmlParse parses the Message field of the ConfigData (assumed to be a XML string). +func (c *ConfigData)xmlParse() (map[string]interface{}, error) { + msg := c.Message + if strings.HasPrefix(msg, "") + if end != -1 { + msg = msg[end+2:] + } } - // get DeviceInfo, OperationInfo and SerializedFormat - deviceInfo, operationInfo, serializedFormat, err := c.SplitTopic() + var node Node + err := xml.Unmarshal([]byte(msg), &node) if err != nil { return nil, err } - // get ParsedMessage - parsedMessage, err := c.ParseMessage(serializedFormat) - if err != nil { - return nil, err + xmlMsg := nodeToMap(node) + var mp map[string]interface{} + for _, value := range xmlMsg { + if nestedMap, ok := value.(map[string]interface{}); ok { + mp = nestedMap + break + } } + return mp, err +} - // create - return &VisitorConfigData{ - DataType: "string", - ClientID: clientID, - DeviceInfo: deviceInfo, - OperationInfo: operationInfo, - SerializedFormat: serializedFormat, - ParsedMessage: parsedMessage, +// NewVisitorConfig creates a new instance of VisitorConfig using ConfigData pointer and the result of SplitTopic. +func (c *ConfigData) NewVisitorConfig() (*VisitorConfig, error) { + // get ClientID + clientID, err := c.GetClientID() + if err != nil { + return nil, err + } + + // get DeviceInfo, OperationInfo and SerializedFormat + deviceInfo, operationInfo, serializedFormat, err := c.SplitTopic() + if err != nil { + return nil, err + } + + // get ParsedMessage + parsedMessage, err := c.ParseMessage(serializedFormat) + if err != nil { + return nil, err + } + + // create + return &VisitorConfig{ + ProtocolName: "mqtt", + VisitorConfigData: VisitorConfigData{ + DataType: "DefaultDataType", + ClientID: clientID, + DeviceInfo: deviceInfo, + OperationInfo: operationInfo, + SerializedFormat: serializedFormat, + ParsedMessage: parsedMessage, + }, }, nil } /* --------------------------------------------------------------------------------------- */ -func (v *VisitorConfigData) ModifyVisitorConfigData(destDataConfig interface{}) error { +// The function ParseMessage parses the Message field according to the incoming type. +// parseType(0: json, 1: yaml, 2: xml) +// The value interface{} represents the parsed structure. +func (v *VisitorConfig) ProcessOperation(deviceConfigData interface{}) error { + if v.VisitorConfigData.ParsedMessage == nil { + return errors.New("visitor message is empty") + } + + if deviceConfigData == nil { + return errors.New("device message is empty") + } + + switch v.VisitorConfigData.OperationInfo { + case DEVICEINfO: // device config data + v.updateFullConfig(deviceConfigData) + return nil + case UPDATE: // update the full text according the visitor config and the tag (json, yaml, xml) + v.updateFullConfig(deviceConfigData) + return nil + case SETSINGLEVALUE: // update the single value according the visitor config and the tag (json, yaml, xml) + v.updateFieldsByTag(deviceConfigData) + return nil + default: + return errors.New("unsupported operation type") + } +} + +func (v *VisitorConfig) updateFullConfig(destDataConfig interface{}) error { destValue := reflect.ValueOf(destDataConfig) if destValue.Kind() != reflect.Ptr || destValue.Elem().Kind() != reflect.Struct { return errors.New("destDataConfig must be a pointer to a struct") @@ -246,7 +345,7 @@ func (v *VisitorConfigData) ModifyVisitorConfigData(destDataConfig interface{}) destValue = destValue.Elem() var tagName string - switch v.SerializedFormat { + switch v.VisitorConfigData.SerializedFormat { case JSON: tagName = "json" case YAML: @@ -258,13 +357,37 @@ func (v *VisitorConfigData) ModifyVisitorConfigData(destDataConfig interface{}) } // Update the destination struct using JSON tag - if err := updateStructFields(destValue, v.ParsedMessage, tagName); err != nil { + if err := updateStructFields(destValue, v.VisitorConfigData.ParsedMessage, tagName); err != nil { return err } return nil } +func (v *VisitorConfig)updateFieldsByTag(destDataConfig interface{}) error { + vv := reflect.ValueOf(destDataConfig).Elem() + + var tagName string + switch v.VisitorConfigData.SerializedFormat { + case JSON: + tagName = "json" + case YAML: + tagName = "yaml" + case XML: + tagName = "xml" + default: + return errors.New("unknown serialized format") + } + + for key, value := range v.VisitorConfigData.ParsedMessage { + if err := setFieldByTag(vv, key, value, tagName); err != nil { + return err + } + } + return nil +} + +/* --------------------------------------------------------------------------------------- */ // updateStructFields recursively updates struct fields from the given map using specified tag type func updateStructFields(structValue reflect.Value, data map[string]interface{}, tagName string) error { structType := structValue.Type() @@ -328,53 +451,6 @@ func updateStructFields(structValue reflect.Value, data map[string]interface{}, return nil } -/* --------------------------------------------------------------------------------------- */ -// The function ConvertYAMLToJSON converts a YAML string to a JSON string. -func convertYAMLToJSON(yamlString string) (string, error) { - // Converting a YAML string to a generic map object - var yamlData map[string]interface{} - err := yaml.Unmarshal([]byte(yamlString), &yamlData) - if err != nil { - return "", err - } - - // Convert a map object to a JSON string - jsonData, err := json.Marshal(yamlData) - if err != nil { - return "", err - } - - return string(jsonData), nil -} - -// The function ConvertXMLToMap converts XML string to map[string]interface{}. -func convertXMLToMap(xmlString string) (map[string]interface{}, error) { - // Wrap the XML content with - wrappedXML := wrapXMLWithRoot(xmlString) - - var node Node - err := xml.Unmarshal([]byte(wrappedXML), &node) - if err != nil { - return nil, err - } - return nodeToMap(node), nil -} - -// The function WrapXMLWithRoot wraps XML strings in tags. -func wrapXMLWithRoot(xmlString string) string { - // Remove the XML declaration if it exists - if strings.HasPrefix(xmlString, "") - if end != -1 { - xmlString = xmlString[end+2:] - } - } - - // Wrap the remaining XML content with - wrappedXML := xmlString - return wrappedXML -} - // Node structure type Node struct { XMLName xml.Name @@ -396,31 +472,74 @@ func convertValue(content string) interface{} { } } +// Convert XML attributes to map entries +func attrsToMap(attrs []xml.Attr) map[string]interface{} { + attrMap := make(map[string]interface{}) + for _, attr := range attrs { + attrMap[attr.Name.Local] = attr.Value + } + return attrMap +} + // The function nodeToMap recursively converts XML nodes to map[string]interface{}. func nodeToMap(node Node) map[string]interface{} { - result := make(map[string]interface{}) + xmlMsg := make(map[string]interface{}) + + // Process attributes + if len(node.Attr) > 0 { + xmlMsg["attributes"] = attrsToMap(node.Attr) + } // If the node has no children, it is a leaf node, apply type conversion. if len(node.Nodes) == 0 { - return map[string]interface{}{node.XMLName.Local: convertValue(strings.TrimSpace(node.Content))} + xmlMsg[node.XMLName.Local] = convertValue(strings.TrimSpace(node.Content)) + return xmlMsg } // Process child nodes recursively. + children := make(map[string]interface{}) for _, child := range node.Nodes { childMap := nodeToMap(child) - if existing, found := result[child.XMLName.Local]; found { + if existing, found := children[child.XMLName.Local]; found { switch v := existing.(type) { case []interface{}: - result[child.XMLName.Local] = append(v, childMap[child.XMLName.Local]) + children[child.XMLName.Local] = append(v, childMap[child.XMLName.Local]) default: - result[child.XMLName.Local] = []interface{}{v, childMap[child.XMLName.Local]} + children[child.XMLName.Local] = []interface{}{v, childMap[child.XMLName.Local]} } } else { - result[child.XMLName.Local] = childMap[child.XMLName.Local] + children[child.XMLName.Local] = childMap[child.XMLName.Local] } } - return map[string]interface{}{node.XMLName.Local: result} + xmlMsg[node.XMLName.Local] = children + return xmlMsg +} + +func setFieldByTag(v reflect.Value, key string, value interface{}, tagName string) error { + if v.Kind() == reflect.Pointer { + v = v.Elem() + } + for i := 0; i < v.NumField(); i++ { + field := v.Type().Field(i) + fieldVal := v.Field(i) + + if field.Tag.Get(tagName) == key { + val := reflect.ValueOf(value) + if fieldVal.Type() != val.Type() { + return fmt.Errorf("type mismatch: cannot assign %s to %s", val.Type(), fieldVal.Type()) + } + fieldVal.Set(val) + return nil + } + + if fieldVal.Kind() == reflect.Struct { + if err := setFieldByTag(fieldVal, key, value, tagName); err == nil { + return nil + } + } + } + return fmt.Errorf("no such field with tag: %s", key) } // The function MapToJSON converts map[string]interface{} to JSON string. @@ -440,4 +559,62 @@ func StructToJSON(v interface{}) (string, error) { return string(jsonData), nil } -/* --------------------------------------------------------------------------------------- */ +func updateFieldsByTag(s interface{}, updates map[string]interface{}, tagName string) error { + v := reflect.ValueOf(s).Elem() + for key, value := range updates { + if err := setFieldByTag(v, key, value, tagName); err != nil { + return err + } + } + return nil +} + +func (v * VisitorConfig)getFieldByTag(s interface{}) (string, error) { + vv := reflect.ValueOf(s).Elem() + + var tagName string + switch v.VisitorConfigData.SerializedFormat { + case JSON: + tagName = "json" + case YAML: + tagName = "yaml" + case XML: + tagName = "xml" + default: + return "", errors.New("unknown serialized format") + } + + res, err := findFieldByTag(vv, "status", tagName) + if err != nil { + res, err = findFieldByTag(vv, "Status", tagName) + if err != nil { + return "", err + } else { + return res, nil + } + } else { + return res, nil + } +} + +func findFieldByTag(v reflect.Value, key string, tagName string) (string, error) { + if v.Kind() == reflect.Pointer { + v = v.Elem() + } + for i := 0; i < v.NumField(); i++ { + field := v.Type().Field(i) + fieldVal := v.Field(i) + + if field.Tag.Get(tagName) == key { + return fieldVal.String(), nil + } + + if fieldVal.Kind() == reflect.Struct { + if value, err := findFieldByTag(fieldVal, key, tagName); err == nil { + return value, nil + } + } + } + return "", fmt.Errorf("no such field with tag: %s", key) +} +/* --------------------------------------------------------------------------------------- */ \ No newline at end of file diff --git a/mappers/mqtt-mapper/resource/mqttdevice-instance.yaml b/mappers/mqtt-mapper/resource/mqttdevice-instance.yaml new file mode 100644 index 00000000..8bd91b8e --- /dev/null +++ b/mappers/mqtt-mapper/resource/mqttdevice-instance.yaml @@ -0,0 +1,49 @@ +apiVersion: devices.kubeedge.io/v1beta1 + kind: Device + metadata: + name: beta1-device + spec: + deviceModelRef: + name: temperture-model + nodeName: k8s-worker1 + properties: + - name: temperature + collectCycle: 10000000000 # The frequency of reporting data to the cloud, once every 10 seconds + reportCycle: 10000000000 # The frequency of data push to user applications or databases, once every 10 seconds + reportToCloud: true + desired: + value: "30" + pushMethod: + mqtt: + address: tcp://101.133.150.110:1883 + topic: temperture/update/json + qos: 0 + retained: false + dbMethod: + influxdb2: + influxdb2ClientConfig: + url: http://127.0.0.1:8086 + org: test-org + bucket: test-bucket + influxdb2DataConfig: + measurement: temperture_stats + tag: + unit: temperature + fieldKey: temperture_value + visitors: + protocolName: mqtt + configData: + topic: "sensor/data" + qos: 1 + retain: false + clientId: "temperture_client" + username: "user" + password: "pass" + cleanSession: true + keepAlive: 60 + + protocol: + protocolName: mqtt + configData: + ip: 101.133.150.110 + port: 1883 \ No newline at end of file diff --git a/mappers/mqtt-mapper/resource/mqttdevice-model.yaml b/mappers/mqtt-mapper/resource/mqttdevice-model.yaml new file mode 100644 index 00000000..5a601acf --- /dev/null +++ b/mappers/mqtt-mapper/resource/mqttdevice-model.yaml @@ -0,0 +1,15 @@ + apiVersion: devices.kubeedge.io/v1beta1 + kind: DeviceModel + metadata: + name: temperture-model + namespace: default + spec: + properties: + - name: temperture + description: Temperture sensor model + type: INT + accessMode: ReadWrite + maximum: "100" + minimum: "1" + unit: "Celsius" + protocol: mqtt \ No newline at end of file