Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: OBS-380 - basic diode server and components configuration #8

Merged
merged 3 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions diode-server/cmd/distributor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@ package main
import (
"context"
"log"
"log/slog"
"os"

"github.com/netboxlabs/diode-internal/diode-server/distributor"
"github.com/netboxlabs/diode-internal/diode-server/server"
)

func main() {
// TODO(mfiedorowicz): make logger configurable (handler, level, etc.)
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false}))

ctx := context.Background()
s := server.New(ctx, "diode-distributor", logger)
s := server.New(ctx, "diode-distributor")

distributorComponent, err := distributor.New(logger)
distributorComponent, err := distributor.New(s.Logger())
if err != nil {
log.Fatalf("failed to instantiate distributor component: %v", err)
}
Expand Down
9 changes: 2 additions & 7 deletions diode-server/cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@ package main
import (
"context"
"log"
"log/slog"
"os"

"github.com/netboxlabs/diode-internal/diode-server/ingester"
"github.com/netboxlabs/diode-internal/diode-server/server"
)

func main() {
// TODO(mfiedorowicz): make logger configurable (handler, level, etc.)
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false}))

ctx := context.Background()
s := server.New(ctx, "diode-ingester", logger)
s := server.New(ctx, "diode-ingester")

ingesterComponent := ingester.New(logger)
ingesterComponent := ingester.New(s.Logger())

if err := s.RegisterComponent(ingesterComponent); err != nil {
log.Fatalf("failed to register ingerster component: %v", err)
Expand Down
9 changes: 2 additions & 7 deletions diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@ package main
import (
"context"
"log"
"log/slog"
"os"

"github.com/netboxlabs/diode-internal/diode-server/reconciler"
"github.com/netboxlabs/diode-internal/diode-server/server"
)

func main() {
// TODO(mfiedorowicz): make logger configurable (handler, level, etc.)
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false}))

ctx := context.Background()
s := server.New(ctx, "diode-reconciler", logger)
s := server.New(ctx, "diode-reconciler")

reconcilerComponent, err := reconciler.New(logger)
reconcilerComponent, err := reconciler.New(s.Logger())
if err != nil {
log.Fatalf("failed to instantiate reconciler component: %v", err)
}
Expand Down
24 changes: 15 additions & 9 deletions diode-server/distributor/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,39 @@ package distributor

import (
"context"
"errors"
"fmt"
"log/slog"
"net"

"github.com/kelseyhightower/envconfig"
pb "github.com/netboxlabs/diode-internal/diode-sdk-go/diode/v1/diodepb"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

const (
DefaultGRPCPort = "8081"
)

// Component is a gRPC server that handles data ingestion requests
type Component struct {
pb.UnimplementedDistributorServiceServer

config Config
logger *slog.Logger
grpcListener net.Listener
grpcServer *grpc.Server
}

// New creates a new distributor component
func New(logger *slog.Logger) (*Component, error) {
grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%s", DefaultGRPCPort))
var cfg Config
envconfig.MustProcess("", &cfg)

grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.GRPCPort))
if err != nil {
return nil, errors.New(fmt.Sprintf("failed to listen on port %s: %v", DefaultGRPCPort, err))
return nil, fmt.Errorf("failed to listen on port %d: %v", cfg.GRPCPort, err)
}

grpcServer := grpc.NewServer()
component := &Component{
config: cfg,
logger: logger,
grpcListener: grpcListener,
grpcServer: grpcServer,
Expand All @@ -42,22 +45,25 @@ func New(logger *slog.Logger) (*Component, error) {
return component, nil
}

// Name returns the name of the component
func (c *Component) Name() string {
return "distributor"
}

// Start starts the component
func (c *Component) Start(_ context.Context) error {
c.logger.Info("starting component", "name", c.Name())

c.logger.Info("starting component", "name", c.Name(), "port", c.config.GRPCPort)
return c.grpcServer.Serve(c.grpcListener)
}

// Stop stops the component
func (c *Component) Stop() error {
c.logger.Info("stopping component", "name", c.Name())
c.grpcServer.GracefulStop()
return nil
}

// Push handles a push request
func (c *Component) Push(_ context.Context, in *pb.PushRequest) (*pb.PushResponse, error) {
c.logger.Info("diode.v1.DistributorService/Push called", "stream", in.Stream)
return &pb.PushResponse{}, nil
Expand Down
6 changes: 6 additions & 0 deletions diode-server/distributor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package distributor

// Config is the configuration for the distributor service
type Config struct {
GRPCPort int `envconfig:"GRPC_PORT" default:"8081"`
}
1 change: 1 addition & 0 deletions diode-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/netboxlabs/diode-internal/diode-server
go 1.21

require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/netboxlabs/diode-internal/diode-sdk-go v0.0.0
github.com/oklog/run v1.1.0
google.golang.org/grpc v1.61.0
Expand Down
2 changes: 2 additions & 0 deletions diode-server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
Expand Down
12 changes: 12 additions & 0 deletions diode-server/ingester/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,33 @@ import (
"context"
"log/slog"
"time"

"github.com/kelseyhightower/envconfig"
)

// Component asynchronously ingests data from the distributor
type Component struct {
config Config
logger *slog.Logger
}

// New creates a new ingester component
func New(logger *slog.Logger) *Component {
var cfg Config
envconfig.MustProcess("", &cfg)

return &Component{
config: cfg,
logger: logger,
}
}

// Name returns the name of the component
func (c *Component) Name() string {
return "ingester"
}

// Start starts the component
func (c *Component) Start(ctx context.Context) error {
c.logger.Info("starting component", "name", c.Name())

Expand All @@ -40,6 +51,7 @@ func (c *Component) ping(ctx context.Context) error {
}
}

// Stop stops the component
func (c *Component) Stop() error {
c.logger.Info("stopping component", "name", c.Name())
return nil
Expand Down
5 changes: 5 additions & 0 deletions diode-server/ingester/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package ingester

// Config is the configuration for the ingester service
type Config struct {
}
23 changes: 14 additions & 9 deletions diode-server/reconciler/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,36 @@ package reconciler

import (
"context"
"errors"
"fmt"
"log/slog"
"net"

"github.com/kelseyhightower/envconfig"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

const (
DefaultGRPCPort = "8081"
)

// Component reconciles ingested data
type Component struct {
config Config
logger *slog.Logger
grpcListener net.Listener
grpcServer *grpc.Server
}

// New creates a new reconciler component
func New(logger *slog.Logger) (*Component, error) {
grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%s", DefaultGRPCPort))
var cfg Config
envconfig.MustProcess("", &cfg)

grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.GRPCPort))
if err != nil {
return nil, errors.New(fmt.Sprintf("failed to listen on port %s: %v", DefaultGRPCPort, err))
return nil, fmt.Errorf("failed to listen on port %d: %v", cfg.GRPCPort, err)
}

grpcServer := grpc.NewServer()
component := &Component{
config: cfg,
logger: logger,
grpcListener: grpcListener,
grpcServer: grpcServer,
Expand All @@ -38,16 +41,18 @@ func New(logger *slog.Logger) (*Component, error) {
return component, nil
}

// Name returns the name of the component
func (c *Component) Name() string {
return "reconciler"
}

// Start starts the component
func (c *Component) Start(_ context.Context) error {
c.logger.Info("starting component", "name", c.Name())

c.logger.Info("starting component", "name", c.Name(), "port", c.config.GRPCPort)
return c.grpcServer.Serve(c.grpcListener)
}

// Stop stops the component
func (c *Component) Stop() error {
c.logger.Info("stopping component", "name", c.Name())
c.grpcServer.GracefulStop()
Expand Down
6 changes: 6 additions & 0 deletions diode-server/reconciler/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package reconciler

// Config is the configuration for the reconciler service
type Config struct {
GRPCPort int `envconfig:"GRPC_PORT" default:"8081"`
}
7 changes: 7 additions & 0 deletions diode-server/server/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package server

// Config is the configuration for the server
type Config struct {
LoggingFormat string `envconfig:"LOGGING_FORMAT" default:"json"`
LoggingLevel string `envconfig:"LOGGING_LEVEL" default:"info"`
}
45 changes: 41 additions & 4 deletions diode-server/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package server

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"strings"
"sync"

"github.com/kelseyhightower/envconfig"
"github.com/oklog/run"
)

Expand All @@ -31,11 +32,14 @@ type Component interface {
}

// New returns a new Server
func New(ctx context.Context, name string, logger *slog.Logger) *Server {
func New(ctx context.Context, name string) *Server {
var cfg Config
envconfig.MustProcess("", &cfg)

return &Server{
cxt: ctx,
name: name,
logger: logger,
logger: newLogger(cfg),
components: make(map[string]Component),
componentGroup: run.Group{},
}
Expand All @@ -46,13 +50,18 @@ func (s *Server) Name() string {
return s.name
}

// Logger returns the logger of the Server
func (s *Server) Logger() *slog.Logger {
return s.logger
}

// RegisterComponent registers a Component with the Server
func (s *Server) RegisterComponent(c Component) error {
s.mu.Lock()
defer s.mu.Unlock()

if _, ok := s.components[c.Name()]; ok {
return errors.New(fmt.Sprintf("Server.RegisterComponent found duplicate component registration for %s", c.Name()))
return fmt.Errorf("Server.RegisterComponent found duplicate component registration for %s", c.Name())
}

s.components[c.Name()] = c
Expand Down Expand Up @@ -81,3 +90,31 @@ func (s *Server) Run() error {

return s.componentGroup.Run()
}

func newLogger(cfg Config) *slog.Logger {
var l slog.Level
switch strings.ToUpper(cfg.LoggingLevel) {
case "DEBUG":
l = slog.LevelDebug
case "INFO":
l = slog.LevelInfo
case "WARN":
l = slog.LevelWarn
case "ERROR":
l = slog.LevelError
default:
l = slog.LevelDebug
}

var h slog.Handler
switch strings.ToUpper(cfg.LoggingFormat) {
case "TEXT":
h = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: l, AddSource: false})
case "JSON":
h = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: l, AddSource: false})
default:
h = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: l, AddSource: false})
}

return slog.New(h)
}
Loading