Skip to content

Commit

Permalink
Merge pull request kubereboot#296 from jackfrancis/node-annotations
Browse files Browse the repository at this point in the history
add node annotations to identify kured reboot operations
  • Loading branch information
Daniel Holbach authored Mar 9, 2021
2 parents 32e01a8 + baf8340 commit 250b9ba
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 10 deletions.
100 changes: 90 additions & 10 deletions cmd/kured/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package main

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"os"
"os/exec"
"regexp"
"strings"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kubectldrain "k8s.io/kubectl/pkg/drain"
Expand Down Expand Up @@ -48,10 +51,11 @@ var (
messageTemplateReboot string
podSelectors []string

rebootDays []string
rebootStart string
rebootEnd string
timezone string
rebootDays []string
rebootStart string
rebootEnd string
timezone string
annotateNodes bool

// Metrics
rebootRequiredGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Expand All @@ -61,6 +65,15 @@ var (
}, []string{"node"})
)

const (
// KuredNodeLockAnnotation is the canonical string value for the kured node-lock annotation
KuredNodeLockAnnotation string = "weave.works/kured-node-lock"
// KuredRebootInProgressAnnotation is the canonical string value for the kured reboot-in-progress annotation
KuredRebootInProgressAnnotation string = "weave.works/kured-reboot-in-progress"
// KuredMostRecentRebootNeededAnnotation is the canonical string value for the kured most-recent-reboot-needed annotation
KuredMostRecentRebootNeededAnnotation string = "weave.works/kured-most-recent-reboot-needed"
)

func init() {
prometheus.MustRegister(rebootRequiredGauge)
}
Expand All @@ -77,7 +90,7 @@ func main() {
"namespace containing daemonset on which to place lock")
rootCmd.PersistentFlags().StringVar(&dsName, "ds-name", "kured",
"name of daemonset on which to place lock")
rootCmd.PersistentFlags().StringVar(&lockAnnotation, "lock-annotation", "weave.works/kured-node-lock",
rootCmd.PersistentFlags().StringVar(&lockAnnotation, "lock-annotation", KuredNodeLockAnnotation,
"annotation in which to record locking node")
rootCmd.PersistentFlags().DurationVar(&lockTTL, "lock-ttl", 0,
"expire lock annotation after this duration (default: 0, disabled)")
Expand Down Expand Up @@ -113,6 +126,9 @@ func main() {
rootCmd.PersistentFlags().StringVar(&timezone, "time-zone", "UTC",
"use this timezone for schedule inputs")

rootCmd.PersistentFlags().BoolVar(&annotateNodes, "annotate-nodes", false,
"if set, the annotations 'weave.works/kured-reboot-in-progress' and 'weave.works/kured-most-recent-reboot-needed' will be given to nodes undergoing kured reboots")

if err := rootCmd.Execute(); err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -315,6 +331,44 @@ type nodeMeta struct {
Unschedulable bool `json:"unschedulable"`
}

func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Fatal("Error retrieving node object via k8s API: %v", err)
}
for k, v := range annotations {
node.Annotations[k] = v
log.Infof("Adding node %s annotation: %s=%s", node.GetName(), k, v)
}

bytes, err := json.Marshal(node)
if err != nil {
log.Fatal("Error marshalling node object into JSON: %v", err)
}

_, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
if err != nil {
var annotationsErr string
for k, v := range annotations {
annotationsErr += fmt.Sprintf("%s=%s ", k, v)
}
log.Fatal("Error adding node annotations %s via k8s API: %v", annotationsErr, err)
}
}

func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) {
log.Infof("Deleting node %s annotation %s", nodeID, key)

// JSON Patch takes as path input a JSON Pointer, defined in RFC6901
// So we replace all instances of "/" with "~1" as per:
// https://tools.ietf.org/html/rfc6901#section-3
patch := []byte(fmt.Sprintf("[{\"op\":\"remove\",\"path\":\"/metadata/annotations/%s\"}]", strings.ReplaceAll(key, "/", "~1")))
_, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patch, metav1.PatchOptions{})
if err != nil {
log.Fatal("Error deleting node annotation %s via k8s API: %v", key, err)
}
}

func rebootAsRequired(nodeID string, window *timewindow.TimeWindow, TTL time.Duration) {
config, err := rest.InClusterConfig()
if err != nil {
Expand All @@ -330,13 +384,23 @@ func rebootAsRequired(nodeID string, window *timewindow.TimeWindow, TTL time.Dur

nodeMeta := nodeMeta{}
if holding(lock, &nodeMeta) {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Fatal("Error retrieving node object via k8s API: %v", err)
}
if !nodeMeta.Unschedulable {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Fatal(err)
}
uncordon(client, node)
}
// If we're holding the lock we know we've tried, in a prior run, to reboot
// So (1) we want to confirm that the reboot succeeded practically ( !rebootRequired() )
// And (2) check if we previously annotated the node that it was in the process of being rebooted,
// And finally (3) if it has that annotation, to delete it.
// This indicates to other node tools running on the cluster that this node may be a candidate for maintenance
if annotateNodes && !rebootRequired() {
if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok {
deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
}
}
release(lock)
}

Expand Down Expand Up @@ -367,10 +431,23 @@ func rebootAsRequired(nodeID string, window *timewindow.TimeWindow, TTL time.Dur

node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Fatal(err)
log.Fatal("Error retrieving node object via k8s API: %v", err)
}
nodeMeta.Unschedulable = node.Spec.Unschedulable

var timeNowString string
if annotateNodes {
if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; !ok {
timeNowString = time.Now().Format(time.RFC3339)
// Annotate this node to indicate that "I am going to be rebooted!"
// so that other node maintenance tools running on the cluster are aware that this node is in the process of a "state transition"
annotations := map[string]string{KuredRebootInProgressAnnotation: timeNowString}
// & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot
annotations[KuredMostRecentRebootNeededAnnotation] = timeNowString
addNodeAnnotations(client, nodeID, annotations)
}
}

if !acquire(lock, &nodeMeta, TTL) {
// Prefer to not schedule pods onto this node to avoid draing the same pod multiple times.
preferNoScheduleTaint.Enable()
Expand Down Expand Up @@ -410,6 +487,9 @@ func root(cmd *cobra.Command, args []string) {
log.Infof("Reboot Sentinel: %s every %v", rebootSentinel, period)
log.Infof("Blocking Pod Selectors: %v", podSelectors)
log.Infof("Reboot on: %v", window)
if annotateNodes {
log.Infof("Will annotate nodes during kured reboot operations")
}

go rebootAsRequired(nodeID, window, lockTTL)
go maintainRebootRequiredMetric(nodeID)
Expand Down
1 change: 1 addition & 0 deletions kured-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ spec:
# - --message-template-drain=Rebooting node %s
# - --start-time=0:00
# - --time-zone=UTC
# - --annotate-nodes=false

0 comments on commit 250b9ba

Please sign in to comment.