From 67bbf9e68375804e863a430a833b96cfa5c10332 Mon Sep 17 00:00:00 2001 From: wbc6080 Date: Wed, 27 Mar 2024 10:42:26 +0800 Subject: [PATCH] add onvif mapper based mapper-framework Signed-off-by: wbc6080 --- .../onvif-mapper/Dockerfile_stream | 35 ++ .../kubeedge-v1.17.0/onvif-mapper/Makefile | 34 ++ .../kubeedge-v1.17.0/onvif-mapper/README.md | 56 ++ .../kubeedge-v1.17.0/onvif-mapper/cmd/main.go | 61 +++ .../kubeedge-v1.17.0/onvif-mapper/config.yaml | 9 + .../data/dbmethod/influxdb2/client.go | 76 +++ .../data/dbmethod/influxdb2/handler.go | 73 +++ .../data/dbmethod/mysql/client.go | 108 ++++ .../data/dbmethod/mysql/handler.go | 73 +++ .../data/dbmethod/redis/client.go | 136 +++++ .../data/dbmethod/redis/handler.go | 74 +++ .../data/dbmethod/tdengine/client.go | 155 ++++++ .../data/dbmethod/tdengine/handler.go | 74 +++ .../onvif-mapper/data/publish/http/client.go | 73 +++ .../onvif-mapper/data/publish/mqtt/client.go | 63 +++ .../onvif-mapper/data/stream/handler.go | 67 +++ .../onvif-mapper/data/stream/img.go | 243 +++++++++ .../onvif-mapper/data/stream/video.go | 142 +++++ .../onvif-mapper/device/device.go | 487 ++++++++++++++++++ .../onvif-mapper/device/devicetwin.go | 108 ++++ .../onvif-mapper/driver/devicetype.go | 46 ++ .../onvif-mapper/driver/driver.go | 101 ++++ mappers/kubeedge-v1.17.0/onvif-mapper/go.mod | 46 ++ mappers/kubeedge-v1.17.0/onvif-mapper/go.sum | 165 ++++++ .../onvif-mapper/hack/make-rules/mapper.sh | 159 ++++++ .../onvif-mapper/resource/configmap.yaml | 15 + .../onvif-mapper/resource/deployment.yaml | 51 ++ .../resource/onvifdevice-instance.yaml | 45 ++ .../resource/onvifdevice-model.yaml | 20 + .../onvif-mapper/resource/secret.yaml | 7 + 30 files changed, 2802 insertions(+) create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/Dockerfile_stream create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/Makefile create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/README.md create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/cmd/main.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/config.yaml create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/influxdb2/client.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/influxdb2/handler.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/mysql/client.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/mysql/handler.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/redis/client.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/redis/handler.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/tdengine/client.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/tdengine/handler.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/publish/http/client.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/publish/mqtt/client.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/handler.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/img.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/video.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/device/device.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/device/devicetwin.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/driver/devicetype.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/driver/driver.go create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/go.mod create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/go.sum create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/hack/make-rules/mapper.sh create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/resource/configmap.yaml create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/resource/deployment.yaml create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/resource/onvifdevice-instance.yaml create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/resource/onvifdevice-model.yaml create mode 100644 mappers/kubeedge-v1.17.0/onvif-mapper/resource/secret.yaml diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/Dockerfile_stream b/mappers/kubeedge-v1.17.0/onvif-mapper/Dockerfile_stream new file mode 100644 index 00000000..a5dfc2bd --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/Dockerfile_stream @@ -0,0 +1,35 @@ +FROM golang:1.20.10-bullseye AS builder + +WORKDIR /build + +ENV GO111MODULE=on \ + GOPROXY=https://goproxy.cn,direct + +COPY . . + +RUN apt-get update && \ + apt-get install -y bzip2 curl upx-ucl gcc-aarch64-linux-gnu libc6-dev-arm64-cross gcc-arm-linux-gnueabi libc6-dev-armel-cross libva-dev libva-drm2 libx11-dev libvdpau-dev libxext-dev libsdl1.2-dev libxcb1-dev libxau-dev libxdmcp-dev yasm + +RUN curl -sLO https://ffmpeg.org/releases/ffmpeg-4.1.6.tar.bz2 && \ + tar -jx --strip-components=1 -f ffmpeg-4.1.6.tar.bz2 && \ + ./configure && make && \ + make install + +RUN GOOS=linux go build -o main cmd/main.go + +FROM ubuntu:18.04 + +RUN mkdir -p kubeedge + +RUN apt-get update && \ + apt-get install -y bzip2 curl upx-ucl gcc-aarch64-linux-gnu libc6-dev-arm64-cross gcc-arm-linux-gnueabi libc6-dev-armel-cross libva-dev libva-drm2 libx11-dev libvdpau-dev libxext-dev libsdl1.2-dev libxcb1-dev libxau-dev libxdmcp-dev yasm + +RUN curl -sLO https://ffmpeg.org/releases/ffmpeg-4.1.6.tar.bz2 && \ + tar -jx --strip-components=1 -f ffmpeg-4.1.6.tar.bz2 && \ + ./configure && make && \ + make install + +COPY --from=builder /build/main kubeedge/ +COPY ./config.yaml kubeedge/ + +WORKDIR kubeedge \ No newline at end of file diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/Makefile b/mappers/kubeedge-v1.17.0/onvif-mapper/Makefile new file mode 100644 index 00000000..4dbacfe2 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/Makefile @@ -0,0 +1,34 @@ +SHELL := /bin/bash + +curr_dir := $(patsubst %/,%,$(dir $(abspath $(lastword $(MAKEFILE_LIST))))) +rest_args := $(wordlist 2, $(words $(MAKECMDGOALS)), $(MAKECMDGOALS)) +$(eval $(rest_args):;@:) + +help: + # + # Usage: + # make generate : generate a mapper based on a template. + # make mapper {mapper-name} : execute mapper building process. + # + # Actions: + # - mod, m : download code dependencies. + # - lint, l : verify code via go fmt and `golangci-lint`. + # - build, b : compile code. + # - package, p : package docker image. + # - clean, c : clean output binary. + # + # Parameters: + # ARM : true or undefined + # ARM64 : true or undefined + # + # Example: + # - make mapper modbus ARM64=true : execute `build` "modbus" mapper for ARM64. + # - make mapper modbus test : execute `test` "modbus" mapper. + @echo + +make_rules := $(shell ls $(curr_dir)/hack/make-rules | sed 's/.sh//g') +$(make_rules): + @$(curr_dir)/hack/make-rules/$@.sh $(rest_args) + +.DEFAULT_GOAL := help +.PHONY: $(make_rules) build test package \ No newline at end of file diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/README.md b/mappers/kubeedge-v1.17.0/onvif-mapper/README.md new file mode 100644 index 00000000..e2d1d2cf --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/README.md @@ -0,0 +1,56 @@ +This mapper is for the ONVIF IP camera. For resource-limited, it's only be tested with the HIKVISION camera. + +Supported functions: +- Save frame. You can define it in device-instance.yaml and save the rtsp stream as video frame files. +- Save video. You can define it in device-instance.yaml and save the rtsp stream as video files. + +steps: + +1. Run onvif mapper + + There are two ways to run onvif mapper: + +a). Start locally +- Install the dependences: + ``` + sudo apt-get update && + sudo apt-get install -y upx-ucl gcc-aarch64-linux-gnu libc6-dev-arm64-cross gcc-arm-linux-gnueabi libc6-dev-armel-cross libva-dev libva-drm2 libx11-dev libvdpau-dev libxext-dev libsdl1.2-dev libxcb1-dev libxau-dev libxdmcp-dev yasm + ``` + and install ffmpeg with commond: + ``` + sudo curl -sLO https://ffmpeg.org/releases/ffmpeg-4.1.6.tar.bz2 && + tar -jx --strip-components=1 -f ffmpeg-4.1.6.tar.bz2 && + ./configure && make && + sudo make install + ``` + This may take about 5 minutes to download and build all dependencies. +- Locally compile + + You can compile and run the mapper code directly: + ``` + go run cmd/main.go --v --config-file + ``` +b). Start using a container image +- Build onvif mapper image: + ``` + docker build -f Dockerfile_stream -t [YOUR MAPPER IMAGE NAME] . + ``` + It may take about 8 minutes to build the docker image + +- Deploy onvif mapper container: + + After successfully building the onvif mapper image, you can deploy the mapper in the cluster through deployment or other methods. + A sample configuration file for mapper deployment is provided in the **resource** directory. + +2. Build and submit the device yaml file: + + After successfully deploying onvif mapper, users can build the device-instance and device-model configuration files according to the + characteristics of the user edge onvif device, and execute the following commands to submit to the kubeedge cluster: + ``` + kubectl apply -f + ``` + + An example device-model and device-instance configuration file for onvif device is provided in the resource directory. +3. View log: + + Users can view the logs of the mapper container to determine whether the edge device is managed correctly. diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/cmd/main.go b/mappers/kubeedge-v1.17.0/onvif-mapper/cmd/main.go new file mode 100644 index 00000000..fd820503 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/cmd/main.go @@ -0,0 +1,61 @@ +package main + +import ( + "errors" + "os" + + "k8s.io/klog/v2" + + "github.com/kubeedge/onvif/device" + "github.com/kubeedge/mapper-framework/pkg/common" + "github.com/kubeedge/mapper-framework/pkg/config" + "github.com/kubeedge/mapper-framework/pkg/grpcclient" + "github.com/kubeedge/mapper-framework/pkg/grpcserver" + "github.com/kubeedge/mapper-framework/pkg/httpserver" +) + +func main() { + var err error + var c *config.Config + + klog.InitFlags(nil) + defer klog.Flush() + + if c, err = config.Parse(); err != nil { + klog.Fatal(err) + } + klog.Infof("config: %+v", c) + + klog.Infoln("Mapper will register to edgecore") + deviceList, deviceModelList, err := grpcclient.RegisterMapper(true) + if err != nil { + klog.Fatal(err) + } + klog.Infoln("Mapper register finished") + + panel := device.NewDevPanel() + err = panel.DevInit(deviceList, deviceModelList) + if err != nil && !errors.Is(err, device.ErrEmptyData) { + klog.Fatal(err) + } + klog.Infoln("devInit finished") + go panel.DevStart() + + // start http server + httpServer := httpserver.NewRestServer(panel, c.Common.HTTPPort) + go httpServer.StartServer() + + // start grpc server + grpcServer := grpcserver.NewServer( + grpcserver.Config{ + SockPath: c.GrpcServer.SocketPath, + Protocol: common.ProtocolCustomized, + }, + panel, + ) + defer grpcServer.Stop() + if err = grpcServer.Start(); err != nil { + klog.Fatal(err) + } + +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/config.yaml b/mappers/kubeedge-v1.17.0/onvif-mapper/config.yaml new file mode 100644 index 00000000..b8ae89db --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/config.yaml @@ -0,0 +1,9 @@ +grpc_server: + socket_path: /etc/kubeedge/onvif.sock +common: + name: Onvif-mapper + version: v1.13.0 + api_version: v1.0.0 + protocol: onvif # TODO add your protocol name + address: 127.0.0.1 + edgecore_sock: /etc/kubeedge/dmi.sock diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/influxdb2/client.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/influxdb2/client.go new file mode 100644 index 00000000..9da4232b --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/influxdb2/client.go @@ -0,0 +1,76 @@ +package influxdb2 + +import ( + "context" + "encoding/json" + "os" + "time" + + "k8s.io/klog/v2" + + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +type DataBaseConfig struct { + Influxdb2ClientConfig *Influxdb2ClientConfig `json:"influxdb2ClientConfig,omitempty"` + Influxdb2DataConfig *Influxdb2DataConfig `json:"influxdb2DataConfig,omitempty"` +} + +type Influxdb2ClientConfig struct { + Url string `json:"url,omitempty"` + Org string `json:"org,omitempty"` + Bucket string `json:"bucket,omitempty"` +} + +type Influxdb2DataConfig struct { + Measurement string `json:"measurement,omitempty"` + Tag map[string]string `json:"tag,omitempty"` + FieldKey string `json:"fieldKey,omitempty"` +} + +func NewDataBaseClient(clientConfig json.RawMessage, dataConfig json.RawMessage) (*DataBaseConfig, error) { + // parse influx database config data + influxdb2ClientConfig := new(Influxdb2ClientConfig) + influxdb2DataConfig := new(Influxdb2DataConfig) + err := json.Unmarshal(clientConfig, influxdb2ClientConfig) + if err != nil { + return nil, err + } + err = json.Unmarshal(dataConfig, influxdb2DataConfig) + if err != nil { + return nil, err + } + return &DataBaseConfig{ + Influxdb2ClientConfig: influxdb2ClientConfig, + Influxdb2DataConfig: influxdb2DataConfig, + }, nil +} + +func (d *DataBaseConfig) InitDbClient() influxdb2.Client { + var usrtoken string + usrtoken = os.Getenv("TOKEN") + client := influxdb2.NewClient(d.Influxdb2ClientConfig.Url, usrtoken) + + return client +} + +func (d *DataBaseConfig) CloseSession(client influxdb2.Client) { + client.Close() +} + +func (d *DataBaseConfig) AddData(data *common.DataModel, client influxdb2.Client) error { + // write device data to influx database + writeAPI := client.WriteAPIBlocking(d.Influxdb2ClientConfig.Org, d.Influxdb2ClientConfig.Bucket) + p := influxdb2.NewPoint(d.Influxdb2DataConfig.Measurement, + d.Influxdb2DataConfig.Tag, + map[string]interface{}{d.Influxdb2DataConfig.FieldKey: data.Value}, + time.Now()) + // write point immediately + err := writeAPI.WritePoint(context.Background(), p) + if err != nil { + klog.V(4).Info("Exit AddData") + return err + } + return nil +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/influxdb2/handler.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/influxdb2/handler.go new file mode 100644 index 00000000..1a3aff43 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/influxdb2/handler.go @@ -0,0 +1,73 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package influxdb2 + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/onvif/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2ClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2DataConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + dbClient := dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel, dbClient) + if err != nil { + klog.Errorf("influx database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSession(dbClient) + return + } + } + }() +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/mysql/client.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/mysql/client.go new file mode 100644 index 00000000..e2e9a259 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/mysql/client.go @@ -0,0 +1,108 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysql + +import ( + "database/sql" + "encoding/json" + "fmt" + "os" + "time" + + _ "github.com/go-sql-driver/mysql" + "k8s.io/klog/v2" + + "github.com/kubeedge/mapper-framework/pkg/common" +) + +var ( + DB *sql.DB +) + +type DataBaseConfig struct { + MySQLClientConfig *MySQLClientConfig `json:"mysqlClientConfig"` +} + +type MySQLClientConfig struct { + Addr string `json:"addr,omitempty"` + Database string `json:"database,omitempty"` + UserName string `json:"userName,omitempty"` +} + +func NewDataBaseClient(config json.RawMessage) (*DataBaseConfig, error) { + configdata := new(MySQLClientConfig) + err := json.Unmarshal(config, configdata) + if err != nil { + return nil, err + } + return &DataBaseConfig{ + MySQLClientConfig: configdata, + }, nil +} + +func (d *DataBaseConfig) InitDbClient() error { + password := os.Getenv("PASSWORD") + usrName := d.MySQLClientConfig.UserName + addr := d.MySQLClientConfig.Addr + dataBase := d.MySQLClientConfig.Database + dataSourceName := fmt.Sprintf("%s:%s@tcp(%s)/%s", usrName, password, addr, dataBase) + var err error + DB, err = sql.Open("mysql", dataSourceName) + if err != nil { + return fmt.Errorf("connection to %s of mysql faild with err:%v", dataBase, err) + } + + return nil +} + +func (d *DataBaseConfig) CloseSession() { + err := DB.Close() + if err != nil { + klog.Errorf("close mysql failed with err:%v", err) + } +} + +func (d *DataBaseConfig) AddData(data *common.DataModel) error { + deviceName := data.DeviceName + propertyName := data.PropertyName + namespace := data.Namespace + tableName := namespace + "/" + deviceName + "/" + propertyName + datatime := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05") + + createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` (id INT AUTO_INCREMENT PRIMARY KEY, ts DATETIME NOT NULL,field TEXT)", tableName) + _, err := DB.Exec(createTable) + if err != nil { + return fmt.Errorf("create tabe into mysql failed with err:%v", err) + } + + stmt, err := DB.Prepare(fmt.Sprintf("INSERT INTO `%s` (ts,field) VALUES (?,?)", tableName)) + if err != nil { + return fmt.Errorf("prepare parament failed with err:%v", err) + } + defer func(stmt *sql.Stmt) { + err := stmt.Close() + if err != nil { + klog.Errorf("close mysql's statement failed with err:%v", err) + } + }(stmt) + _, err = stmt.Exec(datatime, data.Value) + if err != nil { + return fmt.Errorf("insert data into msyql failed with err:%v", err) + } + + return nil +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/mysql/handler.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/mysql/handler.go new file mode 100644 index 00000000..b189b22c --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/mysql/handler.go @@ -0,0 +1,73 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysql + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/onvif/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.MySQLClientConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + err = dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init redis database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel) + if err != nil { + klog.Errorf("mysql database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSession() + return + } + } + }() +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/redis/client.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/redis/client.go new file mode 100644 index 00000000..6d67d380 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/redis/client.go @@ -0,0 +1,136 @@ +package redis + +import ( + "context" + "encoding/json" + "errors" + "os" + "strconv" + + "github.com/go-redis/redis/v8" + "k8s.io/klog/v2" + + "github.com/kubeedge/mapper-framework/pkg/common" +) + +var ( + RedisCli *redis.Client +) + +type DataBaseConfig struct { + RedisClientConfig *RedisClientConfig +} + +type RedisClientConfig struct { + Addr string `json:"addr,omitempty"` + DB int `json:"db,omitempty"` + PoolSize int `json:"poolSize,omitempty"` + MinIdleConns int `json:"minIdleConns,omitempty"` +} + +func NewDataBaseClient(config json.RawMessage) (*DataBaseConfig, error) { + configdata := new(RedisClientConfig) + err := json.Unmarshal(config, configdata) + if err != nil { + return nil, err + } + return &DataBaseConfig{RedisClientConfig: configdata}, nil +} + +func (d *DataBaseConfig) InitDbClient() error { + var password string + password = os.Getenv("PASSWORD") + RedisCli = redis.NewClient(&redis.Options{ + Addr: d.RedisClientConfig.Addr, + Password: password, + DB: d.RedisClientConfig.DB, + PoolSize: d.RedisClientConfig.PoolSize, + MinIdleConns: d.RedisClientConfig.MinIdleConns, + }) + pong, err := RedisCli.Ping(context.Background()).Result() + if err != nil { + klog.Errorf("init redis database failed, err = %v", err) + return err + } + klog.V(1).Infof("init redis database successfully, with return cmd %s", pong) + return nil +} + +func (d *DataBaseConfig) CloseSession() { + err := RedisCli.Close() + if err != nil { + klog.V(4).Info("close database failed") + } +} + +func (d *DataBaseConfig) AddData(data *common.DataModel) error { + ctx := context.Background() + // The key to construct the ordered set, here DeviceName is used as the key + klog.V(1).Infof("deviceName:%s", data.DeviceName) + // Check if the current ordered set exists + exists, err := RedisCli.Exists(ctx, data.DeviceName).Result() + if err != nil { + klog.V(4).Info("Exit AddData") + return err + } + deviceData := "TimeStamp: " + strconv.FormatInt(data.TimeStamp, 10) + " PropertyName: " + data.PropertyName + " data: " + data.Value + if exists == 0 { + // The ordered set does not exist, create a new ordered set and add data + _, err = RedisCli.ZAdd(ctx, data.DeviceName, &redis.Z{ + Score: float64(data.TimeStamp), + Member: deviceData, + }).Result() + if err != nil { + klog.V(4).Info("Exit AddData") + return err + } + } else { + // The ordered set already exists, add data directly + _, err = RedisCli.ZAdd(ctx, data.DeviceName, &redis.Z{ + Score: float64(data.TimeStamp), + Member: deviceData, + }).Result() + if err != nil { + klog.V(4).Info("Exit AddData") + return err + } + } + return nil +} + +func (d *DataBaseConfig) GetDataByDeviceName(deviceName string) ([]*common.DataModel, error) { + ctx := context.Background() + + dataJSON, err := RedisCli.ZRevRange(ctx, deviceName, 0, -1).Result() + if err != nil { + klog.V(4).Infof("fail query data for deviceName,err:%v", err) + } + + var dataModels []*common.DataModel + + for _, jsonStr := range dataJSON { + var data common.DataModel + if err := json.Unmarshal([]byte(jsonStr), &data); err != nil { + klog.V(4).Infof("Error unMarshaling data: %v\n", err) + continue + } + + dataModels = append(dataModels, &data) + } + return dataModels, nil +} + +func (d *DataBaseConfig) GetPropertyDataByDeviceName(deviceName string, propertyData string) ([]*common.DataModel, error) { + //TODO implement me + return nil, errors.New("implement me") +} + +func (d *DataBaseConfig) GetDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) { + //TODO implement me + return nil, errors.New("implement me") +} + +func (d *DataBaseConfig) DeleteDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) { + //TODO implement me + return nil, errors.New("implement me") +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/redis/handler.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/redis/handler.go new file mode 100644 index 00000000..06e5b367 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/redis/handler.go @@ -0,0 +1,74 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package redis + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/onvif/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.RedisClientConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + err = dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init redis database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel) + if err != nil { + klog.Errorf("redis database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSession() + return + } + } + }() + +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/tdengine/client.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/tdengine/client.go new file mode 100644 index 00000000..f267a02a --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/tdengine/client.go @@ -0,0 +1,155 @@ +package tdengine + +import ( + "database/sql" + "encoding/json" + "fmt" + "os" + "strings" + "time" + + _ "github.com/taosdata/driver-go/v3/taosRestful" + "k8s.io/klog/v2" + + "github.com/kubeedge/mapper-framework/pkg/common" +) + +var ( + DB *sql.DB +) + +type DataBaseConfig struct { + TDEngineClientConfig *TDEngineClientConfig `json:"config,omitempty"` +} +type TDEngineClientConfig struct { + Addr string `json:"addr,omitempty"` + DBName string `json:"dbName,omitempty"` +} + +func NewDataBaseClient(config json.RawMessage) (*DataBaseConfig, error) { + configdata := new(TDEngineClientConfig) + err := json.Unmarshal(config, configdata) + if err != nil { + return nil, err + } + return &DataBaseConfig{ + TDEngineClientConfig: configdata, + }, nil +} +func (d *DataBaseConfig) InitDbClient() error { + username := os.Getenv("USERNAME") + password := os.Getenv("PASSWORD") + dsn := fmt.Sprintf("%s:%s@http(%s)/%s", username, password, d.TDEngineClientConfig.Addr, d.TDEngineClientConfig.DBName) + var err error + DB, err = sql.Open("taosRestful", dsn) + if err != nil { + klog.Errorf("init TDEngine db fail, err= %v:", err) + } + klog.V(1).Infof("init TDEngine database successfully") + return nil +} + +func (d *DataBaseConfig) CloseSessio() { + err := DB.Close() + if err != nil { + klog.Errorf("close TDEngine failed") + } +} + +func (d *DataBaseConfig) AddData(data *common.DataModel) error { + + legal_table := strings.Replace(data.DeviceName, "-", "_", -1) + legal_tag := strings.Replace(data.PropertyName, "-", "_", -1) + + stable_name := fmt.Sprintf("SHOW STABLES LIKE '%s'", legal_table) + stabel := fmt.Sprintf("CREATE STABLE %s (ts timestamp, devicename binary(64), propertyname binary(64), data binary(64),type binary(64)) TAGS (localtion binary(64));", legal_table) + + datatime := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05") + insertSQL := fmt.Sprintf("INSERT INTO %s USING %s TAGS ('%s') VALUES('%v','%s', '%s', '%s', '%s');", + legal_tag, legal_table, legal_tag, datatime, data.DeviceName, data.PropertyName, data.Value, data.Type) + + rows, _ := DB.Query(stable_name) + defer rows.Close() + + if err := rows.Err(); err != nil { + klog.Errorf("query stable failed:%v", err) + } + + switch rows.Next() { + case false: + _, err := DB.Exec(stabel) + if err != nil { + klog.Errorf("create stable failed %v\n", err) + } + _, err = DB.Exec(insertSQL) + if err != nil { + klog.Errorf("failed add data to TdEngine:%v", err) + } + case true: + _, err := DB.Exec(insertSQL) + if err != nil { + klog.Errorf("failed add data to TdEngine:%v", err) + } + default: + klog.Infoln("failed add data to TdEngine") + } + + return nil +} +func (d *DataBaseConfig) GetDataByDeviceName(deviceName string) ([]*common.DataModel, error) { + querySql := fmt.Sprintf("SELECT ts, devicename, propertyname, data, type FROM %s", deviceName) + rows, err := DB.Query(querySql) + if err != nil { + return nil, err + } + defer rows.Close() + var dataModel []*common.DataModel + for rows.Next() { + var data common.DataModel + var ts time.Time + err := rows.Scan(&ts, &data.DeviceName, &data.PropertyName, &data.Value, &data.Type) + if err != nil { + klog.Errorf(" data scan error: %v\n", err) + //fmt.Printf("scan error:\n", err) + return nil, err + } + data.TimeStamp = ts.Unix() + dataModel = append(dataModel, &data) + } + return dataModel, nil +} +func (d *DataBaseConfig) GetPropertyDataByDeviceName(deviceName string, propertyData string) ([]*common.DataModel, error) { + //TODO implement me + panic("implement me") +} +func (d *DataBaseConfig) GetDataByTimeRange(deviceName string, start int64, end int64) ([]*common.DataModel, error) { + + legal_table := strings.Replace(deviceName, "-", "_", -1) + startTime := time.Unix(start, 0).UTC().Format("2006-01-02 15:04:05") + endTime := time.Unix(end, 0).UTC().Format("2006-01-02 15:04:05") + //Query data within a specified time range + querySQL := fmt.Sprintf("SELECT ts, devicename, propertyname, data, type FROM %s WHERE ts >= '%s' AND ts <= '%s'", legal_table, startTime, endTime) + fmt.Println(querySQL) + rows, err := DB.Query(querySQL) + if err != nil { + return nil, err + } + defer rows.Close() + + var dataModels []*common.DataModel + for rows.Next() { + var data common.DataModel + var ts time.Time + err := rows.Scan(&ts, &data.DeviceName, &data.PropertyName, &data.Value, &data.Type) + if err != nil { + klog.V(4).Infof("data scan failed:%v", err) + continue + } + dataModels = append(dataModels, &data) + } + return dataModels, nil +} +func (d *DataBaseConfig) DeleteDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) { + //TODO implement me + panic("implement me") +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/tdengine/handler.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/tdengine/handler.go new file mode 100644 index 00000000..9b6626e5 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/dbmethod/tdengine/handler.go @@ -0,0 +1,74 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tdengine + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/onvif/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.TDEngineClientConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + err = dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel) + if err != nil { + klog.Errorf("tdengine database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSessio() + return + } + } + }() + +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/publish/http/client.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/publish/http/client.go new file mode 100644 index 00000000..1ff484a9 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/publish/http/client.go @@ -0,0 +1,73 @@ +package http + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mapper-framework/pkg/common" + "github.com/kubeedge/mapper-framework/pkg/global" +) + +type PushMethod struct { + HTTP *HTTPConfig `json:"http"` +} + +type HTTPConfig struct { + HostName string `json:"hostName,omitempty"` + Port int `json:"port,omitempty"` + RequestPath string `json:"requestPath,omitempty"` + Timeout int `json:"timeout,omitempty"` +} + +func NewDataPanel(config json.RawMessage) (global.DataPanel, error) { + httpConfig := new(HTTPConfig) + err := json.Unmarshal(config, httpConfig) + if err != nil { + return nil, err + } + return &PushMethod{ + HTTP: httpConfig, + }, nil +} + +func (pm *PushMethod) InitPushMethod() error { + klog.V(1).Info("Init HTTP") + return nil +} + +func (pm *PushMethod) Push(data *common.DataModel) { + klog.V(2).Info("Publish device data by HTTP") + + targetUrl := pm.HTTP.HostName + ":" + strconv.Itoa(pm.HTTP.Port) + pm.HTTP.RequestPath + payload := data.PropertyName + "=" + data.Value + formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05") + currentTime := "&time" + "=" + formatTimeStr + payload += currentTime + + klog.V(3).Infof("Publish %v to %s", payload, targetUrl) + + resp, err := http.Post(targetUrl, + "application/x-www-form-urlencoded", + strings.NewReader(payload)) + + if err != nil { + fmt.Println(err) + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + // handle error + klog.Errorf("Publish device data by HTTP failed, err = %v", err) + return + } + klog.V(1).Info("############### Message published. ###############") + klog.V(3).Infof("HTTP reviced %s", string(body)) + +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/publish/mqtt/client.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/publish/mqtt/client.go new file mode 100644 index 00000000..ed8fe554 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/publish/mqtt/client.go @@ -0,0 +1,63 @@ +package mqtt + +import ( + "encoding/json" + "fmt" + "os" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "k8s.io/klog/v2" + + "github.com/kubeedge/mapper-framework/pkg/common" + "github.com/kubeedge/mapper-framework/pkg/global" +) + +type PushMethod struct { + MQTT *MQTTConfig `json:"http"` +} + +type MQTTConfig struct { + Address string `json:"address,omitempty"` + Topic string `json:"topic,omitempty"` + QoS int `json:"qos,omitempty"` + Retained bool `json:"retained,omitempty"` +} + +func NewDataPanel(config json.RawMessage) (global.DataPanel, error) { + mqttConfig := new(MQTTConfig) + err := json.Unmarshal(config, mqttConfig) + if err != nil { + return nil, err + } + return &PushMethod{ + MQTT: mqttConfig, + }, nil +} + +func (pm *PushMethod) InitPushMethod() error { + klog.V(1).Info("Init MQTT") + return nil +} + +func (pm *PushMethod) Push(data *common.DataModel) { + klog.V(1).Infof("Publish %v to %s on topic: %s, Qos: %d, Retained: %v", + data.Value, pm.MQTT.Address, pm.MQTT.Topic, pm.MQTT.QoS, pm.MQTT.Retained) + + opts := mqtt.NewClientOptions().AddBroker(pm.MQTT.Address) + client := mqtt.NewClient(opts) + + if token := client.Connect(); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + os.Exit(1) + } + formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05") + str_time := "time is " + formatTimeStr + " " + str_publish := str_time + pm.MQTT.Topic + ": " + data.Value + + token := client.Publish(pm.MQTT.Topic, byte(pm.MQTT.QoS), pm.MQTT.Retained, str_publish) + token.Wait() + + client.Disconnect(250) + klog.V(2).Info("############### Message published. ###############") +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/handler.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/handler.go new file mode 100644 index 00000000..180892f7 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/handler.go @@ -0,0 +1,67 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stream + +import ( + "encoding/json" + "fmt" + + "k8s.io/klog/v2" + + "github.com/kubeedge/onvif/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +type StreamConfig struct { + Format string `json:"format"` + OutputDir string `json:"outputDir"` + FrameCount int `json:"frameCount"` + FrameInterval int `json:"frameInterval"` + VideoNum int `json:"videoNum"` +} + +func StreamHandler(twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig) error { + // Get RTSP URI from camera device + streamURI, err := client.GetDeviceData(visitorConfig) + if err != nil { + return err + } + + // parse streamConfig data from device visitorConfig + var streamConfig StreamConfig + visitorConfigData, err := json.Marshal(visitorConfig.VisitorConfigData) + err = json.Unmarshal(visitorConfigData, &streamConfig) + if err != nil { + return fmt.Errorf("Unmarshal streamConfigs error: %v", err) + } + + switch twin.PropertyName { + // Currently, the function of saving frames and saving videos is built-in according to the configuration. + // Other functions can be expanded here. + case common.SaveFrame: + err = SaveFrame(streamURI.(string), streamConfig.OutputDir, streamConfig.Format, streamConfig.FrameCount, streamConfig.FrameInterval) + case common.SaveVideo: + err = SaveVideo(streamURI.(string), streamConfig.OutputDir, streamConfig.Format, streamConfig.FrameCount, streamConfig.VideoNum) + default: + err = fmt.Errorf("cannot find the processing method for the corresponding Property %s of the stream data", twin.PropertyName) + } + if err != nil { + return err + } + klog.V(2).Infof("Successfully processed streaming data by %s", twin.PropertyName) + return nil +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/img.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/img.go new file mode 100644 index 00000000..d7d05d00 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/img.go @@ -0,0 +1,243 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stream + +import ( + "errors" + "fmt" + "time" + "unsafe" + + "github.com/sailorvii/goav/avcodec" + "github.com/sailorvii/goav/avformat" + "github.com/sailorvii/goav/avutil" + "github.com/sailorvii/goav/swscale" + "k8s.io/klog/v2" +) + +// GenFileName generate file name with current time. Formate f. +func GenFileName(dir string, format string) string { + return fmt.Sprintf("%s/f%s.%s", dir, time.Now().Format(time.RFC3339Nano), format) +} + +func save(frame *avutil.Frame, width int, height int, dir string, format string) error { + // Save video frames to picture file + outputFile := GenFileName(dir, format) + var outputFmtCtx *avformat.Context + avformat.AvAllocOutputContext2(&outputFmtCtx, nil, nil, &outputFile) + if outputFmtCtx == nil { + return errors.New("Could not create output context") + } + defer outputFmtCtx.AvformatFreeContext() + + ofmt := avformat.AvGuessFormat("", outputFile, "") + outputFmtCtx.SetOformat(ofmt) + + avIOContext, err := avformat.AvIOOpen(outputFile, avformat.AVIO_FLAG_WRITE) + if err != nil { + return fmt.Errorf("Could not open output file '%s'", outputFile) + } + outputFmtCtx.SetPb(avIOContext) + + outStream := outputFmtCtx.AvformatNewStream(nil) + if outStream == nil { + return errors.New("Failed allocating output stream") + } + + // Set the frame format + pCodecCtx := outStream.Codec() + pCodecCtx.SetCodecId(ofmt.GetVideoCodec()) + pCodecCtx.SetCodecType(avformat.AVMEDIA_TYPE_VIDEO) + pCodecCtx.SetPixelFormat(avcodec.AV_PIX_FMT_YUVJ420P) + pCodecCtx.SetWidth(width) + pCodecCtx.SetHeight(height) + pCodecCtx.SetTimeBase(1, 25) + outputFmtCtx.AvDumpFormat(0, outputFile, 1) + + // Get video codec + pCodec := avcodec.AvcodecFindEncoder(pCodecCtx.CodecId()) + if pCodec == nil { + return errors.New("Codec not found.") + } + defer pCodecCtx.AvcodecClose() + + // open video codec + cctx := avcodec.Context(*pCodecCtx) + defer cctx.AvcodecClose() + if cctx.AvcodecOpen2(pCodec, nil) < 0 { + return errors.New("Could not open codec.") + } + + outputFmtCtx.AvformatWriteHeader(nil) + ySize := width * height + + // Write media data to media files + var packet avcodec.Packet + packet.AvNewPacket(ySize * 3) + defer packet.AvPacketUnref() + var gotPicture int + if cctx.AvcodecEncodeVideo2(&packet, frame, &gotPicture) < 0 { + return errors.New("Encode Error") + } + if gotPicture == 1 { + packet.SetStreamIndex(outStream.Index()) + outputFmtCtx.AvWriteFrame(&packet) + } + + outputFmtCtx.AvWriteTrailer() + if outputFmtCtx.Oformat().GetFlags()&avformat.AVFMT_NOFILE == 0 { + if err = outputFmtCtx.Pb().Close(); err != nil { + return fmt.Errorf("close output fmt context failed: %v", err) + } + } + return nil +} + +// SaveFrame save frame. +func SaveFrame(input string, outDir string, format string, frameCount int, frameInterval int) error { + // Open video file + avformat.AvDictSet(&avformat.Dict, "rtsp_transport", "tcp", 0) + avformat.AvDictSet(&avformat.Dict, "max_delay", "5000000", 0) + + pFormatContext := avformat.AvformatAllocContext() + if avformat.AvformatOpenInput(&pFormatContext, input, nil, &avformat.Dict) != 0 { + return fmt.Errorf("Unable to open file %s", input) + } + // Retrieve stream information + if pFormatContext.AvformatFindStreamInfo(nil) < 0 { + return errors.New("Couldn't find stream information") + } + // Dump information about file onto standard error + pFormatContext.AvDumpFormat(0, input, 0) + // Find the first video stream + streamIndex := -1 + for i := 0; i < int(pFormatContext.NbStreams()); i++ { + if pFormatContext.Streams()[i].CodecParameters().AvCodecGetType() == avformat.AVMEDIA_TYPE_VIDEO { + streamIndex = i + break + } + } + if streamIndex == -1 { + return errors.New("couldn't find video stream") + } + // Get a pointer to the codec context for the video stream + pCodecCtxOrig := pFormatContext.Streams()[streamIndex].Codec() + // Find the decoder for the video stream + pCodec := avcodec.AvcodecFindDecoder(pCodecCtxOrig.CodecId()) + if pCodec == nil { + return errors.New("unsupported codec") + } + // Copy context + pCodecCtx := pCodec.AvcodecAllocContext3() + if pCodecCtx.AvcodecCopyContext((*avcodec.Context)(unsafe.Pointer(pCodecCtxOrig))) != 0 { + return errors.New("couldn't copy codec context") + } + + // Open codec + if pCodecCtx.AvcodecOpen2(pCodec, nil) < 0 { + return errors.New("could not open codec") + } + + // Allocate video frame + pFrame := avutil.AvFrameAlloc() + + // Allocate an AVFrame structure + pFrameRGB := avutil.AvFrameAlloc() + if pFrameRGB == nil { + return errors.New("unable to allocate RGB Frame") + } + // Determine required buffer size and allocate buffer + numBytes := uintptr(avcodec.AvpictureGetSize(avcodec.AV_PIX_FMT_YUVJ420P, pCodecCtx.Width(), + pCodecCtx.Height())) + buffer := avutil.AvMalloc(numBytes) + + // Assign appropriate parts of buffer to image planes in pFrameRGB + // Note that pFrameRGB is an AVFrame, but AVFrame is a superset + // of AVPicture + avp := (*avcodec.Picture)(unsafe.Pointer(pFrameRGB)) + avp.AvpictureFill((*uint8)(buffer), avcodec.AV_PIX_FMT_YUVJ420P, pCodecCtx.Width(), pCodecCtx.Height()) + + // initialize SWS context for software scaling + swsCtx := swscale.SwsGetcontext( + pCodecCtx.Width(), + pCodecCtx.Height(), + (swscale.PixelFormat)(pCodecCtx.PixFmt()), + pCodecCtx.Width(), + pCodecCtx.Height(), + avcodec.AV_PIX_FMT_YUVJ420P, + avcodec.SWS_BICUBIC, + nil, + nil, + nil, + ) + frameNum := 0 + failureNum := 0 + failureCount := 5 * frameCount + packet := avcodec.AvPacketAlloc() + // Start capturing and saving video frames + for { + if failureNum >= failureCount { + klog.Error("the number of failed attempts to save frames has reached the upper limit") + return errors.New("the number of failed attempts to save frames has reached the upper limit") + } + + if pFormatContext.AvReadFrame(packet) < 0 { + klog.Error("Read frame failed") + time.Sleep(time.Second) + continue + } + + // Is this a packet from the video stream? + if packet.StreamIndex() != streamIndex { + failureNum++ + continue + } + + // Decode video frame + response := pCodecCtx.AvcodecSendPacket(packet) + if response < 0 { + klog.Errorf("Error while sending a packet to the decoder: %s", avutil.ErrorFromCode(response)) + failureNum++ + continue + } + response = pCodecCtx.AvcodecReceiveFrame((*avutil.Frame)(unsafe.Pointer(pFrame))) + if response == avutil.AvErrorEAGAIN || response == avutil.AvErrorEOF { + failureNum++ + continue + } else if response < 0 { + klog.Errorf("Error while receiving a frame from the decoder: %s", avutil.ErrorFromCode(response)) + failureNum++ + continue + } + // Convert the image from its native format to RGB + swscale.SwsScale2(swsCtx, avutil.Data(pFrame), + avutil.Linesize(pFrame), 0, pCodecCtx.Height(), + avutil.Data(pFrameRGB), avutil.Linesize(pFrameRGB)) + + // Save the frame to disk + err := save(pFrameRGB, pCodecCtx.Width(), pCodecCtx.Height(), outDir, format) + if err != nil { + klog.Error(err) + continue + } + frameNum++ + if frameNum >= frameCount { + return nil + } + time.Sleep(time.Nanosecond * time.Duration(frameInterval)) + } +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/video.go b/mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/video.go new file mode 100644 index 00000000..7954e215 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/data/stream/video.go @@ -0,0 +1,142 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stream + +import ( + "errors" + "fmt" + + "github.com/sailorvii/goav/avcodec" + "github.com/sailorvii/goav/avformat" + "github.com/sailorvii/goav/avutil" + "k8s.io/klog/v2" +) + +// SaveVideo save video. +func SaveVideo(inputFile string, outDir string, format string, frameCount int, videoNum int) error { + var fragmentedMp4Options int + //initialize input file with Context + var inputFmtCtx *avformat.Context + + avformat.AvDictSet(&avformat.Dict, "rtsp_transport", "tcp", 0) + avformat.AvDictSet(&avformat.Dict, "max_delay", "5000000", 0) + + if avformat.AvformatOpenInput(&inputFmtCtx, inputFile, nil, &avformat.Dict) < 0 { + return fmt.Errorf("could not open input file '%s", inputFile) + } + defer inputFmtCtx.AvformatFreeContext() + //read stream information + + if inputFmtCtx.AvformatFindStreamInfo(nil) < 0 { + return errors.New("failed to retrieve input stream information") + } + + //initialize streamMapping + streamMappingSize := int(inputFmtCtx.NbStreams()) + streamMapping := make([]int, streamMappingSize) + var streamIndex int + + validTypeMap := map[avcodec.MediaType]int{ + avformat.AVMEDIA_TYPE_VIDEO: 1, + avformat.AVMEDIA_TYPE_AUDIO: 1, + avformat.AVMEDIA_TYPE_SUBTITLE: 1, + } + var inCodecParam *avcodec.AvCodecParameters + defer inCodecParam.AvCodecParametersFree() + + var outputFmtCtx *avformat.Context + outputFile := GenFileName(outDir, format) + avformat.AvAllocOutputContext2(&outputFmtCtx, nil, nil, &outputFile) + if outputFmtCtx == nil { + return errors.New("Could not create output context") + } + defer outputFmtCtx.AvformatFreeContext() + + for index, inStream := range inputFmtCtx.Streams() { + inCodecParam = inStream.CodecParameters() + inCodecType := inCodecParam.AvCodecGetType() + + if validTypeMap[inCodecType] == 0 { + streamMapping[index] = -1 + continue + } + streamMapping[index] = streamIndex + streamIndex++ + outStream := outputFmtCtx.AvformatNewStream(nil) + if outStream == nil { + return errors.New("Failed allocating output stream") + } + if inCodecParam.AvCodecParametersCopyTo(outStream.CodecParameters()) < 0 { + return errors.New("Failed to copy codec parameters") + } + } + + // initialize opts + var opts *avutil.Dictionary + defer opts.AvDictFree() + if fragmentedMp4Options != 0 { + opts.AvDictSet("movflags", "frag_keyframe+empty_moov+default_base_moof", 0) + } + var packet avcodec.Packet + defer packet.AvPacketUnref() + + // Capture a set number of video segments + for idx := 0; idx < videoNum; idx++ { + outputFile = GenFileName(outDir, format) + // initialize output file with Context + outputFmtCtx.AvDumpFormat(0, outputFile, 1) + if outputFmtCtx.Oformat().GetFlags()&avformat.AVFMT_NOFILE == 0 { + avIOContext, err := avformat.AvIOOpen(outputFile, avformat.AVIO_FLAG_WRITE) + if err != nil { + return fmt.Errorf("could not open output file '%s'", outputFile) + } + outputFmtCtx.SetPb(avIOContext) + } + + if outputFmtCtx.AvformatWriteHeader(&opts) < 0 { + return errors.New("Error occurred when opening output file") + } + // Capture and generate video according to the set number of frames + for i := 1; i < frameCount; i++ { + if inputFmtCtx.AvReadFrame(&packet) < 0 { + return errors.New("read frame failed") + } + index := packet.StreamIndex() + inputStream := inputFmtCtx.Streams()[index] + if index >= streamMappingSize || streamMapping[index] < 0 { + continue + } + packet.SetStreamIndex(streamMapping[index]) + outputStream := outputFmtCtx.Streams()[index] + packet.AvPacketRescaleTs(inputStream.TimeBase(), outputStream.TimeBase()) + packet.SetPos(-1) + if outputFmtCtx.AvInterleavedWriteFrame(&packet) < 0 { + klog.Error("Error muxing packet") + continue + } + } + + outputFmtCtx.AvWriteTrailer() + if outputFmtCtx.Oformat().GetFlags()&avformat.AVFMT_NOFILE == 0 { + if outputFmtCtx.Pb().Close() != nil { + klog.Error("Error close output context") + return errors.New("error close output context") + } + } + } + return nil +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/device/device.go b/mappers/kubeedge-v1.17.0/onvif-mapper/device/device.go new file mode 100644 index 00000000..287ef48b --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/device/device.go @@ -0,0 +1,487 @@ +package device + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "os/signal" + "strings" + "sync" + "time" + + "k8s.io/klog/v2" + + dbInflux "github.com/kubeedge/onvif/data/dbmethod/influxdb2" + dbMysql "github.com/kubeedge/onvif/data/dbmethod/mysql" + dbRedis "github.com/kubeedge/onvif/data/dbmethod/redis" + dbTdengine "github.com/kubeedge/onvif/data/dbmethod/tdengine" + httpMethod "github.com/kubeedge/onvif/data/publish/http" + mqttMethod "github.com/kubeedge/onvif/data/publish/mqtt" + "github.com/kubeedge/onvif/data/stream" + "github.com/kubeedge/onvif/driver" + dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1beta1" + "github.com/kubeedge/mapper-framework/pkg/common" + "github.com/kubeedge/mapper-framework/pkg/global" + "github.com/kubeedge/mapper-framework/pkg/util/parse" +) + +type DevPanel struct { + deviceMuxs map[string]context.CancelFunc + devices map[string]*driver.CustomizedDev + models map[string]common.DeviceModel + wg sync.WaitGroup + serviceMutex sync.Mutex + quitChan chan os.Signal +} + +var ( + devPanel *DevPanel + once sync.Once +) + +var ErrEmptyData = errors.New("device or device model list is empty") + +// NewDevPanel init and return devPanel +func NewDevPanel() *DevPanel { + once.Do(func() { + devPanel = &DevPanel{ + deviceMuxs: make(map[string]context.CancelFunc), + devices: make(map[string]*driver.CustomizedDev), + models: make(map[string]common.DeviceModel), + wg: sync.WaitGroup{}, + serviceMutex: sync.Mutex{}, + quitChan: make(chan os.Signal), + } + }) + return devPanel +} + +// DevStart start all devices. +func (d *DevPanel) DevStart() { + for id, dev := range d.devices { + klog.V(4).Info("Dev: ", id, dev) + ctx, cancel := context.WithCancel(context.Background()) + d.deviceMuxs[id] = cancel + d.wg.Add(1) + go d.start(ctx, dev) + } + signal.Notify(d.quitChan, os.Interrupt) + go func() { + <-d.quitChan + for id, device := range d.devices { + err := device.CustomizedClient.StopDevice() + if err != nil { + klog.Errorf("Service has stopped but failed to stop %s:%v", id, err) + } + } + klog.V(1).Info("Exit mapper") + os.Exit(1) + }() + d.wg.Wait() +} + +// start the device +func (d *DevPanel) start(ctx context.Context, dev *driver.CustomizedDev) { + defer d.wg.Done() + + var protocolConfig driver.ProtocolConfig + if err := json.Unmarshal(dev.Instance.PProtocol.ConfigData, &protocolConfig); err != nil { + klog.Errorf("Unmarshal ProtocolConfigs error: %v", err) + return + } + client, err := driver.NewClient(protocolConfig) + if err != nil { + klog.Errorf("Init dev %s error: %v", dev.Instance.Name, err) + return + } + dev.CustomizedClient = client + err = dev.CustomizedClient.InitDevice() + if err != nil { + klog.Errorf("Init device %s error: %v", dev.Instance.ID, err) + return + } + go dataHandler(ctx, dev) + <-ctx.Done() +} + +// dataHandler initialize the timer to handle data plane and devicetwin. +func dataHandler(ctx context.Context, dev *driver.CustomizedDev) { + for _, twin := range dev.Instance.Twins { + twin.Property.PProperty.DataType = strings.ToLower(twin.Property.PProperty.DataType) + var visitorConfig driver.VisitorConfig + + err := json.Unmarshal(twin.Property.Visitors, &visitorConfig) + visitorConfig.VisitorConfigData.DataType = strings.ToLower(visitorConfig.VisitorConfigData.DataType) + if err != nil { + klog.Errorf("Unmarshal VisitorConfig error: %v", err) + continue + } + err = setVisitor(&visitorConfig, &twin, dev) + if err != nil { + klog.Error(err) + continue + } + + // If the device property type is streaming, it will directly enter the streaming data processing function, + // such as saving frames or saving videos, and will no longer push it to the user database and application. + // If there are other needs for stream data processing, users can add functions in the mapper/data/stream directory. + if twin.Property.PProperty.DataType == "stream" { + err = stream.StreamHandler(&twin, dev.CustomizedClient, &visitorConfig) + if err != nil { + klog.Errorf("processed streaming data by %s Error: %v", twin.PropertyName, err) + } + continue + } + + // handle twin + twinData := &TwinData{ + DeviceName: dev.Instance.Name, + DeviceNamespace: dev.Instance.Namespace, + Client: dev.CustomizedClient, + Name: twin.PropertyName, + Type: twin.ObservedDesired.Metadata.Type, + ObservedDesired: twin.ObservedDesired, + VisitorConfig: &visitorConfig, + Topic: fmt.Sprintf(common.TopicTwinUpdate, dev.Instance.ID), + CollectCycle: time.Duration(twin.Property.CollectCycle), + ReportToCloud: twin.Property.ReportToCloud, + } + go twinData.Run(ctx) + // handle push method + if twin.Property.PushMethod.MethodConfig != nil && twin.Property.PushMethod.MethodName != "" { + dataModel := common.NewDataModel(dev.Instance.Name, twin.Property.PropertyName, dev.Instance.Namespace, common.WithType(twin.ObservedDesired.Metadata.Type)) + pushHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + } + // handle database + if twin.Property.PushMethod.DBMethod.DBMethodName != "" { + dataModel := common.NewDataModel(dev.Instance.Name, twin.Property.PropertyName, dev.Instance.Namespace, common.WithType(twin.ObservedDesired.Metadata.Type)) + dbHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + switch twin.Property.PushMethod.DBMethod.DBMethodName { + // TODO add more database + case "influx": + dbInflux.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + case "redis": + dbRedis.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + case "tdengine": + dbTdengine.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + case "mysql": + dbMysql.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + } + } + } +} + +// pushHandler start data panel work +func pushHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + var dataPanel global.DataPanel + var err error + // initialization dataPanel + switch twin.Property.PushMethod.MethodName { + case "http": + dataPanel, err = httpMethod.NewDataPanel(twin.Property.PushMethod.MethodConfig) + case "mqtt": + dataPanel, err = mqttMethod.NewDataPanel(twin.Property.PushMethod.MethodConfig) + default: + err = errors.New("custom protocols are not currently supported when push data") + } + if err != nil { + klog.Errorf("new data panel error: %v", err) + return + } + // initialization PushMethod + err = dataPanel.InitPushMethod() + if err != nil { + klog.Errorf("init publish method err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + dataPanel.Push(dataModel) + case <-ctx.Done(): + return + } + } + }() +} + +// dbHandler start db client to save data +func dbHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + switch twin.Property.PushMethod.DBMethod.DBMethodName { + // TODO add more database + case "influx": + dbInflux.DataHandler(ctx, twin, client, visitorConfig, dataModel) + + case "redis": + dbRedis.DataHandler(ctx, twin, client, visitorConfig, dataModel) + + case "tdengine": + dbTdengine.DataHandler(ctx, twin, client, visitorConfig, dataModel) + + case "mysql": + dbMysql.DataHandler(ctx, twin, client, visitorConfig, dataModel) + } +} + +// setVisitor check if visitor property is readonly, if not then set it. +func setVisitor(visitorConfig *driver.VisitorConfig, twin *common.Twin, dev *driver.CustomizedDev) error { + if twin.Property.PProperty.AccessMode == "ReadOnly" { + klog.V(3).Infof("%s twin readonly property: %s", dev.Instance.Name, twin.PropertyName) + return nil + } + klog.V(2).Infof("Convert type: %s, value: %s ", twin.Property.PProperty.DataType, twin.ObservedDesired.Value) + value, err := common.Convert(twin.Property.PProperty.DataType, twin.ObservedDesired.Value) + if err != nil { + klog.Errorf("Failed to convert value as %s : %v", twin.Property.PProperty.DataType, err) + return err + } + err = dev.CustomizedClient.SetDeviceData(value, visitorConfig) + if err != nil { + return fmt.Errorf("%s set device data error: %v", twin.PropertyName, err) + } + return nil +} + +// DevInit initialize the device +func (d *DevPanel) DevInit(deviceList []*dmiapi.Device, deviceModelList []*dmiapi.DeviceModel) error { + if len(deviceList) == 0 || len(deviceModelList) == 0 { + return ErrEmptyData + } + + for i := range deviceModelList { + model := deviceModelList[i] + cur := parse.GetDeviceModelFromGrpc(model) + d.models[model.Name] = cur + } + + for i := range deviceList { + device := deviceList[i] + commonModel := d.models[device.Spec.DeviceModelReference] + protocol, err := parse.BuildProtocolFromGrpc(device) + if err != nil { + return err + } + instance, err := parse.GetDeviceFromGrpc(device, &commonModel) + if err != nil { + return err + } + instance.PProtocol = protocol + + cur := new(driver.CustomizedDev) + cur.Instance = *instance + d.devices[instance.ID] = cur + } + + return nil +} + +// UpdateDev stop old device, then update and start new device +func (d *DevPanel) UpdateDev(model *common.DeviceModel, device *common.DeviceInstance) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + + if oldDevice, ok := d.devices[device.ID]; ok { + err := d.stopDev(oldDevice, device.ID) + if err != nil { + klog.Error(err) + } + } + // start new device + d.devices[device.ID] = new(driver.CustomizedDev) + d.devices[device.ID].Instance = *device + d.models[model.ID] = *model + + ctx, cancelFunc := context.WithCancel(context.Background()) + d.deviceMuxs[device.ID] = cancelFunc + d.wg.Add(1) + go d.start(ctx, d.devices[device.ID]) +} + +// UpdateDevTwins update device's twins +func (d *DevPanel) UpdateDevTwins(deviceID string, twins []common.Twin) error { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev, ok := d.devices[deviceID] + if !ok { + return fmt.Errorf("device %s not found", deviceID) + } + dev.Instance.Twins = twins + model := d.models[dev.Instance.Model] + d.UpdateDev(&model, &dev.Instance) + + return nil +} + +// DealDeviceTwinGet get device's twin data +func (d *DevPanel) DealDeviceTwinGet(deviceID string, twinName string) (interface{}, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev, ok := d.devices[deviceID] + if !ok { + return nil, fmt.Errorf("not found device %s", deviceID) + } + var res []parse.TwinResultResponse + for _, twin := range dev.Instance.Twins { + if twinName != "" && twin.PropertyName != twinName { + continue + } + payload, err := getTwinData(deviceID, twin, d.devices[deviceID]) + if err != nil { + return nil, err + } + item := parse.TwinResultResponse{ + PropertyName: twinName, + Payload: payload, + } + res = append(res, item) + } + return json.Marshal(res) +} + +// getTwinData get twin +func getTwinData(deviceID string, twin common.Twin, dev *driver.CustomizedDev) ([]byte, error) { + var visitorConfig driver.VisitorConfig + err := json.Unmarshal(twin.Property.Visitors, &visitorConfig) + if err != nil { + return nil, err + } + err = setVisitor(&visitorConfig, &twin, dev) + if err != nil { + return nil, err + } + twinData := &TwinData{ + DeviceName: deviceID, + Client: dev.CustomizedClient, + Name: twin.PropertyName, + Type: twin.ObservedDesired.Metadata.Type, + VisitorConfig: &visitorConfig, + Topic: fmt.Sprintf(common.TopicTwinUpdate, deviceID), + } + return twinData.GetPayLoad() +} + +// GetDevice get device instance +func (d *DevPanel) GetDevice(deviceID string) (interface{}, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + found, ok := d.devices[deviceID] + if !ok || found == nil { + return nil, fmt.Errorf("device %s not found", deviceID) + } + + // get the latest reported twin value + for i, twin := range found.Instance.Twins { + payload, err := getTwinData(deviceID, twin, found) + if err != nil { + return nil, err + } + found.Instance.Twins[i].Reported.Value = string(payload) + } + return found, nil +} + +// RemoveDevice remove device instance +func (d *DevPanel) RemoveDevice(deviceID string) error { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev := d.devices[deviceID] + delete(d.devices, deviceID) + err := d.stopDev(dev, deviceID) + if err != nil { + return err + } + return nil +} + +// stopDev stop device and goroutine +func (d *DevPanel) stopDev(dev *driver.CustomizedDev, id string) error { + cancelFunc, ok := d.deviceMuxs[id] + if !ok { + return fmt.Errorf("can not find device %s from device muxs", id) + } + + err := dev.CustomizedClient.StopDevice() + if err != nil { + klog.Errorf("stop device %s error: %v", id, err) + } + cancelFunc() + return nil +} + +// GetModel if the model exists, return device model +func (d *DevPanel) GetModel(modelID string) (common.DeviceModel, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + if model, ok := d.models[modelID]; ok { + return model, nil + } + return common.DeviceModel{}, fmt.Errorf("deviceModel %s not found", modelID) +} + +// UpdateModel update device model +func (d *DevPanel) UpdateModel(model *common.DeviceModel) { + d.serviceMutex.Lock() + d.models[model.ID] = *model + d.serviceMutex.Unlock() +} + +// RemoveModel remove device model +func (d *DevPanel) RemoveModel(modelID string) { + d.serviceMutex.Lock() + delete(d.models, modelID) + d.serviceMutex.Unlock() +} + +// GetTwinResult Get twin's value and data type +func (d *DevPanel) GetTwinResult(deviceID string, twinName string) (string, string, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev, ok := d.devices[deviceID] + if !ok { + return "", "", fmt.Errorf("not found device %s", deviceID) + } + var res string + var dataType string + for _, twin := range dev.Instance.Twins { + if twinName != "" && twin.PropertyName != twinName { + continue + } + var visitorConfig driver.VisitorConfig + err := json.Unmarshal(twin.Property.Visitors, &visitorConfig) + if err != nil { + return "", "", err + } + err = setVisitor(&visitorConfig, &twin, dev) + + data, err := dev.CustomizedClient.GetDeviceData(&visitorConfig) + if err != nil { + return "", "", fmt.Errorf("get device data failed: %v", err) + } + res, err = common.ConvertToString(data) + if err != nil { + return "", "", err + } + dataType = twin.Property.PProperty.DataType + } + return res, dataType, nil +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/device/devicetwin.go b/mappers/kubeedge-v1.17.0/onvif-mapper/device/devicetwin.go new file mode 100644 index 00000000..d694352c --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/device/devicetwin.go @@ -0,0 +1,108 @@ +package device + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/onvif/driver" + dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1beta1" + "github.com/kubeedge/mapper-framework/pkg/common" + "github.com/kubeedge/mapper-framework/pkg/grpcclient" + "github.com/kubeedge/mapper-framework/pkg/util/parse" +) + +type TwinData struct { + DeviceName string + DeviceNamespace string + Client *driver.CustomizedClient + Name string + Type string + ObservedDesired common.TwinProperty + VisitorConfig *driver.VisitorConfig + Topic string + Results interface{} + CollectCycle time.Duration + ReportToCloud bool +} + +func (td *TwinData) GetPayLoad() ([]byte, error) { + var err error + td.VisitorConfig.VisitorConfigData.DataType = strings.ToLower(td.VisitorConfig.VisitorConfigData.DataType) + td.Results, err = td.Client.GetDeviceData(td.VisitorConfig) + if err != nil { + return nil, fmt.Errorf("get device data failed: %v", err) + } + sData, err := common.ConvertToString(td.Results) + if err != nil { + klog.Errorf("Failed to convert %s %s value as string : %v", td.DeviceName, td.Name, err) + return nil, err + } + if len(sData) > 30 { + klog.V(4).Infof("Get %s : %s ,value is %s......", td.DeviceName, td.Name, sData[:30]) + } else { + klog.V(4).Infof("Get %s : %s ,value is %s", td.DeviceName, td.Name, sData) + } + var payload []byte + if strings.Contains(td.Topic, "$hw") { + if payload, err = common.CreateMessageTwinUpdate(td.Name, td.Type, sData, td.ObservedDesired.Value); err != nil { + return nil, fmt.Errorf("create message twin update failed: %v", err) + } + } else { + if payload, err = common.CreateMessageData(td.Name, td.Type, sData); err != nil { + return nil, fmt.Errorf("create message data failed: %v", err) + } + } + return payload, nil +} + +func (td *TwinData) PushToEdgeCore() { + payload, err := td.GetPayLoad() + if err != nil { + klog.Errorf("twindata %s unmarshal failed, err: %s", td.Name, err) + return + } + + var msg common.DeviceTwinUpdate + if err = json.Unmarshal(payload, &msg); err != nil { + klog.Errorf("twindata %s unmarshal failed, err: %s", td.Name, err) + return + } + + twins := parse.ConvMsgTwinToGrpc(msg.Twin) + + var rdsr = &dmiapi.ReportDeviceStatusRequest{ + DeviceName: td.DeviceName, + DeviceNamespace: td.DeviceNamespace, + ReportedDevice: &dmiapi.DeviceStatus{ + Twins: twins, + //State: "OK", + }, + } + + if err := grpcclient.ReportDeviceStatus(rdsr); err != nil { + klog.Errorf("fail to report device status of %s with err: %+v", rdsr.DeviceName, err) + } +} + +func (td *TwinData) Run(ctx context.Context) { + if !td.ReportToCloud { + return + } + if td.CollectCycle == 0 { + td.CollectCycle = common.DefaultCollectCycle + } + ticker := time.NewTicker(td.CollectCycle) + for { + select { + case <-ticker.C: + td.PushToEdgeCore() + case <-ctx.Done(): + return + } + } +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/driver/devicetype.go b/mappers/kubeedge-v1.17.0/onvif-mapper/driver/devicetype.go new file mode 100644 index 00000000..553022a8 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/driver/devicetype.go @@ -0,0 +1,46 @@ +package driver + +import ( + "sync" + + goonvif "github.com/use-go/onvif" + + "github.com/kubeedge/mapper-framework/pkg/common" +) + +// CustomizedDev is the customized device configuration and client information. +type CustomizedDev struct { + Instance common.DeviceInstance + CustomizedClient *CustomizedClient +} + +type CustomizedClient struct { + deviceMutex sync.Mutex + ProtocolConfig + dev *goonvif.Device //Save the device controller created by the device driver +} + +type ProtocolConfig struct { + ProtocolName string `json:"protocolName"` + ConfigData `json:"configData"` +} + +type ConfigData struct { + URL string `json:"url,omitempty"` // the url of onvif device + UserName string `json:"userName"` // the username of onvif device + Password string `json:"password"` // the password of device user +} + +type VisitorConfig struct { + ProtocolName string `json:"protocolName"` + VisitorConfigData `json:"configData"` +} + +type VisitorConfigData struct { + DataType string `json:"dataType"` + Format string `json:"format"` // datatype of device data + OutputDir string `json:"outputDir"` // the url of onvif device + FrameCount int `json:"frameCount"` // the username of onvif device + FrameInterval int `json:"frameInterval"` // the password of device user + VideoNum int `json:"videoNum"` // number of videos collected +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/driver/driver.go b/mappers/kubeedge-v1.17.0/onvif-mapper/driver/driver.go new file mode 100644 index 00000000..15de4725 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/driver/driver.go @@ -0,0 +1,101 @@ +package driver + +import ( + "context" + "errors" + "fmt" + "net/http" + "os" + "sync" + + goonvif "github.com/use-go/onvif" + "github.com/use-go/onvif/media" + mediasdk "github.com/use-go/onvif/sdk/media" + "k8s.io/klog/v2" +) + +func NewClient(protocol ProtocolConfig) (*CustomizedClient, error) { + client := &CustomizedClient{ + ProtocolConfig: protocol, + deviceMutex: sync.Mutex{}, + // TODO initialize the variables you added + } + return client, nil +} + +func (c *CustomizedClient) InitDevice() error { + // you can use c.ProtocolConfig + // Through this function, the onvif device variable can be generated by ProtocolConfig. + password, err := readPassword(c.ProtocolConfig.Password) // get device password + if err != nil { + return err + } + + deviceParams := goonvif.DeviceParams{ + Xaddr: c.ProtocolConfig.URL, + Username: c.ProtocolConfig.UserName, + Password: password, + HttpClient: new(http.Client), + } + + dev, err := goonvif.NewDevice(deviceParams) //build onvif device according to use-go/onvif + if err != nil { + return err + } + c.dev = dev + return nil + +} + +func (c *CustomizedClient) GetDeviceData(visitor *VisitorConfig) (interface{}, error) { + // you can use c.ProtocolConfig and visitor + // Through this function, the authentication file and RTSP URI of the onvif device will be returned. + ctx := context.Background() + if c.dev == nil { + return nil, fmt.Errorf("device does not exist") + } + + // get onvif device ProfilesToken + getProfiles := media.GetProfiles{} + profilesResp, err := mediasdk.Call_GetProfiles(ctx, c.dev, getProfiles) + if err != nil { + klog.Error(err) + return nil, err + } else if len(profilesResp.Profiles) == 0 { + return "", fmt.Errorf("no onvif profiles found") + } + token := profilesResp.Profiles[0].Token + + // get onvif device RTSP Stream Uri with ProfilesToken + getStreamUri := media.GetStreamUri{ + ProfileToken: token, + } + uriResp, err := mediasdk.Call_GetStreamUri(ctx, c.dev, getStreamUri) + if err != nil { + return "", fmt.Errorf("GetStreamURI failed: %v", err.Error()) + } + return string(uriResp.MediaUri.Uri), nil +} + +func (c *CustomizedClient) SetDeviceData(data interface{}, visitor *VisitorConfig) error { + // TODO: set device's data, such as controlling the position and posture of the camera, will be added in subsequent versions + // you can use c.ProtocolConfig and visitor + klog.V(2).Infof("still wait to complete setdevicedata feature") + return nil +} + +func (c *CustomizedClient) StopDevice() error { + // stop device + // you can use c.ProtocolConfig + klog.Info("Stop onvif device successfully") + return nil +} + +func readPassword(filename string) (string, error) { + b, err := os.ReadFile(filename) + if err != nil { + return "", errors.New("Failed to load certificate") + } + // Remove the last character '\n' + return string(b[:len(b)-1]), nil +} diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/go.mod b/mappers/kubeedge-v1.17.0/onvif-mapper/go.mod new file mode 100644 index 00000000..418d299f --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/go.mod @@ -0,0 +1,46 @@ +module github.com/kubeedge/onvif + +go 1.20 + +require ( + github.com/eclipse/paho.mqtt.golang v1.2.0 + github.com/go-redis/redis/v8 v8.11.5 + github.com/go-sql-driver/mysql v1.7.1 + github.com/golang/protobuf v1.5.3 // indirect + github.com/gorilla/mux v1.8.0 // indirect + github.com/influxdata/influxdb-client-go/v2 v2.13.0 + github.com/kubeedge/kubeedge v1.17.0-beta.0 + github.com/kubeedge/mapper-framework v1.16.1-0.20240424015840-62ddff0c5c00 + github.com/sailorvii/goav v0.1.4 + github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace // indirect + github.com/taosdata/driver-go/v3 v3.5.1 + github.com/use-go/onvif v0.0.9 + golang.org/x/net v0.19.0 // indirect + google.golang.org/grpc v1.56.3 // indirect + google.golang.org/protobuf v1.31.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + k8s.io/klog/v2 v2.100.1 +) + +require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect + github.com/avast/retry-go v3.0.0+incompatible // indirect + github.com/beevik/etree v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/elgs/gostrgen v0.0.0-20161222160715-9d61ae07eeae // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/gofrs/uuid v3.2.0+incompatible // indirect + github.com/google/uuid v1.3.1 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/juju/errors v0.0.0-20220331221717-b38fca44723b // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect + github.com/rs/zerolog v1.26.1 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect +) diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/go.sum b/mappers/kubeedge-v1.17.0/onvif-mapper/go.sum new file mode 100644 index 00000000..32023998 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/go.sum @@ -0,0 +1,165 @@ +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= +github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs= +github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= +github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +github.com/elgs/gostrgen v0.0.0-20161222160715-9d61ae07eeae h1:3KvK2DmA7TxQ6PZ2f0rWbdqjgJhRcqgbY70bBeE4clI= +github.com/elgs/gostrgen v0.0.0-20161222160715-9d61ae07eeae/go.mod h1:wruC5r2gHdr/JIUs5Rr1V45YtsAzKXZxAnn/5rPC97g= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.7.0/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= +github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= +github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gosuri/uilive v0.0.0-20170323041506-ac356e6e42cd/go.mod h1:qkLSc0A5EXSP6B04TrN4oQoxqFI7A8XvoXSlJi8cwk8= +github.com/gosuri/uiprogress v0.0.0-20170224063937-d0567a9d84a1/go.mod h1:C1RTYn4Sc7iEyf6j8ft5dyoZ4212h8G1ol9QQluh5+0= +github.com/influxdata/influxdb-client-go/v2 v2.13.0 h1:ioBbLmR5NMbAjP4UVA5r9b5xGjpABD7j65pI8kFphDM= +github.com/influxdata/influxdb-client-go/v2 v2.13.0/go.mod h1:k+spCbt9hcvqvUiz0sr5D8LolXHqAAOfPw9v/RIRHl4= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/juju/errors v0.0.0-20220331221717-b38fca44723b h1:AxFeSQJfcm2O3ov1wqAkTKYFsnMw2g1B4PkYujfAdkY= +github.com/juju/errors v0.0.0-20220331221717-b38fca44723b/go.mod h1:jMGj9DWF/qbo91ODcfJq6z/RYc3FX3taCBZMCcpI4Ls= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kubeedge/kubeedge v1.17.0-beta.0 h1:BmVSsdQhlBpEu/9bTs91gpnYnQnxEvIJEhCSN9a8LKk= +github.com/kubeedge/kubeedge v1.17.0-beta.0/go.mod h1:4rERR0DnbDONgQPsSG+gu53kBZNXXwj3kVs8g8jfCXM= +github.com/kubeedge/mapper-framework v1.16.1-0.20240424015840-62ddff0c5c00 h1:d6UJUNuSL9sYzHaZwwLMZzbucdI1q+jdWQPqweX2vAc= +github.com/kubeedge/mapper-framework v1.16.1-0.20240424015840-62ddff0c5c00/go.mod h1:mYEJop0LCKuErFPrpKiCTGsqwijDZSr4REJTJCTGl5U= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= +github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/gomega v1.27.7 h1:fVih9JD6ogIiHUN6ePK7HJidyEDpWGVB5mzM7cWNXoU= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= +github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc= +github.com/sailorvii/goav v0.1.4 h1:4FwbikqIxx26dcHlZ8195WSPQSWbNnvRvTSgRTPgh2w= +github.com/sailorvii/goav v0.1.4/go.mod h1:upppsyLr1RLWDZ0+U3RYYGTv9NVwCjz14j/zzxRM018= +github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace h1:9PNP1jnUjRhfmGMlkXHjYPishpcw4jpSt/V/xYY3FMA= +github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/taosdata/driver-go/v3 v3.5.1 h1:ln8gLJ6HR6gHU6dodmOa9utUjPUpAcdIplh6arFO26Q= +github.com/taosdata/driver-go/v3 v3.5.1/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/use-go/onvif v0.0.9 h1:t6y5uN1LGrdSpNDiy4Vn9HazYgVxdWUBfdBb5cApR7g= +github.com/use-go/onvif v0.0.9/go.mod h1:l6K5BgFel7AARm7a9oVj5uvTdwvgttudcP8pUxUf5go= +github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc= +google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= +k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/hack/make-rules/mapper.sh b/mappers/kubeedge-v1.17.0/onvif-mapper/hack/make-rules/mapper.sh new file mode 100644 index 00000000..4cba4250 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/hack/make-rules/mapper.sh @@ -0,0 +1,159 @@ +#!/usr/bin/env bash + +set -o errexit +set -o nounset +set -o pipefail + +CURR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd -P)" +ROOT_DIR="$(cd "${CURR_DIR}/../.." && pwd -P)" +source "${ROOT_DIR}/hack/lib/init.sh" + +mkdir -p "${CURR_DIR}/bin" +mkdir -p "${CURR_DIR}/dist" + +function mod() { + [[ "${2:-}" != "only" ]] + local mapper="${1}" + + # the mapper is sharing the vendor with root + pushd "${ROOT_DIR}" >/dev/null || exist 1 + echo "downloading dependencies for mapper ${mapper}..." + + if [[ "$(go env GO111MODULE)" == "off" ]]; then + echo "go mod has been disabled by GO111MODULE=off" + else + echo "tidying" + go mod tidy + echo "vending" + go mod vendor + fi + + echo "...done" + popd >/dev/null || return +} + +function lint() { + [[ "${2:-}" != "only" ]] && mod "$@" + local mapper="${1}" + + echo "fmt and linting mapper ${mapper}..." + + gofmt -s -w "${CURR_DIR}/" + golangci-lint run "${CURR_DIR}/..." + + echo "...done" +} + +function build() { + [[ "${2:-}" != "only" ]] && lint "$@" + local mapper="${1}" + + local flags=" -w -s " + local ext_flags=" -extldflags '-static' " + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + + local platform + if [[ "${ARM:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm" + platform=("linux/arm") + elif [[ "${ARM64:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm64" + platform=("linux/arm64") + else + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + platform=("${os}/${arch}") + fi + + echo "building ${platform}" + + local os_arch + IFS="/" read -r -a os_arch <<<"${platform}" + local os=${os_arch[0]} + local arch=${os_arch[1]} + GOOS=${os} GOARCH=${arch} CGO_ENABLED=0 go build \ + -ldflags "${flags} ${ext_flags}" \ + -o "${CURR_DIR}/bin/${mapper}_${os}_${arch}" \ + "${CURR_DIR}/cmd/main.go" + + cp ${CURR_DIR}/bin/${mapper}_${os}_${arch} ${CURR_DIR}/bin/${mapper} + echo "...done" +} + +function package() { + [[ "${2:-}" != "only" ]] && build "$@" + local mapper="${1}" + + echo "packaging mapper ${mapper}..." + + local image_name="${mapper}-mapper" + local tag=v1.0 + + local platform + if [[ "${ARM:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm" + platform=("linux/arm") + elif [[ "${ARM64:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm64" + platform=("linux/arm64") + else + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + platform=("${os}/${arch}") + fi + + pushd "${CURR_DIR}" >/dev/null 2>&1 + if [[ "${platform}" =~ darwin/* ]]; then + echo "package into Darwin OS image is unavailable, please use CROSS=true env to containerize multiple arch images or use OS=linux ARCH=amd64 env to containerize linux/amd64 image" + fi + + local image_tag="${image_name}:${tag}-${platform////-}" + echo "packaging ${image_tag}" + sudo docker build \ + --platform "${platform}" \ + -t "${image_tag}" . + popd >/dev/null 2>&1 + + echo "...done" +} + +function clean() { + local mapper="${1}" + + echo "cleanup mapper ${mapper}..." + + rm -rf "${CURR_DIR}/bin/*" + + echo "...done" +} + +function entry() { + local mapper="${1:-}" + shift 1 + + local stages="${1:-build}" + shift $(($# > 0 ? 1 : 0)) + + IFS="," read -r -a stages <<<"${stages}" + local commands=$* + if [[ ${#stages[@]} -ne 1 ]]; then + commands="only" + fi + + for stage in "${stages[@]}"; do + echo "# make mapper ${mapper} ${stage} ${commands}" + case ${stage} in + m | mod) mod "${mapper}" "${commands}" ;; + l | lint) lint "${mapper}" "${commands}" ;; + b | build) build "${mapper}" "${commands}" ;; + p | pkg | package) package "${mapper}" "${commands}" ;; + t | test) test "${mapper}" "${commands}" ;; + c | clean) clean "${mapper}" "${commands}" ;; + *) echo "unknown action '${stage}', select from mod,lint,build,test,clean" ;; + esac + done +} + +echo $@ +entry "$@" \ No newline at end of file diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/resource/configmap.yaml b/mappers/kubeedge-v1.17.0/onvif-mapper/resource/configmap.yaml new file mode 100644 index 00000000..e05e4c01 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/resource/configmap.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: cm-mapper +data: + configData: | + grpc_server: + socket_path: /etc/kubeedge/onvif.sock + common: + name: Onvif-mapper + version: v1.13.0 + api_version: v1.0.0 + protocol: onvif # TODO add your protocol name + address: 127.0.0.1 + edgecore_sock: /etc/kubeedge/dmi.sock diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/resource/deployment.yaml b/mappers/kubeedge-v1.17.0/onvif-mapper/resource/deployment.yaml new file mode 100644 index 00000000..5b6699e0 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/resource/deployment.yaml @@ -0,0 +1,51 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: mapper-test + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: demo + template: + metadata: + labels: + app: demo + spec: + nodeName: edge-node # replace with your edge node name + containers: + - name: demo + volumeMounts: # Required, mapper need to communicate with grpcclient and get the config + - name: test-volume + mountPath: /etc/kubeedge + - name: config + mountPath: /tmp/config + - name: secret + mountPath: /etc/secret + readOnly: true + image: docker.io/library/onvif-mapper:v1.0.0 # Replace with your mapper image name + imagePullPolicy: IfNotPresent + resources: + limits: + cpu: 300m + memory: 500Mi + requests: + cpu: 100m + memory: 100Mi + command: [ "/bin/sh","-c" ] + args: [ "/kubeedge/main --config-file /tmp/config/config.yaml --v 4" ] + volumes: + - name: test-volume + hostPath: + path: /etc/kubeedge + type: Directory + - name: config + configMap: + name: cm-mapper + items: + - key: configData + path: config.yaml + - name: secret + secret: + secretName: mysecret diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/resource/onvifdevice-instance.yaml b/mappers/kubeedge-v1.17.0/onvif-mapper/resource/onvifdevice-instance.yaml new file mode 100644 index 00000000..273acaad --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/resource/onvifdevice-instance.yaml @@ -0,0 +1,45 @@ +apiVersion: devices.kubeedge.io/v1beta1 +kind: Device +metadata: + name: onvif-device-01 + namespace: default +spec: + deviceModelRef: + name: onvif-model # need to be the same as the model name defined in onvif-model.yaml + protocol: + protocolName: onvif + configData: + url: 192.168.168.64:80 # Replace it with the address of your own onvif camera + userName: admin # Replace it with the username of your own onvif camera + password: /etc/secret/password # Fill in the fields according to your secret.yaml + nodeName: edge-node # Replace it with your edge node name + properties: + - name: getURI + visitors: + protocolName: onvif + configData: + url: 192.168.168.64:80 + userName: admin + password: /etc/secret/password + dataType: string + reportCycle: 10000000000 # Data publish frequency, default is once every 10 seconds + collectCycle: 10000000000 # Data reporting frequency to the cloud, default is once every 10 seconds + reportToCloud: true + - name: saveFrame + visitors: + protocolName: onvif + configData: + format: jpg + outputDir: /tmp/case/ + frameCount: 30 + frameInterval: 1000000 + dataType: stream + - name: saveVideo + visitors: + protocolName: onvif + configData: + frameCount: 1000 + format: mp4 + outputDir: /tmp/case/ + videoNum: 2 + dataType: stream diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/resource/onvifdevice-model.yaml b/mappers/kubeedge-v1.17.0/onvif-mapper/resource/onvifdevice-model.yaml new file mode 100644 index 00000000..46767178 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/resource/onvifdevice-model.yaml @@ -0,0 +1,20 @@ +apiVersion: devices.kubeedge.io/v1beta1 +kind: DeviceModel +metadata: + name: onvif-model + namespace: default +spec: + protocol: onvif + properties: + - name: getURI + description: get camera uri + type: STRING + accessMode: ReadOnly + - name: saveFrame + description: get camera uri + type: STREAM + accessMode: ReadOnly + - name: saveVideo + description: get camera uri + type: STREAM + accessMode: ReadOnly diff --git a/mappers/kubeedge-v1.17.0/onvif-mapper/resource/secret.yaml b/mappers/kubeedge-v1.17.0/onvif-mapper/resource/secret.yaml new file mode 100644 index 00000000..b7f92344 --- /dev/null +++ b/mappers/kubeedge-v1.17.0/onvif-mapper/resource/secret.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: Secret +metadata: + name: mysecret +type: Opaque +data: + password: YOUR_PASSWORD # Fill in based on your camera user password, base64 encryption is required