Skip to content

Commit

Permalink
Fix CI
Browse files Browse the repository at this point in the history
Signed-off-by: JeffMboya <[email protected]>

Fix CI

Signed-off-by: JeffMboya <[email protected]>

Fix CI

Signed-off-by: JeffMboya <[email protected]>
  • Loading branch information
JeffMboya committed Dec 11, 2024
1 parent 027c3fa commit d6ea818
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 40 deletions.
29 changes: 23 additions & 6 deletions cmd/proplet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
Expand All @@ -24,6 +25,13 @@ var (
)

func main() {
if err := run(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}

func run() error {
flag.StringVar(&wasmFilePath, "file", "", "Path to the WASM file")
flag.Parse()

Expand All @@ -48,13 +56,15 @@ func main() {
cfg, err := proplet.LoadConfig("proplet/config.json", hasWASMFile)
if err != nil {
logger.Error("Failed to load configuration", slog.String("path", "proplet/config.json"), slog.Any("error", err))
os.Exit(1)

return fmt.Errorf("failed to load configuration: %w", err)
}

if cfg.RegistryURL != "" {
if err := checkRegistryConnectivity(cfg.RegistryURL, logger); err != nil {
logger.Error("Failed connectivity check for Registry URL", slog.String("url", cfg.RegistryURL), slog.Any("error", err))
os.Exit(1)

return fmt.Errorf("registry connectivity check failed: %w", err)
}
logger.Info("Registry connectivity verified", slog.String("url", cfg.RegistryURL))
}
Expand All @@ -63,25 +73,32 @@ func main() {
wasmBinary, err = loadWASMFile(wasmFilePath, logger)
if err != nil {
logger.Error("Failed to load WASM file", slog.String("wasm_file_path", wasmFilePath), slog.Any("error", err))
os.Exit(1)

return fmt.Errorf("failed to load WASM file: %w", err)
}
logger.Info("WASM binary loaded at startup", slog.Int("size_bytes", len(wasmBinary)))
}

if cfg.RegistryURL == "" && wasmBinary == nil {
logger.Error("Neither a registry URL nor a WASM binary file was provided")
os.Exit(1)

return errors.New("missing registry URL and WASM binary file")
}

service, err := proplet.NewService(ctx, cfg, wasmBinary, logger)
if err != nil {
logger.Error("Error initializing service", slog.Any("error", err))
os.Exit(1)

return fmt.Errorf("service initialization error: %w", err)
}

if err := service.Run(ctx, logger); err != nil {
logger.Error("Error running service", slog.Any("error", err))

return fmt.Errorf("service run error: %w", err)
}

return nil
}

func configureLogger(level string) *slog.Logger {
Expand Down Expand Up @@ -115,7 +132,7 @@ func checkRegistryConnectivity(registryURL string, logger *slog.Logger) error {

logger.Info("Checking registry connectivity", slog.String("url", registryURL))

req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, http.NoBody)
if err != nil {
logger.Error("Failed to create HTTP request", slog.String("url", registryURL), slog.Any("error", err))

Expand Down
33 changes: 16 additions & 17 deletions proplet/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package proplet

import (
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -37,28 +38,26 @@ func LoadConfig(filepath string, hasWASMFile bool) (Config, error) {
}

func (c Config) Validate(hasWASMFile bool) error {
if c.BrokerURL == "" {
return fmt.Errorf("broker_url is required")
requiredFields := map[string]string{
"broker_url": c.BrokerURL,
"password": c.Password,
"proplet_id": c.PropletID,
"channel_id": c.ChannelID,
}

for fieldName, value := range requiredFields {
if value == "" {
return fmt.Errorf("%s is required", fieldName)
}
}

if _, err := url.Parse(c.BrokerURL); err != nil {
return fmt.Errorf("broker_url is not a valid URL: %w", err)
}
if c.Password == "" {
return fmt.Errorf("password is required")
}
if c.PropletID == "" {
return fmt.Errorf("proplet_id is required")
}
if c.ChannelID == "" {
return fmt.Errorf("channel_id is required")
}

if !hasWASMFile {
if c.RegistryURL == "" {
return fmt.Errorf("registry_url is required when not using a WASM file")
}
if c.RegistryToken == "" {
return fmt.Errorf("registry_token is required when not using a WASM file")
if c.RegistryURL == "" || c.RegistryToken == "" {
return errors.New("registry_url and registry_token are required when not using a WASM file")
}
}

Expand All @@ -67,7 +66,7 @@ func (c Config) Validate(hasWASMFile bool) error {
return fmt.Errorf("registry_url is not a valid URL: %w", err)
}
if c.RegistryToken == "" {
return fmt.Errorf("registry_token is required when a registry_url is provided")
return errors.New("registry_token is required when a registry_url is provided")
}
}

Expand Down
34 changes: 17 additions & 17 deletions proplet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,13 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error {
p.mqttClient,
p.config,
func(client mqtt.Client, msg mqtt.Message) {
p.handleStartCommand(client, msg, logger)
p.handleStartCommand(ctx, client, msg, logger)
},
func(client mqtt.Client, msg mqtt.Message) {
p.handleStopCommand(client, msg, logger)
p.handleStopCommand(ctx, client, msg, logger)
},
func(client mqtt.Client, msg mqtt.Message) {
p.registryUpdate(client, msg, logger)
p.registryUpdate(ctx, client, msg, logger)
},
logger,
); err != nil {
Expand All @@ -150,7 +150,7 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error {
p.mqttClient,
p.config.ChannelID,
func(client mqtt.Client, msg mqtt.Message) {
p.handleChunk(client, msg)
p.handleChunk(ctx, client, msg)
},
logger,
); err != nil {
Expand All @@ -163,7 +163,7 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error {
return nil
}

func (p *PropletService) handleStartCommand(_ mqtt.Client, msg mqtt.Message, logger *slog.Logger) {
func (p *PropletService) handleStartCommand(ctx context.Context, _ mqtt.Client, msg mqtt.Message, logger *slog.Logger) {
var req propletapi.StartRequest
if err := json.Unmarshal(msg.Payload(), &req); err != nil {
logger.Error("Invalid start command payload", slog.Any("error", err))
Expand All @@ -175,14 +175,14 @@ func (p *PropletService) handleStartCommand(_ mqtt.Client, msg mqtt.Message, log

if p.wasmBinary != nil {
logger.Info("Using preloaded WASM binary", slog.String("app_name", req.AppName))
function, err := p.runtime.StartApp(context.Background(), req.AppName, p.wasmBinary, "main")
function, err := p.runtime.StartApp(ctx, req.AppName, p.wasmBinary, "main")
if err != nil {
logger.Error("Failed to start app", slog.String("app_name", req.AppName), slog.Any("error", err))

return
}

_, err = function.Call(context.Background())
_, err = function.Call(ctx)
if err != nil {
logger.Error("Error executing app", slog.String("app_name", req.AppName), slog.Any("error", err))
} else {
Expand Down Expand Up @@ -211,7 +211,7 @@ func (p *PropletService) handleStartCommand(_ mqtt.Client, msg mqtt.Message, log

if exists && receivedChunks == metadata.TotalChunks {
logger.Info("All chunks received, deploying app", slog.String("app_name", req.AppName))
go p.deployAndRunApp(req.AppName)
go p.deployAndRunApp(ctx, req.AppName)

break
}
Expand All @@ -224,7 +224,7 @@ func (p *PropletService) handleStartCommand(_ mqtt.Client, msg mqtt.Message, log
}
}

func (p *PropletService) handleStopCommand(_ mqtt.Client, msg mqtt.Message, logger *slog.Logger) {
func (p *PropletService) handleStopCommand(ctx context.Context, _ mqtt.Client, msg mqtt.Message, logger *slog.Logger) {
var req propletapi.StopRequest
if err := json.Unmarshal(msg.Payload(), &req); err != nil {
logger.Error("Invalid stop command payload", slog.Any("error", err))
Expand All @@ -234,7 +234,7 @@ func (p *PropletService) handleStopCommand(_ mqtt.Client, msg mqtt.Message, logg

logger.Info("Received stop command", slog.String("app_name", req.AppName))

err := p.runtime.StopApp(context.Background(), req.AppName)
err := p.runtime.StopApp(ctx, req.AppName)
if err != nil {
logger.Error("Failed to stop app", slog.String("app_name", req.AppName), slog.Any("error", err))

Expand All @@ -244,7 +244,7 @@ func (p *PropletService) handleStopCommand(_ mqtt.Client, msg mqtt.Message, logg
logger.Info("App stopped successfully", slog.String("app_name", req.AppName))
}

func (p *PropletService) handleChunk(_ mqtt.Client, msg mqtt.Message) {
func (p *PropletService) handleChunk(ctx context.Context, _ mqtt.Client, msg mqtt.Message) {
var chunk ChunkPayload
if err := json.Unmarshal(msg.Payload(), &chunk); err != nil {
log.Printf("Failed to unmarshal chunk payload: %v", err)
Expand All @@ -271,11 +271,11 @@ func (p *PropletService) handleChunk(_ mqtt.Client, msg mqtt.Message) {

if len(p.chunks[chunk.AppName]) == p.chunkMetadata[chunk.AppName].TotalChunks {
log.Printf("All chunks received for app '%s'. Deploying...\n", chunk.AppName)
go p.deployAndRunApp(chunk.AppName)
go p.deployAndRunApp(ctx, chunk.AppName)
}
}

func (p *PropletService) deployAndRunApp(appName string) {
func (p *PropletService) deployAndRunApp(ctx context.Context, appName string) {
log.Printf("Assembling chunks for app '%s'\n", appName)

p.chunksMutex.Lock()
Expand All @@ -285,14 +285,14 @@ func (p *PropletService) deployAndRunApp(appName string) {

wasmBinary := assembleChunks(chunks)

function, err := p.runtime.StartApp(context.Background(), appName, wasmBinary, "main")
function, err := p.runtime.StartApp(ctx, appName, wasmBinary, "main")
if err != nil {
log.Printf("Failed to start app '%s': %v\n", appName, err)

return
}

_, err = function.Call(context.Background())
_, err = function.Call(ctx)
if err != nil {
log.Printf("Failed to execute app '%s': %v\n", appName, err)

Expand Down Expand Up @@ -350,7 +350,7 @@ func (p *PropletService) UpdateRegistry(ctx context.Context, registryURL, regist
return nil
}

func (p *PropletService) registryUpdate(client mqtt.Client, msg mqtt.Message, logger *slog.Logger) {
func (p *PropletService) registryUpdate(ctx context.Context, client mqtt.Client, msg mqtt.Message, logger *slog.Logger) {
var payload struct {
RegistryURL string `json:"registry_url"`
RegistryToken string `json:"registry_token"`
Expand All @@ -362,7 +362,7 @@ func (p *PropletService) registryUpdate(client mqtt.Client, msg mqtt.Message, lo
}

ackTopic := fmt.Sprintf(RegistryAckTopicTemplate, p.config.ChannelID)
if err := p.UpdateRegistry(context.Background(), payload.RegistryURL, payload.RegistryToken); err != nil {
if err := p.UpdateRegistry(ctx, payload.RegistryURL, payload.RegistryToken); err != nil {
client.Publish(ackTopic, 0, false, fmt.Sprintf(RegistryFailurePayload, err))
logger.Error("Failed to update registry configuration", slog.String("ack_topic", ackTopic), slog.String("registry_url", payload.RegistryURL), slog.Any("error", err))
} else {
Expand Down

0 comments on commit d6ea818

Please sign in to comment.