diff --git a/.github/config/values.yaml b/.github/config/values.yaml index ec91a88..0601f44 100644 --- a/.github/config/values.yaml +++ b/.github/config/values.yaml @@ -2,5 +2,5 @@ clusterDefaults: version: latest images: galaxysql: pxc-registry.cn-shanghai.cr.aliyuncs.com/polardbx/polardbx-sql - galaxyengine: pxc-registry.cn-shanghai.cr.aliyuncs.com/polardbx/polardbx-engine + galaxyengine: pxc-registry.cn-shanghai.cr.aliyuncs.com/polardbx/polardbx-engine-2.0 galaxycdc: pxc-registry.cn-shanghai.cr.aliyuncs.com/polardbx/polardbx-cdc \ No newline at end of file diff --git a/CHANGELOG/README.md b/CHANGELOG/README.md index 9c53bbc..3297189 100644 --- a/CHANGELOG/README.md +++ b/CHANGELOG/README.md @@ -1,5 +1,16 @@ # Changelog +## 2023-04-12 + +Release v1.4.1 + ++ Enhancement & New Features + + Support PolarDB-X Engine 2.0.0 + + Remove single download proxy for consensus logs in point-in-time recovery(PITR) ++ Bug Fix + + Fix readonly pxc when shareGms is `true` + + Fix Compute Nodes metadata inconsistency when upgrading frequently + ## 2023-03-23 Release v1.4.0 diff --git a/api/v1/polardbx/config.go b/api/v1/polardbx/config.go index 1e96d67..b65be1b 100644 --- a/api/v1/polardbx/config.go +++ b/api/v1/polardbx/config.go @@ -22,7 +22,8 @@ import ( ) type CNStaticConfig struct { - AttendHtap bool `json:"AttendHtap,omitempty"` + AttendHtap bool `json:"AttendHtap,omitempty"` + // +kubebuilder:default=true EnableCoroutine bool `json:"EnableCoroutine,omitempty"` EnableReplicaRead bool `json:"EnableReplicaRead,omitempty"` EnableJvmRemoteDebug bool `json:"EnableJvmRemoteDebug,omitempty"` @@ -58,6 +59,10 @@ type CDCConfig struct { Envs map[string]intstr.IntOrString `json:"envs,omitempty"` } +type ColumnarConfig struct { + Envs map[string]intstr.IntOrString `json:"envs,omitempty"` +} + type Config struct { // CN config. 
CN CNConfig `json:"cn,omitempty"` @@ -67,4 +72,7 @@ type Config struct { // CDC config CDC CDCConfig `json:"cdc,omitempty"` + + // Columnar config + Columnar ColumnarConfig `json:"columnar,omitempty"` } diff --git a/api/v1/polardbx/status.go b/api/v1/polardbx/status.go index d351ce5..547ff8d 100644 --- a/api/v1/polardbx/status.go +++ b/api/v1/polardbx/status.go @@ -98,6 +98,8 @@ type ReplicaStatusForPrint struct { DN string `json:"dn,omitempty"` // CDC status, e.g. 1/2 or - when CDC nodes not requested. CDC string `json:"cdc,omitempty"` + // Columnar status, e.g. 1/2 or - when Columnar nodes not requested. + Columnar string `json:"columnar,omitempty"` } // StatusForPrint represents the overall printable status of the cluster. @@ -151,6 +153,9 @@ type ClusterReplicasStatus struct { // CDC defines the replica status for CDC. CDC *ReplicasStatus `json:"cdc,omitempty"` + + // Columnar defines the replica status for Columnar. + Columnar *ReplicasStatus `json:"columnar,omitempty"` } // PitrStatus represents the status ot pitr restore diff --git a/api/v1/polardbx/topology.go b/api/v1/polardbx/topology.go index ccbc415..e59a41a 100644 --- a/api/v1/polardbx/topology.go +++ b/api/v1/polardbx/topology.go @@ -64,10 +64,11 @@ type XStoreTopologyRule struct { } type TopologyRuleComponents struct { - CN []StatelessTopologyRuleItem `json:"cn,omitempty"` - CDC []StatelessTopologyRuleItem `json:"cdc,omitempty"` - GMS *XStoreTopologyRule `json:"gms,omitempty"` - DN *XStoreTopologyRule `json:"dn,omitempty"` + CN []StatelessTopologyRuleItem `json:"cn,omitempty"` + CDC []StatelessTopologyRuleItem `json:"cdc,omitempty"` + Columnar []StatelessTopologyRuleItem `json:"columnar,omitempty"` + GMS *XStoreTopologyRule `json:"gms,omitempty"` + DN *XStoreTopologyRule `json:"dn,omitempty"` } type TopologyRules struct { @@ -157,6 +158,28 @@ type CDCTemplate struct { Resources corev1.ResourceRequirements `json:"resources,omitempty"` } +type ColumnarTemplate struct { + // Image for Columnar. 
Should be replaced by default value if not present. + // +optional + Image string `json:"image,omitempty"` + + // ImagePullPolicy describes a policy for if/when to pull a container image (especially + // for the engine container). + ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"` + + // ImagePullSecrets represents the secrets for pulling private images. + ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` + + // HostNetwork mode. + // +optional + HostNetwork bool `json:"hostNetwork,omitempty"` + + // +kubebuilder:default={limits: {cpu: 4, memory: "8Gi"}} + + // Resources. Default is limits of 4 cpu and 8Gi memory. + Resources corev1.ResourceRequirements `json:"resources,omitempty"` +} + type TopologyNodeGMS struct { // Template of GMS xstore. If not provided, the operator will use // the template for DN as template for GMS. @@ -200,6 +223,17 @@ type TopologyNodeCDC struct { Template CDCTemplate `json:"template,omitempty"` } +type TopologyNodeColumnar struct { + // +kubebuilder:default=2 + // +kubebuilder:validation:Minimum=0 + + Replicas int32 `json:"replicas,omitempty"` + + // +kubebuilder:default={resources: {limits: {cpu: 4, memory: "8Gi"}}} + + Template ColumnarTemplate `json:"template,omitempty"` +} + type TopologyNodes struct { GMS TopologyNodeGMS `json:"gms,omitempty"` @@ -210,6 +244,8 @@ type TopologyNodes struct { DN TopologyNodeDN `json:"dn,omitempty"` CDC *TopologyNodeCDC `json:"cdc,omitempty"` + + Columnar *TopologyNodeColumnar `json:"columnar,omitempty"` } type Topology struct { diff --git a/api/v1/polardbx/zz_generated.deepcopy.go b/api/v1/polardbx/zz_generated.deepcopy.go index 29a562f..be094e0 100644 --- a/api/v1/polardbx/zz_generated.deepcopy.go +++ b/api/v1/polardbx/zz_generated.deepcopy.go @@ -190,6 +190,11 @@ func (in *ClusterReplicasStatus) DeepCopyInto(out *ClusterReplicasStatus) { *out = new(ReplicasStatus) **out = **in } + if in.Columnar != nil { + in, out := &in.Columnar, &out.Columnar + 
*out = new(ReplicasStatus) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterReplicasStatus. @@ -202,6 +207,49 @@ func (in *ClusterReplicasStatus) DeepCopy() *ClusterReplicasStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ColumnarConfig) DeepCopyInto(out *ColumnarConfig) { + *out = *in + if in.Envs != nil { + in, out := &in.Envs, &out.Envs + *out = make(map[string]intstr.IntOrString, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ColumnarConfig. +func (in *ColumnarConfig) DeepCopy() *ColumnarConfig { + if in == nil { + return nil + } + out := new(ColumnarConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ColumnarTemplate) DeepCopyInto(out *ColumnarTemplate) { + *out = *in + if in.ImagePullSecrets != nil { + in, out := &in.ImagePullSecrets, &out.ImagePullSecrets + *out = make([]v1.LocalObjectReference, len(*in)) + copy(*out, *in) + } + in.Resources.DeepCopyInto(&out.Resources) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ColumnarTemplate. +func (in *ColumnarTemplate) DeepCopy() *ColumnarTemplate { + if in == nil { + return nil + } + out := new(ColumnarTemplate) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *Condition) DeepCopyInto(out *Condition) { *out = *in @@ -228,6 +276,7 @@ func (in *Config) DeepCopyInto(out *Config) { in.CN.DeepCopyInto(&out.CN) in.DN.DeepCopyInto(&out.DN) in.CDC.DeepCopyInto(&out.CDC) + in.Columnar.DeepCopyInto(&out.Columnar) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Config. @@ -655,6 +704,22 @@ func (in *TopologyNodeCN) DeepCopy() *TopologyNodeCN { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TopologyNodeColumnar) DeepCopyInto(out *TopologyNodeColumnar) { + *out = *in + in.Template.DeepCopyInto(&out.Template) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TopologyNodeColumnar. +func (in *TopologyNodeColumnar) DeepCopy() *TopologyNodeColumnar { + if in == nil { + return nil + } + out := new(TopologyNodeColumnar) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TopologyNodeDN) DeepCopyInto(out *TopologyNodeDN) { *out = *in @@ -702,6 +767,11 @@ func (in *TopologyNodes) DeepCopyInto(out *TopologyNodes) { *out = new(TopologyNodeCDC) (*in).DeepCopyInto(*out) } + if in.Columnar != nil { + in, out := &in.Columnar, &out.Columnar + *out = new(TopologyNodeColumnar) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TopologyNodes. 
@@ -731,6 +801,13 @@ func (in *TopologyRuleComponents) DeepCopyInto(out *TopologyRuleComponents) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Columnar != nil { + in, out := &in.Columnar, &out.Columnar + *out = make([]StatelessTopologyRuleItem, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.GMS != nil { in, out := &in.GMS, &out.GMS *out = new(XStoreTopologyRule) diff --git a/api/v1/polardbxcluster_types.go b/api/v1/polardbxcluster_types.go index 9e5dda4..9fd14a8 100644 --- a/api/v1/polardbxcluster_types.go +++ b/api/v1/polardbxcluster_types.go @@ -141,6 +141,7 @@ type PolarDBXClusterStatus struct { // +kubebuilder:printcolumn:name="CN",type=string,JSONPath=`.status.statusForPrint.replicaStatus.cn` // +kubebuilder:printcolumn:name="DN",type=string,JSONPath=`.status.statusForPrint.replicaStatus.dn` // +kubebuilder:printcolumn:name="CDC",type=string,JSONPath=`.status.statusForPrint.replicaStatus.cdc` +// +kubebuilder:printcolumn:name="COLUMNAR",type=string,JSONPath=`.status.statusForPrint.replicaStatus.columnar` // +kubebuilder:printcolumn:name="PHASE",type=string,JSONPath=`.status.phase` // +kubebuilder:printcolumn:name="DISK",type=string,JSONPath=`.status.statusForPrint.storageSize` // +kubebuilder:printcolumn:name="STAGE",type=string,priority=1,JSONPath=`.status.stage` diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index b5d7ee4..4d68030 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -1155,6 +1155,11 @@ func (in *SystemTaskSpec) DeepCopyInto(out *SystemTaskSpec) { *out = *in in.CnResources.DeepCopyInto(&out.CnResources) in.DnResources.DeepCopyInto(&out.DnResources) + if in.Nodes != nil { + in, out := &in.Nodes, &out.Nodes + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SystemTaskSpec. 
diff --git a/build/images/xstore-tools/Dockerfile b/build/images/xstore-tools/Dockerfile index eee997f..254ee56 100644 --- a/build/images/xstore-tools/Dockerfile +++ b/build/images/xstore-tools/Dockerfile @@ -60,10 +60,12 @@ ADD third-party third-party RUN CGO_ENABLED=0 GOOS=linux GO111MODULE=on go build -tags polardbx -o polardbx-binlog cmd/polardbx-binlog/main.go # Build filestream tools RUN CGO_ENABLED=0 GOOS=linux GO111MODULE=on go build -o polardbx-filestream-client cmd/polardbx-filestream-cli/main.go +# Build polardbx-job +RUN CGO_ENABLED=0 GOOS=linux GO111MODULE=on go build -tags polardbx -o polardbx-job cmd/polardbx-job/main.go # Build the image with scripts -FROM polardbx/xstore-tools-base:v1.3.0 +FROM polardbx/xstore-tools-base:v1.4.1 ARG VERSION=test @@ -77,6 +79,7 @@ COPY --from=builder /tools/xstore/current/__pycache__ __pycache__ COPY --from=gobuilder /workspace/polardbx-binlog bb RUN mkdir -p /tools/xstore/current/bin COPY --from=gobuilder /workspace/polardbx-filestream-client /tools/xstore/current/bin/ +COPY --from=gobuilder /workspace/polardbx-job /tools/xstore/current/bin/ # Copy all scripts RUN ln -s cli.py xsl && chmod +x xsl diff --git a/charts/polardbx-logcollector/Chart.yaml b/charts/polardbx-logcollector/Chart.yaml index 21f6565..6db8a1a 100644 --- a/charts/polardbx-logcollector/Chart.yaml +++ b/charts/polardbx-logcollector/Chart.yaml @@ -2,8 +2,8 @@ apiVersion: v2 name: polardbx-logcollector description: Helm chart of polardbx-operator logcollector plugin type: application -version: 1.4.0 -appVersion: v1.4.0 +version: 1.4.1 +appVersion: v1.4.1 keywords: - polardb-x - operator diff --git a/charts/polardbx-monitor/Chart.yaml b/charts/polardbx-monitor/Chart.yaml index d7e2bf9..061cdb4 100644 --- a/charts/polardbx-monitor/Chart.yaml +++ b/charts/polardbx-monitor/Chart.yaml @@ -2,8 +2,8 @@ apiVersion: v2 name: polardbx-monitor description: Helm chart of polardbx-operator monitor plugin type: application -version: 1.4.0 -appVersion: v1.4.0 
+version: 1.4.1 +appVersion: v1.4.1 keywords: - polardb-x - operator diff --git a/charts/polardbx-operator/Chart.yaml b/charts/polardbx-operator/Chart.yaml index 44b7c92..3a0bcb9 100644 --- a/charts/polardbx-operator/Chart.yaml +++ b/charts/polardbx-operator/Chart.yaml @@ -2,8 +2,8 @@ apiVersion: v2 name: polardbx-operator description: Helm chart of polardbx-operator type: application -version: 1.4.0 -appVersion: v1.4.0 +version: 1.4.1 +appVersion: v1.4.1 keywords: - polardb-x - operator diff --git a/charts/polardbx-operator/crds/polardbx.aliyun.com_polardbxbackups.yaml b/charts/polardbx-operator/crds/polardbx.aliyun.com_polardbxbackups.yaml index 9c6a4bf..114c982 100644 --- a/charts/polardbx-operator/crds/polardbx.aliyun.com_polardbxbackups.yaml +++ b/charts/polardbx-operator/crds/polardbx.aliyun.com_polardbxbackups.yaml @@ -179,6 +179,7 @@ spec: AttendHtap: type: boolean EnableCoroutine: + default: true type: boolean EnableJvmRemoteDebug: type: boolean @@ -205,6 +206,17 @@ spec: type: object type: object type: object + columnar: + description: Columnar config + properties: + envs: + additionalProperties: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + type: object + type: object dn: description: DN config. properties: @@ -633,6 +645,84 @@ spec: type: object type: object type: object + columnar: + properties: + replicas: + default: 2 + format: int32 + minimum: 0 + type: integer + template: + default: + resources: + limits: + cpu: 4 + memory: 8Gi + properties: + hostNetwork: + description: HostNetwork mode. + type: boolean + image: + description: Image for Columnar. Should be replaced + by default value if not present. + type: string + imagePullPolicy: + description: ImagePullPolicy describes a policy + for if/when to pull a container image (especially + for the engine container). + type: string + imagePullSecrets: + description: ImagePullSecrets represents the secrets + for pulling private images. 
+ items: + description: LocalObjectReference contains enough + information to let you locate the referenced + object inside the same namespace. + properties: + name: + description: 'Name of the referent. More + info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, + kind, uid?' + type: string + type: object + type: array + resources: + default: + limits: + cpu: 4 + memory: 8Gi + description: Resources. Default is limits of 4 + cpu and 8Gi memory. + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: 'Limits describes the maximum + amount of compute resources allowed. More + info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: 'Requests describes the minimum + amount of compute resources required. If + Requests is omitted for a container, it + defaults to Limits if that is explicitly + specified, otherwise to an implementation-defined + value. 
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + type: object + type: object + type: object + type: object dn: default: replicas: 2 @@ -1120,6 +1210,139 @@ spec: type: object type: object type: array + columnar: + items: + properties: + name: + type: string + replicas: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + selector: + properties: + reference: + type: string + selector: + description: A node selector represents + the union of the results of one or more + label queries over a set of nodes; that + is, it represents the OR of the selectors + represented by the node selector terms. + properties: + nodeSelectorTerms: + description: Required. A list of node + selector terms. The terms are ORed. + items: + description: A null or empty node + selector term matches no objects. + The requirements of them are ANDed. + The TopologySelectorTerm type implements + a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector + requirements by node's labels. + items: + description: A node selector + requirement is a selector + that contains values, a key, + and an operator that relates + the key and values. + properties: + key: + description: The label key + that the selector applies + to. + type: string + operator: + description: Represents + a key's relationship to + a set of values. Valid + operators are In, NotIn, + Exists, DoesNotExist. + Gt, and Lt. + type: string + values: + description: An array of + string values. If the + operator is In or NotIn, + the values array must + be non-empty. If the operator + is Exists or DoesNotExist, + the values array must + be empty. If the operator + is Gt or Lt, the values + array must have a single + element, which will be + interpreted as an integer. + This array is replaced + during a strategic merge + patch. 
+ items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector + requirements by node's fields. + items: + description: A node selector + requirement is a selector + that contains values, a key, + and an operator that relates + the key and values. + properties: + key: + description: The label key + that the selector applies + to. + type: string + operator: + description: Represents + a key's relationship to + a set of values. Valid + operators are In, NotIn, + Exists, DoesNotExist. + Gt, and Lt. + type: string + values: + description: An array of + string values. If the + operator is In or NotIn, + the values array must + be non-empty. If the operator + is Exists or DoesNotExist, + the values array must + be empty. If the operator + is Gt or Lt, the values + array must have a single + element, which will be + interpreted as an integer. + This array is replaced + during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object + type: object + type: array dn: properties: nodeSets: diff --git a/charts/polardbx-operator/crds/polardbx.aliyun.com_polardbxclusters.yaml b/charts/polardbx-operator/crds/polardbx.aliyun.com_polardbxclusters.yaml index 2d2a760..0242962 100644 --- a/charts/polardbx-operator/crds/polardbx.aliyun.com_polardbxclusters.yaml +++ b/charts/polardbx-operator/crds/polardbx.aliyun.com_polardbxclusters.yaml @@ -36,6 +36,9 @@ spec: - jsonPath: .status.statusForPrint.replicaStatus.cdc name: CDC type: string + - jsonPath: .status.statusForPrint.replicaStatus.columnar + name: COLUMNAR + type: string - jsonPath: .status.phase name: PHASE type: string @@ -138,6 +141,7 @@ spec: AttendHtap: type: boolean EnableCoroutine: + default: true type: boolean EnableJvmRemoteDebug: type: boolean @@ -164,6 +168,17 @@ spec: 
type: object type: object type: object + columnar: + description: Columnar config + properties: + envs: + additionalProperties: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + type: object + type: object dn: description: DN config. properties: @@ -584,6 +599,83 @@ spec: type: object type: object type: object + columnar: + properties: + replicas: + default: 2 + format: int32 + minimum: 0 + type: integer + template: + default: + resources: + limits: + cpu: 4 + memory: 8Gi + properties: + hostNetwork: + description: HostNetwork mode. + type: boolean + image: + description: Image for Columnar. Should be replaced + by default value if not present. + type: string + imagePullPolicy: + description: ImagePullPolicy describes a policy for + if/when to pull a container image (especially for + the engine container). + type: string + imagePullSecrets: + description: ImagePullSecrets represents the secrets + for pulling private images. + items: + description: LocalObjectReference contains enough + information to let you locate the referenced object + inside the same namespace. + properties: + name: + description: 'Name of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, + kind, uid?' + type: string + type: object + type: array + resources: + default: + limits: + cpu: 4 + memory: 8Gi + description: Resources. Default is limits of 4 cpu + and 8Gi memory. + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: 'Limits describes the maximum amount + of compute resources allowed. 
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: 'Requests describes the minimum amount + of compute resources required. If Requests is + omitted for a container, it defaults to Limits + if that is explicitly specified, otherwise to + an implementation-defined value. More info: + https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + type: object + type: object + type: object + type: object dn: default: replicas: 2 @@ -1041,6 +1133,126 @@ spec: type: object type: object type: array + columnar: + items: + properties: + name: + type: string + replicas: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + selector: + properties: + reference: + type: string + selector: + description: A node selector represents the + union of the results of one or more label + queries over a set of nodes; that is, it represents + the OR of the selectors represented by the + node selector terms. + properties: + nodeSelectorTerms: + description: Required. A list of node selector + terms. The terms are ORed. + items: + description: A null or empty node selector + term matches no objects. The requirements + of them are ANDed. The TopologySelectorTerm + type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector + requirements by node's labels. + items: + description: A node selector requirement + is a selector that contains values, + a key, and an operator that relates + the key and values. + properties: + key: + description: The label key that + the selector applies to. + type: string + operator: + description: Represents a key's + relationship to a set of values. 
+ Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, + and Lt. + type: string + values: + description: An array of string + values. If the operator is + In or NotIn, the values array + must be non-empty. If the + operator is Exists or DoesNotExist, + the values array must be empty. + If the operator is Gt or Lt, + the values array must have + a single element, which will + be interpreted as an integer. + This array is replaced during + a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector + requirements by node's fields. + items: + description: A node selector requirement + is a selector that contains values, + a key, and an operator that relates + the key and values. + properties: + key: + description: The label key that + the selector applies to. + type: string + operator: + description: Represents a key's + relationship to a set of values. + Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, + and Lt. + type: string + values: + description: An array of string + values. If the operator is + In or NotIn, the values array + must be non-empty. If the + operator is Exists or DoesNotExist, + the values array must be empty. + If the operator is Gt or Lt, + the values array must have + a single element, which will + be interpreted as an integer. + This array is replaced during + a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object + type: object + type: array dn: properties: nodeSets: @@ -1765,6 +1977,19 @@ spec: format: int32 type: integer type: object + columnar: + description: Columnar defines the replica status for Columnar. + properties: + available: + description: Available represents how many replicas are currently + available. 
+ format: int32 + type: integer + total: + description: Total defines the total size of the replica. + format: int32 + type: integer + type: object dn: description: DN defines the replica status for DN. properties: @@ -1857,6 +2082,7 @@ spec: AttendHtap: type: boolean EnableCoroutine: + default: true type: boolean EnableJvmRemoteDebug: type: boolean @@ -1883,6 +2109,17 @@ spec: type: object type: object type: object + columnar: + description: Columnar config + properties: + envs: + additionalProperties: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + type: object + type: object dn: description: DN config. properties: @@ -2092,6 +2329,84 @@ spec: type: object type: object type: object + columnar: + properties: + replicas: + default: 2 + format: int32 + minimum: 0 + type: integer + template: + default: + resources: + limits: + cpu: 4 + memory: 8Gi + properties: + hostNetwork: + description: HostNetwork mode. + type: boolean + image: + description: Image for Columnar. Should be replaced + by default value if not present. + type: string + imagePullPolicy: + description: ImagePullPolicy describes a policy + for if/when to pull a container image (especially + for the engine container). + type: string + imagePullSecrets: + description: ImagePullSecrets represents the secrets + for pulling private images. + items: + description: LocalObjectReference contains enough + information to let you locate the referenced + object inside the same namespace. + properties: + name: + description: 'Name of the referent. More + info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, + kind, uid?' + type: string + type: object + type: array + resources: + default: + limits: + cpu: 4 + memory: 8Gi + description: Resources. Default is limits of 4 + cpu and 8Gi memory. 
+ properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: 'Limits describes the maximum + amount of compute resources allowed. More + info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: 'Requests describes the minimum + amount of compute resources required. If + Requests is omitted for a container, it + defaults to Limits if that is explicitly + specified, otherwise to an implementation-defined + value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + type: object + type: object + type: object + type: object dn: default: replicas: 2 @@ -2579,6 +2894,139 @@ spec: type: object type: object type: array + columnar: + items: + properties: + name: + type: string + replicas: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + selector: + properties: + reference: + type: string + selector: + description: A node selector represents + the union of the results of one or more + label queries over a set of nodes; that + is, it represents the OR of the selectors + represented by the node selector terms. + properties: + nodeSelectorTerms: + description: Required. A list of node + selector terms. The terms are ORed. + items: + description: A null or empty node + selector term matches no objects. + The requirements of them are ANDed. + The TopologySelectorTerm type implements + a subset of the NodeSelectorTerm. 
+ properties: + matchExpressions: + description: A list of node selector + requirements by node's labels. + items: + description: A node selector + requirement is a selector + that contains values, a key, + and an operator that relates + the key and values. + properties: + key: + description: The label key + that the selector applies + to. + type: string + operator: + description: Represents + a key's relationship to + a set of values. Valid + operators are In, NotIn, + Exists, DoesNotExist. + Gt, and Lt. + type: string + values: + description: An array of + string values. If the + operator is In or NotIn, + the values array must + be non-empty. If the operator + is Exists or DoesNotExist, + the values array must + be empty. If the operator + is Gt or Lt, the values + array must have a single + element, which will be + interpreted as an integer. + This array is replaced + during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector + requirements by node's fields. + items: + description: A node selector + requirement is a selector + that contains values, a key, + and an operator that relates + the key and values. + properties: + key: + description: The label key + that the selector applies + to. + type: string + operator: + description: Represents + a key's relationship to + a set of values. Valid + operators are In, NotIn, + Exists, DoesNotExist. + Gt, and Lt. + type: string + values: + description: An array of + string values. If the + operator is In or NotIn, + the values array must + be non-empty. If the operator + is Exists or DoesNotExist, + the values array must + be empty. If the operator + is Gt or Lt, the values + array must have a single + element, which will be + interpreted as an integer. + This array is replaced + during a strategic merge + patch. 
+ items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object + type: object + type: array dn: properties: nodeSets: @@ -3294,6 +3742,10 @@ spec: cn: description: CN status, e.g. 1/2 type: string + columnar: + description: Columnar status, e.g. 1/2 or - when Columnar + nodes not requested. + type: string dn: description: DN status, e.g. 1/2 type: string diff --git a/charts/polardbx-operator/templates/parameter-template-product.yaml b/charts/polardbx-operator/templates/parameter-template-product.yaml index c8e30b8..f1243ab 100644 --- a/charts/polardbx-operator/templates/parameter-template-product.yaml +++ b/charts/polardbx-operator/templates/parameter-template-product.yaml @@ -670,7 +670,7 @@ spec: optional: '[ON|OFF]' restart: false unit: STRING - - defaultValue: '9999999999' + - defaultValue: '3' divisibilityFactor: 1 mode: readwrite name: loose_innodb_commit_cleanout_max_rows @@ -1202,6 +1202,27 @@ spec: optional: '[ON|OFF]' restart: false unit: STRING + - defaultValue: '2' + divisibilityFactor: 1 + mode: readwrite + name: innodb_log_files_in_group + optional: '[2-100]' + restart: false + unit: INT + - defaultValue: ibdata1:12M:autoextend + divisibilityFactor: 0 + mode: readwrite + name: innodb_data_file_path + optional: .* + restart: false + unit: STRING + - defaultValue: 'OFF' + divisibilityFactor: 0 + mode: readwrite + name: innodb_use_native_aio + optional: '[OFF|ON]' + restart: false + unit: STRING - defaultValue: '7200' divisibilityFactor: 1 mode: readwrite @@ -1426,6 +1447,13 @@ spec: optional: .* restart: true unit: INT + - defaultValue: '10' + divisibilityFactor: 1 + mode: readwrite + name: loose_consensus_prefetch_window_size + optional: .* + restart: true + unit: INT - defaultValue: '8' divisibilityFactor: 1 mode: readwrite @@ -2140,13 +2168,6 @@ spec: optional: '[0-4294967295]' restart: false unit: INT - - defaultValue: 
'5532' - divisibilityFactor: 1 - mode: readwrite - name: max_connections - optional: .* - restart: true - unit: INT - defaultValue: '1024' divisibilityFactor: 1 mode: readwrite @@ -2217,13 +2238,6 @@ spec: optional: '[0-255]' restart: false unit: INT - - defaultValue: '5000' - divisibilityFactor: 1 - mode: readwrite - name: max_user_connections - optional: .* - restart: true - unit: INT - defaultValue: '102400' divisibilityFactor: 1 mode: readwrite @@ -2231,6 +2245,13 @@ spec: optional: '[1-102400]' restart: false unit: INT + - defaultValue: '0' + divisibilityFactor: 1 + mode: readwrite + name: max_relay_log_size + optional: '[0-1073741824]' + restart: false + unit: INT - defaultValue: '0' divisibilityFactor: 1 mode: readwrite @@ -2287,7 +2308,7 @@ spec: optional: '[0-20]' restart: true unit: int - - defaultValue: '65535' + - defaultValue: '615350' divisibilityFactor: 1 mode: readwrite name: open_files_limit @@ -2518,6 +2539,19 @@ spec: optional: '[s*|ALL_LOSSY|ALL_NON_LOSSY|ALL_SIGNED|ALL_UNSIGNED]' restart: true unit: STRING + - defaultValue: '512' + divisibilityFactor: 8 + mode: 'false' + name: slave_checkpoint_group + optional: '[32-524280]' + unit: INT + - defaultValue: INDEX_SCAN,HASH_SCAN + divisibilityFactor: 0 + mode: readwrite + name: slave_rows_search_algorithms + optional: '[TABLE_SCAN,INDEX_SCAN|INDEX_SCAN,HASH_SCAN|TABLE_SCAN,HASH_SCAN||TABLE_SCAN,INDEX_SCAN,HASH_SCAN]' + restart: false + unit: STRING - defaultValue: '2' divisibilityFactor: 1 mode: readwrite diff --git a/charts/polardbx-operator/templates/webhook/admission-webhook-configuration.yaml b/charts/polardbx-operator/templates/webhook/admission-webhook-configuration.yaml index a2e0f35..5bb914e 100644 --- a/charts/polardbx-operator/templates/webhook/admission-webhook-configuration.yaml +++ b/charts/polardbx-operator/templates/webhook/admission-webhook-configuration.yaml @@ -82,6 +82,25 @@ webhooks: resources: - polardbxbackups scope: "Namespaced" +- admissionReviewVersions: + - "v1" + 
clientConfig: + service: + name: kubernetes + namespace: default + path: /apis/admission.polardbx.aliyun.com/v1/validate-polardbx-aliyun-com-v1-polardbxbackupbinlog + name: "polardbxbackupbinlog-validate.polardbx.aliyun.com" + sideEffects: None + rules: + - apiGroups: + - polardbx.aliyun.com + apiVersions: + - v1 + operations: + - CREATE + resources: + - polardbxbackupbinlogs + scope: "Namespaced" --- apiVersion: admissionregistration.k8s.io/v1 kind: MutatingWebhookConfiguration diff --git a/charts/polardbx-operator/values.yaml b/charts/polardbx-operator/values.yaml index b2de87d..2904f5c 100644 --- a/charts/polardbx-operator/values.yaml +++ b/charts/polardbx-operator/values.yaml @@ -14,7 +14,7 @@ images: polardbxJob: polardbx-job # Default image tag. Use app version if not specified or 'latest' if useLatestImage is true. -imageTag: v1.4.0 +imageTag: v1.4.1 # Uses the latest images for operator components. useLatestImage: false @@ -30,7 +30,7 @@ clusterDefaults: version: latest images: galaxysql: polardbx-sql - galaxyengine: polardbx-engine + galaxyengine: polardbx-engine-2.0 galaxycdc: polardbx-cdc # Configuration of Kubernetes hosts. 
diff --git a/cmd/polardbx-job/main.go b/cmd/polardbx-job/main.go index 0a4c9a9..cd94f66 100644 --- a/cmd/polardbx-job/main.go +++ b/cmd/polardbx-job/main.go @@ -1,10 +1,12 @@ package main import ( + "encoding/json" "flag" "fmt" "github.com/alibaba/polardbx-operator/pkg/hpfs/backupbinlog" "github.com/alibaba/polardbx-operator/pkg/pitr" + "io" "os" "os/signal" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -16,14 +18,19 @@ type JobType string const ( PitrHeartbeatJobType JobType = "PitrHeartbeat" PitrPrepareBinlogs JobType = "PitrPrepareBinlogs" + PitrDownloadFile JobType = "PitrDownloadFile" ) var ( - jobType string + jobType string + binlogSourceJson string + output string ) func init() { flag.StringVar(&jobType, "job-type", "PitrHeartbeat", "the job type") + flag.StringVar(&binlogSourceJson, "binlog-source", "{}", "the BinlogSource json") + flag.StringVar(&output, "output", "", "output filepath") flag.Parse() } @@ -52,6 +59,25 @@ func main() { waitActions = append(waitActions, func() { waitGroup.Wait() }) + case PitrDownloadFile: + binlogSource := pitr.BinlogSource{} + err := json.Unmarshal([]byte(binlogSourceJson), &binlogSource) + if err != nil { + panic(err) + } + reader, err := binlogSource.OpenStream() + if err != nil { + panic(err) + } + filews, err := os.OpenFile(output, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644) + if err != nil { + panic(err) + } + defer filews.Close() + _, err = io.CopyN(filews, reader, int64(*(binlogSource.GetTrueLength()))) + if err != nil { + panic(err) + } default: panic("invalid job type") } diff --git a/hack/copy_image.sh b/hack/copy_image.sh new file mode 100755 index 0000000..c14c8e1 --- /dev/null +++ b/hack/copy_image.sh @@ -0,0 +1,68 @@ +#!/bin/bash + +# Copyright 2021 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +SOURCE_REPO=polardbx +SOURCE_VERSION=v1.4.1 +DEST_REPO=polardbx +DEST_VERSION=latest +TARGETS="xstore-tools polardbx-operator probe-proxy polardbx-exporter polardbx-init polardbx-hpfs polardbx-job" + +#pull from source image and tag it +for i in $TARGETS +do + docker pull $SOURCE_REPO/$i:$SOURCE_VERSION-arm64 + docker tag $SOURCE_REPO/$i:$SOURCE_VERSION-arm64 $DEST_REPO/$i:$DEST_VERSION-arm64 + docker pull $SOURCE_REPO/$i:$SOURCE_VERSION-amd64 + docker tag $SOURCE_REPO/$i:$SOURCE_VERSION-amd64 $DEST_REPO/$i:$DEST_VERSION-amd64 + docker tag $SOURCE_REPO/$i:$SOURCE_VERSION-amd64 $DEST_REPO/$i:$DEST_VERSION +done + +#push +for i in $TARGETS +do + docker push $DEST_REPO/$i:$DEST_VERSION-arm64 + docker push $DEST_REPO/$i:$DEST_VERSION-amd64 + docker push $DEST_REPO/$i:$DEST_VERSION +done + +#create manifest +for i in $TARGETS +do + docker manifest rm "${DEST_REPO}"/$i:"${DEST_VERSION}" + docker manifest create "${DEST_REPO}"/$i:"${DEST_VERSION}" \ + "${DEST_REPO}"/$i:"${DEST_VERSION}"-arm64 \ + "${DEST_REPO}"/$i:"${DEST_VERSION}"-amd64 --amend +done + + +# 设置manifest +for i in $TARGETS +do + docker manifest annotate "${DEST_REPO}"/$i:"${DEST_VERSION}" \ + "${DEST_REPO}"/$i:"${DEST_VERSION}"-arm64 \ + --os linux --arch arm64 + + docker manifest annotate "${DEST_REPO}"/$i:"${DEST_VERSION}" \ + "${DEST_REPO}"/$i:"${DEST_VERSION}"-amd64 \ + --os linux --arch amd64 +done + +#push manifest +for i in $TARGETS +do + docker manifest push "${DEST_REPO}"/$i:"${DEST_VERSION}" +done \ No newline at end of file diff --git a/pkg/featuregate/featuregates.go 
b/pkg/featuregate/featuregates.go index b9271cc..78730c5 100644 --- a/pkg/featuregate/featuregates.go +++ b/pkg/featuregate/featuregates.go @@ -67,7 +67,7 @@ var ( EnableGalaxyClusterMode = declareFeatureGate("EnableGalaxyCluster", true, false, "Enable cluster mode on galaxy store engine.") EnforceQoSGuaranteed = declareFeatureGate("EnforceQoSGuaranteed", false, false, "Enforce pod's QoS to Guaranteed.") ResetTrustIpsBeforeStart = declareFeatureGate("ResetTrustIpsBeforeStart", false, true, "Reset trust ips in CNs to avoid security problems.") - EnableXStoreWithPodService = declareFeatureGate("EnableXStoreWithPodService", true, true, "Use services for pods in xstore.") + EnableXStoreWithPodService = declareFeatureGate("EnableXStoreWithPodService", true, false, "Use services for pods in xstore.") EnforceClusterIpXStorePod = declareFeatureGate("EnforceClusterIpXStorePod", false, false, "Use cluster ip services for pods in old xstore.") EnableAutoRebuildFollower = declareFeatureGate("EnableAutoRebuildFollower", false, false, "Enable creating rebuild task for follower if it is unhealthy.") ) diff --git a/pkg/hpfs/filestream/flowcontrol.go b/pkg/hpfs/filestream/flowcontrol.go index d13b7ed..0518c26 100644 --- a/pkg/hpfs/filestream/flowcontrol.go +++ b/pkg/hpfs/filestream/flowcontrol.go @@ -27,6 +27,7 @@ import ( "io" k8snet "k8s.io/apimachinery/pkg/util/net" net "net" + "runtime/debug" "sync" "sync/atomic" "time" @@ -114,7 +115,8 @@ func (f *FlowControlManger) LimitFlow(reader io.Reader, writer io.Writer, notify wg.Add(1) go func() { defer func() { - recover() + obj := recover() + fmt.Println(obj, debug.Stack()) }() defer wg.Done() for { diff --git a/pkg/init/init.go b/pkg/init/init.go index d9007a8..166704a 100644 --- a/pkg/init/init.go +++ b/pkg/init/init.go @@ -203,7 +203,7 @@ func Do() { fmt.Println("Connected to metadb!") fmt.Printf("Try self-registration, register %s:%d to metadb...\n", env.LocalIP, env.ServerPort) - stmt := fmt.Sprintf(`INSERT IGNORE INTO 
server_info + stmt := fmt.Sprintf(`REPLACE INTO server_info (inst_id, inst_type, ip, port, htap_port, mgr_port, mpp_port, status, cpu_core, mem_size, extras) VALUES ('%s', '%s', '%s', %d, %d, %d, %d, %d, %d, %d, '%s')`, env.InstanceID, env.InstanceType, env.LocalIP, env.ServerPort, env.HtapPort, env.MgrPort, env.MppPort, 0, env.CpuCore, env.MemSize, env.PodId) diff --git a/pkg/meta/core/gms/manager_impl.go b/pkg/meta/core/gms/manager_impl.go index 84bd683..b192c91 100644 --- a/pkg/meta/core/gms/manager_impl.go +++ b/pkg/meta/core/gms/manager_impl.go @@ -728,7 +728,7 @@ func (meta *manager) EnableComputeNodes(computeNodes ...ComputeNodeInfo) error { } func (c *ComputeNodeInfo) toSelectCriteria() string { - return fmt.Sprintf("(ip = '%s' AND port = %d)", c.Host, c.Port) + return fmt.Sprintf("(ip = '%s' AND port = %d AND (extras like '%%%s%%' or extras is null))", c.Host, c.Port, c.Extra) } func (meta *manager) DisableComputeNodes(computeNodes ...ComputeNodeInfo) error { diff --git a/pkg/operator/v1/config/config.go b/pkg/operator/v1/config/config.go index d0bba9e..4629d05 100644 --- a/pkg/operator/v1/config/config.go +++ b/pkg/operator/v1/config/config.go @@ -70,11 +70,12 @@ func (c *config) Nfs() NfsConfig { } type imagesConfig struct { - Repo string `json:"repo,omitempty"` - Common map[string]string `json:"common,omitempty"` - ComputeImages map[string]string `json:"compute,omitempty"` - CdcImages map[string]string `json:"cdc,omitempty"` - StoreImages map[string]map[string]string `json:"store,omitempty"` + Repo string `json:"repo,omitempty"` + Common map[string]string `json:"common,omitempty"` + ComputeImages map[string]string `json:"compute,omitempty"` + CdcImages map[string]string `json:"cdc,omitempty"` + ColumnarImages map[string]string `json:"columnar,omitempty"` + StoreImages map[string]map[string]string `json:"store,omitempty"` } func newImage(image string, defaultRepo, defaultTag string) string { @@ -141,6 +142,8 @@ func (c *imagesConfig) 
DefaultImageForCluster(role string, container string, ver image = defaults.NonEmptyStrOrDefault(c.ComputeImages[container], c.Common[container]) case "cdc": image = defaults.NonEmptyStrOrDefault(c.CdcImages[container], c.Common[container]) + case "columnar": + image = defaults.NonEmptyStrOrDefault(c.ColumnarImages[container], c.Common[container]) default: panic("invalid role: " + role) } diff --git a/pkg/operator/v1/polardbx/controllers/polardbxcluster_controller.go b/pkg/operator/v1/polardbx/controllers/polardbxcluster_controller.go index 12a09c1..0025124 100644 --- a/pkg/operator/v1/polardbx/controllers/polardbxcluster_controller.go +++ b/pkg/operator/v1/polardbx/controllers/polardbxcluster_controller.go @@ -197,8 +197,11 @@ func (r *PolarDBXReconciler) newReconcileTask(rc *polardbxreconcile.Context, pol ), instancesteps.CreateOrReconcileCNs, instancesteps.CreateOrReconcileCDCs, + instancesteps.WaitUntilCNCDCPodsReady, + instancesteps.CreateOrReconcileColumnars, instancesteps.WaitUntilCNDeploymentsRolledOut, instancesteps.WaitUntilCDCDeploymentsRolledOut, + instancesteps.WaitUntilColumnarDeploymentsRolledOut, instancesteps.CreateFileStorage, )(task) @@ -248,6 +251,8 @@ func (r *PolarDBXReconciler) newReconcileTask(rc *polardbxreconcile.Context, pol // Always reconcile the stateless components (mainly for rebuilt). instancesteps.CreateOrReconcileCNs(task) instancesteps.CreateOrReconcileCDCs(task) + instancesteps.WaitUntilCNCDCPodsReady(task) + instancesteps.CreateOrReconcileColumnars(task) //sync cn label to pod without rebuild pod instancesteps.TrySyncCnLabelToPodsDirectly(task) @@ -266,7 +271,7 @@ func (r *PolarDBXReconciler) newReconcileTask(rc *polardbxreconcile.Context, pol switch polardbx.Status.Stage { case polardbxv1polardbx.StageEmpty: // Before doing rebalancing, the controller always trying to update - // the CN/CDC deployments and DN stores. + // the CN/CDC/Columnar deployments and DN stores. // Update before doing update. 
commonsteps.UpdateSnapshotAndObservedGeneration(task) @@ -283,12 +288,15 @@ func (r *PolarDBXReconciler) newReconcileTask(rc *polardbxreconcile.Context, pol ), instancesteps.CreateOrReconcileCNs, instancesteps.CreateOrReconcileCDCs, + instancesteps.WaitUntilCNCDCPodsReady, + instancesteps.CreateOrReconcileColumnars, // Only add or update, never remove. instancesteps.CreateOrReconcileDNs, instancesteps.WaitUntilDNsReady, instancesteps.WaitUntilCNDeploymentsRolledOut, instancesteps.WaitUntilCDCDeploymentsRolledOut, + instancesteps.WaitUntilColumnarDeploymentsRolledOut, instancesteps.CreateFileStorage, )(task) @@ -302,9 +310,10 @@ func (r *PolarDBXReconciler) newReconcileTask(rc *polardbxreconcile.Context, pol // Enable added DN stores. gmssteps.EnableDNs(task) - // Wait terminated CN/CDCs to be finalized in GMS to avoid DDL problems. + // Wait terminated CN/CDC/Columnars to be finalized in GMS to avoid DDL problems. instancesteps.WaitUntilCNPodsStable(task) instancesteps.WaitUntilCDCPodsStable(task) + instancesteps.WaitUntilColumnarPodsStable(task) // Start to rebalance data after DN stores are reconciled if necessary. rebalancesteps.StartRebalanceTask(task) @@ -375,9 +384,9 @@ func mapRequestsWhenStatelessPodDeletedOrFailed(object client.Object) []reconcil if polardbxName, ok := object.GetLabels()[polardbxmeta.LabelName]; ok { pod := object.(*corev1.Pod) - // Only for CN & CDC pods. + // Only for CN & CDC & Columnar pods. role := pod.Labels[polardbxmeta.LabelRole] - if role != polardbxmeta.RoleCN && role != polardbxmeta.RoleCDC { + if role != polardbxmeta.RoleCN && role != polardbxmeta.RoleCDC && role != polardbxmeta.RoleColumnar { return nil } @@ -414,7 +423,7 @@ func (r *PolarDBXReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&appsv1.Deployment{}). // Watches owned Parameters Owns(&polardbxv1.PolarDBXParameter{}). - // Watches deleted or failed CN/CDC Pods. + // Watches deleted or failed CN/CDC/Columnar Pods. 
Watches( &source.Kind{Type: &corev1.Pod{}}, handler.EnqueueRequestsFromMapFunc(mapRequestsWhenStatelessPodDeletedOrFailed), diff --git a/pkg/operator/v1/polardbx/convention/convention.go b/pkg/operator/v1/polardbx/convention/convention.go index 2676b5b..2805b25 100644 --- a/pkg/operator/v1/polardbx/convention/convention.go +++ b/pkg/operator/v1/polardbx/convention/convention.go @@ -210,8 +210,8 @@ func ParseGroupFromDeployment(deploy *appsv1.Deployment) string { // Convention for names. func NewDeploymentName(polardbx *polardbxv1.PolarDBXCluster, role string, group string) string { - if role != polardbxmeta.RoleCN && role != polardbxmeta.RoleCDC { - panic("required role to be cn or cdc, but found " + role) + if role != polardbxmeta.RoleCN && role != polardbxmeta.RoleCDC && role != polardbxmeta.RoleColumnar { + panic("required role to be cn, cdc or columnar, but found " + role) } if group == "" { diff --git a/pkg/operator/v1/polardbx/factory/deployment.go b/pkg/operator/v1/polardbx/factory/deployment.go index b34b753..77c9102 100644 --- a/pkg/operator/v1/polardbx/factory/deployment.go +++ b/pkg/operator/v1/polardbx/factory/deployment.go @@ -318,6 +318,15 @@ do echo "debug mode" sleep 3600 done +` + + columnarStartCmd = ` +sh /home/admin/entrypoint.sh +while [ "debug" == $(cat /etc/podinfo/runmode) ] +do + echo "debug mode" + sleep 3600 +done ` ) @@ -797,6 +806,149 @@ func (f *objectFactory) newDeployment4CDC(group string, mr *matchingRule, mustSt return &deployment, nil } +func (f *objectFactory) NewDeployments4Columnar() (map[string]appsv1.Deployment, error) { + polardbx, err := f.rc.GetPolarDBX() + if err != nil { + return nil, err + } + topology := polardbx.Status.SpecSnapshot.Topology + rules := topology.Rules.Components.Columnar + nodes := topology.Nodes.Columnar + if nodes == nil { + return nil, nil + } + + replicas := topology.Nodes.Columnar.Replicas + matchingRules, err := f.getStatelessMatchingRules(int(replicas) /*template.HostNetwork*/, false, rules) + if 
err != nil { + return nil, err + } + + // Build deployments according rules. + return f.buildDeployments(matchingRules, f.newDeployment4Columnar) +} + +func (f *objectFactory) newDeployment4Columnar(group string, mr *matchingRule, mustStaticPorts bool) (*appsv1.Deployment, error) { + polardbx := f.rc.MustGetPolarDBX() + config := f.rc.Config() + topology := polardbx.Status.SpecSnapshot.Topology + template := polardbx.Status.SpecSnapshot.Topology.Nodes.Columnar.Template + + // Factories + envFactory, err := NewEnvFactory(f.rc, polardbx, f) + if err != nil { + return nil, err + } + //portsFactory := NewPortsFactory(f.rc, polardbx) + volumeFactory := NewVolumeFactory(f.rc, polardbx) + + // Get GMS connection info. + gmsConn, err := f.getGmsConn(polardbx) + if err != nil { + return nil, err + } + + // Ports & Envs + //ports := portsFactory.NewPortsForColumnarEngine() + envVars := envFactory.NewEnvVarsForColumnarEngine(gmsConn) + + // Affinity + var nodeSelector *corev1.NodeSelector + if mr.rule != nil { + nodeSelector, err = f.getNodeSelectorFromRef(polardbx, mr.rule.NodeSelector) + if err != nil { + return nil, err + } + } + affinity := f.tryScatterAffinityForStatelessDeployment( + convention.ConstLabelsWithRole(polardbx, polardbxmeta.RoleColumnar), + nodeSelector, + ) + + // Name & Labels + deployName := convention.NewDeploymentName(polardbx, polardbxmeta.RoleColumnar, group) + + labels := convention.ConstLabelsWithRole(polardbx, polardbxmeta.RoleColumnar) + labels[polardbxmeta.LabelGroup] = group + + annotations := f.newPodAnnotations(polardbx) + + // Containers + + // Container engine + engineContainer := corev1.Container{ + Name: convention.ContainerEngine, + Image: defaults.NonEmptyStrOrDefault( + template.Image, + config.Images().DefaultImageForCluster(polardbxmeta.RoleColumnar, convention.ContainerEngine, topology.Version), + ), + Command: []string{"/bin/bash", "-c"}, + Args: []string{columnarStartCmd}, + ImagePullPolicy: template.ImagePullPolicy, + Env: 
envVars, + Resources: *template.Resources.DeepCopy(), + VolumeMounts: volumeFactory.NewVolumeMountsForColumnarEngine(), + SecurityContext: k8shelper.NewSecurityContext(config.Cluster().ContainerPrivileged()), + } + + containers := []corev1.Container{engineContainer} + + dnsPolicy := corev1.DNSClusterFirst + if template.HostNetwork { + dnsPolicy = corev1.DNSClusterFirstWithHostNet + } + + deployment := appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: deployName, + Namespace: f.rc.Namespace(), + Labels: k8shelper.PatchLabels( + copyutil.CopyStrMap(labels), + map[string]string{ + polardbxmeta.LabelGeneration: strconv.FormatInt(polardbx.Status.ObservedGeneration, 10), + }, + ), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: pointer.Int32(int32(mr.replicas)), + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Strategy: f.newDeploymentUpgradeStrategy(polardbx), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + Annotations: annotations, + Finalizers: []string{polardbxmeta.Finalizer}, + }, + Spec: corev1.PodSpec{ + // TODO: Add a config to control this switch + EnableServiceLinks: pointer.BoolPtr(false), + ImagePullSecrets: template.ImagePullSecrets, + Volumes: volumeFactory.NewVolumesForColumnar(), + Containers: containers, + RestartPolicy: corev1.RestartPolicyAlways, + TerminationGracePeriodSeconds: pointer.Int64(30), + DNSPolicy: dnsPolicy, + ShareProcessNamespace: pointer.Bool(true), + Affinity: affinity, + HostNetwork: template.HostNetwork, + }, + }, + }, + } + + //add hash to label + convention.AddLabelHash(polardbxmeta.LabelHash, &deployment) + //update generation label + deployment.SetLabels(k8shelper.PatchLabels(deployment.Labels, map[string]string{ + polardbxmeta.LabelGeneration: strconv.FormatInt(polardbx.Status.ObservedGeneration, 10), + })) + + // Return + return &deployment, nil +} + func (f *objectFactory) buildDeployments(rules map[string]matchingRule, builder func(name string, mr 
*matchingRule, mustStaticPorts bool) (*appsv1.Deployment, error)) (map[string]appsv1.Deployment, error) { deployments := make(map[string]appsv1.Deployment) for name, mr := range rules { diff --git a/pkg/operator/v1/polardbx/factory/env_factory.go b/pkg/operator/v1/polardbx/factory/env_factory.go index abdf873..05f38d7 100644 --- a/pkg/operator/v1/polardbx/factory/env_factory.go +++ b/pkg/operator/v1/polardbx/factory/env_factory.go @@ -45,6 +45,7 @@ type EnvFactory interface { NewSystemEnvVars() []corev1.EnvVar NewEnvVarsForCNEngine(gmsConn StorageConnection, ports CNPorts) []corev1.EnvVar NewEnvVarsForCDCEngine(gmsConn StorageConnection) []corev1.EnvVar + NewEnvVarsForColumnarEngine(gmsConn StorageConnection) []corev1.EnvVar } type envFactory struct { @@ -390,6 +391,53 @@ func (e *envFactory) NewEnvVarsForCDCEngine(gmsConn StorageConnection) []corev1. return append(systemEnvs, basicEnvs...) } +func (e *envFactory) newBasicEnvVarsForColumnarEngine(gmsConn *StorageConnection) []corev1.EnvVar { + topology := e.polardbx.Status.SpecSnapshot.Topology + if topology.Nodes.Columnar == nil { + return nil + } + + template := topology.Nodes.Columnar.Template + pxcServiceName := e.polardbx.Spec.ServiceName + if len(pxcServiceName) == 0 { + pxcServiceName = e.polardbx.Name + } + + // FIXME CDC currently doesn't support host network, so ports are hard coded. 
+ envs := []corev1.EnvVar{ + {Name: "switchCloud", Value: "aliyun"}, + {Name: "ins_id", ValueFrom: e.newValueFromObjectFiled("metadata.uid")}, + {Name: "ins_ip", ValueFrom: e.newValueFromObjectFiled("status.podIP")}, + {Name: "cpu_cores", Value: strconv.FormatInt(template.Resources.Limits.Cpu().Value(), 10)}, + {Name: "mem_size", Value: strconv.FormatInt(template.Resources.Limits.Memory().Value()>>20, 10)}, + {Name: "metaDbAddr", Value: fmt.Sprintf("%s:%d", gmsConn.Host, gmsConn.Port)}, + {Name: "metaDbName", Value: fmt.Sprintf(gms.MetaDBName)}, + {Name: "metaDbUser", Value: gmsConn.User}, + {Name: "metaDbPasswd", Value: e.cipher.Encrypt(gmsConn.Passwd)}, + {Name: "metaDbXprotoPort", Value: strconv.Itoa(31306)}, + {Name: "storageDbXprotoPort", Value: strconv.Itoa(0)}, + {Name: "polarx_username", Value: "polardbx_root"}, + {Name: "polarx_password", ValueFrom: e.newValueFromSecretKey(e.polardbx.Name, "polardbx_root")}, + {Name: "dnPasswordKey", Value: e.cipher.Key()}, + } + configEnvs := e.polardbx.Spec.Config.Columnar.Envs + if configEnvs != nil { + for k, v := range configEnvs { + envs = append(envs, corev1.EnvVar{ + Name: k, + Value: v.String(), + }) + } + } + return envs +} + +func (e *envFactory) NewEnvVarsForColumnarEngine(gmsConn StorageConnection) []corev1.EnvVar { + systemEnvs := e.NewSystemEnvVars() + basicEnvs := e.newBasicEnvVarsForColumnarEngine(&gmsConn) + return append(systemEnvs, basicEnvs...) 
+} + func NewEnvFactory(rc *polardbxv1reconcile.Context, polardbx *polardbxv1.PolarDBXCluster, objectFactory *objectFactory) (EnvFactory, error) { cipher, err := rc.GetPolarDBXPasswordCipher() if err != nil { diff --git a/pkg/operator/v1/polardbx/factory/object_factory.go b/pkg/operator/v1/polardbx/factory/object_factory.go index b07a137..c14ae69 100644 --- a/pkg/operator/v1/polardbx/factory/object_factory.go +++ b/pkg/operator/v1/polardbx/factory/object_factory.go @@ -34,6 +34,7 @@ type ObjectFactory interface { NewDeployments4CN() (map[string]appsv1.Deployment, error) NewDeployments4CDC() (map[string]appsv1.Deployment, error) + NewDeployments4Columnar() (map[string]appsv1.Deployment, error) NewXStoreMyCnfOverlay4GMS() (string, error) NewXStoreMyCnfOverlay4DN(idx int) (string, error) diff --git a/pkg/operator/v1/polardbx/factory/ports_factory.go b/pkg/operator/v1/polardbx/factory/ports_factory.go index f919065..6dc8d41 100644 --- a/pkg/operator/v1/polardbx/factory/ports_factory.go +++ b/pkg/operator/v1/polardbx/factory/ports_factory.go @@ -59,9 +59,13 @@ func (p *CDCPorts) GetProbePort() int { return p.ProbePort } +type ColumnarPorts struct { +} + type PortsFactory interface { NewPortsForCNEngine(mustStaticPorts bool) CNPorts NewPortsForCDCEngine() CDCPorts + NewPortsForColumnarEngine() ColumnarPorts } type portsFactory struct { @@ -108,6 +112,11 @@ func (f *portsFactory) NewPortsForCDCEngine() CDCPorts { } } +func (f *portsFactory) NewPortsForColumnarEngine() ColumnarPorts { + // todo: current no ports on columnar. 
+ return ColumnarPorts{} +} + func NewPortsFactory(rc *polardbxv1reconcile.Context, polardbx *polardbxv1.PolarDBXCluster) PortsFactory { return &portsFactory{ rc: rc, polardbx: polardbx, diff --git a/pkg/operator/v1/polardbx/factory/readonly.go b/pkg/operator/v1/polardbx/factory/readonly.go index b0d6e5d..7b8f2d0 100644 --- a/pkg/operator/v1/polardbx/factory/readonly.go +++ b/pkg/operator/v1/polardbx/factory/readonly.go @@ -54,6 +54,8 @@ func (f *objectFactory) newReadonlySpec(readonlyParam *polardbxv1polardbx.Readon } } + specCopy.Topology.Rules = polardbxv1polardbx.TopologyRules{} + return specCopy } diff --git a/pkg/operator/v1/polardbx/factory/servicemonitor.go b/pkg/operator/v1/polardbx/factory/servicemonitor.go index f80e5b6..8eae27b 100644 --- a/pkg/operator/v1/polardbx/factory/servicemonitor.go +++ b/pkg/operator/v1/polardbx/factory/servicemonitor.go @@ -211,5 +211,36 @@ func (f *objectFactory) NewServiceMonitors() (map[string]promv1.ServiceMonitor, }, }, }, + polardbxmeta.RoleColumnar: { + ObjectMeta: metav1.ObjectMeta{ + Name: f.rc.NameInto(suffixPatcher("-columnar")), + Namespace: f.rc.Namespace(), + Labels: convention.ConstLabelsWithRole(polardbx, polardbxmeta.RoleColumnar), + }, + Spec: promv1.ServiceMonitorSpec{ + JobLabel: f.rc.NameInto(suffixPatcher("-columnar")), + TargetLabels: []string{ + polardbxmeta.LabelName, + polardbxmeta.LabelRole, + }, + PodTargetLabels: []string{}, + Endpoints: []promv1.Endpoint{ + { + Port: "metrics", + Interval: fmt.Sprintf("%.0fs", monitorInterval.Seconds()), + ScrapeTimeout: fmt.Sprintf("%.0fs", scrapeTimeout.Seconds()), + }, + }, + NamespaceSelector: promv1.NamespaceSelector{ + MatchNames: []string{f.rc.Namespace()}, + }, + Selector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + polardbxmeta.LabelName: polardbx.Name, + polardbxmeta.LabelRole: polardbxmeta.RoleColumnar, + }, + }, + }, + }, }, nil } diff --git a/pkg/operator/v1/polardbx/factory/volume_factory.go 
b/pkg/operator/v1/polardbx/factory/volume_factory.go index e89a8ce..2bac946 100644 --- a/pkg/operator/v1/polardbx/factory/volume_factory.go +++ b/pkg/operator/v1/polardbx/factory/volume_factory.go @@ -32,6 +32,8 @@ type VolumeFactory interface { NewVolumeMountsForCNExporter() []corev1.VolumeMount NewVolumesForCDC() []corev1.Volume NewVolumeMountsForCDCEngine() []corev1.VolumeMount + NewVolumesForColumnar() []corev1.Volume + NewVolumeMountsForColumnarEngine() []corev1.VolumeMount } type volumeFactory struct { @@ -301,6 +303,40 @@ func (v *volumeFactory) NewVolumeMountsForCDCEngine() []corev1.VolumeMount { return append(systemVolMounts, volumeMounts...) } +func (v *volumeFactory) NewVolumesForColumnar() []corev1.Volume { + systemVols := v.NewSystemVolumes() + volumes := []corev1.Volume{ + { + Name: "columnar-log", + VolumeSource: v.newEmptyDirVolumeSource(), + }, + { + Name: "columnar-config", + VolumeSource: v.newEmptyDirVolumeSource(), + }, + } + return append(systemVols, volumes...) +} + +func (v *volumeFactory) NewVolumeMountsForColumnarEngine() []corev1.VolumeMount { + systemVolMounts := v.NewSystemVolumeMounts() + volumeMounts := []corev1.VolumeMount{ + { + Name: "columnar-log", + MountPath: "/home/admin/polardbx-columnar/logs", + MountPropagation: mountPropagationModePtr(corev1.MountPropagationHostToContainer), + }, + //{ + // Name: "columnar-config", + // MountPath: "/home/admin/polardbx-columnar/conf/config.properties", + // ReadOnly: true, + // SubPath: "config.properties", + // MountPropagation: mountPropagationModePtr(corev1.MountPropagationHostToContainer), + //}, + } + return append(systemVolMounts, volumeMounts...) 
+} + func NewVolumeFactory(rc *polardbxv1reconcile.Context, polardbx *polardbxv1.PolarDBXCluster) VolumeFactory { return &volumeFactory{rc: rc, polardbx: polardbx} } diff --git a/pkg/operator/v1/polardbx/meta/label.go b/pkg/operator/v1/polardbx/meta/label.go index 99ca635..c1a2fe2 100644 --- a/pkg/operator/v1/polardbx/meta/label.go +++ b/pkg/operator/v1/polardbx/meta/label.go @@ -48,10 +48,11 @@ const ( ) const ( - RoleGMS = "gms" - RoleCN = "cn" - RoleDN = "dn" - RoleCDC = "cdc" + RoleGMS = "gms" + RoleCN = "cn" + RoleDN = "dn" + RoleCDC = "cdc" + RoleColumnar = "columnar" ) const ( diff --git a/pkg/operator/v1/polardbx/reconcile/context.go b/pkg/operator/v1/polardbx/reconcile/context.go index 6975b58..2bccd73 100644 --- a/pkg/operator/v1/polardbx/reconcile/context.go +++ b/pkg/operator/v1/polardbx/reconcile/context.go @@ -58,19 +58,20 @@ type Context struct { *control.BaseReconcileContext // Caches - objectCache cache.ObjectLoadingCache - polardbxKey types.NamespacedName - polardbxChanged bool - polardbx *polardbxv1.PolarDBXCluster - primaryPolardbx *polardbxv1.PolarDBXCluster - polardbxStatus *polardbxv1.PolarDBXClusterStatus - cnDeployments map[string]*appsv1.Deployment - cdcDeployments map[string]*appsv1.Deployment - podsByRole map[string][]corev1.Pod - nodes []corev1.Node - gmsStore *polardbxv1.XStore - dnStores map[int]*polardbxv1.XStore - primaryDnStore map[int]*polardbxv1.XStore + objectCache cache.ObjectLoadingCache + polardbxKey types.NamespacedName + polardbxChanged bool + polardbx *polardbxv1.PolarDBXCluster + primaryPolardbx *polardbxv1.PolarDBXCluster + polardbxStatus *polardbxv1.PolarDBXClusterStatus + cnDeployments map[string]*appsv1.Deployment + cdcDeployments map[string]*appsv1.Deployment + columnarDeployments map[string]*appsv1.Deployment + podsByRole map[string][]corev1.Pod + nodes []corev1.Node + gmsStore *polardbxv1.XStore + dnStores map[int]*polardbxv1.XStore + primaryDnStore map[int]*polardbxv1.XStore polardbxMonitor 
*polardbxv1.PolarDBXMonitor polardbxMonitorKey types.NamespacedName @@ -820,8 +821,10 @@ func (rc *Context) GetDeploymentMap(role string) (map[string]*appsv1.Deployment, deploymentMapPtr = &rc.cnDeployments case polardbxmeta.RoleCDC: deploymentMapPtr = &rc.cdcDeployments + case polardbxmeta.RoleColumnar: + deploymentMapPtr = &rc.columnarDeployments default: - panic("required role to be cn or cdc, but found " + role) + panic("required role to be cn, cdc or columnar, but found " + role) } if *deploymentMapPtr == nil { @@ -841,10 +844,9 @@ func (rc *Context) GetDeploymentMap(role string) (map[string]*appsv1.Deployment, } func (rc *Context) GetPods(role string) ([]corev1.Pod, error) { - if role != polardbxmeta.RoleCN && role != polardbxmeta.RoleCDC { - panic("required role to be cn or cdc, but found " + role) + if role != polardbxmeta.RoleCN && role != polardbxmeta.RoleCDC && role != polardbxmeta.RoleColumnar { + panic("required role to be cn, cdc or columnar, but found " + role) } - pods := rc.podsByRole[role] if pods == nil { polardbx, err := rc.GetPolarDBX() @@ -876,37 +878,45 @@ func (rc *Context) GetPods(role string) ([]corev1.Pod, error) { func (rc *Context) GetGMS() (*polardbxv1.XStore, error) { polardbx := rc.MustGetPolarDBX() var err error + readonly := polardbx.Spec.Readonly - if polardbx.Spec.Readonly { + if readonly { polardbx, err = rc.GetPrimaryPolarDBX() if err != nil { return nil, err } } + if rc.gmsStore != nil { + return rc.gmsStore, nil + } + + gmsName := convention.NewGMSName(polardbx) if polardbx.Spec.ShareGMS { - return rc.GetDN(0) - } else { - if rc.gmsStore == nil { - gmsStore, err := rc.objectCache.GetObject( - rc.Context(), - types.NamespacedName{ - Namespace: rc.polardbxKey.Namespace, - Name: convention.NewGMSName(polardbx), - }, - &polardbxv1.XStore{}, - ) - if err != nil { - return nil, err - } + gmsName = convention.NewDNName(polardbx, 0) + } - if err := k8shelper.CheckControllerReference(gmsStore, polardbx); err != nil { - return nil, err 
- } - rc.gmsStore = gmsStore.(*polardbxv1.XStore) - } - return rc.gmsStore, nil + gmsStore, err := rc.objectCache.GetObject( + rc.Context(), + types.NamespacedName{ + Namespace: rc.polardbxKey.Namespace, + Name: gmsName, + }, + &polardbxv1.XStore{}, + ) + if err != nil { + return nil, err + } + + if err := k8shelper.CheckControllerReference(gmsStore, polardbx); err != nil { + return nil, err } + + if !readonly { + rc.gmsStore = gmsStore.(*polardbxv1.XStore) + } + + return gmsStore.(*polardbxv1.XStore), nil } func (rc *Context) GetService(name string) (*corev1.Service, error) { diff --git a/pkg/operator/v1/polardbx/steps/instance/common/status.go b/pkg/operator/v1/polardbx/steps/instance/common/status.go index 99d23a1..486d1dc 100644 --- a/pkg/operator/v1/polardbx/steps/instance/common/status.go +++ b/pkg/operator/v1/polardbx/steps/instance/common/status.go @@ -182,10 +182,11 @@ var UpdateDisplayReplicas = polardbxv1reconcile.NewStepBinder("UpdateDisplayRepl // update replicas status. statusRef.ReplicaStatus = polardbxv1polardbx.ClusterReplicasStatus{ - GMS: polardbxv1polardbx.ReplicasStatus{Total: 1, Available: int32(countAvailableXStores(gmsStore))}, - DN: polardbxv1polardbx.ReplicasStatus{Total: snapshot.Topology.Nodes.DN.Replicas, Available: int32(countAvailableXStores(dnStores...))}, - CN: nil, - CDC: nil, + GMS: polardbxv1polardbx.ReplicasStatus{Total: 1, Available: int32(countAvailableXStores(gmsStore))}, + DN: polardbxv1polardbx.ReplicasStatus{Total: snapshot.Topology.Nodes.DN.Replicas, Available: int32(countAvailableXStores(dnStores...))}, + CN: nil, + CDC: nil, + Columnar: nil, } cnDeployments, err := rc.GetDeploymentMap(polardbxmeta.RoleCN) @@ -208,6 +209,17 @@ var UpdateDisplayReplicas = polardbxv1reconcile.NewStepBinder("UpdateDisplayRepl } } + if snapshot.Topology.Nodes.Columnar != nil { + columnarDeployments, err := rc.GetDeploymentMap(polardbxmeta.RoleColumnar) + if err != nil { + return flow.Error(err, "Unable to get deployments of Columnar.") + } + 
statusRef.ReplicaStatus.Columnar = &polardbxv1polardbx.ReplicasStatus{ + Available: int32(countAvailableReplicasFromDeployments(columnarDeployments)), + Total: snapshot.Topology.Nodes.Columnar.Replicas, + } + } + gmsDisplay := statusRef.ReplicaStatus.GMS.Display() if polardbx.Spec.Readonly { @@ -215,10 +227,11 @@ var UpdateDisplayReplicas = polardbxv1reconcile.NewStepBinder("UpdateDisplayRepl } statusForPrintRef.ReplicaStatus = polardbxv1polardbx.ReplicaStatusForPrint{ - GMS: gmsDisplay, - CN: statusRef.ReplicaStatus.CN.Display(), - DN: statusRef.ReplicaStatus.DN.Display(), - CDC: statusRef.ReplicaStatus.CDC.Display(), + GMS: gmsDisplay, + CN: statusRef.ReplicaStatus.CN.Display(), + DN: statusRef.ReplicaStatus.DN.Display(), + CDC: statusRef.ReplicaStatus.CDC.Display(), + Columnar: statusRef.ReplicaStatus.Columnar.Display(), } return flow.Pass() diff --git a/pkg/operator/v1/polardbx/steps/instance/finalizer/handler.go b/pkg/operator/v1/polardbx/steps/instance/finalizer/handler.go index 60e14aa..73204c0 100644 --- a/pkg/operator/v1/polardbx/steps/instance/finalizer/handler.go +++ b/pkg/operator/v1/polardbx/steps/instance/finalizer/handler.go @@ -17,6 +17,7 @@ limitations under the License. 
package finalizer import ( + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sync" "sync/atomic" "time" @@ -57,7 +58,7 @@ func removeFinalizers(rc *polardbxv1reconcile.Context, log logr.Logger, pods []c } func handleFinalizerForPods(rc *polardbxv1reconcile.Context, log logr.Logger, deletedOrFailedPods []corev1.Pod, role string) error { - polardbxmeta.AssertRoleIn(role, polardbxmeta.RoleCN, polardbxmeta.RoleCDC) + polardbxmeta.AssertRoleIn(role, polardbxmeta.RoleCN, polardbxmeta.RoleCDC, polardbxmeta.RoleColumnar) if len(deletedOrFailedPods) == 0 { return nil @@ -68,6 +69,14 @@ func handleFinalizerForPods(rc *polardbxv1reconcile.Context, log logr.Logger, de return err } + canDeleteTime := v1.NewTime(time.Now().Add(-5 * time.Second)) + deletedOrFailedPods = k8shelper.FilterPodsBy(deletedOrFailedPods, func(pod *corev1.Pod) bool { + if pod.CreationTimestamp.Before(&canDeleteTime) { + return true + } + return false + }) + // Delete records in GMS. if role == polardbxmeta.RoleCN { toDeleteInfo := make([]gms.ComputeNodeInfo, 0, len(deletedOrFailedPods)) @@ -82,13 +91,14 @@ func handleFinalizerForPods(rc *polardbxv1reconcile.Context, log logr.Logger, de k8shelper.MustGetContainerFromPod(&pod, convention.ContainerEngine), convention.PortAccess, ).ContainerPort, + Extra: pod.Name, }) } err := mgr.DeleteComputeNodes(toDeleteInfo...) 
if err != nil { return err } - } else { + } else if role == polardbxmeta.RoleCDC { toDeleteInfo := make([]gms.CdcNodeInfo, 0, len(deletedOrFailedPods)) for _, pod := range deletedOrFailedPods { if !k8shelper.IsPodScheduled(&pod) { @@ -124,9 +134,15 @@ var RemoveResidualFinalizersOnPods = polardbxv1reconcile.NewStepBinder("RemoveRe return flow.Error(err, "Unable to get pods for CDC") } + columnarPods, err := rc.GetPods(polardbxmeta.RoleColumnar) + if err != nil { + return flow.Error(err, "Unable to get pods for Columnar") + } + if err := errutil.FirstNonNil( removeFinalizers(rc, flow.Logger(), cnPods), removeFinalizers(rc, flow.Logger(), cdcPods), + removeFinalizers(rc, flow.Logger(), columnarPods), ); err != nil { return flow.Error(err, "Failed to remove some finalizer.") } @@ -202,6 +218,11 @@ var HandleFinalizerForStatelessPods = polardbxv1reconcile.NewStepBinder("HandleF return flow.Error(err, "Unable to get pods for CDC") } + columnarPods, err := rc.GetPods(polardbxmeta.RoleColumnar) + if err != nil { + return flow.Error(err, "Unable to get pods for Columnar") + } + isPodDeletedOrFailedAndContainsFinalizer := func(pod *corev1.Pod) bool { return k8shelper.IsPodDeletedOrFailed(pod) && controllerutil.ContainsFinalizer(pod, polardbxmeta.Finalizer) } @@ -217,6 +238,11 @@ var HandleFinalizerForStatelessPods = polardbxv1reconcile.NewStepBinder("HandleF k8shelper.FilterPodsBy(cdcPods, isPodDeletedOrFailedAndContainsFinalizer), polardbxmeta.RoleCDC, ), + // Handle for Columnar pods. 
+ handleFinalizerForPods(rc, flow.Logger(), + k8shelper.FilterPodsBy(columnarPods, isPodDeletedOrFailedAndContainsFinalizer), + polardbxmeta.RoleColumnar, + ), ); err != nil { return flow.Error(err, "Failed to handle some finalizer.") } diff --git a/pkg/operator/v1/polardbx/steps/instance/gms/gms.go b/pkg/operator/v1/polardbx/steps/instance/gms/gms.go index bb436e1..32d17c3 100644 --- a/pkg/operator/v1/polardbx/steps/instance/gms/gms.go +++ b/pkg/operator/v1/polardbx/steps/instance/gms/gms.go @@ -248,7 +248,11 @@ func SyncDynamicConfigs(force bool) control.BindFunc { parameter := rc.MustGetPolarDBXParameterTemplate(polardbx.Spec.ParameterTemplate.Name) params := parameter.Spec.NodeType.CN.ParamList for _, param := range params { - targetDynamicConfigs[param.Name] = param.DefaultValue + // dynamic config priority higher than parameter template + _, exist := targetDynamicConfigs[param.Name] + if !exist { + targetDynamicConfigs[param.Name] = param.DefaultValue + } } } diff --git a/pkg/operator/v1/polardbx/steps/instance/object.go b/pkg/operator/v1/polardbx/steps/instance/object.go index b43a8ce..3c247dc 100644 --- a/pkg/operator/v1/polardbx/steps/instance/object.go +++ b/pkg/operator/v1/polardbx/steps/instance/object.go @@ -22,6 +22,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -551,7 +552,7 @@ var RemoveTrailingDNs = polardbxv1reconcile.NewStepBinder("RemoveTrailingDNs", ) func reconcileGroupedDeployments(rc *polardbxv1reconcile.Context, flow control.Flow, role string) (reconcile.Result, error) { - polardbxmeta.AssertRoleIn(role, polardbxmeta.RoleCN, polardbxmeta.RoleCDC) + polardbxmeta.AssertRoleIn(role, polardbxmeta.RoleCN, polardbxmeta.RoleCDC, polardbxmeta.RoleColumnar) flow = flow.WithLoggerValues("role", role) @@ -569,11 +570,13 @@ func reconcileGroupedDeployments(rc *polardbxv1reconcile.Context, flow control.F return flow.Pass() } deployments, err = 
factory.NewObjectFactory(rc).NewDeployments4CN() - } else { + } else if role == polardbxmeta.RoleCDC { if polardbx.Spec.Readonly { return flow.Pass() } deployments, err = factory.NewObjectFactory(rc).NewDeployments4CDC() + } else if role == polardbxmeta.RoleColumnar { + deployments, err = factory.NewObjectFactory(rc).NewDeployments4Columnar() } if err != nil { return flow.Error(err, "Unable to new deployments.") @@ -690,11 +693,51 @@ var CreateOrReconcileCDCs = polardbxv1reconcile.NewStepBinder("CreateOrReconcile }, ) +var CreateOrReconcileColumnars = polardbxv1reconcile.NewStepBinder("CreateOrReconcileColumnars", + func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { + return reconcileGroupedDeployments(rc, flow, polardbxmeta.RoleColumnar) + }, +) + func isXStoreReady(xstore *polardbxv1.XStore) bool { return xstore.Status.ObservedGeneration == xstore.Generation && xstore.Status.Phase == polardbxv1xstore.PhaseRunning } +var WaitUntilCNCDCPodsReady = polardbxv1reconcile.NewStepBinder("WaitUntilCNCDCPodsReady", + func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { + cnPods, err := rc.GetPods(polardbxmeta.RoleCN) + if err != nil { + return flow.Error(err, "Unable to get pods for CN") + } + + unready := k8shelper.FilterPodsBy(cnPods, func(pod *corev1.Pod) bool { + return !k8shelper.IsPodReady(pod) + }) + + if len(unready) > 0 { + return flow.Wait("Found unready cn pods, keep waiting...", "unready-pods", + strings.Join(k8shelper.ToObjectNames(unready), ",")) + } + + cdcPods, err := rc.GetPods(polardbxmeta.RoleCDC) + if err != nil { + return flow.Error(err, "Unable to get pods for CDC") + } + + unready = k8shelper.FilterPodsBy(cdcPods, func(pod *corev1.Pod) bool { + return !k8shelper.IsPodReady(pod) + }) + + if len(unready) > 0 { + return flow.Wait("Found unready cdc pods, keep waiting...", "unready-pods", + strings.Join(k8shelper.ToObjectNames(unready), ",")) + } + + return flow.Pass() + }, +) + var 
WaitUntilGMSReady = polardbxv1reconcile.NewStepBinder("WaitUntilGMSReady", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { gms, err := rc.GetGMS() @@ -822,7 +865,7 @@ var WaitUntilCDCPodsStable = polardbxv1reconcile.NewStepBinder("WaitUntilCDCPods cdcPods, err := rc.GetPods(polardbxmeta.RoleCDC) if err != nil { - return flow.Error(err, "Unable to get pods of CN.") + return flow.Error(err, "Unable to get pods of CDC.") } unFinalizedPodsSize := k8shelper.FilterPodsBy(cdcPods, func(pod *corev1.Pod) bool { @@ -855,6 +898,45 @@ var WaitUntilCDCDeploymentsRolledOut = polardbxv1reconcile.NewStepBinder("WaitUn }, ) +var WaitUntilColumnarPodsStable = polardbxv1reconcile.NewStepBinder("WaitUntilColumnarPodsStable", + func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { + polardbx := rc.MustGetPolarDBX() + + columnarPods, err := rc.GetPods(polardbxmeta.RoleColumnar) + if err != nil { + return flow.Error(err, "Unable to get pods of Columnar.") + } + + unFinalizedPodsSize := k8shelper.FilterPodsBy(columnarPods, func(pod *corev1.Pod) bool { + return len(pod.Finalizers) > 0 + }) + + columnarTemplate := polardbx.Status.SpecSnapshot.Topology.Nodes.Columnar + columnarReplicas := 0 + if columnarTemplate != nil { + columnarReplicas = int(columnarTemplate.Replicas) + } + if len(unFinalizedPodsSize) == columnarReplicas { + return flow.Pass() + } + return flow.Wait("Wait until some pod to be finalized.") + }, +) + +var WaitUntilColumnarDeploymentsRolledOut = polardbxv1reconcile.NewStepBinder("WaitUntilColumnarDeploymentsRolledOut", + func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { + columnarDeployments, err := rc.GetDeploymentMap(polardbxmeta.RoleColumnar) + if err != nil { + return flow.Error(err, "Unable to get deployments of Columnar.") + } + + if areDeploymentsRolledOut(columnarDeployments) { + return flow.Continue("Deployments of Columnar are rolled out.") + } + return 
flow.Wait("Some deployment of Columnar is rolling.") + }, +) + var TrySyncCnLabelToPodsDirectly = polardbxv1reconcile.NewStepBinder("TrySyncCnLabelToPodsDirectly", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { changedCount := 0 diff --git a/pkg/operator/v1/polardbx/steps/instance/pitr/pitr.go b/pkg/operator/v1/polardbx/steps/instance/pitr/pitr.go index d6ab508..2581743 100644 --- a/pkg/operator/v1/polardbx/steps/instance/pitr/pitr.go +++ b/pkg/operator/v1/polardbx/steps/instance/pitr/pitr.go @@ -11,6 +11,8 @@ import ( "github.com/alibaba/polardbx-operator/pkg/operator/v1/polardbx/helper" polardbxmeta "github.com/alibaba/polardbx-operator/pkg/operator/v1/polardbx/meta" polardbxv1reconcile "github.com/alibaba/polardbx-operator/pkg/operator/v1/polardbx/reconcile" + "io" + corev1 "k8s.io/api/core/v1" "net/http" "time" @@ -91,6 +93,38 @@ var WaitPreparePitrBinlogs = polardbxv1reconcile.NewStepBinder("WaitPreparePitrB if err != nil { return flow.RetryErr(err, "failed to job and pod") } + if pod.Status.Phase != corev1.PodRunning { + return flow.Retry("wait for the pitr pod to be running") + } + port := k8shelper.MustGetPortFromContainer( + k8shelper.MustGetContainerFromPod(pod, ContainerName), + PortName, + ).ContainerPort + endpoint := fmt.Sprintf("http://%s:%d", pod.Status.PodIP, port) + lastErrUrl := endpoint + "/lastErr" + client := http.Client{ + Timeout: 1 * time.Second, + } + rep, err := client.Get(lastErrUrl) + if err != nil { + return flow.RetryErr(err, "failed to request pitr service ", "lastErrUrl", lastErrUrl) + } + if rep.StatusCode != http.StatusOK { + return flow.RetryErr(err, "failed to request pitr service ", "lastErrUrl", lastErrUrl, "response", rep) + } + defer rep.Body.Close() + if rep.ContentLength != 0 { + bodyContent, err := io.ReadAll(rep.Body) + if err != nil { + return flow.RetryErr(err, "failed to read body") + } + flow.Logger().Error(fmt.Errorf("lastErr %s", string(bodyContent)), "get lastErr from pitr 
service") + polardbxObj := rc.MustGetPolarDBX() + polardbxObj.Status.Phase = polarxv1polarx.PhaseFailed + rc.UpdatePolarDBXStatus() + return flow.Error(errors.New("changed to failed phase"), "") + } + if len(pod.Status.ContainerStatuses) > 0 { ready := true for _, containerStatus := range pod.Status.ContainerStatuses { @@ -98,12 +132,8 @@ var WaitPreparePitrBinlogs = polardbxv1reconcile.NewStepBinder("WaitPreparePitrB } if ready { polardbxObj := rc.MustGetPolarDBX() - port := k8shelper.MustGetPortFromContainer( - k8shelper.MustGetContainerFromPod(pod, ContainerName), - PortName, - ).ContainerPort polardbxObj.Status.PitrStatus = &polarxv1polarx.PitrStatus{ - PrepareJobEndpoint: fmt.Sprintf("http://%s:%d", pod.Status.PodIP, port), + PrepareJobEndpoint: endpoint, Job: job.Name, } return flow.Continue("The container is ready") diff --git a/pkg/operator/v1/xstore/steps/follower/job.go b/pkg/operator/v1/xstore/steps/follower/job.go index edd0806..84e34e1 100644 --- a/pkg/operator/v1/xstore/steps/follower/job.go +++ b/pkg/operator/v1/xstore/steps/follower/job.go @@ -74,9 +74,6 @@ var ( JobTaskRestoreMoveBack: JobArgMoveRestoreFunc, JobTaskInitLogger: JobArgInitLoggerFunc, } - BackupToolBinFilePaths = map[string]string{ - galaxy.Engine: GalaxyEngineBackupBinFilepath, - } BackupSetPrepareArgs = map[string]string{ galaxy.Engine: GalaxyEngineBackupSetPrepareArg, } @@ -126,7 +123,7 @@ func JobCommandInitLoggerFunc(ctx JobContext) []string { func JobArgBackupFunc(ctx JobContext) []string { return []string{ "-c", - "touch /tmp/rebuild.log && tail -f /tmp/rebuild.log & " + BackupToolBinFilePaths[ctx.engine] + " --defaults-file=/data/mysql/conf/my.cnf --backup " + BackupExtraArgs[ctx.engine] + " --user=root --socket='/data/mysql/run/mysql.sock' " + BackupStreamTypeArgs[ctx.engine] + " " + TargetDirArgs[ctx.engine] + "/tmp/backup 2>/tmp/rebuild.log " + + "touch /tmp/rebuild.log && tail -f /tmp/rebuild.log & " + " `/tools/xstore/current/venv/bin/python3 /tools/xstore/current/cli.py 
engine xtrabackup` --defaults-file=/data/mysql/conf/my.cnf --backup " + BackupExtraArgs[ctx.engine] + " --user=root --socket='/data/mysql/run/mysql.sock' " + BackupStreamTypeArgs[ctx.engine] + " " + TargetDirArgs[ctx.engine] + "/tmp/backup 2>/tmp/rebuild.log " + "| /tools/xstore/current/bin/polardbx-filestream-client " + BackupStreamTypeArgs[ctx.engine] + " --meta.action=uploadRemote " + fmt.Sprintf(" --meta.instanceId='%s' ", GetFileStreamInstanceId(ctx.otherPod)) + fmt.Sprintf(" --meta.filename='%s' ", FileStreamBackupFilename) + fmt.Sprintf(" --destNodeName='%s' ", ctx.otherPod.Spec.NodeName) + " --hostInfoFilePath=/tools/xstore/hdfs-nodes.json && /tools/xstore/current/venv/bin/python3 /tools/xstore/current/cli.py process check_std_err_complete --filepath=/tmp/rebuild.log ", } @@ -149,7 +146,7 @@ func JobArgMoveRestoreFunc(ctx JobContext) []string { backupDir := filepath.Join(FileStreamRootDir, GetFileStreamInstanceId(ctx.jobTargetPod), FileStreamBackupFilename) return []string{ "-c", - BackupToolBinFilePaths[ctx.engine] + " --defaults-file=/data/mysql/conf/my.cnf --force-non-empty-directories --move-back " + TargetDirArgs[ctx.engine] + backupDir, + " `/tools/xstore/current/venv/bin/python3 /tools/xstore/current/cli.py engine xtrabackup` --defaults-file=/data/mysql/conf/my.cnf --force-non-empty-directories --move-back " + TargetDirArgs[ctx.engine] + backupDir, } } @@ -157,7 +154,7 @@ func JobArgPrepareRestoreFunc(ctx JobContext) []string { backupDir := filepath.Join(FileStreamRootDir, GetFileStreamInstanceId(ctx.jobTargetPod), FileStreamBackupFilename) return []string{ "-c", - BackupToolBinFilePaths[ctx.engine] + fmt.Sprintf(" %s ", BackupSetPrepareArgs[ctx.engine]) + " --use-memory=1G " + TargetDirArgs[ctx.engine] + backupDir, + " `/tools/xstore/current/venv/bin/python3 /tools/xstore/current/cli.py engine xtrabackup` " + fmt.Sprintf(" %s ", BackupSetPrepareArgs[ctx.engine]) + " --use-memory=1G " + TargetDirArgs[ctx.engine] + backupDir, } } diff --git 
a/pkg/pitr/driver.go b/pkg/pitr/driver.go index cc3864e..4e74c91 100644 --- a/pkg/pitr/driver.go +++ b/pkg/pitr/driver.go @@ -2,10 +2,12 @@ package pitr import ( "encoding/json" + "errors" "fmt" "github.com/alibaba/polardbx-operator/pkg/util/defaults" "net/http" "os" + "runtime/debug" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sync" "sync/atomic" @@ -51,6 +53,7 @@ func Run() error { obj := recover() if obj != nil { pCtx.Logger.Info("panic", "obj", obj) + pCtx.LastErr = errors.New(fmt.Sprintf("panic %s", debug.Stack())) } }() steps := []Step{ diff --git a/pkg/pitr/workflow.go b/pkg/pitr/workflow.go index e1e1413..edc32ed 100644 --- a/pkg/pitr/workflow.go +++ b/pkg/pitr/workflow.go @@ -509,10 +509,17 @@ func FinishAndStartHttpServer(pCtx *Context) error { writer.WriteHeader(http.StatusNotFound) writer.Write([]byte("filename param is required")) } + onlyMeta := query.Get("only_meta") + for _, restoreBinlog := range pCtx.RestoreBinlogs { if restoreBinlog.XStoreName == xStore { for _, source := range restoreBinlog.ResultSources { if filename == source.getBinlogFilename() { + if strings.EqualFold(onlyMeta, "true") { + binlogSourceJsonBytes := []byte(MustMarshalJSON(source)) + writer.Write(binlogSourceJsonBytes) + return + } reader, err := source.OpenStream() if err != nil { pCtx.Logger.Error(err, "failed to open stream", "xstoreName", xStore, "filename", filename) diff --git a/pkg/webhook/polardbxbackup/binlog/webhook.go b/pkg/webhook/polardbxbackup/binlog/webhook.go new file mode 100644 index 0000000..a99a0ef --- /dev/null +++ b/pkg/webhook/polardbxbackup/binlog/webhook.go @@ -0,0 +1,26 @@ +package binlog + +import ( + "context" + polardbxv1 "github.com/alibaba/polardbx-operator/api/v1" + "github.com/alibaba/polardbx-operator/pkg/operator/v1/config" + "github.com/alibaba/polardbx-operator/pkg/webhook/extension" + "github.com/alibaba/polardbx-operator/pkg/webhook/polardbxbackup" + "k8s.io/apimachinery/pkg/runtime/schema" + ctrl "sigs.k8s.io/controller-runtime" +) + 
+func SetupWebhooks(ctx context.Context, mgr ctrl.Manager, apiPath string, configLoader func() config.Config) error { + gvk := schema.GroupVersionKind{ + Group: polardbxv1.GroupVersion.Group, + Version: polardbxv1.GroupVersion.Version, + Kind: "PolarDBXBackupBinlog", + } + + mgr.GetWebhookServer().Register(extension.GenerateValidatePath(apiPath, gvk), + extension.WithCustomValidator(&polardbxv1.PolarDBXBackupBinlog{}, + polardbxbackup.NewPolarDBXBackupValidator(mgr.GetAPIReader(), + mgr.GetLogger().WithName("webhook.validate.polardbxbackupbinlog"), + configLoader))) + return nil +} diff --git a/pkg/webhook/polardbxbackup/validator.go b/pkg/webhook/polardbxbackup/validator.go index f97b566..a6c5d3c 100644 --- a/pkg/webhook/polardbxbackup/validator.go +++ b/pkg/webhook/polardbxbackup/validator.go @@ -60,21 +60,27 @@ func (v *Validator) getFilestreamClient() (*filestream.FileClient, error) { } func (v *Validator) ValidateCreate(ctx context.Context, obj runtime.Object) error { - pxcBackup := obj.(*v1.PolarDBXBackup) + var storageProvider polardbx.BackupStorageProvider + if pxcBackup, ok := obj.(*v1.PolarDBXBackup); ok { + storageProvider = pxcBackup.Spec.StorageProvider + } + if pxcBinlogBackup, ok := obj.(*v1.PolarDBXBackupBinlog); ok { + storageProvider = pxcBinlogBackup.Spec.StorageProvider + } // validate storage configure - if pxcBackup.Spec.StorageProvider.StorageName == "" { + if storageProvider.StorageName == "" { return field.Required(field.NewPath("spec", "storageProvider", "storageName"), "storage name must be provided") } - if pxcBackup.Spec.StorageProvider.Sink == "" { + if storageProvider.Sink == "" { return field.Required(field.NewPath("spec", "storageProvider", "sink"), "sink must be provided") } - filestreamAction, err := polardbx.NewBackupStorageFilestreamAction(pxcBackup.Spec.StorageProvider.StorageName) + filestreamAction, err := polardbx.NewBackupStorageFilestreamAction(storageProvider.StorageName) if err != nil { return 
field.Invalid(field.NewPath("spec", "storageProvider", "storageName"), - pxcBackup.Spec.StorageProvider.StorageName, "unsupported storage") + storageProvider.StorageName, "unsupported storage") } // validate whether storage is available @@ -84,13 +90,13 @@ func (v *Validator) ValidateCreate(ctx context.Context, obj runtime.Object) erro } actionMetadata := filestream.ActionMetadata{ Action: filestreamAction.Upload, - Sink: pxcBackup.Spec.StorageProvider.Sink, + Sink: storageProvider.Sink, RequestId: uuid.New().String(), Filename: magicString, } sentBytes, err := fsClient.Upload(strings.NewReader(magicString), actionMetadata) if err != nil || sentBytes == 0 { - return field.Invalid(field.NewPath("spec", "storageProvider"), pxcBackup.Spec.StorageProvider, + return field.Invalid(field.NewPath("spec", "storageProvider"), storageProvider, "invalid storage, please check configuration of both backup and hpfs") } @@ -98,6 +104,9 @@ func (v *Validator) ValidateCreate(ctx context.Context, obj runtime.Object) erro } func (v *Validator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) error { + if _, ok := oldObj.(*v1.PolarDBXBackup); !ok { + return nil + } oldBackup, newBackup := oldObj.(*v1.PolarDBXBackup), newObj.(*v1.PolarDBXBackup) if oldBackup.Name != newBackup.Name { return field.Forbidden(field.NewPath("metadata", "name"), "immutable field") diff --git a/pkg/webhook/polardbxcluster/validator.go b/pkg/webhook/polardbxcluster/validator.go index 34aecd2..cd0cf16 100644 --- a/pkg/webhook/polardbxcluster/validator.go +++ b/pkg/webhook/polardbxcluster/validator.go @@ -295,6 +295,8 @@ func (v *PolarDBXClusterV1Validator) validateTopologyRules(ctx context.Context, fieldPath.Child("components", "cn"), rules.Components.CN, validSelectors)...) errList = append(errList, v.validateStatelessTopologyRuleItems(ctx, fieldPath.Child("components", "cdc"), rules.Components.CDC, validSelectors)...) 
+ errList = append(errList, v.validateStatelessTopologyRuleItems(ctx, + fieldPath.Child("components", "columnar"), rules.Components.Columnar, validSelectors)...) errList = append(errList, v.validateXStoreTopologyRule(ctx, fieldPath.Child("components", "gms"), rules.Components.GMS, validSelectors)...) errList = append(errList, v.validateXStoreTopologyRule(ctx, @@ -360,6 +362,18 @@ func (v *PolarDBXClusterV1Validator) validateCDCTemplate(ctx context.Context, fl return errList } +func (v *PolarDBXClusterV1Validator) validateColumnarTemplate(ctx context.Context, fldPath *field.Path, template *polardbxv1polardbx.ColumnarTemplate) field.ErrorList { + var errList field.ErrorList + + errList = append(errList, validateResources(ctx, fldPath.Child("resources"), &template.Resources)...) + + if err := validateImagePullPolicy(fldPath.Child("imagePullPolicy"), template.ImagePullPolicy); err != nil { + errList = append(errList, err) + } + + return errList +} + func (v *PolarDBXClusterV1Validator) validateTopologyNodes(ctx context.Context, nodes *polardbxv1polardbx.TopologyNodes) field.ErrorList { var errList field.ErrorList fldPath := field.NewPath("spec", "topology", "nodes") @@ -377,6 +391,11 @@ func (v *PolarDBXClusterV1Validator) validateTopologyNodes(ctx context.Context, fldPath.Child("cdc", "template"), &nodes.CDC.Template)...) } + if nodes.Columnar != nil { + errList = append(errList, v.validateColumnarTemplate(ctx, + fldPath.Child("columnar", "template"), + &nodes.Columnar.Template)...) + } return errList } @@ -497,6 +516,14 @@ func (v *PolarDBXClusterV1Validator) validateReplicas(ctx context.Context, topol field.NewPath("spec", "topology", "rules", "cdc"), int(cdcNodes.Replicas+cdcNodes.XReplicas))...) } + columnarRules := topology.Rules.Components.Columnar + columnarNodes := topology.Nodes.Columnar + // Skip if Columnar nodes is nil. 
+ if columnarNodes != nil { + errList = append(errList, v.validateReplicasOnStatelessComponent(ctx, columnarRules, + field.NewPath("spec", "topology", "rules", "columnar"), int(columnarNodes.Replicas))...) + } + return errList } diff --git a/pkg/webhook/webhooks.go b/pkg/webhook/webhooks.go index d97df2f..2a22da8 100644 --- a/pkg/webhook/webhooks.go +++ b/pkg/webhook/webhooks.go @@ -20,6 +20,7 @@ import ( "context" "github.com/alibaba/polardbx-operator/pkg/operator/v1/config" "github.com/alibaba/polardbx-operator/pkg/webhook/polardbxbackup" + backupbinlog "github.com/alibaba/polardbx-operator/pkg/webhook/polardbxbackup/binlog" "net/http" ctrl "sigs.k8s.io/controller-runtime" @@ -57,5 +58,9 @@ func SetupWebhooks(ctx context.Context, mgr ctrl.Manager, configPath string, con return err } + if err := backupbinlog.SetupWebhooks(ctx, mgr, ApiPath, configLoader); err != nil { + return err + } + return nil } diff --git a/test/e2e/polardbxcluster/lifecycle/common.go b/test/e2e/polardbxcluster/lifecycle/common.go index 7103d08..8184278 100644 --- a/test/e2e/polardbxcluster/lifecycle/common.go +++ b/test/e2e/polardbxcluster/lifecycle/common.go @@ -48,6 +48,10 @@ func CreatePolarDBXClusterAndWaitUntilRunningOrFail(f *framework.Framework, pola err := f.Client.Create(f.Ctx, polardbxcluster) framework.ExpectNoError(err) + WaitPolarDBXClusterRunningOrFail(f, polardbxcluster, timeout) +} + +func WaitPolarDBXClusterRunningOrFail(f *framework.Framework, polardbxcluster *polardbxv1.PolarDBXCluster, timeout time.Duration) { // Wait until not in new / creating / restoring phases. 
obj, err := pxcframework.WaitUntilPolarDBXClusterToNotInPhases(f.Client, polardbxcluster.Name, polardbxcluster.Namespace, diff --git a/test/e2e/polardbxcluster/lifecycle/creation.go b/test/e2e/polardbxcluster/lifecycle/creation.go index 6551dd4..f414ba2 100644 --- a/test/e2e/polardbxcluster/lifecycle/creation.go +++ b/test/e2e/polardbxcluster/lifecycle/creation.go @@ -17,6 +17,7 @@ limitations under the License. package lifecycle import ( + polardbxv1 "github.com/alibaba/polardbx-operator/api/v1" "time" polardbxv1polardbx "github.com/alibaba/polardbx-operator/api/v1/polardbx" @@ -244,4 +245,45 @@ var _ = ginkgo.Describe("[PolarDBXCluster] [Lifecycle:Create]", func() { // Expect TLS enabled. pxcframework.NewExpectation(f, obj).ExpectSecurityTLSOk() }) + + ginkgo.It("readonly pxc should be created as expected", func() { + readOnlyName := "ro-test" + obj := pxcframework.NewPolarDBXCluster( + "e2e-test-readonly", + f.Namespace, + pxcframework.ProtocolVersion(5), + pxcframework.InitReadonly(1, readOnlyName, true), + pxcframework.TopologyModeGuide("quick-start-paxos"), + ) + + // Always run clean up to make sure objects are cleaned. + defer DeletePolarDBXClusterAndWaitUntilItDisappear(f, obj, 1*time.Minute) + + // Do create and verify. + CreatePolarDBXClusterAndWaitUntilRunningOrFail(f, obj, 10*time.Minute) + + // Fetch the readonly object + readonlyObj := &polardbxv1.PolarDBXCluster{} + framework.ExpectNoError(f.Client.Get(f.Ctx, types.NamespacedName{ + Name: obj.Name + "-" + readOnlyName, + Namespace: f.Namespace, + }, readonlyObj)) + + WaitPolarDBXClusterRunningOrFail(f, readonlyObj, 20*time.Minute) + + // Update object. + framework.ExpectNoError(f.Client.Get(f.Ctx, types.NamespacedName{ + Name: obj.Name, Namespace: f.Namespace, + }, obj)) + + // Update readonly object + framework.ExpectNoError(f.Client.Get(f.Ctx, types.NamespacedName{ + Name: obj.Name + "-" + readOnlyName, + Namespace: f.Namespace, + }, readonlyObj)) + + // Expect all ok in running. 
+ pxcframework.NewExpectation(f, obj).ExpectAllOk(true) + pxcframework.NewExpectation(f, readonlyObj).ExpectAllOk(true) + }) }) diff --git a/test/e2e/polardbxcluster/lifecycle/upgrade.go b/test/e2e/polardbxcluster/lifecycle/upgrade.go index 78b557c..3a1a582 100644 --- a/test/e2e/polardbxcluster/lifecycle/upgrade.go +++ b/test/e2e/polardbxcluster/lifecycle/upgrade.go @@ -243,6 +243,62 @@ var _ = ginkgo.Describe("[PolarDBXCluster] [Lifecycle:Upgrade]", func() { pxcframework.NewExpectation(f, obj).ExpectCDCDeploymentsOk() }) + ginkgo.It("should columnar numbers 0 to 1 be as expected", func() { + resources := corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("100Mi"), + }, + } + obj := pxcframework.NewPolarDBXCluster( + "e2e-test-upgrade-columnar-number-0-1", + f.Namespace, + pxcframework.ProtocolVersion(5), + pxcframework.TopologyNode("cn", 2, "", "", false, resources), + pxcframework.TopologyNode("dn", 2, "", "", false, resources), + pxcframework.TopologyNode("cdc", 2, "", "", false, resources), + ) + + // Always run clean up to make sure objects are cleaned. + defer DeletePolarDBXClusterAndWaitUntilItDisappear(f, obj, 1*time.Minute) + + // Do create and verify. + CreatePolarDBXClusterAndWaitUntilRunningOrFail(f, obj, 10*time.Minute) + + // Update object. 
+ framework.ExpectNoError(f.Client.Get(f.Ctx, types.NamespacedName{ + Name: obj.Name, Namespace: f.Namespace, + }, obj)) + + // Expect sub-resources (especially deployments and xstores ok) + exp := pxcframework.NewExpectation(f, obj) + exp.ExpectDeploymentsOk() + exp.ExpectXStoresOk() + + obj.Spec.Topology.Nodes.Columnar = &polardbxv1polardbx.TopologyNodeColumnar{ + Replicas: 1, + Template: polardbxv1polardbx.ColumnarTemplate{ + Image: "registry.cn-zhangjiakou.aliyuncs.com/drds_pre/polardbx-columnar:v1", + }, + } + err := f.Client.Update(f.Ctx, obj) + framework.ExpectNoError(err) + + framework.ExpectNoError(f.Client.Get(f.Ctx, types.NamespacedName{ + Name: obj.Name, Namespace: f.Namespace, + }, obj)) + + obj, err = pxcframework.WaitUntilPolarDBXClusterUpgradeCompleteOrFail(f.Client, obj.Name, obj.Namespace, 10*time.Minute) + framework.ExpectNoError(err) + pxcframework.ExpectBeInPhase(obj, polardbxv1polardbx.PhaseRunning) + + pxcframework.NewExpectation(f, obj).ExpectColumnarDeploymentsOk() + }) + ginkgo.It("should cn resource upgrade as expected", func() { resources := corev1.ResourceRequirements{ Limits: corev1.ResourceList{ diff --git a/test/framework/polardbxcluster/expect.go b/test/framework/polardbxcluster/expect.go index ee62211..4a4d6c4 100644 --- a/test/framework/polardbxcluster/expect.go +++ b/test/framework/polardbxcluster/expect.go @@ -565,9 +565,47 @@ func (e *Expectation) ExpectCDCDeploymentsOk() { }) } +func (e *Expectation) ExpectColumnarDeploymentsOk() { + labels := map[string]string{ + "polardbx/name": e.obj.Name, + "polardbx/role": "columnar", + } + ns := e.obj.Namespace + + var deploymentList appsv1.DeploymentList + framework.ExpectNoError(e.c.List(e.ctx, &deploymentList, client.InNamespace(ns), client.MatchingLabels(labels))) + deployments := deploymentList.Items + e.ExpectOwnerReferenceCorrect(common.GetObjectList(deployments)...) 
+ + columnarNode := e.obj.Spec.Topology.Nodes.Columnar + if columnarNode == nil || columnarNode.Replicas == 0 { + gomega.Expect(deployments).To(gomega.BeEmpty(), "columnar not defined or replicas is 0, should be empty") + return + } + + gomega.Expect(deployments).NotTo(gomega.BeEmpty(), "columnar is defined, should not be empty") + + nodeSelectors := e.obj.Spec.Topology.Rules.Selectors + columnarRules := e.obj.Spec.Topology.Rules.Components.Columnar + replicas := columnarNode.Replicas + template := columnarNode.Template + + e.expectDeploymentsMatchesRulesAndReplicas(deployments, nodeSelectors, columnarRules, int(replicas), "columnar") + e.expectDeploymentsMatchesTemplate(deployments, func(deploy *appsv1.Deployment) { + podSpec := &deploy.Spec.Template.Spec + gomega.Expect(podSpec.HostNetwork).To(gomega.BeEquivalentTo(template.HostNetwork), "host network should be the same as template: "+deploy.Name) + engineContainer := k8shelper.GetContainerFromPodSpec(podSpec, "engine") + gomega.Expect(engineContainer.Resources).To(gomega.BeEquivalentTo(template.Resources), "resources of engine container should be the same as template: "+deploy.Name) + if template.Image != "" { + gomega.Expect(engineContainer.Image).To(gomega.BeEquivalentTo(template.Image), "image of engine container should be the same as template if specified: "+deploy.Name) + } + }) +} + func (e *Expectation) ExpectDeploymentsOk() { e.ExpectCNDeploymentsOk() e.ExpectCDCDeploymentsOk() + e.ExpectColumnarDeploymentsOk() } func (e *Expectation) ExpectOwnerReferenceCorrect(subResources ...client.Object) { @@ -703,23 +741,28 @@ func (e *Expectation) ExpectXStoresOk() { dnRule := e.obj.Spec.Topology.Rules.Components.DN dnNode := &e.obj.Spec.Topology.Nodes.DN dnReplicas := dnNode.Replicas + readOnly := e.obj.Spec.Readonly - if !shareGms { - gomega.Expect(xstoresByRole).To(gomega.HaveLen(2), "must have 2 roles of xstores") - gomega.Expect(xstoresByRole["gms"]).To(gomega.HaveLen(1), "1 gms") - - gmsStore := xstoresByRole["gms"][0] - gmsRule := 
e.obj.Spec.Topology.Rules.Components.GMS - if gmsRule == nil { - gmsRule = dnRule - } - gmsTemplate := e.obj.Spec.Topology.Nodes.GMS.Template - if gmsTemplate == nil { - gmsTemplate = &dnNode.Template - } - e.expectXStoreToMatch(gmsStore.(*polardbxv1.XStore), nodeSelectors, gmsRule, gmsTemplate) + if readOnly { + gomega.Expect(xstoresByRole).To(gomega.HaveLen(1), "must have 1 role of xstores for readonly pxc") } else { - gomega.Expect(xstoresByRole).To(gomega.HaveLen(1), "must have 1 roles of xstores in share gms mode") + if !shareGms { + gomega.Expect(xstoresByRole).To(gomega.HaveLen(2), "must have 2 roles of xstores") + gomega.Expect(xstoresByRole["gms"]).To(gomega.HaveLen(1), "1 gms") + + gmsStore := xstoresByRole["gms"][0] + gmsRule := e.obj.Spec.Topology.Rules.Components.GMS + if gmsRule == nil { + gmsRule = dnRule + } + gmsTemplate := e.obj.Spec.Topology.Nodes.GMS.Template + if gmsTemplate == nil { + gmsTemplate = &dnNode.Template + } + e.expectXStoreToMatch(gmsStore.(*polardbxv1.XStore), nodeSelectors, gmsRule, gmsTemplate) + } else { + gomega.Expect(xstoresByRole).To(gomega.HaveLen(1), "must have 1 roles of xstores in share gms mode") + } } dnStores := xstoresByRole["dn"] @@ -763,14 +806,24 @@ func (e *Expectation) ExpectPodsWithPaxosModeOk() { gomega.Expect(pods).NotTo(gomega.BeEmpty(), "no pods found") podsByRole := common.MapObjectsFromObjectListByLabel(pods, "polardbx/role") - gomega.Expect(podsByRole).To(gomega.HaveLen(4), "must be pods with 4 roles (when running)") + readOnly := e.obj.Spec.Readonly + if !readOnly { + gomega.Expect(podsByRole).To(gomega.HaveLen(4), "must be pods with 4 roles (when running)") - framework.ExpectHaveKeys(podsByRole, "cn", "dn", "gms", "cdc") + framework.ExpectHaveKeys(podsByRole, "cn", "dn", "gms", "cdc") - gomega.Expect(podsByRole["cn"]).To(gomega.HaveLen(1), "must be 1 cn pod") - gomega.Expect(podsByRole["cdc"]).To(gomega.HaveLen(1), "must be 1 cdc pod") - gomega.Expect(podsByRole["dn"]).To(gomega.HaveLen(3), "must be 3 
dn pod") - gomega.Expect(podsByRole["gms"]).To(gomega.HaveLen(3), "must be 3 gms pod") + gomega.Expect(podsByRole["cn"]).To(gomega.HaveLen(1), "must be 1 cn pod") + gomega.Expect(podsByRole["cdc"]).To(gomega.HaveLen(1), "must be 1 cdc pod") + gomega.Expect(podsByRole["dn"]).To(gomega.HaveLen(3), "must be 3 dn pod") + gomega.Expect(podsByRole["gms"]).To(gomega.HaveLen(3), "must be 3 gms pod") + } else { + gomega.Expect(podsByRole).To(gomega.HaveLen(2), "must be pods with 2 roles for readonly pxc (when running)") + + framework.ExpectHaveKeys(podsByRole, "cn", "dn") + + gomega.Expect(podsByRole["cn"]).To(gomega.HaveLen(1), "must be 1 cn pod") + gomega.Expect(podsByRole["dn"]).To(gomega.HaveLen(1), "must be 1 dn pod") + } } func (e *Expectation) ExpectSubResourcesOk(withPaxosMode bool) { diff --git a/test/framework/polardbxcluster/factory.go b/test/framework/polardbxcluster/factory.go index 704aafb..c6f8862 100644 --- a/test/framework/polardbxcluster/factory.go +++ b/test/framework/polardbxcluster/factory.go @@ -114,6 +114,17 @@ func TopologyNode(role string, replicas int, engine, image string, hostNetwork b }, } } + case polardbxmeta.RoleColumnar: + return func(polardbxcluster *polardbxv1.PolarDBXCluster) { + polardbxcluster.Spec.Topology.Nodes.Columnar = &polardbxv1polardbx.TopologyNodeColumnar{ + Replicas: int32(replicas), + Template: polardbxv1polardbx.ColumnarTemplate{ + Image: image, + HostNetwork: hostNetwork, + Resources: *resources.DeepCopy(), + }, + } + } case polardbxmeta.RoleGMS: return func(polardbxcluster *polardbxv1.PolarDBXCluster) { polardbxcluster.Spec.Topology.Nodes.GMS.Template = &polardbxv1polardbx.XStoreTemplate{ @@ -148,6 +159,22 @@ func ParameterTemplate(templateName string) FactoryOption { } } +func InitReadonly(cnReplicas int, name string, attendHtap bool) FactoryOption { + return func(polardbxcluster *polardbxv1.PolarDBXCluster) { + var extraParams = make(map[string]intstr.IntOrString) + if attendHtap { + extraParams["AttendHtap"] = 
intstr.FromString("true") + } + polardbxcluster.Spec.InitReadonly = []*polardbxv1polardbx.ReadonlyParam{ + { + CnReplicas: cnReplicas, + Name: name, + ExtraParams: extraParams, + }, + } + } +} + func NewPolarDBXCluster(name, namespace string, opts ...FactoryOption) *polardbxv1.PolarDBXCluster { obj := &polardbxv1.PolarDBXCluster{ ObjectMeta: metav1.ObjectMeta{ diff --git a/tools/xstore/cli/engine.py b/tools/xstore/cli/engine.py index db9c2bd..ed1e61c 100644 --- a/tools/xstore/cli/engine.py +++ b/tools/xstore/cli/engine.py @@ -88,3 +88,13 @@ def shutdown(): engine_group.add_command(shutdown) + + +@click.command(name="xtrabackup") +def xtrabackup_home(): + xtrabackup = global_mgr.engine().context.xtrabackup + print(xtrabackup) + return + + +engine_group.add_command(xtrabackup_home) diff --git a/tools/xstore/cli/restore.py b/tools/xstore/cli/restore.py index bd337c0..ba406fe 100644 --- a/tools/xstore/cli/restore.py +++ b/tools/xstore/cli/restore.py @@ -81,7 +81,7 @@ def start(restore_context): apply_backup_file(context, logger) mysql_bin_list = download_binlogbackup_file(binlog_dir_path, filestream_client, logger) if len( - pitr_endpoint) == 0 else download_pitr_binloglist(pitr_endpoint, pitr_xstore, logger) + pitr_endpoint) == 0 else download_pitr_binloglist(context, pitr_endpoint, pitr_xstore, logger) copy_binlog_to_new_path(mysql_bin_list, context, logger) @@ -150,7 +150,7 @@ def download_binlogbackup_file(binlog_dir_path, filestream_client, logger): return mysql_binlog_list -def download_pitr_binloglist(pitrEndpoint, xstore, logger): +def download_pitr_binloglist(context, pitrEndpoint, xstore, logger): binlogListUrl = "/".join([pitrEndpoint, "binlogs"]) + ("?xstore=%s" % xstore) response = requests.get(binlogListUrl) mysql_binlog_list = [] @@ -162,9 +162,23 @@ def download_pitr_binloglist(pitrEndpoint, xstore, logger): else: raise Exception("failed to get binlogs url = %s" % binlogListUrl) for binlog in mysql_binlog_list: - downloadUrl = "/".join([pitrEndpoint, 
"download", "binlog"]) + ("?xstore=%s" % xstore) + "&" + ( + downloadUrl = "/".join([pitrEndpoint, "download", "binlog"]) + ("?xstore=%s&only_meta=true" % xstore) + "&" + ( "filename=%s" % binlog) - wget.download(downloadUrl, os.path.join(RESTORE_TEMP_DIR, binlog)) + response = requests.get(downloadUrl) + if response.status_code == 200: + binlog_datasource = response.content.decode("utf-8") + cmd = " ".join( + [os.path.join("/tools/xstore/current/bin", "polardbx-job"), "-job-type=PitrDownloadFile", + "-output=" + os.path.join(RESTORE_TEMP_DIR, binlog), "-binlog-source='%s'" % binlog_datasource]) + logger.info("binlog_datasource %s" % response.content) + p = subprocess.Popen(cmd, shell=True, stdout=sys.stdout) + p.wait() + + if p.returncode > 0: + raise Exception("failed to get download binlog url = %s " % downloadUrl) + # logger.info("apply backup") + else: + raise Exception("failed to get download binlog url = %s" % downloadUrl) return mysql_binlog_list diff --git a/tools/xstore/core/config/mysql.py b/tools/xstore/core/config/mysql.py index e68cf2f..6c33090 100644 --- a/tools/xstore/core/config/mysql.py +++ b/tools/xstore/core/config/mysql.py @@ -138,6 +138,16 @@ def _override_configs(cls, target: configparser.ConfigParser, overrides: [config not target.has_section(section): target.add_section(section) for opt, value in proxy.items(): + if opt.startswith("loose_"): + actual_opt = opt[6:] + if target.has_option(section, actual_opt): + target.set(section, actual_opt, value) + continue + else: + loose_opt = "loose_" + opt + if target.has_option(section, loose_opt): + target.set(section, loose_opt, value) + continue target.set(section, opt, value) def update(self, template_file: None or AnyStr, overrides: None or [Any], create_new: bool = True): diff --git a/tools/xstore/core/context/context.py b/tools/xstore/core/context/context.py index 33fd380..2c9da39 100644 --- a/tools/xstore/core/context/context.py +++ b/tools/xstore/core/context/context.py @@ -43,7 +43,7 @@ def 
__init__(self): if self.is_galaxy80(): # galaxy related paths self.engine_home = self._env.get('ENGINE_HOME', '/opt/galaxy_engine') - self.xtrabackup_home = self._env.get('XTRABACKUP_HOME', '/tools/xstore/current/xcluster_xtrabackup80/bin') + self.xtrabackup_home = self._env.get('XTRABACKUP_HOME', self.get_galaxy_xtrabackup_home()) self.xtrabackup = os.path.join(self.xtrabackup_home, "xtrabackup") else: self.engine_home = self._env.get('ENGINE_HOME', '/u01/xcluster_current') @@ -53,6 +53,7 @@ def __init__(self): self._tools_home = self._env.get('TOOL_HOME', '/tools/xstore/current') self.mysqlbinlogtailor = os.path.join(self.xtrabackup_home, "mysqlbinlogtailor") self.bb_home = os.path.join(self._tools_home, "bb") + self.tools_bin_home = os.path.join(self._tools_home, "bin") self._filestream_client_home = os.path.join(self._tools_home, "bin/polardbx-filestream-client") self._hostinfo_path = "/tools/xstore/hdfs-nodes.json" @@ -81,7 +82,8 @@ def __init__(self): if self.is_galaxy80(): self.mycnf_template_path = os.path.join(self._tools_home, 'core/engine/galaxy/templates', 'my.cnf') elif self.is_xcluster57(): - self.mycnf_template_path = os.path.join(self._tools_home, 'core/engine/xcluster/templates', 'xcluster.57.cnf') + self.mycnf_template_path = os.path.join(self._tools_home, 'core/engine/xcluster/templates', + 'xcluster.57.cnf') # Set ports. self._access_port = int(self._env.get(convention.ENV_PORT_ACCESS, 3306)) @@ -438,3 +440,14 @@ def host_info(self): :return: path of host info file """ return self._hostinfo_path + + def get_galaxy_xtrabackup_home(self) -> str: + res = os.popen("mysqld -V") + lines = res.readlines() + if res.close() is None and len(lines) > 1: + version = lines[1].strip().split()[-1] + version0 = int(version.split(".")[0]) + if version0 > 1: + return "/tools/xstore/current/xtrabackup/8.0-2/xcluster_xtrabackup80/bin" + return "/tools/xstore/current/xcluster_xtrabackup80/bin" + raise Exception("failed to get xtrabackup home by `mysqld -V`")