Skip to content

Commit

Permalink
Handle Linkerd CRDs (#35)
Browse files Browse the repository at this point in the history
Add support for 5 Linkerd CRDs:
- policy.linkerd.io/servers
- policy.linkerd.io/serverAuthorizaions
- multicluster.linkerd.io/links
- linkerd.io/serviceprofiles
- split.smi-spec.io/trafficsplits

Signed-off-by: Zahari Dichev <[email protected]>
  • Loading branch information
zaharidichev authored Sep 29, 2021
1 parent 4cce235 commit d86ee69
Show file tree
Hide file tree
Showing 18 changed files with 745 additions and 57 deletions.
21 changes: 10 additions & 11 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
"github.com/buoyantio/linkerd-buoyant/agent/pkg/handler"
"github.com/buoyantio/linkerd-buoyant/agent/pkg/k8s"
pb "github.com/buoyantio/linkerd-buoyant/gen/bcloud"
spclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
"github.com/linkerd/linkerd2/pkg/admin"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -104,23 +104,21 @@ func main() {
ClientConfig()
dieIf(err)

k8sCS, err := kubernetes.NewForConfig(k8sConfig)
k8sAPI, err := l5dk8s.NewAPIForConfig(k8sConfig, "", nil, 0)
dieIf(err)
sharedInformers := informers.NewSharedInformerFactory(k8sCS, 10*time.Minute)

var l5dApi *l5dk8s.KubernetesAPI
if *localMode {
l5dApi, err = l5dk8s.NewAPIForConfig(k8sConfig, "", nil, 0)
dieIf(err)
}
spClient, err := spclient.NewForConfig(k8sConfig)
dieIf(err)

sharedInformers := informers.NewSharedInformerFactory(k8sAPI.Interface, 10*time.Minute)

k8sClient := k8s.NewClient(k8sCS, sharedInformers, l5dApi)
k8sClient := k8s.NewClient(sharedInformers, k8sAPI, spClient, *localMode)

// wait for discovery API to load

log.Info("waiting for Kubernetes API availability")
populateGroupList := func() (done bool, err error) {
_, err = k8sCS.DiscoveryClient.ServerGroups()
_, err = k8sAPI.Discovery().ServerGroups()
if err != nil {
log.Debug("cannot reach Kubernetes API; retrying")
return false, nil
Expand Down Expand Up @@ -167,7 +165,8 @@ func main() {
go manageAgentHandler.Start()

// run admin server
go admin.StartServer(*adminAddr)
adminServer := admin.NewServer(*adminAddr)
go adminServer.ListenAndServe()

// wait for shutdown
<-stop
Expand Down
44 changes: 44 additions & 0 deletions agent/pkg/api/linkerd_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,50 @@ import (
"google.golang.org/protobuf/encoding/prototext"
)

func (c *Client) TrafficSplitInfo(info *pb.TrafficSplitInfo) error {
msg := &pb.LinkerdMessage{
Auth: c.auth,
Message: &pb.LinkerdMessage_TrafficSplitInfo{
TrafficSplitInfo: info,
},
}

return c.sendLinkerdMsg(msg)
}

func (c *Client) SPInfo(info *pb.ServiceProfileInfo) error {
msg := &pb.LinkerdMessage{
Auth: c.auth,
Message: &pb.LinkerdMessage_ServiceProfileInfo{
ServiceProfileInfo: info,
},
}

return c.sendLinkerdMsg(msg)
}

func (c *Client) MCInfo(info *pb.MulticlusterInfo) error {
msg := &pb.LinkerdMessage{
Auth: c.auth,
Message: &pb.LinkerdMessage_MulticlusterInfo{
MulticlusterInfo: info,
},
}

return c.sendLinkerdMsg(msg)
}

func (c *Client) AuthPolicyInfo(info *pb.AuthPolicyInfo) error {
msg := &pb.LinkerdMessage{
Auth: c.auth,
Message: &pb.LinkerdMessage_AuthPolicyInfo{
AuthPolicyInfo: info,
},
}

return c.sendLinkerdMsg(msg)
}

func (c *Client) CrtInfo(info *pb.CertificateInfo) error {
msg := &pb.LinkerdMessage{
Auth: c.auth,
Expand Down
6 changes: 5 additions & 1 deletion agent/pkg/handler/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/buoyantio/linkerd-buoyant/agent/pkg/api"
"github.com/buoyantio/linkerd-buoyant/agent/pkg/k8s"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -130,7 +131,10 @@ func TestEvent(t *testing.T) {
t.Run(tc.testName, func(t *testing.T) {
cs := fake.NewSimpleClientset(tc.objs...)
sharedInformers := informers.NewSharedInformerFactory(cs, 10*time.Minute)
k8sClient := k8s.NewClient(cs, sharedInformers, nil)
k8sApi := &l5dk8s.KubernetesAPI{
Interface: cs,
}
k8sClient := k8s.NewClient(sharedInformers, k8sApi, nil, false)

m := &api.MockBcloudClient{}
apiClient := api.NewClient("", "", m)
Expand Down
85 changes: 84 additions & 1 deletion agent/pkg/handler/linkerd_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (h *LinkerdInfo) Start() {
select {
case <-ticker.C:
h.handleCertsInfo(context.Background())
h.handleAuthPolicyInfo(context.Background())
h.handleMulticluster(context.Background())
h.handleServiceProfiles(context.Background())
h.handleTrafficSplits(context.Background())
case <-h.stopCh:
return
}
Expand All @@ -57,6 +61,85 @@ func (h *LinkerdInfo) Stop() {
close(h.stopCh)
}

func (h *LinkerdInfo) handleTrafficSplits(ctx context.Context) {
trafficSplits, err := h.k8s.GetTrafficSplits(ctx)
if err != nil {
h.log.Errorf("error getting traffic splits: %s", err)
return
}

m := &pb.TrafficSplitInfo{
TrafficSplits: trafficSplits,
}
h.log.Tracef("handleTrafficSplits: %s", prototext.Format(m))

err = h.api.TrafficSplitInfo(m)
if err != nil {
h.log.Errorf("error sending TrafficSplitInfo message: %s", err)
}
}

func (h *LinkerdInfo) handleServiceProfiles(ctx context.Context) {
serviceProfiles, err := h.k8s.GetServiceProfiles(ctx)
if err != nil {
h.log.Errorf("error getting service profiles: %s", err)
return
}

m := &pb.ServiceProfileInfo{
ServiceProfiles: serviceProfiles,
}
h.log.Tracef("handleServiceProfiles: %s", prototext.Format(m))

err = h.api.SPInfo(m)
if err != nil {
h.log.Errorf("error sending ServiceProfileInfo message: %s", err)
}
}

func (h *LinkerdInfo) handleMulticluster(ctx context.Context) {
links, err := h.k8s.GetMulticlusterLinks(ctx)
if err != nil {
h.log.Errorf("error getting MC links: %s", err)
return
}

m := &pb.MulticlusterInfo{
MulticlusterLinks: links,
}
h.log.Tracef("handleMulticluster: %s", prototext.Format(m))

err = h.api.MCInfo(m)
if err != nil {
h.log.Errorf("error sending MulticlusterInfo message: %s", err)
}
}

func (h *LinkerdInfo) handleAuthPolicyInfo(ctx context.Context) {
servers, err := h.k8s.GetServers(ctx)
if err != nil {
h.log.Errorf("error getting servers: %s", err)
return
}

serverAuths, err := h.k8s.GetServerAuths(ctx)
if err != nil {
h.log.Errorf("error getting server authorizations: %s", err)
return
}

m := &pb.AuthPolicyInfo{
Servers: servers,
ServerAuthorizations: serverAuths,
}
h.log.Tracef("handleAuthPolicyInfo: %s", prototext.Format(m))

err = h.api.AuthPolicyInfo(m)
if err != nil {
h.log.Errorf("error sending AuthPolicyInfo message: %s", err)
}
}

func (h *LinkerdInfo) handleCertsInfo(ctx context.Context) {
certs, err := h.k8s.GetControlPlaneCerts(ctx)
if err != nil {
Expand All @@ -69,7 +152,7 @@ func (h *LinkerdInfo) handleCertsInfo(ctx context.Context) {
ControlPlane: certs,
},
}
h.log.Tracef("handleLinkerdInfo: %s", prototext.Format(m))
h.log.Tracef("handleCertsInfo: %s", prototext.Format(m))

err = h.api.CrtInfo(m)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion agent/pkg/handler/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/buoyantio/linkerd-buoyant/agent/pkg/api"
"github.com/buoyantio/linkerd-buoyant/agent/pkg/k8s"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -172,7 +173,10 @@ func TestWorkloadStream(t *testing.T) {
}
cs := fake.NewSimpleClientset(objs...)
sharedInformers := informers.NewSharedInformerFactory(cs, 10*time.Minute)
k8sClient := k8s.NewClient(cs, sharedInformers, nil)
k8sApi := &l5dk8s.KubernetesAPI{
Interface: cs,
}
k8sClient := k8s.NewClient(sharedInformers, k8sApi, nil, false)

m := &api.MockBcloudClient{}
apiClient := api.NewClient("", "", m)
Expand Down
4 changes: 2 additions & 2 deletions agent/pkg/k8s/certificates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestFindIdentityPod(t *testing.T) {
t.Run(tc.testName, func(t *testing.T) {
c := fakeClient(tc.pods...)
c.Sync(nil, time.Second)
client := NewClient(nil, c.sharedInformers, nil)
client := NewClient(c.sharedInformers, nil, nil, false)

pod, err := client.getControlPlaneComponentPod(identityComponentName)
if tc.expectedErr != nil {
Expand Down Expand Up @@ -440,7 +440,7 @@ AiAtuoI5XuCtrGVRzSmRTl2ra28aV9MyTU7d5qnTAFHKSgIgRKCvluOSgA5O21p5
}

c.Sync(nil, time.Second)
client := NewClient(c.k8sClient, c.sharedInformers, nil)
client := NewClient(c.sharedInformers, c.k8sClient, nil, false)

roots, err := client.extractRootsCerts(context.Background(), tc.container, "linkerd")
if tc.expectedErr != nil {
Expand Down
59 changes: 57 additions & 2 deletions agent/pkg/k8s/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,68 @@ package k8s
import (
"time"

spclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
spfake "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned/fake"
spscheme "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned/scheme"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
tsclient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
tsfake "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned/fake"
tsscheme "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned/scheme"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
dynamicfakeclient "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubectl/pkg/scheme"
)

func fakeClient(objects ...runtime.Object) *Client {
cs := fake.NewSimpleClientset(objects...)
cs, sp, ts, dyn := fakeClientSets(objects...)

sharedInformers := informers.NewSharedInformerFactory(cs, 10*time.Minute)
return NewClient(cs, sharedInformers, nil)

k8sApi := &l5dk8s.KubernetesAPI{
Interface: cs,
TsClient: ts,
DynamicClient: dyn,
}

client := NewClient(sharedInformers, k8sApi, sp, false)
client.ignoreCRDSupportCheck = true
return client
}

func fakeClientSets(objects ...runtime.Object) (kubernetes.Interface, spclient.Interface, tsclient.Interface, dynamic.Interface) {
spscheme.AddToScheme(scheme.Scheme)
tsscheme.AddToScheme(scheme.Scheme)

objs := []runtime.Object{}
spObjs := []runtime.Object{}
tsObjs := []runtime.Object{}
dynamicObjs := []runtime.Object{}

for _, obj := range objects {
switch obj.GetObjectKind().GroupVersionKind().Kind {
case "ServiceProfile":
spObjs = append(spObjs, obj)
case "TrafficSplit":
tsObjs = append(tsObjs, obj)
case "Link":
dynamicObjs = append(tsObjs, obj)
case "ServerAuthorization":
dynamicObjs = append(tsObjs, obj)
case "Server":
dynamicObjs = append(tsObjs, obj)
default:
objs = append(objs, obj)
}
}

cs := fake.NewSimpleClientset(objs...)

return cs,
spfake.NewSimpleClientset(spObjs...),
tsfake.NewSimpleClientset(tsObjs...),
dynamicfakeclient.NewSimpleDynamicClient(scheme.Scheme, dynamicObjs...)
}
Loading

0 comments on commit d86ee69

Please sign in to comment.