From 8956a9676f8867f0a5439e23a07aead171810c05 Mon Sep 17 00:00:00 2001
From: Kevin Nisbet
Date: Wed, 30 Sep 2020 12:38:17 -0400
Subject: [PATCH] [7.0.x] scale tuning (#2159)

* set value for health_check to prevent runaway cpu usage (#2117)

(cherry picked from commit 80857d6dc8645b8e46890191567e1d4bc3b2155f)

* [master] change coredns on workers to deployment (#2104)

* change coredns on workers to deployment; introduce cluster proportional autoscaler for DNS services
* tweak the proportional autoscaler to scale to 0 on worker nodes
* remove worker coredns daemonset during upgrade
* add critical labels and annotations

(cherry picked from commit d2848ec02cbf50e92f7ad5d72a5f50bfc09b96e9)

* [master] g1k scaling (#2146)

* Use algorithm for calculating number of simultaneous expand operations
* set a higher max interval when attempting to join a cluster to reduce load
* tuning for large clusters and cloud environments
* tuning for large clusters
* add godoc for OperationsFilter
* temp e-ref (at PR)
* address review feedback
* address review feedback
* fix glitch in filter without any type filters
* increase default teleport connection limit
* update e ref
* update e ref

(cherry picked from commit 0a63c55ef75244bf27fd9487156ab313a505de8c)

* update planet and monitoring-app
* bump planet
* bump planet
* bump dns-app tag
* use correct version of monitoring app
* update e-ref to merged commit
* bump planet
---
 Makefile                                 |   6 +-
 assets/dns-app/hooks/entrypoint.sh       |   3 +
 assets/dns-app/resources/dns.yaml        | 711 ++++++++++++++++++++++-
 assets/telekube/resources/app.yaml       |  12 +
 e                                        |   2 +-
 lib/defaults/defaults.go                 |  12 +-
 lib/expand/join.go                       |  11 +-
 lib/install/engine/cli/cli.go            |   2 +-
 lib/install/engine/interactive/wizard.go |   2 +-
 lib/install/phases/coredns.go            |   2 +-
 lib/install/phases/coredns_test.go       |   4 +-
 lib/install/reconfigure/engine.go        |   2 +-
 lib/ops/operatoracl.go                   |   4 +-
 lib/ops/ops.go                           | 147 ++++-
 lib/ops/ops_test.go                      | 267 +++++++++
 lib/ops/opsclient/opsclient.go           |   8 +-
 lib/ops/opshandler/opshandler.go         |   4 +-
 lib/ops/opsroute/forward.go              |   4 +-
 lib/ops/opsservice/operationgroup.go     |   1 +
 lib/ops/opsservice/report.go             |   2 +-
 lib/ops/opsservice/service.go            |  31 +-
 lib/ops/resources/gravity/gravity.go     |   2 +-
 lib/ops/suite/opssuite.go                |   4 +-
 lib/ops/utils.go                         | 199 ++++---
 lib/process/process.go                   |   2 +-
 lib/schema/parse.go                      |   6 +
 lib/status/status.go                     |   2 +-
 lib/storage/authgateway.go               |   2 +-
 lib/storage/keyval/backend.go            |   7 +-
 lib/storage/keyval/bolt.go               |   2 +
 lib/storage/keyval/etcd.go               |   3 +
 lib/storage/keyval/operations.go         |  39 +-
 lib/update/utils.go                      |  72 +--
 lib/webapi/operations.go                 |   2 +-
 tool/gravity/cli/operation.go            | 169 +++++-
 tool/gravity/cli/plan.go                 |   3 +
 36 files changed, 1548 insertions(+), 203 deletions(-)
 create mode 100644 lib/ops/ops_test.go

diff --git a/Makefile b/Makefile
index c113f9851d..037990a99f 100644
--- a/Makefile
+++ b/Makefile
@@ -49,15 +49,15 @@ RELEASE_OUT ?=
 TELEPORT_TAG = 3.2.16
 # TELEPORT_REPOTAG adapts TELEPORT_TAG to the teleport tagging scheme
 TELEPORT_REPOTAG := v$(TELEPORT_TAG)
-PLANET_TAG := 7.0.42-$(K8S_VER_SUFFIX)
+PLANET_TAG := 7.0.46-$(K8S_VER_SUFFIX)
 PLANET_BRANCH := $(PLANET_TAG)
 K8S_APP_TAG := $(GRAVITY_TAG)
 TELEKUBE_APP_TAG := $(GRAVITY_TAG)
 WORMHOLE_APP_TAG := $(GRAVITY_TAG)
 STORAGE_APP_TAG ?= 0.0.3
 LOGGING_APP_TAG ?= 6.0.6
-MONITORING_APP_TAG ?= 7.0.4
-DNS_APP_TAG = 7.0.0
+MONITORING_APP_TAG ?= 7.0.5
+DNS_APP_TAG = 7.0.1
 BANDWAGON_TAG ?= 6.0.1
 RBAC_APP_TAG := $(GRAVITY_TAG)
 TILLER_VERSION = 2.15.2
diff --git a/assets/dns-app/hooks/entrypoint.sh
b/assets/dns-app/hooks/entrypoint.sh index 542103906a..ca7c1e2814 100755 --- a/assets/dns-app/hooks/entrypoint.sh +++ b/assets/dns-app/hooks/entrypoint.sh @@ -6,6 +6,9 @@ if [ $1 = "update" ]; then echo "Updating resources" rig upsert -f /var/lib/gravity/resources/dns.yaml + echo "Deleting coredns daemonset that has been replaced by a deployment" + rig delete ds/coredns-worker --resource-namespace=kube-system --force + echo "Checking status" rig status $RIG_CHANGESET --retry-attempts=120 --retry-period=1s --debug echo "Freezing" diff --git a/assets/dns-app/resources/dns.yaml b/assets/dns-app/resources/dns.yaml index 13b6438404..f8d7b5b955 100644 --- a/assets/dns-app/resources/dns.yaml +++ b/assets/dns-app/resources/dns.yaml @@ -93,8 +93,10 @@ spec: metadata: labels: k8s-app: kube-dns + gravitational.io/critical-pod: '' annotations: seccomp.security.alpha.kubernetes.io/pod: docker/default + scheduler.alpha.kubernetes.io/critical-pod: '' spec: priorityClassName: system-node-critical serviceAccountName: coredns @@ -104,7 +106,7 @@ spec: gravitational.io/k8s-role: master containers: - name: coredns - image: coredns/coredns:1.2.6 + image: coredns/coredns:1.7.0 imagePullPolicy: IfNotPresent resources: limits: @@ -158,7 +160,7 @@ spec: path: Corefile --- apiVersion: apps/v1 -kind: DaemonSet +kind: Deployment metadata: name: coredns-worker namespace: kube-system @@ -168,6 +170,11 @@ metadata: addonmanager.kubernetes.io/mode: Reconcile kubernetes.io/name: "CoreDNS" spec: + strategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: 1 + replicas: 0 selector: matchLabels: k8s-app: kube-dns-worker @@ -175,8 +182,10 @@ spec: metadata: labels: k8s-app: kube-dns-worker + gravitational.io/critical-pod: '' annotations: seccomp.security.alpha.kubernetes.io/pod: docker/default + scheduler.alpha.kubernetes.io/critical-pod: '' spec: priorityClassName: system-node-critical serviceAccountName: coredns @@ -184,9 +193,20 @@ spec: - operator: "Exists" nodeSelector: gravitational.io/k8s-role: node + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: k8s-app + operator: In + values: ["kube-dns"] + topologyKey: kubernetes.io/hostname containers: - name: coredns - image: coredns/coredns:1.2.6 + image: coredns/coredns:1.7.0 imagePullPolicy: IfNotPresent resources: limits: @@ -289,3 +309,688 @@ spec: - name: metrics port: 9153 protocol: TCP +# +# Cluster Proportional Autoscaler +# Scale the coredns deployment proportionally to the cluster size +# https://github.com/kubernetes-sigs/cluster-proportional-autoscaler +# https://github.com/kubernetes-sigs/cluster-proportional-autoscaler/tree/master/examples +--- +kind: ServiceAccount +apiVersion: v1 +metadata: + name: cluster-proportional-autoscaler-coredns + namespace: kube-system +--- +apiVersion: extensions/v1beta1 +kind: PodSecurityPolicy +metadata: + annotations: + seccomp.security.alpha.kubernetes.io/allowedProfileNames: docker/default + seccomp.security.alpha.kubernetes.io/defaultProfileName: docker/default + name: cluster-proportional-autoscaler +spec: + privileged: false + allowPrivilegeEscalation: false + requiredDropCapabilities: + - All + runAsUser: + rule: 'MustRunAsNonRoot' + seLinux: + rule: 'RunAsAny' + supplementalGroups: + rule: 'MustRunAs' + ranges: + - min: 1 + max: 65535 + fsGroup: + rule: 'MustRunAs' + ranges: + - min: 1 + max: 65535 + volumes: + - 'configMap' + - 'emptyDir' + - 'projected' + - 'secret' +--- +kind: ClusterRole +apiVersion: 
rbac.authorization.k8s.io/v1
+metadata:
+  name: cluster-proportional-autoscaler-coredns
+rules:
+  - apiGroups: [""]
+    resources: ["nodes"]
+    verbs: ["list", "watch"]
+  - apiGroups:
+    - policy
+    resources:
+    - podsecuritypolicies
+    verbs:
+    - use
+    resourceNames:
+    - cluster-proportional-autoscaler
+---
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: cluster-proportional-autoscaler-coredns
+  namespace: kube-system
+rules:
+  - apiGroups: [""]
+    resources: ["replicationcontrollers/scale"]
+    verbs: ["get", "update"]
+  - apiGroups: ["extensions","apps"]
+    resources: ["deployments/scale", "replicasets/scale"]
+    verbs: ["get", "update"]
+  - apiGroups: [""]
+    resources: ["configmaps"]
+    verbs: ["get"]
+    resourceNames: ["autoscaler-coredns-worker", "autoscaler-coredns-master"]
+---
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: cluster-proportional-autoscaler-coredns
+subjects:
+  - kind: ServiceAccount
+    name: cluster-proportional-autoscaler-coredns
+    namespace: kube-system
+roleRef:
+  kind: ClusterRole
+  name: cluster-proportional-autoscaler-coredns
+  apiGroup: rbac.authorization.k8s.io
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: cluster-proportional-autoscaler-coredns
+  namespace: kube-system
+subjects:
+  - kind: ServiceAccount
+    name: cluster-proportional-autoscaler-coredns
+    namespace: kube-system
+roleRef:
+  kind: Role
+  name: cluster-proportional-autoscaler-coredns
+  apiGroup: rbac.authorization.k8s.io
+---
+kind: ConfigMap
+apiVersion: v1
+metadata:
+  name: autoscaler-coredns-worker
+  namespace: kube-system
+data:
+  # When the cluster is using large nodes (with more cores), "coresPerReplica" should dominate.
+  # If using small nodes, "nodesPerReplica" should dominate.
+  # Parameters based on kubernetes defaults: https://github.com/kubernetes/kubernetes/blob/f65f868aa041e108a73a014df360427c6e05e493/cluster/addons/dns-horizontal-autoscaler/dns-horizontal-autoscaler.yaml
+  # linear: '{"coresPerReplica":256,"includeUnschedulableNodes":true,"nodesPerReplica":16,"preventSinglePointFailure":true}'
+  # Use ladder scaling that emulates the linear scale, because we want to scale down to 0 replicas when no workers are
+  # joined to the cluster. The linear scaler doesn't support 0 replicas.
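+  # A rough worked example, assuming the autoscaler's documented ladder behavior
+  # of looking up each table and taking the larger replica count: a worker pool
+  # of 100 nodes / 800 cores falls into the [ 768, 4 ] cores step and the
+  # [ 96, 7 ] nodes step below, so coredns-worker is scaled to max(4, 7) = 7
+  # replicas, matching what the linear parameters above would produce.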
+ ladder: |- + { + "coresToReplicas": + [ + [ 0, 0 ], + [ 1, 1 ], + [ 256, 2 ], + [ 512, 3 ], + [ 768, 4 ], + [ 1024, 5 ], + [ 1280, 6 ], + [ 1536, 7 ], + [ 1792, 8 ], + [ 2048, 9 ], + [ 2304, 10 ], + [ 2560, 11 ], + [ 2816, 12 ], + [ 3072, 13 ], + [ 3328, 14 ], + [ 3584, 15 ], + [ 3840, 16 ], + [ 4096, 17 ], + [ 4352, 18 ], + [ 4608, 19 ], + [ 4864, 20 ], + [ 5120, 21 ], + [ 5376, 22 ], + [ 5632, 23 ], + [ 5888, 24 ], + [ 6144, 25 ], + [ 6400, 26 ], + [ 6656, 27 ], + [ 6912, 28 ], + [ 7168, 29 ], + [ 7424, 30 ], + [ 7680, 31 ], + [ 7936, 32 ], + [ 8192, 33 ], + [ 8448, 34 ], + [ 8704, 35 ], + [ 8960, 36 ], + [ 9216, 37 ], + [ 9472, 38 ], + [ 9728, 39 ], + [ 9984, 40 ], + [ 10240, 41 ], + [ 10496, 42 ], + [ 10752, 43 ], + [ 11008, 44 ], + [ 11264, 45 ], + [ 11520, 46 ], + [ 11776, 47 ], + [ 12032, 48 ], + [ 12288, 49 ], + [ 12544, 50 ], + [ 12800, 51 ], + [ 13056, 52 ], + [ 13312, 53 ], + [ 13568, 54 ], + [ 13824, 55 ], + [ 14080, 56 ], + [ 14336, 57 ], + [ 14592, 58 ], + [ 14848, 59 ], + [ 15104, 60 ], + [ 15360, 61 ], + [ 15616, 62 ], + [ 15872, 63 ], + [ 16128, 64 ], + [ 16384, 65 ], + [ 16640, 66 ], + [ 16896, 67 ], + [ 17152, 68 ], + [ 17408, 69 ], + [ 17664, 70 ], + [ 17920, 71 ], + [ 18176, 72 ], + [ 18432, 73 ], + [ 18688, 74 ], + [ 18944, 75 ], + [ 19200, 76 ], + [ 19456, 77 ], + [ 19712, 78 ], + [ 19968, 79 ], + [ 20224, 80 ], + [ 20480, 81 ], + [ 20736, 82 ], + [ 20992, 83 ], + [ 21248, 84 ], + [ 21504, 85 ], + [ 21760, 86 ], + [ 22016, 87 ], + [ 22272, 88 ], + [ 22528, 89 ], + [ 22784, 90 ], + [ 23040, 91 ], + [ 23296, 92 ], + [ 23552, 93 ], + [ 23808, 94 ], + [ 24064, 95 ], + [ 24320, 96 ], + [ 24576, 97 ], + [ 24832, 98 ], + [ 25088, 99 ], + [ 25344, 100 ], + [ 25600, 101 ], + [ 25856, 102 ], + [ 26112, 103 ], + [ 26368, 104 ], + [ 26624, 105 ], + [ 26880, 106 ], + [ 27136, 107 ], + [ 27392, 108 ], + [ 27648, 109 ], + [ 27904, 110 ], + [ 28160, 111 ], + [ 28416, 112 ], + [ 28672, 113 ], + [ 28928, 114 ], + [ 29184, 115 ], + [ 29440, 116 ], + [ 29696, 117 ], + [ 29952, 118 ], + [ 30208, 119 ], + [ 30464, 120 ], + [ 30720, 121 ], + [ 30976, 122 ], + [ 31232, 123 ], + [ 31488, 124 ], + [ 31744, 125 ], + [ 32000, 126 ], + [ 32256, 127 ], + [ 32512, 128 ], + [ 32768, 129 ], + [ 33024, 130 ], + [ 33280, 131 ], + [ 33536, 132 ], + [ 33792, 133 ], + [ 34048, 134 ], + [ 34304, 135 ], + [ 34560, 136 ], + [ 34816, 137 ], + [ 35072, 138 ], + [ 35328, 139 ], + [ 35584, 140 ], + [ 35840, 141 ], + [ 36096, 142 ], + [ 36352, 143 ], + [ 36608, 144 ], + [ 36864, 145 ], + [ 37120, 146 ], + [ 37376, 147 ], + [ 37632, 148 ], + [ 37888, 149 ], + [ 38144, 150 ], + [ 38400, 151 ], + [ 38656, 152 ], + [ 38912, 153 ], + [ 39168, 154 ], + [ 39424, 155 ], + [ 39680, 156 ], + [ 39936, 157 ], + [ 40192, 158 ], + [ 40448, 159 ], + [ 40704, 160 ], + [ 40960, 161 ], + [ 41216, 162 ], + [ 41472, 163 ], + [ 41728, 164 ], + [ 41984, 165 ], + [ 42240, 166 ], + [ 42496, 167 ], + [ 42752, 168 ], + [ 43008, 169 ], + [ 43264, 170 ], + [ 43520, 171 ], + [ 43776, 172 ], + [ 44032, 173 ], + [ 44288, 174 ], + [ 44544, 175 ], + [ 44800, 176 ], + [ 45056, 177 ], + [ 45312, 178 ], + [ 45568, 179 ], + [ 45824, 180 ], + [ 46080, 181 ], + [ 46336, 182 ], + [ 46592, 183 ], + [ 46848, 184 ], + [ 47104, 185 ], + [ 47360, 186 ], + [ 47616, 187 ], + [ 47872, 188 ], + [ 48128, 189 ], + [ 48384, 190 ], + [ 48640, 191 ], + [ 48896, 192 ], + [ 49152, 193 ], + [ 49408, 194 ], + [ 49664, 195 ], + [ 49920, 196 ], + [ 50176, 197 ], + [ 50432, 198 ], + [ 50688, 199 ], + [ 50944, 200 ], + [ 51200, 201 ], + [ 51456, 202 ], + [ 51712, 203 ], 
+ [ 51968, 204 ], + [ 52224, 205 ], + [ 52480, 206 ], + [ 52736, 207 ], + [ 52992, 208 ], + [ 53248, 209 ], + [ 53504, 210 ], + [ 53760, 211 ], + [ 54016, 212 ], + [ 54272, 213 ], + [ 54528, 214 ], + [ 54784, 215 ], + [ 55040, 216 ], + [ 55296, 217 ], + [ 55552, 218 ], + [ 55808, 219 ], + [ 56064, 220 ], + [ 56320, 221 ], + [ 56576, 222 ], + [ 56832, 223 ], + [ 57088, 224 ], + [ 57344, 225 ], + [ 57600, 226 ], + [ 57856, 227 ], + [ 58112, 228 ], + [ 58368, 229 ], + [ 58624, 230 ], + [ 58880, 231 ], + [ 59136, 232 ], + [ 59392, 233 ], + [ 59648, 234 ], + [ 59904, 235 ], + [ 60160, 236 ], + [ 60416, 237 ], + [ 60672, 238 ], + [ 60928, 239 ], + [ 61184, 240 ], + [ 61440, 241 ], + [ 61696, 242 ], + [ 61952, 243 ], + [ 62208, 244 ], + [ 62464, 245 ], + [ 62720, 246 ], + [ 62976, 247 ], + [ 63232, 248 ], + [ 63488, 249 ], + [ 63744, 250 ], + [ 64000, 251 ], + [ 64256, 252 ], + [ 64512, 253 ], + [ 64768, 254 ], + [ 65024, 255 ], + [ 65280, 256 ] + ], + "nodesToReplicas": + [ + [ 0, 0 ], + [ 1, 1 ], + [ 2, 2 ], + [ 32, 3 ], + [ 48, 4 ], + [ 64, 5 ], + [ 80, 6 ], + [ 96, 7 ], + [ 112, 8 ], + [ 128, 9 ], + [ 144, 10 ], + [ 160, 11 ], + [ 176, 12 ], + [ 192, 13 ], + [ 208, 14 ], + [ 224, 15 ], + [ 240, 16 ], + [ 256, 17 ], + [ 272, 18 ], + [ 288, 19 ], + [ 304, 20 ], + [ 320, 21 ], + [ 336, 22 ], + [ 352, 23 ], + [ 368, 24 ], + [ 384, 25 ], + [ 400, 26 ], + [ 416, 27 ], + [ 432, 28 ], + [ 448, 29 ], + [ 464, 30 ], + [ 480, 31 ], + [ 496, 32 ], + [ 512, 33 ], + [ 528, 34 ], + [ 544, 35 ], + [ 560, 36 ], + [ 576, 37 ], + [ 592, 38 ], + [ 608, 39 ], + [ 624, 40 ], + [ 640, 41 ], + [ 656, 42 ], + [ 672, 43 ], + [ 688, 44 ], + [ 704, 45 ], + [ 720, 46 ], + [ 736, 47 ], + [ 752, 48 ], + [ 768, 49 ], + [ 784, 50 ], + [ 800, 51 ], + [ 816, 52 ], + [ 832, 53 ], + [ 848, 54 ], + [ 864, 55 ], + [ 880, 56 ], + [ 896, 57 ], + [ 912, 58 ], + [ 928, 59 ], + [ 944, 60 ], + [ 960, 61 ], + [ 976, 62 ], + [ 992, 63 ], + [ 1008, 64 ], + [ 1024, 65 ], + [ 1040, 66 ], + [ 1056, 67 ], + [ 1072, 68 ], + [ 1088, 69 ], + [ 1104, 70 ], + [ 1120, 71 ], + [ 1136, 72 ], + [ 1152, 73 ], + [ 1168, 74 ], + [ 1184, 75 ], + [ 1200, 76 ], + [ 1216, 77 ], + [ 1232, 78 ], + [ 1248, 79 ], + [ 1264, 80 ], + [ 1280, 81 ], + [ 1296, 82 ], + [ 1312, 83 ], + [ 1328, 84 ], + [ 1344, 85 ], + [ 1360, 86 ], + [ 1376, 87 ], + [ 1392, 88 ], + [ 1408, 89 ], + [ 1424, 90 ], + [ 1440, 91 ], + [ 1456, 92 ], + [ 1472, 93 ], + [ 1488, 94 ], + [ 1504, 95 ], + [ 1520, 96 ], + [ 1536, 97 ], + [ 1552, 98 ], + [ 1568, 99 ], + [ 1584, 100 ], + [ 1600, 101 ], + [ 1616, 102 ], + [ 1632, 103 ], + [ 1648, 104 ], + [ 1664, 105 ], + [ 1680, 106 ], + [ 1696, 107 ], + [ 1712, 108 ], + [ 1728, 109 ], + [ 1744, 110 ], + [ 1760, 111 ], + [ 1776, 112 ], + [ 1792, 113 ], + [ 1808, 114 ], + [ 1824, 115 ], + [ 1840, 116 ], + [ 1856, 117 ], + [ 1872, 118 ], + [ 1888, 119 ], + [ 1904, 120 ], + [ 1920, 121 ], + [ 1936, 122 ], + [ 1952, 123 ], + [ 1968, 124 ], + [ 1984, 125 ], + [ 2000, 126 ], + [ 2016, 127 ], + [ 2032, 128 ], + [ 2048, 129 ], + [ 2064, 130 ], + [ 2080, 131 ], + [ 2096, 132 ], + [ 2112, 133 ], + [ 2128, 134 ], + [ 2144, 135 ], + [ 2160, 136 ], + [ 2176, 137 ], + [ 2192, 138 ], + [ 2208, 139 ], + [ 2224, 140 ], + [ 2240, 141 ], + [ 2256, 142 ], + [ 2272, 143 ], + [ 2288, 144 ], + [ 2304, 145 ], + [ 2320, 146 ], + [ 2336, 147 ], + [ 2352, 148 ], + [ 2368, 149 ], + [ 2384, 150 ], + [ 2400, 151 ], + [ 2416, 152 ], + [ 2432, 153 ], + [ 2448, 154 ], + [ 2464, 155 ], + [ 2480, 156 ], + [ 2496, 157 ], + [ 2512, 158 ], + [ 2528, 159 ], + [ 2544, 160 ], + [ 2560, 161 ], + 
[ 2576, 162 ], + [ 2592, 163 ], + [ 2608, 164 ], + [ 2624, 165 ], + [ 2640, 166 ], + [ 2656, 167 ], + [ 2672, 168 ], + [ 2688, 169 ], + [ 2704, 170 ], + [ 2720, 171 ], + [ 2736, 172 ], + [ 2752, 173 ], + [ 2768, 174 ], + [ 2784, 175 ], + [ 2800, 176 ], + [ 2816, 177 ], + [ 2832, 178 ], + [ 2848, 179 ], + [ 2864, 180 ], + [ 2880, 181 ], + [ 2896, 182 ], + [ 2912, 183 ], + [ 2928, 184 ], + [ 2944, 185 ], + [ 2960, 186 ], + [ 2976, 187 ], + [ 2992, 188 ], + [ 3008, 189 ], + [ 3024, 190 ], + [ 3040, 191 ], + [ 3056, 192 ], + [ 3072, 193 ], + [ 3088, 194 ], + [ 3104, 195 ], + [ 3120, 196 ], + [ 3136, 197 ], + [ 3152, 198 ], + [ 3168, 199 ], + [ 3184, 200 ], + [ 3200, 201 ], + [ 3216, 202 ], + [ 3232, 203 ], + [ 3248, 204 ], + [ 3264, 205 ], + [ 3280, 206 ], + [ 3296, 207 ], + [ 3312, 208 ], + [ 3328, 209 ], + [ 3344, 210 ], + [ 3360, 211 ], + [ 3376, 212 ], + [ 3392, 213 ], + [ 3408, 214 ], + [ 3424, 215 ], + [ 3440, 216 ], + [ 3456, 217 ], + [ 3472, 218 ], + [ 3488, 219 ], + [ 3504, 220 ], + [ 3520, 221 ], + [ 3536, 222 ], + [ 3552, 223 ], + [ 3568, 224 ], + [ 3584, 225 ], + [ 3600, 226 ], + [ 3616, 227 ], + [ 3632, 228 ], + [ 3648, 229 ], + [ 3664, 230 ], + [ 3680, 231 ], + [ 3696, 232 ], + [ 3712, 233 ], + [ 3728, 234 ], + [ 3744, 235 ], + [ 3760, 236 ], + [ 3776, 237 ], + [ 3792, 238 ], + [ 3808, 239 ], + [ 3824, 240 ], + [ 3840, 241 ], + [ 3856, 242 ], + [ 3872, 243 ], + [ 3888, 244 ], + [ 3904, 245 ], + [ 3920, 246 ], + [ 3936, 247 ], + [ 3952, 248 ], + [ 3968, 249 ], + [ 3984, 250 ], + [ 4000, 251 ], + [ 4016, 252 ], + [ 4032, 253 ], + [ 4048, 254 ], + [ 4064, 255 ], + [ 4080, 256 ] + ] + } +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: autoscaler-coredns-worker + namespace: kube-system + labels: + k8s-app: autoscaler-coredns-worker +spec: + replicas: 1 + selector: + matchLabels: + k8s-app: autoscaler-coredns-worker + template: + metadata: + labels: + k8s-app: autoscaler-coredns-worker + gravitational.io/critical-pod: '' + annotations: + seccomp.security.alpha.kubernetes.io/pod: docker/default + spec: + serviceAccountName: cluster-proportional-autoscaler-coredns + tolerations: + - operator: "Exists" + containers: + - image: k8s.gcr.io/cpa/cluster-proportional-autoscaler-amd64:1.8.3 + name: autoscaler + command: + - /cluster-proportional-autoscaler + - --namespace=kube-system + - --configmap=autoscaler-coredns-worker + - --target=deployment/coredns-worker + - --nodelabels=gravitational.io/k8s-role=node + - --poll-period-seconds=60 + - --logtostderr=true + - --v=2 + securityContext: + # nonroot user is 65532 as per https://github.com/GoogleContainerTools/distroless/issues/235 + runAsUser: 65532 + allowPrivilegeEscalation: false + capabilities: + drop: + - all + readOnlyRootFilesystem: true + runAsNonRoot: true diff --git a/assets/telekube/resources/app.yaml b/assets/telekube/resources/app.yaml index 5b12300044..373d80990f 100644 --- a/assets/telekube/resources/app.yaml +++ b/assets/telekube/resources/app.yaml @@ -64,6 +64,11 @@ installer: nodes: - profile: node count: 6 + - name: "cloud" + description: "cloud install" + nodes: + - profile: gmaster + count: 1 nodeProfiles: - name: node description: "Gravity Auto Node" @@ -184,6 +189,13 @@ nodeProfiles: - c3.2xlarge - c3.4xlarge - i2.2xlarge + - name: gmaster + description: "Gravity Master (not kubernetes master)" + taints: + - key: node-role.kubernetes.io/master + effect: NoSchedule + labels: + gravitational.io/k8s-role: "master" hooks: clusterProvision: job: file://clusterProvision.yaml diff --git a/e b/e index 
a429ebece1..a0a111d33a 160000 --- a/e +++ b/e @@ -1 +1 @@ -Subproject commit a429ebece145f399ce3c088c8c074d98ac55af54 +Subproject commit a0a111d33ae5358f94069b1832331df6579a35c4 diff --git a/lib/defaults/defaults.go b/lib/defaults/defaults.go index f859d8da2b..82e21dd804 100644 --- a/lib/defaults/defaults.go +++ b/lib/defaults/defaults.go @@ -22,6 +22,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "time" "github.com/gravitational/gravity/lib/constants" @@ -138,9 +139,6 @@ const ( // MaxValidationConcurrency defines a number of validation requests to run concurrently MaxValidationConcurrency = 5 - // MaxExpandConcurrency is the number of servers that can be joining the cluster concurrently - MaxExpandConcurrency = 5 - // DownloadRetryPeriod is the period between failed retry attempts DownloadRetryPeriod = 5 * time.Second @@ -1301,6 +1299,14 @@ var ( AlertmanagerServiceAddr = fmt.Sprintf( "alertmanager-main.monitoring.svc.cluster.local:%v", AlertmanagerServicePort) + + // HelmRegistryVar is the Helm variable that gets passed via --set flag and + // sets the registry variable to the address of internal cluster registry: + // image.registry=leader.telekube.local:5000/ + HelmRegistryVar = fmt.Sprintf("%v=%v/", ImageRegistryVar, DockerRegistry) + + // MaxExpandConcurrency is the number of servers that can be joining the cluster concurrently + MaxExpandConcurrency = (runtime.NumCPU() / 3) + 1 ) // HookSecurityContext returns default securityContext for hook pods diff --git a/lib/expand/join.go b/lib/expand/join.go index d5598a2a1e..9aa7ac95f5 100644 --- a/lib/expand/join.go +++ b/lib/expand/join.go @@ -53,7 +53,6 @@ import ( "github.com/gravitational/gravity/lib/utils" "github.com/cenkalti/backoff" - "github.com/gravitational/coordinate/leader" "github.com/gravitational/roundtrip" "github.com/gravitational/trace" log "github.com/sirupsen/logrus" @@ -763,7 +762,15 @@ type operationContext struct { // For a local gravity cluster, it will attempt to start the expand operation // and will return an operation context wrapping a new expand operation. 
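 // The ticker below uses a capped exponential backoff: with Multiplier 2 and the
 // backoff library's default initial interval (500ms, jittered), the delay
 // between attempts roughly doubles — 0.5s, 1s, 2s, 4s, ... — until it reaches
 // the one-minute MaxInterval, and a MaxElapsedTime of zero means the peer
 // never gives up retrying.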
 func (p *Peer) connectLoop() (*operationContext, error) {
-	ticker := backoff.NewTicker(leader.NewUnlimitedExponentialBackOff())
+
+	// Lots of joining nodes create load on the gravity-site controller as they cycle
+	// trying to create join operations. Set a high maximum interval so a large backlog
+	// of queued joins doesn't generate too much load.
+	b := backoff.NewExponentialBackOff()
+	b.Multiplier = 2
+	b.MaxElapsedTime = 0 // unlimited timeout
+	b.MaxInterval = time.Minute
+
+	ticker := backoff.NewTicker(b)
 	defer ticker.Stop()
 	for {
 		select {
diff --git a/lib/install/engine/cli/cli.go b/lib/install/engine/cli/cli.go
index b5ef16ff70..6786d722f3 100644
--- a/lib/install/engine/cli/cli.go
+++ b/lib/install/engine/cli/cli.go
@@ -144,7 +144,7 @@ func (r *executor) upsertClusterAndOperation() (*ops.SiteOperation, error) {
 	} else {
 		cluster = &clusters[0]
 	}
-	operations, err := r.Operator.GetSiteOperations(cluster.Key())
+	operations, err := r.Operator.GetSiteOperations(cluster.Key(), ops.OperationsFilter{})
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
diff --git a/lib/install/engine/interactive/wizard.go b/lib/install/engine/interactive/wizard.go
index 69329f4177..afa6ec2edb 100644
--- a/lib/install/engine/interactive/wizard.go
+++ b/lib/install/engine/interactive/wizard.go
@@ -115,7 +115,7 @@ func (r *executor) waitForOperation() (operation *ops.SiteOperation, err error)
 		return trace.NotFound("no clusters created yet")
 	}
 	cluster := clusters[0]
-	operations, err := r.Operator.GetSiteOperations(cluster.Key())
+	operations, err := r.Operator.GetSiteOperations(cluster.Key(), ops.OperationsFilter{})
 	if err != nil {
 		return trace.Wrap(err, "failed to fetch operations")
 	}
diff --git a/lib/install/phases/coredns.go b/lib/install/phases/coredns.go
index a93f5101ca..5e6632e9ad 100644
--- a/lib/install/phases/coredns.go
+++ b/lib/install/phases/coredns.go
@@ -234,7 +234,7 @@ const coreDNSTemplateText = `
     }{{end}}
     {{if .UpstreamNameservers}}forward . {{range $server := .UpstreamNameservers}}{{$server}} {{end}}{
         {{if .Rotate}}policy random{{else}}policy sequential{{end}}
-        health_check 0
+        health_check 1s
     }{{end}}
 }
 `
diff --git a/lib/install/phases/coredns_test.go b/lib/install/phases/coredns_test.go
index f36a16a3f6..3b84779db6 100644
--- a/lib/install/phases/coredns_test.go
+++ b/lib/install/phases/coredns_test.go
@@ -74,7 +74,7 @@ func (*StartSuite) TestCoreDNSConf(c *check.C) {
     }
     forward . 1.1.1.1 8.8.8.8 {
         policy sequential
-        health_check 0
+        health_check 1s
    }
}
`,
@@ -103,7 +103,7 @@ func (*StartSuite) TestCoreDNSConf(c *check.C) {
     }
     forward .
1.1.1.1 {
         policy random
-        health_check 0
+        health_check 1s
    }
}
`,
diff --git a/lib/install/reconfigure/engine.go b/lib/install/reconfigure/engine.go
index 28ca86b5c8..8b7c54d884 100644
--- a/lib/install/reconfigure/engine.go
+++ b/lib/install/reconfigure/engine.go
@@ -138,7 +138,7 @@ func (e *Engine) upsertClusterAndOperation(ctx context.Context, installer insta
 	} else {
 		cluster = &clusters[0]
 	}
-	operations, err := e.Operator.GetSiteOperations(cluster.Key())
+	operations, err := e.Operator.GetSiteOperations(cluster.Key(), ops.OperationsFilter{})
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
diff --git a/lib/ops/operatoracl.go b/lib/ops/operatoracl.go
index add3ac2d58..5076cf090a 100644
--- a/lib/ops/operatoracl.go
+++ b/lib/ops/operatoracl.go
@@ -425,11 +425,11 @@ func (o *OperatorACL) GetSiteInstructions(tokenID string, serverProfile string,
 	return o.operator.GetSiteInstructions(tokenID, serverProfile, params)
 }
 
-func (o *OperatorACL) GetSiteOperations(key SiteKey) (SiteOperations, error) {
+func (o *OperatorACL) GetSiteOperations(key SiteKey, f OperationsFilter) (SiteOperations, error) {
 	if err := o.ClusterAction(key.SiteDomain, storage.KindCluster, teleservices.VerbRead); err != nil {
 		return nil, trace.Wrap(err)
 	}
-	return o.operator.GetSiteOperations(key)
+	return o.operator.GetSiteOperations(key, f)
 }
 
 func (o *OperatorACL) GetSiteOperation(key SiteOperationKey) (*SiteOperation, error) {
diff --git a/lib/ops/ops.go b/lib/ops/ops.go
index 4fd477658e..63f68ee1e6 100644
--- a/lib/ops/ops.go
+++ b/lib/ops/ops.go
@@ -1,5 +1,5 @@
 /*
-Copyright 2018-2019 Gravitational, Inc.
+Copyright 2018-2020 Gravitational, Inc.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -746,7 +746,7 @@ type Operations interface {
 	GetSiteInstructions(token string, serverProfile string, params url.Values) (string, error)
 
 	// GetSiteOperations returns a list of operations executed for this site
-	GetSiteOperations(key SiteKey) (SiteOperations, error)
+	GetSiteOperations(key SiteKey, filter OperationsFilter) (SiteOperations, error)
 
 	// CreateSiteInstallOperation initiates install operation for the site
 	// this operation can be currently run only once
@@ -855,6 +855,149 @@ type Operations interface {
 	GetOperationPlan(SiteOperationKey) (*storage.OperationPlan, error)
 }
 
+// OperationsFilter represents a filter to apply to results when listing operations
+type OperationsFilter struct {
+	// Last indicates to only return the last operation
+	Last bool
+
+	// First indicates to only return the first operation
+	First bool
+
+	// Complete indicates to only return completed operations
+	Complete bool
+
+	// Finished indicates to only return finished operations (complete or failed)
+	Finished bool
+
+	// Active indicates to only return active operations
+	Active bool
+
+	// Types indicates to only return operations of the given types (e.g. OperationExpand)
+	Types []string
+}
+
+// URLValues converts the filter to a set of URL values that can be passed via the API
+func (f OperationsFilter) URLValues() (res url.Values) {
+	res = url.Values{}
+
+	if f.Last {
+		res.Add("last", "")
+	}
+
+	if f.First {
+		res.Add("first", "")
+	}
+
+	if f.Complete {
+		res.Add("complete", "")
+	}
+
+	if f.Finished {
+		res.Add("finished", "")
+	}
+
+	if f.Active {
+		res.Add("active", "")
+	}
+
+	if len(f.Types) > 0 {
+		for _, t := range f.Types {
+			res.Add("type", t)
+		}
+	}
+
+	return
+}
+
+// FilterFromURLValues returns an operations filter based on set URL values
+func
FilterFromURLValues(v url.Values) (f OperationsFilter) { + if _, ok := v["last"]; ok { + f.Last = true + } + + if _, ok := v["first"]; ok { + f.First = true + } + + if _, ok := v["complete"]; ok { + f.Complete = true + } + + if _, ok := v["finished"]; ok { + f.Finished = true + } + + if _, ok := v["active"]; ok { + f.Active = true + } + + if t, ok := v["type"]; ok { + if len(t) > 0 { + f.Types = t + } + } + + return +} + +// Filter takes a list of operations and filters the results based on the set filter parameters +func (filter OperationsFilter) Filter(in SiteOperations) SiteOperations { + if len(in) == 0 { + return nil + } + + filtered := in + + if len(filter.Types) > 0 || filter.Active || filter.Complete || filter.Finished { + filtered = SiteOperations{} + + for _, value := range in { + if len(filter.Types) > 0 { + drop := true + for _, t := range filter.Types { + if t == value.Type { + drop = false + } + } + if drop { + continue + } + } + + op := SiteOperation(value) + if filter.Active && op.IsFinished() { + continue + } + + if filter.Complete && !op.IsCompleted() { + continue + } + + if filter.Finished && !op.IsFinished() { + continue + } + + filtered = append(filtered, value) + } + } + + if len(filtered) == 0 { + return nil + } + + if filter.First { + // backend is guaranteed to return operations in the last-to-first order + return SiteOperations{filtered[len(filtered)-1]} + } + + if filter.Last { + // backend is guaranteed to return operations in the last-to-first order + return SiteOperations{filtered[0]} + } + + return filtered +} + // LogEntry represents a single log line for an operation type LogEntry struct { // AccountID is the ID of the account for the operation diff --git a/lib/ops/ops_test.go b/lib/ops/ops_test.go new file mode 100644 index 0000000000..a4965507c4 --- /dev/null +++ b/lib/ops/ops_test.go @@ -0,0 +1,267 @@ +/* +Copyright 2020 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package ops + +import ( + "github.com/gravitational/gravity/lib/storage" + check "gopkg.in/check.v1" +) + +type OperationsFilterSuite struct{} + +var _ = check.Suite(&OperationsFilterSuite{}) + +func (s *OperationsFilterSuite) TestOperationsFilter(c *check.C) { + tests := []struct { + description string + in SiteOperations + out SiteOperations + filter OperationsFilter + }{ + { + description: "empty filter", + in: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op2", + }, + }, + out: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op2", + }, + }, + filter: OperationsFilter{}, + }, + { + description: "empty input", + filter: OperationsFilter{}, + }, + { + description: "first (most recent is last)", + in: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op2", + }, + }, + out: []storage.SiteOperation{ + { + ID: "op2", + }, + }, + filter: OperationsFilter{ + First: true, + }, + }, + { + description: "last (most recent is last)", + in: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op2", + }, + }, + out: []storage.SiteOperation{ + { + ID: "op1", + }, + }, + filter: OperationsFilter{ + Last: true, + }, + }, + { + description: "finished", + in: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op2", + State: OperationStateCompleted, + }, + { + ID: "op3", + State: OperationStateFailed, + }, + }, + out: []storage.SiteOperation{ + { + ID: "op2", + State: OperationStateCompleted, + }, + { + ID: "op3", + State: OperationStateFailed, + }, + }, + filter: OperationsFilter{ + Finished: true, + }, + }, + { + description: "completed", + in: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op2", + State: OperationStateCompleted, + }, + { + ID: "op3", + }, + }, + out: []storage.SiteOperation{ + { + ID: "op2", + State: OperationStateCompleted, + }, + }, + filter: OperationsFilter{ + Complete: true, + }, + }, + { + description: "active", + in: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op2", + State: OperationStateCompleted, + }, + { + ID: "op3", + }, + }, + out: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op3", + }, + }, + filter: OperationsFilter{ + Active: true, + }, + }, + { + description: "combined complete / last", + in: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op2", + State: OperationStateCompleted, + }, + { + ID: "op3", + State: OperationStateCompleted, + }, + }, + out: []storage.SiteOperation{ + { + ID: "op2", + State: OperationStateCompleted, + }, + }, + filter: OperationsFilter{ + Finished: true, + Last: true, + }, + }, + { + description: "type", + in: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op2", + State: OperationStateCompleted, + Type: OperationUpdate, + }, + { + ID: "op3", + State: OperationStateFailed, + }, + }, + out: []storage.SiteOperation{ + { + ID: "op2", + State: OperationStateCompleted, + Type: OperationUpdate, + }, + }, + filter: OperationsFilter{ + Types: []string{OperationUpdate}, + }, + }, + { + description: "multiple types", + in: []storage.SiteOperation{ + { + ID: "op1", + }, + { + ID: "op2", + State: OperationStateCompleted, + Type: OperationUpdate, + }, + { + ID: "op3", + State: OperationStateFailed, + Type: OperationShrink, + }, + }, + out: []storage.SiteOperation{ + { + ID: "op2", + State: OperationStateCompleted, + Type: OperationUpdate, + }, + { + ID: "op3", + State: OperationStateFailed, + Type: OperationShrink, + }, + }, + filter: OperationsFilter{ + Types: []string{OperationUpdate, OperationShrink}, + }, + }, + } + + for _, tt := range tests { + 
c.Assert(tt.filter.Filter(tt.in), check.DeepEquals, tt.out, check.Commentf(tt.description)) + } +} diff --git a/lib/ops/opsclient/opsclient.go b/lib/ops/opsclient/opsclient.go index 56cf8beeab..54b728b96d 100644 --- a/lib/ops/opsclient/opsclient.go +++ b/lib/ops/opsclient/opsclient.go @@ -443,9 +443,9 @@ func (c *Client) CheckSiteStatus(ctx context.Context, key ops.SiteKey) error { return trace.Wrap(err) } -func (c *Client) GetSiteOperations(siteKey ops.SiteKey) (ops.SiteOperations, error) { +func (c *Client) GetSiteOperations(siteKey ops.SiteKey, f ops.OperationsFilter) (ops.SiteOperations, error) { out, err := c.Get(context.TODO(), c.Endpoint("accounts", siteKey.AccountID, "sites", siteKey.SiteDomain, "operations", "common"), - url.Values{}) + f.URLValues()) if err != nil { return nil, trace.Wrap(err) } @@ -453,6 +453,10 @@ func (c *Client) GetSiteOperations(siteKey ops.SiteKey) (ops.SiteOperations, err if err := json.Unmarshal(out.Bytes(), &ops); err != nil { return nil, trace.Wrap(err) } + + // Servers that haven't been upgraded may not have filtered the results. + ops = f.Filter(ops) + return ops, nil } diff --git a/lib/ops/opshandler/opshandler.go b/lib/ops/opshandler/opshandler.go index 3ebe5bd3a8..65e9efe62f 100644 --- a/lib/ops/opshandler/opshandler.go +++ b/lib/ops/opshandler/opshandler.go @@ -1355,7 +1355,7 @@ func (h *WebHandler) validateRemoteAccess(w http.ResponseWriter, r *http.Request }] */ func (h *WebHandler) getSiteOperations(w http.ResponseWriter, r *http.Request, p httprouter.Params, context *HandlerContext) error { - operations, err := context.Operator.GetSiteOperations(siteKey(p)) + operations, err := context.Operator.GetSiteOperations(siteKey(p), ops.FilterFromURLValues(r.URL.Query())) if err != nil { return trace.Wrap(err) } @@ -2447,7 +2447,7 @@ func NeedsAuth(devmode bool, backend storage.Backend, operator ops.Operator, aut } func getLastOperation(key ops.SiteKey, operationType string, context *HandlerContext) (*ops.SiteOperation, error) { - operations, err := context.Operator.GetSiteOperations(key) + operations, err := context.Operator.GetSiteOperations(key, ops.OperationsFilter{}) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/ops/opsroute/forward.go b/lib/ops/opsroute/forward.go index d6ef85808d..c0b7905873 100644 --- a/lib/ops/opsroute/forward.go +++ b/lib/ops/opsroute/forward.go @@ -280,12 +280,12 @@ func (r *Router) GetSiteInstructions(tokenID string, serverProfile string, param return client.GetSiteInstructions(tokenID, serverProfile, params) } -func (r *Router) GetSiteOperations(key ops.SiteKey) (ops.SiteOperations, error) { +func (r *Router) GetSiteOperations(key ops.SiteKey, f ops.OperationsFilter) (ops.SiteOperations, error) { client, err := r.PickOperationClient(key.SiteDomain) if err != nil { return nil, trace.Wrap(err) } - return client.GetSiteOperations(key) + return client.GetSiteOperations(key, f) } func (r *Router) GetSiteOperation(key ops.SiteOperationKey) (*ops.SiteOperation, error) { diff --git a/lib/ops/opsservice/operationgroup.go b/lib/ops/opsservice/operationgroup.go index 8aaa1fb05a..1f4fdafcd7 100644 --- a/lib/ops/opsservice/operationgroup.go +++ b/lib/ops/opsservice/operationgroup.go @@ -350,6 +350,7 @@ func (g *operationGroup) onSiteOperationComplete(key ops.SiteOperationKey) error if err != nil { return trace.Wrap(err) } + logger := log.WithField("operation", operation.String()) operations, err := ops.GetActiveOperationsByType(g.siteKey, g.operator, operation.Type) diff --git a/lib/ops/opsservice/report.go 
b/lib/ops/opsservice/report.go
index 04c94e1004..6ea1c5fb00 100644
--- a/lib/ops/opsservice/report.go
+++ b/lib/ops/opsservice/report.go
@@ -329,7 +329,7 @@ func runCollectors(ctx context.Context, cluster site, dir string) error {
 }
 
 func collectOperationsLogs(site site, dir string) error {
-	operations, err := site.service.GetSiteOperations(site.key)
+	operations, err := site.service.GetSiteOperations(site.key, ops.OperationsFilter{})
 	if err != nil {
 		return trace.Wrap(err, "failed to get cluster operations")
 	}
diff --git a/lib/ops/opsservice/service.go b/lib/ops/opsservice/service.go
index 5eb833f75a..b6e8904b1f 100644
--- a/lib/ops/opsservice/service.go
+++ b/lib/ops/opsservice/service.go
@@ -165,6 +165,11 @@ type Operator struct {
 
 	// FieldLogger allows this operator to log messages
 	log.FieldLogger
+
+	// cachedProvisioningTokenMutex guards access to the cached provisioning token.
+	cachedProvisioningTokenMutex sync.RWMutex
+	// cachedProvisioningToken is an in-memory cache of the cluster provisioning token.
+	cachedProvisioningToken string
 }
 
 // New creates an instance of the Operator service
@@ -433,6 +438,22 @@ func (o *Operator) CreateProvisioningToken(token storage.ProvisioningToken) erro
 }
 
 func (o *Operator) GetExpandToken(key ops.SiteKey) (*storage.ProvisioningToken, error) {
+	o.cachedProvisioningTokenMutex.RLock()
+	cachedToken := o.cachedProvisioningToken
+	o.cachedProvisioningTokenMutex.RUnlock()
+
+	if cachedToken != "" {
+		// security: make sure to re-retrieve the token from the backend in case it's been updated or removed
+		token, err := o.backend().GetProvisioningToken(cachedToken)
+		if err != nil && !trace.IsNotFound(err) {
+			return nil, trace.Wrap(err)
+		}
+
+		if token != nil {
+			return token, nil
+		}
+	}
+
 	tokens, err := o.backend().GetSiteProvisioningTokens(key.SiteDomain)
 	if err != nil {
 		return nil, trace.Wrap(err)
@@ -441,6 +462,9 @@ func (o *Operator) GetExpandToken(key ops.SiteKey,
 	for _, token := range tokens {
 		// return long-lived join token
 		if token.Type == storage.ProvisioningTokenTypeExpand && token.Expires.IsZero() {
+			o.cachedProvisioningTokenMutex.Lock()
+			o.cachedProvisioningToken = token.Token
+			o.cachedProvisioningTokenMutex.Unlock()
 			return &token, nil
 		}
 	}
@@ -862,7 +886,7 @@ func (o *Operator) SignSSHKey(req ops.SSHSignRequest) (*ops.SSHSignResponse, err
 	}, nil
 }
 
-func (o *Operator) GetSiteOperations(key ops.SiteKey) (ops.SiteOperations, error) {
+func (o *Operator) GetSiteOperations(key ops.SiteKey, f ops.OperationsFilter) (ops.SiteOperations, error) {
 	_, err := o.openSite(key)
 	if err != nil {
 		return nil, trace.Wrap(err)
@@ -871,7 +895,10 @@ func (o *Operator) GetSiteOperations(key ops.SiteKey) (ops.SiteOperations, error
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
-	return ops.SiteOperations(operations), nil
+
+	filtered := f.Filter(ops.SiteOperations(operations))
+
+	return filtered, nil
 }
 
 // GetSiteOperation returns the operation information based on its key
diff --git a/lib/ops/resources/gravity/gravity.go b/lib/ops/resources/gravity/gravity.go
index 6bf432d68a..cae5e993b1 100644
--- a/lib/ops/resources/gravity/gravity.go
+++ b/lib/ops/resources/gravity/gravity.go
@@ -416,7 +416,7 @@ func (r *Resources) GetCollection(req resources.ListRequest) (resources.Collecti
 		}
 		return storageCollection{PersistentStorage: ps}, nil
 	case storage.KindOperation:
-		operations, err := r.Operator.GetSiteOperations(req.SiteKey)
+		operations, err := r.Operator.GetSiteOperations(req.SiteKey, ops.OperationsFilter{})
if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/ops/suite/opssuite.go b/lib/ops/suite/opssuite.go index bb9f2e0fa1..cbbb1c8d9a 100644 --- a/lib/ops/suite/opssuite.go +++ b/lib/ops/suite/opssuite.go @@ -115,7 +115,7 @@ func (s *OpsSuite) SitesCRUD(c *C) { c.Assert(err, IsNil) c.Assert(sites, DeepEquals, []ops.Site{*site}) - operations, err := s.O.GetSiteOperations(siteKey) + operations, err := s.O.GetSiteOperations(siteKey, ops.OperationsFilter{}) c.Assert(err, IsNil) c.Assert(len(operations), Equals, 0) @@ -149,7 +149,7 @@ func (s *OpsSuite) SitesCRUD(c *C) { c.Assert(err, IsNil) c.Assert(reportStream.Close(), IsNil) - operations, err = s.O.GetSiteOperations(siteKey) + operations, err = s.O.GetSiteOperations(siteKey, ops.OperationsFilter{}) c.Assert(err, IsNil) c.Assert(operations, DeepEquals, ops.SiteOperations{storage.SiteOperation(*op)}) diff --git a/lib/ops/utils.go b/lib/ops/utils.go index 3e53b547cc..b6da1d5cf1 100644 --- a/lib/ops/utils.go +++ b/lib/ops/utils.go @@ -34,11 +34,20 @@ import ( // GetInstallOperation returns an install operation for the specified siteKey func GetInstallOperation(siteKey SiteKey, operator Operator) (op *SiteOperation, progress *ProgressEntry, err error) { - op, progress, err = MatchOperation(siteKey, operator, MatchByType(OperationInstall)) + operations, err := operator.GetSiteOperations(siteKey, OperationsFilter{ + Types: []string{OperationInstall}, + }) + if err != nil { + return nil, nil, trace.Wrap(err) + } + + if len(operations) == 0 { + return nil, nil, trace.NotFound("no install operation found for %v", siteKey) + } + + op = (*SiteOperation)(&operations[0]) + progress, err = operator.GetSiteOperationProgress(op.Key()) if err != nil { - if trace.IsNotFound(err) { - return nil, nil, trace.NotFound("no install operation for %v found", siteKey) - } return nil, nil, trace.Wrap(err) } return op, progress, nil @@ -46,11 +55,22 @@ func GetInstallOperation(siteKey SiteKey, operator Operator) (op *SiteOperation, // GetLastUninstallOperation returns the last uninstall operation for the specified siteKey func GetLastUninstallOperation(siteKey SiteKey, operator Operator) (op *SiteOperation, progress *ProgressEntry, err error) { - op, progress, err = MatchOperation(siteKey, operator, MatchByType(OperationUninstall)) + operations, err := operator.GetSiteOperations(siteKey, OperationsFilter{ + Types: []string{OperationUninstall}, + Last: true, + }) + if err != nil { + return nil, nil, trace.Wrap(err) + } + + if len(operations) == 0 { + return nil, nil, trace.NotFound("no uninstall operation found for %v", siteKey) + } + + // backend is guaranteed to return operations in the last-to-first order + op = (*SiteOperation)(&operations[0]) + progress, err = operator.GetSiteOperationProgress(op.Key()) if err != nil { - if trace.IsNotFound(err) { - return nil, nil, trace.NotFound("no uninstall operation for %v found", siteKey) - } return nil, nil, trace.Wrap(err) } return op, progress, nil @@ -58,94 +78,121 @@ func GetLastUninstallOperation(siteKey SiteKey, operator Operator) (op *SiteOper // GetLastCompletedUpdateOperation returns the last completed update operation func GetLastCompletedUpdateOperation(siteKey SiteKey, operator Operator) (op *SiteOperation, err error) { - op, _, err = MatchOperation(siteKey, operator, func(op SiteOperation) bool { - return op.Type == OperationUpdate && op.IsCompleted() + operations, err := operator.GetSiteOperations(siteKey, OperationsFilter{ + Types: []string{OperationUpdate}, + Last: true, + Complete: true, }) if err != 
nil {
-		if trace.IsNotFound(err) {
-			return nil, trace.NotFound("no update operation for %v found", siteKey)
-		}
 		return nil, trace.Wrap(err)
 	}
-	return op, nil
+
+	if len(operations) == 0 {
+		return nil, trace.NotFound("no completed update operation found for %v", siteKey)
+	}
+
+	return (*SiteOperation)(&operations[0]), nil
 }
 
 // GetCompletedInstallOperation returns a completed install operation for the specified site
 func GetCompletedInstallOperation(siteKey SiteKey, operator Operator) (*SiteOperation, error) {
-	op, entry, err := GetInstallOperation(siteKey, operator)
+	operations, err := operator.GetSiteOperations(siteKey, OperationsFilter{
+		Types:    []string{OperationInstall},
+		Last:     true,
+		Complete: true,
+	})
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
-	if entry.IsCompleted() {
-		return op, nil
+	if len(operations) == 0 {
+		return nil, trace.NotFound("no completed install operation found for %v", siteKey)
 	}
-	return nil, trace.NotFound("no completed install operation for %v found", siteKey)
+
+	return (*SiteOperation)(&operations[0]), nil
 }
 
 // GetLastOperation returns the most recent operation and its progress for the specified site
-func GetLastOperation(siteKey SiteKey, operator Operator) (*SiteOperation, *ProgressEntry, error) {
-	operations, err := operator.GetSiteOperations(siteKey)
+func GetLastOperation(siteKey SiteKey, operator Operator) (op *SiteOperation, progress *ProgressEntry, err error) {
+	operations, err := operator.GetSiteOperations(siteKey, OperationsFilter{
+		Last: true,
+	})
 	if err != nil {
 		return nil, nil, trace.Wrap(err)
 	}
+
 	if len(operations) == 0 {
-		return nil, nil, trace.NotFound("no operations found for %v", siteKey)
+		return nil, nil, trace.NotFound("no operation found for %v", siteKey)
 	}
+
 	// backend is guaranteed to return operations in the last-to-first order
-	lastOperation := (*SiteOperation)(&operations[0])
-	progress, err := operator.GetSiteOperationProgress(lastOperation.Key())
+	op = (*SiteOperation)(&operations[0])
+	progress, err = operator.GetSiteOperationProgress(op.Key())
 	if err != nil {
 		return nil, nil, trace.Wrap(err)
 	}
-	return lastOperation, progress, nil
+
+	return op, progress, nil
 }
 
-// GetLastCompletedOperations returns the cluster's last completed operation
-func GetLastCompletedOperation(key SiteKey, operator Operator) (*SiteOperation, *ProgressEntry, error) {
-	operations, err := operator.GetSiteOperations(key)
+// GetLastFinishedOperation returns the cluster's last finished operation
+func GetLastFinishedOperation(siteKey SiteKey, operator Operator) (op *SiteOperation, progress *ProgressEntry, err error) {
+	operations, err := operator.GetSiteOperations(siteKey, OperationsFilter{
+		Last:     true,
+		Finished: true,
+	})
 	if err != nil {
 		return nil, nil, trace.Wrap(err)
 	}
-	// more recent operations appear first
-	for _, operation := range operations {
-		op := (*SiteOperation)(&operation)
-		if op.IsFinished() {
-			progress, err := operator.GetSiteOperationProgress(op.Key())
-			if err != nil {
-				return nil, nil, trace.Wrap(err)
-			}
-			return op, progress, nil
-		}
+
+	if len(operations) == 0 {
+		return nil, nil, trace.NotFound("no finished operation found for %v", siteKey)
 	}
-	return nil, nil, trace.NotFound("cluster %v does not have completed operations",
-		key.SiteDomain)
+
+	// backend is guaranteed to return operations in the last-to-first order
+	op = (*SiteOperation)(&operations[0])
+	progress, err = operator.GetSiteOperationProgress(op.Key())
+	if err != nil {
+		return nil, nil, trace.Wrap(err)
+	}
+
+	return op, progress, nil
 }
 
 // GetLastUpgradeOperation returns the most recent upgrade operation or
NotFound.
-func GetLastUpgradeOperation(key SiteKey, operator Operator) (*SiteOperation, error) {
-	op, _, err := MatchOperation(key, operator, MatchByType(OperationUpdate))
+func GetLastUpgradeOperation(siteKey SiteKey, operator Operator) (*SiteOperation, error) {
+	operations, err := operator.GetSiteOperations(siteKey, OperationsFilter{
+		Types: []string{OperationUpdate},
+		Last:  true,
+	})
 	if err != nil {
-		if trace.IsNotFound(err) {
-			return nil, trace.NotFound("no upgrade operation for %v found", key)
-		}
 		return nil, trace.Wrap(err)
 	}
-	return op, nil
+
+	if len(operations) == 0 {
+		return nil, trace.NotFound("no upgrade operation found for %v", siteKey)
+	}
+
+	return (*SiteOperation)(&operations[0]), nil
 }
 
 // GetLastShrinkOperation returns the last shrink operation
 //
-// If there are no operations or the last operation is not of type 'shrink', returns NotFound error
+// Returns a NotFound error if the cluster has no shrink operations
 func GetLastShrinkOperation(siteKey SiteKey, operator Operator) (*SiteOperation, error) {
-	lastOperation, _, err := GetLastOperation(siteKey, operator)
+	operations, err := operator.GetSiteOperations(siteKey, OperationsFilter{
+		Types: []string{OperationShrink},
+		Last:  true,
+	})
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
-	if lastOperation.Type != OperationShrink {
-		return nil, trace.NotFound("the last operation is not shrink: %v", lastOperation)
+
+	if len(operations) == 0 {
+		return nil, trace.NotFound("no shrink operation found for %v", siteKey)
 	}
-	return lastOperation, nil
+
+	return (*SiteOperation)(&operations[0]), nil
 }
 
 // GetOperationWithProgress returns the operation and its progress for the provided operation key
@@ -162,57 +209,45 @@ func GetOperationWithProgress(opKey SiteOperationKey, operator Operator) (*SiteO
 }
 
 // GetActiveOperations returns a list of currently active cluster operations
-func GetActiveOperations(key SiteKey, operator Operator) (active []SiteOperation, err error) {
-	all, err := operator.GetSiteOperations(key)
+func GetActiveOperations(siteKey SiteKey, operator Operator) (active []SiteOperation, err error) {
+	operations, err := operator.GetSiteOperations(siteKey, OperationsFilter{
+		Active: true,
+	})
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
-	for _, op := range all {
-		operation := (*SiteOperation)(&op)
-		if !operation.IsFinished() {
-			active = append(active, *operation)
-		}
+
+	if len(operations) == 0 {
+		return nil, trace.NotFound("no active operation found for %v", siteKey)
 	}
-	if len(active) == 0 {
-		return nil, trace.NotFound("no operations in progress for %v", key)
+
+	for _, op := range operations {
+		active = append(active, SiteOperation(op))
 	}
+
 	return active, nil
 }
 
 // GetActiveOperationsByType returns a list of cluster operations of the specified
 // type that are currently in progress
-func GetActiveOperationsByType(key SiteKey, operator Operator, opType string) (result []SiteOperation, err error) {
-	active, err := GetActiveOperations(key, operator)
+func GetActiveOperationsByType(siteKey SiteKey, operator Operator, opType string) (active []SiteOperation, err error) {
+	operations, err := operator.GetSiteOperations(siteKey, OperationsFilter{
+		Types:  []string{opType},
+		Active: true,
+	})
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
-	for _, op := range active {
-		if op.Type == opType {
-			result = append(result, op)
-		}
-	}
-	return result, nil
-}
 
-// MatchOperation returns an operation that matches given match function.
-// Returns trace.NotFound if no operation matches
-func MatchOperation(siteKey SiteKey, operator Operator, match OperationMatcher) (op *SiteOperation, progress *ProgressEntry, err error) {
-	operations, err := operator.GetSiteOperations(siteKey)
-	if err != nil {
-		return nil, nil, trace.Wrap(err)
+	if len(operations) == 0 {
+		return nil, trace.NotFound("no active operation found for %v with type %v", siteKey, opType)
 	}
+
 	for _, op := range operations {
-		if !match(SiteOperation(op)) {
-			continue
-		}
-		operation := (*SiteOperation)(&op)
-		entry, err := operator.GetSiteOperationProgress(operation.Key())
-		if err != nil {
-			return nil, nil, trace.Wrap(err)
-		}
-		return operation, entry, nil
+		active = append(active, SiteOperation(op))
 	}
-	return nil, nil, trace.NotFound("no operation for %v found", siteKey)
+
+	return active, nil
 }
 
 // GetWizardOperation returns the install operation assuming that the
diff --git a/lib/process/process.go b/lib/process/process.go
index b3ad82c548..f7c75ca9ad 100644
--- a/lib/process/process.go
+++ b/lib/process/process.go
@@ -2192,7 +2192,7 @@ func (p *Process) ensureClusterState() error {
 		SiteDomain: site.Domain,
 	}
 
-	operations, err := p.operator.GetSiteOperations(siteKey)
+	operations, err := p.operator.GetSiteOperations(siteKey, ops.OperationsFilter{})
 	if err != nil {
 		return trace.Wrap(err)
 	}
diff --git a/lib/schema/parse.go b/lib/schema/parse.go
index c9288e394a..5883a34514 100644
--- a/lib/schema/parse.go
+++ b/lib/schema/parse.go
@@ -195,6 +195,12 @@ func SetDefaults(manifest *Manifest) error {
 		} else if manifest.NodeProfiles[i].Labels[constants.MasterLabel] == constants.True {
 			manifest.NodeProfiles[i].ServiceRole = ServiceRoleMaster
 		}
+
+		// Allow the gravity labels, and not just the kubernetes labels, to indicate the
+		// service role, as the kubernetes labels may have undesirable implications
+		if label, ok := manifest.NodeProfiles[i].Labels[ServiceLabelRole]; ok {
+			manifest.NodeProfiles[i].ServiceRole = ServiceRole(label)
+		}
 	}
 	return nil
 }
diff --git a/lib/status/status.go b/lib/status/status.go
index 97076ab2c0..963c9c52a4 100644
--- a/lib/status/status.go
+++ b/lib/status/status.go
@@ -467,7 +467,7 @@ func fetchOperationByID(clusterKey ops.SiteKey, operationID string, operator ops
 		}
 		operation, progress, err = ops.GetOperationWithProgress(opKey, operator)
 	} else {
-		operation, progress, err = ops.GetLastCompletedOperation(clusterKey, operator)
+		operation, progress, err = ops.GetLastFinishedOperation(clusterKey, operator)
 	}
 	if err != nil {
 		return trace.Wrap(err)
 	}
diff --git a/lib/storage/authgateway.go b/lib/storage/authgateway.go
index eaa1506f5d..5268dd192c 100644
--- a/lib/storage/authgateway.go
+++ b/lib/storage/authgateway.go
@@ -105,7 +105,7 @@ func NewAuthGateway(spec AuthGatewaySpecV1) AuthGateway {
 
 // DefaultAuthGateway returns auth gateway resource with default parameters.
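 // The connection limit below is pinned to 10000 instead of teleport's stock
 // LimiterMaxConnections default: with the scale tuning in this patch, a large
 // cluster can have thousands of nodes joining at once, and the auth gateway
 // must be able to hold a connection from each of them.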
 func DefaultAuthGateway() AuthGateway {
-	var maxConnections int64 = teledefaults.LimiterMaxConnections
+	var maxConnections int64 = 10000
 	maxUsers := teledefaults.LimiterMaxConcurrentUsers
 	clientIdleTimeout := teleservices.NewDuration(0)
 	disconnectExpiredCert := teleservices.NewBool(false)
diff --git a/lib/storage/keyval/backend.go b/lib/storage/keyval/backend.go
index 529efea24d..f090b7c3fe 100644
--- a/lib/storage/keyval/backend.go
+++ b/lib/storage/keyval/backend.go
@@ -19,17 +19,22 @@ package keyval
 import (
 	"encoding/base64"
 	"encoding/json"
+	"sync"
 	"time"
 
-	log "github.com/sirupsen/logrus"
+	"github.com/gravitational/gravity/lib/storage"
 	"github.com/gravitational/trace"
 	"github.com/jonboulle/clockwork"
+	log "github.com/sirupsen/logrus"
 )
 
 // backend implements the storage interface; it also acts as a codec
 type backend struct {
 	clockwork.Clock
 	kvengine
+
+	cachedCompleteOperationsMutex sync.RWMutex
+	cachedCompleteOperations      map[string]*storage.SiteOperation
 }
 
 func (b *backend) ttl(t time.Time) time.Duration {
diff --git a/lib/storage/keyval/bolt.go b/lib/storage/keyval/bolt.go
index d122e1f922..d3ee797186 100644
--- a/lib/storage/keyval/bolt.go
+++ b/lib/storage/keyval/bolt.go
@@ -57,6 +57,8 @@ func NewBolt(cfg BoltConfig) (storage.Backend, error) {
 	return &backend{
 		Clock:    clock,
 		kvengine: engine,
+
+		cachedCompleteOperations: make(map[string]*storage.SiteOperation),
 	}, nil
 }
 
diff --git a/lib/storage/keyval/etcd.go b/lib/storage/keyval/etcd.go
index c3ea2710d4..548ec61685 100644
--- a/lib/storage/keyval/etcd.go
+++ b/lib/storage/keyval/etcd.go
@@ -26,6 +26,7 @@ import (
 	"github.com/gravitational/gravity/lib/defaults"
 	"github.com/gravitational/gravity/lib/state"
+	"github.com/gravitational/gravity/lib/storage"
 	"github.com/gravitational/gravity/lib/utils"
 
 	"github.com/cenkalti/backoff"
@@ -63,6 +64,8 @@ func NewETCD(cfg ETCDConfig) (*electingBackend, error) {
 		Backend: &backend{
 			Clock:    clock,
 			kvengine: engine,
+
+			cachedCompleteOperations: make(map[string]*storage.SiteOperation),
 		},
 		Leader: leader,
 		client: engine.client,
diff --git a/lib/storage/keyval/operations.go b/lib/storage/keyval/operations.go
index 2dfdb6e864..9d2fdbb2e1 100644
--- a/lib/storage/keyval/operations.go
+++ b/lib/storage/keyval/operations.go
@@ -19,6 +19,7 @@ package keyval
 import (
 	"sort"
 
+	"github.com/gravitational/gravity/lib/ops"
 	"github.com/gravitational/gravity/lib/storage"
 	"github.com/gravitational/gravity/lib/utils"
 
@@ -57,6 +58,14 @@ func (b *backend) GetSiteOperation(siteDomain, operationID string) (*storage.Sit
 	if operationID == "" {
 		return nil, trace.BadParameter("missing parameter OperationID")
 	}
+
+	b.cachedCompleteOperationsMutex.RLock()
+	if op, ok := b.cachedCompleteOperations[operationID]; ok {
+		b.cachedCompleteOperationsMutex.RUnlock()
+		return op, nil
+	}
+	b.cachedCompleteOperationsMutex.RUnlock()
+
 	var op storage.SiteOperation
 	if err := b.getVal(b.key(sitesP, siteDomain, operationsP, operationID, valP), &op); err != nil {
 		if trace.IsNotFound(err) {
@@ -66,6 +75,14 @@ func (b *backend) GetSiteOperation(siteDomain, operationID string) (*storage.Sit
 	}
 	utils.UTC(&op.Created)
 	utils.UTC(&op.Updated)
+
+	// Only operations that are no longer expected to change are safe to cache
+	if op.State == ops.OperationStateCompleted {
+		b.cachedCompleteOperationsMutex.Lock()
+		b.cachedCompleteOperations[operationID] = &op
+		b.cachedCompleteOperationsMutex.Unlock()
+	}
+
 	return &op, nil
 }
 
@@ -96,20 +113,32 @@ func (b *backend) GetSiteOperations(siteDomain string)
([]storage.SiteOperation, } return nil, trace.Wrap(err) } + var out []storage.SiteOperation + var uncachedOperations []string + + b.cachedCompleteOperationsMutex.RLock() for _, id := range ids { - var op storage.SiteOperation - err = b.getVal(b.key(sitesP, siteDomain, operationsP, id, valP), &op) + if op, ok := b.cachedCompleteOperations[id]; ok { + out = append(out, *op) + } else { + uncachedOperations = append(uncachedOperations, id) + } + } + b.cachedCompleteOperationsMutex.RUnlock() + + for _, id := range uncachedOperations { + op, err := b.GetSiteOperation(siteDomain, id) if err != nil { if !trace.IsNotFound(err) { return nil, trace.Wrap(err) } continue } - utils.UTC(&op.Created) - utils.UTC(&op.Updated) - out = append(out, op) + + out = append(out, *op) } + sort.Sort(operationsSorter(out)) return out, nil } diff --git a/lib/update/utils.go b/lib/update/utils.go index 9809e84302..722dbdb930 100644 --- a/lib/update/utils.go +++ b/lib/update/utils.go @@ -32,7 +32,6 @@ import ( "github.com/gravitational/rigging" "github.com/gravitational/trace" log "github.com/sirupsen/logrus" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -67,20 +66,30 @@ func WaitForEndpoints(ctx context.Context, client corev1.CoreV1Interface, server kubednsLabels := labels.Set{"k8s-app": defaults.KubeDNSLabel} kubednsWorkerLabels := labels.Set{"k8s-app": defaults.KubeDNSWorkerLabel} - // Due to https://github.com/gravitational/gravity.e/issues/3808 the node name we need to match may be inconsistent - // so try to match either possible node name - matchesNode := matchesNode([]string{ - server.AdvertiseIP, - server.Nodename, - }) err := Retry(ctx, func() error { - if (hasEndpoints(client, clusterLabels, existingEndpoint) == nil) && - (hasEndpoints(client, kubednsLabels, matchesNode) == nil || - hasEndpoints(client, kubednsLegacyLabels, matchesNode) == nil || - hasEndpoints(client, kubednsWorkerLabels, matchesNode) == nil) { - return nil + var errors []error + + if err := hasEndpoints(client, clusterLabels); err != nil { + errors = append(errors, trace.Wrap(err)) + } + + if err := hasEndpoints(client, kubednsLegacyLabels); err == nil { + // If this cluster has the legacy dns application, new labels won't be available, and we can exit at + // this point. + return trace.NewAggregate(errors...) + } + + if err := hasEndpoints(client, kubednsLabels); err != nil { + errors = append(errors, trace.Wrap(err)) } - return trace.NotFound("endpoints not ready") + + if server.ClusterRole == string(schema.ServiceRoleNode) { + if err := hasEndpoints(client, kubednsWorkerLabels); err != nil { + errors = append(errors, trace.Wrap(err)) + } + } + + return trace.NewAggregate(errors...) 
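The lib/storage/keyval changes above add a read-through cache guarded by an RWMutex, keyed on the observation that a completed operation can never change again, so a cached entry can never go stale. A minimal sketch of the same pattern, with simplified stand-in types and a hypothetical load function in place of the real backend lookup:

```go
package main

import (
	"fmt"
	"sync"
)

// operation is a simplified stand-in for storage.SiteOperation.
type operation struct {
	ID    string
	State string
}

// operationCache caches only terminal records; load represents the
// backing-store lookup that the cache short-circuits.
type operationCache struct {
	mu    sync.RWMutex
	cache map[string]*operation
	load  func(id string) (*operation, error)
}

// get serves cached operations under the read lock and inserts a record only
// when it has reached a state that can no longer change.
func (c *operationCache) get(id string) (*operation, error) {
	c.mu.RLock()
	op, ok := c.cache[id]
	c.mu.RUnlock()
	if ok {
		return op, nil
	}
	op, err := c.load(id)
	if err != nil {
		return nil, err
	}
	if op.State == "completed" {
		c.mu.Lock()
		c.cache[id] = op
		c.mu.Unlock()
	}
	return op, nil
}

func main() {
	loads := 0
	c := operationCache{
		cache: make(map[string]*operation),
		load: func(id string) (*operation, error) {
			loads++
			return &operation{ID: id, State: "completed"}, nil
		},
	}
	c.get("op-1")
	c.get("op-1") // served from the cache
	fmt.Println(loads) // 1
}
```

On large clusters this matters because status checks repeatedly list all operations; with the cache, only operations still in flight hit the backend on every call.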
diff --git a/lib/update/utils.go b/lib/update/utils.go
index 9809e84302..722dbdb930 100644
--- a/lib/update/utils.go
+++ b/lib/update/utils.go
@@ -32,7 +32,6 @@ import (
 	"github.com/gravitational/rigging"
 	"github.com/gravitational/trace"
 	log "github.com/sirupsen/logrus"
-	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -67,20 +66,30 @@ func WaitForEndpoints(ctx context.Context, client corev1.CoreV1Interface, server
 	kubednsLabels := labels.Set{"k8s-app": defaults.KubeDNSLabel}
 	kubednsWorkerLabels := labels.Set{"k8s-app": defaults.KubeDNSWorkerLabel}
 
-	// Due to https://github.com/gravitational/gravity.e/issues/3808 the node name we need to match may be inconsistent
-	// so try to match either possible node name
-	matchesNode := matchesNode([]string{
-		server.AdvertiseIP,
-		server.Nodename,
-	})
 	err := Retry(ctx, func() error {
-		if (hasEndpoints(client, clusterLabels, existingEndpoint) == nil) &&
-			(hasEndpoints(client, kubednsLabels, matchesNode) == nil ||
-				hasEndpoints(client, kubednsLegacyLabels, matchesNode) == nil ||
-				hasEndpoints(client, kubednsWorkerLabels, matchesNode) == nil) {
-			return nil
+		var errors []error
+
+		if err := hasEndpoints(client, clusterLabels); err != nil {
+			errors = append(errors, trace.Wrap(err))
+		}
+
+		if err := hasEndpoints(client, kubednsLegacyLabels); err == nil {
+			// If this cluster runs the legacy DNS application, the new labels won't be
+			// present, so we can stop checking at this point.
+			return trace.NewAggregate(errors...)
+		}
+
+		if err := hasEndpoints(client, kubednsLabels); err != nil {
+			errors = append(errors, trace.Wrap(err))
 		}
-		return trace.NotFound("endpoints not ready")
+
+		if server.ClusterRole == string(schema.ServiceRoleNode) {
+			if err := hasEndpoints(client, kubednsWorkerLabels); err != nil {
+				errors = append(errors, trace.Wrap(err))
+			}
+		}
+
+		return trace.NewAggregate(errors...)
 	}, defaults.EndpointsWaitTimeout)
 	return trace.Wrap(err)
 }
@@ -107,7 +116,7 @@ func SplitServers(servers []storage.UpdateServer) (masters, nodes []storage.Upda
 	return masters, nodes
 }
 
-func hasEndpoints(client corev1.CoreV1Interface, labels labels.Set, fn endpointMatchFn) error {
+func hasEndpoints(client corev1.CoreV1Interface, labels labels.Set) error {
 	list, err := client.Endpoints(metav1.NamespaceSystem).List(
 		metav1.ListOptions{
 			LabelSelector: labels.String(),
@@ -119,11 +128,8 @@ func hasEndpoints(client corev1.CoreV1Interface, labels labels.Set, fn endpointM
 	}
 	for _, endpoint := range list.Items {
 		for _, subset := range endpoint.Subsets {
-			for _, addr := range subset.Addresses {
-				log.WithField("addr", addr).Debug("Trying endpoint.")
-				if fn(addr) {
-					return nil
-				}
+			if len(subset.Addresses) > 0 {
+				return nil
 			}
 		}
 	}
@@ -131,34 +137,6 @@ func hasEndpoints(client corev1.CoreV1Interface, labels labels.Set, fn endpointM
 	return trace.NotFound("no active endpoints found for query %q", labels)
 }
 
-// matchesNode is a predicate that matches an endpoint address to the specified
-// node name
-func matchesNode(nodes []string) endpointMatchFn {
-	return func(addr v1.EndpointAddress) bool {
-		// Abort if the node name is not populated.
-		// There is no need to wait for endpoints we cannot
-		// match to a node.
-		if addr.NodeName == nil {
-			return false
-		}
-
-		for _, node := range nodes {
-			if *addr.NodeName == node {
-				return true
-			}
-		}
-		return false
-	}
-}
-
-// existingEndpoint is a trivial predicate that matches for any endpoint.
-func existingEndpoint(v1.EndpointAddress) bool {
-	return true
-}
-
-// endpointMatchFn matches an endpoint address using custom criteria.
-type endpointMatchFn func(addr v1.EndpointAddress) bool
-
 func formatOperation(op ops.SiteOperation) string {
 	return fmt.Sprintf("operation(%v(%v), cluster=%v, created=%v)",
 		op.TypeString(), op.ID, op.SiteDomain, op.Created.Format(constants.ShortDateFormat))
diff --git a/lib/webapi/operations.go b/lib/webapi/operations.go
index 088631e582..3a1f0f4c45 100644
--- a/lib/webapi/operations.go
+++ b/lib/webapi/operations.go
@@ -155,7 +155,7 @@ func (m *Handler) validateServers(w http.ResponseWriter, r *http.Request, p http
 func (m *Handler) getOperations(w http.ResponseWriter, r *http.Request, p httprouter.Params, context *AuthContext) (interface{}, error) {
 	siteDomain := p[0].Value
 	siteKey := ops.SiteKey{AccountID: context.User.GetAccountID(), SiteDomain: siteDomain}
-	operations, err := context.Operator.GetSiteOperations(siteKey)
+	operations, err := context.Operator.GetSiteOperations(siteKey, ops.FilterFromURLValues(r.URL.Query()))
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
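The webapi handler above now parses the request's query string into an operations filter, and the CLI changes below ask the operator for only the last operation. A rough sketch of how such a filter can be applied server-side; the field names (Last, Types) follow the calls visible in this patch, but the real lib/ops definition may differ:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// operation is a simplified stand-in for a site operation record.
type operation struct {
	ID      string
	Type    string
	Created time.Time
}

// operationsFilter is a stand-in for the filter passed to GetSiteOperations
// above; field names follow the calls visible in this patch.
type operationsFilter struct {
	Last  bool     // keep only the most recent matching operation
	Types []string // when set, keep only operations of these types
}

func (f operationsFilter) apply(in []operation) []operation {
	out := in
	if len(f.Types) > 0 {
		out = nil
		for _, op := range in {
			for _, t := range f.Types {
				if op.Type == t {
					out = append(out, op)
					break
				}
			}
		}
	}
	// Newest first, matching the descending order callers expect.
	sort.Slice(out, func(i, j int) bool { return out[i].Created.After(out[j].Created) })
	if f.Last && len(out) > 1 {
		out = out[:1]
	}
	return out
}

func main() {
	now := time.Now()
	history := []operation{
		{ID: "a", Type: "install", Created: now.Add(-2 * time.Hour)},
		{ID: "b", Type: "update", Created: now.Add(-1 * time.Hour)},
		{ID: "c", Type: "update", Created: now},
	}
	fmt.Println(operationsFilter{Last: true, Types: []string{"update"}}.apply(history))
}
```

Filtering at the operator instead of the client keeps large operation histories from being shipped over the wire just to pick out a single record.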
diff --git a/tool/gravity/cli/operation.go b/tool/gravity/cli/operation.go
index c7b4c6a245..23d30f7dfb 100644
--- a/tool/gravity/cli/operation.go
+++ b/tool/gravity/cli/operation.go
@@ -286,18 +286,22 @@ func completeClusterOperationPlan(localEnv *localenv.LocalEnvironment, operation
 // getLastOperation returns the last operation found across the specified backends.
 // If no operation is found, the returned error will indicate a not found operation
 func getLastOperation(localEnv *localenv.LocalEnvironment, environ LocalEnvironmentFactory, operationID string) (*clusterOperation, error) {
-	operations, err := getBackendOperations(localEnv, environ, operationID)
-	if err != nil {
-		return nil, trace.Wrap(err)
-	}
-	log.WithField("operations", operationList(operations).String()).Debug("Fetched backend operations.")
-	if len(operations) == 0 {
-		if operationID != "" {
+	b := newBackendOperations()
+
+	if operationID != "" {
+		op := b.GetOperationByID(localEnv, environ, operationID)
+		if op == nil {
 			return nil, newOperationNotFound("no operation with ID %v found", operationID)
 		}
+		return op, nil
+	}
+
+	op := b.GetLastOperation(localEnv, environ)
+	if op == nil {
 		return nil, newOperationNotFound("no operation found")
 	}
-	return &operations[0], nil
+
+	return op, nil
 }
 
 func getActiveOperation(localEnv *localenv.LocalEnvironment, environ LocalEnvironmentFactory, operationID string) (*clusterOperation, error) {
@@ -311,39 +315,145 @@ func getActiveOperation(localEnv *localenv.LocalEnvironment, environ LocalEnviro
 	return operation, nil
 }
 
-// getBackendOperations returns the list of operation from the specified backends
-// in descending order (sorted by creation time)
-func getBackendOperations(localEnv *localenv.LocalEnvironment, environ LocalEnvironmentFactory, operationID string) (result []clusterOperation, err error) {
-	b := newBackendOperations()
-	b.List(localEnv, environ)
-	for _, op := range b.operations {
-		if operationID == "" || operationID == op.ID {
-			result = append(result, op)
-		}
+func newBackendOperations() backendOperations {
+	return backendOperations{
+		operations: make(map[string]clusterOperation),
+	}
+}
+
+func (r *backendOperations) getLastOperationFromCluster(localEnv *localenv.LocalEnvironment) (*clusterOperation, error) {
+	clusterEnv, err := localEnv.NewClusterEnvironment(localenv.WithEtcdTimeout(1 * time.Second))
+	if err != nil {
+		return nil, trace.Wrap(err)
 	}
-	sort.Slice(result, func(i, j int) bool {
-		return result[i].Created.After(result[j].Created)
+
+	if clusterEnv == nil {
+		return nil, trace.NotFound("clusterEnv not available")
+	}
+
+	sites, err := clusterEnv.Operator.GetSites(defaults.SystemAccountID)
+	if err != nil {
+		return nil, trace.Wrap(err)
+	}
+	if len(sites) == 0 {
+		return nil, trace.NotFound("no clusters found")
+	}
+
+	operations, err := clusterEnv.Operator.GetSiteOperations(ops.SiteKey{
+		AccountID:  defaults.SystemAccountID,
+		SiteDomain: sites[0].Domain,
+	}, ops.OperationsFilter{
+		Last: true,
 	})
-	return result, nil
+	if err != nil {
+		return nil, trace.Wrap(err)
+	}
+
+	if len(operations) == 0 {
+		return nil, trace.NotFound("no operations found")
+	}
+
+	plan, err := clusterEnv.Operator.GetOperationPlan(ops.SiteOperationKey{
+		AccountID:   defaults.SystemAccountID,
+		SiteDomain:  sites[0].Domain,
+		OperationID: operations[0].ID,
+	})
+	if err != nil && !trace.IsNotFound(err) {
+		return nil, trace.Wrap(err).AddField("operationId", operations[0].ID)
+	}
+
+	return &clusterOperation{
+		SiteOperation: ops.SiteOperation(operations[0]),
+		hasPlan:       plan != nil,
+	}, nil
 }
 
-func newBackendOperations() backendOperations {
-	return backendOperations{
-		operations: make(map[string]clusterOperation),
+func (r *backendOperations) GetLastOperation(localEnv *localenv.LocalEnvironment, environ LocalEnvironmentFactory) *clusterOperation {
+	r.importLocal(environ)
+
+	clusterOp, err := r.getLastOperationFromCluster(localEnv)
+	if err != nil {
+		log.WithError(err).Warn("Failed to request operation from cluster.")
+	}
+
+	if len(r.operations) == 0 {
+		return clusterOp
 	}
+
+	operations := []clusterOperation{}
+	for _, v := range r.operations {
+		operations = append(operations, v)
+	}
+
+	sort.Slice(operations, func(i, j int) bool {
+		return operations[i].Created.After(operations[j].Created)
+	})
+
+	if clusterOp != nil && clusterOp.Created.After(operations[0].Created) {
+		return clusterOp
+	}
+
+	return &operations[0]
 }
 
-func (r *backendOperations) List(localEnv *localenv.LocalEnvironment, environ LocalEnvironmentFactory) {
+func (r *backendOperations) getOperationByIDFromCluster(localEnv *localenv.LocalEnvironment, operationID string) (*clusterOperation, error) {
 	clusterEnv, err := localEnv.NewClusterEnvironment(localenv.WithEtcdTimeout(1 * time.Second))
 	if err != nil {
-		log.WithError(err).Debug("Failed to create cluster environment.")
+		return nil, trace.Wrap(err)
 	}
-	if clusterEnv != nil {
-		err = r.init(clusterEnv.Backend)
-		if err != nil {
-			log.WithError(err).Debug("Failed to query cluster operations.")
-		}
+
+	if clusterEnv == nil {
+		return nil, trace.NotFound("clusterEnv not available")
+	}
+
+	sites, err := clusterEnv.Operator.GetSites(defaults.SystemAccountID)
+	if err != nil {
+		return nil, trace.Wrap(err)
+	}
+	if len(sites) == 0 {
+		return nil, trace.NotFound("no clusters found")
+	}
+
+	operation, err := clusterEnv.Operator.GetSiteOperation(ops.SiteOperationKey{
+		AccountID:   defaults.SystemAccountID,
+		SiteDomain:  sites[0].Domain,
+		OperationID: operationID,
+	})
+	if err != nil {
+		return nil, trace.Wrap(err)
+	}
+
+	plan, err := clusterEnv.Operator.GetOperationPlan(ops.SiteOperationKey{
+		AccountID:   defaults.SystemAccountID,
+		SiteDomain:  sites[0].Domain,
+		OperationID: operation.ID,
+	})
+	if err != nil && !trace.IsNotFound(err) {
+		return nil, trace.Wrap(err).AddField("operationId", operation.ID)
+	}
+
+	return &clusterOperation{
+		SiteOperation: *operation,
+		hasPlan:       plan != nil,
+	}, nil
+}
+
+func (r *backendOperations) GetOperationByID(localEnv *localenv.LocalEnvironment, environ LocalEnvironmentFactory, operationID string) *clusterOperation {
+	r.importLocal(environ)
+
+	if op, ok := r.operations[operationID]; ok {
+		return &op
+	}
+
+	clusterOp, err := r.getOperationByIDFromCluster(localEnv, operationID)
+	if err != nil {
+		log.WithError(err).Warn("Failed to request operation from cluster.")
+	}
+
+	return clusterOp
+}
+
+func (r *backendOperations) importLocal(environ LocalEnvironmentFactory) {
 	if environ == nil {
 		return
 	}
diff --git a/tool/gravity/cli/plan.go b/tool/gravity/cli/plan.go
index e7ad53745c..0e1e978387 100644
--- a/tool/gravity/cli/plan.go
+++ b/tool/gravity/cli/plan.go
@@ -81,12 +81,15 @@ func displayOperationPlan(localEnv *localenv.LocalEnvironment, environ LocalEnvi
 		}
 		return trace.Wrap(err)
 	}
+
 	if isInvalidOperation(*op) {
 		return trace.BadParameter(invalidOperationBanner, op.String(), op.ID)
 	}
+
 	if op.IsCompleted() && op.hasPlan {
 		return displayClusterOperationPlan(localEnv, op.Key(), opts)
 	}
+
 	switch op.Type {
 	case ops.OperationInstall, ops.OperationReconfigure:
 		err = displayInstallOperationPlan(localEnv, op.Key(), opts)
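GetLastOperation above resolves the most recent operation across two sources: the local backends and, when reachable, the cluster. A compact distillation of that "newest wins" rule, using simplified stand-in types in place of the real clusterOperation:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// operation is a simplified stand-in for clusterOperation.
type operation struct {
	ID      string
	Created time.Time
}

// lastOperation mirrors the resolution rule in GetLastOperation above: sort
// the local operations newest-first, then prefer the cluster-side result only
// when it is more recent. cluster is nil when the cluster was unreachable.
func lastOperation(local []operation, cluster *operation) *operation {
	if len(local) == 0 {
		return cluster // may be nil; the caller reports "not found"
	}
	sort.Slice(local, func(i, j int) bool {
		return local[i].Created.After(local[j].Created)
	})
	if cluster != nil && cluster.Created.After(local[0].Created) {
		return cluster
	}
	return &local[0]
}

func main() {
	now := time.Now()
	local := []operation{{ID: "local-1", Created: now.Add(-time.Hour)}}
	cluster := &operation{ID: "cluster-1", Created: now}
	fmt.Println(lastOperation(local, cluster).ID) // cluster-1
}
```

Treating the cluster result as optional is what lets `gravity plan` keep working on a node whose etcd is unavailable, at the cost of possibly showing a slightly stale local record.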