Skip to content

Commit

Permalink
Add agent commands functionality + flare (#19)
Browse files Browse the repository at this point in the history
This is much of the scafolding needed to support sending
commands to the agent. To begin with, we will be sending
commands to request diagnostic data about the Linkerd proxy.

Signed-off-by: Zahari Dichev <[email protected]>
  • Loading branch information
zaharidichev authored Jul 23, 2021
1 parent a733b03 commit 4ea5a52
Show file tree
Hide file tree
Showing 25 changed files with 1,681 additions and 349 deletions.
24 changes: 16 additions & 8 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/buoyantio/linkerd-buoyant/agent/pkg/k8s"
pb "github.com/buoyantio/linkerd-buoyant/gen/bcloud"
"github.com/linkerd/linkerd2/pkg/admin"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -41,8 +42,8 @@ func main() {
grpcAddr := flag.String("grpc-addr", "api.buoyant.cloud:443", "address of the Buoyant Cloud API")
kubeConfigPath := flag.String("kubeconfig", "", "path to kube config")
logLevel := flag.String("log-level", "info", "log level, must be one of: panic, fatal, error, warn, info, debug, trace")
localMode := flag.Bool("local-mode", false, "enable port forwarding for local development")
insecure := flag.Bool("insecure", false, "disable TLS in development mode")
proxyAddrOverride := flag.String("proxy-addr-override", "", "overrides the proxy address for development mode")

// klog flags
klog.InitFlags(nil)
Expand Down Expand Up @@ -93,12 +94,6 @@ func main() {
}

// setup kubernetes clients and shared informers

var proxyAddr string
if proxyAddrOverride != nil {
proxyAddr = *proxyAddrOverride
}

rules := clientcmd.NewDefaultClientConfigLoadingRules()
if *kubeConfigPath != "" {
rules.ExplicitPath = *kubeConfigPath
Expand All @@ -113,7 +108,13 @@ func main() {
dieIf(err)
sharedInformers := informers.NewSharedInformerFactory(k8sCS, 10*time.Minute)

k8sClient := k8s.NewClient(sharedInformers, proxyAddr)
var l5dApi *l5dk8s.KubernetesAPI
if *localMode {
l5dApi, err = l5dk8s.NewAPIForConfig(k8sConfig, "", nil, 0)
dieIf(err)
}

k8sClient := k8s.NewClient(k8sCS, sharedInformers, l5dApi)

// wait for discovery API to load

Expand Down Expand Up @@ -148,16 +149,22 @@ func main() {
// create handlers
eventHandler := handler.NewEvent(k8sClient, apiClient)
workloadHandler := handler.NewWorkload(k8sClient, apiClient)

linkerdInfoHandler := handler.NewLinkerdInfo(k8sClient, apiClient)
manageAgentHandler := handler.NewManageAgent(k8sClient, apiClient)

// start shared informer and wait for sync
err = k8sClient.Sync(shutdown, 60*time.Second)
dieIf(err)

// start api client stream management logic
go apiClient.Start()

// start handlers
go eventHandler.Start(sharedInformers)
go workloadHandler.Start(sharedInformers)
go linkerdInfoHandler.Start()
go manageAgentHandler.Start()

// run admin server
go admin.StartServer(*adminAddr)
Expand All @@ -167,5 +174,6 @@ func main() {
log.Info("shutting down")
workloadHandler.Stop()
linkerdInfoHandler.Stop()
manageAgentHandler.Stop()
close(shutdown)
}
24 changes: 16 additions & 8 deletions agent/pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
type Client struct {
auth *pb.Auth

client pb.ApiClient
stream pb.Api_WorkloadStreamClient
client pb.ApiClient
workloadStream *workloadStream
manageAgentStream *manageAgentStream

log *log.Entry

Expand All @@ -22,12 +23,19 @@ type Client struct {

// NewClient instantiates a new Buoyant Cloud API client.
func NewClient(id string, key string, client pb.ApiClient) *Client {
auth := &pb.Auth{
AgentId: id,
AgentKey: key,
}
return &Client{
auth: &pb.Auth{
AgentId: id,
AgentKey: key,
},
client: client,
log: log.WithField("api", id),
auth: auth,
workloadStream: newWorkloadStream(auth, client),
manageAgentStream: newManageAgentStream(auth, client),
client: client,
log: log.WithField("api", id),
}
}

func (c *Client) Start() {
c.manageAgentStream.startStream()
}
9 changes: 9 additions & 0 deletions agent/pkg/api/manage_agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package api

import (
pb "github.com/buoyantio/linkerd-buoyant/gen/bcloud"
)

func (c *Client) AgentCommands() <-chan *pb.AgentCommand {
return c.manageAgentStream.commands
}
83 changes: 83 additions & 0 deletions agent/pkg/api/manage_agent_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package api

import (
"context"
"sync"
"time"

pb "github.com/buoyantio/linkerd-buoyant/gen/bcloud"
log "github.com/sirupsen/logrus"
)

// manageAgentStream wraps the Buoyant Cloud API ManageAgent gRPC endpoint, and
// manages the stream.
type manageAgentStream struct {
auth *pb.Auth
client pb.ApiClient
stream pb.Api_ManageAgentClient
log *log.Entry

commands chan *pb.AgentCommand

// protects stream
sync.Mutex
}

func newManageAgentStream(auth *pb.Auth, client pb.ApiClient) *manageAgentStream {
return &manageAgentStream{
auth: auth,
client: client,
log: log.WithField("stream", "ManageAgentStream"),
commands: make(chan *pb.AgentCommand),
}
}

func (s *manageAgentStream) startStream() {
for {
command, err := s.recvCommand()
if err != nil {
s.log.Infof("stream closed, reseting: %s", err)
s.resetStream()
continue
}
s.commands <- command
}
}

func (s *manageAgentStream) recvCommand() (*pb.AgentCommand, error) {
s.Lock()
defer s.Unlock()
if s.stream == nil {
s.stream = s.newStream()
}

return s.stream.Recv()
}

func (s *manageAgentStream) newStream() pb.Api_ManageAgentClient {
var stream pb.Api_ManageAgentClient

// loop until the request to initiate a stream succeeds
for {
var err error
stream, err = s.client.ManageAgent(context.Background(), s.auth)
if err != nil {
s.log.Errorf("failed to initiate stream: %s", err)
time.Sleep(100 * time.Millisecond)
continue
}
break
}

s.log.Info("ManageAgentStream connected")
return stream
}

func (s *manageAgentStream) resetStream() {
s.Lock()
defer s.Unlock()
if s.stream != nil {
s.stream.CloseSend()
s.stream = nil
}
}
79 changes: 79 additions & 0 deletions agent/pkg/api/manage_agent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package api

import (
"reflect"
"testing"
"time"

pb "github.com/buoyantio/linkerd-buoyant/gen/bcloud"
)

func TestManageAgentStream(t *testing.T) {
t.Run("streams and resets", func(t *testing.T) {
fixtures := []*struct {
testName string
commandsFromApi []*pb.AgentCommand
}{
{
"receives commands",
[]*pb.AgentCommand{
createDiagnosticCommand("id1", "pod1", "ns1"),
createDiagnosticCommand("id2", "pod2", "ns2"),
createDiagnosticCommand("id3", "pod3", "ns3"),
createDiagnosticCommand("id4", "pod4", "ns4"),
createDiagnosticCommand("id5", "pod5", "ns5"),
createDiagnosticCommand("id6", "pod6", "ns6"),
},
},
}

for _, tc := range fixtures {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
m := &MockBcloudClient{agentCommandMessages: tc.commandsFromApi}
c := NewClient("", "", m)
go c.Start()

receivedCommands := []*pb.AgentCommand{}

timeout := time.After(time.Second * 10)

out:
for {
select {
case cmd := <-c.AgentCommands():
receivedCommands = append(receivedCommands, cmd)
if len(receivedCommands) >= len(tc.commandsFromApi) {
break out
}
case <-timeout:
t.Fatal("test timed out")
}
}

if len(receivedCommands) != len(tc.commandsFromApi) {
t.Fatalf("Expected to receive %d commands, got: %d", len(tc.commandsFromApi), len(receivedCommands))
}

for i, expectedCommand := range tc.commandsFromApi {
actualCommand := receivedCommands[i]
if !reflect.DeepEqual(expectedCommand, actualCommand) {
t.Fatalf("Expected command %d to be %+v, got %+v", i, expectedCommand, actualCommand)
}
}
})
}
})
}

func createDiagnosticCommand(diagnosticID, podName string, podNamespace string) *pb.AgentCommand {
return &pb.AgentCommand{
Command: &pb.AgentCommand_GetProxyDiagnostics{
GetProxyDiagnostics: &pb.GetProxyDiagnostics{
DiagnosticId: diagnosticID,
PodName: podName,
PodNamespace: podNamespace,
},
},
}
}
33 changes: 33 additions & 0 deletions agent/pkg/api/proxy_diagnostics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package api

import (
"context"

pb "github.com/buoyantio/linkerd-buoyant/gen/bcloud"
"google.golang.org/protobuf/encoding/prototext"
)

// ProxyDiagnostics wraps the Buoyant Cloud API ProxyDiagnostics gRPC unary endpoint.
func (c *Client) ProxyDiagnostics(
diagnosticID string,
logs []byte,
metrics [][]byte,
podManifest *pb.Pod,
linkerdConfigMap *pb.ConfigMap,
nodes []*pb.Node,
k8sServiceManifest *pb.Service) error {
diagnostic := &pb.ProxyDiagnostic{
Auth: c.auth,
DiagnosticId: diagnosticID,
Logs: logs,
Metrics: metrics,
PodManifest: podManifest,
LinkerdConfigMap: linkerdConfigMap,
Nodes: nodes,
K8SServiceManifest: k8sServiceManifest,
}
c.log.Tracef("ProxyDiagnostics: %s", prototext.Format(diagnostic))

_, err := c.client.ProxyDiagnostics(context.Background(), diagnostic)
return err
}
Loading

0 comments on commit 4ea5a52

Please sign in to comment.