Skip to content

Commit

Permalink
Merge pull request #32 from nyagamunene/PROP-27
Browse files Browse the repository at this point in the history
PROP-27 - Implement Registry proxy
  • Loading branch information
drasko authored Dec 20, 2024
2 parents b794084 + 2da6d97 commit 8a91d1c
Show file tree
Hide file tree
Showing 15 changed files with 591 additions and 266 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Propellerd Build
build
config.toml
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ linters:
- err113
- noctx
- cyclop
- tagalign

linters-settings:
gocritic:
Expand Down
10 changes: 4 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ TIME=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ')
VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'v0.0.0')
COMMIT ?= $(shell git rev-parse HEAD)
EXAMPLES = addition long-addition
SERVICES = manager proplet cli
SERVICES = manager proplet cli proxy

define compile_service
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) \
Expand All @@ -20,12 +20,10 @@ endef
$(SERVICES):
$(call compile_service,$(@))


# Install all non-WASM executables from the build directory to GOBIN with 'propeller-' prefix
install:
for file in $(BUILD_DIR)/*; do \
if [[ ! "$$file" =~ \.wasm$$ ]]; then \
cp "$$file" $(GOBIN)/propeller-`basename "$$file"`; \
fi \
done
$(foreach f,$(wildcard $(BUILD_DIR)/*[!.wasm]),cp $(f) $(patsubst $(BUILD_DIR)/%,$(GOBIN)/propeller-%,$(f));)

.PHONY: all $(SERVICES)
all: $(SERVICES)
Expand Down
40 changes: 1 addition & 39 deletions cmd/proplet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log"
"log/slog"
"net/http"
"os"
"time"

Expand All @@ -26,9 +25,6 @@ type config struct {
MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"`
MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"`
LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"`
RegistryURL string `env:"PROPLET_REGISTRY_URL"`
RegistryToken string `env:"PROPLET_REGISTRY_TOKEN"`
RegistryTimeout time.Duration `env:"PROPLET_REGISTRY_TIMEOUT" envDefault:"30s"`
ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"`
ThingID string `env:"PROPLET_THING_ID,notEmpty"`
ThingKey string `env:"PROPLET_THING_KEY,notEmpty"`
Expand Down Expand Up @@ -57,16 +53,6 @@ func main() {
logger := slog.New(logHandler)
slog.SetDefault(logger)

if cfg.RegistryURL != "" {
if err := checkRegistryConnectivity(ctx, cfg.RegistryURL, cfg.RegistryTimeout); err != nil {
logger.Error("failed to connect to registry URL", slog.String("url", cfg.RegistryURL), slog.Any("error", err))

return
}

logger.Info("successfully connected to registry URL", slog.String("url", cfg.RegistryURL))
}

mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, cfg.InstanceID, cfg.ThingID, cfg.ThingKey, cfg.ChannelID, cfg.MQTTTimeout, logger)
if err != nil {
logger.Error("failed to initialize mqtt client", slog.Any("error", err))
Expand All @@ -75,7 +61,7 @@ func main() {
}
wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID)

service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.RegistryURL, cfg.RegistryToken, cfg.LivelinessInterval, mqttPubSub, logger, wazero)
service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, wazero)
if err != nil {
logger.Error("failed to initialize service", slog.Any("error", err))

Expand All @@ -96,27 +82,3 @@ func main() {
logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err))
}
}

func checkRegistryConnectivity(ctx context.Context, registryURL string, registryTimeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, registryTimeout)
defer cancel()

client := http.DefaultClient

req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, http.NoBody)
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to connect to registry URL: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("registry returned unexpected status: %d", resp.StatusCode)
}

return nil
}
111 changes: 111 additions & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package main

import (
"context"
"fmt"
"log"
"log/slog"
"os"

"github.com/absmach/propeller/proxy"
"github.com/caarlos0/env/v11"
"golang.org/x/sync/errgroup"
)

const svcName = "proxy"

type config struct {
LogLevel string `env:"PROXY_LOG_LEVEL" envDefault:"info"`

BrokerURL string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
PropletKey string `env:"PROPLET_THING_KEY,notEmpty"`
PropletID string `env:"PROPLET_THING_ID,notEmpty" `
ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"`

ChunkSize int `env:"PROXY_CHUNK_SIZE" envDefault:"512000"`
Authenticate bool `env:"PROXY_AUTHENTICATE" envDefault:"false"`
Token string `env:"PROXY_REGISTRY_TOKEN" envDefault:""`
Username string `env:"PROXY_REGISTRY_USERNAME" envDefault:""`
Password string `env:"PROXY_REGISTRY_PASSWORD" envDefault:""`
RegistryURL string `env:"PROXY_REGISTRY_URL,notEmpty"`
}

func main() {
g, ctx := errgroup.WithContext(context.Background())

cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load configuration : %s", err.Error())
}

var level slog.Level
if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil {
log.Fatalf("failed to parse log level: %s", err.Error())
}
logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
})
logger := slog.New(logHandler)
slog.SetDefault(logger)

mqttCfg := proxy.MQTTProxyConfig{
BrokerURL: cfg.BrokerURL,
Password: cfg.PropletKey,
PropletID: cfg.PropletID,
ChannelID: cfg.ChannelID,
}

httpCfg := proxy.HTTPProxyConfig{
ChunkSize: cfg.ChunkSize,
Authenticate: cfg.Authenticate,
Token: cfg.Token,
Username: cfg.Username,
Password: cfg.Password,
RegistryURL: cfg.RegistryURL,
}

logger.Info("successfully initialized MQTT and HTTP config")

service, err := proxy.NewService(ctx, &mqttCfg, &httpCfg, logger)
if err != nil {
logger.Error("failed to create proxy service", slog.Any("error", err))

return
}

logger.Info("starting proxy service")

if err := start(ctx, g, service); err != nil {
logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err))
}
}

func start(ctx context.Context, g *errgroup.Group, s *proxy.ProxyService) error {
if err := s.MQTTClient().Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to MQTT broker: %w", err)
}

slog.Info("successfully connected to broker")

defer func() {
if err := s.MQTTClient().Disconnect(ctx); err != nil {
slog.Error("failed to disconnect MQTT client", "error", err)
}
}()

if err := s.MQTTClient().Subscribe(ctx, s.ContainerChan()); err != nil {
return fmt.Errorf("failed to subscribe to container requests: %w", err)
}

slog.Info("successfully subscribed to topic")

g.Go(func() error {
return s.StreamHTTP(ctx)
})

g.Go(func() error {
return s.StreamMQTT(ctx)
})

return g.Wait()
}
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,11 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
google.golang.org/grpc v1.69.0 // indirect
google.golang.org/protobuf v1.36.0 // indirect
oras.land/oras-go/v2 v2.5.0
)

require (
github.com/caarlos0/env/v11 v11.3.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/caarlos0/env/v11 v11.2.2 h1:95fApNrUyueipoZN/EhA8mMxiNxrBwDa+oAZrMWl3
github.com/caarlos0/env/v11 v11.2.2/go.mod h1:JBfcdeQiBoI3Zh1QRAWfe+tpiNTmDtcCj/hHHHMx0vc=
github.com/caarlos0/env/v11 v11.3.0 h1:CVTN6W6+twFC1jHKUwsw9eOTEiFpzyJOSA2AyHa8uvw=
github.com/caarlos0/env/v11 v11.3.0/go.mod h1:Q5lYHeOsgY20CCV/R+b50Jwg2MnjySid7+3FUBz2BJw=
github.com/caarlos0/env/v11 v11.3.0 h1:CVTN6W6+twFC1jHKUwsw9eOTEiFpzyJOSA2AyHa8uvw=
github.com/caarlos0/env/v11 v11.3.0/go.mod h1:Q5lYHeOsgY20CCV/R+b50Jwg2MnjySid7+3FUBz2BJw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -61,6 +63,10 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
Expand Down Expand Up @@ -121,3 +127,5 @@ google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
oras.land/oras-go/v2 v2.5.0 h1:o8Me9kLY74Vp5uw07QXPiitjsw7qNXi8Twd+19Zf02c=
oras.land/oras-go/v2 v2.5.0/go.mod h1:z4eisnLP530vwIOUOJeBIj0aGI0L1C3d53atvCBqZHg=
4 changes: 4 additions & 0 deletions manager/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ type taskReq struct {
}

func (t *taskReq) validate() error {
if t.Name == "" {
return apiutil.ErrMissingName
}

return nil
}

Expand Down
9 changes: 6 additions & 3 deletions proplet/requests.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package proplet

import "errors"
import (
"errors"
)

type startRequest struct {
ID string
FunctionName string
WasmFile []byte
imageURL string
Params []uint64
}

Expand All @@ -16,8 +19,8 @@ func (r startRequest) Validate() error {
if r.FunctionName == "" {
return errors.New("function name is required")
}
if r.WasmFile == nil {
return errors.New("wasm file is required")
if r.WasmFile == nil && r.imageURL == "" {
return errors.New("either wasm file or wasm file download path is required")
}

return nil
Expand Down
Loading

0 comments on commit 8a91d1c

Please sign in to comment.