diff --git a/go.mod b/go.mod index f9411449..63c25511 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,8 @@ require ( golang.org/x/net v0.0.0-20220225172249-27dd8689420f google.golang.org/grpc v1.47.0 gopkg.in/yaml.v2 v2.4.0 + gorm.io/driver/sqlite v1.4.4 + gorm.io/gorm v1.24.0 k8s.io/api v0.24.1 k8s.io/apimachinery v0.24.1 k8s.io/klog v1.0.0 @@ -52,12 +54,15 @@ require ( github.com/gorilla/websocket v1.4.2 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kubeedge/viaduct v0.0.0 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/mattn/go-colorable v0.0.9 // indirect github.com/mattn/go-isatty v0.0.12 // indirect + github.com/mattn/go-sqlite3 v1.14.15 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect github.com/mgutz/logxi v0.0.0-20161027140823-aebf8a7d67ab // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index ecfb1768..4ff58204 100644 --- a/go.sum +++ b/go.sum @@ -703,6 +703,11 @@ github.com/ishidawataru/sctp v0.0.0-20190723014705-7c296d48a2b5/go.mod h1:DM4VvS github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -805,6 +810,8 @@ github.com/mattn/go-shellwords v1.0.11/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lL github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= +github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI= @@ -1745,6 +1752,10 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/sqlite v1.4.4 h1:gIufGoR0dQzjkyqDyYSCvsYR6fba1Gw5YKDqKeChxFc= +gorm.io/driver/sqlite v1.4.4/go.mod h1:0Aq3iPO+v9ZKbcdiz8gLWRw5VOPcBOPUQJFLq5e2ecI= +gorm.io/gorm v1.24.0 h1:j/CoiSm6xpRpmzbFJsQHYj+I8bGYWLXVHeYEyyKlF74= +gorm.io/gorm v1.24.0/go.mod h1:DVrVomtaYTbqs7gB/x2uVvqnXzv0nqjB396B8cG4dBA= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= diff --git a/mappers/windows-virtual-exec/Makefile b/mappers/windows-virtual-exec/Makefile new file mode 100644 index 00000000..5d0bf8ef --- /dev/null +++ b/mappers/windows-virtual-exec/Makefile @@ -0,0 +1,33 @@ +SHELL := /bin/bash + +curr_dir := $(patsubst %/,%,$(dir $(abspath $(lastword $(MAKEFILE_LIST))))) +rest_args := $(wordlist 2, $(words $(MAKECMDGOALS)), $(MAKECMDGOALS)) +$(eval $(rest_args):;@:) + +help: + # + # Usage: + # make generate : generate a mapper based on a template. + # make mapper {mapper-name} : execute mapper building process. + # + # Actions: + # - mod, m : download code dependencies. + # - lint, l : verify code via go fmt and `golangci-lint`. + # - build, b : compile code. + # - package, p : package docker image. + # - clean, c : clean output binary. + # + # Parameters: + # ARM : true or undefined + # ARM64 : true or undefined + # + # Example: + # - make mapper windows-virtual-exec : execute `build` "windows-virtual-exec" mapper. + @echo + +make_rules := $(shell ls $(curr_dir)/hack/make-rules | sed 's/.sh//g') +$(make_rules): + @$(curr_dir)/hack/make-rules/$@.sh $(rest_args) + +.DEFAULT_GOAL := help +.PHONY: $(make_rules) build test package \ No newline at end of file diff --git a/mappers/windows-virtual-exec/README.md b/mappers/windows-virtual-exec/README.md new file mode 100644 index 00000000..a771273c --- /dev/null +++ b/mappers/windows-virtual-exec/README.md @@ -0,0 +1,114 @@ +# Windows Execution Virtual Device + +## Overall design + +Overall, based on the modularity of Kubeedge, this project utilizes the independent data communication capability provided by DeviceTwin module to expand the usage scenarios of Kubeedge and adapt the problems encountered by the actual landing of Kubeedge in the automotive industry: + +1. In order to explore the possibility of Kubeedge cloud-native solution in the automotive industry, Kubeedge is introduced for multi-region- and multi-state edge node management, so that automotive software testing can be automated and the testing environment can go to the cloud. + +2. With the increase of automotive testing and simulation needs, some traditional industrial software relies on native Windows platform, even though Kubeedge has provided support for Windows Server with container capability, upgrading the original Windows Enterprise Edition machine to Windows Server is unacceptable to the staff and cost department. + +3. Automated testing uses third-party simulation and debugging software based on the Windows platform. In order to analyze and model the test data, it is necessary to upload the test process and result data from the test host computer to the cloud. + +For this reason, the following design is made: + +![overall-design](./static/1-simple.png) + +The overall design is shown in the figure, the software test host, regardless of Windows or Linux, is unified and managed by Kubeedge as a node. Users use K8s API to issue commands to operate it and create corresponding test tasks, and the DeviceTwin module of Kubeedge provides additional data communication capability to realize the triggering and reporting of non-container environment Windows nodes. + +## CRDs + +Non-containerized Windows environments execute automotive testing tasks that rely on third-party closed-source tools in a bare metal form, where the environment is relatively static and the tasks performed are variable. Therefore, we believe that Windows nodes need a unified executor to execute test tasks (scripts) issued by the cloud, scheduling, running commands and returning data to the cloud. + +In this project, automotive software testing has the following characteristics: + +1. The execution environment is decentralized and fixed, and the automotive software runs on a collection of hardware called a “pedestal”, which is physically fixed and configured with a PC to access it as a “host computer”. The dais and the host computer are dispersed across the country and around the world, depending on the location of the business unit. +2. Test tasks are dynamically plugged in and out, and their operational status needs to be monitored to prevent the desktop from failing. 3. +3. During the execution of tasks, test logs are constantly generated and need to be aggregated and analyzed in the cloud. + +Kubeedge has designed DeviceTwin to support device and data management, and defined CRD resources for Device and DeviceModel. CRD declarative yaml design is simple for users to understand, easy to operate, and can be promoted to testers; Kubeedge CloudCore has implemented DeviceController, a controller for Device resource objects. Kubeedge CloudCore has already implemented DeviceController for Device resource.Considering a test task as a virtual device, according to the design of Kubeedge Device and the requirements of automotive software testing, the Device and DeviceModel are defined as follows: + +### Device Model + +```yaml +apiVersion: devices.kubeedge.io/v1alpha2 +kind: DeviceModel +metadata: + name: win-exec-model + namespace: default +spec: + properties: + - name: exec-file-content + description: custom content to execute + type: + string: + accessMode: ReadOnly + - name: exec-file-name + description: save custom content as filename + type: + string: + accessMode: ReadOnly + - name: exec-command + description: entrypoint of target + type: + string: + accessMode: ReadOnly + - name: status + description: status of current executation + type: + string: + accessMode: ReadWrite + - name: output + description: console output of current executation + type: + string: + accessMode: ReadWrite +``` + +### Device Instance + +```yaml +apiVersion: devices.kubeedge.io/v1alpha2 +kind: Device +metadata: + name: exec-instance-001 + labels: + description: "test" + model: win-exec-model +spec: + deviceModelRef: + name: win-exec-model + nodeSelector: + nodeSelectorTerms: + - matchExpressions: + - key: '' + operator: In + values: + - win11-node + protocol: + customizedProtocal: + protocolName: winExec +status: + twins: + - propertyName: status + - propertyName: output + - propertyName: exec-file-content + desired: + value: 'echo "hello,world"' + - propertyName: exec-file-name + desired: + value: 'run.bat' + - propertyName: exec-command + desired: + value: 'run.bat' +``` + +The Device Instance is defined based on the Device Model and instantiates the contents of exec-file-content, exec-file-name and exec-command. The status and output fields are reflected in the actual field after the execution is completed, and the result is reported to the cloud as the test task script. The result of the execution is reported to the cloud. + +## Mapper + +According to the docs/proposals/device-crd.md document, the lifecycle of an IoT device consists of six parts: registration, configuration, upgrade, monitoring, logout, and destruction. Among them, registration, upgrade, logout, and destruction are not considered in the device-crd.md document. Therefore, device is designed for device configuration and monitoring. Configuration is designed to reconfigure the device multiple times without adding new functionality, setting the expected expectation in the CRD, i.e., a declarative configuration of the behavior that the device should have. Monitoring is designed to constantly update the state of the device so that the cloud can be informed of the state of the device in time for the next step. + +Considering the test task as an abstract device, configuring the device means setting the parameters of the test task; monitoring the status of the device means collecting and monitoring the execution logs of the test task. + +The execution of test tasks and reporting of results uses only a small portion of the mapper framework, which supports device creation, query and deletion, and does not require the introduction of complete framework logic. The operations are triggered as callback functions, controlled by the cloud, and in addition to the cloud commands, messages are actively pushed to the local MQTT to trigger the next operation. mapper loads the local database into memory at startup, and then triggers the execution of the task and the reporting of the status based on the type of message. diff --git a/mappers/windows-virtual-exec/cmd/main.go b/mappers/windows-virtual-exec/cmd/main.go new file mode 100644 index 00000000..53926053 --- /dev/null +++ b/mappers/windows-virtual-exec/cmd/main.go @@ -0,0 +1,64 @@ +/* +Copyright 2024 The KubeEdge Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "os" + "os/signal" + + klog "k8s.io/klog/v2" + + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/config" + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/core/model" + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/core/mqtt" + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/core/store" + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/missions" +) + +func main() { + var err error + var c config.Config + + klog.InitFlags(nil) + defer klog.Flush() + + if err = c.Parse(); err != nil { + klog.Fatal(err) + } + + store.InitDB("internal.db") + if err := store.DB.AutoMigrate(&model.Mission{}); err != nil { + klog.Errorf("Failed to init db: %v", err) + } + + if err := mqtt.InitClient( + c.Mqtt.ServerAddress, + c.Mqtt.Username, + c.Mqtt.Password, + c.Mqtt.Cert, + c.Mqtt.PrivateKey, + ); err != nil { + klog.Fatal(err) + } + + missions.InitCallback(c.NodeName) + klog.Info("Start to subscribe") + missions.InitMissions(c.NodeName) + + // waiting kill signal + var ch = make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt) + <-ch + klog.Info("Exit") +} diff --git a/mappers/windows-virtual-exec/hack/make-rules/mapper.sh b/mappers/windows-virtual-exec/hack/make-rules/mapper.sh new file mode 100755 index 00000000..27a774c8 --- /dev/null +++ b/mappers/windows-virtual-exec/hack/make-rules/mapper.sh @@ -0,0 +1,125 @@ +#!/usr/bin/env bash + +set -o errexit +set -o nounset +set -o pipefail + +CURR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd -P)" +ROOT_DIR="$(cd "${CURR_DIR}/../.." && pwd -P)" +source "${ROOT_DIR}/hack/lib/init.sh" + +mkdir -p "${CURR_DIR}/bin" +mkdir -p "${CURR_DIR}/dist" + +function mod() { + [[ "${2:-}" != "only" ]] + local mapper="${1}" + + # the mapper is sharing the vendor with root + pushd "${ROOT_DIR}" >/dev/null || exist 1 + echo "downloading dependencies for mapper ${mapper}..." + + if [[ "$(go env GO111MODULE)" == "off" ]]; then + echo "go mod has been disabled by GO111MODULE=off" + else + echo "tidying" + go mod tidy + echo "vending" + go mod vendor + fi + + echo "...done" + popd >/dev/null || return +} + +function lint() { + [[ "${2:-}" != "only" ]] && mod "$@" + local mapper="${1}" + + echo "fmt and linting mapper ${mapper}..." + + gofmt -s -w "${CURR_DIR}/" + golangci-lint run "${CURR_DIR}/..." + + echo "...done" +} + +function build() { + [[ "${2:-}" != "only" ]] && lint "$@" + local mapper="${1}" + + local flags=" -w -s " + local ext_flags=" -extldflags '-static' " + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + + local platform + if [[ "${ARM:-false}" == "true" ]]; then + echo "crossed packaging for windows/arm" + platform=("windows/arm") + elif [[ "${ARM64:-false}" == "true" ]]; then + echo "crossed packaging for windows/arm64" + platform=("windows/arm64") + else + local os="windows" + local arch="${ARCH:-$(go env GOARCH)}" + platform=("${os}/${arch}") + fi + + echo "building ${platform}" + + local os_arch + IFS="/" read -r -a os_arch <<<"${platform}" + local os=${os_arch[0]} + local arch=${os_arch[1]} + GOOS=${os} GOARCH=${arch} CGO_ENABLED=0 go build \ + -ldflags "${flags} ${ext_flags}" \ + -o "${CURR_DIR}/bin/${mapper}_${os}_${arch}.exe" \ + "${CURR_DIR}/cmd/main.go" + + echo "...done" +} + +function package() { + echo "docker package not support for windows virtual exec driver" +} + +function clean() { + local mapper="${1}" + + echo "cleanup mapper ${mapper}..." + + rm -rf "${CURR_DIR}/bin/*" + + echo "...done" +} + +function entry() { + local mapper="${1:-}" + shift 1 + + local stages="${1:-build}" + shift $(($# > 0 ? 1 : 0)) + + IFS="," read -r -a stages <<<"${stages}" + local commands=$* + if [[ ${#stages[@]} -ne 1 ]]; then + commands="only" + fi + + for stage in "${stages[@]}"; do + echo "# make mapper ${mapper} ${stage} ${commands}" + case ${stage} in + m | mod) mod "${mapper}" "${commands}" ;; + l | lint) lint "${mapper}" "${commands}" ;; + b | build) build "${mapper}" "${commands}" ;; + p | pkg | package) package "${mapper}" "${commands}" ;; + t | test) test "${mapper}" "${commands}" ;; + c | clean) clean "${mapper}" "${commands}" ;; + *) echo "unknown action '${stage}', select from mod,lint,build,test,clean" ;; + esac + done +} + +echo $@ +entry "$@" \ No newline at end of file diff --git a/mappers/windows-virtual-exec/internal/config/config.go b/mappers/windows-virtual-exec/internal/config/config.go new file mode 100644 index 00000000..70c8ef31 --- /dev/null +++ b/mappers/windows-virtual-exec/internal/config/config.go @@ -0,0 +1,82 @@ +/* +Copyright 2024 The KubeEdge Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "errors" + "os" + + "github.com/spf13/pflag" + yaml "gopkg.in/yaml.v2" + klog "k8s.io/klog/v2" +) + +// Config is the Exec mapper configuration. +type Config struct { + Mqtt Mqtt `yaml:"mqtt,omitempty"` + NodeName string `yaml:"nodeName"` +} + +// Mqtt is the Mqtt configuration. +type Mqtt struct { + ServerAddress string `yaml:"server,omitempty"` + Username string `yaml:"username,omitempty"` + Password string `yaml:"password,omitempty"` + Cert string `yaml:"certification,omitempty"` + PrivateKey string `yaml:"privatekey,omitempty"` +} + +// ErrConfigCert error of certification configuration. +var ErrConfigCert = errors.New("Both certification and private key must be provided") + +var defaultConfigFile = "./config.yaml" + +// Parse parse the configuration file. If failed, return error. +func (c *Config) Parse() error { + var level klog.Level + var loglevel string + var configFile string + + pflag.StringVar(&loglevel, "v", "5", "log level") + pflag.StringVar(&configFile, "config-file", defaultConfigFile, "Config file name") + pflag.Parse() + cf, err := os.ReadFile(configFile) + if err != nil { + return err + } + if err = yaml.Unmarshal(cf, c); err != nil { + return err + } + if err = level.Set(loglevel); err != nil { + return err + } + + return c.parseFlags() +} + +// parseFlags parse flags. Certification and Private key must be provided at the same time. +func (c *Config) parseFlags() error { + pflag.StringVar(&c.Mqtt.ServerAddress, "mqtt-address", c.Mqtt.ServerAddress, "MQTT broker address") + pflag.StringVar(&c.Mqtt.Username, "mqtt-username", c.Mqtt.Username, "username") + pflag.StringVar(&c.Mqtt.Password, "mqtt-password", c.Mqtt.Password, "password") + pflag.StringVar(&c.Mqtt.Cert, "mqtt-certification", c.Mqtt.Cert, "certification file path") + pflag.StringVar(&c.Mqtt.PrivateKey, "mqtt-priviatekey", c.Mqtt.PrivateKey, "private key file path") + pflag.Parse() + + if (c.Mqtt.Cert != "" && c.Mqtt.PrivateKey == "") || + (c.Mqtt.Cert == "" && c.Mqtt.PrivateKey != "") { + return ErrConfigCert + } + return nil +} diff --git a/mappers/windows-virtual-exec/internal/core/model/model.go b/mappers/windows-virtual-exec/internal/core/model/model.go new file mode 100644 index 00000000..5c1af330 --- /dev/null +++ b/mappers/windows-virtual-exec/internal/core/model/model.go @@ -0,0 +1,11 @@ +package model + +type Mission struct { + UniqueName string `gorm:"primaryKey"` + Command string + FileContent string + FileName string + WorkingDirectory string + Status string + Output string +} diff --git a/mappers/windows-virtual-exec/internal/core/mqtt/mqtt.go b/mappers/windows-virtual-exec/internal/core/mqtt/mqtt.go new file mode 100644 index 00000000..b9e7b277 --- /dev/null +++ b/mappers/windows-virtual-exec/internal/core/mqtt/mqtt.go @@ -0,0 +1,180 @@ +package mqtt + +import ( + "crypto/tls" + "encoding/json" + "regexp" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/dto" +) + +// Joint the topic like topic := fmt.Sprintf(TopicTwinUpdateDelta, deviceID) +const ( + TopicRevTwinUpdateDelta = "$hw/events/device/%s/twin/update/delta" + + TopicPubTwinUpdateRequest = "$hw/events/device/%s/twin/update" + TopicRecTwinUpdateResponse = "$hw/events/device/%s/twin/update/result" + + TopicPubTwinInfoRequest = "$hw/events/device/%s/twin/get" + TopicRecTwinInfoResponse = "$hw/events/device/%s/twin/get/result" + + TopicPubDeviceStateUpdateRequest = "$hw/events/device/%s/state/update" + TopicRecDeviceStateUpdateResponse = "$hw/events/device/%s/state/update/result" + + TopicPubNodeDeviceListRequest = "$hw/events/node/%s/membership/get" + TopicRecModeDeviceListResponse = "$hw/events/node/%s/membership/get/result" + + TopicRecNodeDeviceUpdate = "$hw/events/node/%s/membership/updated" +) + +var client *Client + +func GetClient() *Client { + return client +} + +// MqttClient is parameters for Mqtt client. +type Client struct { + Qos byte + Retained bool + IP string + User string + Passwd string + Cert string + PrivateKey string + Client mqtt.Client +} + +func InitClient(ip, user, passwd, cert, privkey string) error { + client = &Client{ + IP: ip, + User: user, + Passwd: passwd, + Cert: cert, + PrivateKey: privkey, + } + return client.Connect() +} + +// newTLSConfig new TLS configuration. +// Only one side check. Mqtt broker check the cert from client. +func newTLSConfig(certfile string, privateKey string) (*tls.Config, error) { + // Import client certificate/key pair + cert, err := tls.LoadX509KeyPair(certfile, privateKey) + if err != nil { + return nil, err + } + + // Create tls.Config with desired tls properties + return &tls.Config{ + // ClientAuth = whether to request cert from server. + // Since the server is set up for SSL, this happens + // anyways. + ClientAuth: tls.NoClientCert, + // ClientCAs = certs used to validate client cert. + ClientCAs: nil, + // InsecureSkipVerify = verify that cert contents + // match server. IP matches what is in cert etc. + InsecureSkipVerify: true, + // Certificates = list of certs client sends to server. + Certificates: []tls.Certificate{cert}, + }, nil +} + +// Connect connect to the Mqtt server. +func (mc *Client) Connect() error { + opts := mqtt.NewClientOptions().AddBroker(mc.IP).SetClientID("").SetCleanSession(true) + if mc.Cert != "" { + tlsConfig, err := newTLSConfig(mc.Cert, mc.PrivateKey) + if err != nil { + return err + } + opts.SetTLSConfig(tlsConfig) + } else { + opts.SetUsername(mc.User) + opts.SetPassword(mc.Passwd) + } + + mc.Client = mqtt.NewClient(opts) + // The token is used to indicate when actions have completed. + if tc := mc.Client.Connect(); tc.Wait() && tc.Error() != nil { + return tc.Error() + } + + mc.Qos = 0 // At most 1 time + mc.Retained = false // Not retained + return nil +} + +// Publish publish Mqtt message. +func (mc *Client) Publish(topic string, payload interface{}) error { + if tc := mc.Client.Publish(topic, mc.Qos, mc.Retained, payload); tc.Wait() && tc.Error() != nil { + return tc.Error() + } + return nil +} + +// Subscribe subscribe a Mqtt topic. +func (mc *Client) Subscribe(topic string, onMessage mqtt.MessageHandler) error { + if tc := mc.Client.Subscribe(topic, mc.Qos, onMessage); tc.Wait() && tc.Error() != nil { + return tc.Error() + } + return nil +} + +// getTimestamp get current timestamp. +func getTimestamp() int64 { + return time.Now().UnixNano() / 1e6 +} + +func CreateEmptyMessage() (msg []byte) { + var emptyMsg dto.BaseMessage + + emptyMsg.Timestamp = getTimestamp() + + msg, _ = json.Marshal(emptyMsg) + return +} + +// CreateMessageTwinUpdate create twin update message. +func CreateMessageTwinUpdate(info map[string]string) (msg []byte, err error) { + var updateMsg dto.DeviceTwinUpdate + + updateMsg.BaseMessage.Timestamp = getTimestamp() + updateMsg.Twin = map[string]*dto.MsgTwin{} + + for k := range info { + value := info[k] + updateMsg.Twin[k] = &dto.MsgTwin{} + updateMsg.Twin[k].Actual = &dto.TwinValue{Value: &value} + //updateMsg.Twin[k].Metadata = &dto.TypeMetadata{Type: "string"} + } + + msg, err = json.Marshal(updateMsg) + return +} + +// CreateMessageState create device status message. +func CreateMessageState(state string) (msg []byte, err error) { + var stateMsg dto.DeviceStatusUpdate + + stateMsg.BaseMessage.Timestamp = getTimestamp() + stateMsg.State = state + + msg, err = json.Marshal(stateMsg) + return +} + +// GetDeviceID extract the device ID from Mqtt topic. +func GetDeviceID(topic string) (id string) { + re := regexp.MustCompile(`hw/events/device/(.+?)/`) + return re.FindStringSubmatch(topic)[1] +} + +func GetNodeID(topic string) (id string) { + re := regexp.MustCompile(`hw/events/node/(.+?)/`) + return re.FindStringSubmatch(topic)[1] +} diff --git a/mappers/windows-virtual-exec/internal/core/store/db.go b/mappers/windows-virtual-exec/internal/core/store/db.go new file mode 100644 index 00000000..07de9641 --- /dev/null +++ b/mappers/windows-virtual-exec/internal/core/store/db.go @@ -0,0 +1,16 @@ +package store + +import ( + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +var DB *gorm.DB + +func InitDB(sqliteFile string) { + db, err := gorm.Open(sqlite.Open(sqliteFile), &gorm.Config{}) + if err != nil { + panic("failed to connect database") + } + DB = db +} diff --git a/mappers/windows-virtual-exec/internal/dto/mqtt.go b/mappers/windows-virtual-exec/internal/dto/mqtt.go new file mode 100644 index 00000000..c3f0c02f --- /dev/null +++ b/mappers/windows-virtual-exec/internal/dto/mqtt.go @@ -0,0 +1,107 @@ +package dto + +type BaseMessage struct { + EventID string `json:"event_id"` + Timestamp int64 `json:"timestamp"` +} + +type TwinValue struct { + Value *string `json:"value,omitempty"` + Metadata ValueMetadata `json:"metadata,omitempty"` +} + +type ValueMetadata struct { + Timestamp int64 `json:"timestamp,omitempty"` +} + +type TypeMetadata struct { + Type string `json:"type,omitempty"` +} + +type TwinVersion struct { + CloudVersion int64 `json:"cloud"` + EdgeVersion int64 `json:"edge"` +} + +type MsgTwin struct { + Expected *TwinValue `json:"expected,omitempty"` + Actual *TwinValue `json:"actual,omitempty"` + Optional *bool `json:"optional,omitempty"` + Metadata *TypeMetadata `json:"metadata,omitempty"` + ExpectedVersion *TwinVersion `json:"expected_version,omitempty"` + ActualVersion *TwinVersion `json:"actual_version,omitempty"` +} + +type DeviceTwinUpdate struct { + BaseMessage + Twin map[string]*MsgTwin `json:"twin"` +} + +type DeviceTwinResult struct { + BaseMessage + Twin map[string]*MsgTwin `json:"twin"` +} + +type DeviceTwinDelta struct { + BaseMessage + Twin map[string]*MsgTwin `json:"twin"` + Delta map[string]string `json:"delta"` +} + +type MsgAttr struct { + Value string `json:"value"` + Optional *bool `json:"optional,omitempty"` + Metadata *TypeMetadata `json:"metadata,omitempty"` +} + +type DeviceStatusUpdate struct { + BaseMessage + State string `json:"state,omitempty"` + Attributes map[string]*MsgAttr `json:"attributes"` +} + +type DeviceListUpdate struct { + BaseMessage + AddedDevices []DeviceInfo `json:"added_devices"` + RemovedDevices []DeviceInfo `json:"removed_devices"` +} + +type DeviceList struct { + BaseMessage + Devices []DeviceInfo `json:"devices"` +} + +type DeviceInfo struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` +} + +type MissionDelta struct { + BaseMessage + Twin struct { + ExecCommand *MsgTwin `json:"exec-command"` + ExecFileName *MsgTwin `json:"exec-file-name"` + ExecFileContent *MsgTwin `json:"exec-file-content"` + Output *MsgTwin `json:"output"` + Status *MsgTwin `json:"status"` + } `json:"twin"` + Delta struct { + ExecCommand string `json:"exec-command"` + ExecFileName string `json:"exec-file-name"` + ExecFileContent string `json:"exec-file-content"` + Output string `json:"output"` + Status string `json:"status"` + } `json:"delta"` +} + +type MissionTwins struct { + BaseMessage + Twin struct { + ExecCommand *MsgTwin `json:"exec-command"` + ExecFileName *MsgTwin `json:"exec-file-name"` + ExecFileContent *MsgTwin `json:"exec-file-content"` + Output *MsgTwin `json:"output"` + Status *MsgTwin `json:"status"` + } `json:"twin"` +} diff --git a/mappers/windows-virtual-exec/internal/missions/callback.go b/mappers/windows-virtual-exec/internal/missions/callback.go new file mode 100644 index 00000000..40866edb --- /dev/null +++ b/mappers/windows-virtual-exec/internal/missions/callback.go @@ -0,0 +1,196 @@ +/* +Copyright 2024 The KubeEdge Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package missions + +import ( + "encoding/json" + "fmt" + "path" + + mq "github.com/eclipse/paho.mqtt.golang" + klog "k8s.io/klog/v2" + + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/core/mqtt" + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/dto" +) + +func InitCallback(nodeName string) { + mqttClient := mqtt.GetClient() + err := mqttClient.Subscribe(fmt.Sprintf(mqtt.TopicRecNodeDeviceUpdate, nodeName), onMembershipUpdateMessage) + if err != nil { + klog.Error("Subscribe error: ", err) + } else { + klog.Info("Subscribe topic: ", fmt.Sprintf(mqtt.TopicRecNodeDeviceUpdate, nodeName)) + } + err = mqttClient.Subscribe(fmt.Sprintf(mqtt.TopicRecModeDeviceListResponse, nodeName), onMembershipListMessage) + if err != nil { + klog.Error("Subscribe error: ", err) + } else { + klog.Info("Subscribe topic: ", fmt.Sprintf(mqtt.TopicRecModeDeviceListResponse, nodeName)) + } + err = mqttClient.Subscribe(fmt.Sprintf(mqtt.TopicRevTwinUpdateDelta, "+"), onTwinDelta) + if err != nil { + klog.Error("Subscribe error: ", err) + } else { + klog.Info("Subscribe topic: ", fmt.Sprintf(mqtt.TopicRevTwinUpdateDelta, "+")) + } + err = mqttClient.Subscribe(fmt.Sprintf(mqtt.TopicRecTwinInfoResponse, "+"), onTwinInfo) + if err != nil { + klog.Error("Subscribe error: ", err) + } else { + klog.Info("Subscribe topic: ", fmt.Sprintf(mqtt.TopicRecTwinInfoResponse, "+")) + } +} + +func onMembershipUpdateMessage(_ mq.Client, message mq.Message) { + klog.V(2).Info("Receive message from topic: ", message.Topic()) + nodeID := mqtt.GetNodeID(message.Topic()) + if nodeID == "" { + klog.Error("Wrong topic") + return + } + klog.V(2).Info("Node id: ", nodeID) + var req dto.DeviceListUpdate + if err := json.Unmarshal(message.Payload(), &req); err != nil { + klog.Error("Unmarshal error: ", err) + return + } + + klog.Info("Receive device list update: ", "nodeId: ", nodeID, " update: ", len(req.AddedDevices), " delete: ", len(req.RemovedDevices)) + + for _, device := range req.RemovedDevices { + RemoveMission(device.ID) + } + + for _, device := range req.RemovedDevices { + if _, ok := cache.Load(device.ID); ok { + klog.Info("Device already exists: ", device.ID) + continue + } + klog.Info("Waiting twin update to create device: ", device.ID) + } +} + +func onMembershipListMessage(_ mq.Client, message mq.Message) { + klog.V(2).Info("Receive message from topic: ", message.Topic()) + nodeID := mqtt.GetNodeID(message.Topic()) + if nodeID == "" { + klog.Error("Wrong topic") + return + } + klog.V(2).Info("Node id: ", nodeID) + var req dto.DeviceList + if err := json.Unmarshal(message.Payload(), &req); err != nil { + klog.Error("Unmarshal error: ", err) + return + } + + klog.Info("Receive device list: ", "nodeID: ", nodeID, " count: ", len(req.Devices)) + for _, device := range req.Devices { + err := mqtt.GetClient().Publish(fmt.Sprintf(mqtt.TopicPubTwinInfoRequest, device.ID), mqtt.CreateEmptyMessage()) + if err != nil { + klog.Error("Publish error: ", err) + return + } + } +} + +func onTwinDelta(_ mq.Client, message mq.Message) { + klog.V(2).Info("Receive message from topic: ", message.Topic()) + id := mqtt.GetDeviceID(message.Topic()) + if id == "" { + klog.Error("Wrong topic") + return + } + klog.V(2).Info("Mission id: ", id) + var req dto.MissionDelta + if err := json.Unmarshal(message.Payload(), &req); err != nil { + klog.Error("Unmarshal error: ", err) + return + } + + // check params + if req.Twin.ExecCommand == nil || req.Twin.ExecFileName == nil || req.Twin.ExecFileContent == nil { + klog.Error("Twin format error") + return + } + + if req.Twin.ExecCommand.Expected == nil || req.Twin.ExecCommand.Expected.Value == nil { + klog.Error("Twin ExecCommand format error") + return + } + + if req.Twin.ExecFileName.Expected == nil || req.Twin.ExecFileName.Expected.Value == nil { + klog.Error("Twin ExecFileName format error") + return + } + + if req.Twin.ExecFileContent.Expected == nil || req.Twin.ExecFileContent.Expected.Value == nil { + klog.Error("Twin ExecFileContent format error") + return + } + + err := mqtt.GetClient().Publish(fmt.Sprintf(mqtt.TopicPubTwinInfoRequest, id), mqtt.CreateEmptyMessage()) + if err != nil { + klog.Error("Publish error: ", err) + return + } +} + +// onTwinInfo indeed trigger a new mission +func onTwinInfo(_ mq.Client, message mq.Message) { + klog.V(2).Info("Receive message from topic: ", message.Topic()) + id := mqtt.GetDeviceID(message.Topic()) + if id == "" { + klog.Error("Wrong topic") + return + } + klog.V(2).Info("Mission id: ", id) + var req dto.MissionDelta + if err := json.Unmarshal(message.Payload(), &req); err != nil { + klog.Error("Unmarshal error: ", err) + return + } + if req.Twin.ExecCommand == nil || req.Twin.ExecFileName == nil || req.Twin.ExecFileContent == nil || req.Twin.Output == nil || req.Twin.Status == nil { + klog.Error("Twin format error") + return + } + + if req.Twin.ExecCommand.Expected == nil || req.Twin.ExecCommand.Expected.Value == nil { + klog.Error("Twin ExecCommand format error") + return + } + + if req.Twin.ExecFileName.Expected == nil || req.Twin.ExecFileName.Expected.Value == nil { + klog.Error("Twin ExecFileName format error") + return + } + + if req.Twin.ExecFileContent.Expected == nil || req.Twin.ExecFileContent.Expected.Value == nil { + klog.Error("Twin ExecFileContent format error") + return + } + + _, err := NewMission(MissionConfig{ + UniqueName: id, + Command: *req.Twin.ExecCommand.Expected.Value, + FileContent: *req.Twin.ExecFileContent.Expected.Value, + FileName: *req.Twin.ExecFileName.Expected.Value, + WorkingDirectory: path.Join("tmp", id), + }) + if err != nil { + klog.Error("NewMission error: ", err) + return + } +} diff --git a/mappers/windows-virtual-exec/internal/missions/init.go b/mappers/windows-virtual-exec/internal/missions/init.go new file mode 100644 index 00000000..7128510f --- /dev/null +++ b/mappers/windows-virtual-exec/internal/missions/init.go @@ -0,0 +1,15 @@ +package missions + +import ( + "fmt" + + "k8s.io/klog" + + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/core/mqtt" +) + +func InitMissions(nodeName string) { + if err := mqtt.GetClient().Publish(fmt.Sprintf(mqtt.TopicPubNodeDeviceListRequest, nodeName), mqtt.CreateEmptyMessage()); err != nil { + klog.Errorf("Failed to init missions on %s", nodeName) + } +} diff --git a/mappers/windows-virtual-exec/internal/missions/mission.go b/mappers/windows-virtual-exec/internal/missions/mission.go new file mode 100644 index 00000000..b8196283 --- /dev/null +++ b/mappers/windows-virtual-exec/internal/missions/mission.go @@ -0,0 +1,230 @@ +/* +Copyright 2024 The KubeEdge Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package missions + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "sync" + + klog "k8s.io/klog/v2" + + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/core/model" + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/core/mqtt" + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/core/store" + "github.com/kubeedge/mappers-go/mappers/windows-virtual-exec/internal/utils/encode" +) + +const ( + StatusOK = "ok" + StatusError = "error" + StatusWaiting = "waiting" + StatusWorking = "working" +) + +type Mission struct { + exec *Command + Config MissionConfig `json:"config"` + Status string `json:"status"` // ok, error, waiting, working + Output string `json:"output"` +} + +type MissionConfig struct { + UniqueName string `json:"uniqueName"` // use as device id + Command string `json:"command"` + FileContent string `json:"fileContent"` + FileName string `json:"fileName"` + WorkingDirectory string `json:"workingDirectory"` +} + +var cache = sync.Map{} +var createMutex sync.Mutex + +// NewMission add a new mission in memory cache +func NewMission(config MissionConfig) (client *Mission, err error) { + defer func() { + go client.Run() + }() + // load cached mission + if data, ok := cache.Load(config.UniqueName); ok { + klog.Info("Get mission from cache: ", config.UniqueName) + return data.(*Mission), nil + } + + // prevent New operation simutaneously in different goroutines + createMutex.Lock() + defer createMutex.Unlock() + + // not in memory, add a new mission + client = &Mission{ + exec: &Command{}, + Config: config, + } + + var mission model.Mission + if store.DB.Model(&mission).Where("unique_name = ?", config.UniqueName).Find(&mission).RowsAffected > 0 && mission.Status != StatusWorking { + client.Config.UniqueName = mission.UniqueName + client.Config.Command = mission.Command + client.Config.FileContent = mission.FileContent + client.Config.FileName = mission.FileName + client.Config.WorkingDirectory = mission.WorkingDirectory + client.Status = mission.Status + client.Output = mission.Output + klog.Info("Get mission from db: ", config.UniqueName) + return client, nil + } + + client.exec.Cmd = exec.Command("powershell", "-c", client.Config.Command) + client.exec.Cmd.Dir = client.Config.WorkingDirectory + client.Status = StatusWaiting + + if mission.Status == StatusWorking { + client.UpdateDB() + } else { + client.InsertDB() + } + + cache.Store(config.UniqueName, client) + client.ReportDeviceStatus() + klog.Info("New mission: ", config.UniqueName) + return client, nil +} + +func RemoveMission(id string) { + createMutex.Lock() + defer createMutex.Unlock() + + var mission model.Mission + if store.DB.Model(&mission).Where("unique_name = ?", id).Find(&mission).RowsAffected == 0 { + return + } + + cache.Delete(id) +} + +func (c *Mission) InsertDB() { + err := store.DB.Create(&model.Mission{ + UniqueName: c.Config.UniqueName, + Command: c.Config.Command, + FileContent: c.Config.FileContent, + FileName: c.Config.FileName, + WorkingDirectory: c.Config.WorkingDirectory, + Status: c.Status, + Output: c.Output, + }).Error + if err != nil { + klog.Error("InsertDB error: ", err.Error()) + } +} + +func (c *Mission) UpdateDB() { + err := store.DB.Model(&model.Mission{}).Where("unique_name = ?", c.Config.UniqueName).UpdateColumns(map[string]interface{}{ + "status": c.Status, + "output": c.Output, + "command": c.Config.Command, + "file_content": c.Config.FileContent, + "file_name": c.Config.FileName, + "working_directory": c.Config.WorkingDirectory, + }).Error + if err != nil { + klog.Error("InsertDB error: ", err.Error()) + } +} + +func (c *Mission) Run() { + if c.Status == StatusOK || c.Status == StatusError || c.Status == StatusWorking { + klog.Info("Mission status is not waiting, skip with current status ", c.Status) + return + } + + defer func() { + c.UpdateDB() + c.ReportMissionStatus() + klog.Info("Mission finished: ", c.Config.UniqueName, " result: ", c.Status) + }() + + c.Status = StatusWorking + c.ReportMissionStatus() + klog.Info("Mission start: ", c.Config.UniqueName, " status: ", c.Status, " output: ", c.Output) + + // clean working directory in windows + dir := c.Config.WorkingDirectory + os.RemoveAll(dir) + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + klog.Errorf("Failed to make workdir %s: %v", dir, err) + return + } + + file, err := os.Create(filepath.Join(dir, c.Config.FileName)) + if err != nil { + klog.Error("Create file error: ", err) + c.Status = StatusError + c.Output = err.Error() + return + } + _, err = file.WriteString(encode.DecodeBase64(c.Config.FileContent)) + file.Close() + + if err != nil { + klog.Error("Write file error: ", err) + c.Status = StatusError + c.Output = err.Error() + return + } + + err = c.exec.Exec() + if err != nil { + klog.Error("Exec error: ", err) + c.Status = StatusError + c.Output = fmt.Sprintf("【msg】%s\n【err】%s\n", err.Error(), string(c.exec.StdErr)) + return + } + + c.Status = StatusOK + c.Output = string(c.exec.StdOut) +} + +func (c *Mission) ReportDeviceStatus() { + var payload []byte + var err error + if payload, err = mqtt.CreateMessageState("OK"); err != nil { + klog.Errorf("Create message state failed: %v", err) + return + } + if err = mqtt.GetClient().Publish(fmt.Sprintf(mqtt.TopicPubDeviceStateUpdateRequest, c.Config.UniqueName), payload); err != nil { + klog.Errorf("Publish failed: %v", err) + return + } +} + +func (c *Mission) ReportMissionStatus() { + var payload []byte + var err error + if payload, err = mqtt.CreateMessageTwinUpdate(map[string]string{ + "status": c.Status, + "output": c.Output, + "exec-command": c.Config.Command, + "exec-file-name": c.Config.FileName, + "exec-file-content": c.Config.FileContent, + }); err != nil { + klog.Errorf("Create message state failed: %v", err) + return + } + if err = mqtt.GetClient().Publish(fmt.Sprintf(mqtt.TopicPubTwinUpdateRequest, c.Config.UniqueName), payload); err != nil { + klog.Errorf("Publish failed: %v", err) + return + } +} diff --git a/mappers/windows-virtual-exec/internal/missions/run.go b/mappers/windows-virtual-exec/internal/missions/run.go new file mode 100644 index 00000000..28f3d4a7 --- /dev/null +++ b/mappers/windows-virtual-exec/internal/missions/run.go @@ -0,0 +1,83 @@ +/* +Copyright 2024 The KubeEdge Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package missions + +import ( + "bytes" + "errors" + "fmt" + "os/exec" + "strings" + "syscall" +) + +type Command struct { + Cmd *exec.Cmd + StdOut []byte + StdErr []byte + ExitCode int +} + +// Exec run command and exit formatted error, callers can print err directly +// Any running error or non-zero exitcode is consider as error +func (cmd *Command) Exec() error { + var stdoutBuf, stderrBuf bytes.Buffer + cmd.Cmd.Stdout = &stdoutBuf + cmd.Cmd.Stderr = &stderrBuf + + errString := fmt.Sprintf("failed to exec '%s'", cmd.GetCommand()) + + err := cmd.Cmd.Start() + if err != nil { + errString = fmt.Sprintf("%s, err: %v", errString, err) + return errors.New(errString) + } + + err = cmd.Cmd.Wait() + if err != nil { + cmd.StdErr = stderrBuf.Bytes() + + if exit, ok := err.(*exec.ExitError); ok { + cmd.ExitCode = exit.Sys().(syscall.WaitStatus).ExitStatus() + errString = fmt.Sprintf("%s, err: %s", errString, stderrBuf.Bytes()) + } else { + cmd.ExitCode = 1 + } + + errString = fmt.Sprintf("%s, err: %v", errString, err) + + return errors.New(errString) + } + + cmd.StdOut, cmd.StdErr = stdoutBuf.Bytes(), stderrBuf.Bytes() + return nil +} + +func (cmd Command) GetCommand() string { + return strings.Join(cmd.Cmd.Args, " ") +} + +func (cmd Command) GetStdOut() string { + if len(cmd.StdOut) != 0 { + return strings.TrimSuffix(string(cmd.StdOut), "\n") + } + return "" +} + +func (cmd Command) GetStdErr() string { + if len(cmd.StdErr) != 0 { + return strings.TrimSuffix(string(cmd.StdErr), "\n") + } + return "" +} diff --git a/mappers/windows-virtual-exec/internal/utils/encode/base64.go b/mappers/windows-virtual-exec/internal/utils/encode/base64.go new file mode 100644 index 00000000..3e21178d --- /dev/null +++ b/mappers/windows-virtual-exec/internal/utils/encode/base64.go @@ -0,0 +1,16 @@ +package encode + +import ( + "encoding/base64" + + klog "k8s.io/klog/v2" +) + +func DecodeBase64(encodedString string) string { + decodedBytes, err := base64.StdEncoding.DecodeString(encodedString) + if err != nil { + klog.Error("Decode error: ", err) + return "" + } + return string(decodedBytes) +} diff --git a/mappers/windows-virtual-exec/internal/utils/timer/timer.go b/mappers/windows-virtual-exec/internal/utils/timer/timer.go new file mode 100644 index 00000000..861b87fe --- /dev/null +++ b/mappers/windows-virtual-exec/internal/utils/timer/timer.go @@ -0,0 +1,43 @@ +/* +Copyright 2020 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package timer + +import ( + "time" +) + +// Timer is to call a function periodically. +type Timer struct { + Function func() + Duration time.Duration + Times int +} + +// Start start a timer. +func (t *Timer) Start() { + ticker := time.NewTicker(t.Duration) + if t.Times > 0 { + for i := 0; i < t.Times; i++ { + <-ticker.C + t.Function() + } + } else { + for range ticker.C { + t.Function() + } + } +} diff --git a/mappers/windows-virtual-exec/resource/config.yaml b/mappers/windows-virtual-exec/resource/config.yaml new file mode 100644 index 00000000..45adacad --- /dev/null +++ b/mappers/windows-virtual-exec/resource/config.yaml @@ -0,0 +1,7 @@ +mqtt: + server: tcp://127.0.0.1:1883 + username: "" + password: "" + certification: "" + privatekey: "" +nodeName: win11-node \ No newline at end of file diff --git a/mappers/windows-virtual-exec/resource/windows-virtual-exec-instance.yaml b/mappers/windows-virtual-exec/resource/windows-virtual-exec-instance.yaml new file mode 100644 index 00000000..ce25b842 --- /dev/null +++ b/mappers/windows-virtual-exec/resource/windows-virtual-exec-instance.yaml @@ -0,0 +1,33 @@ +apiVersion: devices.kubeedge.io/v1alpha2 +kind: Device +metadata: + name: exec-instance-001 + labels: + description: "test" + model: win-exec-model +spec: + deviceModelRef: + name: win-exec-model + nodeSelector: + nodeSelectorTerms: + - matchExpressions: + - key: '' + operator: In + values: + - win11-node + protocol: + customizedProtocal: + protocolName: winExec +status: + twins: + - propertyName: status + - propertyName: output + - propertyName: exec-file-content + desired: + value: 'echo "hello,world"' + - propertyName: exec-file-name + desired: + value: 'run.bat' + - propertyName: exec-command + desired: + value: 'run.bat' diff --git a/mappers/windows-virtual-exec/resource/windows-virtual-exec-model.yaml b/mappers/windows-virtual-exec/resource/windows-virtual-exec-model.yaml new file mode 100644 index 00000000..76e96d93 --- /dev/null +++ b/mappers/windows-virtual-exec/resource/windows-virtual-exec-model.yaml @@ -0,0 +1,32 @@ +apiVersion: devices.kubeedge.io/v1alpha2 +kind: DeviceModel +metadata: + name: win-exec-model + namespace: default +spec: + properties: + - name: exec-file-content + description: custom content to execute + type: + string: + accessMode: ReadOnly + - name: exec-file-name + description: save custom content as filename + type: + string: + accessMode: ReadOnly + - name: exec-command + description: entrypoint of target + type: + string: + accessMode: ReadOnly + - name: status + description: status of current executation + type: + string: + accessMode: ReadWrite + - name: output + description: console output of current executation + type: + string: + accessMode: ReadWrite diff --git a/mappers/windows-virtual-exec/static/1-simple.png b/mappers/windows-virtual-exec/static/1-simple.png new file mode 100644 index 00000000..d28e3a03 Binary files /dev/null and b/mappers/windows-virtual-exec/static/1-simple.png differ diff --git a/mappers/windows-virtual-exec/static/2-seq-for-init.png b/mappers/windows-virtual-exec/static/2-seq-for-init.png new file mode 100644 index 00000000..8f48b07e Binary files /dev/null and b/mappers/windows-virtual-exec/static/2-seq-for-init.png differ