From d6ea81871ac0c204cdf2cd06778f04e9ccd2c8fc Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Wed, 11 Dec 2024 14:45:44 +0300 Subject: [PATCH] Fix CI Signed-off-by: JeffMboya Fix CI Signed-off-by: JeffMboya Fix CI Signed-off-by: JeffMboya --- cmd/proplet/main.go | 29 +++++++++++++++++++++++------ proplet/config.go | 33 ++++++++++++++++----------------- proplet/service.go | 34 +++++++++++++++++----------------- 3 files changed, 56 insertions(+), 40 deletions(-) diff --git a/cmd/proplet/main.go b/cmd/proplet/main.go index 15d6937..ef241d8 100644 --- a/cmd/proplet/main.go +++ b/cmd/proplet/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "flag" "fmt" "log" @@ -24,6 +25,13 @@ var ( ) func main() { + if err := run(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +func run() error { flag.StringVar(&wasmFilePath, "file", "", "Path to the WASM file") flag.Parse() @@ -48,13 +56,15 @@ func main() { cfg, err := proplet.LoadConfig("proplet/config.json", hasWASMFile) if err != nil { logger.Error("Failed to load configuration", slog.String("path", "proplet/config.json"), slog.Any("error", err)) - os.Exit(1) + + return fmt.Errorf("failed to load configuration: %w", err) } if cfg.RegistryURL != "" { if err := checkRegistryConnectivity(cfg.RegistryURL, logger); err != nil { logger.Error("Failed connectivity check for Registry URL", slog.String("url", cfg.RegistryURL), slog.Any("error", err)) - os.Exit(1) + + return fmt.Errorf("registry connectivity check failed: %w", err) } logger.Info("Registry connectivity verified", slog.String("url", cfg.RegistryURL)) } @@ -63,25 +73,32 @@ func main() { wasmBinary, err = loadWASMFile(wasmFilePath, logger) if err != nil { logger.Error("Failed to load WASM file", slog.String("wasm_file_path", wasmFilePath), slog.Any("error", err)) - os.Exit(1) + + return fmt.Errorf("failed to load WASM file: %w", err) } logger.Info("WASM binary loaded at startup", slog.Int("size_bytes", len(wasmBinary))) } if cfg.RegistryURL == "" && wasmBinary == nil { logger.Error("Neither a registry URL nor a WASM binary file was provided") - os.Exit(1) + + return errors.New("missing registry URL and WASM binary file") } service, err := proplet.NewService(ctx, cfg, wasmBinary, logger) if err != nil { logger.Error("Error initializing service", slog.Any("error", err)) - os.Exit(1) + + return fmt.Errorf("service initialization error: %w", err) } if err := service.Run(ctx, logger); err != nil { logger.Error("Error running service", slog.Any("error", err)) + + return fmt.Errorf("service run error: %w", err) } + + return nil } func configureLogger(level string) *slog.Logger { @@ -115,7 +132,7 @@ func checkRegistryConnectivity(registryURL string, logger *slog.Logger) error { logger.Info("Checking registry connectivity", slog.String("url", registryURL)) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, http.NoBody) if err != nil { logger.Error("Failed to create HTTP request", slog.String("url", registryURL), slog.Any("error", err)) diff --git a/proplet/config.go b/proplet/config.go index 9e8f396..1bea3f1 100644 --- a/proplet/config.go +++ b/proplet/config.go @@ -2,6 +2,7 @@ package proplet import ( "encoding/json" + "errors" "fmt" "net/url" "os" @@ -37,28 +38,26 @@ func LoadConfig(filepath string, hasWASMFile bool) (Config, error) { } func (c Config) Validate(hasWASMFile bool) error { - if c.BrokerURL == "" { - return fmt.Errorf("broker_url is required") + requiredFields := map[string]string{ + "broker_url": c.BrokerURL, + "password": c.Password, + "proplet_id": c.PropletID, + "channel_id": c.ChannelID, } + + for fieldName, value := range requiredFields { + if value == "" { + return fmt.Errorf("%s is required", fieldName) + } + } + if _, err := url.Parse(c.BrokerURL); err != nil { return fmt.Errorf("broker_url is not a valid URL: %w", err) } - if c.Password == "" { - return fmt.Errorf("password is required") - } - if c.PropletID == "" { - return fmt.Errorf("proplet_id is required") - } - if c.ChannelID == "" { - return fmt.Errorf("channel_id is required") - } if !hasWASMFile { - if c.RegistryURL == "" { - return fmt.Errorf("registry_url is required when not using a WASM file") - } - if c.RegistryToken == "" { - return fmt.Errorf("registry_token is required when not using a WASM file") + if c.RegistryURL == "" || c.RegistryToken == "" { + return errors.New("registry_url and registry_token are required when not using a WASM file") } } @@ -67,7 +66,7 @@ func (c Config) Validate(hasWASMFile bool) error { return fmt.Errorf("registry_url is not a valid URL: %w", err) } if c.RegistryToken == "" { - return fmt.Errorf("registry_token is required when a registry_url is provided") + return errors.New("registry_token is required when a registry_url is provided") } } diff --git a/proplet/service.go b/proplet/service.go index 0088b6a..5a8d8dc 100644 --- a/proplet/service.go +++ b/proplet/service.go @@ -133,13 +133,13 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { p.mqttClient, p.config, func(client mqtt.Client, msg mqtt.Message) { - p.handleStartCommand(client, msg, logger) + p.handleStartCommand(ctx, client, msg, logger) }, func(client mqtt.Client, msg mqtt.Message) { - p.handleStopCommand(client, msg, logger) + p.handleStopCommand(ctx, client, msg, logger) }, func(client mqtt.Client, msg mqtt.Message) { - p.registryUpdate(client, msg, logger) + p.registryUpdate(ctx, client, msg, logger) }, logger, ); err != nil { @@ -150,7 +150,7 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { p.mqttClient, p.config.ChannelID, func(client mqtt.Client, msg mqtt.Message) { - p.handleChunk(client, msg) + p.handleChunk(ctx, client, msg) }, logger, ); err != nil { @@ -163,7 +163,7 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { return nil } -func (p *PropletService) handleStartCommand(_ mqtt.Client, msg mqtt.Message, logger *slog.Logger) { +func (p *PropletService) handleStartCommand(ctx context.Context, _ mqtt.Client, msg mqtt.Message, logger *slog.Logger) { var req propletapi.StartRequest if err := json.Unmarshal(msg.Payload(), &req); err != nil { logger.Error("Invalid start command payload", slog.Any("error", err)) @@ -175,14 +175,14 @@ func (p *PropletService) handleStartCommand(_ mqtt.Client, msg mqtt.Message, log if p.wasmBinary != nil { logger.Info("Using preloaded WASM binary", slog.String("app_name", req.AppName)) - function, err := p.runtime.StartApp(context.Background(), req.AppName, p.wasmBinary, "main") + function, err := p.runtime.StartApp(ctx, req.AppName, p.wasmBinary, "main") if err != nil { logger.Error("Failed to start app", slog.String("app_name", req.AppName), slog.Any("error", err)) return } - _, err = function.Call(context.Background()) + _, err = function.Call(ctx) if err != nil { logger.Error("Error executing app", slog.String("app_name", req.AppName), slog.Any("error", err)) } else { @@ -211,7 +211,7 @@ func (p *PropletService) handleStartCommand(_ mqtt.Client, msg mqtt.Message, log if exists && receivedChunks == metadata.TotalChunks { logger.Info("All chunks received, deploying app", slog.String("app_name", req.AppName)) - go p.deployAndRunApp(req.AppName) + go p.deployAndRunApp(ctx, req.AppName) break } @@ -224,7 +224,7 @@ func (p *PropletService) handleStartCommand(_ mqtt.Client, msg mqtt.Message, log } } -func (p *PropletService) handleStopCommand(_ mqtt.Client, msg mqtt.Message, logger *slog.Logger) { +func (p *PropletService) handleStopCommand(ctx context.Context, _ mqtt.Client, msg mqtt.Message, logger *slog.Logger) { var req propletapi.StopRequest if err := json.Unmarshal(msg.Payload(), &req); err != nil { logger.Error("Invalid stop command payload", slog.Any("error", err)) @@ -234,7 +234,7 @@ func (p *PropletService) handleStopCommand(_ mqtt.Client, msg mqtt.Message, logg logger.Info("Received stop command", slog.String("app_name", req.AppName)) - err := p.runtime.StopApp(context.Background(), req.AppName) + err := p.runtime.StopApp(ctx, req.AppName) if err != nil { logger.Error("Failed to stop app", slog.String("app_name", req.AppName), slog.Any("error", err)) @@ -244,7 +244,7 @@ func (p *PropletService) handleStopCommand(_ mqtt.Client, msg mqtt.Message, logg logger.Info("App stopped successfully", slog.String("app_name", req.AppName)) } -func (p *PropletService) handleChunk(_ mqtt.Client, msg mqtt.Message) { +func (p *PropletService) handleChunk(ctx context.Context, _ mqtt.Client, msg mqtt.Message) { var chunk ChunkPayload if err := json.Unmarshal(msg.Payload(), &chunk); err != nil { log.Printf("Failed to unmarshal chunk payload: %v", err) @@ -271,11 +271,11 @@ func (p *PropletService) handleChunk(_ mqtt.Client, msg mqtt.Message) { if len(p.chunks[chunk.AppName]) == p.chunkMetadata[chunk.AppName].TotalChunks { log.Printf("All chunks received for app '%s'. Deploying...\n", chunk.AppName) - go p.deployAndRunApp(chunk.AppName) + go p.deployAndRunApp(ctx, chunk.AppName) } } -func (p *PropletService) deployAndRunApp(appName string) { +func (p *PropletService) deployAndRunApp(ctx context.Context, appName string) { log.Printf("Assembling chunks for app '%s'\n", appName) p.chunksMutex.Lock() @@ -285,14 +285,14 @@ func (p *PropletService) deployAndRunApp(appName string) { wasmBinary := assembleChunks(chunks) - function, err := p.runtime.StartApp(context.Background(), appName, wasmBinary, "main") + function, err := p.runtime.StartApp(ctx, appName, wasmBinary, "main") if err != nil { log.Printf("Failed to start app '%s': %v\n", appName, err) return } - _, err = function.Call(context.Background()) + _, err = function.Call(ctx) if err != nil { log.Printf("Failed to execute app '%s': %v\n", appName, err) @@ -350,7 +350,7 @@ func (p *PropletService) UpdateRegistry(ctx context.Context, registryURL, regist return nil } -func (p *PropletService) registryUpdate(client mqtt.Client, msg mqtt.Message, logger *slog.Logger) { +func (p *PropletService) registryUpdate(ctx context.Context, client mqtt.Client, msg mqtt.Message, logger *slog.Logger) { var payload struct { RegistryURL string `json:"registry_url"` RegistryToken string `json:"registry_token"` @@ -362,7 +362,7 @@ func (p *PropletService) registryUpdate(client mqtt.Client, msg mqtt.Message, lo } ackTopic := fmt.Sprintf(RegistryAckTopicTemplate, p.config.ChannelID) - if err := p.UpdateRegistry(context.Background(), payload.RegistryURL, payload.RegistryToken); err != nil { + if err := p.UpdateRegistry(ctx, payload.RegistryURL, payload.RegistryToken); err != nil { client.Publish(ackTopic, 0, false, fmt.Sprintf(RegistryFailurePayload, err)) logger.Error("Failed to update registry configuration", slog.String("ack_topic", ackTopic), slog.String("registry_url", payload.RegistryURL), slog.Any("error", err)) } else {