diff --git a/.golangci.yaml b/.golangci.yaml index b473231..50a5c60 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -2,6 +2,8 @@ run: timeout: 10m issues: + exclude-dirs: + - examples/ max-issues-per-linter: 100 max-same-issues: 100 diff --git a/Makefile b/Makefile index 5090a96..3243a8c 100644 --- a/Makefile +++ b/Makefile @@ -5,6 +5,7 @@ BUILD_DIR = build TIME=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ') VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'v0.0.0') COMMIT ?= $(shell git rev-parse HEAD) +EXAMPLES = add hello-world define compile_service CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) \ @@ -36,6 +37,9 @@ start-magistrala: stop-magistrala: docker compose -f docker/compose.yaml down +$(EXAMPLES): + GOOS=js GOARCH=wasm tinygo build -o build/$@.wasm -target wasi example/$@/$@.go + help: @echo "Usage: make " @echo "" diff --git a/cmd/propellerd/main.go b/cmd/propellerd/main.go index 2b5fb15..c608653 100644 --- a/cmd/propellerd/main.go +++ b/cmd/propellerd/main.go @@ -25,9 +25,11 @@ func main() { managerCmd := propellerd.NewManagerCmd() tasksCmd := propellerd.NewTasksCmd() + propletCmd := propellerd.NewPropletCmd() rootCmd.AddCommand(managerCmd) rootCmd.AddCommand(tasksCmd) + rootCmd.AddCommand(propletCmd) if err := rootCmd.Execute(); err != nil { log.Fatal(err) diff --git a/cmd/proplet/main.go b/cmd/proplet/main.go deleted file mode 100644 index ef241d8..0000000 --- a/cmd/proplet/main.go +++ /dev/null @@ -1,159 +0,0 @@ -package main - -import ( - "context" - "errors" - "flag" - "fmt" - "log" - "log/slog" - "net/http" - "os" - "os/signal" - "syscall" - "time" - - "github.com/absmach/propeller/proplet" -) - -const registryTimeout = 30 * time.Second - -var ( - wasmFilePath string - wasmBinary []byte - logLevel slog.Level -) - -func main() { - if err := run(); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } -} - -func run() error { - flag.StringVar(&wasmFilePath, "file", "", "Path to the WASM file") - flag.Parse() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := configureLogger("info") - slog.SetDefault(logger) - - logger.Info("Starting Proplet service") - - go func() { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) - sig := <-sigChan - logger.Info("Received shutdown signal", slog.String("signal", sig.String())) - cancel() - }() - - hasWASMFile := wasmFilePath != "" - - cfg, err := proplet.LoadConfig("proplet/config.json", hasWASMFile) - if err != nil { - logger.Error("Failed to load configuration", slog.String("path", "proplet/config.json"), slog.Any("error", err)) - - return fmt.Errorf("failed to load configuration: %w", err) - } - - if cfg.RegistryURL != "" { - if err := checkRegistryConnectivity(cfg.RegistryURL, logger); err != nil { - logger.Error("Failed connectivity check for Registry URL", slog.String("url", cfg.RegistryURL), slog.Any("error", err)) - - return fmt.Errorf("registry connectivity check failed: %w", err) - } - logger.Info("Registry connectivity verified", slog.String("url", cfg.RegistryURL)) - } - - if hasWASMFile { - wasmBinary, err = loadWASMFile(wasmFilePath, logger) - if err != nil { - logger.Error("Failed to load WASM file", slog.String("wasm_file_path", wasmFilePath), slog.Any("error", err)) - - return fmt.Errorf("failed to load WASM file: %w", err) - } - logger.Info("WASM binary loaded at startup", slog.Int("size_bytes", len(wasmBinary))) - } - - if cfg.RegistryURL == "" && wasmBinary == nil { - logger.Error("Neither a registry URL nor a WASM binary file was provided") - - return errors.New("missing registry URL and WASM binary file") - } - - service, err := proplet.NewService(ctx, cfg, wasmBinary, logger) - if err != nil { - logger.Error("Error initializing service", slog.Any("error", err)) - - return fmt.Errorf("service initialization error: %w", err) - } - - if err := service.Run(ctx, logger); err != nil { - logger.Error("Error running service", slog.Any("error", err)) - - return fmt.Errorf("service run error: %w", err) - } - - return nil -} - -func configureLogger(level string) *slog.Logger { - if err := logLevel.UnmarshalText([]byte(level)); err != nil { - log.Printf("Invalid log level: %s. Defaulting to info.\n", level) - logLevel = slog.LevelInfo - } - - logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - Level: logLevel, - }) - - return slog.New(logHandler) -} - -func loadWASMFile(path string, logger *slog.Logger) ([]byte, error) { - logger.Info("Loading WASM file", slog.String("path", path)) - wasmBytes, err := os.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("failed to read WASM file: %w", err) - } - - return wasmBytes, nil -} - -func checkRegistryConnectivity(registryURL string, logger *slog.Logger) error { - ctx, cancel := context.WithTimeout(context.Background(), registryTimeout) - defer cancel() - - client := http.Client{} - - logger.Info("Checking registry connectivity", slog.String("url", registryURL)) - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, http.NoBody) - if err != nil { - logger.Error("Failed to create HTTP request", slog.String("url", registryURL), slog.Any("error", err)) - - return fmt.Errorf("failed to create HTTP request: %w", err) - } - - resp, err := client.Do(req) - if err != nil { - logger.Error("Failed to connect to registry", slog.String("url", registryURL), slog.Any("error", err)) - - return fmt.Errorf("failed to connect to registry URL '%s': %w", registryURL, err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - logger.Error("Registry returned unexpected status", slog.String("url", registryURL), slog.Int("status_code", resp.StatusCode)) - - return fmt.Errorf("registry URL '%s' returned status: %s", registryURL, resp.Status) - } - - logger.Info("Registry connectivity verified", slog.String("url", registryURL)) - - return nil -} diff --git a/cmd/proplet/start.go b/cmd/proplet/start.go new file mode 100644 index 0000000..1e2f0c6 --- /dev/null +++ b/cmd/proplet/start.go @@ -0,0 +1,84 @@ +package proplet + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/absmach/propeller/proplet" +) + +func StartProplet(ctx context.Context, cancel context.CancelFunc, cfg proplet.Config) error { + var level slog.Level + if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil { + return fmt.Errorf("failed to parse log level: %s", err.Error()) + } + logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: level, + }) + logger := slog.New(logHandler) + slog.SetDefault(logger) + + go func() { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + sig := <-sigChan + logger.Info("Received shutdown signal", slog.String("signal", sig.String())) + cancel() + }() + + if cfg.RegistryURL != "" { + if err := checkRegistryConnectivity(ctx, cfg.RegistryURL, cfg.RegistryTimeout); err != nil { + return errors.Join(errors.New("failed to connect to registry URL"), err) + } + + logger.Info("successfully connected to registry URL", slog.String("url", cfg.RegistryURL)) + } + + mqttClient, err := proplet.NewMQTTClient(ctx, cfg, logger) + if err != nil { + return errors.Join(errors.New("failed to initialize mqtt client"), err) + } + wazero := proplet.NewWazeroRuntime(logger, mqttClient, cfg.ChannelID) + + service, err := proplet.NewService(cfg, mqttClient, logger, wazero) + if err != nil { + return errors.Join(errors.New("failed to initialize service"), err) + } + + if err := service.Run(ctx, logger); err != nil { + return errors.Join(errors.New("failed to run service"), err) + } + + return nil +} + +func checkRegistryConnectivity(ctx context.Context, registryURL string, registryTimeout time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, registryTimeout) + defer cancel() + + client := http.DefaultClient + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, http.NoBody) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to connect to registry URL: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("fegistry returned unexpected status: %d", resp.StatusCode) + } + + return nil +} diff --git a/examples/add/add.go b/examples/add/add.go new file mode 100644 index 0000000..6440801 --- /dev/null +++ b/examples/add/add.go @@ -0,0 +1,10 @@ +package main + +//export add +func add(x, y uint32) uint32 { + return x + y +} + +// main is required for the `wasi` target, even if it isn't used. +// See https://wazero.io/languages/tinygo/#why-do-i-have-to-define-main +func main() {} diff --git a/go.mod b/go.mod index bdb794d..812fb99 100644 --- a/go.mod +++ b/go.mod @@ -14,9 +14,9 @@ require ( github.com/prometheus/client_golang v1.20.5 github.com/spf13/cobra v1.8.1 github.com/tetratelabs/wazero v1.8.2 - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 - go.opentelemetry.io/otel v1.32.0 - go.opentelemetry.io/otel/trace v1.32.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 + go.opentelemetry.io/otel v1.33.0 + go.opentelemetry.io/otel/trace v1.33.0 golang.org/x/sync v0.10.0 ) @@ -40,18 +40,17 @@ require ( github.com/prometheus/common v0.61.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/spf13/pflag v1.0.5 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 // indirect - go.opentelemetry.io/otel/metric v1.32.0 // indirect - go.opentelemetry.io/otel/sdk v1.32.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 // indirect + go.opentelemetry.io/otel/metric v1.33.0 // indirect + go.opentelemetry.io/otel/sdk v1.33.0 // indirect go.opentelemetry.io/proto/otlp v1.4.0 // indirect golang.org/x/net v0.32.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect - google.golang.org/grpc v1.68.1 // indirect + google.golang.org/grpc v1.69.0 // indirect google.golang.org/protobuf v1.35.2 // indirect ) - -require github.com/gorilla/websocket v1.5.3 // indirect diff --git a/go.sum b/go.sum index eebac91..aa195bb 100644 --- a/go.sum +++ b/go.sum @@ -76,20 +76,36 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tetratelabs/wazero v1.8.2 h1:yIgLR/b2bN31bjxwXHD8a3d+BogigR952csSDdLYEv4= github.com/tetratelabs/wazero v1.8.2/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 h1:DheMAlT6POBP+gh8RUH19EOTnQIor5QE0uSRPtzCpSw= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0/go.mod h1:wZcGmeVO9nzP67aYSLDqXNWK87EZWhi7JWj1v7ZXf94= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 h1:yd02MEjBdJkG3uabWP9apV+OuWRIXGDuJEUJbOHmCFU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= +go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 h1:IJFEoHiytixx8cMiVAO+GmHR6Frwu+u5Ur8njpFO6Ac= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0/go.mod h1:3rHrKNtLIoS0oZwkY2vxi+oJcwFRWdtUyRII+so45p8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 h1:Vh5HayB/0HHfOQA7Ctx69E/Y/DcQSMPpKANYVMQ7fBA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0/go.mod h1:cpgtDBaqD/6ok/UG0jT15/uKjAY8mRA53diogHBg3UI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 h1:cMyu9O88joYEaI47CnQkxO1XZdpoTF9fEnW2duIddhw= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0/go.mod h1:6Am3rn7P9TVVeXYG+wtcGE7IE1tsQ+bP3AuWcKt/gOI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 h1:wpMfgF8E1rkrT1Z6meFh1NDtownE9Ii3n3X2GJYjsaU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0/go.mod h1:wAy0T/dUbs468uOlkT31xjvqQgEVXv58BRFWEgn5v/0= go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ= +go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM= +go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM= go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= +go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= +go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= go.opentelemetry.io/proto/otlp v1.4.0 h1:TA9WRvW6zMwP+Ssb6fLoUIuirti1gGbP28GcKG1jgeg= go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY= golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= @@ -102,16 +118,14 @@ golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/api v0.0.0-20241206012308-a4fef0638583 h1:v+j+5gpj0FopU0KKLDGfDo9ZRRpKdi5UBrCP0f76kuY= -google.golang.org/genproto/googleapis/api v0.0.0-20241206012308-a4fef0638583/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY= google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 h1:CkkIfIt50+lT6NHAVoRYEyAvQGFM7xEwXUUywFvEb3Q= google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576/go.mod h1:1R3kvZ1dtP3+4p4d3G8uJ8rFk/fWlScl38vanWACI08= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 h1:IfdSdTcLFy4lqUQrQJLkLt1PB+AsqVz6lwkWPzWEz10= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 h1:8ZmaLZE4XWrtU3MyClkYqqtl6Oegr3235h7jxsDyqCY= google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= +google.golang.org/grpc v1.69.0 h1:quSiOM1GJPmPH5XtU+BCoVXcDVJJAzNcoyfC2cCjGkI= +google.golang.org/grpc v1.69.0/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/manager/api/transport.go b/manager/api/transport.go index 19749eb..73db15b 100644 --- a/manager/api/transport.go +++ b/manager/api/transport.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "io" "log/slog" "net/http" "strings" @@ -18,6 +19,11 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) +const ( + maxFileSize = 1024 * 1024 * 100 + fileKey = "file" +) + func MakeHandler(svc manager.Service, logger *slog.Logger, instanceID string) http.Handler { mux := chi.NewRouter() @@ -104,13 +110,25 @@ func decodeEntityReq(key string) kithttp.DecodeRequestFunc { } func decodeTaskReq(_ context.Context, r *http.Request) (interface{}, error) { - if !strings.Contains(r.Header.Get("Content-Type"), api.ContentType) { - return nil, errors.Join(apiutil.ErrValidation, apiutil.ErrUnsupportedContentType) - } var req taskReq - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - return nil, errors.Join(err, apiutil.ErrValidation) + if err := r.ParseMultipartForm(maxFileSize); err != nil { + return nil, err + } + file, header, err := r.FormFile(fileKey) + if err != nil { + return nil, err + } + defer file.Close() + + if !strings.HasSuffix(header.Filename, ".wasm") { + return nil, errors.Join(apiutil.ErrValidation, errors.New("invalid file extension")) + } + data, err := io.ReadAll(file) + if err != nil { + return nil, err } + req.File = data + req.Name = header.Filename return req, nil } diff --git a/manager/service.go b/manager/service.go index 290bb3f..63f57aa 100644 --- a/manager/service.go +++ b/manager/service.go @@ -149,11 +149,19 @@ func (svc *service) ListTasks(ctx context.Context, offset, limit uint64) (task.T } func (svc *service) UpdateTask(ctx context.Context, t task.Task) (task.Task, error) { - if err := svc.tasksDB.Update(ctx, t.ID, t); err != nil { + dbT, err := svc.GetTask(ctx, t.ID) + if err != nil { return task.Task{}, err } + dbT.UpdatedAt = time.Now() + dbT.Name = t.Name + dbT.Inputs = t.Inputs - return t, nil + if err := svc.tasksDB.Update(ctx, dbT.ID, dbT); err != nil { + return task.Task{}, err + } + + return dbT, nil } func (svc *service) DeleteTask(ctx context.Context, taskID string) error { @@ -166,13 +174,13 @@ func (svc *service) StartTask(ctx context.Context, taskID string) error { return err } - p, err := svc.SelectProplet(ctx, t) - if err != nil { + topic := svc.baseTopic + "/control/manager/start" + if err := svc.pubsub.Publish(ctx, topic, t); err != nil { return err } - topic := "channels/" + p.ID + "/messages/control/manager/start" - if err := svc.pubsub.Publish(ctx, topic, t); err != nil { + p, err := svc.SelectProplet(ctx, t) + if err != nil { return err } @@ -185,6 +193,13 @@ func (svc *service) StartTask(ctx context.Context, taskID string) error { return err } + t.State = task.Running + t.UpdatedAt = time.Now() + t.StartTime = time.Now() + if err := svc.tasksDB.Update(ctx, t.ID, t); err != nil { + return err + } + return nil } @@ -207,7 +222,7 @@ func (svc *service) StopTask(ctx context.Context, taskID string) error { return err } - topic := "channels/" + p.ID + "/messages/control/manager/stop" + topic := svc.baseTopic + "/control/manager/stop" if err := svc.pubsub.Publish(ctx, topic, t); err != nil { return err } @@ -244,6 +259,8 @@ func (svc *service) handle(ctx context.Context) func(topic string, msg map[strin svc.logger.InfoContext(ctx, "successfully created proplet") case svc.baseTopic + "/control/proplet/alive": return svc.updateLivenessHandler(ctx, msg) + case svc.baseTopic + "/control/proplet/results": + return svc.updateResultsHandler(ctx, msg) } return nil @@ -280,6 +297,9 @@ func (svc *service) updateLivenessHandler(ctx context.Context, msg map[string]in } p, err := svc.GetProplet(ctx, propletID) + if errors.Is(err, pkgerrors.ErrNotFound) { + return svc.createPropletHandler(ctx, msg) + } if err != nil { return err } @@ -295,3 +315,41 @@ func (svc *service) updateLivenessHandler(ctx context.Context, msg map[string]in return nil } + +func (svc *service) updateResultsHandler(ctx context.Context, msg map[string]interface{}) error { + taskID, ok := msg["task_id"].(string) + if !ok { + return errors.New("invalid task_id") + } + if taskID == "" { + return errors.New("task id is empty") + } + + results, ok := msg["results"].([]interface{}) + if !ok { + return errors.New("invalid results") + } + data := make([]uint64, len(results)) + for i := range results { + r, ok := results[i].(float64) + if !ok { + return errors.New("invalid result") + } + data[i] = uint64(r) + } + + t, err := svc.GetTask(ctx, taskID) + if err != nil { + return err + } + t.Results = data + t.State = task.Completed + t.UpdatedAt = time.Now() + t.FinishTime = time.Now() + + if err := svc.tasksDB.Update(ctx, t.ID, t); err != nil { + return err + } + + return nil +} diff --git a/propellerd/manager.go b/propellerd/manager.go index fcc617a..317d20c 100644 --- a/propellerd/manager.go +++ b/propellerd/manager.go @@ -40,10 +40,10 @@ var managerCmd = []cobra.Command{ MQTTTimeout: mqttTimeout, } ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() if err := manager.StartManager(ctx, cancel, cfg); err != nil { slog.Error("failed to start manager", slog.String("error", err.Error())) } - cancel() }, }, } diff --git a/propellerd/proplet.go b/propellerd/proplet.go new file mode 100644 index 0000000..8338539 --- /dev/null +++ b/propellerd/proplet.go @@ -0,0 +1,155 @@ +package propellerd + +import ( + "context" + "log/slog" + "time" + + propletcmd "github.com/absmach/propeller/cmd/proplet" + "github.com/absmach/propeller/proplet" + "github.com/google/uuid" + "github.com/spf13/cobra" +) + +var ( + livelinessInterval = 10 * time.Second + registryURL = "" + registryToken = "" + id = uuid.NewString() +) + +var propletCmd = []cobra.Command{ + { + Use: "start", + Short: "Start manager", + Long: `Start manager.`, + Run: func(cmd *cobra.Command, _ []string) { + cfg := proplet.Config{ + LogLevel: logLevel, + ID: id, + MQTTTimeout: mqttTimeout, + MQTTQoS: uint8(mqttQOS), + LivelinessInterval: livelinessInterval, + MQTTAddress: mqttAddress, + RegistryURL: registryURL, + RegistryToken: registryToken, + ChannelID: channelID, + ThingID: thingID, + ThingKey: thingKey, + } + if err := cfg.Validate(); err != nil { + slog.Error("invalid config", slog.Any("error", err)) + + return + } + + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + if err := propletcmd.StartProplet(ctx, cancel, cfg); err != nil { + slog.Error("failed to start manager", slog.String("error", err.Error())) + } + }, + }, +} + +func NewPropletCmd() *cobra.Command { + cmd := cobra.Command{ + Use: "proplet [start]", + Short: "Proplet management", + Long: `Start proplet for Propeller.`, + } + + for i := range propletCmd { + cmd.AddCommand(&propletCmd[i]) + } + + cmd.PersistentFlags().StringVarP( + &logLevel, + "log-level", + "l", + logLevel, + "Log level", + ) + + cmd.PersistentFlags().StringVarP( + &id, + "id", + "i", + id, + "Proplet ID", + ) + + cmd.PersistentFlags().DurationVarP( + &mqttTimeout, + "mqtt-timeout", + "o", + mqttTimeout, + "MQTT Timeout", + ) + + cmd.PersistentFlags().IntVarP( + &mqttQOS, + "mqtt-qos", + "q", + mqttQOS, + "MQTT QOS", + ) + + cmd.PersistentFlags().DurationVarP( + &livelinessInterval, + "liveliness-interval", + "I", + livelinessInterval, + "Liveliness Interval", + ) + + cmd.PersistentFlags().StringVarP( + &mqttAddress, + "mqtt-address", + "m", + mqttAddress, + "MQTT Address", + ) + + cmd.PersistentFlags().StringVarP( + ®istryURL, + "registry-url", + "r", + registryURL, + "Registry URL", + ) + + cmd.PersistentFlags().StringVarP( + ®istryToken, + "registry-token", + "T", + registryToken, + "Registry Token", + ) + + cmd.PersistentFlags().StringVarP( + &channelID, + "channel-id", + "c", + channelID, + "Manager Channel ID", + ) + + cmd.PersistentFlags().StringVarP( + &thingID, + "thing-id", + "t", + thingID, + "Manager Thing ID", + ) + + cmd.PersistentFlags().StringVarP( + &thingKey, + "thing-key", + "k", + thingKey, + "Thing Key", + ) + + return &cmd +} diff --git a/proplet/api/requests.go b/proplet/api/requests.go deleted file mode 100644 index b9dbfa3..0000000 --- a/proplet/api/requests.go +++ /dev/null @@ -1,90 +0,0 @@ -package api - -import ( - "fmt" - - pkgerrors "github.com/absmach/propeller/pkg/errors" -) - -type StartRequest struct { - AppName string `json:"app_name"` - Params []string `json:"params"` -} - -func (r StartRequest) Validate() error { - if r.AppName == "" { - return fmt.Errorf("start request: app_name is required but missing: %w", pkgerrors.ErrMissingAppName) - } - - return nil -} - -type StopRequest struct { - AppName string `json:"app_name"` -} - -func (r StopRequest) Validate() error { - if r.AppName == "" { - return fmt.Errorf("stop request: app_name is required but missing: %w", pkgerrors.ErrMissingAppName) - } - - return nil -} - -type RPCRequest struct { - Method string `json:"method"` - Params []interface{} `json:"params"` - ID int `json:"id"` -} - -func (r RPCRequest) Validate() error { - if r.Method == "" { - return fmt.Errorf("RPC request: method is required but missing: %w", pkgerrors.ErrInvalidMethod) - } - if len(r.Params) == 0 { - return fmt.Errorf("RPC request: params are required but missing: %w", pkgerrors.ErrInvalidParams) - } - - return nil -} - -func (r RPCRequest) ParseParams() (interface{}, error) { - switch r.Method { - case "start": - if len(r.Params) < 1 { - return nil, fmt.Errorf("start method: missing required parameters: %w", pkgerrors.ErrInvalidParams) - } - appName, ok := r.Params[0].(string) - if !ok || appName == "" { - return nil, fmt.Errorf("start method: invalid app_name parameter: %w", pkgerrors.ErrInvalidParams) - } - - return StartRequest{ - AppName: appName, - Params: parseStringSlice(r.Params[1:]), - }, nil - case "stop": - if len(r.Params) < 1 { - return nil, fmt.Errorf("stop method: missing required parameters: %w", pkgerrors.ErrInvalidParams) - } - appName, ok := r.Params[0].(string) - if !ok || appName == "" { - return nil, fmt.Errorf("stop method: invalid app_name parameter: %w", pkgerrors.ErrInvalidParams) - } - - return StopRequest{AppName: appName}, nil - default: - return nil, fmt.Errorf("unknown method '%s': %w", r.Method, pkgerrors.ErrInvalidMethod) - } -} - -func parseStringSlice(params []interface{}) []string { - result := []string{} - for _, param := range params { - if str, ok := param.(string); ok { - result = append(result, str) - } - } - - return result -} diff --git a/proplet/api/responses.go b/proplet/api/responses.go deleted file mode 100644 index dba7770..0000000 --- a/proplet/api/responses.go +++ /dev/null @@ -1,73 +0,0 @@ -package api - -import ( - "fmt" - - pkgerrors "github.com/absmach/propeller/pkg/errors" -) - -type Response struct { - Status string `json:"status"` - Error string `json:"error,omitempty"` -} - -func (r Response) Validate() error { - if r.Status == "" { - return fmt.Errorf("response: status is required but missing: %w", pkgerrors.ErrMissingValue) - } - if r.Status != "success" && r.Status != "failure" { - return fmt.Errorf("response: invalid status '%s': %w", r.Status, pkgerrors.ErrInvalidStatus) - } - if r.Status == "failure" && r.Error == "" { - return fmt.Errorf("response: error message is required for failure status: %w", pkgerrors.ErrInvalidValue) - } - - return nil -} - -type RPCResponse struct { - Result string `json:"result,omitempty"` - Error string `json:"error,omitempty"` - ID int `json:"id"` -} - -func (r RPCResponse) Validate() error { - if r.ID == 0 { - return fmt.Errorf("RPC response: ID is required but missing or zero: %w", pkgerrors.ErrMissingValue) - } - if r.Error != "" && r.Result != "" { - return fmt.Errorf("RPC response: both result and error cannot be set simultaneously: %w", pkgerrors.ErrInvalidValue) - } - if r.Error == "" && r.Result == "" { - return fmt.Errorf("RPC response: result or error must be set: %w", pkgerrors.ErrMissingResult) - } - - return nil -} - -func NewSuccessResponse() Response { - return Response{ - Status: "success", - } -} - -func NewFailureResponse(err error) Response { - return Response{ - Status: "failure", - Error: err.Error(), - } -} - -func NewRPCSuccessResponse(id int, result string) RPCResponse { - return RPCResponse{ - ID: id, - Result: result, - } -} - -func NewRPCFailureResponse(id int, err error) RPCResponse { - return RPCResponse{ - ID: id, - Error: err.Error(), - } -} diff --git a/proplet/config.go b/proplet/config.go index aa3ebb2..d983015 100644 --- a/proplet/config.go +++ b/proplet/config.go @@ -1,69 +1,44 @@ package proplet import ( - "encoding/json" "errors" - "fmt" "net/url" - "os" + "time" ) type Config struct { - BrokerURL string `json:"broker_url"` - Password string `json:"password"` - PropletID string `json:"proplet_id"` - ChannelID string `json:"channel_id"` - RegistryURL string `json:"registry_url"` - RegistryToken string `json:"registry_token"` + LogLevel string `json:"log_level"` + ID string `json:"id"` + MQTTAddress string `json:"mqtt_address"` + MQTTTimeout time.Duration `json:"mqtt_timeout"` + MQTTQoS byte `json:"mqtt_qos"` + LivelinessInterval time.Duration `json:"liveliness_interval"` + RegistryURL string `json:"registry_url,omitempty"` + RegistryToken string `json:"registry_token,omitempty"` + RegistryTimeout time.Duration `json:"registry_timeout,omitempty"` + ChannelID string `json:"channel_id"` + ThingID string `json:"thing_id"` + ThingKey string `json:"thing_key"` } -func LoadConfig(filepath string, hasWASMFile bool) (Config, error) { - file, err := os.Open(filepath) - if err != nil { - return Config{}, fmt.Errorf("unable to open configuration file '%s': %w", filepath, err) +func (c Config) Validate() error { + if c.MQTTAddress == "" { + return errors.New("MQTT address is required") } - defer file.Close() - - var config Config - decoder := json.NewDecoder(file) - if err := decoder.Decode(&config); err != nil { - return Config{}, fmt.Errorf("failed to parse configuration file '%s': %w", filepath, err) - } - - if err := config.Validate(hasWASMFile); err != nil { - return Config{}, fmt.Errorf("configuration validation failed: %w", err) - } - - return config, nil -} - -func (c Config) Validate(hasWASMFile bool) error { - if c.BrokerURL == "" { - return errors.New("broker_url is required") - } - if _, err := url.Parse(c.BrokerURL); err != nil { - return fmt.Errorf("broker_url is not a valid URL: %w", err) - } - if c.Password == "" { - return errors.New("password is required") - } - if c.PropletID == "" { - return errors.New("proplet_id is required") + if _, err := url.Parse(c.MQTTAddress); err != nil { + return errors.Join(errors.New("MQTT address is not a valid URL"), err) } if c.ChannelID == "" { - return errors.New("channel_id is required") - } - if hasWASMFile { - return nil + return errors.New("magistrala channel id is required") } - if c.RegistryURL == "" { - return errors.New("registry_url is required when not using a WASM file") + if c.ThingID == "" { + return errors.New("magistrala thing id is required") } - if _, err := url.Parse(c.RegistryURL); err != nil { - return fmt.Errorf("registry_url is not a valid URL: %w", err) + if c.ThingKey == "" { + return errors.New("magistrala thing key is required") } - if c.RegistryToken == "" { - return errors.New("registry_token is required when not using a WASM file") + if _, err := url.Parse(c.RegistryURL); err != nil && c.RegistryURL != "" { + return errors.Join(errors.New("registry URL is not a valid URL"), err) } return nil diff --git a/proplet/config.json b/proplet/config.json deleted file mode 100644 index 4717fb5..0000000 --- a/proplet/config.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "broker_url": "mqtt://localhost:1883", - "password": "example-password", - "proplet_id": "proplet-1", - "channel_id": "channel-1", - "registry_url": "", - "registry_token": "" -} diff --git a/proplet/mqtt.go b/proplet/mqtt.go index cbc5806..4199334 100644 --- a/proplet/mqtt.go +++ b/proplet/mqtt.go @@ -1,135 +1,205 @@ package proplet import ( + "context" "encoding/json" + "errors" "fmt" "log/slog" "time" - pkgerrors "github.com/absmach/propeller/pkg/errors" mqtt "github.com/eclipse/paho.mqtt.golang" ) -const livelinessInterval = 10 * time.Second - var ( - RegistryFailurePayload = `{"status":"failure","error":"%v"}` - RegistrySuccessPayload = `{"status":"success"}` - RegistryAckTopicTemplate = "channels/%s/messages/control/manager/registry" - lwtPayloadTemplate = `{"status":"offline","proplet_id":"%s","chan_id":"%s"}` - discoveryPayloadTemplate = `{"proplet_id":"%s","chan_id":"%s"}` - alivePayloadTemplate = `{"status":"alive","proplet_id":"%s","chan_id":"%s"}` - aliveTopicTemplate = "channels/%s/messages/control/proplet/alive" - discoveryTopicTemplate = "channels/%s/messages/control/proplet/create" - startTopicTemplate = "channels/%s/messages/control/manager/start" - stopTopicTemplate = "channels/%s/messages/control/manager/stop" - registryUpdateTopicTemplate = "channels/%s/messages/control/manager/updateRegistry" - registryResponseTopic = "channels/%s/messages/registry/server" - fetchRequestTopicTemplate = "channels/%s/messages/registry/proplet" + RegistryFailurePayload = `{"status":"failure","error":"%s"}` + RegistrySuccessPayload = `{"status":"success"}` + lwtPayloadTemplate = `{"status":"offline","proplet_id":"%s","mg_channel_id":"%s"}` + + RegistryAckTopicTemplate = "channels/%s/messages/control/manager/registry" + aliveTopicTemplate = "channels/%s/messages/control/proplet/alive" + discoveryTopicTemplate = "channels/%s/messages/control/proplet/create" + startTopicTemplate = "channels/%s/messages/control/manager/start" + stopTopicTemplate = "channels/%s/messages/control/manager/stop" + registryResponseTopic = "channels/%s/messages/registry/server" + fetchRequestTopicTemplate = "channels/%s/messages/registry/proplet" ) -func NewMQTTClient(config Config, logger *slog.Logger) (mqtt.Client, error) { - lwtPayload := fmt.Sprintf(lwtPayloadTemplate, config.PropletID, config.ChannelID) - if lwtPayload == "" { - return nil, fmt.Errorf("failed to prepare MQTT last will payload: %w", pkgerrors.ErrMQTTWillPayloadFailed) - } +type Client interface { + SubscribeToManagerTopics(startHandler, stopHandler, registryHandler mqtt.MessageHandler) error + SubscribeToRegistryTopic(handler mqtt.MessageHandler) error + PublishFetchRequest(appName string) error + PublishResults(taskID string, results []uint64) error +} + +type mqttClient struct { + client mqtt.Client + config Config + logger *slog.Logger +} + +func NewMQTTClient(ctx context.Context, cfg Config, logger *slog.Logger) (Client, error) { + topic := fmt.Sprintf(aliveTopicTemplate, cfg.ChannelID) + lwtPayload := fmt.Sprintf(lwtPayloadTemplate, cfg.ThingID, cfg.ChannelID) opts := mqtt.NewClientOptions(). - AddBroker(config.BrokerURL). - SetClientID("Proplet-"+config.PropletID). - SetUsername(config.PropletID). - SetPassword(config.Password). + AddBroker(cfg.MQTTAddress). + SetClientID(cfg.ID). + SetUsername(cfg.ThingID). + SetPassword(cfg.ThingKey). SetCleanSession(true). - SetWill(aliveTopicTemplate+config.ChannelID, lwtPayloadTemplate+config.PropletID+config.ChannelID, 0, false) + SetWill(topic, lwtPayload, 0, false) opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { - logger.Error("MQTT connection lost", slog.Any("error", err)) + args := []any{} + if err != nil { + args = append(args, slog.Any("error", err)) + } + + logger.Info("MQTT connection lost", args...) }) opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) { - logger.Info("MQTT reconnecting") + args := []any{} + if options != nil { + args = append(args, + slog.String("client_id", options.ClientID), + slog.String("username", options.Username), + ) + } + + logger.Info("MQTT reconnecting", args...) }) client := mqtt.NewClient(opts) - password := client.Connect() - if password.Wait() && password.Error() != nil { - return nil, fmt.Errorf("failed to connect to MQTT broker '%s': %w", config.BrokerURL, pkgerrors.ErrMQTTConnectionFailed) + + token := client.Connect() + if ok := token.WaitTimeout(cfg.MQTTTimeout); !ok { + return nil, errors.New("timeout reached while connecting to MQTT broker") + } + if token.Error() != nil { + return nil, fmt.Errorf("failed to connect to %s MQTT broker with error %s", cfg.MQTTAddress, token.Error().Error()) } - logger.Info("MQTT client connected successfully", slog.String("broker_url", config.BrokerURL)) + mc := &mqttClient{ + client: client, + config: cfg, + logger: logger, + } - if err := PublishDiscovery(client, config, logger); err != nil { - return nil, fmt.Errorf("failed to publish discovery message: %w", err) + if err := mc.publishDiscovery(); err != nil { + return nil, err } - go startLivelinessUpdates(client, config, logger) + go mc.startLivelinessUpdates(ctx, logger) + + logger.Info("MQTT client connected successfully", slog.String("broker_url", cfg.MQTTAddress)) - return client, nil + return mc, nil } -func PublishDiscovery(client mqtt.Client, config Config, logger *slog.Logger) error { - topic := fmt.Sprintf(discoveryTopicTemplate, config.ChannelID) - payload := fmt.Sprintf(discoveryPayloadTemplate, config.PropletID, config.ChannelID) - password := client.Publish(topic, 0, false, payload) - password.Wait() - if password.Error() != nil { - return fmt.Errorf("failed to publish discovery message: %w", password.Error()) +func (mc *mqttClient) publishDiscovery() error { + topic := fmt.Sprintf(discoveryTopicTemplate, mc.config.ChannelID) + payload := map[string]interface{}{ + "proplet_id": mc.config.ThingID, + "mg_channel_id": mc.config.ChannelID, } - return nil + return mc.publish(topic, payload) } -func startLivelinessUpdates(client mqtt.Client, config Config, logger *slog.Logger) { - ticker := time.NewTicker(livelinessInterval) +func (mc *mqttClient) startLivelinessUpdates(ctx context.Context, logger *slog.Logger) { + ticker := time.NewTicker(mc.config.LivelinessInterval) defer ticker.Stop() - for range ticker.C { - password := client.Publish(fmt.Sprintf(aliveTopicTemplate, config.ChannelID), 0, false, fmt.Sprintf(alivePayloadTemplate, config.PropletID, config.ChannelID)) - password.Wait() - if password.Error() != nil { - logger.Error("Failed to publish liveliness message", slog.String("topic", fmt.Sprintf(aliveTopicTemplate, config.ChannelID)), slog.Any("error", password.Error())) - } else { - logger.Info("Published liveliness message", slog.String("topic", fmt.Sprintf(aliveTopicTemplate, config.ChannelID))) + for { + select { + case <-ctx.Done(): + logger.Info("stopping liveliness updates") + + return + case <-ticker.C: + topic := fmt.Sprintf(aliveTopicTemplate, mc.config.ChannelID) + payload := map[string]interface{}{ + "status": "alive", + "proplet_id": mc.config.ThingID, + "mg_channel_id": mc.config.ChannelID, + } + + if err := mc.publish(topic, payload); err != nil { + logger.Error("failed to publish liveliness message", slog.Any("error", err)) + } + + logger.Debug("Published liveliness message", slog.String("topic", topic)) } } } -func SubscribeToManagerTopics(client mqtt.Client, config Config, startHandler, stopHandler, registryHandler mqtt.MessageHandler, logger *slog.Logger) error { - if password := client.Subscribe(fmt.Sprintf(startTopicTemplate, config.ChannelID), 0, startHandler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to start topic: %w", password.Error()) - } +func (mc *mqttClient) SubscribeToManagerTopics(startHandler, stopHandler, registryHandler mqtt.MessageHandler) error { + startTopic := fmt.Sprintf(startTopicTemplate, mc.config.ChannelID) + stopTopic := fmt.Sprintf(stopTopicTemplate, mc.config.ChannelID) - if password := client.Subscribe(fmt.Sprintf(stopTopicTemplate, config.ChannelID), 0, stopHandler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to stop topic: %w", password.Error()) + if err := mc.subscribe(startTopic, startHandler); err != nil { + return err } - if password := client.Subscribe(fmt.Sprintf(registryUpdateTopicTemplate, config.ChannelID), 0, registryHandler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to registry update topic: %w", password.Error()) + if err := mc.subscribe(stopTopic, stopHandler); err != nil { + return err } - logger.Info("Subscribed to Manager topics", - slog.String("start_topic", fmt.Sprintf(startTopicTemplate, config.ChannelID)), - slog.String("stop_topic", fmt.Sprintf(stopTopicTemplate, config.ChannelID)), - slog.String("registry_update_topic", fmt.Sprintf(registryUpdateTopicTemplate, config.ChannelID))) - return nil } -func SubscribeToRegistryTopic(client mqtt.Client, channelID string, handler mqtt.MessageHandler, logger *slog.Logger) error { - if password := client.Subscribe(fmt.Sprintf(registryResponseTopic, channelID), 0, handler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to registry topic '%s': %w", fmt.Sprintf(registryResponseTopic, channelID), password.Error()) +func (mc *mqttClient) SubscribeToRegistryTopic(handler mqtt.MessageHandler) error { + topic := fmt.Sprintf(registryResponseTopic, mc.config.ChannelID) + + return mc.subscribe(topic, handler) +} + +func (mc *mqttClient) subscribe(topic string, handler mqtt.MessageHandler) error { + token := mc.client.Subscribe(topic, mc.config.MQTTQoS, handler) + if ok := token.WaitTimeout(mc.config.MQTTTimeout); !ok { + return fmt.Errorf("timeout reached while subscribing to %s topic", topic) + } + if token.Error() != nil { + return fmt.Errorf("failed to subscribe to %s topic: %w", topic, token.Error()) } return nil } -func PublishFetchRequest(client mqtt.Client, channelID, appName string, logger *slog.Logger) error { - payload, err := json.Marshal(map[string]string{"app_name": appName}) +func (mc *mqttClient) PublishFetchRequest(appName string) error { + payload := map[string]interface{}{ + "app_name": appName, + } + topic := fmt.Sprintf(fetchRequestTopicTemplate, mc.config.ChannelID) + + return mc.publish(topic, payload) +} + +func (mc *mqttClient) PublishResults(taskID string, results []uint64) error { + payload := map[string]interface{}{ + "task_id": taskID, + "results": results, + } + + topic := fmt.Sprintf(resultsTopic, mc.config.ChannelID) + + return mc.publish(topic, payload) +} + +func (mc *mqttClient) publish(topic string, payload map[string]interface{}) error { + data, err := json.Marshal(payload) if err != nil { - return fmt.Errorf("failed to marshal fetch request payload: %w", err) + return errors.Join(errors.New("failed to marshal results payload"), err) + } + + token := mc.client.Publish(topic, 0, false, data) + if ok := token.WaitTimeout(mc.config.MQTTTimeout); !ok { + return errors.New("timeout reached while publishing results") } - if password := client.Publish(fmt.Sprintf(fetchRequestTopicTemplate, channelID), 0, false, payload); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to publish fetch request: %w", password.Error()) + if token.Error() != nil { + return errors.Join(errors.New("failed to publish results"), token.Error()) } return nil diff --git a/proplet/worker.go b/proplet/proplet.go similarity index 100% rename from proplet/worker.go rename to proplet/proplet.go diff --git a/proplet/requests.go b/proplet/requests.go new file mode 100644 index 0000000..cbd0c2a --- /dev/null +++ b/proplet/requests.go @@ -0,0 +1,36 @@ +package proplet + +import "errors" + +type startRequest struct { + ID string + FunctionName string + WasmFile []byte + Params []uint64 +} + +func (r startRequest) Validate() error { + if r.ID == "" { + return errors.New("id is required") + } + if r.FunctionName == "" { + return errors.New("function name is required") + } + if r.WasmFile == nil { + return errors.New("wasm file is required") + } + + return nil +} + +type stopRequest struct { + ID string +} + +func (r stopRequest) Validate() error { + if r.ID == "" { + return errors.New("id is required") + } + + return nil +} diff --git a/proplet/service.go b/proplet/service.go index 5a8d8dc..9fc9a0e 100644 --- a/proplet/service.go +++ b/proplet/service.go @@ -12,11 +12,8 @@ import ( "sync" "time" - pkgerrors "github.com/absmach/propeller/pkg/errors" - propletapi "github.com/absmach/propeller/proplet/api" + "github.com/absmach/propeller/task" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/tetratelabs/wazero" - wazeroapi "github.com/tetratelabs/wazero/api" ) const ( @@ -26,13 +23,13 @@ const ( type PropletService struct { config Config - mqttClient mqtt.Client - runtime *WazeroRuntime - wasmBinary []byte + mqttClient Client chunks map[string][][]byte chunkMetadata map[string]*ChunkPayload chunksMutex sync.Mutex + runtime Runtime } + type ChunkPayload struct { AppName string `json:"app_name"` ChunkIdx int `json:"chunk_idx"` @@ -40,98 +37,18 @@ type ChunkPayload struct { Data []byte `json:"data"` } -type WazeroRuntime struct { - runtime wazero.Runtime - modules map[string]wazeroapi.Module - mutex sync.Mutex -} - -func NewWazeroRuntime(ctx context.Context) *WazeroRuntime { - return &WazeroRuntime{ - runtime: wazero.NewRuntime(ctx), - modules: make(map[string]wazeroapi.Module), - } -} - -func (w *WazeroRuntime) StartApp(ctx context.Context, appName string, wasmBinary []byte, functionName string) (wazeroapi.Function, error) { - if appName == "" { - return nil, fmt.Errorf("start app: appName is required but missing: %w", pkgerrors.ErrMissingValue) - } - if len(wasmBinary) == 0 { - return nil, fmt.Errorf("start app: Wasm binary is empty: %w", pkgerrors.ErrInvalidValue) - } - if functionName == "" { - return nil, fmt.Errorf("start app: functionName is required but missing: %w", pkgerrors.ErrMissingValue) - } - - w.mutex.Lock() - defer w.mutex.Unlock() - - if _, exists := w.modules[appName]; exists { - return nil, fmt.Errorf("start app: app '%s' is already running: %w", appName, pkgerrors.ErrAppAlreadyRunning) - } - - module, err := w.runtime.Instantiate(ctx, wasmBinary) - if err != nil { - return nil, fmt.Errorf("start app: failed to instantiate Wasm module for app '%s': %w", appName, pkgerrors.ErrModuleInstantiation) - } - - function := module.ExportedFunction(functionName) - if function == nil { - _ = module.Close(ctx) - - return nil, fmt.Errorf("start app: function '%s' not found in Wasm module for app '%s': %w", functionName, appName, pkgerrors.ErrFunctionNotFound) - } - - w.modules[appName] = module - - return function, nil -} - -func (w *WazeroRuntime) StopApp(ctx context.Context, appName string) error { - if appName == "" { - return fmt.Errorf("stop app: appName is required but missing: %w", pkgerrors.ErrMissingValue) - } - - w.mutex.Lock() - defer w.mutex.Unlock() - - module, exists := w.modules[appName] - if !exists { - return fmt.Errorf("stop app: app '%s' is not running: %w", appName, pkgerrors.ErrAppNotRunning) - } - - if err := module.Close(ctx); err != nil { - return fmt.Errorf("stop app: failed to stop app '%s': %w", appName, pkgerrors.ErrModuleStopFailed) - } - - delete(w.modules, appName) - - return nil -} - -func NewService(ctx context.Context, cfg Config, wasmBinary []byte, logger *slog.Logger) (*PropletService, error) { - mqttClient, err := NewMQTTClient(cfg, logger) - if err != nil { - return nil, fmt.Errorf("failed to initialize MQTT client: %w", err) - } - - runtime := NewWazeroRuntime(ctx) - +func NewService(cfg Config, mqttClient Client, logger *slog.Logger, runtime Runtime) (*PropletService, error) { return &PropletService{ config: cfg, mqttClient: mqttClient, - runtime: runtime, - wasmBinary: wasmBinary, chunks: make(map[string][][]byte), chunkMetadata: make(map[string]*ChunkPayload), + runtime: runtime, }, nil } func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { - if err := SubscribeToManagerTopics( - p.mqttClient, - p.config, + if err := p.mqttClient.SubscribeToManagerTopics( func(client mqtt.Client, msg mqtt.Message) { p.handleStartCommand(ctx, client, msg, logger) }, @@ -141,18 +58,14 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { func(client mqtt.Client, msg mqtt.Message) { p.registryUpdate(ctx, client, msg, logger) }, - logger, ); err != nil { return fmt.Errorf("failed to subscribe to Manager topics: %w", err) } - if err := SubscribeToRegistryTopic( - p.mqttClient, - p.config.ChannelID, + if err := p.mqttClient.SubscribeToRegistryTopic( func(client mqtt.Client, msg mqtt.Message) { p.handleChunk(ctx, client, msg) }, - logger, ); err != nil { return fmt.Errorf("failed to subscribe to Registry topics: %w", err) } @@ -164,54 +77,52 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { } func (p *PropletService) handleStartCommand(ctx context.Context, _ mqtt.Client, msg mqtt.Message, logger *slog.Logger) { - var req propletapi.StartRequest - if err := json.Unmarshal(msg.Payload(), &req); err != nil { + var payload task.Task + if err := json.Unmarshal(msg.Payload(), &payload); err != nil { logger.Error("Invalid start command payload", slog.Any("error", err)) return } + req := startRequest{ + ID: payload.ID, + FunctionName: payload.Name, + WasmFile: payload.File, + Params: payload.Inputs, + } + if err := req.Validate(); err != nil { + logger.Error("Invalid start command payload", slog.Any("error", err)) - logger.Info("Received start command", slog.String("app_name", req.AppName)) + return + } - if p.wasmBinary != nil { - logger.Info("Using preloaded WASM binary", slog.String("app_name", req.AppName)) - function, err := p.runtime.StartApp(ctx, req.AppName, p.wasmBinary, "main") - if err != nil { - logger.Error("Failed to start app", slog.String("app_name", req.AppName), slog.Any("error", err)) + logger.Info("Received start command", slog.String("app_name", req.FunctionName)) - return - } - - _, err = function.Call(ctx) - if err != nil { - logger.Error("Error executing app", slog.String("app_name", req.AppName), slog.Any("error", err)) - } else { - logger.Info("App started successfully", slog.String("app_name", req.AppName)) - } + if err := p.runtime.StartApp(ctx, req.WasmFile, req.ID, req.FunctionName, req.Params...); err != nil { + logger.Error("Failed to start app", slog.String("app_name", req.FunctionName), slog.Any("error", err)) return } if p.config.RegistryURL != "" { - err := PublishFetchRequest(p.mqttClient, p.config.ChannelID, req.AppName, logger) + err := p.mqttClient.PublishFetchRequest(req.FunctionName) if err != nil { - logger.Error("Failed to publish fetch request", slog.String("app_name", req.AppName), slog.Any("error", err)) + logger.Error("Failed to publish fetch request", slog.String("app_name", req.FunctionName), slog.Any("error", err)) return } go func() { - logger.Info("Waiting for chunks", slog.String("app_name", req.AppName)) + logger.Info("Waiting for chunks", slog.String("app_name", req.FunctionName)) for { p.chunksMutex.Lock() - metadata, exists := p.chunkMetadata[req.AppName] - receivedChunks := len(p.chunks[req.AppName]) + metadata, exists := p.chunkMetadata[req.FunctionName] + receivedChunks := len(p.chunks[req.FunctionName]) p.chunksMutex.Unlock() if exists && receivedChunks == metadata.TotalChunks { - logger.Info("All chunks received, deploying app", slog.String("app_name", req.AppName)) - go p.deployAndRunApp(ctx, req.AppName) + logger.Info("All chunks received, deploying app", slog.String("app_name", req.FunctionName)) + go p.deployAndRunApp(ctx, req.FunctionName) break } @@ -219,29 +130,29 @@ func (p *PropletService) handleStartCommand(ctx context.Context, _ mqtt.Client, time.Sleep(pollingInterval) } }() - } else { - logger.Warn("Registry URL is empty, and no binary provided", slog.String("app_name", req.AppName)) } } func (p *PropletService) handleStopCommand(ctx context.Context, _ mqtt.Client, msg mqtt.Message, logger *slog.Logger) { - var req propletapi.StopRequest + var req stopRequest if err := json.Unmarshal(msg.Payload(), &req); err != nil { logger.Error("Invalid stop command payload", slog.Any("error", err)) return } + if err := req.Validate(); err != nil { + logger.Error("Invalid stop command payload", slog.Any("error", err)) - logger.Info("Received stop command", slog.String("app_name", req.AppName)) + return + } - err := p.runtime.StopApp(ctx, req.AppName) - if err != nil { - logger.Error("Failed to stop app", slog.String("app_name", req.AppName), slog.Any("error", err)) + if err := p.runtime.StopApp(ctx, req.ID); err != nil { + logger.Error("Failed to stop app", slog.Any("error", err)) return } - logger.Info("App stopped successfully", slog.String("app_name", req.AppName)) + logger.Info("App stopped successfully") } func (p *PropletService) handleChunk(ctx context.Context, _ mqtt.Client, msg mqtt.Message) { @@ -283,21 +194,8 @@ func (p *PropletService) deployAndRunApp(ctx context.Context, appName string) { delete(p.chunks, appName) p.chunksMutex.Unlock() - wasmBinary := assembleChunks(chunks) - - function, err := p.runtime.StartApp(ctx, appName, wasmBinary, "main") - if err != nil { - log.Printf("Failed to start app '%s': %v\n", appName, err) - - return - } - - _, err = function.Call(ctx) - if err != nil { - log.Printf("Failed to execute app '%s': %v\n", appName, err) - - return - } + _ = ctx + _ = assembleChunks(chunks) log.Printf("App '%s' started successfully\n", appName) } @@ -363,7 +261,7 @@ func (p *PropletService) registryUpdate(ctx context.Context, client mqtt.Client, ackTopic := fmt.Sprintf(RegistryAckTopicTemplate, p.config.ChannelID) if err := p.UpdateRegistry(ctx, payload.RegistryURL, payload.RegistryToken); err != nil { - client.Publish(ackTopic, 0, false, fmt.Sprintf(RegistryFailurePayload, err)) + client.Publish(ackTopic, 0, false, fmt.Sprintf(RegistryFailurePayload, err.Error())) logger.Error("Failed to update registry configuration", slog.String("ack_topic", ackTopic), slog.String("registry_url", payload.RegistryURL), slog.Any("error", err)) } else { client.Publish(ackTopic, 0, false, RegistrySuccessPayload) diff --git a/proplet/wasm.go b/proplet/wasm.go index 286a10d..1dee96d 100644 --- a/proplet/wasm.go +++ b/proplet/wasm.go @@ -2,104 +2,102 @@ package proplet import ( "context" - "fmt" + "errors" + "log/slog" "sync" - "github.com/absmach/propeller/task" "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/api" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" ) -var _ Service = (*proplet)(nil) +var resultsTopic = "channels/%s/messages/control/proplet/results" -type proplet struct { - mu sync.Mutex - Name string - DB map[string]task.Task - TaskCount int - runtimes map[string]wazero.Runtime - functions map[string]api.Function +type Runtime interface { + StartApp(ctx context.Context, wasmBinary []byte, id, functionName string, args ...uint64) error + StopApp(ctx context.Context, id string) error } -func NewWasmProplet(name string) *proplet { - return &proplet{ - Name: name, - DB: make(map[string]task.Task), - TaskCount: 0, - runtimes: make(map[string]wazero.Runtime), - functions: make(map[string]api.Function), - } +type wazeroRuntime struct { + mutex sync.Mutex + runtimes map[string]wazero.Runtime + results map[string][]uint64 + mqttClient Client + channelID string + logger *slog.Logger } -func (w *proplet) StartTask(ctx context.Context, t task.Task) error { - w.mu.Lock() - defer w.mu.Unlock() +func NewWazeroRuntime(logger *slog.Logger, mqttClient Client, channelID string) Runtime { + return &wazeroRuntime{ + runtimes: make(map[string]wazero.Runtime), + results: make(map[string][]uint64), + mqttClient: mqttClient, + channelID: channelID, + logger: logger, + } +} +func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, id, functionName string, args ...uint64) error { r := wazero.NewRuntime(ctx) + + w.mutex.Lock() + w.runtimes[id] = r + w.mutex.Unlock() + // Instantiate WASI, which implements host functions needed for TinyGo to // implement `panic`. wasi_snapshot_preview1.MustInstantiate(ctx, r) - module, err := r.Instantiate(ctx, t.Function.File) + module, err := r.Instantiate(ctx, wasmBinary) if err != nil { - return err + return errors.Join(errors.New("failed to instantiate Wasm module"), err) } - function := module.ExportedFunction(t.Function.Name) + function := module.ExportedFunction(functionName) if function == nil { - return fmt.Errorf("function %q not found", t.Function.Name) + return errors.New("failed to find exported function") } - w.TaskCount++ - w.runtimes[t.ID] = r - w.functions[t.ID] = function - w.DB[t.ID] = t + go func() { + results, err := function.Call(ctx, args...) + if err != nil { + w.logger.Error("failed to call function", slog.String("id", id), slog.String("function", functionName), slog.String("error", err.Error())) - return nil -} + return + } + w.mutex.Lock() + w.results[id] = results + w.mutex.Unlock() -func (w *proplet) RunTask(ctx context.Context, taskID string) ([]uint64, error) { - w.mu.Lock() - defer w.mu.Unlock() + if err := w.StopApp(ctx, id); err != nil { + w.logger.Error("failed to stop app", slog.String("id", id), slog.String("error", err.Error())) + } - t, ok := w.DB[taskID] - if !ok { - return nil, fmt.Errorf("task %q not found", taskID) - } + if err := w.mqttClient.PublishResults(id, results); err != nil { + w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error())) - function := w.functions[t.ID] + return + } - result, err := function.Call(ctx, t.Function.Inputs...) - if err != nil { - return nil, err - } + w.logger.Info("Finished running app", slog.String("id", id)) + }() - r := w.runtimes[t.ID] - if err := r.Close(ctx); err != nil { - return nil, err - } - - return result, nil + return nil } -func (w *proplet) StopTask(ctx context.Context, taskID string) error { - w.mu.Lock() - defer w.mu.Unlock() - - r := w.runtimes[taskID] +func (w *wazeroRuntime) StopApp(ctx context.Context, id string) error { + w.mutex.Lock() + defer w.mutex.Unlock() - return r.Close(ctx) -} + r, exists := w.runtimes[id] + if !exists { + return errors.New("there is no runtime for this id") + } -func (w *proplet) RemoveTask(_ context.Context, taskID string) error { - w.mu.Lock() - defer w.mu.Unlock() + if err := r.Close(ctx); err != nil { + return err + } - delete(w.DB, taskID) - delete(w.runtimes, taskID) - delete(w.functions, taskID) - w.TaskCount-- + delete(w.runtimes, id) return nil } diff --git a/task/task.go b/task/task.go index 0fce402..44b7a4e 100644 --- a/task/task.go +++ b/task/task.go @@ -31,17 +31,13 @@ func (s State) String() string { } } -type Function struct { - File []byte - Name string - Inputs []uint64 -} - type Task struct { ID string `json:"id"` Name string `json:"name"` State State `json:"state"` - Function Function `json:"function"` + File []byte `json:"file,omitempty"` + Inputs []uint64 `json:"inputs,omitempty"` + Results []uint64 `json:"results,omitempty"` StartTime time.Time `json:"start_time"` FinishTime time.Time `json:"finish_time"` CreatedAt time.Time `json:"created_at"`