Skip to content

Commit

Permalink
Fixed concurrency bug when etcd-druid runs more than one worker
Browse files Browse the repository at this point in the history
  • Loading branch information
I308301 committed Mar 20, 2020
1 parent f6ad156 commit d727a2d
Showing 1 changed file with 47 additions and 52 deletions.
99 changes: 47 additions & 52 deletions controllers/etcd_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ const (
// EtcdReconciler reconciles a Etcd object
type EtcdReconciler struct {
client.Client
Scheme *runtime.Scheme
chartApplier kubernetes.ChartApplier
Config *rest.Config
ChartApplier kubernetes.ChartApplier
RenderedChart *chartrenderer.RenderedChart
ImageVector imagevector.ImageVector
Scheme *runtime.Scheme
chartApplier kubernetes.ChartApplier
Config *rest.Config
ChartApplier kubernetes.ChartApplier
ImageVector imagevector.ImageVector
}

// NewReconcilerWithImageVector creates a new EtcdReconciler object with an image vector
Expand Down Expand Up @@ -180,7 +179,7 @@ func (r *EtcdReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
if !reflect.DeepEqual(etcd.Spec, etcdCopy.Spec) {
etcdCopy.Spec = etcd.Spec
}
logger.Infof("Reconciling etcd: %s", etcd.GetName())
logger.Infof("Reconciling etcd: %s/%s", etcd.GetNamespace(), etcd.GetName())
if !etcdCopy.DeletionTimestamp.IsZero() {
logger.Infof("Deletion timestamp set for etcd: %s", etcd.GetName())
if err := r.removeFinalizersToDependantSecrets(etcdCopy); err != nil {
Expand Down Expand Up @@ -274,7 +273,7 @@ func (r *EtcdReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
}, nil
}

func (r *EtcdReconciler) reconcileServices(etcd *druidv1alpha1.Etcd) (*corev1.Service, error) {
func (r *EtcdReconciler) reconcileServices(etcd *druidv1alpha1.Etcd, renderedChart *chartrenderer.RenderedChart) (*corev1.Service, error) {
logger.Infof("Reconciling etcd services for etcd:%s in namespace:%s", etcd.Name, etcd.Namespace)

selector, err := metav1.LabelSelectorAsSelector(etcd.Spec.Selector)
Expand All @@ -291,9 +290,6 @@ func (r *EtcdReconciler) reconcileServices(etcd *druidv1alpha1.Etcd) (*corev1.Se
logger.Error(err, "Error listing services")
return nil, err
}
for _, s := range services.Items {
logger.Infof("Services: %s", s.Name)
}

// NOTE: filteredStatefulSets are pointing to deepcopies of the cache, but this could change in the future.
// Ref: https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.2/pkg/cache/internal/cache_reader.go#L74
Expand Down Expand Up @@ -323,16 +319,17 @@ func (r *EtcdReconciler) reconcileServices(etcd *druidv1alpha1.Etcd) (*corev1.Se
return nil, err
}

// Statefulset is claimed by for this etcd. Just sync the specs
if service, err = r.syncServiceSpec(service, etcd); err != nil {
// Service is claimed by for this etcd. Just sync the specs
if service, err = r.syncServiceSpec(service, etcd, renderedChart); err != nil {
return nil, err
}

return service, err
}

// Required Service doesn't exist. Create new

ss, err := r.getServiceFromEtcd(etcd)
ss, err := r.getServiceFromEtcd(etcd, renderedChart)
if err != nil {
return nil, err
}
Expand All @@ -359,8 +356,8 @@ func (r *EtcdReconciler) reconcileServices(etcd *druidv1alpha1.Etcd) (*corev1.Se
return ss.DeepCopy(), err
}

func (r *EtcdReconciler) syncServiceSpec(ss *corev1.Service, etcd *druidv1alpha1.Etcd) (*corev1.Service, error) {
decoded, err := r.getServiceFromEtcd(etcd)
func (r *EtcdReconciler) syncServiceSpec(ss *corev1.Service, etcd *druidv1alpha1.Etcd, renderedChart *chartrenderer.RenderedChart) (*corev1.Service, error) {
decoded, err := r.getServiceFromEtcd(etcd, renderedChart)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -389,23 +386,23 @@ func (r *EtcdReconciler) syncServiceSpec(ss *corev1.Service, etcd *druidv1alpha1
return ssCopy, err
}

func (r *EtcdReconciler) getServiceFromEtcd(etcd *druidv1alpha1.Etcd) (*corev1.Service, error) {
func (r *EtcdReconciler) getServiceFromEtcd(etcd *druidv1alpha1.Etcd, renderedChart *chartrenderer.RenderedChart) (*corev1.Service, error) {
var err error
decoded := &corev1.Service{}
servicePath := getChartPathForService()
if _, ok := r.RenderedChart.Files()[servicePath]; !ok {
if _, ok := renderedChart.Files()[servicePath]; !ok {
return nil, fmt.Errorf("missing service template file in the charts: %v", servicePath)
}

decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(r.RenderedChart.Files()[servicePath])), 1024)
decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(renderedChart.Files()[servicePath])), 1024)

if err = decoder.Decode(&decoded); err != nil {
return nil, err
}
return decoded, nil
}

func (r *EtcdReconciler) reconcileConfigMaps(etcd *druidv1alpha1.Etcd) (*corev1.ConfigMap, error) {
func (r *EtcdReconciler) reconcileConfigMaps(etcd *druidv1alpha1.Etcd, renderedChart *chartrenderer.RenderedChart) (*corev1.ConfigMap, error) {
logger.Infof("Reconciling etcd configmap for etcd:%s in namespace:%s", etcd.Name, etcd.Namespace)

selector, err := metav1.LabelSelectorAsSelector(etcd.Spec.Selector)
Expand All @@ -423,8 +420,6 @@ func (r *EtcdReconciler) reconcileConfigMaps(etcd *druidv1alpha1.Etcd) (*corev1.
return nil, err
}

logger.Infof("Configmaps: %d", len(cms.Items))

// NOTE: filteredStatefulSets are pointing to deepcopies of the cache, but this could change in the future.
// Ref: https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.2/pkg/cache/internal/cache_reader.go#L74
// if you need to modify them, you need to copy it first.
Expand All @@ -451,16 +446,18 @@ func (r *EtcdReconciler) reconcileConfigMaps(etcd *druidv1alpha1.Etcd) (*corev1.
if err != nil {
return nil, err
}

// ConfigMap is claimed by for this etcd. Just sync the data
if cm, err = r.syncConfigMapData(cm, etcd); err != nil {
if cm, err = r.syncConfigMapData(cm, etcd, renderedChart); err != nil {
return nil, err
}

return cm, err
}

// Required Configmap doesn't exist. Create new

cm, err := r.getConfigMapFromEtcd(etcd)
cm, err := r.getConfigMapFromEtcd(etcd, renderedChart)
if err != nil {
return nil, err
}
Expand All @@ -487,8 +484,8 @@ func (r *EtcdReconciler) reconcileConfigMaps(etcd *druidv1alpha1.Etcd) (*corev1.
return cm.DeepCopy(), err
}

func (r *EtcdReconciler) syncConfigMapData(cm *corev1.ConfigMap, etcd *druidv1alpha1.Etcd) (*corev1.ConfigMap, error) {
decoded, err := r.getConfigMapFromEtcd(etcd)
func (r *EtcdReconciler) syncConfigMapData(cm *corev1.ConfigMap, etcd *druidv1alpha1.Etcd, renderedChart *chartrenderer.RenderedChart) (*corev1.ConfigMap, error) {
decoded, err := r.getConfigMapFromEtcd(etcd, renderedChart)
if err != nil {
return nil, err
}
Expand All @@ -515,23 +512,25 @@ func (r *EtcdReconciler) syncConfigMapData(cm *corev1.ConfigMap, etcd *druidv1al
return cmCopy, err
}

func (r *EtcdReconciler) getConfigMapFromEtcd(etcd *druidv1alpha1.Etcd) (*corev1.ConfigMap, error) {
func (r *EtcdReconciler) getConfigMapFromEtcd(etcd *druidv1alpha1.Etcd, renderedChart *chartrenderer.RenderedChart) (*corev1.ConfigMap, error) {
var err error
decoded := &corev1.ConfigMap{}
configMapPath := getChartPathForConfigMap()
if _, ok := r.RenderedChart.Files()[configMapPath]; !ok {

if _, ok := renderedChart.Files()[configMapPath]; !ok {
return nil, fmt.Errorf("missing configmap template file in the charts: %v", configMapPath)
}

//logger.Infof("%v: %v", statefulsetPath, renderer.Files()[statefulsetPath])
decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(r.RenderedChart.Files()[configMapPath])), 1024)
decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(renderedChart.Files()[configMapPath])), 1024)

if err = decoder.Decode(&decoded); err != nil {
return nil, err
}
return decoded, nil
}

func (r *EtcdReconciler) reconcileStatefulSet(cm *corev1.ConfigMap, svc *corev1.Service, etcd *druidv1alpha1.Etcd) (*appsv1.StatefulSet, error) {
func (r *EtcdReconciler) reconcileStatefulSet(cm *corev1.ConfigMap, svc *corev1.Service, etcd *druidv1alpha1.Etcd, values map[string]interface{}) (*appsv1.StatefulSet, error) {
logger.Infof("Reconciling etcd statefulset for etcd:%s in namespace:%s", etcd.Name, etcd.Namespace)
selector, err := metav1.LabelSelectorAsSelector(etcd.Spec.Selector)
if err != nil {
Expand Down Expand Up @@ -574,7 +573,7 @@ func (r *EtcdReconciler) reconcileStatefulSet(cm *corev1.ConfigMap, svc *corev1.
return nil, err
}
// Statefulset is claimed by for this etcd. Just sync the specs
if ss, err = r.syncStatefulSetSpec(ss, cm, svc, etcd); err != nil {
if ss, err = r.syncStatefulSetSpec(ss, cm, svc, etcd, values); err != nil {
return nil, err
}

Expand Down Expand Up @@ -602,7 +601,7 @@ func (r *EtcdReconciler) reconcileStatefulSet(cm *corev1.ConfigMap, svc *corev1.
}

// Required statefulset doesn't exist. Create new
ss, err := r.getStatefulSetFromEtcd(etcd, cm, svc)
ss, err := r.getStatefulSetFromEtcd(etcd, cm, svc, values)
if err != nil {
return nil, err
}
Expand All @@ -624,7 +623,6 @@ func (r *EtcdReconciler) reconcileStatefulSet(cm *corev1.ConfigMap, svc *corev1.
if err := controllerutil.SetControllerReference(etcd, ss, r.Scheme); err != nil {
return nil, err
}
logger.Info("Deployed etcd statefulset.")
return ss.DeepCopy(), err
}

Expand All @@ -636,8 +634,8 @@ func getContainerMapFromPodTemplateSpec(spec v1.PodSpec) map[string]v1.Container
return containers
}

func (r *EtcdReconciler) syncStatefulSetSpec(ss *appsv1.StatefulSet, cm *corev1.ConfigMap, svc *corev1.Service, etcd *druidv1alpha1.Etcd) (*appsv1.StatefulSet, error) {
decoded, err := r.getStatefulSetFromEtcd(etcd, cm, svc)
func (r *EtcdReconciler) syncStatefulSetSpec(ss *appsv1.StatefulSet, cm *corev1.ConfigMap, svc *corev1.Service, etcd *druidv1alpha1.Etcd, values map[string]interface{}) (*appsv1.StatefulSet, error) {
decoded, err := r.getStatefulSetFromEtcd(etcd, cm, svc, values)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -678,15 +676,20 @@ func (r *EtcdReconciler) syncStatefulSetSpec(ss *appsv1.StatefulSet, cm *corev1.
return ssCopy, err
}

func (r *EtcdReconciler) getStatefulSetFromEtcd(etcd *druidv1alpha1.Etcd, cm *corev1.ConfigMap, svc *corev1.Service) (*appsv1.StatefulSet, error) {
func (r *EtcdReconciler) getStatefulSetFromEtcd(etcd *druidv1alpha1.Etcd, cm *corev1.ConfigMap, svc *corev1.Service, values map[string]interface{}) (*appsv1.StatefulSet, error) {
var err error
decoded := &appsv1.StatefulSet{}
statefulSetPath := getChartPathForStatefulSet()
if _, ok := r.RenderedChart.Files()[statefulSetPath]; !ok {
chartPath := getChartPath()
renderedChart, err := r.ChartApplier.Render(chartPath, etcd.Name, etcd.Namespace, values)
if err != nil {
return nil, err
}
if _, ok := renderedChart.Files()[statefulSetPath]; !ok {
return nil, fmt.Errorf("missing configmap template file in the charts: %v", statefulSetPath)
}
//logger.Infof("%v: %v", statefulsetPath, renderer.Files()[statefulsetPath])
decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(r.RenderedChart.Files()[statefulSetPath])), 1024)
decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(renderedChart.Files()[statefulSetPath])), 1024)

if err = decoder.Decode(&decoded); err != nil {
return nil, err
Expand All @@ -700,37 +703,29 @@ func (r *EtcdReconciler) reconcileEtcd(etcd *druidv1alpha1.Etcd) (*corev1.Servic
if err != nil {
return nil, nil, err
}

chartPath := getChartPath()
renderer, err := r.ChartApplier.Render(chartPath, etcd.Name, etcd.Namespace, values)
renderedChart, err := r.ChartApplier.Render(chartPath, etcd.Name, etcd.Namespace, values)
if err != nil {
return nil, nil, err
}
r.RenderedChart = renderer

svc, err := r.reconcileServices(etcd)
svc, err := r.reconcileServices(etcd, renderedChart)
if err != nil {
return nil, nil, err
}
if svc != nil {
values["serviceName"] = svc.Name
}
cm, err := r.reconcileConfigMaps(etcd)

cm, err := r.reconcileConfigMaps(etcd, renderedChart)
if err != nil {
return nil, nil, err
}

if cm != nil {
values["configMapName"] = cm.Name
}
//Re-render the chart with updated service and configmap name.
renderer, err = r.ChartApplier.Render(chartPath, etcd.Name, etcd.Namespace, values)
if err != nil {
return nil, nil, err
}

r.RenderedChart = renderer

ss, err := r.reconcileStatefulSet(cm, svc, etcd)
ss, err := r.reconcileStatefulSet(cm, svc, etcd, values)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit d727a2d

Please sign in to comment.