Skip to content

Commit

Permalink
Use typed crds for policy (#37)
Browse files Browse the repository at this point in the history
In this PR we introduce the use of the typed policy client in the agent.

Signed-off-by: Zahari Dichev <[email protected]>
  • Loading branch information
zaharidichev authored Nov 3, 2021
1 parent 555dea8 commit 138bebe
Show file tree
Hide file tree
Showing 11 changed files with 295 additions and 116 deletions.
6 changes: 3 additions & 3 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/buoyantio/linkerd-buoyant/agent/pkg/handler"
"github.com/buoyantio/linkerd-buoyant/agent/pkg/k8s"
pb "github.com/buoyantio/linkerd-buoyant/gen/bcloud"
spclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
l5dApi "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
"github.com/linkerd/linkerd2/pkg/admin"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -107,12 +107,12 @@ func main() {
k8sAPI, err := l5dk8s.NewAPIForConfig(k8sConfig, "", nil, 0)
dieIf(err)

spClient, err := spclient.NewForConfig(k8sConfig)
l5dClient, err := l5dApi.NewForConfig(k8sConfig)
dieIf(err)

sharedInformers := informers.NewSharedInformerFactory(k8sAPI.Interface, 10*time.Minute)

k8sClient := k8s.NewClient(sharedInformers, k8sAPI, spClient, *localMode)
k8sClient := k8s.NewClient(sharedInformers, k8sAPI, l5dClient, *localMode)

// wait for discovery API to load

Expand Down
28 changes: 14 additions & 14 deletions agent/pkg/k8s/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package k8s
import (
"time"

spclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
spfake "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned/fake"
spscheme "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned/scheme"
l5dClient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
l5dFake "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned/fake"
l5dScheme "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned/scheme"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
tsclient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
tsfake "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned/fake"
Expand All @@ -20,7 +20,7 @@ import (
)

func fakeClient(objects ...runtime.Object) *Client {
cs, sp, ts, dyn := fakeClientSets(objects...)
cs, l5dApiClient, ts, dyn := fakeClientSets(objects...)

sharedInformers := informers.NewSharedInformerFactory(cs, 10*time.Minute)

Expand All @@ -30,32 +30,32 @@ func fakeClient(objects ...runtime.Object) *Client {
DynamicClient: dyn,
}

client := NewClient(sharedInformers, k8sApi, sp, false)
client := NewClient(sharedInformers, k8sApi, l5dApiClient, false)
client.ignoreCRDSupportCheck = true
return client
}

func fakeClientSets(objects ...runtime.Object) (kubernetes.Interface, spclient.Interface, tsclient.Interface, dynamic.Interface) {
spscheme.AddToScheme(scheme.Scheme)
func fakeClientSets(objects ...runtime.Object) (kubernetes.Interface, l5dClient.Interface, tsclient.Interface, dynamic.Interface) {
l5dScheme.AddToScheme(scheme.Scheme)
tsscheme.AddToScheme(scheme.Scheme)

objs := []runtime.Object{}
spObjs := []runtime.Object{}
l5dObjects := []runtime.Object{}
tsObjs := []runtime.Object{}
dynamicObjs := []runtime.Object{}

for _, obj := range objects {
switch obj.GetObjectKind().GroupVersionKind().Kind {
case "ServiceProfile":
spObjs = append(spObjs, obj)
l5dObjects = append(l5dObjects, obj)
case "ServerAuthorization":
l5dObjects = append(l5dObjects, obj)
case "Server":
l5dObjects = append(l5dObjects, obj)
case "TrafficSplit":
tsObjs = append(tsObjs, obj)
case "Link":
dynamicObjs = append(tsObjs, obj)
case "ServerAuthorization":
dynamicObjs = append(tsObjs, obj)
case "Server":
dynamicObjs = append(tsObjs, obj)
default:
objs = append(objs, obj)
}
Expand All @@ -64,7 +64,7 @@ func fakeClientSets(objects ...runtime.Object) (kubernetes.Interface, spclient.I
cs := fake.NewSimpleClientset(objs...)

return cs,
spfake.NewSimpleClientset(spObjs...),
l5dFake.NewSimpleClientset(l5dObjects...),
tsfake.NewSimpleClientset(tsObjs...),
dynamicfakeclient.NewSimpleDynamicClient(scheme.Scheme, dynamicObjs...)
}
43 changes: 22 additions & 21 deletions agent/pkg/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"net/url"
"time"

server "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
serverAuthorization "github.com/linkerd/linkerd2/controller/gen/apis/serverauthorization/v1beta1"
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
spclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
l5dApi "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
spscheme "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned/scheme"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/multicluster"
Expand All @@ -31,7 +33,7 @@ import (

type Client struct {
k8sClient *l5dk8s.KubernetesAPI
spClient spclient.Interface
l5dClient l5dApi.Interface

encoders map[runtime.GroupVersioner]runtime.Encoder

Expand Down Expand Up @@ -75,10 +77,8 @@ const (

var errSyncCache = errors.New("failed to sync caches")
var linkSGV = multicluster.LinkGVR.GroupVersion()
var serverSGV = l5dk8s.ServerGVR.GroupVersion()
var sazSGV = l5dk8s.SazGVR.GroupVersion()

func NewClient(sharedInformers informers.SharedInformerFactory, k8sClient *l5dk8s.KubernetesAPI, spClient spclient.Interface, local bool) *Client {
func NewClient(sharedInformers informers.SharedInformerFactory, k8sClient *l5dk8s.KubernetesAPI, l5dClient l5dApi.Interface, local bool) *Client {
log := log.WithField("client", "k8s")
log.Debug("initializing")

Expand All @@ -92,23 +92,24 @@ func NewClient(sharedInformers informers.SharedInformerFactory, k8sClient *l5dk8
// For types that we do not have a cleint for we use the `unstructured` package. As we
// also do not have proto definitions for these CRDs, we serialize them to JSON.
//
// +-------------------------+-----------------------------+----------+------------+
// | Group | Kind | Client | Serializer |
// +-------------------------+-----------------------------+----------+------------+
// | policy.linkerd.io | servers,serverAuthorizaions | dynamic | json |
// | multicluster.linkerd.io | links | dynamic | json |
// | linkerd.io | serviceprofiles | spclient | json |
// | split.smi-spec.io | trafficsplits | tsclient | json |
// +-------------------------+-----------------------------+----------+------------+
// +-------------------------+----------------------+-----------+------------+
// | Group | Kind | Client | Serializer |
// +-------------------------+----------------------+-----------+------------+
// | policy.linkerd.io | serverAuthorizations | l5dClient | json |
// | policy.linkerd.io | servers | l5dClient | json |
// | multicluster.linkerd.io | links | dynamic | json |
// | linkerd.io | serviceprofiles | spclient | json |
// | split.smi-spec.io | trafficsplits | tsclient | json |
// +-------------------------+----------------------+-----------+------------+

encoders := map[runtime.GroupVersioner]runtime.Encoder{
v1.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(protoSerializer, v1.SchemeGroupVersion),
appsv1.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(protoSerializer, appsv1.SchemeGroupVersion),
ts.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(jsonSerializer, ts.SchemeGroupVersion),
sp.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(jsonSerializer, sp.SchemeGroupVersion),
linkSGV: scheme.Codecs.EncoderForVersion(jsonSerializer, linkSGV),
sazSGV: scheme.Codecs.EncoderForVersion(jsonSerializer, sazSGV),
serverSGV: scheme.Codecs.EncoderForVersion(jsonSerializer, serverSGV),
v1.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(protoSerializer, v1.SchemeGroupVersion),
appsv1.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(protoSerializer, appsv1.SchemeGroupVersion),
ts.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(jsonSerializer, ts.SchemeGroupVersion),
sp.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(jsonSerializer, sp.SchemeGroupVersion),
linkSGV: scheme.Codecs.EncoderForVersion(jsonSerializer, linkSGV),
serverAuthorization.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(jsonSerializer, serverAuthorization.SchemeGroupVersion),
server.SchemeGroupVersion: scheme.Codecs.EncoderForVersion(jsonSerializer, server.SchemeGroupVersion),
}

podInformer := sharedInformers.Core().V1().Pods()
Expand Down Expand Up @@ -150,7 +151,7 @@ func NewClient(sharedInformers informers.SharedInformerFactory, k8sClient *l5dk8
eventSynced: eventInformerSynced,

k8sClient: k8sClient,
spClient: spClient,
l5dClient: l5dClient,
log: log,
local: local,
}
Expand Down
24 changes: 15 additions & 9 deletions agent/pkg/k8s/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@ import (
"context"

pb "github.com/buoyantio/linkerd-buoyant/gen/bcloud"
l5dk8s "github.com/linkerd/linkerd2/pkg/k8s"
server "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
serverAuthorization "github.com/linkerd/linkerd2/controller/gen/apis/serverauthorization/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
sazGVR = serverAuthorization.SchemeGroupVersion.WithResource("serverauthorizations")
serverGVR = server.SchemeGroupVersion.WithResource("servers")
)

func (c *Client) GetServers(ctx context.Context) ([]*pb.Server, error) {
supported, err := c.resourceSupported(l5dk8s.ServerGVR)
supported, err := c.resourceSupported(serverGVR)
if err != nil {
return nil, err
}
Expand All @@ -18,7 +24,7 @@ func (c *Client) GetServers(ctx context.Context) ([]*pb.Server, error) {
return nil, nil
}

servers, err := c.k8sClient.DynamicClient.Resource(l5dk8s.ServerGVR).Namespace(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
servers, err := c.l5dClient.ServerV1beta1().Servers(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
Expand All @@ -27,15 +33,15 @@ func (c *Client) GetServers(ctx context.Context) ([]*pb.Server, error) {
for i, s := range servers.Items {
s := s
results[i] = &pb.Server{
Server: c.serialize(&s, serverSGV),
Server: c.serialize(&s, server.SchemeGroupVersion),
}
}

return results, nil
}

func (c *Client) GetServerAuths(ctx context.Context) ([]*pb.ServerAuthorization, error) {
supported, err := c.resourceSupported(l5dk8s.SazGVR)
supported, err := c.resourceSupported(sazGVR)
if err != nil {
return nil, err
}
Expand All @@ -44,16 +50,16 @@ func (c *Client) GetServerAuths(ctx context.Context) ([]*pb.ServerAuthorization,
return nil, nil
}

servers, err := c.k8sClient.DynamicClient.Resource(l5dk8s.SazGVR).Namespace(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
serverAuths, err := c.l5dClient.ServerauthorizationV1beta1().ServerAuthorizations(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}

results := make([]*pb.ServerAuthorization, len(servers.Items))
for i, s := range servers.Items {
results := make([]*pb.ServerAuthorization, len(serverAuths.Items))
for i, s := range serverAuths.Items {
s := s
results[i] = &pb.ServerAuthorization{
ServerAuthorization: c.serialize(&s, sazSGV),
ServerAuthorization: c.serialize(&s, serverAuthorization.SchemeGroupVersion),
}
}

Expand Down
66 changes: 35 additions & 31 deletions agent/pkg/k8s/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,30 @@ import (
"context"
"testing"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
server "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
serverauthorization "github.com/linkerd/linkerd2/controller/gen/apis/serverauthorization/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/kubectl/pkg/scheme"
)

func TestGetServers(t *testing.T) {
server := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "policy.linkerd.io/v1alpha1",
"kind": "Server",
"metadata": map[string]interface{}{
"name": "srv",
"namespace": "srvns",
},
"spec": map[string]interface{}{
"port": "http",
"proxyProtocol": "HTTP/1",
},
srv := &server.Server{
TypeMeta: metav1.TypeMeta{
APIVersion: server.SchemeGroupVersion.Identifier(),
Kind: "Server",
},
ObjectMeta: metav1.ObjectMeta{
Name: "srv",
Namespace: "srvns",
},
Spec: server.ServerSpec{
Port: intstr.FromString("http"),
ProxyProtocol: "HTTP/1",
},
}

client := fakeClient(server)
client := fakeClient(srv)

result, err := client.GetServers(context.Background())
if err != nil {
Expand All @@ -34,7 +37,7 @@ func TestGetServers(t *testing.T) {

var buf bytes.Buffer
jsonSerializer := scheme.DefaultJSONEncoder()
if err := jsonSerializer.Encode(server, &buf); err != nil {
if err := jsonSerializer.Encode(srv, &buf); err != nil {
t.Error(err)
}

Expand All @@ -47,26 +50,27 @@ func TestGetServers(t *testing.T) {
}

func TestGetServerAuths(t *testing.T) {
server := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "policy.linkerd.io/v1alpha1",
"kind": "ServerAuthorization",
"metadata": map[string]interface{}{
"name": "saz",
"namespace": "sazns",
srvAuth := &serverauthorization.ServerAuthorization{
TypeMeta: metav1.TypeMeta{
APIVersion: serverauthorization.SchemeGroupVersion.Identifier(),
Kind: "ServerAuthorization",
},
ObjectMeta: metav1.ObjectMeta{
Name: "saz",
Namespace: "sazns",
},

Spec: serverauthorization.ServerAuthorizationSpec{
Server: serverauthorization.Server{
Name: "web-http",
},
"spec": map[string]interface{}{
"server": map[string]interface{}{
"name": "web-http",
},
"client": map[string]interface{}{
"unauthenticated": "true",
},
Client: serverauthorization.Client{
Unauthenticated: true,
},
},
}

client := fakeClient(server)
client := fakeClient(srvAuth)

result, err := client.GetServerAuths(context.Background())
if err != nil {
Expand All @@ -75,7 +79,7 @@ func TestGetServerAuths(t *testing.T) {

var buf bytes.Buffer
jsonSerializer := scheme.DefaultJSONEncoder()
if err := jsonSerializer.Encode(server, &buf); err != nil {
if err := jsonSerializer.Encode(srvAuth, &buf); err != nil {
t.Error(err)
}

Expand Down
2 changes: 1 addition & 1 deletion agent/pkg/k8s/service_profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (c *Client) GetServiceProfiles(ctx context.Context) ([]*pb.ServiceProfile,
return nil, nil
}

spses, err := c.spClient.LinkerdV1alpha2().ServiceProfiles(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
spses, err := c.l5dClient.LinkerdV1alpha2().ServiceProfiles(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion agent/pkg/k8s/service_profiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func TestGetServiceProfile(t *testing.T) {
serviceProfile := sp.ServiceProfile{
TypeMeta: metav1.TypeMeta{
APIVersion: l5dk8s.ServiceProfileAPIVersion,
APIVersion: sp.SchemeGroupVersion.Identifier(),
Kind: l5dk8s.ServiceProfileKind,
},
ObjectMeta: metav1.ObjectMeta{
Expand Down
2 changes: 1 addition & 1 deletion agent/pkg/k8s/traffic_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestGetTrafficSplits(t *testing.T) {
weight := resource.MustParse("500m")
trafficSplit := &ts.TrafficSplit{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1alpha1",
APIVersion: ts.SchemeGroupVersion.Identifier(),
Kind: "TrafficSplit",
},
ObjectMeta: metav1.ObjectMeta{
Expand Down
Loading

0 comments on commit 138bebe

Please sign in to comment.