Skip to content

Commit

Permalink
The implementation of network policy controller
Browse files Browse the repository at this point in the history
This patch will add the network policy controller to handle the network
policy event and CRUD the related security policy on the NSX side.

It will convert the network policy defination to security policy CR internally,
then reuse the logic to handle security policy CR to create the NSX security
policy. So we add a new flag createdFor in the security policy module.

Testing done:

CRUD for the generic network policy:
  kubectl apply -f test/e2e/manifest/testNetworkPolicy/np_simple.yaml
  kubectl apply -f test/e2e/manifest/testNetworkPolicy/np_simple2.yaml
  kubectl delete -f test/e2e/manifest/testNetworkPolicy/np_simple2.yaml
  • Loading branch information
heypnus committed Jan 24, 2024
1 parent e7fecda commit f062665
Show file tree
Hide file tree
Showing 17 changed files with 1,124 additions and 300 deletions.
23 changes: 3 additions & 20 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
commonctl "github.com/vmware-tanzu/nsx-operator/pkg/controllers/common"
ippool2 "github.com/vmware-tanzu/nsx-operator/pkg/controllers/ippool"
namespacecontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/namespace"
networkpolicycontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/networkpolicy"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/node"
nsxserviceaccountcontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/nsxserviceaccount"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/pod"
Expand All @@ -37,7 +38,6 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/ippool"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/nsxserviceaccount"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/securitypolicy"
subnetservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnet"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/vpc"
)
Expand Down Expand Up @@ -79,23 +79,6 @@ func init() {
}
}

func StartSecurityPolicyController(mgr ctrl.Manager, commonService common.Service) {
securityReconcile := &securitypolicycontroller.SecurityPolicyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}
securityService, err := securitypolicy.InitializeSecurityPolicy(commonService)
if err != nil {
log.Error(err, "failed to initialize securitypolicy commonService", "controller", "SecurityPolicy")
os.Exit(1)
}
securityReconcile.Service = securityService
if err := securityReconcile.Start(mgr); err != nil {
log.Error(err, "failed to create controller", "controller", "SecurityPolicy")
os.Exit(1)
}
}

func StartNSXServiceAccountController(mgr ctrl.Manager, commonService common.Service) {
log.Info("starting NSXServiceAccountController")
nsxServiceAccountReconcile := &nsxserviceaccountcontroller.NSXServiceAccountReconciler{
Expand Down Expand Up @@ -219,10 +202,10 @@ func main() {
subnetport.StartSubnetPortController(mgr, commonService)
pod.StartPodController(mgr, commonService)
StartIPPoolController(mgr, commonService)
networkpolicycontroller.StartNetworkPolicyController(mgr, commonService)
}

// Start the security policy controller.
StartSecurityPolicyController(mgr, commonService)
securitypolicycontroller.StartSecurityPolicyController(mgr, commonService)

// Start the NSXServiceAccount controller.
if cf.EnableAntreaNSXInterworking {
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

const (
MetricResTypeSecurityPolicy = "securitypolicy"
MetricResTypeNetworkPolicy = "networkpolicy"
MetricResTypeIPPool = "ippool"
MetricResTypeNSXServiceAccount = "nsxserviceaccount"
MetricResTypeSubnetPort = "subnetport"
Expand Down
195 changes: 195 additions & 0 deletions pkg/controllers/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/* Copyright © 2024 VMware, Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0 */

package networkpolicy

import (
"context"
"errors"
"os"
"runtime"
"time"

networkingv1 "k8s.io/api/networking/v1"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/vmware-tanzu/nsx-operator/pkg/controllers/common"
"github.com/vmware-tanzu/nsx-operator/pkg/logger"
"github.com/vmware-tanzu/nsx-operator/pkg/metrics"
_ "github.com/vmware-tanzu/nsx-operator/pkg/nsx/ratelimiter"
servicecommon "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/securitypolicy"
nsxutil "github.com/vmware-tanzu/nsx-operator/pkg/nsx/util"
)

var (
log = logger.Log
ResultNormal = common.ResultNormal
ResultRequeue = common.ResultRequeue
ResultRequeueAfter5mins = common.ResultRequeueAfter5mins
MetricResType = common.MetricResTypeNetworkPolicy
)

// NetworkPolicyReconciler reconciles a NetworkPolicy object
type NetworkPolicyReconciler struct {
Client client.Client
Scheme *apimachineryruntime.Scheme
Service *securitypolicy.SecurityPolicyService
}

func updateFail(r *NetworkPolicyReconciler, c *context.Context, o *networkingv1.NetworkPolicy, e *error) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerUpdateFailTotal, MetricResType)
}

func deleteFail(r *NetworkPolicyReconciler, c *context.Context, o *networkingv1.NetworkPolicy, e *error) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResType)
}

func updateSuccess(r *NetworkPolicyReconciler, c *context.Context, o *networkingv1.NetworkPolicy) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerUpdateSuccessTotal, MetricResType)
}

func deleteSuccess(r *NetworkPolicyReconciler, _ *context.Context, _ *networkingv1.NetworkPolicy) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteSuccessTotal, MetricResType)
}

func (r *NetworkPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
networkPolicy := &networkingv1.NetworkPolicy{}
log.Info("reconciling networkpolicy", "networkpolicy", req.NamespacedName)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerSyncTotal, MetricResType)

if err := r.Client.Get(ctx, req.NamespacedName, networkPolicy); err != nil {
log.Error(err, "unable to fetch network policy", "req", req.NamespacedName)
return ResultNormal, client.IgnoreNotFound(err)
}

if networkPolicy.ObjectMeta.DeletionTimestamp.IsZero() {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerUpdateTotal, MetricResType)
if !controllerutil.ContainsFinalizer(networkPolicy, servicecommon.NetworkPolicyFinalizerName) {
controllerutil.AddFinalizer(networkPolicy, servicecommon.NetworkPolicyFinalizerName)
if err := r.Client.Update(ctx, networkPolicy); err != nil {
log.Error(err, "add finalizer", "networkpolicy", req.NamespacedName)
updateFail(r, &ctx, networkPolicy, &err)
return ResultRequeue, err
}
log.V(1).Info("added finalizer on networkpolicy", "networkpolicy", req.NamespacedName)
}

if err := r.Service.CreateOrUpdateSecurityPolicy(networkPolicy); err != nil {
if errors.As(err, &nsxutil.RestrictionError{}) {
log.Error(err, err.Error(), "networkpolicy", req.NamespacedName)
updateFail(r, &ctx, networkPolicy, &err)
return ResultNormal, nil
}
log.Error(err, "create or update failed, would retry exponentially", "networkpolicy", req.NamespacedName)
updateFail(r, &ctx, networkPolicy, &err)
return ResultRequeue, err
}
updateSuccess(r, &ctx, networkPolicy)
} else {
if controllerutil.ContainsFinalizer(networkPolicy, servicecommon.NetworkPolicyFinalizerName) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, MetricResType)
if err := r.Service.DeleteSecurityPolicy(networkPolicy, false, servicecommon.ResourceTypeNetworkPolicy); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "networkpolicy", req.NamespacedName)
deleteFail(r, &ctx, networkPolicy, &err)
return ResultRequeue, err
}
controllerutil.RemoveFinalizer(networkPolicy, servicecommon.NetworkPolicyFinalizerName)
if err := r.Client.Update(ctx, networkPolicy); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "networkpolicy", req.NamespacedName)
deleteFail(r, &ctx, networkPolicy, &err)
return ResultRequeue, err
}
log.V(1).Info("removed finalizer", "networkpolicy", req.NamespacedName)
deleteSuccess(r, &ctx, networkPolicy)
} else {
// only print a message because it's not a normal case
log.Info("finalizers cannot be recognized", "networkpolicy", req.NamespacedName)
}
}

return ResultNormal, nil
}

func (r *NetworkPolicyReconciler) setupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&networkingv1.NetworkPolicy{}).
WithOptions(
controller.Options{
MaxConcurrentReconciles: runtime.NumCPU(),
}).
Complete(r)
}

// Start setup manager and launch GC
func (r *NetworkPolicyReconciler) Start(mgr ctrl.Manager) error {
err := r.setupWithManager(mgr)
if err != nil {
return err
}

go r.GarbageCollector(make(chan bool), servicecommon.GCInterval)
return nil
}

// GarbageCollector collect networkpolicy which has been removed from K8s.
// cancel is used to break the loop during UT
func (r *NetworkPolicyReconciler) GarbageCollector(cancel chan bool, timeout time.Duration) {
ctx := context.Background()
log.Info("garbage collector started")
for {
select {
case <-cancel:
return
case <-time.After(timeout):
}
nsxPolicySet := r.Service.ListNetworkPolicyID()
if len(nsxPolicySet) == 0 {
continue
}
policyList := &networkingv1.NetworkPolicyList{}
err := r.Client.List(ctx, policyList)
if err != nil {
log.Error(err, "failed to list NetworkPolicy")
continue
}

CRPolicySet := sets.NewString()
for _, policy := range policyList.Items {
CRPolicySet.Insert(r.Service.BuildNetworkPolicyAllowPolicyID(string(policy.UID)))
CRPolicySet.Insert(r.Service.BuildNetworkPolicyIsolationPolicyID(string(policy.UID)))
}

for elem := range nsxPolicySet {
if CRPolicySet.Has(elem) {
continue
}
log.V(1).Info("GC collected NetworkPolicy", "ID", elem)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, MetricResType)
err = r.Service.DeleteSecurityPolicy(types.UID(elem), false, servicecommon.ResourceTypeNetworkPolicy)
if err != nil {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResType)
} else {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteSuccessTotal, MetricResType)
}
}
}
}

func StartNetworkPolicyController(mgr ctrl.Manager, commonService servicecommon.Service) {
networkPolicyReconcile := NetworkPolicyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}
networkPolicyReconcile.Service = securitypolicy.GetSecurityService(commonService)
if err := networkPolicyReconcile.Start(mgr); err != nil {
log.Error(err, "failed to create controller", "controller", "NetworkPolicy")
os.Exit(1)
}
}
17 changes: 15 additions & 2 deletions pkg/controllers/securitypolicy/securitypolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"os"
"reflect"
"runtime"
"time"
Expand Down Expand Up @@ -135,7 +136,7 @@ func (r *SecurityPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Reque
} else {
if controllerutil.ContainsFinalizer(obj, servicecommon.SecurityPolicyFinalizerName) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, MetricResType)
if err := r.Service.DeleteSecurityPolicy(obj, false); err != nil {
if err := r.Service.DeleteSecurityPolicy(obj, false, servicecommon.ResourceTypeSecurityPolicy); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "securitypolicy", req.NamespacedName)
deleteFail(r, &ctx, obj, &err)
return ResultRequeue, err
Expand Down Expand Up @@ -289,7 +290,7 @@ func (r *SecurityPolicyReconciler) GarbageCollector(cancel chan bool, timeout ti
}
log.V(1).Info("GC collected SecurityPolicy CR", "UID", elem)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, MetricResType)
err = r.Service.DeleteSecurityPolicy(types.UID(elem), false)
err = r.Service.DeleteSecurityPolicy(types.UID(elem), false, servicecommon.ResourceTypeSecurityPolicy)
if err != nil {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResType)
} else {
Expand Down Expand Up @@ -338,3 +339,15 @@ func reconcileSecurityPolicy(client client.Client, pods []v1.Pod, q workqueue.Ra
}
return nil
}

func StartSecurityPolicyController(mgr ctrl.Manager, commonService servicecommon.Service) {
securityPolicyReconcile := SecurityPolicyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}
securityPolicyReconcile.Service = securitypolicy.GetSecurityService(commonService)
if err := securityPolicyReconcile.Start(mgr); err != nil {
log.Error(err, "failed to create controller", "controller", "SecurityPolicy")
os.Exit(1)
}
}
Loading

0 comments on commit f062665

Please sign in to comment.