diff --git a/config/config.go b/config/config.go index 8086ba38..c0680890 100644 --- a/config/config.go +++ b/config/config.go @@ -39,6 +39,7 @@ type Config struct { GrpcServer GRPCServer `yaml:"grpc_server"` Common Common `yaml:"common"` DevInit DevInit `yaml:"dev_init"` + LogLevel string `yaml:"log_level"` } type GRPCServer struct { @@ -63,9 +64,13 @@ type DevInit struct { func (c *Config) Parse() error { var level klog.Level var loglevel string - var configFile string pflag.StringVar(&loglevel, "v", "1", "log level") + if err := level.Set(loglevel); err != nil { + return err + } + + var configFile string pflag.StringVar(&configFile, "config-file", defaultConfigFile, "Config file name") cf, err := ioutil.ReadFile(configFile) @@ -75,8 +80,10 @@ func (c *Config) Parse() error { if err = yaml.Unmarshal(cf, c); err != nil { return err } - if err = level.Set(loglevel); err != nil { - return err + if len(c.LogLevel) != 0 && c.LogLevel != "0" { + if serr := level.Set(c.LogLevel); serr != nil { + return serr + } } switch c.DevInit.Mode { diff --git a/config/config.yaml b/config/config.yaml index ca627482..56e7d609 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -10,3 +10,4 @@ common: dev_init: mode: register #register/configmap configmap: /opt/kubeedge/deviceProfile.json +log_level: 1 \ No newline at end of file diff --git a/mappers/modbus-dmi/config.yaml b/mappers/modbus-dmi/config.yaml index 19616cda..5fc4263a 100644 --- a/mappers/modbus-dmi/config.yaml +++ b/mappers/modbus-dmi/config.yaml @@ -9,3 +9,4 @@ common: edgecore_sock: /etc/kubeedge/dmi.sock dev_init: mode: register #register/configmap +log_level: 1 \ No newline at end of file diff --git a/mappers/modbus-dmi/device/device.go b/mappers/modbus-dmi/device/device.go index 657b5a48..cedb41e8 100644 --- a/mappers/modbus-dmi/device/device.go +++ b/mappers/modbus-dmi/device/device.go @@ -18,9 +18,11 @@ package device import ( "context" + "encoding/binary" "encoding/json" "errors" "fmt" + "math" "regexp" "strconv" "sync" @@ -51,17 +53,65 @@ func setVisitor(visitorConfig *modbus.ModbusVisitorConfig, twin *common.Twin, cl return } - klog.V(2).Infof("Convert type: %s, value: %s ", twin.PVisitor.PProperty.DataType, twin.Desired.Value) - value, err := common.Convert(twin.PVisitor.PProperty.DataType, twin.Desired.Value) - if err != nil { - klog.Errorf("Convert error: %v", err) - return - } - - valueInt, _ := value.(int64) - _, err = client.Set(visitorConfig.Register, visitorConfig.Offset, uint16(valueInt)) - if err != nil { - klog.Errorf("Set visitor error: %v %v", err, visitorConfig) + klog.Infof("Convert type: %s, value: %s ", twin.PVisitor.PProperty.DataType, twin.Desired.Value) + value := twin.Desired.Value + switch twin.PVisitor.PProperty.DataType { + case "int": + valueInt, err := strconv.ParseInt(value, 10, 64) + if err != nil { + klog.Errorf("twin %s Convert error: %v", value, err) + return + } + _, err = client.Set(visitorConfig.Register, visitorConfig.Offset, uint16(valueInt)) + if err != nil { + klog.Errorf("Set visitor error: %v %v", err, visitorConfig) + return + } + case "float": + valueFloat, err := strconv.ParseFloat(value, 32) + if err != nil { + klog.Errorf("twin %s Convert error: %v", value, err) + return + } + _, err = client.SetString(visitorConfig.Register, visitorConfig.Offset, visitorConfig.Limit, string(ConvertFloat32ToBytes(float32(valueFloat)))) + if err != nil { + klog.Errorf("Set visitor error: %v %v", err, visitorConfig) + return + } + case "double": + valueDouble, err := strconv.ParseFloat(value, 64) + if err != nil { + klog.Errorf("twin %s Convert error: %v", value, err) + return + } + _, err = client.SetString(visitorConfig.Register, visitorConfig.Offset, visitorConfig.Limit, string(ConvertFloat64ToBytes(valueDouble))) + if err != nil { + klog.Errorf("Set visitor error: %v %v", err, visitorConfig) + return + } + case "boolean": + valueBool, err := strconv.ParseBool(value) + if err != nil { + klog.Errorf("twin %s Convert error: %v", value, err) + return + } + var valueSet uint16 = 0x0000 + if valueBool { + valueSet = 0xFF00 + } + _, err = client.Set(visitorConfig.Register, visitorConfig.Offset, valueSet) + if err != nil { + klog.Errorf("Set visitor error: %v %v", err, visitorConfig) + return + } + case "string": + _, err := client.SetString(visitorConfig.Register, visitorConfig.Offset, visitorConfig.Limit, value) + if err != nil { + klog.Errorf("Set visitor error: %v %v", err, visitorConfig) + return + } + default: + klog.Errorf("wrong DataType of twin %s: %s", value, twin.PVisitor.PProperty.DataType) return } } @@ -121,17 +171,21 @@ func initTwin(ctx context.Context, dev *modbus.ModbusDev) { } setVisitor(&visitorConfig, &dev.Instance.Twins[i], dev.ModbusClient) - twinData := TwinData{Client: dev.ModbusClient, + twinData := TwinData{ + Client: dev.ModbusClient, Name: dev.Instance.Twins[i].PropertyName, Type: dev.Instance.Twins[i].Desired.Metadatas.Type, VisitorConfig: &visitorConfig, Topic: fmt.Sprintf(common.TopicTwinUpdate, dev.Instance.ID), - DeviceName: dev.Instance.Name} - collectCycle := time.Duration(dev.Instance.Twins[i].PVisitor.CollectCycle) + DeviceID: dev.Instance.ID, + DeviceName: dev.Instance.Name, + } + collectCycle := time.Duration(dev.Instance.Twins[i].PVisitor.CollectCycle) * time.Second // If the collect cycle is not set, set it to 1 second. if collectCycle == 0 { collectCycle = 1 * time.Second } + klog.V(2).InfoS("Start to collect", "twin", twinData.Name, "cycle", collectCycle) ticker := time.NewTicker(collectCycle) go func() { for { @@ -171,9 +225,10 @@ func (d *DevPanel) start(ctx context.Context, dev *modbus.ModbusDev) { dev.ModbusClient = client go initTwin(ctx, dev) + klog.Infof("All twins has been set, %+v", dev.Instance) - <-ctx.Done() d.wg.Done() + klog.InfoS("sync wait group done", "deviceID", dev.Instance.ID, "device name", dev.Instance.Name) } // DevInit initialize the device data. @@ -215,13 +270,15 @@ func NewDevPanel() *DevPanel { // DevStart start all devices. func (d *DevPanel) DevStart() { for id, dev := range d.devices { - klog.V(4).Info("Dev: ", id, dev) + klog.Info("Dev: ", id, dev) ctx, cancel := context.WithCancel(context.Background()) d.deviceMuxs[id] = cancel d.wg.Add(1) go d.start(ctx, dev) } + klog.Infoln("Wait all sync wait group") d.wg.Wait() + klog.Infoln("All sync wait group done") } func (d *DevPanel) UpdateDevTwins(deviceID string, twins []common.Twin) error { @@ -301,7 +358,8 @@ func getTwinData(deviceID string, twin common.Twin, client *modbus.ModbusClient) VisitorConfig: &visitorConfig, Topic: fmt.Sprintf(common.TopicTwinUpdate, deviceID), } - return td.GetPayload() + payload, _, err := td.GetPayload() + return payload, err } func (d *DevPanel) GetDevice(deviceID string) (interface{}, error) { @@ -342,3 +400,15 @@ func (d *DevPanel) UpdateModel(model *common.DeviceModel) { func (d *DevPanel) RemoveModel(modelName string) { delete(d.models, modelName) } + +func ConvertFloat64ToBytes(f float64) []byte { + res := make([]byte, 8) + binary.BigEndian.PutUint64(res, math.Float64bits(f)) + return res +} + +func ConvertFloat32ToBytes(f float32) []byte { + res := make([]byte, 4) + binary.BigEndian.PutUint32(res, math.Float32bits(f)) + return res +} diff --git a/mappers/modbus-dmi/device/twindata.go b/mappers/modbus-dmi/device/twindata.go index b45cf1ac..7fdd062c 100644 --- a/mappers/modbus-dmi/device/twindata.go +++ b/mappers/modbus-dmi/device/twindata.go @@ -28,6 +28,7 @@ import ( "k8s.io/klog/v2" dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1alpha1" + "github.com/kubeedge/mappers-go/pkg/common" "github.com/kubeedge/mappers-go/pkg/driver/modbus" "github.com/kubeedge/mappers-go/pkg/util/grpcclient" @@ -36,12 +37,14 @@ import ( // TwinData is the timer structure for getting twin/data. type TwinData struct { + DeviceID string DeviceName string Client *modbus.ModbusClient Name string Type string VisitorConfig *modbus.ModbusVisitorConfig Results []byte + LastValue string Topic string } @@ -111,53 +114,101 @@ func TransferData(isRegisterSwap bool, isSwap bool, } bits := binary.BigEndian.Uint32(value) data := float64(math.Float32frombits(bits)) * scale - sData := strconv.FormatFloat(data, 'f', 6, 64) + sData := strconv.FormatFloat(data, 'f', 2, 64) return sData, nil case "boolean": - return strconv.FormatBool(value[0] == 1), nil + return strconv.FormatBool(value[0] == 0xFF), nil case "string": - data := string(value) + for i, b := range value { + if !isUpper(b) && !isLowercase(b) && !isNumber(b) && !isSpecial(b) { + value[i] = ' ' + } + } + data := strings.ReplaceAll(string(value), " ", "") return data, nil default: - return "", errors.New("Data type is not support") + return "", errors.New("data type is not support") } } -func (td *TwinData) GetPayload() ([]byte, error) { +func isUpper(b byte) bool { + return 'A' <= b && b <= 'Z' +} + +func isLowercase(b byte) bool { + return 'a' <= b && b <= 'z' +} + +func isNumber(b byte) bool { + return '0' <= b && b <= '9' +} + +func isSpecial(b byte) bool { + whiteList := map[byte]byte{ + '/': '/', + '-': '-', + '_': '_', + '.': '.', + '%': '%', + '+': '+', + ',': ',', + '=': '=', + '@': '@', + '#': '#', + ':': ':', + '^': '^', + '~': '~', + '?': '?', + '&': '&', + '!': '!', + '*': '*', + } + _, ok := whiteList[b] + return ok +} + +func (td *TwinData) GetPayload() ([]byte, bool, error) { var err error td.Results, err = td.Client.Get(td.VisitorConfig.Register, td.VisitorConfig.Offset, uint16(td.VisitorConfig.Limit)) if err != nil { - return nil, fmt.Errorf("get register failed: %v", err) + return nil, false, fmt.Errorf("get register failed: %v", err) } // transfer data according to the dpl configuration sData, err := TransferData(td.VisitorConfig.IsRegisterSwap, td.VisitorConfig.IsSwap, td.Type, td.VisitorConfig.Scale, td.Results) if err != nil { - return nil, fmt.Errorf("transfer Data failed: %v", err) + return nil, false, fmt.Errorf("transfer Data failed: %v", err) } + + // do not report if the twin data is not changed to prevent triggering traffic limiting + changed := sData != td.LastValue + td.LastValue = sData // construct payload var payload []byte if strings.Contains(td.Topic, "$hw") { if payload, err = common.CreateMessageTwinUpdate(td.Name, td.Type, sData); err != nil { - return nil, fmt.Errorf("create message twin update failed: %v", err) + return nil, false, fmt.Errorf("create message twin update failed: %v", err) } } else { if payload, err = common.CreateMessageData(td.Name, td.Type, sData); err != nil { - return nil, fmt.Errorf("create message data failed: %v", err) + return nil, false, fmt.Errorf("create message data failed: %v", err) } } klog.V(2).Infof("Get the %s value as %s", td.Name, sData) - return payload, nil + return payload, changed, nil } // Run timer function. func (td *TwinData) Run() { - payload, err := td.GetPayload() + payload, changed, err := td.GetPayload() if err != nil { klog.Errorf("twindata %s get payload failed, err: %s", td.Name, err) return } + if !changed { + return + } var msg common.DeviceTwinUpdate if err = json.Unmarshal(payload, &msg); err != nil { @@ -168,7 +219,7 @@ func (td *TwinData) Run() { twins := parse.ConvMsgTwinToGrpc(msg.Twin) var rdsr = &dmiapi.ReportDeviceStatusRequest{ - DeviceName: td.DeviceName, + DeviceName: td.DeviceID, ReportedDevice: &dmiapi.DeviceStatus{ Twins: twins, State: "OK", diff --git a/pkg/driver/modbus/client.go b/pkg/driver/modbus/client.go index 22ace65a..257faba6 100644 --- a/pkg/driver/modbus/client.go +++ b/pkg/driver/modbus/client.go @@ -23,8 +23,9 @@ import ( "k8s.io/klog/v2" - "github.com/kubeedge/mappers-go/pkg/common" "github.com/sailorvii/modbus" + + "github.com/kubeedge/mappers-go/pkg/common" ) // ModbusTCP is the configurations of modbus TCP. @@ -158,7 +159,7 @@ func (c *ModbusClient) Get(registerType string, addr uint16, quantity uint16) (r case "InputRegister": results, err = c.Client.ReadInputRegisters(addr, quantity) default: - return nil, errors.New("Bad register type") + return nil, errors.New("bad register type") } klog.V(2).Info("Get result: ", results) return results, err @@ -180,18 +181,50 @@ func (c *ModbusClient) Set(registerType string, addr uint16, value uint16) (resu case 1: valueSet = 0xFF00 default: - return nil, errors.New("Wrong value") + return nil, errors.New("wrong value") } results, err = c.Client.WriteSingleCoil(addr, valueSet) case "HoldingRegister": results, err = c.Client.WriteSingleRegister(addr, value) default: - return nil, errors.New("Bad register type") + return nil, errors.New("bad register type") } klog.V(1).Info("Set result:", err, results) return results, err } +func (c *ModbusClient) SetString(registerType string, offset uint16, limit int, value string) (results []byte, err error) { + c.mu.Lock() + defer c.mu.Unlock() + + klog.V(1).InfoS("ModbusClient Set:", "register", registerType, "offset", offset, "limit", limit, "value", value) + + switch registerType { + case "CoilRegister": + var valueSet uint16 + switch value { + case "0": + valueSet = 0x0000 + case "1": + valueSet = 0xFF00 + default: + return nil, errors.New("wrong value") + } + results, err = c.Client.WriteSingleCoil(offset, valueSet) + case "HoldingRegister": + valueBytes := make([]byte, limit*2) + copy(valueBytes, value) + results, err = c.Client.WriteMultipleRegisters(offset, uint16(limit), valueBytes) + if err != nil { + klog.ErrorS(err, "Failed to set HoldingRegister", "offset", offset, "limit", limit, "value", value) + } + default: + return nil, errors.New("bad register type") + } + klog.V(1).InfoS("ModbusClient Set result", "results", results) + return results, err +} + // parity convert into the format that modbus driver requires. func parity(ori string) string { var p string diff --git a/pkg/grpcserver/device.go b/pkg/grpcserver/device.go index e692f1a0..1f3dba8c 100644 --- a/pkg/grpcserver/device.go +++ b/pkg/grpcserver/device.go @@ -7,6 +7,7 @@ import ( "time" dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1alpha1" + "github.com/kubeedge/mappers-go/pkg/common" "github.com/kubeedge/mappers-go/pkg/driver/modbus" "github.com/kubeedge/mappers-go/pkg/util/parse" @@ -63,7 +64,7 @@ func (s *Server) RemoveDevice(ctx context.Context, request *dmiapi.RemoveDeviceR } func (s *Server) UpdateDevice(ctx context.Context, request *dmiapi.UpdateDeviceRequest) (*dmiapi.UpdateDeviceResponse, error) { - klog.V(2).Info("UpdateDevice") + klog.Info("UpdateDevice") device := request.GetDevice() if device == nil { return nil, errors.New("device is nil") @@ -78,7 +79,7 @@ func (s *Server) UpdateDevice(ctx context.Context, request *dmiapi.UpdateDeviceR return nil, fmt.Errorf("parse device %s protocol failed, err: %s", device.Name, err) } - klog.Infof("model: %+v", model) + klog.Infof("UpdateDevice model: %+v", model) deviceInstance, err := parse.ParseDeviceFromGrpc(device, &model) if err != nil { return nil, fmt.Errorf("parse device %s instance failed, err: %s", device.Name, err) @@ -86,6 +87,7 @@ func (s *Server) UpdateDevice(ctx context.Context, request *dmiapi.UpdateDeviceR deviceInstance.PProtocol = protocol s.devPanel.UpdateDev(&model, deviceInstance, &protocol) + klog.Infof("UpdateDevice success, device: %+v", deviceInstance) return &dmiapi.UpdateDeviceResponse{}, nil } @@ -107,6 +109,7 @@ func (s *Server) CreateDeviceModel(ctx context.Context, request *dmiapi.CreateDe } func (s *Server) UpdateDeviceModel(ctx context.Context, request *dmiapi.UpdateDeviceModelRequest) (*dmiapi.UpdateDeviceModelResponse, error) { + klog.Info("UpdateDeviceModel") deviceModel := request.GetModel() if deviceModel == nil { return nil, errors.New("deviceModel is nil") @@ -118,6 +121,7 @@ func (s *Server) UpdateDeviceModel(ctx context.Context, request *dmiapi.UpdateDe model := parse.ParseDeviceModelFromGrpc(deviceModel) s.devPanel.UpdateModel(&model) + klog.Infof("UpdateDeviceModel model: %+v", model) return &dmiapi.UpdateDeviceModelResponse{}, nil } diff --git a/pkg/util/parse/grpc.go b/pkg/util/parse/grpc.go index 5a2320e6..23b5a119 100644 --- a/pkg/util/parse/grpc.go +++ b/pkg/util/parse/grpc.go @@ -9,6 +9,7 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller/constants" dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1alpha1" + "github.com/kubeedge/mappers-go/pkg/common" ) @@ -245,6 +246,7 @@ func ParseDeviceModelFromGrpc(model *dmiapi.DeviceModel) common.DeviceModel { Name: property.GetName(), Description: property.GetDescription(), } + klog.V(2).Infof("Parse device model property from grpc, name %s, type %+v", property.GetName(), property.Type) if property.Type.GetString_() != nil { p.DataType = "string" p.AccessMode = property.Type.String_.GetAccessMode() diff --git a/pkg/util/parse/parse.go b/pkg/util/parse/parse.go index 467a8db4..51cbe745 100644 --- a/pkg/util/parse/parse.go +++ b/pkg/util/parse/parse.go @@ -139,6 +139,7 @@ func ParseByUsingRegister(cfg *config.Config, if err != nil { return err } + klog.Infof("RegisterMapper success, deviceList %+v, deviceModelList %+v", deviceList, deviceModelList) if len(deviceList) == 0 || len(deviceModelList) == 0 { return ErrEmptyData @@ -162,7 +163,7 @@ func ParseByUsingRegister(cfg *config.Config, instance.PProtocol = protocol devices[instance.ID] = new(common.DeviceInstance) devices[instance.ID] = instance - klog.V(4).Info("Instance: ", instance.ID) + klog.Info("Instance: ", instance.ID) dms[instance.Model] = modelMap[instance.Model] protocols[instance.ProtocolName] = protocol } diff --git a/pkg/util/parse/type.go b/pkg/util/parse/type.go index d7f0b24b..81bf7232 100644 --- a/pkg/util/parse/type.go +++ b/pkg/util/parse/type.go @@ -10,6 +10,7 @@ import ( "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller/constants" "github.com/kubeedge/kubeedge/pkg/apis/devices/v1alpha2" dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1alpha1" + "github.com/kubeedge/mappers-go/pkg/common" ) @@ -336,12 +337,6 @@ func ConvMsgTwinToGrpc(msgTwin map[string]*common.MsgTwin) []*dmiapi.Twin { for name, twin := range msgTwin { twinData := &dmiapi.Twin{ PropertyName: name, - Desired: &dmiapi.TwinProperty{ - Value: *twin.Expected.Value, - Metadata: map[string]string{ - "type": twin.Metadata.Type, - "timestamp": twin.Expected.Metadata.Timestamp, - }}, Reported: &dmiapi.TwinProperty{ Value: *twin.Actual.Value, Metadata: map[string]string{