Skip to content

Commit

Permalink
add documentation and debug connection issue
Browse files Browse the repository at this point in the history
Signed-off-by: nyagamunene <[email protected]>
  • Loading branch information
nyagamunene committed Dec 13, 2024
1 parent a67b93e commit 7377a6d
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 60 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Propellerd Build
build
config.toml
9 changes: 0 additions & 9 deletions cmd/proxy/.env

This file was deleted.

47 changes: 24 additions & 23 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/absmach/propeller/proxy"
"github.com/absmach/propeller/proxy/config"
"github.com/caarlos0/env/v11"
"github.com/joho/godotenv"
"golang.org/x/sync/errgroup"
)

Expand All @@ -19,36 +17,41 @@ const (
httpPrefix = "HTTP_"
)

const (
BrokerURL = "localhost:1883"
PropletID = "72fd490b-f91f-47dc-aa0b-a65931719ee1"
ChannelID = "cb6cb9ae-ddcf-41ab-8f32-f3e93b3a3be2"
PropletPassword = "3963a940-332e-4a18-aa57-bab4d4124ab0"

RegistryURL = "docker.io"
Authenticate = true
RegistryUsername = "mrstevenyaga"
RegistryPassword = "Nya@851612"
)

func main() {
g, ctx := errgroup.WithContext(context.Background())

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
slog.SetDefault(logger)

err := godotenv.Load("cmd/proxy/.env")
if err != nil {
panic(err)
}

mqttCfg, err := config.LoadMQTTConfig(env.Options{Prefix: mqttPrefix})
if err != nil {
logger.Error("failed to load MQTT configuration", slog.Any("error", err))

return
mqttCfg := config.MQTTProxyConfig{
BrokerURL: BrokerURL,
Password: PropletPassword,
PropletID: PropletID,
ChannelID: ChannelID,
}

logger.Info("successfully loaded MQTT config")

httpCfg, err := config.LoadHTTPConfig(env.Options{Prefix: httpPrefix})
if err != nil {
logger.Error("failed to load HTTP configuration", slog.Any("error", err))

return
httpCfg := config.HTTPProxyConfig{
RegistryURL: RegistryURL,
Authenticate: Authenticate,
Username: RegistryUsername,
Password: RegistryPassword,
}

logger.Info("successfully loaded HTTP config")
logger.Info("successfully initialized MQTT and HTTP config")

service, err := proxy.NewService(ctx, mqttCfg, httpCfg, logger)
service, err := proxy.NewService(ctx, &mqttCfg, &httpCfg, logger)
if err != nil {
logger.Error("failed to create proxy service", "error", err)

Expand All @@ -63,8 +66,6 @@ func main() {
}

func start(ctx context.Context, g *errgroup.Group, s *proxy.ProxyService) error {
slog.Info("connecting...")

if err := s.MQTTClient().Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to MQTT broker: %w", err)
}
Expand Down
87 changes: 87 additions & 0 deletions proxy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Proxy Service

The Proxy Service acts as a bridge between MQTT and HTTP protocols in the Propeller system. It enables bidirectional communication between MQTT clients and HTTP endpoints, allowing for seamless integration of different protocols.

## Overview

The proxy service performs two main functions:
1. Subscribes to MQTT topics and forwards messages to HTTP endpoints
2. Streams data between MQTT and HTTP protocols

## Configuration

The service is configured using environment variables.

### Environment Variables

#### MQTT Configuration

| Variable | Description | Default | Required |
|----------|-------------|---------|----------|
| `BrokerURL` | URL of the MQTT broker | `localhost:1883` | Yes |
| `PropletID` | Unique identifier for the proplet | `72fd490b-f91f-47dc-aa0b-a65931719ee1` | Yes |
| `ChannelID` | Channel identifier for MQTT communication | `cb6cb9ae-ddcf-41ab-8f32-f3e93b3a3be2` | Yes |
| `PropletPassword` | Password for MQTT authentication | `3963a940-332e-4a18-aa57-bab4d4124ab0` | Yes |

#### Registry Configuration

| Variable | Description | Default | Required |
|----------|-------------|---------|----------|
| `RegistryURL` | URL of the HTTP registry | `localhost:5000` | Yes |
| `Authenticate` | Enable/disable registry authentication | `false` | No |
| `RegistryUsername` | Username for registry authentication | `""` | Only if `Authenticate=true` |
| `RegistryPassword` | Password for registry authentication | `""` | Only if `Authenticate=true` |

### Example Configuration
```env
# MQTT Configuration
BrokerURL=localhost:1883
PropletID=72fd490b-f91f-47dc-aa0b-a65931719ee1
ChannelID=cb6cb9ae-ddcf-41ab-8f32-f3e93b3a3be2
PropletPassword=3963a940-332e-4a18-aa57-bab4d4124ab0
# Registry Configuration
RegistryURL=localhost:5000
Authenticate=false
RegistryUsername=
RegistryPassword=
```

## Running the Service

The proxy service can be started by running the main.go file:

```bash
go run cmd/proxy/main.go
```

## Service Flow

1. **Initialization**
- Loads configuration from environment variables
- Sets up logging
- Creates a new proxy service instance

2. **Connection**
- Establishes connection to the MQTT broker
- Subscribes to configured topics
- Sets up HTTP streaming

3. **Operation**
- Runs two concurrent streams:
- StreamHTTP: Handles HTTP communication
- StreamMQTT: Handles MQTT communication
- Uses error groups for graceful error handling and shutdown

4. **Error Handling**
- Implements comprehensive error logging
- Graceful shutdown with proper resource cleanup
- Automatic disconnection from MQTT broker on service termination

## HTTP Registry Operations

The HTTP configuration supports:
- Registry operations with optional authentication
- Automatic retry mechanism for failed requests
- Chunked data handling with configurable chunk size (1MB default)
- Static credential caching for authenticated requests
10 changes: 0 additions & 10 deletions proxy/config/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"

"github.com/caarlos0/env/v11"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"
Expand All @@ -30,15 +29,6 @@ type HTTPProxyConfig struct {
Password string `env:"PASSWORD" envDefault:""`
}

func LoadHTTPConfig(opts env.Options) (*HTTPProxyConfig, error) {
config := HTTPProxyConfig{}
if err := env.ParseWithOptions(&config, opts); err != nil {
return nil, err
}

return &config, nil
}

func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerName string) ([]ChunkPayload, error) {
fullPath := fmt.Sprintf("%s/%s", c.RegistryURL, containerName)

Expand Down
21 changes: 4 additions & 17 deletions proxy/config/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,8 @@
package config

import (
"github.com/caarlos0/env/v11"
)

type MQTTProxyConfig struct {
BrokerURL string `env:"BROKER_URL" envDefault:""`
Password string `env:"PASSWORD" envDefault:""`
PropletID string `env:"PROPLET_ID" envDefault:""`
ChannelID string `env:"CHANNEL_ID" envDefault:""`
}

func LoadMQTTConfig(opts env.Options) (*MQTTProxyConfig, error) {
c := MQTTProxyConfig{}
if err := env.ParseWithOptions(&c, opts); err != nil {
return nil, err
}

return &c, nil
BrokerURL string
Password string
PropletID string
ChannelID string
}
5 changes: 4 additions & 1 deletion proxy/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
reconnTimeout = 1
disconnTimeout = 250
pubTopic = "channels/%s/messages/registry/server"
subTopic = "channels/%s/message/registry/proplet"
subTopic = "channels/%s/messages/registry/proplet"
)

type RegistryClient struct {
Expand Down Expand Up @@ -94,6 +94,9 @@ func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- str
}
}

x := fmt.Sprintf(subTopic, c.config.ChannelID)
log.Println(x)

token := c.client.Subscribe(fmt.Sprintf(subTopic, c.config.ChannelID), 1, handler)
if err := token.Error(); err != nil {
return fmt.Errorf("failed to subscribe to %s: %w", subTopic, err)
Expand Down

0 comments on commit 7377a6d

Please sign in to comment.