Skip to content

Commit

Permalink
Merge pull request #29 from absmach/manager
Browse files Browse the repository at this point in the history
PROP- 25 - Implement Manager Service
  • Loading branch information
drasko authored Dec 10, 2024
2 parents 398c5a6 + 992ae1a commit 82da6e8
Show file tree
Hide file tree
Showing 62 changed files with 4,155 additions and 72 deletions.
37 changes: 37 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
run:
timeout: 10m

issues:
max-issues-per-linter: 100
max-same-issues: 100

linters:
enable-all: true
fast: false
disable:
- lll
- wsl
- depguard
- tagliatelle
- gomnd # This linter is deprecated
- execinquery # This linter is deprecated
- exportloopref # This linter is deprecated
- gochecknoglobals
- ireturn
- exhaustruct
- wrapcheck
- musttag
- revive
- varnamelen
- nonamedreturns
- gosec
- funlen
- interfacebloat
- dupl
- err113

linters-settings:
gocritic:
enable-all: true
disabled-checks:
- hugeParam
16 changes: 9 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package main
import (
"context"
_ "embed"
"fmt"
"log"

"github.com/absmach/propeller/proplet"
"github.com/absmach/propeller/task"
"github.com/absmach/propeller/worker"
"github.com/google/uuid"
)

Expand All @@ -28,13 +28,15 @@ func main() {
},
}

fmt.Printf("task: %s\n", t.Name)
log.Printf("task: %s\n", t.Name)

w := worker.NewWasmWorker("Wasm-Worker-1")
w.StartTask(ctx, t)
w := proplet.NewWasmProplet("Wasm-Proplet-1")
if err := w.StartTask(ctx, t); err != nil {
log.Println(err)
}
results, err := w.RunTask(ctx, t.ID)
if err != nil {
fmt.Println(err)
log.Println(err)
}
fmt.Printf("results: %v\n", results)
log.Printf("results: %v\n", results)
}
111 changes: 111 additions & 0 deletions cmd/manager/start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package manager

import (
"context"
"fmt"
"log/slog"
"net/url"
"os"
"time"

"github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/propeller/manager"
"github.com/absmach/propeller/manager/api"
"github.com/absmach/propeller/manager/middleware"
"github.com/absmach/propeller/pkg/mqtt"
"github.com/absmach/propeller/pkg/scheduler"
"github.com/absmach/propeller/pkg/storage"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/sync/errgroup"
)

const svcName = "manager"

type Config struct {
LogLevel string
OTELURL url.URL
TraceRatio float64
Server server.Config
InstanceID string
ChannelID string
ThingID string
ThingKey string
MQTTAddress string
MQTTQOS uint8
MQTTTimeout time.Duration
}

func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) error {
g, ctx := errgroup.WithContext(ctx)

var level slog.Level
if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil {
return fmt.Errorf("failed to parse log level: %s", err.Error())
}
logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
})
logger := slog.New(logHandler)
slog.SetDefault(logger)

var tp trace.TracerProvider
switch {
case cfg.OTELURL == (url.URL{}):
tp = noop.NewTracerProvider()
default:
sdktp, err := jaeger.NewProvider(ctx, svcName, cfg.OTELURL, "", cfg.TraceRatio)
if err != nil {
return fmt.Errorf("failed to initialize opentelemetry: %s", err.Error())
}
defer func() {
if err := sdktp.Shutdown(ctx); err != nil {
slog.Error("error shutting down tracer provider", slog.Any("error", err))
}
}()
tp = sdktp
}
tracer := tp.Tracer(svcName)

mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQOS, svcName, cfg.ThingID, cfg.ThingKey, cfg.MQTTTimeout, logger)
if err != nil {
return fmt.Errorf("failed to initialize mqtt pubsub: %s", err.Error())
}

svc := manager.NewService(
storage.NewInMemoryStorage(),
storage.NewInMemoryStorage(),
storage.NewInMemoryStorage(),
scheduler.NewRoundRobin(),
mqttPubSub,
cfg.ChannelID,
logger,
)
svc = middleware.Logging(logger, svc)
svc = middleware.Tracing(tracer, svc)
counter, latency := prometheus.MakeMetrics(svcName, "api")
svc = middleware.Metrics(counter, latency, svc)

if err := svc.Subscribe(ctx); err != nil {
return fmt.Errorf("failed to subscribe to manager channel: %s", err.Error())
}

hs := httpserver.NewServer(ctx, cancel, svcName, cfg.Server, api.MakeHandler(svc, logger, cfg.InstanceID), logger)

g.Go(func() error {
return hs.Start()
})

g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, hs)
})

if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err))
}

return nil
}
24 changes: 24 additions & 0 deletions cmd/propellerd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"log"

"github.com/absmach/propeller/propellerd"
"github.com/spf13/cobra"
)

func main() {
rootCmd := &cobra.Command{
Use: "propellerd",
Short: "Propeller Daemon",
Long: `Propeller Daemon is a daemon that manages the lifecycle of Propeller components.`,
}

managerCmd := propellerd.NewManagerCmd()

rootCmd.AddCommand(managerCmd)

if err := rootCmd.Execute(); err != nil {
log.Fatal(err)
}
}
Loading

0 comments on commit 82da6e8

Please sign in to comment.