Skip to content

Commit

Permalink
Self hosted release 0.18.0
Browse files Browse the repository at this point in the history
Self hosted release 0.18.0
  • Loading branch information
etaques authored Aug 15, 2022
2 parents b9af2c7 + 1787f36 commit 90461ac
Show file tree
Hide file tree
Showing 119 changed files with 3,320 additions and 1,631 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ define make_docker
-f docker/Dockerfile .
$(eval SERVICE="")
endef

define make_docker_dev
$(eval svc=$(shell [ -z "$(SERVICE)" ] && echo $(subst docker_dev_,,$(1)) || echo $(svc)))
docker build \
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.17.0
0.18.0
91 changes: 48 additions & 43 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package agent

import (
"context"
"errors"
"github.com/eclipse/paho.mqtt.golang"
"github.com/fatih/structs"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/ns1labs/orb/agent/config"
"github.com/ns1labs/orb/agent/policyMgr"
"github.com/ns1labs/orb/buildinfo"
"github.com/ns1labs/orb/fleet"
"go.uber.org/zap"
"runtime"
"time"
Expand All @@ -25,18 +25,19 @@ var (
)

type Agent interface {
Start() error
Stop()
RestartAll(reason string) error
RestartBackend(backend string, reason string) error
Start(ctx context.Context, cancelFunc context.CancelFunc) error
Stop(ctx context.Context)
RestartAll(ctx context.Context, reason string) error
RestartBackend(ctx context.Context, backend string, reason string) error
}

type orbAgent struct {
logger *zap.Logger
config config.Config
client mqtt.Client
db *sqlx.DB
backends map[string]backend.Backend
logger *zap.Logger
config config.Config
client mqtt.Client
db *sqlx.DB
backends map[string]backend.Backend
cancelFunction context.CancelFunc

hbTicker *time.Ticker
hbDone chan bool
Expand Down Expand Up @@ -87,33 +88,34 @@ func New(logger *zap.Logger, c config.Config) (Agent, error) {
return &orbAgent{logger: logger, config: c, policyManager: pm, db: db, groupsInfos: make(map[string]GroupInfo)}, nil
}

func (a *orbAgent) startBackends() error {
func (a *orbAgent) startBackends(agentCtx context.Context) error {
a.logger.Info("registered backends", zap.Strings("values", backend.GetList()))
a.logger.Info("requested backends", zap.Any("values", a.config.OrbAgent.Backends))
if len(a.config.OrbAgent.Backends) == 0 {
return errors.New("no backends specified")
}
a.backends = make(map[string]backend.Backend, len(a.config.OrbAgent.Backends))
for name, config := range a.config.OrbAgent.Backends {
for name, configurationEntry := range a.config.OrbAgent.Backends {
if !backend.HaveBackend(name) {
return errors.New("specified backend does not exist: " + name)
}
be := backend.GetBackend(name)
if err := be.Configure(a.logger, a.policyManager.GetRepo(), config, structs.Map(a.config.OrbAgent.Otel)); err != nil {
if err := be.Configure(a.logger, a.policyManager.GetRepo(), configurationEntry, structs.Map(a.config.OrbAgent.Otel)); err != nil {
return err
}
if err := be.Start(); err != nil {
backendCtx := context.WithValue(agentCtx, "routine", name)
if err := be.Start(context.WithCancel(backendCtx)); err != nil {
return err
}
a.backends[name] = be
}
return nil
}

func (a *orbAgent) Start() error {

a.logger.Info("agent started", zap.String("version", buildinfo.GetVersion()))

func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) error {
agentCtx := context.WithValue(ctx, "routine", "agentRoutine")
a.logger.Info("agent started", zap.String("version", buildinfo.GetVersion()), zap.Any("routine", agentCtx.Value("routine")))
a.cancelFunction = cancelFunc
mqtt.CRITICAL = &agentLoggerCritical{a: a}
mqtt.ERROR = &agentLoggerError{a: a}

Expand All @@ -122,7 +124,7 @@ func (a *orbAgent) Start() error {
mqtt.DEBUG = &agentLoggerDebug{a: a}
}

if err := a.startBackends(); err != nil {
if err := a.startBackends(ctx); err != nil {
return err
}

Expand All @@ -137,41 +139,44 @@ func (a *orbAgent) Start() error {

a.groupRequestSucceeded = make(chan bool, 1)
a.policyRequestSucceeded = make(chan bool, 1)

if err := a.startComms(cloudConfig); err != nil {
commsCtx := context.WithValue(agentCtx, "routine", "comms")
if err := a.startComms(commsCtx, cloudConfig); err != nil {
a.logger.Error("could not restart mqtt client")
return err
}

a.hbTicker = time.NewTicker(HeartbeatFreq)
a.hbDone = make(chan bool)
go a.sendHeartbeats()
heartbeatCtx := context.WithValue(agentCtx, "routine", "heartbeat")
go a.sendHeartbeats(heartbeatCtx)

return nil
}

func (a *orbAgent) Stop() {
a.logger.Info("stopping agent")
a.logger.Debug("stopping agent with number of go routines and go calls", zap.Int("goroutines", runtime.NumGoroutine()), zap.Int64("gocalls", runtime.NumCgoCall()))
func (a *orbAgent) Stop(ctx context.Context) {
a.logger.Info("routine call for stop agent", zap.Any("routine", ctx.Value("routine")))
defer a.cancelFunction()
for name, b := range a.backends {
a.logger.Debug("stopping backend", zap.String("backend", name))
if err := b.Stop(ctx); err != nil {
a.logger.Error("error while stopping the backend", zap.String("backend", name))
}
}
a.hbTicker.Stop()
a.hbDone <- true
a.sendSingleHeartbeat(time.Now(), fleet.Offline)
if token := a.client.Unsubscribe(a.rpcFromCoreTopic); token.Wait() && token.Error() != nil {
a.logger.Warn("failed to unsubscribe to RPC channel", zap.Error(token.Error()))
}
a.unsubscribeGroupChannels()
for _, be := range a.backends {
if err := be.Stop(); err != nil {
a.logger.Error("backend error while stopping", zap.Error(err))
if a.client != nil && a.client.IsConnected() {
if token := a.client.Unsubscribe(a.rpcFromCoreTopic); token.Wait() && token.Error() != nil {
a.logger.Warn("failed to unsubscribe to RPC channel", zap.Error(token.Error()))
}
a.unsubscribeGroupChannels()
a.client.Disconnect(250)
}
a.client.Disconnect(250)
a.logger.Debug("stopping agent with number of go routines and go calls", zap.Int("goroutines", runtime.NumGoroutine()), zap.Int64("gocalls", runtime.NumCgoCall()))
defer close(a.hbDone)
defer close(a.policyRequestSucceeded)
defer close(a.groupRequestSucceeded)
}

func (a *orbAgent) RestartBackend(name string, reason string) error {
func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason string) error {
if !backend.HaveBackend(name) {
return errors.New("specified backend does not exist: " + name)
}
Expand All @@ -183,7 +188,7 @@ func (a *orbAgent) RestartBackend(name string, reason string) error {
a.logger.Error("failed to remove policies", zap.String("backend", name), zap.Error(err))
}
a.logger.Info("resetting backend", zap.String("backend", name))
if err := be.FullReset(); err != nil {
if err := be.FullReset(ctx); err != nil {
a.logger.Error("failed to reset backend", zap.String("backend", name), zap.Error(err))
}
a.logger.Info("reapplying policies", zap.String("backend", name))
Expand All @@ -193,7 +198,7 @@ func (a *orbAgent) RestartBackend(name string, reason string) error {
return nil
}

func (a *orbAgent) restartComms() error {
func (a *orbAgent) restartComms(ctx context.Context) error {
ccm, err := cloud_config.New(a.logger, a.config, a.db)
if err != nil {
return err
Expand All @@ -202,23 +207,23 @@ func (a *orbAgent) restartComms() error {
if err != nil {
return err
}
if err := a.startComms(cloudConfig); err != nil {
if err := a.startComms(ctx, cloudConfig); err != nil {
a.logger.Error("could not restart mqtt client")
return err
}
return nil
}

func (a *orbAgent) RestartAll(reason string) error {
a.logger.Info("restarting comms")
err := a.restartComms()
func (a *orbAgent) RestartAll(ctx context.Context, reason string) error {
a.logger.Info("restarting comms", zap.String("reason", reason))
err := a.restartComms(ctx)
if err != nil {
a.logger.Error("failed to restart comms", zap.Error(err))
}
a.logger.Info("restarting all backends", zap.String("reason", reason))
for name := range a.backends {
a.logger.Info("restarting backend", zap.String("backend", name), zap.String("reason", reason))
err = a.RestartBackend(name, reason)
err = a.RestartBackend(ctx, name, reason)
if err != nil {
a.logger.Error("failed to restart backend", zap.Error(err))
}
Expand Down
7 changes: 4 additions & 3 deletions agent/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package backend

import (
"context"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/ns1labs/orb/agent/policies"
"go.uber.org/zap"
Expand Down Expand Up @@ -45,9 +46,9 @@ type Backend interface {
Configure(*zap.Logger, policies.PolicyRepo, map[string]string, map[string]interface{}) error
SetCommsClient(string, mqtt.Client, string)
Version() (string, error)
Start() error
Stop() error
FullReset() error
Start(ctx context.Context, cancelFunc context.CancelFunc) error
Stop(ctx context.Context) error
FullReset(ctx context.Context) error

GetStartTime() time.Time
GetCapabilities() (map[string]interface{}, error)
Expand Down
Loading

0 comments on commit 90461ac

Please sign in to comment.