Skip to content

Commit

Permalink
add logging
Browse files Browse the repository at this point in the history
Signed-off-by: nyagamunene <[email protected]>
  • Loading branch information
nyagamunene committed Dec 12, 2024
1 parent 00d194a commit a67b93e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 29 deletions.
12 changes: 6 additions & 6 deletions cmd/proxy/.env
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
MQTT_REGISTRY_BROKER_URL=mqtt://localhost:1883
MQTT_REGISTRY_PASSWORD=0e8d1d8d-e3b8-4c20-8873-df200fb56100
MQTT_REGISTRY_PROPLET_ID=7fb3ce2f-271c-4e03-8481-5af7d29f9fd1
MQTT_REGISTRY_CHANNEL_ID=0c5c3658-e069-41d3-b08c-2023f2aad55d
MQTT_REGISTRY_BROKER_URL=localhost:1883
MQTT_REGISTRY_PROPLET_ID=test-proplet
MQTT_REGISTRY_CHANNEL_ID=test-channel
MQTT_REGISTRY_PASSWORD=

HTTP_REGISTRY_URL=
HTTP_AUTHENTICATE=
HTTP_REGISTRY_URL=localhost:5000
HTTP_AUTHENTICATE=false
HTTP_USERNAME=
HTTP_PASSWORD=
39 changes: 27 additions & 12 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
)

const (
svcName = "proxy"
mqttPrefix = "MQTT_REGISTRY_"
httpPrefix = "HTTP_"
chanSize = 2
)

func main() {
Expand All @@ -25,44 +25,52 @@ func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
slog.SetDefault(logger)

err := godotenv.Load()
err := godotenv.Load("cmd/proxy/.env")
if err != nil {
panic(err)
}

mqttCfg, err := config.LoadMQTTConfig(env.Options{Prefix: mqttPrefix})
if err != nil {
logger.Error("Failed to load MQTT configuration", slog.Any("error", err))
logger.Error("failed to load MQTT configuration", slog.Any("error", err))

return
}

logger.Info("successfully loaded MQTT config")

httpCfg, err := config.LoadHTTPConfig(env.Options{Prefix: httpPrefix})
if err != nil {
logger.Error("Failed to load HTTP configuration", slog.Any("error", err))
logger.Error("failed to load HTTP configuration", slog.Any("error", err))

return
}

logger.Info("successfully loaded HTTP config")

service, err := proxy.NewService(ctx, mqttCfg, httpCfg, logger)
if err != nil {
logger.Error("failed to create proxy service", "error", err)

return
}

g.Go(func() error {
return start(ctx, service)
})
logger.Info("starting proxy service")

if err := start(ctx, g, service); err != nil {
logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err))
}
}

func start(ctx context.Context, s *proxy.ProxyService) error {
errs := make(chan error, chanSize)
func start(ctx context.Context, g *errgroup.Group, s *proxy.ProxyService) error {
slog.Info("connecting...")

if err := s.MQTTClient().Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to MQTT broker: %w", err)
}

slog.Info("successfully connected to broker")

defer func() {
if err := s.MQTTClient().Disconnect(ctx); err != nil {
slog.Error("failed to disconnect MQTT client", "error", err)
Expand All @@ -73,8 +81,15 @@ func start(ctx context.Context, s *proxy.ProxyService) error {
return fmt.Errorf("failed to subscribe to container requests: %w", err)
}

go s.StreamHTTP(ctx, errs)
go s.StreamMQTT(ctx, errs)
slog.Info("successfully subscribed to topic")

g.Go(func() error {
return s.StreamHTTP(ctx)
})

g.Go(func() error {
return s.StreamMQTT(ctx)
})

return <-errs
return g.Wait()
}
18 changes: 7 additions & 11 deletions proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func NewService(ctx context.Context, mqttCfg *config.MQTTProxyConfig, httpCfg *c
return nil, fmt.Errorf("failed to initialize MQTT client: %w", err)
}

logger.Info("successfully initialized MQTT client")

return &ProxyService{
orasconfig: httpCfg,
mqttClient: mqttClient,
Expand All @@ -42,13 +44,11 @@ func (s *ProxyService) ContainerChan() chan string {
return s.containerChan
}

func (s *ProxyService) StreamHTTP(ctx context.Context, errs chan error) {
func (s *ProxyService) StreamHTTP(ctx context.Context) error {
for {
select {
case <-ctx.Done():
errs <- ctx.Err()

return
return ctx.Err()
case containerName := <-s.containerChan:
chunks, err := s.orasconfig.FetchFromReg(ctx, containerName)
if err != nil {
Expand All @@ -66,22 +66,18 @@ func (s *ProxyService) StreamHTTP(ctx context.Context, errs chan error) {
"chunk", chunk.ChunkIdx,
"total", chunk.TotalChunks)
case <-ctx.Done():
errs <- ctx.Err()

return
return ctx.Err()
}
}
}
}
}

func (s *ProxyService) StreamMQTT(ctx context.Context, errs chan error) {
func (s *ProxyService) StreamMQTT(ctx context.Context) error {
for {
select {
case <-ctx.Done():
errs <- ctx.Err()

return
return ctx.Err()
case chunk := <-s.dataChan:
if err := s.mqttClient.PublishContainer(ctx, chunk); err != nil {
s.logger.Error("failed to publish container chunk",
Expand Down

0 comments on commit a67b93e

Please sign in to comment.