Skip to content

Commit

Permalink
feat: add resource create/delete actions (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
varnastadeus authored Sep 13, 2023
1 parent 8489d83 commit a81c1f5
Show file tree
Hide file tree
Showing 10 changed files with 514 additions and 16 deletions.
2 changes: 2 additions & 0 deletions actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func NewService(
reflect.TypeOf(&castai.ActionCheckNodeDeleted{}): newCheckNodeDeletedHandler(log, clientset),
reflect.TypeOf(&castai.ActionCheckNodeStatus{}): newCheckNodeStatusHandler(log, clientset),
reflect.TypeOf(&castai.ActionPatch{}): newPatchHandler(log, dynamicClient),
reflect.TypeOf(&castai.ActionCreate{}): newCreateHandler(log, dynamicClient),
reflect.TypeOf(&castai.ActionDelete{}): newDeleteHandler(log, dynamicClient),
},
healthCheck: healthCheck,
}
Expand Down
116 changes: 116 additions & 0 deletions actions/create_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package actions

import (
"context"
"errors"
"fmt"
"reflect"

jsonpatch "github.com/evanphx/json-patch"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"

"github.com/castai/cluster-controller/castai"
)

type createHandler struct {
log logrus.FieldLogger
client dynamic.Interface
}

func newCreateHandler(log logrus.FieldLogger, client dynamic.Interface) ActionHandler {
return &createHandler{
log: log,
client: client,
}
}

func (h *createHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionCreate)
if !ok {
return newUnexpectedTypeErr(action.Data(), req)
}

if req.Object == nil {
return errors.New("no object provided")
}

newObj := &unstructured.Unstructured{Object: req.Object}
if newObj.GetNamespace() == "" {
return errors.New("object namespace is missing")
}

log := h.log.WithFields(logrus.Fields{
"id": action.ID,
"action": reflect.TypeOf(action.Data()).String(),
"gvr": req.GroupVersionResource.String(),
"name": newObj.GetName(),
})

r := h.client.Resource(schema.GroupVersionResource{
Group: req.Group,
Version: req.Version,
Resource: req.Resource,
}).Namespace(newObj.GetNamespace())

log.Info("creating new resource")
_, err := r.Create(ctx, newObj, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("creating resource %v: %w", req.Resource, err)
}

if apierrors.IsAlreadyExists(err) {
log.Info("resource already exists, patching")
obj, err := r.Get(ctx, newObj.GetName(), metav1.GetOptions{})
if err != nil {
return fmt.Errorf("getting old resource: %w", err)
}

// Keep metadata fields equal to ignore unintentional patch.
newObj.SetResourceVersion(obj.GetResourceVersion())
newObj.SetCreationTimestamp(obj.GetCreationTimestamp())
newObj.SetUID(obj.GetUID())
newObj.SetGeneration(obj.GetGeneration())
newObj.SetManagedFields(obj.GetManagedFields())

// Status fields should be omitted.
delete(obj.Object, "status")
delete(newObj.Object, "status")

original, err := obj.MarshalJSON()
if err != nil {
return err
}

modified, err := newObj.MarshalJSON()
if err != nil {
return err
}

patch, err := jsonpatch.CreateMergePatch(original, modified)
if err != nil {
return fmt.Errorf("creating patch: %w", err)
}

// If resources are identical, patch will be equal '{}'.
if len(patch) <= 2 {
log.Info("skipping patch, resources are identical")
return nil
}

log.Infof("patching resource: %s", patch)
_, err = r.Patch(ctx, obj.GetName(), types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("patching resource %v: %w", obj.GetName(), err)
}

return nil
}

return nil
}
146 changes: 146 additions & 0 deletions actions/create_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package actions

import (
"context"
"errors"
"testing"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic/fake"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/castai/cluster-controller/castai"
)

func Test_newCreateHandler(t *testing.T) {
scheme := runtime.NewScheme()
_ = appsv1.AddToScheme(scheme)
ctx := context.Background()

tests := map[string]struct {
objs []runtime.Object
action *castai.ClusterAction
convertFn func(i map[string]interface{}) client.Object
err error
want *appsv1.Deployment
}{
"should return error when action is of a different type": {
action: &castai.ClusterAction{
ActionDeleteNode: &castai.ActionDeleteNode{},
},
err: newUnexpectedTypeErr(&castai.ActionDeleteNode{}, &castai.ActionCreate{}),
},
"should return error when object is not provided": {
action: &castai.ClusterAction{
ActionCreate: &castai.ActionCreate{
GroupVersionResource: castai.GroupVersionResource{},
},
},
err: errors.New("no object provided"),
},
"should create new deployment": {
action: &castai.ClusterAction{
ActionCreate: &castai.ActionCreate{
GroupVersionResource: castai.GroupVersionResource{
Group: appsv1.SchemeGroupVersion.Group,
Version: appsv1.SchemeGroupVersion.Version,
Resource: "deployments",
},
Object: getObj(t, newDeployment()),
},
},
want: newDeployment(),
convertFn: func(i map[string]interface{}) client.Object {
out := &appsv1.Deployment{}
_ = runtime.DefaultUnstructuredConverter.FromUnstructured(i, out)
return out
},
},
"should patch already existing resource": {
action: &castai.ClusterAction{
ActionCreate: &castai.ActionCreate{
GroupVersionResource: castai.GroupVersionResource{
Group: appsv1.SchemeGroupVersion.Group,
Version: appsv1.SchemeGroupVersion.Version,
Resource: "deployments",
},
Object: getObj(t, newDeployment(func(d *appsv1.Deployment) {
d.Labels = map[string]string{"changed": "true"}
})),
},
},
objs: []runtime.Object{newDeployment(func(d *appsv1.Deployment) {})},
want: newDeployment(func(d *appsv1.Deployment) {
d.Labels = map[string]string{"changed": "true"}
}),
convertFn: func(i map[string]interface{}) client.Object {
out := &appsv1.Deployment{}
_ = runtime.DefaultUnstructuredConverter.FromUnstructured(i, out)
return out
},
},
}

for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
r := require.New(t)
log := logrus.New()

c := fake.NewSimpleDynamicClient(scheme, test.objs...)
handler := newCreateHandler(log, c)
err := handler.Handle(ctx, test.action)
if test.err != nil {
r.Error(err)
r.Equal(test.err, err)
return
}

r.NoError(err)
res := c.Resource(schema.GroupVersionResource{
Group: test.action.ActionCreate.Group,
Version: test.action.ActionCreate.Version,
Resource: test.action.ActionCreate.Resource,
})
list, err := res.List(ctx, metav1.ListOptions{})
r.NoError(err)
r.Len(list.Items, 1)
r.Equal(test.want, test.convertFn(list.Items[0].Object))
})
}
}

func getObj(t *testing.T, obj runtime.Object) map[string]interface{} {
t.Helper()
unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
t.Error(err)
}
return unstructured
}

func newDeployment(opts ...func(d *appsv1.Deployment)) *appsv1.Deployment {
out := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "Deployment",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "nginx",
Namespace: "default",
},
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{},
},
}
for _, opt := range opts {
opt(out)
}
return out
}
63 changes: 63 additions & 0 deletions actions/delete_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package actions

import (
"context"
"fmt"
"reflect"

"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"

"github.com/castai/cluster-controller/castai"
)

type deleteHandler struct {
log logrus.FieldLogger
client dynamic.Interface
}

func newDeleteHandler(log logrus.FieldLogger, client dynamic.Interface) ActionHandler {
return &deleteHandler{
log: log,
client: client,
}
}

func (h *deleteHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionDelete)
if !ok {
return newUnexpectedTypeErr(action.Data(), req)
}

log := h.log.WithFields(logrus.Fields{
"id": action.ID,
"action": reflect.TypeOf(action.Data()).String(),
"gvr": req.ID.GroupVersionResource.String(),
"name": req.ID.Name,
})

r := h.client.Resource(schema.GroupVersionResource{
Group: req.ID.Group,
Version: req.ID.Version,
Resource: req.ID.Resource,
})

var res dynamic.ResourceInterface = r
if req.ID.Namespace != nil {
res = r.Namespace(*req.ID.Namespace)
}

log.Info("deleting resource")
if err := res.Delete(ctx, req.ID.Name, metav1.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) {
log.Info("resource not found, skipping deletion")
return nil
}
return fmt.Errorf("deleting resource %v: %w", req.ID.Name, err)
}

return nil
}
Loading

0 comments on commit a81c1f5

Please sign in to comment.