Skip to content

Commit

Permalink
Basic implementation of network policy controller
Browse files Browse the repository at this point in the history
  • Loading branch information
heypnus committed Jan 11, 2024
1 parent 21eebbc commit c2e6fab
Show file tree
Hide file tree
Showing 15 changed files with 1,058 additions and 300 deletions.
28 changes: 8 additions & 20 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
commonctl "github.com/vmware-tanzu/nsx-operator/pkg/controllers/common"
ippool2 "github.com/vmware-tanzu/nsx-operator/pkg/controllers/ippool"
namespacecontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/namespace"
networkpolicycontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/networkpolicy"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/node"
nsxserviceaccountcontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/nsxserviceaccount"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/pod"
Expand All @@ -37,7 +38,6 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/ippool"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/nsxserviceaccount"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/securitypolicy"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/vpc"
)

Expand Down Expand Up @@ -78,23 +78,6 @@ func init() {
}
}

func StartSecurityPolicyController(mgr ctrl.Manager, commonService common.Service) {
securityReconcile := &securitypolicycontroller.SecurityPolicyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}
if securityService, err := securitypolicy.InitializeSecurityPolicy(commonService); err != nil {
log.Error(err, "failed to initialize securitypolicy commonService", "controller", "SecurityPolicy")
os.Exit(1)
} else {
securityReconcile.Service = securityService
}
if err := securityReconcile.Start(mgr); err != nil {
log.Error(err, "failed to create controller", "controller", "SecurityPolicy")
os.Exit(1)
}
}

func StartNSXServiceAccountController(mgr ctrl.Manager, commonService common.Service) {
log.Info("starting NSXServiceAccountController")
nsxServiceAccountReconcile := &nsxserviceaccountcontroller.NSXServiceAccountReconciler{
Expand Down Expand Up @@ -210,10 +193,15 @@ func main() {
StartNamespaceController(mgr, commonService)
StartVPCController(mgr, commonService)
StartIPPoolController(mgr, commonService)

if err := networkpolicycontroller.StartNetworkPolicyController(mgr, commonService); err != nil {
os.Exit(1)
}
}

// Start the security policy controller.
StartSecurityPolicyController(mgr, commonService)
if err := securitypolicycontroller.StartSecurityPolicyController(mgr, commonService); err != nil {
os.Exit(1)
}

// Start the NSXServiceAccount controller.
if cf.EnableAntreaNSXInterworking {
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

const (
MetricResTypeSecurityPolicy = "securitypolicy"
MetricResTypeNetworkPolicy = "networkpolicy"
MetricResTypeIPPool = "ippool"
MetricResTypeNSXServiceAccount = "nsxserviceaccount"
MetricResTypeSubnetPort = "subnetport"
Expand Down
203 changes: 203 additions & 0 deletions pkg/controllers/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/* Copyright © 2024 VMware, Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0 */

package networkpolicy

import (
"context"
"errors"
"runtime"
"time"

networkingv1 "k8s.io/api/networking/v1"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/vmware-tanzu/nsx-operator/pkg/controllers/common"
"github.com/vmware-tanzu/nsx-operator/pkg/logger"
"github.com/vmware-tanzu/nsx-operator/pkg/metrics"
_ "github.com/vmware-tanzu/nsx-operator/pkg/nsx/ratelimiter"
servicecommon "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/securitypolicy"
nsxutil "github.com/vmware-tanzu/nsx-operator/pkg/nsx/util"
)

var (
log = logger.Log
ResultNormal = common.ResultNormal
ResultRequeue = common.ResultRequeue
ResultRequeueAfter5mins = common.ResultRequeueAfter5mins
MetricResType = common.MetricResTypeNetworkPolicy
)

// NetworkPolicyReconciler reconciles a NetworkPolicy object
type NetworkPolicyReconciler struct {
Client client.Client
Scheme *apimachineryruntime.Scheme
Service *securitypolicy.SecurityPolicyService
}

func updateFail(r *NetworkPolicyReconciler, c *context.Context, o *networkingv1.NetworkPolicy, e *error) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerUpdateFailTotal, MetricResType)
}

func k8sClient(mgr ctrl.Manager) client.Client {
var c client.Client
if mgr != nil {
c = mgr.GetClient()
}
return c
}

func deleteFail(r *NetworkPolicyReconciler, c *context.Context, o *networkingv1.NetworkPolicy, e *error) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResType)
}

func updateSuccess(r *NetworkPolicyReconciler, c *context.Context, o *networkingv1.NetworkPolicy) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerUpdateSuccessTotal, MetricResType)
}

func deleteSuccess(r *NetworkPolicyReconciler, _ *context.Context, _ *networkingv1.NetworkPolicy) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteSuccessTotal, MetricResType)
}

func (r *NetworkPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
networkPolicy := &networkingv1.NetworkPolicy{}
log.Info("reconciling networkpolicy", "networkpolicy", req.NamespacedName)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerSyncTotal, MetricResType)

if err := r.Client.Get(ctx, req.NamespacedName, networkPolicy); err != nil {
log.Error(err, "unable to fetch network policy", "req", req.NamespacedName)
return ResultNormal, client.IgnoreNotFound(err)
}

if networkPolicy.ObjectMeta.DeletionTimestamp.IsZero() {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerUpdateTotal, MetricResType)
if !controllerutil.ContainsFinalizer(networkPolicy, servicecommon.NetworkPolicyFinalizerName) {
controllerutil.AddFinalizer(networkPolicy, servicecommon.NetworkPolicyFinalizerName)
if err := r.Client.Update(ctx, networkPolicy); err != nil {
log.Error(err, "add finalizer", "networkpolicy", req.NamespacedName)
updateFail(r, &ctx, networkPolicy, &err)
return ResultRequeue, err
}
log.V(1).Info("added finalizer on networkpolicy", "networkpolicy", req.NamespacedName)
}

if err := r.Service.CreateOrUpdateSecurityPolicy(networkPolicy); err != nil {
if errors.As(err, &nsxutil.RestrictionError{}) {
log.Error(err, err.Error(), "networkpolicy", req.NamespacedName)
updateFail(r, &ctx, networkPolicy, &err)
return ResultNormal, nil
}
log.Error(err, "create or update failed, would retry exponentially", "networkpolicy", req.NamespacedName)
updateFail(r, &ctx, networkPolicy, &err)
return ResultRequeue, err
}
updateSuccess(r, &ctx, networkPolicy)
} else {
if controllerutil.ContainsFinalizer(networkPolicy, servicecommon.NetworkPolicyFinalizerName) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, MetricResType)
if err := r.Service.DeleteSecurityPolicy(networkPolicy, false, servicecommon.ResourceTypeNetworkPolicy); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "networkpolicy", req.NamespacedName)
deleteFail(r, &ctx, networkPolicy, &err)
return ResultRequeue, err
}
controllerutil.RemoveFinalizer(networkPolicy, servicecommon.NetworkPolicyFinalizerName)
if err := r.Client.Update(ctx, networkPolicy); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "networkpolicy", req.NamespacedName)
deleteFail(r, &ctx, networkPolicy, &err)
return ResultRequeue, err
}
log.V(1).Info("removed finalizer", "networkpolicy", req.NamespacedName)
deleteSuccess(r, &ctx, networkPolicy)
} else {
// only print a message because it's not a normal case
log.Info("finalizers cannot be recognized", "networkpolicy", req.NamespacedName)
}
}

return ResultNormal, nil
}

func (r *NetworkPolicyReconciler) setupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&networkingv1.NetworkPolicy{}).
WithOptions(
controller.Options{
MaxConcurrentReconciles: runtime.NumCPU(),
}).
Complete(r)
}

// Start setup manager and launch GC
func (r *NetworkPolicyReconciler) Start(mgr ctrl.Manager) error {
err := r.setupWithManager(mgr)
if err != nil {
return err
}

go r.GarbageCollector(make(chan bool), servicecommon.GCInterval)
return nil
}

// GarbageCollector collect networkpolicy which has been removed from K8s.
// cancel is used to break the loop during UT
func (r *NetworkPolicyReconciler) GarbageCollector(cancel chan bool, timeout time.Duration) {
ctx := context.Background()
log.Info("garbage collector started")
for {
select {
case <-cancel:
return
case <-time.After(timeout):
}
nsxPolicySet := r.Service.ListNetworkPolicyID()
if len(nsxPolicySet) == 0 {
continue
}
policyList := &networkingv1.NetworkPolicyList{}
err := r.Client.List(ctx, policyList)
if err != nil {
log.Error(err, "failed to list NetworkPolicy")
continue
}

CRPolicySet := sets.NewString()
for _, policy := range policyList.Items {
CRPolicySet.Insert(r.Service.BuildNetworkPolicyAllowPolicyID(string(policy.UID)))
CRPolicySet.Insert(r.Service.BuildNetworkPolicyIsolationPolicyID(string(policy.UID)))
}

for elem := range nsxPolicySet {
if CRPolicySet.Has(elem) {
continue
}
log.V(1).Info("GC collected NetworkPolicy", "ID", elem)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, MetricResType)
err = r.Service.DeleteSecurityPolicy(types.UID(elem), false, servicecommon.ResourceTypeNetworkPolicy)
if err != nil {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResType)
} else {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteSuccessTotal, MetricResType)
}
}
}
}

func StartNetworkPolicyController(mgr ctrl.Manager, commonService servicecommon.Service) error {
networkPolicyReconcile := NetworkPolicyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}
networkPolicyReconcile.Service = securitypolicy.GetSecurityService(commonService)
if err := networkPolicyReconcile.Start(mgr); err != nil {
log.Error(err, "failed to create controller", "controller", "NetworkPolicy")
return err
}
return nil
}
17 changes: 15 additions & 2 deletions pkg/controllers/securitypolicy/securitypolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (r *SecurityPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Reque
} else {
if controllerutil.ContainsFinalizer(obj, servicecommon.SecurityPolicyFinalizerName) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, MetricResType)
if err := r.Service.DeleteSecurityPolicy(obj, false); err != nil {
if err := r.Service.DeleteSecurityPolicy(obj, false, servicecommon.ResourceTypeSecurityPolicy); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "securitypolicy", req.NamespacedName)
deleteFail(r, &ctx, obj, &err)
return ResultRequeue, err
Expand Down Expand Up @@ -288,7 +288,7 @@ func (r *SecurityPolicyReconciler) GarbageCollector(cancel chan bool, timeout ti
}
log.V(1).Info("GC collected SecurityPolicy CR", "UID", elem)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, MetricResType)
err = r.Service.DeleteSecurityPolicy(types.UID(elem), false)
err = r.Service.DeleteSecurityPolicy(types.UID(elem), false, servicecommon.ResourceTypeSecurityPolicy)
if err != nil {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResType)
} else {
Expand Down Expand Up @@ -337,3 +337,16 @@ func reconcileSecurityPolicy(client client.Client, pods []v1.Pod, q workqueue.Ra
}
return nil
}

func StartSecurityPolicyController(mgr ctrl.Manager, commonService servicecommon.Service) error {
securityPolicyReconcile := SecurityPolicyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}
securityPolicyReconcile.Service = securitypolicy.GetSecurityService(commonService)
if err := securityPolicyReconcile.Start(mgr); err != nil {
log.Error(err, "failed to create controller", "controller", "SecurityPolicy")
return err
}
return nil
}
Loading

0 comments on commit c2e6fab

Please sign in to comment.