From 7377a6dc06254b87fc4de95f10ad4972764d81f5 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Fri, 13 Dec 2024 19:36:42 +0300 Subject: [PATCH] add documentation and debug connection issue Signed-off-by: nyagamunene --- .gitignore | 1 + cmd/proxy/.env | 9 ----- cmd/proxy/main.go | 47 ++++++++++++------------ proxy/README.md | 87 ++++++++++++++++++++++++++++++++++++++++++++ proxy/config/http.go | 10 ----- proxy/config/mqtt.go | 21 ++--------- proxy/mqtt/mqtt.go | 5 ++- 7 files changed, 120 insertions(+), 60 deletions(-) delete mode 100644 cmd/proxy/.env create mode 100644 proxy/README.md diff --git a/.gitignore b/.gitignore index 3ea852f..0e2f099 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ # Propellerd Build build +config.toml diff --git a/cmd/proxy/.env b/cmd/proxy/.env deleted file mode 100644 index fc5da20..0000000 --- a/cmd/proxy/.env +++ /dev/null @@ -1,9 +0,0 @@ -MQTT_REGISTRY_BROKER_URL=localhost:1883 -MQTT_REGISTRY_PROPLET_ID=test-proplet -MQTT_REGISTRY_CHANNEL_ID=test-channel -MQTT_REGISTRY_PASSWORD= - -HTTP_REGISTRY_URL=localhost:5000 -HTTP_AUTHENTICATE=false -HTTP_USERNAME= -HTTP_PASSWORD= diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 4ba43d0..0ba1d9e 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -8,8 +8,6 @@ import ( "github.com/absmach/propeller/proxy" "github.com/absmach/propeller/proxy/config" - "github.com/caarlos0/env/v11" - "github.com/joho/godotenv" "golang.org/x/sync/errgroup" ) @@ -19,36 +17,41 @@ const ( httpPrefix = "HTTP_" ) +const ( + BrokerURL = "localhost:1883" + PropletID = "72fd490b-f91f-47dc-aa0b-a65931719ee1" + ChannelID = "cb6cb9ae-ddcf-41ab-8f32-f3e93b3a3be2" + PropletPassword = "3963a940-332e-4a18-aa57-bab4d4124ab0" + + RegistryURL = "docker.io" + Authenticate = true + RegistryUsername = "mrstevenyaga" + RegistryPassword = "Nya@851612" +) + func main() { g, ctx := errgroup.WithContext(context.Background()) logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) slog.SetDefault(logger) - err := godotenv.Load("cmd/proxy/.env") - if err != nil { - panic(err) - } - - mqttCfg, err := config.LoadMQTTConfig(env.Options{Prefix: mqttPrefix}) - if err != nil { - logger.Error("failed to load MQTT configuration", slog.Any("error", err)) - - return + mqttCfg := config.MQTTProxyConfig{ + BrokerURL: BrokerURL, + Password: PropletPassword, + PropletID: PropletID, + ChannelID: ChannelID, } - logger.Info("successfully loaded MQTT config") - - httpCfg, err := config.LoadHTTPConfig(env.Options{Prefix: httpPrefix}) - if err != nil { - logger.Error("failed to load HTTP configuration", slog.Any("error", err)) - - return + httpCfg := config.HTTPProxyConfig{ + RegistryURL: RegistryURL, + Authenticate: Authenticate, + Username: RegistryUsername, + Password: RegistryPassword, } - logger.Info("successfully loaded HTTP config") + logger.Info("successfully initialized MQTT and HTTP config") - service, err := proxy.NewService(ctx, mqttCfg, httpCfg, logger) + service, err := proxy.NewService(ctx, &mqttCfg, &httpCfg, logger) if err != nil { logger.Error("failed to create proxy service", "error", err) @@ -63,8 +66,6 @@ func main() { } func start(ctx context.Context, g *errgroup.Group, s *proxy.ProxyService) error { - slog.Info("connecting...") - if err := s.MQTTClient().Connect(ctx); err != nil { return fmt.Errorf("failed to connect to MQTT broker: %w", err) } diff --git a/proxy/README.md b/proxy/README.md new file mode 100644 index 0000000..c097a5a --- /dev/null +++ b/proxy/README.md @@ -0,0 +1,87 @@ +# Proxy Service + +The Proxy Service acts as a bridge between MQTT and HTTP protocols in the Propeller system. It enables bidirectional communication between MQTT clients and HTTP endpoints, allowing for seamless integration of different protocols. + +## Overview + +The proxy service performs two main functions: +1. Subscribes to MQTT topics and forwards messages to HTTP endpoints +2. Streams data between MQTT and HTTP protocols + +## Configuration + +The service is configured using environment variables. + +### Environment Variables + +#### MQTT Configuration + +| Variable | Description | Default | Required | +|----------|-------------|---------|----------| +| `BrokerURL` | URL of the MQTT broker | `localhost:1883` | Yes | +| `PropletID` | Unique identifier for the proplet | `72fd490b-f91f-47dc-aa0b-a65931719ee1` | Yes | +| `ChannelID` | Channel identifier for MQTT communication | `cb6cb9ae-ddcf-41ab-8f32-f3e93b3a3be2` | Yes | +| `PropletPassword` | Password for MQTT authentication | `3963a940-332e-4a18-aa57-bab4d4124ab0` | Yes | + +#### Registry Configuration + +| Variable | Description | Default | Required | +|----------|-------------|---------|----------| +| `RegistryURL` | URL of the HTTP registry | `localhost:5000` | Yes | +| `Authenticate` | Enable/disable registry authentication | `false` | No | +| `RegistryUsername` | Username for registry authentication | `""` | Only if `Authenticate=true` | +| `RegistryPassword` | Password for registry authentication | `""` | Only if `Authenticate=true` | + +### Example Configuration +```env +# MQTT Configuration +BrokerURL=localhost:1883 +PropletID=72fd490b-f91f-47dc-aa0b-a65931719ee1 +ChannelID=cb6cb9ae-ddcf-41ab-8f32-f3e93b3a3be2 +PropletPassword=3963a940-332e-4a18-aa57-bab4d4124ab0 + +# Registry Configuration +RegistryURL=localhost:5000 +Authenticate=false +RegistryUsername= +RegistryPassword= +``` + +## Running the Service + +The proxy service can be started by running the main.go file: + +```bash +go run cmd/proxy/main.go +``` + +## Service Flow + +1. **Initialization** + - Loads configuration from environment variables + - Sets up logging + - Creates a new proxy service instance + +2. **Connection** + - Establishes connection to the MQTT broker + - Subscribes to configured topics + - Sets up HTTP streaming + +3. **Operation** + - Runs two concurrent streams: + - StreamHTTP: Handles HTTP communication + - StreamMQTT: Handles MQTT communication + - Uses error groups for graceful error handling and shutdown + +4. **Error Handling** + - Implements comprehensive error logging + - Graceful shutdown with proper resource cleanup + - Automatic disconnection from MQTT broker on service termination + +## HTTP Registry Operations + +The HTTP configuration supports: +- Registry operations with optional authentication +- Automatic retry mechanism for failed requests +- Chunked data handling with configurable chunk size (1MB default) +- Static credential caching for authenticated requests diff --git a/proxy/config/http.go b/proxy/config/http.go index 92fe57e..d3da929 100644 --- a/proxy/config/http.go +++ b/proxy/config/http.go @@ -5,7 +5,6 @@ import ( "fmt" "io" - "github.com/caarlos0/env/v11" "oras.land/oras-go/v2/registry/remote" "oras.land/oras-go/v2/registry/remote/auth" "oras.land/oras-go/v2/registry/remote/retry" @@ -30,15 +29,6 @@ type HTTPProxyConfig struct { Password string `env:"PASSWORD" envDefault:""` } -func LoadHTTPConfig(opts env.Options) (*HTTPProxyConfig, error) { - config := HTTPProxyConfig{} - if err := env.ParseWithOptions(&config, opts); err != nil { - return nil, err - } - - return &config, nil -} - func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerName string) ([]ChunkPayload, error) { fullPath := fmt.Sprintf("%s/%s", c.RegistryURL, containerName) diff --git a/proxy/config/mqtt.go b/proxy/config/mqtt.go index 35843d9..fc148e5 100644 --- a/proxy/config/mqtt.go +++ b/proxy/config/mqtt.go @@ -1,21 +1,8 @@ package config -import ( - "github.com/caarlos0/env/v11" -) - type MQTTProxyConfig struct { - BrokerURL string `env:"BROKER_URL" envDefault:""` - Password string `env:"PASSWORD" envDefault:""` - PropletID string `env:"PROPLET_ID" envDefault:""` - ChannelID string `env:"CHANNEL_ID" envDefault:""` -} - -func LoadMQTTConfig(opts env.Options) (*MQTTProxyConfig, error) { - c := MQTTProxyConfig{} - if err := env.ParseWithOptions(&c, opts); err != nil { - return nil, err - } - - return &c, nil + BrokerURL string + Password string + PropletID string + ChannelID string } diff --git a/proxy/mqtt/mqtt.go b/proxy/mqtt/mqtt.go index 5ff9ccd..5a1ff96 100644 --- a/proxy/mqtt/mqtt.go +++ b/proxy/mqtt/mqtt.go @@ -16,7 +16,7 @@ const ( reconnTimeout = 1 disconnTimeout = 250 pubTopic = "channels/%s/messages/registry/server" - subTopic = "channels/%s/message/registry/proplet" + subTopic = "channels/%s/messages/registry/proplet" ) type RegistryClient struct { @@ -94,6 +94,9 @@ func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- str } } + x := fmt.Sprintf(subTopic, c.config.ChannelID) + log.Println(x) + token := c.client.Subscribe(fmt.Sprintf(subTopic, c.config.ChannelID), 1, handler) if err := token.Error(); err != nil { return fmt.Errorf("failed to subscribe to %s: %w", subTopic, err)