diff --git a/webhosting-operator/main.go b/webhosting-operator/main.go new file mode 100644 index 00000000..7a6b4373 --- /dev/null +++ b/webhosting-operator/main.go @@ -0,0 +1,115 @@ +/* +Copyright 2023 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "log" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/json" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + + webhostingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/webhosting-operator/pkg/apis/webhosting/v1alpha1" +) + +func main() { + ctx := signals.SetupSignalHandler() + cfg := config.GetConfigOrDie() + + d := dynamic.NewForConfigOrDie(cfg) + + lastResourceVersion := "" + for { + select { + case <-ctx.Done(): + return + default: + } + + log.Printf("Starting watch with resourceVersion %q", lastResourceVersion) + w, err := d.Resource(webhostingv1alpha1.SchemeGroupVersion.WithResource("websites")).Watch(ctx, metav1.ListOptions{ + AllowWatchBookmarks: true, + ResourceVersion: lastResourceVersion, + }) + utilruntime.Must(err) + + lastResourceVersion, err = doWatch(ctx, w) + if apierrors.IsResourceExpired(err) { + lastResourceVersion = "" + log.Printf("watch ended with resource expired, retrying with resourceVersion %q: %v", lastResourceVersion, err) + continue + } + } +} + +func doWatch(ctx context.Context, w watch.Interface) (string, error) { + defer w.Stop() + + lastResourceVersion := "" + +loop: + for { + select { + case <-ctx.Done(): + break loop + case event, ok := <-w.ResultChan(): + if !ok { + break loop + } + + if event.Type == watch.Error { + return "", apierrors.FromObject(event.Object) + } + + var objBytes []byte + + if event.Object != nil { + type resourceVersionGetter interface { + GetResourceVersion() string + } + + if rv, ok := event.Object.(resourceVersionGetter); ok { + lastResourceVersion = rv.GetResourceVersion() + log.Printf("last resource version: %q", lastResourceVersion) + } + + if obj, ok := event.Object.(metav1.Object); ok { + obj.SetManagedFields(nil) + ann := obj.GetAnnotations() + delete(ann, "kubectl.kubernetes.io/last-applied-configuration") + obj.SetAnnotations(ann) + + var err error + objBytes, err = json.Marshal(obj) + if err != nil { + return "", err + } + } + } + + log.Printf("%s: %s", event.Type, objBytes) + } + } + + return lastResourceVersion, nil +}