Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maint(landscape-mock): Bugfixes and logging in Landcape mock #377

Merged
merged 10 commits into from
Nov 14, 2023
85 changes: 63 additions & 22 deletions mocks/landscape/landscapemockservice/landscape_mock_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package landscapemockservice
import (
"context"
"fmt"
"log/slog"
"math/rand"
"sync"

Expand Down Expand Up @@ -80,50 +81,49 @@ func New() *Service {
// Connect implements the Connect API call.
// Upon first contact ever, a UID is randombly assigned to the host and sent to it.
// In subsequent contacts, this UID will be its unique identifier.
func (s *Service) Connect(stream landscapeapi.LandscapeHostAgent_ConnectServer) error {
func (s *Service) Connect(stream landscapeapi.LandscapeHostAgent_ConnectServer) (err error) {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()

firstContact := true
ch := make(chan HostInfo)
defer close(ch)

for {
go func() {
recv, err := stream.Recv()
if err != nil {
return
}

select {
case <-ctx.Done():
return
default:
}
recv := asyncRecv(ctx, stream)

ch <- newHostInfo(recv)
}()
// We keep the hostInfo outside the loop so we can log messages with the hostname.
var hostInfo HostInfo

var hostInfo HostInfo
firstContact := true
for {
var msg recvMsg
select {
case hostInfo = <-ch:
case msg = <-recv:
case <-ctx.Done():
slog.Info(fmt.Sprintf("Landscape: %s: terminated connection: %v", hostInfo.Hostname, ctx.Err()))
return nil
}

if msg.err != nil {
slog.Info(fmt.Sprintf("Landscape: %s: terminated connection: %v", hostInfo.Hostname, msg.err))
return err
}
hostInfo = msg.info

s.mu.Lock()

s.recvLog = append(s.recvLog, hostInfo)

if firstContact {
slog.Info(fmt.Sprintf("Landscape: %s: New connection", hostInfo.Hostname))

firstContact = false
uid, onDisconnect, err := s.firstContact(ctx, cancel, hostInfo, stream)
if err != nil {
s.mu.Unlock()
slog.Info(fmt.Sprintf("Landscape: %s: terminated connection: %v", hostInfo.Hostname, err))
return err
}
defer onDisconnect()
hostInfo.UID = uid
} else {
slog.Info(fmt.Sprintf("Landscape: %s: Received update: %+v", hostInfo.Hostname, hostInfo))
}

h := s.hosts[hostInfo.UID]
Expand All @@ -134,8 +134,42 @@ func (s *Service) Connect(stream landscapeapi.LandscapeHostAgent_ConnectServer)
}
}

// recvMsg is the sanitized reuturn type of a GRPC Recv, used to pass by channel.
didrocks marked this conversation as resolved.
Show resolved Hide resolved
type recvMsg struct {
info HostInfo
err error
}

// This goroutine is an asynchronous GRPC Recv.
didrocks marked this conversation as resolved.
Show resolved Hide resolved
// Usually, you cannot select between a context and a GRPC receive. This function allows you to.
// It'll keep receiving until an error is returned, or the context is cancelled.
didrocks marked this conversation as resolved.
Show resolved Hide resolved
func asyncRecv(ctx context.Context, stream landscapeapi.LandscapeHostAgent_ConnectServer) <-chan recvMsg {
ch := make(chan recvMsg)

go func() {
defer close(ch)

for {
var msg recvMsg
recv, err := stream.Recv()
msg.err = err
if recv != nil {
didrocks marked this conversation as resolved.
Show resolved Hide resolved
msg.info = newHostInfo(recv)
}

select {
case <-ctx.Done():
return
case ch <- msg:
}
}
}()

return ch
}

func (s *Service) firstContact(ctx context.Context, cancel func(), hostInfo HostInfo, stream landscapeapi.LandscapeHostAgent_ConnectServer) (uid string, onDisconect func(), err error) {
if other, ok := s.hosts[hostInfo.UID]; ok && other.connected != nil && *other.connected {
if s.isConnected(hostInfo.UID) {
return "", nil, fmt.Errorf("UID collision: %q", hostInfo.UID)
}

Expand Down Expand Up @@ -186,6 +220,10 @@ func (s *Service) IsConnected(uid string) bool {
s.mu.RLock()
defer s.mu.RUnlock()

return s.isConnected(uid)
}

func (s *Service) isConnected(uid string) bool {
didrocks marked this conversation as resolved.
Show resolved Hide resolved
host, ok := s.hosts[uid]
return ok && host.connected != nil && *host.connected
}
Expand All @@ -200,6 +238,8 @@ func (s *Service) SendCommand(ctx context.Context, uid string, command *landscap
return fmt.Errorf("UID %q not connected", uid)
}

slog.Info(fmt.Sprintf("Landscape: %s: sending command %T: %v", conn.info.Hostname, command.GetCmd(), command.GetCmd()))

return conn.send(command)
}

Expand Down Expand Up @@ -235,6 +275,7 @@ func (s *Service) Disconnect(uid string) error {
return fmt.Errorf("UID %q not registered", uid)
}

slog.Info(fmt.Sprintf("Landscape: %s: requested disconnection", host.info.Hostname))
host.stop()
return nil
}
125 changes: 90 additions & 35 deletions mocks/landscape/landscaperepl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,64 +7,120 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"net"
"os"
"path/filepath"
"strconv"
"strings"

landscapeapi "github.com/canonical/landscape-hostagent-api"
"github.com/canonical/ubuntu-pro-for-windows/mocks/landscape/landscapemockservice"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)

func main() {
ctx := context.Background()
rootCmd := rootCmd()
EduardGomezEscandell marked this conversation as resolved.
Show resolved Hide resolved

if len(os.Args) != 2 || os.Args[1] == "--help" {
log.Fatalf("Usage: %s ADDRESS", os.Args[0])
rootCmd.PersistentFlags().CountP("verbosity", "v", "WARNING (-v) INFO (-vv), DEBUG (-vvv)")
rootCmd.Flags().StringP("address", "a", "localhost:8000", "Overrides the address where the server will be hosted")

if err := rootCmd.Execute(); err != nil {
slog.Error(fmt.Sprintf("Error executing: %v", err))
os.Exit(1)
}
addr := os.Args[1]
}

populateCommands()
func rootCmd() *cobra.Command {
return &cobra.Command{
Use: executableName(),
Short: "A mock server for Landscape hostagent testing",
Long: `Landscape mock REPL mocks a Landscape hostagent server
on your command line. Hosted at the specified address.`,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
// Force a visit of the local flags so persistent flags for all parents are merged.
cmd.LocalFlags()

// command parsing has been successful. Returns to not print usage anymore.
cmd.SilenceUsage = true

v := cmd.Flag("verbosity").Value.String()
n, err := strconv.Atoi(v)
if err != nil {
return fmt.Errorf("could not parse verbosity: %v", err)
}

setVerboseMode(n)
return nil
},
Run: func(cmd *cobra.Command, args []string) {
ctx := context.Background()

addr := cmd.Flag("address").Value.String()
fmt.Printf("Hosting on %s\n", addr)

populateCommands()
fmt.Println("Write `help` to see a list of available commands")
didrocks marked this conversation as resolved.
Show resolved Hide resolved

var cfg net.ListenConfig
lis, err := cfg.Listen(ctx, "tcp", addr)
if err != nil {
slog.Error(fmt.Sprintf("Can't listen: %v", err))
return
}
defer lis.Close()

server := grpc.NewServer()
service := landscapemockservice.New()
landscapeapi.RegisterLandscapeHostAgentServer(server, service)

go func() {
err := server.Serve(lis)
if err != nil {
slog.Error(fmt.Sprintf("Server exited with an error: %v", err))
}
}()
defer server.Stop()

if err := run(ctx, service); err != nil {
slog.Error(err.Error())
return
}
},
}
}

var cfg net.ListenConfig
lis, err := cfg.Listen(ctx, "tcp", addr)
func executableName() string {
exe, err := os.Executable()
if err != nil {
log.Fatalf("Can't listen: %v", err)
return "landscaperepl"
}
defer lis.Close()

server := grpc.NewServer()
service := landscapemockservice.New()
landscapeapi.RegisterLandscapeHostAgentServer(server, service)

go func() {
err := server.Serve(lis)
if err != nil {
log.Fatalf("Server exited with an error: %v", err)
}
}()
defer server.Stop()
return filepath.Base(exe)
}

if err := run(ctx, service); err != nil {
log.Fatalf("%v", err)
// setVerboseMode changes the verbosity of the logs.
func setVerboseMode(n int) {
var level slog.Level
switch n {
case 0:
level = slog.LevelError
case 1:
level = slog.LevelWarn
case 2:
level = slog.LevelInfo
default:
level = slog.LevelDebug
}

h := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level})
slog.SetDefault(slog.New(h))
}

// run contains the main execution loop.
func run(ctx context.Context, s *landscapemockservice.Service) error {
sc := bufio.NewScanner(os.Stdin)

prefix := "$ "

fi, _ := os.Stdin.Stat()
if (fi.Mode() & os.ModeCharDevice) == 0 {
// data is from pipe
prefix = ""
}

fmt.Print(prefix)

// READ
for sc.Scan() {
line := strings.TrimSpace(sc.Text())
Expand All @@ -85,7 +141,6 @@ func run(ctx context.Context, s *landscapemockservice.Service) error {

// LOOP
fmt.Println()
fmt.Print(prefix)
}

if err := sc.Err(); err != nil {
Expand Down
Loading