Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PROP-27 - Implement Registry proxy #32

Merged
merged 42 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
57bcec1
Initial implementation of HTTP to MQTT proxy
nyagamunene Dec 9, 2024
3a10e82
update mqtt client and http client
nyagamunene Dec 10, 2024
2c29619
refactor oras http
nyagamunene Dec 10, 2024
b83e12d
add env file and main file
nyagamunene Dec 11, 2024
3fedc22
add env file and main file
nyagamunene Dec 11, 2024
0401587
update go.mod and go.sum file
nyagamunene Dec 11, 2024
e2ac8c5
fix failing linter
nyagamunene Dec 11, 2024
e9755d2
fix tag align
nyagamunene Dec 11, 2024
16a8e47
fix failing linter
nyagamunene Dec 11, 2024
046a167
fix start method
nyagamunene Dec 11, 2024
ee09666
remove white spaces
nyagamunene Dec 11, 2024
b442273
fix failing linter
nyagamunene Dec 11, 2024
26a6706
fix failing linter
nyagamunene Dec 11, 2024
48bf08f
address comments and change how data is sent
nyagamunene Dec 12, 2024
ebea46b
fix failing linter
nyagamunene Dec 12, 2024
2f4621a
fix failing linter
nyagamunene Dec 12, 2024
7751314
add logging
nyagamunene Dec 12, 2024
182f89f
add documentation and debug connection issue
nyagamunene Dec 13, 2024
a86c749
update go mod and go sum file
nyagamunene Dec 13, 2024
1f4b1a5
remove password
nyagamunene Dec 16, 2024
0b323e6
add comments
nyagamunene Dec 16, 2024
b1d7f38
fix failing linter
nyagamunene Dec 16, 2024
df6c46c
add validation
nyagamunene Dec 16, 2024
7356002
adjust size of data sent via nats
nyagamunene Dec 16, 2024
b9a4d20
add contants
nyagamunene Dec 16, 2024
f9aa43e
refactor FetchFromReg
nyagamunene Dec 16, 2024
50f4f32
add logging after all chunks were sent sucessfully
nyagamunene Dec 16, 2024
fa04558
change chunk_payload type
nyagamunene Dec 16, 2024
4c718e4
update test documentation
nyagamunene Dec 16, 2024
27608f0
update env variables
nyagamunene Dec 18, 2024
fb81865
fix failing linter
nyagamunene Dec 18, 2024
c31f6d5
update env file
nyagamunene Dec 19, 2024
93cb573
remove proxy read me
nyagamunene Dec 19, 2024
471d22e
intergrate proplet and manager with proxy
nyagamunene Dec 19, 2024
b439ca0
remove unused variable
nyagamunene Dec 19, 2024
6c13917
address comments
nyagamunene Dec 20, 2024
03b75aa
remove linter check
nyagamunene Dec 20, 2024
a1f4940
address comments
nyagamunene Dec 20, 2024
cf7a9eb
remove test.md file
nyagamunene Dec 20, 2024
0f70398
update make install command
nyagamunene Dec 20, 2024
febf61c
update make install command
nyagamunene Dec 20, 2024
2da6d97
add comments to make install command
nyagamunene Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Propellerd Build
build
config.toml
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ linters:
- err113
- noctx
- cyclop
- tagalign

linters-settings:
gocritic:
Expand Down
10 changes: 4 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ TIME=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ')
VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'v0.0.0')
COMMIT ?= $(shell git rev-parse HEAD)
EXAMPLES = addition long-addition
SERVICES = manager proplet cli
SERVICES = manager proplet cli proxy

define compile_service
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) \
Expand All @@ -20,12 +20,10 @@ endef
$(SERVICES):
$(call compile_service,$(@))


# Install all non-WASM executables from the build directory to GOBIN with 'propeller-' prefix
install:
for file in $(BUILD_DIR)/*; do \
if [[ ! "$$file" =~ \.wasm$$ ]]; then \
cp "$$file" $(GOBIN)/propeller-`basename "$$file"`; \
fi \
done
$(foreach f,$(wildcard $(BUILD_DIR)/*[!.wasm]),cp $(f) $(patsubst $(BUILD_DIR)/%,$(GOBIN)/propeller-%,$(f));)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible, in order to simplify handling of binaries, to have a subdir wasm within build dir, and put all Wasm binaries there. I guess these binaries are coming from examples, right? If really needed you can also add build/bin and build/wasm, but I do not think that we need build/bin (as most of the time we'll need just Propeller services (and not examples), and they can live directly in the top build dir, for convenience), but if this simplifies copying, then please go ahead and make both build/bin and build/wasm.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, as you would need to prefix each bin anyway - I will merge as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible, in order to simplify handling of binaries, to have a subdir wasm within build dir, and put all Wasm binaries there. I guess these binaries are coming from examples, right? If really needed you can also add build/bin and build/wasm, but I do not think that we need build/bin (as most of the time we'll need just Propeller services (and not examples), and they can live directly in the top build dir, for convenience), but if this simplifies copying, then please go ahead and make both build/bin and build/wasm.

We can implement this .


.PHONY: all $(SERVICES)
all: $(SERVICES)
Expand Down
40 changes: 1 addition & 39 deletions cmd/proplet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log"
"log/slog"
"net/http"
"os"
"time"

Expand All @@ -26,9 +25,6 @@ type config struct {
MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"`
MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"`
LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"`
RegistryURL string `env:"PROPLET_REGISTRY_URL"`
RegistryToken string `env:"PROPLET_REGISTRY_TOKEN"`
RegistryTimeout time.Duration `env:"PROPLET_REGISTRY_TIMEOUT" envDefault:"30s"`
ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"`
ThingID string `env:"PROPLET_THING_ID,notEmpty"`
ThingKey string `env:"PROPLET_THING_KEY,notEmpty"`
Expand Down Expand Up @@ -57,16 +53,6 @@ func main() {
logger := slog.New(logHandler)
slog.SetDefault(logger)

if cfg.RegistryURL != "" {
if err := checkRegistryConnectivity(ctx, cfg.RegistryURL, cfg.RegistryTimeout); err != nil {
logger.Error("failed to connect to registry URL", slog.String("url", cfg.RegistryURL), slog.Any("error", err))

return
}

logger.Info("successfully connected to registry URL", slog.String("url", cfg.RegistryURL))
}

mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, cfg.InstanceID, cfg.ThingID, cfg.ThingKey, cfg.ChannelID, cfg.MQTTTimeout, logger)
if err != nil {
logger.Error("failed to initialize mqtt client", slog.Any("error", err))
Expand All @@ -75,7 +61,7 @@ func main() {
}
wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID)

service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.RegistryURL, cfg.RegistryToken, cfg.LivelinessInterval, mqttPubSub, logger, wazero)
service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, wazero)
if err != nil {
logger.Error("failed to initialize service", slog.Any("error", err))

Expand All @@ -96,27 +82,3 @@ func main() {
logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err))
}
}

func checkRegistryConnectivity(ctx context.Context, registryURL string, registryTimeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, registryTimeout)
defer cancel()

client := http.DefaultClient

req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, http.NoBody)
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to connect to registry URL: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("registry returned unexpected status: %d", resp.StatusCode)
}

return nil
}
111 changes: 111 additions & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package main

import (
"context"
"fmt"
"log"
"log/slog"
"os"

"github.com/absmach/propeller/proxy"
"github.com/caarlos0/env/v11"
"golang.org/x/sync/errgroup"
)

const svcName = "proxy"

type config struct {
LogLevel string `env:"PROXY_LOG_LEVEL" envDefault:"info"`

BrokerURL string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
PropletKey string `env:"PROPLET_THING_KEY,notEmpty"`
PropletID string `env:"PROPLET_THING_ID,notEmpty" `
ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"`

ChunkSize int `env:"PROXY_CHUNK_SIZE" envDefault:"512000"`
Authenticate bool `env:"PROXY_AUTHENTICATE" envDefault:"false"`
Token string `env:"PROXY_REGISTRY_TOKEN" envDefault:""`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need token and password or it is either one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its either of the 2

Username string `env:"PROXY_REGISTRY_USERNAME" envDefault:""`
Password string `env:"PROXY_REGISTRY_PASSWORD" envDefault:""`
RegistryURL string `env:"PROXY_REGISTRY_URL,notEmpty"`
}

func main() {
g, ctx := errgroup.WithContext(context.Background())

cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load configuration : %s", err.Error())
}

var level slog.Level
if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil {
log.Fatalf("failed to parse log level: %s", err.Error())
}
logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
})
logger := slog.New(logHandler)
slog.SetDefault(logger)

mqttCfg := proxy.MQTTProxyConfig{
BrokerURL: cfg.BrokerURL,
Password: cfg.PropletKey,
PropletID: cfg.PropletID,
ChannelID: cfg.ChannelID,
}

httpCfg := proxy.HTTPProxyConfig{
ChunkSize: cfg.ChunkSize,
Authenticate: cfg.Authenticate,
Token: cfg.Token,
Username: cfg.Username,
Password: cfg.Password,
RegistryURL: cfg.RegistryURL,
}

logger.Info("successfully initialized MQTT and HTTP config")

service, err := proxy.NewService(ctx, &mqttCfg, &httpCfg, logger)
if err != nil {
logger.Error("failed to create proxy service", slog.Any("error", err))

return
}

logger.Info("starting proxy service")

if err := start(ctx, g, service); err != nil {
logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err))
}
}

func start(ctx context.Context, g *errgroup.Group, s *proxy.ProxyService) error {
if err := s.MQTTClient().Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to MQTT broker: %w", err)
}

slog.Info("successfully connected to broker")

defer func() {
if err := s.MQTTClient().Disconnect(ctx); err != nil {
slog.Error("failed to disconnect MQTT client", "error", err)
}
}()

if err := s.MQTTClient().Subscribe(ctx, s.ContainerChan()); err != nil {
return fmt.Errorf("failed to subscribe to container requests: %w", err)
}

slog.Info("successfully subscribed to topic")

g.Go(func() error {
return s.StreamHTTP(ctx)
})

g.Go(func() error {
return s.StreamMQTT(ctx)
})

return g.Wait()
}
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,11 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
google.golang.org/grpc v1.69.0 // indirect
google.golang.org/protobuf v1.36.0 // indirect
oras.land/oras-go/v2 v2.5.0
)

require (
github.com/caarlos0/env/v11 v11.3.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/caarlos0/env/v11 v11.2.2 h1:95fApNrUyueipoZN/EhA8mMxiNxrBwDa+oAZrMWl3
github.com/caarlos0/env/v11 v11.2.2/go.mod h1:JBfcdeQiBoI3Zh1QRAWfe+tpiNTmDtcCj/hHHHMx0vc=
github.com/caarlos0/env/v11 v11.3.0 h1:CVTN6W6+twFC1jHKUwsw9eOTEiFpzyJOSA2AyHa8uvw=
github.com/caarlos0/env/v11 v11.3.0/go.mod h1:Q5lYHeOsgY20CCV/R+b50Jwg2MnjySid7+3FUBz2BJw=
github.com/caarlos0/env/v11 v11.3.0 h1:CVTN6W6+twFC1jHKUwsw9eOTEiFpzyJOSA2AyHa8uvw=
github.com/caarlos0/env/v11 v11.3.0/go.mod h1:Q5lYHeOsgY20CCV/R+b50Jwg2MnjySid7+3FUBz2BJw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -61,6 +63,10 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
Expand Down Expand Up @@ -121,3 +127,5 @@ google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
oras.land/oras-go/v2 v2.5.0 h1:o8Me9kLY74Vp5uw07QXPiitjsw7qNXi8Twd+19Zf02c=
oras.land/oras-go/v2 v2.5.0/go.mod h1:z4eisnLP530vwIOUOJeBIj0aGI0L1C3d53atvCBqZHg=
4 changes: 4 additions & 0 deletions manager/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ type taskReq struct {
}

func (t *taskReq) validate() error {
if t.Name == "" {
return apiutil.ErrMissingName
}

return nil
}

Expand Down
9 changes: 6 additions & 3 deletions proplet/requests.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package proplet

import "errors"
import (
"errors"
)

type startRequest struct {
ID string
FunctionName string
WasmFile []byte
imageURL string
Params []uint64
}

Expand All @@ -16,8 +19,8 @@ func (r startRequest) Validate() error {
if r.FunctionName == "" {
return errors.New("function name is required")
}
if r.WasmFile == nil {
return errors.New("wasm file is required")
if r.WasmFile == nil && r.imageURL == "" {
return errors.New("either wasm file or wasm file download path is required")
}

return nil
Expand Down
Loading
Loading