diff --git a/cmd/kured/main.go b/cmd/kured/main.go
index e1a57e4fd..1ef45b026 100644
--- a/cmd/kured/main.go
+++ b/cmd/kured/main.go
@@ -2,18 +2,21 @@ package main

 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"math/rand"
 	"net/http"
 	"os"
 	"os/exec"
 	"regexp"
+	"strings"
 	"time"

 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	kubectldrain "k8s.io/kubectl/pkg/drain"
@@ -48,10 +51,11 @@ var (
 	messageTemplateReboot string
 	podSelectors          []string

-	rebootDays  []string
-	rebootStart string
-	rebootEnd   string
-	timezone    string
+	rebootDays    []string
+	rebootStart   string
+	rebootEnd     string
+	timezone      string
+	annotateNodes bool

 	// Metrics
 	rebootRequiredGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
@@ -61,6 +65,15 @@ var (
 	}, []string{"node"})
 )

+const (
+	// KuredNodeLockAnnotation is the canonical string value for the kured node-lock annotation
+	KuredNodeLockAnnotation string = "weave.works/kured-node-lock"
+	// KuredRebootInProgressAnnotation is the canonical string value for the kured reboot-in-progress annotation
+	KuredRebootInProgressAnnotation string = "weave.works/kured-reboot-in-progress"
+	// KuredMostRecentRebootNeededAnnotation is the canonical string value for the kured most-recent-reboot-needed annotation
+	KuredMostRecentRebootNeededAnnotation string = "weave.works/kured-most-recent-reboot-needed"
+)
+
 func init() {
 	prometheus.MustRegister(rebootRequiredGauge)
 }
@@ -77,7 +90,7 @@ func main() {
 		"namespace containing daemonset on which to place lock")
 	rootCmd.PersistentFlags().StringVar(&dsName, "ds-name", "kured",
 		"name of daemonset on which to place lock")
-	rootCmd.PersistentFlags().StringVar(&lockAnnotation, "lock-annotation", "weave.works/kured-node-lock",
+	rootCmd.PersistentFlags().StringVar(&lockAnnotation, "lock-annotation", KuredNodeLockAnnotation,
 		"annotation in which to record locking node")
 	rootCmd.PersistentFlags().DurationVar(&lockTTL, "lock-ttl", 0,
 		"expire lock annotation after this duration (default: 0, disabled)")
@@ -113,6 +126,9 @@ func main() {
 	rootCmd.PersistentFlags().StringVar(&timezone, "time-zone", "UTC",
 		"use this timezone for schedule inputs")

+	rootCmd.PersistentFlags().BoolVar(&annotateNodes, "annotate-nodes", false,
+		"if set, the annotations 'weave.works/kured-reboot-in-progress' and 'weave.works/kured-most-recent-reboot-needed' will be applied to nodes undergoing kured reboots")
+
 	if err := rootCmd.Execute(); err != nil {
 		log.Fatal(err)
 	}
@@ -315,6 +331,44 @@ type nodeMeta struct {
 	Unschedulable bool `json:"unschedulable"`
 }

+func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) {
+	node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
+	if err != nil {
+		log.Fatalf("Error retrieving node object via k8s API: %v", err)
+	}
+	for k, v := range annotations {
+		node.Annotations[k] = v
+		log.Infof("Adding node %s annotation: %s=%s", node.GetName(), k, v)
+	}
+
+	bytes, err := json.Marshal(node)
+	if err != nil {
+		log.Fatalf("Error marshalling node object into JSON: %v", err)
+	}
+
+	_, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
+	if err != nil {
+		var annotationsErr string
+		for k, v := range annotations {
+			annotationsErr += fmt.Sprintf("%s=%s ", k, v)
+		}
+		log.Fatalf("Error adding node annotations %s via k8s API: %v", annotationsErr, err)
+	}
+}
+
+func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) {
+	log.Infof("Deleting node %s annotation %s", nodeID, key)
+
+	// JSON Patch takes as path input a JSON Pointer, defined in RFC 6901,
+	// so we replace all instances of "/" in the key with "~1" as per:
+	// https://tools.ietf.org/html/rfc6901#section-3
+	patch := []byte(fmt.Sprintf("[{\"op\":\"remove\",\"path\":\"/metadata/annotations/%s\"}]", strings.ReplaceAll(key, "/", "~1")))
+	_, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patch, metav1.PatchOptions{})
+	if err != nil {
+		log.Fatalf("Error deleting node annotation %s via k8s API: %v", key, err)
+	}
+}
+
 func rebootAsRequired(nodeID string, window *timewindow.TimeWindow, TTL time.Duration) {
 	config, err := rest.InClusterConfig()
 	if err != nil {
@@ -330,13 +384,23 @@ func rebootAsRequired(nodeID string, window *timewindow.TimeWindow, TTL time.Dur
 	nodeMeta := nodeMeta{}
 	if holding(lock, &nodeMeta) {
+		node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
+		if err != nil {
+			log.Fatalf("Error retrieving node object via k8s API: %v", err)
+		}
 		if !nodeMeta.Unschedulable {
-			node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
-			if err != nil {
-				log.Fatal(err)
-			}
 			uncordon(client, node)
 		}
+		// If we're holding the lock, we know a reboot was attempted in a prior run.
+		// So we (1) confirm that the reboot actually took effect (!rebootRequired()),
+		// (2) check whether we previously annotated the node as having a reboot in progress,
+		// and (3) if that annotation is present, delete it.
+		// This indicates to other node tools running on the cluster that this node may be a candidate for maintenance.
+		if annotateNodes && !rebootRequired() {
+			if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok {
+				deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
+			}
+		}
 		release(lock)
 	}
@@ -367,10 +431,23 @@
 	node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
 	if err != nil {
-		log.Fatal(err)
+		log.Fatalf("Error retrieving node object via k8s API: %v", err)
 	}
 	nodeMeta.Unschedulable = node.Spec.Unschedulable

+	var timeNowString string
+	if annotateNodes {
+		if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; !ok {
+			timeNowString = time.Now().Format(time.RFC3339)
+			// Annotate this node to indicate that "I am going to be rebooted!"
+			// so that other node maintenance tools running on the cluster are aware that this node is in the process of a "state transition"
+			annotations := map[string]string{KuredRebootInProgressAnnotation: timeNowString}
+			// and annotate it with a timestamp so that other node maintenance tools know how long it has been since this node was marked for reboot
+			annotations[KuredMostRecentRebootNeededAnnotation] = timeNowString
+			addNodeAnnotations(client, nodeID, annotations)
+		}
+	}
+
 	if !acquire(lock, &nodeMeta, TTL) {
 		// Prefer to not schedule pods onto this node to avoid draining the same pod multiple times.
 		preferNoScheduleTaint.Enable()
@@ -410,6 +487,9 @@ func root(cmd *cobra.Command, args []string) {
 	log.Infof("Reboot Sentinel: %s every %v", rebootSentinel, period)
 	log.Infof("Blocking Pod Selectors: %v", podSelectors)
 	log.Infof("Reboot on: %v", window)
+	if annotateNodes {
+		log.Infof("Will annotate nodes during kured reboot operations")
+	}

 	go rebootAsRequired(nodeID, window, lockTTL)
 	go maintainRebootRequiredMetric(nodeID)
diff --git a/kured-ds.yaml b/kured-ds.yaml
index 2e0ffaec8..f4d4842c1 100644
--- a/kured-ds.yaml
+++ b/kured-ds.yaml
@@ -63,3 +63,4 @@ spec:
 # - --message-template-drain=Rebooting node %s
 # - --start-time=0:00
 # - --time-zone=UTC
+# - --annotate-nodes=false
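
A note on the JSON Patch used by deleteNodeAnnotation above: JSON Patch paths are JSON Pointers (RFC 6901), in which "~" must be escaped as "~0" before "/" is escaped as "~1" (the diff only handles the "/" case, since "~" is not a legal character in Kubernetes annotation keys). Below is a minimal standalone sketch of the escaping and the resulting patch document; the escapeJSONPointer helper and the main harness are illustrative, not part of kured:

package main

import (
	"fmt"
	"strings"
)

// escapeJSONPointer escapes a key for use as a JSON Pointer path segment
// per RFC 6901. "~" must be replaced with "~0" before "/" is replaced
// with "~1"; doing it in the other order would corrupt a literal "~1".
func escapeJSONPointer(key string) string {
	key = strings.ReplaceAll(key, "~", "~0")
	return strings.ReplaceAll(key, "/", "~1")
}

func main() {
	key := "weave.works/kured-reboot-in-progress"
	// Build the same "remove" patch that deleteNodeAnnotation sends.
	patch := fmt.Sprintf(`[{"op":"remove","path":"/metadata/annotations/%s"}]`, escapeJSONPointer(key))
	fmt.Println(patch)
	// Prints:
	// [{"op":"remove","path":"/metadata/annotations/weave.works~1kured-reboot-in-progress"}]
}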