diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 350f28b..40e221f 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -24,13 +24,11 @@ func main() { ctx, cancel := context.WithCancel(ctx) defer cancel() - // Set up signal handling sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) - // Loading .env file to environment err := godotenv.Load() if err != nil { panic(err) @@ -44,18 +42,16 @@ func main() { cfgH, err := config.LoadHTTPConfig(env.Options{Prefix: httpPrefix}) if err != nil { - logger.Error("Failed to load HTTP configuration", slog.Any("error", err)) + logger.Error("Failed to load HTTP configuration", slog.Any("error", err)) os.Exit(1) } - // Create proxy service - service, err := proxy.NewService(ctx, cfgM,cfgH, logger) + service, err := proxy.NewService(ctx, cfgM, cfgH, logger) if err != nil { logger.Error("failed to create proxy service", "error", err) os.Exit(1) } - // Start the service go func() { if err := start(ctx, service); err != nil { logger.Error("service error", "error", err) @@ -63,7 +59,6 @@ func main() { } }() - // Wait for signal <-sigChan cancel() } diff --git a/proxy/mqtt/mqtt.go b/proxy/mqtt/mqtt.go index e26c7f4..4c84e0e 100644 --- a/proxy/mqtt/mqtt.go +++ b/proxy/mqtt/mqtt.go @@ -16,13 +16,12 @@ type RegistryClient struct { config *config.MQTTProxyConfig } -func NewMQTTClient(config *config.MQTTProxyConfig) (*RegistryClient, error) { - fmt.Printf("config is %+v\n", config) +func NewMQTTClient(cfg *config.MQTTProxyConfig) (*RegistryClient, error) { opts := mqtt.NewClientOptions(). - AddBroker(config.BrokerURL). - SetClientID(fmt.Sprintf("Proplet-%s", config.PropletID)). - SetUsername(config.PropletID). - SetPassword(config.Password). + AddBroker(cfg.BrokerURL). + SetClientID(fmt.Sprintf("Proplet-%s", cfg.PropletID)). + SetUsername(cfg.PropletID). + SetPassword(cfg.Password). SetCleanSession(true). SetAutoReconnect(true). SetConnectTimeout(10 * time.Second). @@ -40,7 +39,7 @@ func NewMQTTClient(config *config.MQTTProxyConfig) (*RegistryClient, error) { return &RegistryClient{ client: client, - config: config, + config: cfg, }, nil } @@ -61,7 +60,6 @@ func (c *RegistryClient) Connect(ctx context.Context) error { func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- string) error { subTopic := fmt.Sprintf("channels/%s/message/registry/proplet", c.config.ChannelID) - fmt.Printf("subtopic is %+v\n", subTopic) handler := func(client mqtt.Client, msg mqtt.Message) { data := msg.Payload() @@ -95,7 +93,6 @@ func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- str return nil } -// PublishContainer publishes container data to the server channel func (c *RegistryClient) PublishContainer(ctx context.Context, containerData []byte) error { pubTopic := fmt.Sprintf("channels/%s/messages/registry/server", c.config.ChannelID) @@ -113,17 +110,11 @@ func (c *RegistryClient) PublishContainer(ctx context.Context, containerData []b } func (c *RegistryClient) Disconnect(ctx context.Context) error { - disconnectChan := make(chan error, 1) - - go func() { - c.client.Disconnect(250) - disconnectChan <- nil - }() - select { - case err := <-disconnectChan: - return err case <-ctx.Done(): return ctx.Err() + default: + c.client.Disconnect(250) + return nil } } diff --git a/proxy/service.go b/proxy/service.go index a192593..8e14a2f 100644 --- a/proxy/service.go +++ b/proxy/service.go @@ -32,17 +32,14 @@ func NewService(ctx context.Context, cfgM *config.MQTTProxyConfig, cfgH *config. }, nil } -// MQTTClient returns the MQTT client func (s *ProxyService) MQTTClient() *mqtt.RegistryClient { return s.mqttClient } -// ContainerChan returns the container channel func (s *ProxyService) ContainerChan() chan string { return s.containerChan } -// StreamHTTP handles the HTTP stream processing func (s *ProxyService) StreamHTTP(ctx context.Context, errs chan error) { for { select { @@ -67,7 +64,6 @@ func (s *ProxyService) StreamHTTP(ctx context.Context, errs chan error) { } } -// StreamMQTT handles the MQTT stream processing func (s *ProxyService) StreamMQTT(ctx context.Context, errs chan error) { for { select {