diff --git a/.gitignore b/.gitignore index 3ea852f..0e2f099 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ # Propellerd Build build +config.toml diff --git a/.golangci.yaml b/.golangci.yaml index 49823f9..6bf3b58 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -33,6 +33,7 @@ linters: - err113 - noctx - cyclop + - tagalign linters-settings: gocritic: diff --git a/Makefile b/Makefile index 57467f8..9b222b2 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ TIME=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ') VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'v0.0.0') COMMIT ?= $(shell git rev-parse HEAD) EXAMPLES = addition long-addition -SERVICES = manager proplet cli +SERVICES = manager proplet cli proxy define compile_service CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) \ @@ -20,12 +20,10 @@ endef $(SERVICES): $(call compile_service,$(@)) + +# Install all non-WASM executables from the build directory to GOBIN with 'propeller-' prefix install: - for file in $(BUILD_DIR)/*; do \ - if [[ ! "$$file" =~ \.wasm$$ ]]; then \ - cp "$$file" $(GOBIN)/propeller-`basename "$$file"`; \ - fi \ - done + $(foreach f,$(wildcard $(BUILD_DIR)/*[!.wasm]),cp $(f) $(patsubst $(BUILD_DIR)/%,$(GOBIN)/propeller-%,$(f));) .PHONY: all $(SERVICES) all: $(SERVICES) diff --git a/cmd/proplet/main.go b/cmd/proplet/main.go index c6f5f92..2f1a09e 100644 --- a/cmd/proplet/main.go +++ b/cmd/proplet/main.go @@ -5,7 +5,6 @@ import ( "fmt" "log" "log/slog" - "net/http" "os" "time" @@ -26,9 +25,6 @@ type config struct { MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"` MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"` LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"` - RegistryURL string `env:"PROPLET_REGISTRY_URL"` - RegistryToken string `env:"PROPLET_REGISTRY_TOKEN"` - RegistryTimeout time.Duration `env:"PROPLET_REGISTRY_TIMEOUT" envDefault:"30s"` ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"` ThingID string `env:"PROPLET_THING_ID,notEmpty"` ThingKey string `env:"PROPLET_THING_KEY,notEmpty"` @@ -57,16 +53,6 @@ func main() { logger := slog.New(logHandler) slog.SetDefault(logger) - if cfg.RegistryURL != "" { - if err := checkRegistryConnectivity(ctx, cfg.RegistryURL, cfg.RegistryTimeout); err != nil { - logger.Error("failed to connect to registry URL", slog.String("url", cfg.RegistryURL), slog.Any("error", err)) - - return - } - - logger.Info("successfully connected to registry URL", slog.String("url", cfg.RegistryURL)) - } - mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, cfg.InstanceID, cfg.ThingID, cfg.ThingKey, cfg.ChannelID, cfg.MQTTTimeout, logger) if err != nil { logger.Error("failed to initialize mqtt client", slog.Any("error", err)) @@ -75,7 +61,7 @@ func main() { } wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID) - service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.RegistryURL, cfg.RegistryToken, cfg.LivelinessInterval, mqttPubSub, logger, wazero) + service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, wazero) if err != nil { logger.Error("failed to initialize service", slog.Any("error", err)) @@ -96,27 +82,3 @@ func main() { logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err)) } } - -func checkRegistryConnectivity(ctx context.Context, registryURL string, registryTimeout time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, registryTimeout) - defer cancel() - - client := http.DefaultClient - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, http.NoBody) - if err != nil { - return fmt.Errorf("failed to create HTTP request: %w", err) - } - - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to connect to registry URL: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("registry returned unexpected status: %d", resp.StatusCode) - } - - return nil -} diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go new file mode 100644 index 0000000..e70f93a --- /dev/null +++ b/cmd/proxy/main.go @@ -0,0 +1,111 @@ +package main + +import ( + "context" + "fmt" + "log" + "log/slog" + "os" + + "github.com/absmach/propeller/proxy" + "github.com/caarlos0/env/v11" + "golang.org/x/sync/errgroup" +) + +const svcName = "proxy" + +type config struct { + LogLevel string `env:"PROXY_LOG_LEVEL" envDefault:"info"` + + BrokerURL string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"` + PropletKey string `env:"PROPLET_THING_KEY,notEmpty"` + PropletID string `env:"PROPLET_THING_ID,notEmpty" ` + ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"` + + ChunkSize int `env:"PROXY_CHUNK_SIZE" envDefault:"512000"` + Authenticate bool `env:"PROXY_AUTHENTICATE" envDefault:"false"` + Token string `env:"PROXY_REGISTRY_TOKEN" envDefault:""` + Username string `env:"PROXY_REGISTRY_USERNAME" envDefault:""` + Password string `env:"PROXY_REGISTRY_PASSWORD" envDefault:""` + RegistryURL string `env:"PROXY_REGISTRY_URL,notEmpty"` +} + +func main() { + g, ctx := errgroup.WithContext(context.Background()) + + cfg := config{} + if err := env.Parse(&cfg); err != nil { + log.Fatalf("failed to load configuration : %s", err.Error()) + } + + var level slog.Level + if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil { + log.Fatalf("failed to parse log level: %s", err.Error()) + } + logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: level, + }) + logger := slog.New(logHandler) + slog.SetDefault(logger) + + mqttCfg := proxy.MQTTProxyConfig{ + BrokerURL: cfg.BrokerURL, + Password: cfg.PropletKey, + PropletID: cfg.PropletID, + ChannelID: cfg.ChannelID, + } + + httpCfg := proxy.HTTPProxyConfig{ + ChunkSize: cfg.ChunkSize, + Authenticate: cfg.Authenticate, + Token: cfg.Token, + Username: cfg.Username, + Password: cfg.Password, + RegistryURL: cfg.RegistryURL, + } + + logger.Info("successfully initialized MQTT and HTTP config") + + service, err := proxy.NewService(ctx, &mqttCfg, &httpCfg, logger) + if err != nil { + logger.Error("failed to create proxy service", slog.Any("error", err)) + + return + } + + logger.Info("starting proxy service") + + if err := start(ctx, g, service); err != nil { + logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err)) + } +} + +func start(ctx context.Context, g *errgroup.Group, s *proxy.ProxyService) error { + if err := s.MQTTClient().Connect(ctx); err != nil { + return fmt.Errorf("failed to connect to MQTT broker: %w", err) + } + + slog.Info("successfully connected to broker") + + defer func() { + if err := s.MQTTClient().Disconnect(ctx); err != nil { + slog.Error("failed to disconnect MQTT client", "error", err) + } + }() + + if err := s.MQTTClient().Subscribe(ctx, s.ContainerChan()); err != nil { + return fmt.Errorf("failed to subscribe to container requests: %w", err) + } + + slog.Info("successfully subscribed to topic") + + g.Go(func() error { + return s.StreamHTTP(ctx) + }) + + g.Go(func() error { + return s.StreamMQTT(ctx) + }) + + return g.Wait() +} diff --git a/go.mod b/go.mod index f7aea4b..4a6578a 100644 --- a/go.mod +++ b/go.mod @@ -54,4 +54,11 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect google.golang.org/grpc v1.69.0 // indirect google.golang.org/protobuf v1.36.0 // indirect + oras.land/oras-go/v2 v2.5.0 +) + +require ( + github.com/caarlos0/env/v11 v11.3.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.0 // indirect ) diff --git a/go.sum b/go.sum index 695655c..4fefaf4 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/caarlos0/env/v11 v11.2.2 h1:95fApNrUyueipoZN/EhA8mMxiNxrBwDa+oAZrMWl3 github.com/caarlos0/env/v11 v11.2.2/go.mod h1:JBfcdeQiBoI3Zh1QRAWfe+tpiNTmDtcCj/hHHHMx0vc= github.com/caarlos0/env/v11 v11.3.0 h1:CVTN6W6+twFC1jHKUwsw9eOTEiFpzyJOSA2AyHa8uvw= github.com/caarlos0/env/v11 v11.3.0/go.mod h1:Q5lYHeOsgY20CCV/R+b50Jwg2MnjySid7+3FUBz2BJw= +github.com/caarlos0/env/v11 v11.3.0 h1:CVTN6W6+twFC1jHKUwsw9eOTEiFpzyJOSA2AyHa8uvw= +github.com/caarlos0/env/v11 v11.3.0/go.mod h1:Q5lYHeOsgY20CCV/R+b50Jwg2MnjySid7+3FUBz2BJw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -61,6 +63,10 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= +github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= @@ -121,3 +127,5 @@ google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +oras.land/oras-go/v2 v2.5.0 h1:o8Me9kLY74Vp5uw07QXPiitjsw7qNXi8Twd+19Zf02c= +oras.land/oras-go/v2 v2.5.0/go.mod h1:z4eisnLP530vwIOUOJeBIj0aGI0L1C3d53atvCBqZHg= diff --git a/manager/api/requests.go b/manager/api/requests.go index d5f23c5..6d27c7d 100644 --- a/manager/api/requests.go +++ b/manager/api/requests.go @@ -10,6 +10,10 @@ type taskReq struct { } func (t *taskReq) validate() error { + if t.Name == "" { + return apiutil.ErrMissingName + } + return nil } diff --git a/proplet/requests.go b/proplet/requests.go index cbd0c2a..48a05b4 100644 --- a/proplet/requests.go +++ b/proplet/requests.go @@ -1,11 +1,14 @@ package proplet -import "errors" +import ( + "errors" +) type startRequest struct { ID string FunctionName string WasmFile []byte + imageURL string Params []uint64 } @@ -16,8 +19,8 @@ func (r startRequest) Validate() error { if r.FunctionName == "" { return errors.New("function name is required") } - if r.WasmFile == nil { - return errors.New("wasm file is required") + if r.WasmFile == nil && r.imageURL == "" { + return errors.New("either wasm file or wasm file download path is required") } return nil diff --git a/proplet/service.go b/proplet/service.go index 3f83bfe..13b3d7b 100644 --- a/proplet/service.go +++ b/proplet/service.go @@ -7,7 +7,6 @@ import ( "fmt" "log" "log/slog" - "net/url" "sync" "time" @@ -21,22 +20,19 @@ const ( ) var ( - RegistryAckTopicTemplate = "channels/%s/messages/control/manager/registry" - updateRegistryTopicTemplate = "channels/%s/messages/control/manager/update" - aliveTopicTemplate = "channels/%s/messages/control/proplet/alive" - discoveryTopicTemplate = "channels/%s/messages/control/proplet/create" - startTopicTemplate = "channels/%s/messages/control/manager/start" - stopTopicTemplate = "channels/%s/messages/control/manager/stop" - registryResponseTopic = "channels/%s/messages/registry/server" - fetchRequestTopicTemplate = "channels/%s/messages/registry/proplet" + RegistryAckTopicTemplate = "channels/%s/messages/control/manager/registry" + aliveTopicTemplate = "channels/%s/messages/control/proplet/alive" + discoveryTopicTemplate = "channels/%s/messages/control/proplet/create" + startTopicTemplate = "channels/%s/messages/control/manager/start" + stopTopicTemplate = "channels/%s/messages/control/manager/stop" + registryResponseTopic = "channels/%s/messages/registry/server" + fetchRequestTopicTemplate = "channels/%s/messages/registry/proplet" ) type PropletService struct { channelID string thingID string thingKey string - registryURL string - registryToken string livelinessInterval time.Duration pubsub pkgmqtt.PubSub chunks map[string][][]byte @@ -53,7 +49,7 @@ type ChunkPayload struct { Data []byte `json:"data"` } -func NewService(ctx context.Context, channelID, thingID, thingKey, registryURL, registryToken string, livelinessInterval time.Duration, pubsub pkgmqtt.PubSub, logger *slog.Logger, runtime Runtime) (*PropletService, error) { +func NewService(ctx context.Context, channelID, thingID, thingKey string, livelinessInterval time.Duration, pubsub pkgmqtt.PubSub, logger *slog.Logger, runtime Runtime) (*PropletService, error) { topic := fmt.Sprintf(discoveryTopicTemplate, channelID) payload := map[string]interface{}{ "proplet_id": thingID, @@ -67,8 +63,6 @@ func NewService(ctx context.Context, channelID, thingID, thingKey, registryURL, channelID: channelID, thingID: thingID, thingKey: thingKey, - registryURL: registryURL, - registryToken: registryToken, livelinessInterval: livelinessInterval, pubsub: pubsub, chunks: make(map[string][][]byte), @@ -125,11 +119,6 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { return fmt.Errorf("failed to subscribe to registry topics: %w", err) } - topic = fmt.Sprintf(updateRegistryTopicTemplate, p.channelID) - if err := p.pubsub.Subscribe(ctx, topic, p.registryUpdate(ctx)); err != nil { - return fmt.Errorf("failed to subscribe to update registry topic: %w", err) - } - logger.Info("Proplet service is running.") <-ctx.Done() @@ -152,6 +141,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri ID: payload.ID, FunctionName: payload.Name, WasmFile: payload.File, + imageURL: payload.ImageURL, Params: payload.Inputs, } if err := req.Validate(); err != nil { @@ -160,39 +150,44 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri p.logger.Info("Received start command", slog.String("app_name", req.FunctionName)) - if err := p.runtime.StartApp(ctx, req.WasmFile, req.ID, req.FunctionName, req.Params...); err != nil { - return err - } - - if p.registryURL != "" { - payload := map[string]interface{}{ - "app_name": req.FunctionName, - } - topic := fmt.Sprintf(fetchRequestTopicTemplate, p.channelID) - if err := p.pubsub.Publish(ctx, topic, payload); err != nil { + if req.WasmFile != nil { + if err := p.runtime.StartApp(ctx, req.WasmFile, req.ID, req.FunctionName, req.Params...); err != nil { return err } - go func() { - p.logger.Info("Waiting for chunks", slog.String("app_name", req.FunctionName)) + return nil + } + + pl := map[string]interface{}{ + "app_name": req.imageURL, + } + tp := fmt.Sprintf(fetchRequestTopicTemplate, p.channelID) + if err := p.pubsub.Publish(ctx, tp, pl); err != nil { + return err + } - for { - p.chunksMutex.Lock() - metadata, exists := p.chunkMetadata[req.FunctionName] - receivedChunks := len(p.chunks[req.FunctionName]) - p.chunksMutex.Unlock() + go func() { + p.logger.Info("Waiting for chunks", slog.String("app_name", req.imageURL)) - if exists && receivedChunks == metadata.TotalChunks { - p.logger.Info("All chunks received, deploying app", slog.String("app_name", req.FunctionName)) - go p.deployAndRunApp(ctx, req.FunctionName) + for { + p.chunksMutex.Lock() + metadata, exists := p.chunkMetadata[req.imageURL] + receivedChunks := len(p.chunks[req.imageURL]) + p.chunksMutex.Unlock() - break + if exists && receivedChunks == metadata.TotalChunks { + p.logger.Info("All chunks received, deploying app", slog.String("app_name", req.imageURL)) + wasmBinary := assembleChunks(p.chunks[req.imageURL]) + if err := p.runtime.StartApp(ctx, wasmBinary, req.ID, req.FunctionName, req.Params...); err != nil { + p.logger.Error("Failed to start app", slog.String("app_name", req.imageURL), slog.Any("error", err)) } - time.Sleep(pollingInterval) + break } - }() - } + + time.Sleep(pollingInterval) + } + }() return nil } @@ -222,7 +217,7 @@ func (p *PropletService) handleStopCommand(ctx context.Context) func(topic strin } } -func (p *PropletService) handleChunk(ctx context.Context) func(topic string, msg map[string]interface{}) error { +func (p *PropletService) handleChunk(_ context.Context) func(topic string, msg map[string]interface{}) error { return func(topic string, msg map[string]interface{}) error { data, err := json.Marshal(msg) if err != nil { @@ -249,29 +244,10 @@ func (p *PropletService) handleChunk(ctx context.Context) func(topic string, msg log.Printf("Received chunk %d/%d for app '%s'\n", chunk.ChunkIdx+1, chunk.TotalChunks, chunk.AppName) - if len(p.chunks[chunk.AppName]) == p.chunkMetadata[chunk.AppName].TotalChunks { - log.Printf("All chunks received for app '%s'. Deploying...\n", chunk.AppName) - go p.deployAndRunApp(ctx, chunk.AppName) - } - return nil } } -func (p *PropletService) deployAndRunApp(ctx context.Context, appName string) { - log.Printf("Assembling chunks for app '%s'\n", appName) - - p.chunksMutex.Lock() - chunks := p.chunks[appName] - delete(p.chunks, appName) - p.chunksMutex.Unlock() - - _ = ctx - _ = assembleChunks(chunks) - - log.Printf("App '%s' started successfully\n", appName) -} - func assembleChunks(chunks [][]byte) []byte { var wasmBinary []byte for _, chunk := range chunks { @@ -294,51 +270,3 @@ func (c *ChunkPayload) Validate() error { return nil } - -func (p *PropletService) UpdateRegistry(ctx context.Context, registryURL, registryToken string) error { - if registryURL == "" { - return errors.New("registry URL cannot be empty") - } - if _, err := url.ParseRequestURI(registryURL); err != nil { - return fmt.Errorf("invalid registry URL '%s': %w", registryURL, err) - } - - p.registryURL = registryURL - p.registryToken = registryToken - - log.Printf("App Registry updated and persisted: %s\n", registryURL) - - return nil -} - -func (p *PropletService) registryUpdate(ctx context.Context) func(topic string, msg map[string]interface{}) error { - return func(topic string, msg map[string]interface{}) error { - data, err := json.Marshal(msg) - if err != nil { - return err - } - - var payload struct { - RegistryURL string `json:"registry_url"` - RegistryToken string `json:"registry_token"` - } - if err := json.Unmarshal(data, &payload); err != nil { - return err - } - - ackTopic := fmt.Sprintf(RegistryAckTopicTemplate, p.channelID) - - if err := p.UpdateRegistry(ctx, payload.RegistryURL, payload.RegistryToken); err != nil { - if err := p.pubsub.Publish(ctx, ackTopic, map[string]interface{}{"status": "failure", "error": err.Error()}); err != nil { - p.logger.Error("Failed to publish ack message", slog.String("ack_topic", ackTopic), slog.Any("error", err)) - } - } else { - if err := p.pubsub.Publish(ctx, ackTopic, map[string]interface{}{"status": "success"}); err != nil { - p.logger.Error("Failed to publish ack message", slog.String("ack_topic", ackTopic), slog.Any("error", err)) - } - p.logger.Info("App Registry configuration updated successfully", slog.String("ack_topic", ackTopic), slog.String("registry_url", payload.RegistryURL)) - } - - return nil - } -} diff --git a/proxy/http.go b/proxy/http.go new file mode 100644 index 0000000..6c288c9 --- /dev/null +++ b/proxy/http.go @@ -0,0 +1,157 @@ +package proxy + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + + "github.com/absmach/propeller/proplet" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras-go/v2/registry/remote" + "oras.land/oras-go/v2/registry/remote/auth" + "oras.land/oras-go/v2/registry/remote/retry" +) + +const ( + tag = "latest" + size = 1024 * 1024 +) + +type HTTPProxyConfig struct { + ChunkSize int + Authenticate bool + Token string + Username string + Password string + RegistryURL string +} + +func (c *HTTPProxyConfig) setupAuthentication(repo *remote.Repository) { + if !c.Authenticate { + return + } + + var cred auth.Credential + if c.Username != "" && c.Password != "" { + cred = auth.Credential{ + Username: c.Username, + Password: c.Password, + } + } else if c.Token != "" { + cred = auth.Credential{ + AccessToken: c.Token, + } + } + + repo.Client = &auth.Client{ + Client: retry.DefaultClient, + Cache: auth.NewCache(), + Credential: auth.StaticCredential(c.RegistryURL, cred), + } +} + +func (c *HTTPProxyConfig) fetchManifest(ctx context.Context, repo *remote.Repository, containerName string) (*ocispec.Manifest, error) { + descriptor, err := repo.Resolve(ctx, tag) + if err != nil { + return nil, fmt.Errorf("failed to resolve manifest for %s: %w", containerName, err) + } + + reader, err := repo.Fetch(ctx, descriptor) + if err != nil { + return nil, fmt.Errorf("failed to fetch manifest for %s: %w", containerName, err) + } + defer reader.Close() + + manifestData, err := io.ReadAll(reader) + if err != nil { + return nil, fmt.Errorf("failed to read manifest for %s: %w", containerName, err) + } + + var manifest ocispec.Manifest + if err := json.Unmarshal(manifestData, &manifest); err != nil { + return nil, fmt.Errorf("failed to parse manifest for %s: %w", containerName, err) + } + + return &manifest, nil +} + +func findLargestLayer(manifest *ocispec.Manifest) (ocispec.Descriptor, error) { + var largestLayer ocispec.Descriptor + var maxSize int64 + + for _, layer := range manifest.Layers { + if layer.Size > maxSize { + maxSize = layer.Size + largestLayer = layer + } + } + + if largestLayer.Size == 0 { + return ocispec.Descriptor{}, errors.New("no valid layers found in manifest") + } + + return largestLayer, nil +} + +func createChunks(data []byte, containerPath string, chunkSize int) []proplet.ChunkPayload { + dataSize := len(data) + totalChunks := (dataSize + chunkSize - 1) / chunkSize + + chunks := make([]proplet.ChunkPayload, 0, totalChunks) + for i := range make([]struct{}, totalChunks) { + start := i * chunkSize + end := start + chunkSize + if end > dataSize { + end = dataSize + } + + chunkData := data[start:end] + log.Printf("Chunk %d size: %d bytes", i, len(chunkData)) + + chunks = append(chunks, proplet.ChunkPayload{ + AppName: containerPath, + ChunkIdx: i, + TotalChunks: totalChunks, + Data: chunkData, + }) + } + + return chunks +} + +func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerPath string, chunkSize int) ([]proplet.ChunkPayload, error) { + repo, err := remote.NewRepository(containerPath) + if err != nil { + return nil, fmt.Errorf("failed to create repository for %s: %w", containerPath, err) + } + + c.setupAuthentication(repo) + + manifest, err := c.fetchManifest(ctx, repo, containerPath) + if err != nil { + return nil, err + } + + largestLayer, err := findLargestLayer(manifest) + if err != nil { + return nil, fmt.Errorf("failed to find layer for %s: %w", containerPath, err) + } + + log.Printf("Container size: %d bytes (%.2f MB)", largestLayer.Size, float64(largestLayer.Size)/size) + + layerReader, err := repo.Fetch(ctx, largestLayer) + if err != nil { + return nil, fmt.Errorf("failed to fetch layer for %s: %w", containerPath, err) + } + defer layerReader.Close() + + data, err := io.ReadAll(layerReader) + if err != nil { + return nil, fmt.Errorf("failed to read layer for %s: %w", containerPath, err) + } + + return createChunks(data, containerPath, chunkSize), nil +} diff --git a/proxy/mqtt.go b/proxy/mqtt.go new file mode 100644 index 0000000..82c69ae --- /dev/null +++ b/proxy/mqtt.go @@ -0,0 +1,148 @@ +package proxy + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/absmach/propeller/proplet" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type MQTTProxyConfig struct { + BrokerURL string + Password string + PropletID string + ChannelID string +} + +const ( + connTimeout = 10 + reconnTimeout = 1 + disconnTimeout = 250 + pubTopic = "channels/%s/messages/registry/server" + subTopic = "channels/%s/messages/registry/proplet" +) + +type RegistryClient struct { + client mqtt.Client + config *MQTTProxyConfig +} + +func NewMQTTClient(cfg *MQTTProxyConfig) (*RegistryClient, error) { + opts := mqtt.NewClientOptions(). + AddBroker(cfg.BrokerURL). + SetClientID("Proplet-" + cfg.PropletID). + SetUsername(cfg.PropletID). + SetPassword(cfg.Password). + SetCleanSession(true). + SetAutoReconnect(true). + SetConnectTimeout(connTimeout * time.Second). + SetMaxReconnectInterval(reconnTimeout * time.Minute) + + opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { + log.Printf("MQTT connection lost: %v\n", err) + }) + + opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) { + log.Println("MQTT reconnecting...") + }) + + opts.SetOnConnectHandler(func(client mqtt.Client) { + log.Println("MQTT connection established successfully") + }) + + client := mqtt.NewClient(opts) + + return &RegistryClient{ + client: client, + config: cfg, + }, nil +} + +func (c *RegistryClient) Connect(ctx context.Context) error { + token := c.client.Connect() + + select { + case <-token.Done(): + if err := token.Error(); err != nil { + return fmt.Errorf("MQTT connection failed: %w", err) + } + case <-ctx.Done(): + return ctx.Err() + } + + return nil +} + +func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- string) error { + handler := func(client mqtt.Client, msg mqtt.Message) { + data := msg.Payload() + + payLoad := struct { + Appname string `json:"app_name"` + }{ + Appname: "", + } + + err := json.Unmarshal(data, &payLoad) + if err != nil { + log.Printf("failed unmarshalling: %v", err) + + return + } + + select { + case containerChan <- payLoad.Appname: + log.Printf("Received container request: %s", payLoad.Appname) + case <-ctx.Done(): + + return + default: + log.Println("Channel full, dropping container request") + } + } + + x := fmt.Sprintf(subTopic, c.config.ChannelID) + log.Println(x) + + token := c.client.Subscribe(fmt.Sprintf(subTopic, c.config.ChannelID), 1, handler) + if err := token.Error(); err != nil { + return fmt.Errorf("failed to subscribe to %s: %w", subTopic, err) + } + + return nil +} + +func (c *RegistryClient) PublishContainer(ctx context.Context, chunk proplet.ChunkPayload) error { + data, err := json.Marshal(chunk) + if err != nil { + return fmt.Errorf("failed to marshal chunk payload: %w", err) + } + + token := c.client.Publish(fmt.Sprintf(pubTopic, c.config.ChannelID), 1, false, data) + + select { + case <-token.Done(): + if err := token.Error(); err != nil { + return fmt.Errorf("failed to publish container chunk: %w", err) + } + + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (c *RegistryClient) Disconnect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + c.client.Disconnect(disconnTimeout) + + return nil + } +} diff --git a/proxy/service.go b/proxy/service.go new file mode 100644 index 0000000..c8fe28c --- /dev/null +++ b/proxy/service.go @@ -0,0 +1,102 @@ +package proxy + +import ( + "context" + "fmt" + "log/slog" + + "github.com/absmach/propeller/proplet" +) + +const chunkBuffer = 10 + +type ProxyService struct { + orasconfig *HTTPProxyConfig + mqttClient *RegistryClient + logger *slog.Logger + containerChan chan string + dataChan chan proplet.ChunkPayload +} + +func NewService(ctx context.Context, mqttCfg *MQTTProxyConfig, httpCfg *HTTPProxyConfig, logger *slog.Logger) (*ProxyService, error) { + mqttClient, err := NewMQTTClient(mqttCfg) + if err != nil { + return nil, fmt.Errorf("failed to initialize MQTT client: %w", err) + } + + return &ProxyService{ + orasconfig: httpCfg, + mqttClient: mqttClient, + logger: logger, + containerChan: make(chan string, 1), + dataChan: make(chan proplet.ChunkPayload, chunkBuffer), + }, nil +} + +func (s *ProxyService) MQTTClient() *RegistryClient { + return s.mqttClient +} + +func (s *ProxyService) ContainerChan() chan string { + return s.containerChan +} + +func (s *ProxyService) StreamHTTP(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case containerName := <-s.containerChan: + chunks, err := s.orasconfig.FetchFromReg(ctx, containerName, s.orasconfig.ChunkSize) + if err != nil { + s.logger.Error("failed to fetch container", + slog.Any("container name", containerName), + slog.Any("error", err)) + + continue + } + + // Send each chunk through the data channel + for _, chunk := range chunks { + select { + case s.dataChan <- chunk: + s.logger.Info("sent container chunk to MQTT stream", + slog.Any("container", containerName), + slog.Int("chunk", chunk.ChunkIdx), + slog.Int("total", chunk.TotalChunks)) + case <-ctx.Done(): + return ctx.Err() + } + } + } + } +} + +func (s *ProxyService) StreamMQTT(ctx context.Context) error { + containerChunks := make(map[string]int) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case chunk := <-s.dataChan: + if err := s.mqttClient.PublishContainer(ctx, chunk); err != nil { + s.logger.Error("failed to publish container chunk", + slog.Any("error", err), + slog.Int("chunk", chunk.ChunkIdx), + slog.Int("total", chunk.TotalChunks)) + + continue + } + + containerChunks[chunk.AppName]++ + + if containerChunks[chunk.AppName] == chunk.TotalChunks { + s.logger.Info("successfully sent all chunks", + slog.String("container", chunk.AppName), + slog.Int("total_chunks", chunk.TotalChunks)) + delete(containerChunks, chunk.AppName) + } + } + } +} diff --git a/task/task.go b/task/task.go index 44b7a4e..fbac322 100644 --- a/task/task.go +++ b/task/task.go @@ -1,8 +1,6 @@ package task -import ( - "time" -) +import "time" type State uint8 @@ -35,6 +33,7 @@ type Task struct { ID string `json:"id"` Name string `json:"name"` State State `json:"state"` + ImageURL string `json:"image_url,omitempty"` File []byte `json:"file,omitempty"` Inputs []uint64 `json:"inputs,omitempty"` Results []uint64 `json:"results,omitempty"` diff --git a/test.md b/test.md deleted file mode 100644 index d9a98c6..0000000 --- a/test.md +++ /dev/null @@ -1,104 +0,0 @@ -# Test - -Start docker composition - -```bash -cd docker -docker compose up -``` - -Login as admin user - -```bash -USER_TOKEN=$(magistrala-cli users token admin 12345678 | jq -r .access_token) -``` - -Create a domain - -```bash -DOMAIN_ID=$(magistrala-cli domains create demo demo $USER_TOKEN | jq -r .id) -``` - -Create a thing called manager - -```bash -magistrala-cli things create '{"name": "Propeller Manager", "tags": ["manager", "propeller"], "status": "enabled"}' $DOMAIN_ID $USER_TOKEN -``` - -Set the following environment variables from the respose - -```bash -export MANAGER_THING_ID="" -export MANAGER_THING_KEY="" -``` - -Create a channel called manager - -```bash -magistrala-cli channels create '{"name": "Propeller Manager", "tags": ["manager", "propeller"], "status": "enabled"}' $DOMAIN_ID $USER_TOKEN -``` - -Set the following environment variables from the respose - -```bash -export MANAGER_CHANNEL_ID="" -``` - -Connect the thing to the manager channel - -```bash -magistrala-cli things connect $MANAGER_THING_ID $MANAGER_CHANNEL_ID $DOMAIN_ID $USER_TOKEN -``` - -Create a thing called proplet - -```bash -magistrala-cli things create '{"name": "Propeller Proplet", "tags": ["proplet", "propeller"], "status": "enabled"}' $DOMAIN_ID $USER_TOKEN -``` - -Set the following environment variables from the respose - -```bash -export PROPLET_THING_ID="" -export PROPLET_THING_KEY="" -``` - -Connect the thing to the manager channel - -```bash -magistrala-cli things connect $PROPLET_THING_ID $MANAGER_CHANNEL_ID $DOMAIN_ID $USER_TOKEN -``` - -Publish create message to the manager channel. This creates a new proplet. - -```bash -mosquitto_pub -u $PROPLET_THING_ID -P $PROPLET_THING_KEY -I propeller -t channels/$MANAGER_CHANNEL_ID/messages/control/proplet/create -h localhost -m "{\"proplet_id\": \"$PROPLET_THING_ID\", \"name\": \"proplet-1\"}" -``` - -Publish alive message to the manager channel. This updates the proplet. - -```bash -mosquitto_pub -u $PROPLET_THING_ID -P $PROPLET_THING_KEY -I propeller -t channels/$MANAGER_CHANNEL_ID/messages/control/proplet/alive -h localhost -m "{\"proplet_id\": \"$PROPLET_THING_ID\"}" -``` - -To start the manager, run the following command - -```bash -export MANAGER_THING_ID="" -export MANAGER_THING_KEY="" -export PRMANAGER_CHANNEL_ID="" -export PROPLET_THING_ID="" -export PROPLET_THING_KEY="" -propeller-manager -``` - -To start the proplet, run the following command - -```bash -export MANAGER_THING_ID="" -export MANAGER_THING_KEY="" -export PROPLET_CHANNEL_ID="" -export PROPLET_THING_ID="" -export PROPLET_THING_KEY="" -propeller-proplet -```