From 53aed7956d4a14d154862c2606386c61d6f074d3 Mon Sep 17 00:00:00 2001 From: David VIEJO Date: Sat, 28 Sep 2024 13:41:58 +0200 Subject: [PATCH] Refactor main channel Signed-off-by: David VIEJO --- .../mainchannel/mainchannel_controller.go | 1198 +++++++---------- 1 file changed, 466 insertions(+), 732 deletions(-) diff --git a/controllers/mainchannel/mainchannel_controller.go b/controllers/mainchannel/mainchannel_controller.go index 6e5322b5..a592ffa0 100644 --- a/controllers/mainchannel/mainchannel_controller.go +++ b/controllers/mainchannel/mainchannel_controller.go @@ -6,7 +6,6 @@ import ( "crypto/tls" "crypto/x509" "crypto/x509/pkix" - "encoding/json" "fmt" "io/ioutil" "net" @@ -49,6 +48,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -95,339 +95,309 @@ func (r *FabricMainChannelReconciler) addFinalizer(reqLogger logr.Logger, m *hlf func (r *FabricMainChannelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { reqLogger := r.Log.WithValues("hlf", req.NamespacedName) fabricMainChannel := &hlfv1alpha1.FabricMainChannel{} + + if err := r.handleInitialSetup(ctx, req, fabricMainChannel, reqLogger); err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, err) + } + + clientSet, hlfClientSet, err := r.getClientSets() + if err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, err) + } + + sdk, err := r.setupSDK(fabricMainChannel, clientSet, hlfClientSet) + if err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, err) + } + defer sdk.Close() + + resClient, _, err := r.setupResClient(sdk, fabricMainChannel, clientSet) + if err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, err) + } + + resmgmtOptions := r.setupResmgmtOptions(fabricMainChannel) + + blockBytes, err := r.fetchConfigBlock(resClient, fabricMainChannel, resmgmtOptions) + if err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, err) + } + + if err := r.joinOrderers(ctx, fabricMainChannel, clientSet, hlfClientSet, blockBytes); err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, err) + } + + if err := r.updateChannelConfig(ctx, fabricMainChannel, resClient, resmgmtOptions, blockBytes, sdk, clientSet); err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, err) + } + time.Sleep(3 * time.Second) + if err := r.saveChannelConfig(ctx, fabricMainChannel, resClient, resmgmtOptions); err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, err) + } + + return r.finalizeReconcile(ctx, fabricMainChannel) +} + +func (r *FabricMainChannelReconciler) handleInitialSetup(ctx context.Context, req ctrl.Request, fabricMainChannel *hlfv1alpha1.FabricMainChannel, reqLogger logr.Logger) error { err := r.Get(ctx, req.NamespacedName, fabricMainChannel) if err != nil { - log.Debugf("Error getting the object %s error=%v", req.NamespacedName, err) if apierrors.IsNotFound(err) { reqLogger.Info("MainChannel resource not found. Ignoring since object must be deleted.") - return ctrl.Result{}, nil + return nil } reqLogger.Error(err, "Failed to get MainChannel.") - return ctrl.Result{}, err + return err } - markedToBeDeleted := fabricMainChannel.GetDeletionTimestamp() != nil - if markedToBeDeleted { - if utils.Contains(fabricMainChannel.GetFinalizers(), mainChannelFinalizer) { - if err := r.finalizeMainChannel(reqLogger, fabricMainChannel); err != nil { - return ctrl.Result{}, err - } - controllerutil.RemoveFinalizer(fabricMainChannel, mainChannelFinalizer) - err := r.Update(ctx, fabricMainChannel) - if err != nil { - return ctrl.Result{}, err - } - } - return ctrl.Result{}, nil + + if fabricMainChannel.GetDeletionTimestamp() != nil { + return r.handleDeletion(reqLogger, fabricMainChannel) } + if !utils.Contains(fabricMainChannel.GetFinalizers(), mainChannelFinalizer) { - if err := r.addFinalizer(reqLogger, fabricMainChannel); err != nil { - return ctrl.Result{}, err - } + return r.addFinalizer(reqLogger, fabricMainChannel) } + + return nil +} + +func (r *FabricMainChannelReconciler) getClientSets() (*kubernetes.Clientset, *operatorv1.Clientset, error) { clientSet, err := utils.GetClientKubeWithConf(r.Config) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, nil, err } + hlfClientSet, err := operatorv1.NewForConfig(r.Config) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, nil, err } + + return clientSet, hlfClientSet, nil +} + +func (r *FabricMainChannelReconciler) setupSDK(fabricMainChannel *hlfv1alpha1.FabricMainChannel, clientSet *kubernetes.Clientset, hlfClientSet *operatorv1.Clientset) (*fabsdk.FabricSDK, error) { ncResponse, err := nc.GenerateNetworkConfig(fabricMainChannel, clientSet, hlfClientSet, "") if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "failed to generate network config"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, errors.Wrap(err, "failed to generate network config") } - log.Infof("Generated network config: %s", ncResponse.NetworkConfig) + configBackend := config.FromRaw([]byte(ncResponse.NetworkConfig), "yaml") sdk, err := fabsdk.New(configBackend) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, err } - defer sdk.Close() + + return sdk, nil +} + +func (r *FabricMainChannelReconciler) setupResClient(sdk *fabsdk.FabricSDK, fabricMainChannel *hlfv1alpha1.FabricMainChannel, clientSet *kubernetes.Clientset) (*resmgmt.Client, msp.SigningIdentity, error) { firstAdminOrgMSPID := fabricMainChannel.Spec.AdminOrdererOrganizations[0].MSPID idConfig, ok := fabricMainChannel.Spec.Identities[fmt.Sprintf("%s-sign", firstAdminOrgMSPID)] if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("identity not found for MSPID %s", firstAdminOrgMSPID), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + // If -sign identity is not found, try with raw MSPID + idConfig, ok = fabricMainChannel.Spec.Identities[firstAdminOrgMSPID] + if !ok { + return nil, nil, fmt.Errorf("identity not found for MSPID %s or %s-sign", firstAdminOrgMSPID, firstAdminOrgMSPID) + } } - secret, err := clientSet.CoreV1().Secrets(idConfig.SecretNamespace).Get(ctx, idConfig.SecretName, v1.GetOptions{}) + + secret, err := clientSet.CoreV1().Secrets(idConfig.SecretNamespace).Get(context.Background(), idConfig.SecretName, v1.GetOptions{}) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, nil, err } + secretData, ok := secret.Data[idConfig.SecretKey] if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("secret key %s not found", idConfig.SecretKey), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, nil, fmt.Errorf("secret key %s not found", idConfig.SecretKey) } + id := &identity{} err = yaml.Unmarshal(secretData, id) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, nil, err } + + signingIdentity, err := r.createSigningIdentity(sdk, firstAdminOrgMSPID, id) + if err != nil { + return nil, nil, err + } + + sdkContext := sdk.Context( + fabsdk.WithIdentity(signingIdentity), + fabsdk.WithOrg(firstAdminOrgMSPID), + ) + + resClient, err := resmgmt.New(sdkContext) + if err != nil { + return nil, nil, err + } + + return resClient, signingIdentity, nil +} + +func (r *FabricMainChannelReconciler) handleDeletion(reqLogger logr.Logger, fabricMainChannel *hlfv1alpha1.FabricMainChannel) error { + if utils.Contains(fabricMainChannel.GetFinalizers(), mainChannelFinalizer) { + if err := r.finalizeMainChannel(reqLogger, fabricMainChannel); err != nil { + return err + } + controllerutil.RemoveFinalizer(fabricMainChannel, mainChannelFinalizer) + err := r.Update(context.Background(), fabricMainChannel) + if err != nil { + return err + } + } + return nil +} + +func (r *FabricMainChannelReconciler) createSigningIdentity(sdk *fabsdk.FabricSDK, mspID string, id *identity) (msp.SigningIdentity, error) { sdkConfig, err := sdk.Config() if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, err } cryptoConfig := cryptosuite.ConfigFromBackend(sdkConfig) cryptoSuite, err := sw.GetSuiteByConfig(cryptoConfig) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, err } userStore := mspimpl.NewMemoryUserStore() endpointConfig, err := fab.ConfigFromBackend(sdkConfig) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, err } - identityManager, err := mspimpl.NewIdentityManager(firstAdminOrgMSPID, userStore, cryptoSuite, endpointConfig) + identityManager, err := mspimpl.NewIdentityManager(mspID, userStore, cryptoSuite, endpointConfig) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, err } - signingIdentity, err := identityManager.CreateSigningIdentity( + return identityManager.CreateSigningIdentity( msp.WithPrivateKey([]byte(id.Key.Pem)), msp.WithCert([]byte(id.Cert.Pem)), ) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) +} + +func (r *FabricMainChannelReconciler) getCertPool(ordererOrg hlfv1alpha1.FabricMainChannelOrdererOrganization, clientSet *kubernetes.Clientset, hlfClientSet *operatorv1.Clientset) (*x509.CertPool, error) { + var tlsCACert string + if ordererOrg.CAName != "" && ordererOrg.CANamespace != "" { + certAuth, err := helpers.GetCertAuthByName( + clientSet, + hlfClientSet, + ordererOrg.CAName, + ordererOrg.CANamespace, + ) + if err != nil { + return nil, err + } + tlsCACert = certAuth.Status.TLSCACert + } else if ordererOrg.TLSCACert != "" && ordererOrg.SignCACert != "" { + tlsCACert = ordererOrg.TLSCACert } - log.Infof("Signing identity: %s", firstAdminOrgMSPID) - sdkContext := sdk.Context( - fabsdk.WithIdentity(signingIdentity), - fabsdk.WithOrg(firstAdminOrgMSPID), - ) + certPool := x509.NewCertPool() + ok := certPool.AppendCertsFromPEM([]byte(tlsCACert)) + if !ok { + return nil, fmt.Errorf("couldn't append certs from org %s", ordererOrg.MSPID) + } + return certPool, nil +} - resClient, err := resmgmt.New(sdkContext) +func (r *FabricMainChannelReconciler) getTLSClientCert(ordererOrg hlfv1alpha1.FabricMainChannelOrdererOrganization, fabricMainChannel *hlfv1alpha1.FabricMainChannel, clientSet *kubernetes.Clientset) (tls.Certificate, error) { + idConfig, ok := fabricMainChannel.Spec.Identities[fmt.Sprintf("%s-tls", ordererOrg.MSPID)] + if !ok { + log.Infof("Identity for MSPID %s not found, trying with normal identity", fmt.Sprintf("%s-tls", ordererOrg.MSPID)) + idConfig, ok = fabricMainChannel.Spec.Identities[ordererOrg.MSPID] + if !ok { + return tls.Certificate{}, fmt.Errorf("identity not found for MSPID %s", ordererOrg.MSPID) + } + } + secret, err := clientSet.CoreV1().Secrets(idConfig.SecretNamespace).Get(context.Background(), idConfig.SecretName, v1.GetOptions{}) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return tls.Certificate{}, err } - resmgmtOptions := []resmgmt.RequestOption{ - resmgmt.WithTimeout(fab2.ResMgmt, 30*time.Second), + id := &identity{} + secretData, ok := secret.Data[idConfig.SecretKey] + if !ok { + return tls.Certificate{}, fmt.Errorf("secret key %s not found", idConfig.SecretKey) } - for _, ordOrg := range fabricMainChannel.Spec.OrdererOrganizations { - for _, endpoint := range ordOrg.OrdererEndpoints { - resmgmtOptions = append(resmgmtOptions, resmgmt.WithOrdererEndpoint(endpoint)) - } + err = yaml.Unmarshal(secretData, id) + if err != nil { + return tls.Certificate{}, err } - var blockBytes []byte + return tls.X509KeyPair( + []byte(id.Cert.Pem), + []byte(id.Key.Pem), + ) +} - var channelBlock *cb.Block - for i := 0; i < 5; i++ { - channelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...) - if err == nil { - break - } - log.Warnf("Attempt %d failed to query config block from orderer: %v retrying in 1 second", i+1, err) - time.Sleep(1 * time.Second) // Sleep for a bit before retrying - } - if err == nil { - log.Infof("Channel %s already exists", fabricMainChannel.Spec.Name) - blockBytes, err = proto.Marshal(channelBlock) +func (r *FabricMainChannelReconciler) joinExternalOrderers(ordererOrg hlfv1alpha1.FabricMainChannelOrdererOrganization, fabricMainChannel *hlfv1alpha1.FabricMainChannel, blockBytes []byte, certPool *x509.CertPool, tlsClientCert tls.Certificate) error { + for _, cc := range ordererOrg.ExternalOrderersToJoin { + osnUrl := fmt.Sprintf("https://%s:%d", cc.Host, cc.AdminPort) + log.Infof("Trying to join orderer %s to channel %s", osnUrl, fabricMainChannel.Spec.Name) + + chInfoResponse, err := osnadmin.ListSingleChannel(osnUrl, fabricMainChannel.Spec.Name, certPool, tlsClientCert) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return err } - } else { - log.Infof("Channel %s does not exist, creating it: %v", fabricMainChannel.Spec.Name, err) - channelConfig, err := r.mapToConfigTX(fabricMainChannel) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + defer chInfoResponse.Body.Close() + if chInfoResponse.StatusCode == 200 { + log.Infof("Orderer %s already joined to channel %s", osnUrl, fabricMainChannel.Spec.Name) + continue } - block, err := configtx.NewApplicationChannelGenesisBlock(channelConfig, fabricMainChannel.Spec.Name) + + chResponse, err := osnadmin.Join(osnUrl, blockBytes, certPool, tlsClientCert) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return err + } + defer chResponse.Body.Close() + if chResponse.StatusCode == 405 { + log.Infof("Orderer %s already joined to channel %s", osnUrl, fabricMainChannel.Spec.Name) + continue } - blockBytes, err = proto.Marshal(block) + responseData, err := ioutil.ReadAll(chResponse.Body) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return err } - } - - // join orderers - for _, ordererOrg := range fabricMainChannel.Spec.OrdererOrganizations { - var tlsCACert string - if ordererOrg.CAName != "" && ordererOrg.CANamespace != "" { - certAuth, err := helpers.GetCertAuthByName( - clientSet, - hlfClientSet, - ordererOrg.CAName, - ordererOrg.CANamespace, - ) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - tlsCACert = certAuth.Status.TLSCACert + log.Infof("Orderer %s joined Status code=%d", osnUrl, chResponse.StatusCode) - } else if ordererOrg.TLSCACert != "" && ordererOrg.SignCACert != "" { - tlsCACert = ordererOrg.TLSCACert + if chResponse.StatusCode != 201 { + return fmt.Errorf("response from orderer %s trying to join to the channel %s: %d, response: %s", osnUrl, fabricMainChannel.Spec.Name, chResponse.StatusCode, string(responseData)) } - certPool := x509.NewCertPool() - ok := certPool.AppendCertsFromPEM([]byte(tlsCACert)) - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("couldn't append certs from org %s", ordererOrg.MSPID), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + } + return nil +} + +func (r *FabricMainChannelReconciler) joinInternalOrderers(ctx context.Context, ordererOrg hlfv1alpha1.FabricMainChannelOrdererOrganization, fabricMainChannel *hlfv1alpha1.FabricMainChannel, hlfClientSet *operatorv1.Clientset, blockBytes []byte, certPool *x509.CertPool, tlsClientCert tls.Certificate, clientSet *kubernetes.Clientset) error { + for _, cc := range ordererOrg.OrderersToJoin { + ordererNode, err := hlfClientSet.HlfV1alpha1().FabricOrdererNodes(cc.Namespace).Get(ctx, cc.Name, v1.GetOptions{}) + if err != nil { + return err } - idConfig, ok := fabricMainChannel.Spec.Identities[fmt.Sprintf("%s-tls", ordererOrg.MSPID)] - if !ok { - log.Infof("Identity for MSPID %s not found, trying with normal identity", fmt.Sprintf("%s-tls", ordererOrg.MSPID)) - // try with normal identity - idConfig, ok = fabricMainChannel.Spec.Identities[ordererOrg.MSPID] - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("identity not found for MSPID %s", ordererOrg.MSPID), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } + adminHost, adminPort, err := helpers.GetOrdererAdminHostAndPort(clientSet, ordererNode.Spec, ordererNode.Status) + if err != nil { + return err } - secret, err := clientSet.CoreV1().Secrets(idConfig.SecretNamespace).Get(ctx, idConfig.SecretName, v1.GetOptions{}) + osnUrl := fmt.Sprintf("https://%s:%d", adminHost, adminPort) + log.Infof("Trying to join orderer %s to channel %s", osnUrl, fabricMainChannel.Spec.Name) + chResponse, err := osnadmin.Join(osnUrl, blockBytes, certPool, tlsClientCert) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return err } - id := &identity{} - secretData, ok := secret.Data[idConfig.SecretKey] - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("secret key %s not found", idConfig.SecretKey), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + defer chResponse.Body.Close() + if chResponse.StatusCode == 405 { + log.Infof("Orderer %s already joined to channel %s", osnUrl, fabricMainChannel.Spec.Name) + continue } - err = yaml.Unmarshal(secretData, id) + responseData, err := ioutil.ReadAll(chResponse.Body) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return err } - tlsClientCert, err := tls.X509KeyPair( - []byte(id.Cert.Pem), - []byte(id.Key.Pem), - ) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - for _, cc := range ordererOrg.ExternalOrderersToJoin { - osnUrl := fmt.Sprintf("https://%s:%d", cc.Host, cc.AdminPort) - log.Infof("Trying to join orderer %s to channel %s", osnUrl, fabricMainChannel.Spec.Name) - // get if orderer is already joined - chInfoResponse, err := osnadmin.ListSingleChannel(osnUrl, fabricMainChannel.Spec.Name, certPool, tlsClientCert) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - defer chInfoResponse.Body.Close() - if chInfoResponse.StatusCode == 200 { - log.Infof("Orderer %s already joined to channel %s", osnUrl, fabricMainChannel.Spec.Name) - continue - } - - chResponse, err := osnadmin.Join(osnUrl, blockBytes, certPool, tlsClientCert) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - defer chResponse.Body.Close() - if chResponse.StatusCode == 405 { - log.Infof("Orderer %s already joined to channel %s", osnUrl, fabricMainChannel.Spec.Name) - continue - } - responseData, err := ioutil.ReadAll(chResponse.Body) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - log.Infof("Orderer %s joined Status code=%d", osnUrl, chResponse.StatusCode) - - if chResponse.StatusCode != 201 { - r.setConditionStatus( - ctx, - fabricMainChannel, - hlfv1alpha1.FailedStatus, - false, - fmt.Errorf( - "response from orderer %s trying to join to the channel %s: %d, response: %s", - osnUrl, - fabricMainChannel.Spec.Name, - chResponse.StatusCode, - string(responseData), - ), - false, - ) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - chInfo := &osnadmin.ChannelInfo{} - err = json.Unmarshal(responseData, chInfo) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - } - - for _, cc := range ordererOrg.OrderersToJoin { - ordererNode, err := hlfClientSet.HlfV1alpha1().FabricOrdererNodes(cc.Namespace).Get(ctx, cc.Name, v1.GetOptions{}) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - adminHost, adminPort, err := helpers.GetOrdererAdminHostAndPort(clientSet, ordererNode.Spec, ordererNode.Status) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - osnUrl := fmt.Sprintf("https://%s:%d", adminHost, adminPort) - log.Infof("Trying to join orderer %s to channel %s", osnUrl, fabricMainChannel.Spec.Name) - chResponse, err := osnadmin.Join(osnUrl, blockBytes, certPool, tlsClientCert) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - defer chResponse.Body.Close() - if chResponse.StatusCode == 405 { - log.Infof("Orderer %s already joined to channel %s", osnUrl, fabricMainChannel.Spec.Name) - continue - } - responseData, err := ioutil.ReadAll(chResponse.Body) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - log.Infof("Orderer %s.%s joined Status code=%d", cc.Name, cc.Namespace, chResponse.StatusCode) - if chResponse.StatusCode != 201 { - r.setConditionStatus( - ctx, - fabricMainChannel, - hlfv1alpha1.FailedStatus, - false, - fmt.Errorf( - "response from orderer %s trying to join to the channel %s: %d, response: %s", - osnUrl, - fabricMainChannel.Spec.Name, - chResponse.StatusCode, - string(responseData), - ), - false, - ) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - chInfo := &osnadmin.ChannelInfo{} - err = json.Unmarshal(responseData, chInfo) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } + log.Infof("Orderer %s.%s joined Status code=%d", cc.Name, cc.Namespace, chResponse.StatusCode) + if chResponse.StatusCode != 201 { + return fmt.Errorf("response from orderer %s trying to join to the channel %s: %d, response: %s", osnUrl, fabricMainChannel.Spec.Name, chResponse.StatusCode, string(responseData)) } } + return nil +} - r.Log.Info("Fetching block from orderer") +func (r *FabricMainChannelReconciler) fetchOrdererChannelBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resmgmtOptions []resmgmt.RequestOption) (*common.Block, error) { var ordererChannelBlock *common.Block + var err error attemptsLeft := 5 for { ordererChannelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...) @@ -437,533 +407,294 @@ func (r *FabricMainChannelReconciler) Reconcile(ctx context.Context, req ctrl.Re if err != nil { attemptsLeft-- } - r.Log.Info(fmt.Sprintf("Failed to get block %v, attempts left %d", err, attemptsLeft)) + log.Infof("Failed to get block %v, attempts left %d", err, attemptsLeft) time.Sleep(1500 * time.Millisecond) } - if err != nil { - r.Log.Info(fmt.Sprintf("Failed to get block %v", err)) - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "failed to get block from channel %s", fabricMainChannel.Spec.Name), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, errors.Wrapf(err, "failed to get block from channel %s", fabricMainChannel.Spec.Name) } - r.Log.Info(fmt.Sprintf("Block from channel %s fetched from orderer", fabricMainChannel.Spec.Name)) - cfgBlock, err := resource.ExtractConfigFromBlock(ordererChannelBlock) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "failed to extract config from channel block"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return ordererChannelBlock, nil +} + +func (r *FabricMainChannelReconciler) collectConfigSignatures(fabricMainChannel *hlfv1alpha1.FabricMainChannel, sdk *fabsdk.FabricSDK, clientSet *kubernetes.Clientset, channelConfigBytes []byte) ([]*common.ConfigSignature, error) { + var configSignatures []*common.ConfigSignature + + // Collect signatures from admin orderer organizations + for _, adminOrderer := range fabricMainChannel.Spec.AdminOrdererOrganizations { + signature, err := r.createConfigSignature(sdk, adminOrderer.MSPID, fabricMainChannel, clientSet, channelConfigBytes) + if err != nil { + return nil, err + } + configSignatures = append(configSignatures, signature) } - currentConfigTx := configtx.New(cfgBlock) - newConfigTx, err := r.mapToConfigTX(fabricMainChannel) + + // Collect signatures from admin peer organizations + for _, adminPeer := range fabricMainChannel.Spec.AdminPeerOrganizations { + signature, err := r.createConfigSignature(sdk, adminPeer.MSPID, fabricMainChannel, clientSet, channelConfigBytes) + if err != nil { + return nil, err + } + configSignatures = append(configSignatures, signature) + } + + return configSignatures, nil +} + +func (r *FabricMainChannelReconciler) createConfigSignature(sdk *fabsdk.FabricSDK, mspID string, fabricMainChannel *hlfv1alpha1.FabricMainChannel, clientSet *kubernetes.Clientset, channelConfigBytes []byte) (*common.ConfigSignature, error) { + identityName := fmt.Sprintf("%s-sign", mspID) + idConfig, ok := fabricMainChannel.Spec.Identities[identityName] + if !ok { + // If -sign identity is not found, try with raw MSPID + idConfig, ok = fabricMainChannel.Spec.Identities[mspID] + if !ok { + return nil, fmt.Errorf("identity not found for MSPID %s or %s-sign", mspID, mspID) + } + } + secret, err := clientSet.CoreV1().Secrets(idConfig.SecretNamespace).Get(context.Background(), idConfig.SecretName, v1.GetOptions{}) if err != nil { - log.Errorf("Error mapping channel to configtx channel: %v", err) - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error mapping channel to configtx channel"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, err } - var buf2 bytes.Buffer - err = protolator.DeepMarshalJSON(&buf2, cfgBlock) + secretData, ok := secret.Data[idConfig.SecretKey] + if !ok { + return nil, fmt.Errorf("secret key %s not found", idConfig.SecretKey) + } + id := &identity{} + err = yaml.Unmarshal(secretData, id) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error converting block to JSON"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, err } - err = updateApplicationChannelConfigTx(currentConfigTx, newConfigTx) + signingIdentity, err := r.createSigningIdentity(sdk, mspID, id) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "failed to update application channel config"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, err } - configUpdate, err := resmgmt.CalculateConfigUpdate(fabricMainChannel.Spec.Name, cfgBlock, currentConfigTx.UpdatedConfig()) + + sdkContext := sdk.Context( + fabsdk.WithIdentity(signingIdentity), + fabsdk.WithOrg(mspID), + ) + resClient, err := resmgmt.New(sdkContext) if err != nil { - if !strings.Contains(err.Error(), "no differences detected between original and updated config") { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error calculating config update"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, err + } + return resClient.CreateConfigSignatureFromReader(signingIdentity, bytes.NewReader(channelConfigBytes)) +} + +func (r *FabricMainChannelReconciler) handleReconcileError(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, err error) (reconcile.Result, error) { + r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) + return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) +} + +func (r *FabricMainChannelReconciler) setupResmgmtOptions(fabricMainChannel *hlfv1alpha1.FabricMainChannel) []resmgmt.RequestOption { + resmgmtOptions := []resmgmt.RequestOption{ + resmgmt.WithTimeout(fab2.ResMgmt, 30*time.Second), + } + + for _, ordOrg := range fabricMainChannel.Spec.OrdererOrganizations { + for _, endpoint := range ordOrg.OrdererEndpoints { + resmgmtOptions = append(resmgmtOptions, resmgmt.WithOrdererEndpoint(endpoint)) } - log.Infof("No differences detected between original and updated config") - } else { - channelConfigBytes, err := CreateConfigUpdateEnvelope(fabricMainChannel.Spec.Name, configUpdate) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error creating config update envelope"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - var configSignatures []*common.ConfigSignature - for _, adminPeer := range fabricMainChannel.Spec.AdminPeerOrganizations { - configUpdateReader := bytes.NewReader(channelConfigBytes) - idConfig, ok := fabricMainChannel.Spec.Identities[adminPeer.MSPID] - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("identity not found for MSPID %s", adminPeer.MSPID), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - secret, err := clientSet.CoreV1().Secrets(idConfig.SecretNamespace).Get(ctx, idConfig.SecretName, v1.GetOptions{}) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - secretData, ok := secret.Data[idConfig.SecretKey] - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("secret key %s not found", idConfig.SecretKey), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - id := &identity{} - err = yaml.Unmarshal(secretData, id) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - sdkConfig, err := sdk.Config() - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - cryptoConfig := cryptosuite.ConfigFromBackend(sdkConfig) - cryptoSuite, err := sw.GetSuiteByConfig(cryptoConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - userStore := mspimpl.NewMemoryUserStore() - endpointConfig, err := fab.ConfigFromBackend(sdkConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - identityManager, err := mspimpl.NewIdentityManager(adminPeer.MSPID, userStore, cryptoSuite, endpointConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - signingIdentity, err := identityManager.CreateSigningIdentity( - msp.WithPrivateKey([]byte(id.Key.Pem)), - msp.WithCert([]byte(id.Cert.Pem)), - ) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } + } - sdkContext := sdk.Context( - fabsdk.WithIdentity(signingIdentity), - fabsdk.WithOrg(adminPeer.MSPID), - ) - resClient, err := resmgmt.New(sdkContext) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - signature, err := resClient.CreateConfigSignatureFromReader(signingIdentity, configUpdateReader) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - configSignatures = append(configSignatures, signature) - } - configUpdateReader := bytes.NewReader(channelConfigBytes) - saveChannelOpts := []resmgmt.RequestOption{ - resmgmt.WithConfigSignatures(configSignatures...), - } - saveChannelOpts = append(saveChannelOpts, resmgmtOptions...) - _ = saveChannelOpts - _ = configUpdateReader - saveChannelResponse, err := resClient.SaveChannel( - resmgmt.SaveChannelRequest{ - ChannelID: fabricMainChannel.Spec.Name, - ChannelConfig: configUpdateReader, - SigningIdentities: []msp.SigningIdentity{}, - }, - saveChannelOpts..., - ) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error saving application configuration"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return resmgmtOptions +} + +func (r *FabricMainChannelReconciler) fetchConfigBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resmgmtOptions []resmgmt.RequestOption) ([]byte, error) { + var channelBlock *cb.Block + var err error + + for i := 0; i < 5; i++ { + channelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...) + if err == nil { + break } - log.Infof("Application configuration updated with transaction ID: %s", saveChannelResponse.TransactionID) + log.Warnf("Attempt %d failed to query config block from orderer: %v retrying in 1 second", i+1, err) + time.Sleep(1 * time.Second) } - // update orderer config - currentConfigTx = configtx.New(cfgBlock) - newConfigTx, err = r.mapToConfigTX(fabricMainChannel) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error mapping channel to configtx channel"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + log.Infof("Channel %s does not exist, creating it: %v", fabricMainChannel.Spec.Name, err) + return r.createNewChannel(fabricMainChannel) } - err = updateOrdererChannelConfigTx(currentConfigTx, newConfigTx) + + log.Infof("Channel %s already exists", fabricMainChannel.Spec.Name) + return proto.Marshal(channelBlock) +} + +func (r *FabricMainChannelReconciler) createNewChannel(fabricMainChannel *hlfv1alpha1.FabricMainChannel) ([]byte, error) { + channelConfig, err := r.mapToConfigTX(fabricMainChannel) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "failed to update application channel config"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return nil, err } - configUpdate, err = resmgmt.CalculateConfigUpdate(fabricMainChannel.Spec.Name, cfgBlock, currentConfigTx.UpdatedConfig()) + + block, err := configtx.NewApplicationChannelGenesisBlock(channelConfig, fabricMainChannel.Spec.Name) if err != nil { - if !strings.Contains(err.Error(), "no differences detected between original and updated config") { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error calculating config update"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - log.Infof("No differences detected between original and updated config") - } else { - channelConfigBytes, err := CreateConfigUpdateEnvelope(fabricMainChannel.Spec.Name, configUpdate) + return nil, err + } + + return proto.Marshal(block) +} + +func (r *FabricMainChannelReconciler) joinOrderers(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, clientSet *kubernetes.Clientset, hlfClientSet *operatorv1.Clientset, blockBytes []byte) error { + for _, ordererOrg := range fabricMainChannel.Spec.OrdererOrganizations { + certPool, err := r.getCertPool(ordererOrg, clientSet, hlfClientSet) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error creating config update envelope"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return err } - var buf2 bytes.Buffer - err = protolator.DeepMarshalJSON(&buf2, cfgBlock) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error converting block to JSON"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - configSignatures := []*cb.ConfigSignature{} - for _, adminOrderer := range fabricMainChannel.Spec.AdminOrdererOrganizations { - configUpdateReader := bytes.NewReader(channelConfigBytes) - identityName := fmt.Sprintf("%s-sign", adminOrderer.MSPID) - idConfig, ok := fabricMainChannel.Spec.Identities[identityName] - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("identity not found for MSPID %s", identityName), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - secret, err := clientSet.CoreV1().Secrets(idConfig.SecretNamespace).Get(ctx, idConfig.SecretName, v1.GetOptions{}) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - secretData, ok := secret.Data[idConfig.SecretKey] - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("secret key %s not found", idConfig.SecretKey), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - id := &identity{} - err = yaml.Unmarshal(secretData, id) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - sdkConfig, err := sdk.Config() - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - cryptoConfig := cryptosuite.ConfigFromBackend(sdkConfig) - cryptoSuite, err := sw.GetSuiteByConfig(cryptoConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - userStore := mspimpl.NewMemoryUserStore() - endpointConfig, err := fab.ConfigFromBackend(sdkConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - identityManager, err := mspimpl.NewIdentityManager(adminOrderer.MSPID, userStore, cryptoSuite, endpointConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - signingIdentity, err := identityManager.CreateSigningIdentity( - msp.WithPrivateKey([]byte(id.Key.Pem)), - msp.WithCert([]byte(id.Cert.Pem)), - ) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - sdkContext := sdk.Context( - fabsdk.WithIdentity(signingIdentity), - fabsdk.WithOrg(adminOrderer.MSPID), - ) - resClient, err := resmgmt.New(sdkContext) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - signature, err := resClient.CreateConfigSignatureFromReader(signingIdentity, configUpdateReader) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - configSignatures = append(configSignatures, signature) - } - configUpdateReader := bytes.NewReader(channelConfigBytes) - saveChannelOpts := []resmgmt.RequestOption{ - resmgmt.WithConfigSignatures(configSignatures...), - } - saveChannelOpts = append(saveChannelOpts, resmgmtOptions...) - saveChannelResponse, err := resClient.SaveChannel( - resmgmt.SaveChannelRequest{ - ChannelID: fabricMainChannel.Spec.Name, - ChannelConfig: configUpdateReader, - SigningIdentities: []msp.SigningIdentity{}, - }, - saveChannelOpts..., - ) + tlsClientCert, err := r.getTLSClientCert(ordererOrg, fabricMainChannel, clientSet) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error saving orderer configuration"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return err + } + + if err := r.joinExternalOrderers(ordererOrg, fabricMainChannel, blockBytes, certPool, tlsClientCert); err != nil { + return err + } + + if err := r.joinInternalOrderers(ctx, ordererOrg, fabricMainChannel, hlfClientSet, blockBytes, certPool, tlsClientCert, clientSet); err != nil { + return err } - log.Infof("Orderer configuration updated with transaction ID: %s", saveChannelResponse.TransactionID) } - // update capabilities - currentConfigTx = configtx.New(cfgBlock) - newConfigTx, err = r.mapToConfigTX(fabricMainChannel) + return nil +} + +func (r *FabricMainChannelReconciler) updateChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption, blockBytes []byte, sdk *fabsdk.FabricSDK, clientSet *kubernetes.Clientset) error { + ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel, resmgmtOptions) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error mapping channel to configtx channel"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return err } - err = updateChannelConfigTx(currentConfigTx, newConfigTx) + + cfgBlock, err := resource.ExtractConfigFromBlock(ordererChannelBlock) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "failed to update application channel config"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return errors.Wrap(err, "failed to extract config from channel block") } - configUpdate, err = resmgmt.CalculateConfigUpdate(fabricMainChannel.Spec.Name, cfgBlock, currentConfigTx.UpdatedConfig()) + currentConfigTx := configtx.New(cfgBlock) + ordererConfig, err := currentConfigTx.Orderer().Configuration() + if err != nil { + return errors.Wrap(err, "failed to get orderer configuration") + } + newConfigTx, err := r.mapToConfigTX(fabricMainChannel) + if err != nil { + return errors.Wrap(err, "error mapping channel to configtx channel") + } + isMaintenanceMode := ordererConfig.State == orderer.ConsensusStateMaintenance + switchingToMaintenanceMode := !isMaintenanceMode && newConfigTx.Orderer.State == orderer.ConsensusStateMaintenance + r.Log.Info("Is maintenance mode", "isMaintenanceMode", isMaintenanceMode, "switchingToMaintenanceMode", switchingToMaintenanceMode) + + if !isMaintenanceMode && !switchingToMaintenanceMode { + if err := updateApplicationChannelConfigTx(currentConfigTx, newConfigTx); err != nil { + return errors.Wrap(err, "failed to update application channel config") + } + } + if !switchingToMaintenanceMode { + if err := updateChannelConfigTx(currentConfigTx, newConfigTx); err != nil { + return errors.Wrap(err, "failed to update channel config") + } + } + + if err := updateOrdererChannelConfigTx(currentConfigTx, newConfigTx); err != nil { + return errors.Wrap(err, "failed to update orderer channel config") + } + + configUpdate, err := resmgmt.CalculateConfigUpdate(fabricMainChannel.Spec.Name, cfgBlock, currentConfigTx.UpdatedConfig()) if err != nil { if !strings.Contains(err.Error(), "no differences detected between original and updated config") { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error calculating config update"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return errors.Wrap(err, "error calculating config update") } log.Infof("No differences detected between original and updated config") - } else { - channelConfigBytes, err := CreateConfigUpdateEnvelope(fabricMainChannel.Spec.Name, configUpdate) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error creating config update envelope"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - configUpdateReader := bytes.NewReader(channelConfigBytes) - - configSignatures := []*cb.ConfigSignature{} - for _, adminOrderer := range fabricMainChannel.Spec.AdminOrdererOrganizations { - configUpdateReader := bytes.NewReader(channelConfigBytes) - identityName := fmt.Sprintf("%s-sign", adminOrderer.MSPID) - idConfig, ok := fabricMainChannel.Spec.Identities[identityName] - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("identity not found for MSPID %s", identityName), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - secret, err := clientSet.CoreV1().Secrets(idConfig.SecretNamespace).Get(ctx, idConfig.SecretName, v1.GetOptions{}) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - secretData, ok := secret.Data[idConfig.SecretKey] - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("secret key %s not found", idConfig.SecretKey), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - id := &identity{} - err = yaml.Unmarshal(secretData, id) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - sdkConfig, err := sdk.Config() - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - cryptoConfig := cryptosuite.ConfigFromBackend(sdkConfig) - cryptoSuite, err := sw.GetSuiteByConfig(cryptoConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - userStore := mspimpl.NewMemoryUserStore() - endpointConfig, err := fab.ConfigFromBackend(sdkConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - identityManager, err := mspimpl.NewIdentityManager(adminOrderer.MSPID, userStore, cryptoSuite, endpointConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - signingIdentity, err := identityManager.CreateSigningIdentity( - msp.WithPrivateKey([]byte(id.Key.Pem)), - msp.WithCert([]byte(id.Cert.Pem)), - ) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } + return nil + } - sdkContext := sdk.Context( - fabsdk.WithIdentity(signingIdentity), - fabsdk.WithOrg(adminOrderer.MSPID), - ) - resClient, err := resmgmt.New(sdkContext) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - signature, err := resClient.CreateConfigSignatureFromReader(signingIdentity, configUpdateReader) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - configSignatures = append(configSignatures, signature) - } + channelConfigBytes, err := CreateConfigUpdateEnvelope(fabricMainChannel.Spec.Name, configUpdate) + if err != nil { + return errors.Wrap(err, "error creating config update envelope") + } + // convert channelConfigBytes to json using protolator + var buf bytes.Buffer + err = protolator.DeepMarshalJSON(&buf, configUpdate) + if err != nil { + return errors.Wrap(err, "error unmarshalling channel config bytes to json") + } + r.Log.Info("Channel config", "config", buf.String()) - for _, adminPeer := range fabricMainChannel.Spec.AdminPeerOrganizations { - configUpdateReader := bytes.NewReader(channelConfigBytes) - idConfig, ok := fabricMainChannel.Spec.Identities[adminPeer.MSPID] - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("identity not found for MSPID %s", adminPeer.MSPID), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - secret, err := clientSet.CoreV1().Secrets(idConfig.SecretNamespace).Get(ctx, idConfig.SecretName, v1.GetOptions{}) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - secretData, ok := secret.Data[idConfig.SecretKey] - if !ok { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, fmt.Errorf("secret key %s not found", idConfig.SecretKey), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - id := &identity{} - err = yaml.Unmarshal(secretData, id) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - sdkConfig, err := sdk.Config() - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - cryptoConfig := cryptosuite.ConfigFromBackend(sdkConfig) - cryptoSuite, err := sw.GetSuiteByConfig(cryptoConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - userStore := mspimpl.NewMemoryUserStore() - endpointConfig, err := fab.ConfigFromBackend(sdkConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - identityManager, err := mspimpl.NewIdentityManager(adminPeer.MSPID, userStore, cryptoSuite, endpointConfig) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - signingIdentity, err := identityManager.CreateSigningIdentity( - msp.WithPrivateKey([]byte(id.Key.Pem)), - msp.WithCert([]byte(id.Cert.Pem)), - ) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } + configSignatures, err := r.collectConfigSignatures(fabricMainChannel, sdk, clientSet, channelConfigBytes) + if err != nil { + return err + } - sdkContext := sdk.Context( - fabsdk.WithIdentity(signingIdentity), - fabsdk.WithOrg(adminPeer.MSPID), - ) - resClient, err := resmgmt.New(sdkContext) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - signature, err := resClient.CreateConfigSignatureFromReader(signingIdentity, configUpdateReader) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - configSignatures = append(configSignatures, signature) - } + saveChannelOpts := append([]resmgmt.RequestOption{ + resmgmt.WithConfigSignatures(configSignatures...), + }, resmgmtOptions...) - saveChannelOpts := []resmgmt.RequestOption{ - resmgmt.WithConfigSignatures(configSignatures...), - } - saveChannelOpts = append(saveChannelOpts, resmgmtOptions...) - saveChannelResponse, err := resClient.SaveChannel( - resmgmt.SaveChannelRequest{ - ChannelID: fabricMainChannel.Spec.Name, - ChannelConfig: configUpdateReader, - SigningIdentities: []msp.SigningIdentity{}, - }, - saveChannelOpts..., - ) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error saving capabilities"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - log.Infof("Capabilities updated with transaction ID: %s", saveChannelResponse.TransactionID) + saveChannelResponse, err := resClient.SaveChannel( + resmgmt.SaveChannelRequest{ + ChannelID: fabricMainChannel.Spec.Name, + ChannelConfig: bytes.NewReader(channelConfigBytes), + SigningIdentities: []msp.SigningIdentity{}, + }, + saveChannelOpts..., + ) + if err != nil { + return errors.Wrap(err, "error saving channel configuration") } - time.Sleep(2 * time.Second) // wait for orderers to get the new config - r.Log.Info(fmt.Sprintf("fetching block every 1 second waiting for orderers to reconcile %s", fabricMainChannel.Name)) - ordererChannelCh := make(chan *common.Block, 1) - go func() { - for { - ordererChannelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...) - if err != nil { - log.Errorf("error querying orderer channel: %v", err) - time.Sleep(1 * time.Second) - } else { - log.Infof("orderer channel fetched") - ordererChannelCh <- ordererChannelBlock - break - } - } - }() - select { - case res := <-ordererChannelCh: - ordererChannelBlock = res - case <-time.After(12 * time.Second): - err = errors.New("timeout querying orderer channel") - r.Log.Error(err, "error querying orderer channel") - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + log.Infof("Channel configuration updated with transaction ID: %s", saveChannelResponse.TransactionID) + return nil +} + +func (r *FabricMainChannelReconciler) saveChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption) error { + ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel, resmgmtOptions) + if err != nil { + return err } + cmnConfig, err := resource.ExtractConfigFromBlock(ordererChannelBlock) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error extracting the config from block"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return errors.Wrap(err, "error extracting the config from block") } + var buf bytes.Buffer err = protolator.DeepMarshalJSON(&buf, cmnConfig) if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error converting block to JSON"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return errors.Wrap(err, "error converting block to JSON") } + configMapName := fmt.Sprintf("%s-config", fabricMainChannel.ObjectMeta.Name) - createConfigMap := false configMapNamespace := "default" - configMap, err := clientSet.CoreV1().ConfigMaps(configMapNamespace).Get(ctx, configMapName, v1.GetOptions{}) + + return r.createOrUpdateConfigMap(ctx, configMapName, configMapNamespace, buf.String()) +} + +func (r *FabricMainChannelReconciler) createOrUpdateConfigMap(ctx context.Context, name, namespace, data string) error { + clientSet, err := utils.GetClientKubeWithConf(r.Config) if err != nil { - if apierrors.IsNotFound(err) { - reqLogger.Info(fmt.Sprintf("ConfigMap %s not found, creating it", configMapName)) - createConfigMap = true - } else { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error getting configmap"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } + return err } - log.Infof("updateOrdererChannelConfigTx: ConfigMap %s found, updating it", configMapName) - if createConfigMap { - _, err = clientSet.CoreV1().ConfigMaps(configMapNamespace).Create(ctx, &corev1.ConfigMap{ - TypeMeta: v1.TypeMeta{}, - ObjectMeta: v1.ObjectMeta{ - Name: configMapName, - Namespace: configMapNamespace, - }, - Data: map[string]string{ - "channel.json": buf.String(), - }, - }, v1.CreateOptions{}) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error creating config map"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) - } - } else { - configMap.Data["channel.json"] = buf.String() - _, err = clientSet.CoreV1().ConfigMaps(configMapNamespace).Update(ctx, configMap, v1.UpdateOptions{}) - if err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, errors.Wrapf(err, "error updating config map"), false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + configMap, err := clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, name, v1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + _, err = clientSet.CoreV1().ConfigMaps(namespace).Create(ctx, &corev1.ConfigMap{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Data: map[string]string{ + "channel.json": data, + }, + }, v1.CreateOptions{}) + return err } + return err } + + configMap.Data["channel.json"] = data + _, err = clientSet.CoreV1().ConfigMaps(namespace).Update(ctx, configMap, v1.UpdateOptions{}) + return err +} + +func (r *FabricMainChannelReconciler) finalizeReconcile(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel) (reconcile.Result, error) { fabricMainChannel.Status.Status = hlfv1alpha1.RunningStatus fabricMainChannel.Status.Message = "Channel setup completed" @@ -971,10 +702,11 @@ func (r *FabricMainChannelReconciler) Reconcile(ctx context.Context, req ctrl.Re Type: status.ConditionType(fabricMainChannel.Status.Status), Status: "True", }) + if err := r.Status().Update(ctx, fabricMainChannel); err != nil { - r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.FailedStatus, false, err, false) - return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) + return reconcile.Result{}, err } + r.setConditionStatus(ctx, fabricMainChannel, hlfv1alpha1.RunningStatus, true, nil, false) return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) } @@ -1581,6 +1313,15 @@ func updateApplicationChannelConfigTx(currentConfigTX configtx.ConfigTx, newConf return errors.Wrap(err, "failed to set application policies") } if newConfigTx.Application.ACLs != nil { + // compare current acls with new acls + currentACLs, err := currentConfigTX.Application().ACLs() + if err != nil { + return errors.Wrapf(err, "failed to get current ACLs") + } + log.Infof("Current ACLs: %v", currentACLs) + log.Infof("New ACLs: %v", newConfigTx.Application.ACLs) + // compare them to see if we have to set new ACLs + var acls []string for key := range newConfigTx.Application.ACLs { acls = append(acls, key) @@ -1595,13 +1336,6 @@ func updateApplicationChannelConfigTx(currentConfigTX configtx.ConfigTx, newConf if err != nil { return errors.Wrapf(err, "failed to set ACLs") } - } else { - err = currentConfigTX.Application().SetACLs( - defaultApplicationACLs(), - ) - if err != nil { - return errors.Wrapf(err, "failed to set ACLs") - } } for _, capability := range app.Capabilities {