Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: nyagamunene <[email protected]>
  • Loading branch information
nyagamunene committed Dec 20, 2024
1 parent 03b75aa commit a1f4940
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 106 deletions.
8 changes: 4 additions & 4 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ const svcName = "proxy"
type config struct {
LogLevel string `env:"PROXY_LOG_LEVEL" envDefault:"info"`

BrokerURL string `env:"PROXY_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
PropletKey string `env:"PROXY_PROPLET_KEY,notEmpty"`
PropletID string `env:"PROXY_PROPLET_ID,notEmpty" `
ChannelID string `env:"PROXY_CHANNEL_ID,notEmpty"`
BrokerURL string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
PropletKey string `env:"PROPLET_THING_KEY,notEmpty"`
PropletID string `env:"PROPLET_THING_ID,notEmpty" `
ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"`

ChunkSize int `env:"PROXY_CHUNK_SIZE" envDefault:"512000"`
Authenticate bool `env:"PROXY_AUTHENTICATE" envDefault:"false"`
Expand Down
6 changes: 2 additions & 4 deletions proplet/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package proplet

import (
"errors"

"github.com/absmach/propeller/task"
)

type startRequest struct {
ID string
FunctionName string
WasmFile []byte
imageURL task.URLValue
imageURL string
Params []uint64
}

Expand All @@ -21,7 +19,7 @@ func (r startRequest) Validate() error {
if r.FunctionName == "" {
return errors.New("function name is required")
}
if r.WasmFile == nil && r.imageURL == (task.URLValue{}) {
if r.WasmFile == nil && r.imageURL == "" {
return errors.New("either wasm file or wasm file download path is required")
}

Expand Down
13 changes: 6 additions & 7 deletions proplet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,19 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri
}

go func() {
p.logger.Info("Waiting for chunks", slog.String("app_name", req.imageURL.String()))
p.logger.Info("Waiting for chunks", slog.String("app_name", req.imageURL))

for {
p.chunksMutex.Lock()
urlStr := req.imageURL.String()
metadata, exists := p.chunkMetadata[urlStr]
receivedChunks := len(p.chunks[urlStr])
metadata, exists := p.chunkMetadata[req.imageURL]
receivedChunks := len(p.chunks[req.imageURL])
p.chunksMutex.Unlock()

if exists && receivedChunks == metadata.TotalChunks {
p.logger.Info("All chunks received, deploying app", slog.String("app_name", urlStr))
wasmBinary := assembleChunks(p.chunks[urlStr])
p.logger.Info("All chunks received, deploying app", slog.String("app_name", req.imageURL))
wasmBinary := assembleChunks(p.chunks[req.imageURL])
if err := p.runtime.StartApp(ctx, wasmBinary, req.ID, req.FunctionName, req.Params...); err != nil {
p.logger.Error("Failed to start app", slog.String("app_name", urlStr), slog.Any("error", err))
p.logger.Error("Failed to start app", slog.String("app_name", req.imageURL), slog.Any("error", err))
}

break
Expand Down
19 changes: 8 additions & 11 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"log"

"github.com/absmach/propeller/proplet"
"github.com/absmach/propeller/task"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
Expand Down Expand Up @@ -43,7 +42,6 @@ func (c *HTTPProxyConfig) setupAuthentication(repo *remote.Repository) {
}
} else if c.Token != "" {
cred = auth.Credential{
Username: c.Username,
AccessToken: c.Token,
}
}
Expand Down Expand Up @@ -124,37 +122,36 @@ func createChunks(data []byte, containerPath string, chunkSize int) []proplet.Ch
return chunks
}

func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerPath task.URLValue, chunkSize int) ([]proplet.ChunkPayload, error) {
reference := containerPath.String()
repo, err := remote.NewRepository(reference)
func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerPath string, chunkSize int) ([]proplet.ChunkPayload, error) {
repo, err := remote.NewRepository(containerPath)
if err != nil {
return nil, fmt.Errorf("failed to create repository for %s: %w", reference, err)
return nil, fmt.Errorf("failed to create repository for %s: %w", containerPath, err)
}

c.setupAuthentication(repo)

manifest, err := c.fetchManifest(ctx, repo, reference)
manifest, err := c.fetchManifest(ctx, repo, containerPath)
if err != nil {
return nil, err
}

largestLayer, err := findLargestLayer(manifest)
if err != nil {
return nil, fmt.Errorf("failed to find layer for %s: %w", reference, err)
return nil, fmt.Errorf("failed to find layer for %s: %w", containerPath, err)
}

log.Printf("Container size: %d bytes (%.2f MB)", largestLayer.Size, float64(largestLayer.Size)/size)

layerReader, err := repo.Fetch(ctx, largestLayer)
if err != nil {
return nil, fmt.Errorf("failed to fetch layer for %s: %w", reference, err)
return nil, fmt.Errorf("failed to fetch layer for %s: %w", containerPath, err)
}
defer layerReader.Close()

data, err := io.ReadAll(layerReader)
if err != nil {
return nil, fmt.Errorf("failed to read layer for %s: %w", reference, err)
return nil, fmt.Errorf("failed to read layer for %s: %w", containerPath, err)
}

return createChunks(data, reference, chunkSize), nil
return createChunks(data, containerPath, chunkSize), nil
}
9 changes: 4 additions & 5 deletions proxy/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/absmach/propeller/proplet"
"github.com/absmach/propeller/task"
mqtt "github.com/eclipse/paho.mqtt.golang"
)

Expand Down Expand Up @@ -78,14 +77,14 @@ func (c *RegistryClient) Connect(ctx context.Context) error {
return nil
}

func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- task.URLValue) error {
func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- string) error {
handler := func(client mqtt.Client, msg mqtt.Message) {
data := msg.Payload()

payLoad := struct {
Appname task.URLValue `json:"app_name"`
Appname string `json:"app_name"`
}{
Appname: task.URLValue{},
Appname: "",
}

err := json.Unmarshal(data, &payLoad)
Expand All @@ -97,7 +96,7 @@ func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- tas

select {
case containerChan <- payLoad.Appname:
log.Printf("Received container request: %s", payLoad.Appname.String())
log.Printf("Received container request: %s", payLoad.Appname)
case <-ctx.Done():

return
Expand Down
7 changes: 3 additions & 4 deletions proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log/slog"

"github.com/absmach/propeller/proplet"
"github.com/absmach/propeller/task"
)

const chunkBuffer = 10
Expand All @@ -15,7 +14,7 @@ type ProxyService struct {
orasconfig *HTTPProxyConfig
mqttClient *RegistryClient
logger *slog.Logger
containerChan chan task.URLValue
containerChan chan string
dataChan chan proplet.ChunkPayload
}

Expand All @@ -29,7 +28,7 @@ func NewService(ctx context.Context, mqttCfg *MQTTProxyConfig, httpCfg *HTTPProx
orasconfig: httpCfg,
mqttClient: mqttClient,
logger: logger,
containerChan: make(chan task.URLValue, 1),
containerChan: make(chan string, 1),
dataChan: make(chan proplet.ChunkPayload, chunkBuffer),
}, nil
}
Expand All @@ -38,7 +37,7 @@ func (s *ProxyService) MQTTClient() *RegistryClient {
return s.mqttClient
}

func (s *ProxyService) ContainerChan() chan task.URLValue {
func (s *ProxyService) ContainerChan() chan string {
return s.containerChan
}

Expand Down
2 changes: 1 addition & 1 deletion task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Task struct {
ID string `json:"id"`
Name string `json:"name"`
State State `json:"state"`
ImageURL URLValue `json:"image_url,omitempty"`
ImageURL string `json:"image_url,omitempty"`
File []byte `json:"file,omitempty"`
Inputs []uint64 `json:"inputs,omitempty"`
Results []uint64 `json:"results,omitempty"`
Expand Down
45 changes: 0 additions & 45 deletions task/url.go

This file was deleted.

25 changes: 0 additions & 25 deletions test.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,28 +102,3 @@ export PROPLET_THING_ID=""
export PROPLET_THING_KEY=""
propeller-proplet
```

To start the proxy, run the following command

```bash
export PROXY_REGISTRY_URL=""
export PROXY_AUTHENTICATE="TRUE"
export PROXY_REGISTRY_USERNAME=""
export PROXY_REGISTRY_PASSWORD=""
export PROXY_PROPLET_KEY=""
export PROXY_PROPLET_ID=""
export PROXY_CHANNEL_ID=""
propeller-proxy
```

Subscibe to MQTT channel to download the requested binary

```bash
mosquitto_sub -I propeller -u $PROXY_PROPLET_ID -P $PROXY_PROPLET_KEY -t channels/$PROXY_CHANNEL_ID/messages/registry/server -h localhost
```

Publish to MQTT channel to request the container to download

```bash
mosquitto_pub -I propeller -u $PROXY_PROPLET_ID -P $PROXY_PROPLET_KEY -t channels/$PROXY_CHANNEL_ID/messages/registry/proplet -h localhost -m '{"app_name":"mrstevenyaga/add.wasm"}'
```

0 comments on commit a1f4940

Please sign in to comment.