diff --git a/pkg/armrpc/asyncoperation/controller/controller.go b/pkg/armrpc/asyncoperation/controller/controller.go index 3081b69511..1a1ce535b9 100644 --- a/pkg/armrpc/asyncoperation/controller/controller.go +++ b/pkg/armrpc/asyncoperation/controller/controller.go @@ -18,9 +18,9 @@ package controller import ( "context" + "errors" "github.com/radius-project/radius/pkg/corerp/backend/deployment" - "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/store" runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -31,9 +31,6 @@ type Options struct { // StorageClient is the data storage client. StorageClient store.StorageClient - // DataProvider is the data storage provider. - DataProvider dataprovider.DataStorageProvider - // KubeClient is the Kubernetes controller runtime client. KubeClient runtimeclient.Client @@ -44,6 +41,22 @@ type Options struct { GetDeploymentProcessor func() deployment.DeploymentProcessor } +// Validate validates that required fields are set on the options. +func (o Options) Validate() error { + var err error + if o.StorageClient == nil { + err = errors.Join(err, errors.New("StorageClient is required")) + } + if o.ResourceType == "" { + err = errors.Join(err, errors.New("ResourceType is required")) + } + + // KubeClient and GetDeploymentProcessor are not used by the majority of the code, so they + // are not validated here. + + return err +} + // Controller is an interface to implement async operation controller. type Controller interface { // Run runs async request operation. @@ -68,11 +81,6 @@ func (b *BaseController) StorageClient() store.StorageClient { return b.options.StorageClient } -// DataProvider gets data storage provider for this controller. -func (b *BaseController) DataProvider() dataprovider.DataStorageProvider { - return b.options.DataProvider -} - // KubeClient gets Kubernetes client for this controller. func (b *BaseController) KubeClient() runtimeclient.Client { return b.options.KubeClient diff --git a/pkg/armrpc/asyncoperation/statusmanager/statusmanager.go b/pkg/armrpc/asyncoperation/statusmanager/statusmanager.go index 7b51764390..e87c5f71c1 100644 --- a/pkg/armrpc/asyncoperation/statusmanager/statusmanager.go +++ b/pkg/armrpc/asyncoperation/statusmanager/statusmanager.go @@ -27,7 +27,6 @@ import ( ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" "github.com/radius-project/radius/pkg/metrics" "github.com/radius-project/radius/pkg/trace" - "github.com/radius-project/radius/pkg/ucp/dataprovider" queue "github.com/radius-project/radius/pkg/ucp/queue/client" "github.com/radius-project/radius/pkg/ucp/resources" "github.com/radius-project/radius/pkg/ucp/store" @@ -37,7 +36,7 @@ import ( // statusManager includes the necessary functions to manage asynchronous operations. type statusManager struct { - storeProvider dataprovider.DataStorageProvider + storageClient store.StorageClient queue queue.Client location string } @@ -65,9 +64,9 @@ type StatusManager interface { } // New creates statusManager instance. -func New(dataProvider dataprovider.DataStorageProvider, q queue.Client, location string) StatusManager { +func New(storageClient store.StorageClient, q queue.Client, location string) StatusManager { return &statusManager{ - storeProvider: dataProvider, + storageClient: storageClient, queue: q, location: location, } @@ -78,10 +77,6 @@ func (aom *statusManager) operationStatusResourceID(id resources.ID, operationID return fmt.Sprintf("%s/providers/%s/locations/%s/operationstatuses/%s", id.PlaneScope(), strings.ToLower(id.ProviderNamespace()), aom.location, operationID) } -func (aom *statusManager) getClient(ctx context.Context, id resources.ID) (store.StorageClient, error) { - return aom.storeProvider.GetStorageClient(ctx, id.ProviderNamespace()+"/operationstatuses") -} - // QueueAsyncOperation creates and saves a new status resource with the given parameters in datastore, and queues // a request message. If an error occurs, the status is deleted using the storeClient. func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMRequestContext, options QueueOperationOptions) error { @@ -111,12 +106,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR ClientObjectID: sCtx.ClientObjectID, } - storeClient, err := aom.getClient(ctx, sCtx.ResourceID) - if err != nil { - return err - } - - err = storeClient.Save(ctx, &store.Object{ + err := aom.storageClient.Save(ctx, &store.Object{ Metadata: store.Metadata{ID: opID}, Data: aos, }) @@ -126,7 +116,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR } if err = aom.queueRequestMessage(ctx, sCtx, aos, options.OperationTimeout); err != nil { - delErr := storeClient.Delete(ctx, opID) + delErr := aom.storageClient.Delete(ctx, opID) if delErr != nil { return delErr } @@ -140,12 +130,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR // Get gets a status object from the datastore or an error if the retrieval fails. func (aom *statusManager) Get(ctx context.Context, id resources.ID, operationID uuid.UUID) (*Status, error) { - storeClient, err := aom.getClient(ctx, id) - if err != nil { - return nil, err - } - - obj, err := storeClient.Get(ctx, aom.operationStatusResourceID(id, operationID)) + obj, err := aom.storageClient.Get(ctx, aom.operationStatusResourceID(id, operationID)) if err != nil { return nil, err } @@ -162,12 +147,7 @@ func (aom *statusManager) Get(ctx context.Context, id resources.ID, operationID // given parameters, and saves it back to the store. func (aom *statusManager) Update(ctx context.Context, id resources.ID, operationID uuid.UUID, state v1.ProvisioningState, endTime *time.Time, opError *v1.ErrorDetails) error { opID := aom.operationStatusResourceID(id, operationID) - storeClient, err := aom.getClient(ctx, id) - if err != nil { - return err - } - - obj, err := storeClient.Get(ctx, opID) + obj, err := aom.storageClient.Get(ctx, opID) if err != nil { return err } @@ -190,17 +170,13 @@ func (aom *statusManager) Update(ctx context.Context, id resources.ID, operation obj.Data = s - return storeClient.Save(ctx, obj, store.WithETag(obj.ETag)) + return aom.storageClient.Save(ctx, obj, store.WithETag(obj.ETag)) } // Delete deletes the operation status resource associated with the given ID and // operationID, and returns an error if unsuccessful. func (aom *statusManager) Delete(ctx context.Context, id resources.ID, operationID uuid.UUID) error { - storeClient, err := aom.getClient(ctx, id) - if err != nil { - return err - } - return storeClient.Delete(ctx, aom.operationStatusResourceID(id, operationID)) + return aom.storageClient.Delete(ctx, aom.operationStatusResourceID(id, operationID)) } // queueRequestMessage function is to put the async operation message to the queue to be worked on. diff --git a/pkg/armrpc/asyncoperation/statusmanager/statusmanager_test.go b/pkg/armrpc/asyncoperation/statusmanager/statusmanager_test.go index 7f0c330939..c61872cf3d 100644 --- a/pkg/armrpc/asyncoperation/statusmanager/statusmanager_test.go +++ b/pkg/armrpc/asyncoperation/statusmanager/statusmanager_test.go @@ -26,7 +26,6 @@ import ( "github.com/google/uuid" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" "github.com/radius-project/radius/pkg/armrpc/rpctest" - "github.com/radius-project/radius/pkg/ucp/dataprovider" queue "github.com/radius-project/radius/pkg/ucp/queue/client" "github.com/radius-project/radius/pkg/ucp/resources" "github.com/radius-project/radius/pkg/ucp/store" @@ -35,10 +34,9 @@ import ( ) type asyncOperationsManagerTest struct { - manager StatusManager - storeProvider *dataprovider.MockDataStorageProvider - storeClient *store.MockStorageClient - queue *queue.MockClient + manager StatusManager + storeClient *store.MockStorageClient + queue *queue.MockClient } const ( @@ -54,12 +52,10 @@ const ( func setup(tb testing.TB) (asyncOperationsManagerTest, *gomock.Controller) { ctrl := gomock.NewController(tb) - dp := dataprovider.NewMockDataStorageProvider(ctrl) sc := store.NewMockStorageClient(ctrl) - dp.EXPECT().GetStorageClient(gomock.Any(), "Applications.Core/operationstatuses").Return(sc, nil) enq := queue.NewMockClient(ctrl) - aom := New(dp, enq, "test-location") - return asyncOperationsManagerTest{manager: aom, storeProvider: dp, storeClient: sc, queue: enq}, ctrl + aom := New(sc, enq, "test-location") + return asyncOperationsManagerTest{manager: aom, storeClient: sc, queue: enq}, ctrl } var reqCtx = &v1.ARMRequestContext{ diff --git a/pkg/armrpc/asyncoperation/worker/registry.go b/pkg/armrpc/asyncoperation/worker/registry.go index 00dbe7e82a..eb21ec1042 100644 --- a/pkg/armrpc/asyncoperation/worker/registry.go +++ b/pkg/armrpc/asyncoperation/worker/registry.go @@ -17,12 +17,11 @@ limitations under the License. package worker import ( - "context" + "fmt" "sync" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" - "github.com/radius-project/radius/pkg/ucp/dataprovider" ) // ControllerFactoryFunc is a factory function to create a controller. @@ -32,40 +31,38 @@ type ControllerFactoryFunc func(opts ctrl.Options) (ctrl.Controller, error) type ControllerRegistry struct { ctrlMap map[string]ctrl.Controller ctrlMapMu sync.RWMutex - sp dataprovider.DataStorageProvider defaultFactory ControllerFactoryFunc defaultOpts ctrl.Options } // NewControllerRegistry creates an ControllerRegistry instance. -func NewControllerRegistry(sp dataprovider.DataStorageProvider) *ControllerRegistry { +func NewControllerRegistry() *ControllerRegistry { return &ControllerRegistry{ ctrlMap: map[string]ctrl.Controller{}, - sp: sp, } } // Register registers a controller for a specific resource type and operation method. // // Controllers registered using Register will be cached by the registry and the same instance will be reused. -func (h *ControllerRegistry) Register(ctx context.Context, resourceType string, method v1.OperationMethod, factoryFn ControllerFactoryFunc, opts ctrl.Options) error { +func (h *ControllerRegistry) Register(resourceType string, method v1.OperationMethod, factoryFn ControllerFactoryFunc, opts ctrl.Options) error { h.ctrlMapMu.Lock() defer h.ctrlMapMu.Unlock() - ot := v1.OperationType{Type: resourceType, Method: method} - storageClient, err := h.sp.GetStorageClient(ctx, resourceType) + opts.ResourceType = resourceType + + err := opts.Validate() if err != nil { - return err + return fmt.Errorf("invalid controller options: %w", err) } - opts.StorageClient = storageClient - opts.ResourceType = resourceType ctrl, err := factoryFn(opts) if err != nil { return err } + ot := v1.OperationType{Type: resourceType, Method: method} h.ctrlMap[ot.String()] = ctrl return nil } @@ -74,17 +71,22 @@ func (h *ControllerRegistry) Register(ctx context.Context, resourceType string, // // The default controller will be used when Get is called with an operation type that has no registered controller. // The default controller will not be cached by the registry. -func (h *ControllerRegistry) RegisterDefault(ctx context.Context, factoryFn ControllerFactoryFunc, opts ctrl.Options) error { +func (h *ControllerRegistry) RegisterDefault(factoryFn ControllerFactoryFunc, opts ctrl.Options) error { h.ctrlMapMu.Lock() defer h.ctrlMapMu.Unlock() + // Note: we can't call opts.Validate() here because we don't know the resource type yet. + if opts.StorageClient == nil { + return fmt.Errorf("invalid controller options: .StorageClient is required") + } + h.defaultFactory = factoryFn h.defaultOpts = opts return nil } // Get gets the registered async controller instance. -func (h *ControllerRegistry) Get(ctx context.Context, operationType v1.OperationType) (ctrl.Controller, error) { +func (h *ControllerRegistry) Get(operationType v1.OperationType) (ctrl.Controller, error) { h.ctrlMapMu.RLock() defer h.ctrlMapMu.RUnlock() @@ -92,23 +94,17 @@ func (h *ControllerRegistry) Get(ctx context.Context, operationType v1.Operation return h, nil } - return h.getDefault(ctx, operationType) + return h.getDefault(operationType) } -func (h *ControllerRegistry) getDefault(ctx context.Context, operationType v1.OperationType) (ctrl.Controller, error) { +func (h *ControllerRegistry) getDefault(operationType v1.OperationType) (ctrl.Controller, error) { if h.defaultFactory == nil { return nil, nil } - storageClient, err := h.sp.GetStorageClient(ctx, operationType.Type) - if err != nil { - return nil, err - } - // Copy the options so we can update it. opts := h.defaultOpts - opts.StorageClient = storageClient opts.ResourceType = operationType.Type return h.defaultFactory(opts) diff --git a/pkg/armrpc/asyncoperation/worker/registry_test.go b/pkg/armrpc/asyncoperation/worker/registry_test.go index 8ceb457448..c3078e6dcb 100644 --- a/pkg/armrpc/asyncoperation/worker/registry_test.go +++ b/pkg/armrpc/asyncoperation/worker/registry_test.go @@ -23,7 +23,7 @@ import ( v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" "github.com/radius-project/radius/pkg/corerp/backend/deployment" - "github.com/radius-project/radius/pkg/ucp/dataprovider" + "github.com/radius-project/radius/pkg/ucp/store/inmemory" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" ) @@ -32,21 +32,17 @@ func TestRegister_Get(t *testing.T) { mctrl := gomock.NewController(t) defer mctrl.Finish() - mockSP := dataprovider.NewMockDataStorageProvider(mctrl) - mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() - - registry := NewControllerRegistry(mockSP) + registry := NewControllerRegistry() opGet := v1.OperationType{Type: "Applications.Core/environments", Method: v1.OperationGet} opPut := v1.OperationType{Type: "Applications.Core/environments", Method: v1.OperationPut} ctrlOpts := ctrl.Options{ - StorageClient: nil, - DataProvider: mockSP, + StorageClient: inmemory.NewClient(), GetDeploymentProcessor: func() deployment.DeploymentProcessor { return nil }, } - err := registry.Register(context.TODO(), opGet.Type, opGet.Method, func(opts ctrl.Options) (ctrl.Controller, error) { + err := registry.Register(opGet.Type, opGet.Method, func(opts ctrl.Options) (ctrl.Controller, error) { return &testAsyncController{ BaseController: ctrl.NewBaseAsyncController(ctrlOpts), fn: func(ctx context.Context) (ctrl.Result, error) { @@ -56,45 +52,38 @@ func TestRegister_Get(t *testing.T) { }, ctrlOpts) require.NoError(t, err) - err = registry.Register(context.TODO(), opPut.Type, opPut.Method, func(opts ctrl.Options) (ctrl.Controller, error) { + err = registry.Register(opPut.Type, opPut.Method, func(opts ctrl.Options) (ctrl.Controller, error) { return &testAsyncController{ BaseController: ctrl.NewBaseAsyncController(ctrlOpts), }, nil }, ctrlOpts) require.NoError(t, err) - ctrl, err := registry.Get(context.Background(), opGet) + ctrl, err := registry.Get(opGet) require.NoError(t, err) require.NotNil(t, ctrl) - ctrl, err = registry.Get(context.Background(), opPut) + ctrl, err = registry.Get(opPut) require.NoError(t, err) require.NotNil(t, ctrl) // Getting a controller that is not registered should return nil by default. - ctrl, err = registry.Get(context.Background(), v1.OperationType{Type: "Applications.Core/unknown", Method: v1.OperationGet}) + ctrl, err = registry.Get(v1.OperationType{Type: "Applications.Core/unknown", Method: v1.OperationGet}) require.NoError(t, err) require.Nil(t, ctrl) } func TestRegister_Get_WithDefault(t *testing.T) { - mctrl := gomock.NewController(t) - defer mctrl.Finish() - - mockSP := dataprovider.NewMockDataStorageProvider(mctrl) - mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() - - registry := NewControllerRegistry(mockSP) + registry := NewControllerRegistry() opGet := v1.OperationType{Type: "Applications.Core/environments", Method: v1.OperationGet} ctrlOpts := ctrl.Options{ - StorageClient: nil, - DataProvider: mockSP, + StorageClient: inmemory.NewClient(), GetDeploymentProcessor: func() deployment.DeploymentProcessor { return nil }, } - err := registry.Register(context.TODO(), opGet.Type, opGet.Method, func(opts ctrl.Options) (ctrl.Controller, error) { + err := registry.Register(opGet.Type, opGet.Method, func(opts ctrl.Options) (ctrl.Controller, error) { return &testAsyncController{ BaseController: ctrl.NewBaseAsyncController(ctrlOpts), fn: func(ctx context.Context) (ctrl.Result, error) { @@ -104,19 +93,19 @@ func TestRegister_Get_WithDefault(t *testing.T) { }, ctrlOpts) require.NoError(t, err) - err = registry.RegisterDefault(context.TODO(), func(opts ctrl.Options) (ctrl.Controller, error) { + err = registry.RegisterDefault(func(opts ctrl.Options) (ctrl.Controller, error) { return &testAsyncController{ BaseController: ctrl.NewBaseAsyncController(ctrlOpts), }, nil }, ctrlOpts) require.NoError(t, err) - ctrl, err := registry.Get(context.Background(), opGet) + ctrl, err := registry.Get(opGet) require.NoError(t, err) require.NotNil(t, ctrl) // Getting a controller that is not registered should default the default - ctrl, err = registry.Get(context.Background(), v1.OperationType{Type: "Applications.Core/unknown", Method: v1.OperationGet}) + ctrl, err = registry.Get(v1.OperationType{Type: "Applications.Core/unknown", Method: v1.OperationGet}) require.NoError(t, err) require.NotNil(t, ctrl) } diff --git a/pkg/armrpc/asyncoperation/worker/service.go b/pkg/armrpc/asyncoperation/worker/service.go index 4a04a19384..a70b98778e 100644 --- a/pkg/armrpc/asyncoperation/worker/service.go +++ b/pkg/armrpc/asyncoperation/worker/service.go @@ -34,7 +34,7 @@ type Service struct { // Options is the server hosting options. Options hostoptions.HostOptions // StorageProvider is the provider of storage client. - StorageProvider dataprovider.DataStorageProvider + StorageProvider *dataprovider.DataStorageProvider // OperationStatusManager is the manager of the operation status. OperationStatusManager manager.StatusManager // Controllers is the registry of the async operation controllers. @@ -46,15 +46,22 @@ type Service struct { // Init initializes worker service - it initializes the StorageProvider, RequestQueue, OperationStatusManager, Controllers, KubeClient and // returns an error if any of these operations fail. func (s *Service) Init(ctx context.Context) error { - s.StorageProvider = dataprovider.NewStorageProvider(s.Options.Config.StorageProvider) + s.StorageProvider = dataprovider.DataStorageProviderFromOptions(s.Options.Config.StorageProvider) qp := qprovider.New(s.Options.Config.QueueProvider) + var err error + storageClient, err := s.StorageProvider.GetClient(ctx) + if err != nil { + return err + } + s.RequestQueue, err = qp.GetClient(ctx) if err != nil { return err } - s.OperationStatusManager = manager.New(s.StorageProvider, s.RequestQueue, s.Options.Config.Env.RoleLocation) - s.Controllers = NewControllerRegistry(s.StorageProvider) + + s.OperationStatusManager = manager.New(storageClient, s.RequestQueue, s.Options.Config.Env.RoleLocation) + s.Controllers = NewControllerRegistry() return nil } diff --git a/pkg/armrpc/asyncoperation/worker/worker.go b/pkg/armrpc/asyncoperation/worker/worker.go index e00c767170..0539576109 100644 --- a/pkg/armrpc/asyncoperation/worker/worker.go +++ b/pkg/armrpc/asyncoperation/worker/worker.go @@ -168,7 +168,7 @@ func (w *AsyncRequestProcessWorker) Start(ctx context.Context) error { } reqCtx = v1.WithARMRequestContext(reqCtx, armReqCtx) - asyncCtrl, err := w.registry.Get(reqCtx, armReqCtx.OperationType) + asyncCtrl, err := w.registry.Get(armReqCtx.OperationType) if err != nil { opLogger.Error(err, "failed to get async controller.") if err := w.requestQueue.FinishMessage(reqCtx, msgreq); err != nil { diff --git a/pkg/armrpc/asyncoperation/worker/worker_runoperation_test.go b/pkg/armrpc/asyncoperation/worker/worker_runoperation_test.go index 3e7d6280ab..9ff93b40b1 100644 --- a/pkg/armrpc/asyncoperation/worker/worker_runoperation_test.go +++ b/pkg/armrpc/asyncoperation/worker/worker_runoperation_test.go @@ -29,11 +29,11 @@ import ( ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" manager "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/corerp/backend/deployment" - "github.com/radius-project/radius/pkg/ucp/dataprovider" queue "github.com/radius-project/radius/pkg/ucp/queue/client" "github.com/radius-project/radius/pkg/ucp/queue/inmemory" "github.com/radius-project/radius/pkg/ucp/resources" "github.com/radius-project/radius/pkg/ucp/store" + inmemorystore "github.com/radius-project/radius/pkg/ucp/store/inmemory" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/mock/gomock" @@ -77,7 +77,6 @@ type testContext struct { ctx context.Context mockSC *store.MockStorageClient mockSM *manager.MockStatusManager - mockSP *dataprovider.MockDataStorageProvider testQueue *inmemory.Client internalQ *inmemory.InmemQueue @@ -115,11 +114,11 @@ func (c *testContext) cancellable(timeout time.Duration) (context.Context, conte func newTestContext(t *testing.T, lockTime time.Duration) (*testContext, *gomock.Controller) { mctrl := gomock.NewController(t) inmemQ := inmemory.NewInMemQueue(lockTime) + storageClient := store.NewMockStorageClient(mctrl) return &testContext{ ctx: context.Background(), - mockSC: store.NewMockStorageClient(mctrl), + mockSC: storageClient, mockSM: manager.NewMockStatusManager(mctrl), - mockSP: dataprovider.NewMockDataStorageProvider(mctrl), internalQ: inmemQ, testQueue: inmemory.New(inmemQ), }, mctrl @@ -147,17 +146,11 @@ func TestStart_UnknownOperation(t *testing.T) { tCtx, mctrl := newTestContext(t, defaultTestLockTime) defer mctrl.Finish() - registry := NewControllerRegistry(tCtx.mockSP) + registry := NewControllerRegistry() worker := New(Options{DequeueIntervalDuration: defaultTestDequeueInterval}, nil, tCtx.testQueue, registry) - tCtx.mockSP.EXPECT(). - GetStorageClient(gomock.Any(), gomock.Any()). - Return(tCtx.mockSC, nil). - Times(1) - opts := ctrl.Options{ StorageClient: tCtx.mockSC, - DataProvider: tCtx.mockSP, GetDeploymentProcessor: func() deployment.DeploymentProcessor { return deployment.NewMockDeploymentProcessor(mctrl) }, @@ -174,7 +167,6 @@ func TestStart_UnknownOperation(t *testing.T) { ctx, cancel := tCtx.cancellable(time.Duration(0)) err := registry.Register( - ctx, testResourceType, "UNDEFINED", func(opts ctrl.Options) (ctrl.Controller, error) { return testCtrl, nil @@ -215,16 +207,14 @@ func TestStart_MaxDequeueCount(t *testing.T) { }).AnyTimes() tCtx.mockSC.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) tCtx.mockSM.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(v1.ProvisioningStateFailed), gomock.Any(), gomock.Any()).Return(nil).Times(1) - tCtx.mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(store.StorageClient(tCtx.mockSC), nil).Times(1) expectedDequeueCount := 2 - registry := NewControllerRegistry(tCtx.mockSP) + registry := NewControllerRegistry() worker := New(Options{MaxOperationRetryCount: expectedDequeueCount, DequeueIntervalDuration: defaultTestDequeueInterval}, tCtx.mockSM, tCtx.testQueue, registry) opts := ctrl.Options{ StorageClient: tCtx.mockSC, - DataProvider: tCtx.mockSP, } testCtrl := &testAsyncController{ @@ -236,12 +226,11 @@ func TestStart_MaxDequeueCount(t *testing.T) { ctx, cancel := tCtx.cancellable(0) err := registry.Register( - ctx, testResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { return testCtrl, nil }, ctrl.Options{ - DataProvider: tCtx.mockSP, + StorageClient: tCtx.mockSC, }) require.NoError(t, err) @@ -279,14 +268,12 @@ func TestStart_MaxConcurrency(t *testing.T) { tCtx.mockSC.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() tCtx.mockSM.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(testOperationStatus, nil).AnyTimes() tCtx.mockSM.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - tCtx.mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(store.StorageClient(tCtx.mockSC), nil).AnyTimes() - registry := NewControllerRegistry(tCtx.mockSP) + registry := NewControllerRegistry() worker := New(Options{DequeueIntervalDuration: defaultTestDequeueInterval}, tCtx.mockSM, tCtx.testQueue, registry) opts := ctrl.Options{ StorageClient: tCtx.mockSC, - DataProvider: tCtx.mockSP, GetDeploymentProcessor: func() deployment.DeploymentProcessor { return deployment.NewMockDeploymentProcessor(mctrl) }, @@ -309,7 +296,6 @@ func TestStart_MaxConcurrency(t *testing.T) { } ctx, cancel := tCtx.cancellable(time.Duration(0)) err := registry.Register( - ctx, testResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { @@ -358,14 +344,12 @@ func TestStart_RunOperation(t *testing.T) { tCtx.mockSC.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() tCtx.mockSM.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(testOperationStatus, nil).AnyTimes() tCtx.mockSM.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - tCtx.mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(store.StorageClient(tCtx.mockSC), nil).AnyTimes() - registry := NewControllerRegistry(tCtx.mockSP) + registry := NewControllerRegistry() worker := New(Options{}, tCtx.mockSM, tCtx.testQueue, registry) opts := ctrl.Options{ StorageClient: tCtx.mockSC, - DataProvider: tCtx.mockSP, GetDeploymentProcessor: func() deployment.DeploymentProcessor { return deployment.NewMockDeploymentProcessor(mctrl) }, @@ -385,7 +369,6 @@ func TestStart_RunOperation(t *testing.T) { ctx, cancel := tCtx.cancellable(time.Duration(0)) err := registry.Register( - ctx, testResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { return testCtrl, nil @@ -437,7 +420,6 @@ func TestRunOperation_Successfully(t *testing.T) { opts := ctrl.Options{ StorageClient: tCtx.mockSC, - DataProvider: tCtx.mockSP, GetDeploymentProcessor: func() deployment.DeploymentProcessor { return deployment.NewMockDeploymentProcessor(mctrl) }, @@ -477,7 +459,6 @@ func TestRunOperation_ExtendMessageLock(t *testing.T) { opts := ctrl.Options{ StorageClient: tCtx.mockSC, - DataProvider: tCtx.mockSP, GetDeploymentProcessor: func() deployment.DeploymentProcessor { return deployment.NewMockDeploymentProcessor(mctrl) }, @@ -513,9 +494,13 @@ func TestRunOperation_CancelContext(t *testing.T) { worker := New(Options{}, nil, tCtx.testQueue, nil) + // This test has a race condition with the worker loop trying to make an operation status + // as failed. We can't use a mock because that might happen after the mock is destroyed (on test completion). + // + // Instead we use the in-memory store. + opts := ctrl.Options{ - StorageClient: nil, - DataProvider: tCtx.mockSP, + StorageClient: inmemorystore.NewClient(), GetDeploymentProcessor: func() deployment.DeploymentProcessor { return nil }, @@ -571,7 +556,6 @@ func TestRunOperation_Timeout(t *testing.T) { opts := ctrl.Options{ StorageClient: tCtx.mockSC, - DataProvider: tCtx.mockSP, GetDeploymentProcessor: func() deployment.DeploymentProcessor { return deployment.NewMockDeploymentProcessor(mctrl) }, @@ -606,7 +590,6 @@ func TestRunOperation_PanicController(t *testing.T) { opts := ctrl.Options{ StorageClient: tCtx.mockSC, - DataProvider: tCtx.mockSP, GetDeploymentProcessor: func() deployment.DeploymentProcessor { return nil }, diff --git a/pkg/armrpc/builder/builder.go b/pkg/armrpc/builder/builder.go index 0120d77f22..24f17263c2 100644 --- a/pkg/armrpc/builder/builder.go +++ b/pkg/armrpc/builder/builder.go @@ -172,7 +172,7 @@ func (b *Builder) ApplyAsyncHandler(ctx context.Context, registry *worker.Contro } if h.AsyncController != nil { - err := registry.Register(ctx, h.ResourceType, h.Method, h.AsyncController, ctrlOpts) + err := registry.Register(h.ResourceType, h.Method, h.AsyncController, ctrlOpts) if err != nil { return err } diff --git a/pkg/armrpc/builder/builder_test.go b/pkg/armrpc/builder/builder_test.go index c0c0f20654..90695a363c 100644 --- a/pkg/armrpc/builder/builder_test.go +++ b/pkg/armrpc/builder/builder_test.go @@ -23,12 +23,12 @@ import ( "github.com/go-chi/chi/v5" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" - asyncctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" + backendctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/armrpc/asyncoperation/worker" apictrl "github.com/radius-project/radius/pkg/armrpc/frontend/controller" "github.com/radius-project/radius/pkg/armrpc/rpctest" - "github.com/radius-project/radius/pkg/ucp/dataprovider" - "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/store/inmemory" "github.com/radius-project/radius/test/testcontext" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -167,28 +167,19 @@ var defaultHandlerTests = []rpctest.HandlerTestSpec{ }, } -func setup(t *testing.T) (*dataprovider.MockDataStorageProvider, *store.MockStorageClient) { - mctrl := gomock.NewController(t) - - mockSP := dataprovider.NewMockDataStorageProvider(mctrl) - mockSC := store.NewMockStorageClient(mctrl) - - mockSC.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(&store.Object{}, nil).AnyTimes() - mockSC.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - mockSC.EXPECT().Delete(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - mockSC.EXPECT().Query(gomock.Any(), gomock.Any(), gomock.Any()).Return(&store.ObjectQueryResult{}, nil).AnyTimes() - mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(store.StorageClient(mockSC), nil).AnyTimes() - - return mockSP, mockSC -} - func TestApplyAPIHandlers(t *testing.T) { - mockSP, _ := setup(t) - runTests := func(t *testing.T, testSpecs []rpctest.HandlerTestSpec, b *Builder) { rpctest.AssertRequests(t, testSpecs, "/api.ucp.dev", "/planes/radius/local", func(ctx context.Context) (chi.Router, error) { r := chi.NewRouter() - return r, b.ApplyAPIHandlers(ctx, r, apictrl.Options{PathBase: "/api.ucp.dev", DataProvider: mockSP}) + + options := apictrl.Options{ + Address: "localhost:8080", + PathBase: "/api.ucp.dev", + StorageClient: inmemory.NewClient(), + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), + } + + return r, b.ApplyAPIHandlers(ctx, r, options) }) } @@ -218,7 +209,6 @@ func TestApplyAPIHandlers(t *testing.T) { } func TestApplyAPIHandlers_AvailableOperations(t *testing.T) { - mockSP, _ := setup(t) ns := newTestNamespace(t) ns.SetAvailableOperations([]v1.Operation{ @@ -237,17 +227,27 @@ func TestApplyAPIHandlers_AvailableOperations(t *testing.T) { builder := ns.GenerateBuilder() rpctest.AssertRequests(t, handlerTests, "/api.ucp.dev", "/planes/radius/local", func(ctx context.Context) (chi.Router, error) { r := chi.NewRouter() - return r, builder.ApplyAPIHandlers(ctx, r, apictrl.Options{PathBase: "/api.ucp.dev", DataProvider: mockSP}) + options := apictrl.Options{ + Address: "localhost:8080", + PathBase: "/api.ucp.dev", + StorageClient: inmemory.NewClient(), + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), + } + return r, builder.ApplyAPIHandlers(ctx, r, options) }) } func TestApplyAsyncHandler(t *testing.T) { - mockSP, _ := setup(t) ns := newTestNamespace(t) builder := ns.GenerateBuilder() - registry := worker.NewControllerRegistry(mockSP) + registry := worker.NewControllerRegistry() ctx := testcontext.New(t) - err := builder.ApplyAsyncHandler(ctx, registry, asyncctrl.Options{}) + + options := backendctrl.Options{ + StorageClient: inmemory.NewClient(), + } + + err := builder.ApplyAsyncHandler(ctx, registry, options) require.NoError(t, err) expectedOperations := []v1.OperationType{ @@ -261,7 +261,7 @@ func TestApplyAsyncHandler(t *testing.T) { } for _, op := range expectedOperations { - jobCtrl, err := registry.Get(context.Background(), op) + jobCtrl, err := registry.Get(op) require.NoError(t, err) require.NotNil(t, jobCtrl) } diff --git a/pkg/armrpc/frontend/controller/controller.go b/pkg/armrpc/frontend/controller/controller.go index d7c8bab554..bbda7f3537 100644 --- a/pkg/armrpc/frontend/controller/controller.go +++ b/pkg/armrpc/frontend/controller/controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "errors" "net/http" "time" @@ -25,7 +26,6 @@ import ( sm "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/armrpc/hostoptions" "github.com/radius-project/radius/pkg/armrpc/rest" - "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/store" runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -55,9 +55,6 @@ type Options struct { // StorageClient is the data storage client. StorageClient store.StorageClient - // DataProvider is the data storage provider. - DataProvider dataprovider.DataStorageProvider - // KubeClient is the Kubernetes controller runtime client. KubeClient runtimeclient.Client @@ -69,6 +66,28 @@ type Options struct { StatusManager sm.StatusManager } +func (o Options) Validate() error { + var err error + if o.Address == "" { + err = errors.Join(err, errors.New(".Address is required")) + } + if o.StorageClient == nil { + err = errors.Join(err, errors.New(".StorageClient is required")) + } + if o.ResourceType == "" { + err = errors.Join(err, errors.New(".ResourceType is required")) + } + if o.StatusManager == nil { + err = errors.Join(err, errors.New(".StatusManager is required")) + } + + // PathBase is usually empty, so it is not validated here. + // + // KubeClient is not used by the majority of the code, so it is not validated here. + + return err +} + // ResourceOptions represents the options and filters for resource. type ResourceOptions[T any] struct { // RequestConverter is the request converter. @@ -122,11 +141,6 @@ func (b *BaseController) StorageClient() store.StorageClient { return b.options.StorageClient } -// DataProvider gets data storage provider for this controller. -func (b *BaseController) DataProvider() dataprovider.DataStorageProvider { - return b.options.DataProvider -} - // KubeClient gets Kubernetes client for this controller. func (b *BaseController) KubeClient() runtimeclient.Client { return b.options.KubeClient diff --git a/pkg/armrpc/frontend/controller/controller_test.go b/pkg/armrpc/frontend/controller/controller_test.go index d8435643e4..af31d927d6 100644 --- a/pkg/armrpc/frontend/controller/controller_test.go +++ b/pkg/armrpc/frontend/controller/controller_test.go @@ -20,6 +20,8 @@ import ( "testing" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" + "github.com/radius-project/radius/pkg/ucp/store" "github.com/stretchr/testify/require" ) @@ -99,3 +101,75 @@ func TestUpdateSystemData(t *testing.T) { }) } } + +func TestOptionsValidate(t *testing.T) { + tests := []struct { + name string + options Options + wantErr bool + errMsg string + }{ + { + name: "valid options", + options: Options{ + Address: "localhost:8080", + StorageClient: &store.MockStorageClient{}, + ResourceType: "testResource", + StatusManager: &statusmanager.MockStatusManager{}, + }, + wantErr: false, + }, + { + name: "missing address", + options: Options{ + StorageClient: &store.MockStorageClient{}, + ResourceType: "testResource", + StatusManager: &statusmanager.MockStatusManager{}, + }, + wantErr: true, + errMsg: ".Address is required", + }, + { + name: "missing storage client", + options: Options{ + Address: "localhost:8080", + ResourceType: "testResource", + StatusManager: &statusmanager.MockStatusManager{}, + }, + wantErr: true, + errMsg: ".StorageClient is required", + }, + { + name: "missing resource type", + options: Options{ + Address: "localhost:8080", + StorageClient: &store.MockStorageClient{}, + StatusManager: &statusmanager.MockStatusManager{}, + }, + wantErr: true, + errMsg: ".ResourceType is required", + }, + { + name: "missing status manager", + options: Options{ + Address: "localhost:8080", + StorageClient: &store.MockStorageClient{}, + ResourceType: "testResource", + }, + wantErr: true, + errMsg: ".StatusManager is required", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.options.Validate() + if tt.wantErr { + require.Error(t, err) + require.Contains(t, err.Error(), tt.errMsg) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/pkg/armrpc/frontend/controller/operation.go b/pkg/armrpc/frontend/controller/operation.go index db6d510148..2d720c1f47 100644 --- a/pkg/armrpc/frontend/controller/operation.go +++ b/pkg/armrpc/frontend/controller/operation.go @@ -26,7 +26,6 @@ import ( v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" sm "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/armrpc/rest" - "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/resources" "github.com/radius-project/radius/pkg/ucp/store" ) @@ -64,11 +63,6 @@ func (b *Operation[P, T]) StorageClient() store.StorageClient { return b.options.StorageClient } -// DataProvider gets the DataStorageProvider. -func (b *Operation[P, T]) DataProvider() dataprovider.DataStorageProvider { - return b.options.DataProvider -} - // ResourceType gets the resource type for this controller. func (b *Operation[P, T]) ResourceType() string { return b.options.ResourceType diff --git a/pkg/armrpc/frontend/defaultoperation/getoperationresult.go b/pkg/armrpc/frontend/defaultoperation/getoperationresult.go index e5f64effd8..473884ab24 100644 --- a/pkg/armrpc/frontend/defaultoperation/getoperationresult.go +++ b/pkg/armrpc/frontend/defaultoperation/getoperationresult.go @@ -56,14 +56,7 @@ func (e *GetOperationResult) Run(ctx context.Context, w http.ResponseWriter, req return rest.NewBadRequestResponse(err.Error()), nil } - // Avoid using GetResource or e.StorageClient since they will use a different - // storage client than the one we want. - storageClient, err := e.DataProvider().GetStorageClient(ctx, id.ProviderNamespace()+"/operationstatuses") - if err != nil { - return nil, err - } - - obj, err := storageClient.Get(ctx, id.String()) + obj, err := e.StorageClient().Get(ctx, id.String()) if errors.Is(&store.ErrNotFound{ID: id.String()}, err) { return rest.NewNotFoundResponse(id), nil } diff --git a/pkg/armrpc/frontend/defaultoperation/getoperationresult_test.go b/pkg/armrpc/frontend/defaultoperation/getoperationresult_test.go index f4ad86b1d6..9434bfed4a 100644 --- a/pkg/armrpc/frontend/defaultoperation/getoperationresult_test.go +++ b/pkg/armrpc/frontend/defaultoperation/getoperationresult_test.go @@ -28,7 +28,6 @@ import ( manager "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" ctrl "github.com/radius-project/radius/pkg/armrpc/frontend/controller" "github.com/radius-project/radius/pkg/armrpc/rpctest" - "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/store" "github.com/radius-project/radius/test/testcontext" "github.com/radius-project/radius/test/testutil" @@ -51,21 +50,14 @@ func TestGetOperationResultRun(t *testing.T) { t.Run("get non-existing resource", func(t *testing.T) { mctrl := gomock.NewController(t) - operationResultStoreClient := store.NewMockStorageClient(mctrl) - operationStatusStoreClient := store.NewMockStorageClient(mctrl) - - dataProvider := dataprovider.NewMockDataStorageProvider(mctrl) - dataProvider.EXPECT(). - GetStorageClient(gomock.Any(), "Applications.Core/operationstatuses"). - Return(operationStatusStoreClient, nil). - Times(1) + storageClient := store.NewMockStorageClient(mctrl) w := httptest.NewRecorder() req, err := rpctest.NewHTTPRequestFromJSON(testcontext.New(t), http.MethodGet, operationStatusTestHeaderFile, nil) require.NoError(t, err) ctx := rpctest.NewARMRequestContext(req) - operationStatusStoreClient. + storageClient. EXPECT(). Get(gomock.Any(), gomock.Any()). DoAndReturn(func(ctx context.Context, id string, _ ...store.GetOptions) (*store.Object, error) { @@ -73,8 +65,7 @@ func TestGetOperationResultRun(t *testing.T) { }) ctl, err := NewGetOperationResult(ctrl.Options{ - DataProvider: dataProvider, - StorageClient: operationResultStoreClient, // Will not be used. + StorageClient: storageClient, }) require.NoError(t, err) @@ -125,14 +116,7 @@ func TestGetOperationResultRun(t *testing.T) { for _, tt := range opResTestCases { t.Run(tt.desc, func(t *testing.T) { mctrl := gomock.NewController(t) - operationResultStoreClient := store.NewMockStorageClient(mctrl) - operationStatusStoreClient := store.NewMockStorageClient(mctrl) - - dataProvider := dataprovider.NewMockDataStorageProvider(mctrl) - dataProvider.EXPECT(). - GetStorageClient(gomock.Any(), "Applications.Core/operationstatuses"). - Return(operationStatusStoreClient, nil). - Times(1) + storageClient := store.NewMockStorageClient(mctrl) w := httptest.NewRecorder() req, err := rpctest.NewHTTPRequestFromJSON(testcontext.New(t), http.MethodGet, operationStatusTestHeaderFile, nil) @@ -142,7 +126,7 @@ func TestGetOperationResultRun(t *testing.T) { osDataModel.Status = tt.provisioningState osDataModel.RetryAfter = time.Second * 5 - operationStatusStoreClient. + storageClient. EXPECT(). Get(gomock.Any(), gomock.Any()). DoAndReturn(func(ctx context.Context, id string, _ ...store.GetOptions) (*store.Object, error) { @@ -153,8 +137,7 @@ func TestGetOperationResultRun(t *testing.T) { }) ctl, err := NewGetOperationResult(ctrl.Options{ - DataProvider: dataProvider, - StorageClient: operationResultStoreClient, // Will not be used. + StorageClient: storageClient, }) require.NoError(t, err) diff --git a/pkg/armrpc/frontend/server/handler.go b/pkg/armrpc/frontend/server/handler.go index d6a8354cac..e5d3c47b3d 100644 --- a/pkg/armrpc/frontend/server/handler.go +++ b/pkg/armrpc/frontend/server/handler.go @@ -130,14 +130,13 @@ func HandlerForController(controller ctrl.Controller, operationType v1.Operation // CreateHandler creates an http.Handler for the given resource type and operation method. func CreateHandler(ctx context.Context, resourceType string, operationMethod v1.OperationMethod, opts ctrl.Options, factory ControllerFactoryFunc) (http.HandlerFunc, error) { - storageClient, err := opts.DataProvider.GetStorageClient(ctx, resourceType) + opts.ResourceType = resourceType + + err := opts.Validate() if err != nil { - return nil, err + return nil, fmt.Errorf("invalid controller options: %w", err) } - opts.StorageClient = storageClient - opts.ResourceType = resourceType - ctrl, err := factory(opts) if err != nil { return nil, err @@ -155,14 +154,13 @@ func RegisterHandler(ctx context.Context, opts HandlerOptions, ctrlOpts ctrl.Opt return ErrInvalidOperationTypeOption } - storageClient, err := ctrlOpts.DataProvider.GetStorageClient(ctx, opts.ResourceType) + ctrlOpts.ResourceType = opts.ResourceType + + err := ctrlOpts.Validate() if err != nil { - return err + return fmt.Errorf("invalid controller options: %w", err) } - ctrlOpts.StorageClient = storageClient - ctrlOpts.ResourceType = opts.ResourceType - ctrl, err := opts.ControllerFactory(ctrlOpts) if err != nil { return err diff --git a/pkg/armrpc/frontend/server/handler_test.go b/pkg/armrpc/frontend/server/handler_test.go index f635c3f125..107c8b6877 100644 --- a/pkg/armrpc/frontend/server/handler_test.go +++ b/pkg/armrpc/frontend/server/handler_test.go @@ -28,11 +28,12 @@ import ( "github.com/go-chi/chi/v5" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" ctrl "github.com/radius-project/radius/pkg/armrpc/frontend/controller" "github.com/radius-project/radius/pkg/armrpc/rest" "github.com/radius-project/radius/pkg/armrpc/rpctest" "github.com/radius-project/radius/pkg/middleware" - "github.com/radius-project/radius/pkg/ucp/dataprovider" + "github.com/radius-project/radius/pkg/ucp/store/inmemory" "github.com/radius-project/radius/test/testcontext" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -73,18 +74,17 @@ func Test_NewSubrouter(t *testing.T) { } func Test_RegisterHandler_DeplicatedRoutes(t *testing.T) { - mctrl := gomock.NewController(t) - - mockSP := dataprovider.NewMockDataStorageProvider(mctrl) - mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() ctrlOpts := ctrl.Options{ - DataProvider: mockSP, + Address: "localhost:8080", + ResourceType: "Applications.Test/testResources", + StorageClient: inmemory.NewClient(), + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), } p := chi.NewRouter() opts := HandlerOptions{ ParentRouter: p, - ResourceType: "Applications.Test", + ResourceType: "Applications.Test/testResources", Method: http.MethodGet, ControllerFactory: func(ctrl.Options) (ctrl.Controller, error) { return nil, nil }, Middlewares: chi.Middlewares{middleware.NormalizePath}, @@ -112,7 +112,7 @@ func Test_RegisterHandler(t *testing.T) { name: "valid route with resource type and method", opts: HandlerOptions{ ParentRouter: p, - ResourceType: "Applications.Test", + ResourceType: "Applications.Test/testResources", Method: http.MethodGet, ControllerFactory: func(ctrl.Options) (ctrl.Controller, error) { return nil, nil }, Middlewares: chi.Middlewares{middleware.NormalizePath}, @@ -124,7 +124,7 @@ func Test_RegisterHandler(t *testing.T) { opts: HandlerOptions{ ParentRouter: p, Path: "/test", - ResourceType: "Applications.Test", + ResourceType: "Applications.Test/testResources", Method: http.MethodGet, ControllerFactory: func(ctrl.Options) (ctrl.Controller, error) { return nil, nil }, Middlewares: chi.Middlewares{middleware.NormalizePath}, @@ -136,7 +136,8 @@ func Test_RegisterHandler(t *testing.T) { name: "valid route with operation type", opts: HandlerOptions{ ParentRouter: p, - OperationType: &v1.OperationType{Type: "Applications.Test", Method: "GET"}, + ResourceType: "Applications.Test/testResources", + OperationType: &v1.OperationType{Type: "Applications.Test/testResources", Method: "GET"}, ControllerFactory: func(ctrl.Options) (ctrl.Controller, error) { return nil, nil }, Middlewares: chi.Middlewares{middleware.NormalizePath}, }, @@ -148,7 +149,8 @@ func Test_RegisterHandler(t *testing.T) { opts: HandlerOptions{ ParentRouter: p, Path: "/*", - OperationType: &v1.OperationType{Type: "Applications.Test", Method: "PROXY"}, + ResourceType: "Applications.Test/testResources", + OperationType: &v1.OperationType{Type: "Applications.Test/testResources", Method: "PROXY"}, ControllerFactory: func(ctrl.Options) (ctrl.Controller, error) { return nil, nil }, }, validMethod: []string{http.MethodGet, http.MethodPost}, @@ -166,12 +168,10 @@ func Test_RegisterHandler(t *testing.T) { }, } - mctrl := gomock.NewController(t) - - mockSP := dataprovider.NewMockDataStorageProvider(mctrl) - mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() ctrlOpts := ctrl.Options{ - DataProvider: mockSP, + Address: "localhost:8080", + StorageClient: inmemory.NewClient(), + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), } for _, tc := range tests { diff --git a/pkg/armrpc/frontend/server/service.go b/pkg/armrpc/frontend/server/service.go index f46244cfbe..6d31f1c958 100644 --- a/pkg/armrpc/frontend/server/service.go +++ b/pkg/armrpc/frontend/server/service.go @@ -40,7 +40,7 @@ type Service struct { Options hostoptions.HostOptions // StorageProvider is the provider of storage client. - StorageProvider dataprovider.DataStorageProvider + StorageProvider *dataprovider.DataStorageProvider // OperationStatusManager is the manager of the operation status. OperationStatusManager manager.StatusManager @@ -57,13 +57,19 @@ type Service struct { func (s *Service) Init(ctx context.Context) error { logger := ucplog.FromContextOrDiscard(ctx) - s.StorageProvider = dataprovider.NewStorageProvider(s.Options.Config.StorageProvider) + s.StorageProvider = dataprovider.DataStorageProviderFromOptions(s.Options.Config.StorageProvider) qp := qprovider.New(s.Options.Config.QueueProvider) + + storageClient, err := s.StorageProvider.GetClient(ctx) + if err != nil { + return err + } + reqQueueClient, err := qp.GetClient(ctx) if err != nil { return err } - s.OperationStatusManager = manager.New(s.StorageProvider, reqQueueClient, s.Options.Config.Env.RoleLocation) + s.OperationStatusManager = manager.New(storageClient, reqQueueClient, s.Options.Config.Env.RoleLocation) s.KubeClient, err = kubeutil.NewRuntimeClient(s.Options.K8sConfig) if err != nil { return err diff --git a/pkg/armrpc/rpctest/controllers.go b/pkg/armrpc/rpctest/controllers.go index 2c3c7d4cdf..80610ea8f4 100644 --- a/pkg/armrpc/rpctest/controllers.go +++ b/pkg/armrpc/rpctest/controllers.go @@ -22,7 +22,6 @@ import ( "testing" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" - "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/store" "go.uber.org/mock/gomock" ) @@ -32,17 +31,16 @@ type ControllerContext struct { Ctx context.Context MCtrl *gomock.Controller MockSC *store.MockStorageClient - MockSP *dataprovider.MockDataStorageProvider } // NewControllerContext creates a new ControllerContext for testing. func NewControllerContext(t *testing.T) *ControllerContext { mctrl := gomock.NewController(t) + return &ControllerContext{ Ctx: context.Background(), MCtrl: mctrl, MockSC: store.NewMockStorageClient(mctrl), - MockSP: dataprovider.NewMockDataStorageProvider(mctrl), } } diff --git a/pkg/corerp/backend/deployment/deploymentprocessor.go b/pkg/corerp/backend/deployment/deploymentprocessor.go index 2ebf21a3e3..80392e48bc 100644 --- a/pkg/corerp/backend/deployment/deploymentprocessor.go +++ b/pkg/corerp/backend/deployment/deploymentprocessor.go @@ -40,7 +40,6 @@ import ( msg_dm "github.com/radius-project/radius/pkg/messagingrp/datamodel" msg_ctrl "github.com/radius-project/radius/pkg/messagingrp/frontend/controller" "github.com/radius-project/radius/pkg/portableresources" - "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/resources" "github.com/radius-project/radius/pkg/ucp/store" "github.com/radius-project/radius/pkg/ucp/ucplog" @@ -60,15 +59,15 @@ type DeploymentProcessor interface { } // NewDeploymentProcessor creates a new instance of the DeploymentProcessor struct with the given parameters. -func NewDeploymentProcessor(appmodel model.ApplicationModel, sp dataprovider.DataStorageProvider, k8sClient controller_runtime.Client, k8sClientSet kubernetes.Interface) DeploymentProcessor { - return &deploymentProcessor{appmodel: appmodel, sp: sp, k8sClient: k8sClient, k8sClientSet: k8sClientSet} +func NewDeploymentProcessor(appmodel model.ApplicationModel, storageClient store.StorageClient, k8sClient controller_runtime.Client, k8sClientSet kubernetes.Interface) DeploymentProcessor { + return &deploymentProcessor{appmodel: appmodel, storageClient: storageClient, k8sClient: k8sClient, k8sClientSet: k8sClientSet} } var _ DeploymentProcessor = (*deploymentProcessor)(nil) type deploymentProcessor struct { - appmodel model.ApplicationModel - sp dataprovider.DataStorageProvider + appmodel model.ApplicationModel + storageClient store.StorageClient // k8sClient is the Kubernetes controller runtime client. k8sClient controller_runtime.Client // k8sClientSet is the Kubernetes client. @@ -226,14 +225,14 @@ func (dp *deploymentProcessor) getApplicationAndEnvironmentForResourceID(ctx con // 2. fetch the application properties from the DB app := &corerp_dm.Application{} - err = rp_util.FetchScopeResource(ctx, dp.sp, res.AppID.String(), app) + err = rp_util.FetchScopeResource(ctx, dp.storageClient, res.AppID.String(), app) if err != nil { return nil, nil, err } // 3. fetch the environment resource from the db to get the Namespace env := &corerp_dm.Environment{} - err = rp_util.FetchScopeResource(ctx, dp.sp, app.Properties.Environment, env) + err = rp_util.FetchScopeResource(ctx, dp.storageClient, app.Properties.Environment, env) if err != nil { return nil, nil, err } @@ -481,12 +480,7 @@ func (dp *deploymentProcessor) getAppOptions(appProp *corerp_dm.ApplicationPrope // getResourceDataByID fetches resource for the provided id from the data store func (dp *deploymentProcessor) getResourceDataByID(ctx context.Context, resourceID resources.ID) (ResourceData, error) { errMsg := "failed to fetch the resource %q. Err: %w" - sc, err := dp.sp.GetStorageClient(ctx, resourceID.Type()) - if err != nil { - return ResourceData{}, fmt.Errorf(errMsg, resourceID.String(), err) - } - - resource, err := sc.Get(ctx, resourceID.String()) + resource, err := dp.storageClient.Get(ctx, resourceID.String()) if err != nil { if errors.Is(&store.ErrNotFound{ID: resourceID.String()}, err) { return ResourceData{}, v1.NewClientErrInvalidRequest(fmt.Sprintf("resource %q does not exist", resourceID.String())) diff --git a/pkg/corerp/backend/deployment/deploymentprocessor_test.go b/pkg/corerp/backend/deployment/deploymentprocessor_test.go index 4a5cb748ef..0f2c4816e8 100644 --- a/pkg/corerp/backend/deployment/deploymentprocessor_test.go +++ b/pkg/corerp/backend/deployment/deploymentprocessor_test.go @@ -37,7 +37,6 @@ import ( "github.com/radius-project/radius/pkg/resourcemodel" rpv1 "github.com/radius-project/radius/pkg/rp/v1" "github.com/radius-project/radius/pkg/to" - "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/resources" resources_azure "github.com/radius-project/radius/pkg/ucp/resources/azure" resources_kubernetes "github.com/radius-project/radius/pkg/ucp/resources/kubernetes" @@ -51,8 +50,7 @@ import ( type SharedMocks struct { model model.ApplicationModel - db *store.MockStorageClient - dbProvider *dataprovider.MockDataStorageProvider + storageClient *store.MockStorageClient resourceHandler *handlers.MockResourceHandler renderer *renderers.MockRenderer mctrl *gomock.Controller @@ -131,10 +129,10 @@ func setup(t *testing.T) SharedMocks { }, } + storageClient := store.NewMockStorageClient(ctrl) return SharedMocks{ model: model, - db: store.NewMockStorageClient(ctrl), - dbProvider: dataprovider.NewMockDataStorageProvider(ctrl), + storageClient: storageClient, resourceHandler: resourceHandler, renderer: renderer, mctrl: ctrl, @@ -303,7 +301,7 @@ func Test_Render(t *testing.T) { t.Run("verify render success", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() testRendererOutput := getTestRendererOutput() @@ -314,7 +312,6 @@ func Test_Render(t *testing.T) { mocks.renderer.EXPECT().Render(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(testRendererOutput, nil) mocks.renderer.EXPECT().GetDependencyIDs(gomock.Any(), gomock.Any()).Times(1).Return(requiredResources, nil, nil) - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).AnyTimes().Return(mocks.db, nil) cr := store.Object{ Metadata: store.Metadata{ @@ -322,7 +319,7 @@ func Test_Render(t *testing.T) { }, Data: testResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) application := datamodel.Application{ BaseResource: v1.BaseResource{ TrackedResource: v1.TrackedResource{ @@ -341,14 +338,14 @@ func Test_Render(t *testing.T) { }, Data: application, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) er := store.Object{ Metadata: store.Metadata{ ID: env.ID, }, Data: env, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) mongoResource := dsrp_dm.MongoDatabase{ BaseResource: v1.BaseResource{ @@ -369,7 +366,7 @@ func Test_Render(t *testing.T) { Data: mongoResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&mr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&mr, nil) rendererOutput, err := dp.Render(ctx, resourceID, &testResource) require.NoError(t, err) @@ -378,7 +375,7 @@ func Test_Render(t *testing.T) { t.Run("verify render success lowercase resourcetype", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getLowerCaseTestResource() testRendererOutput := getTestRendererOutput() @@ -386,7 +383,6 @@ func Test_Render(t *testing.T) { mocks.renderer.EXPECT().Render(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(testRendererOutput, nil) mocks.renderer.EXPECT().GetDependencyIDs(gomock.Any(), gomock.Any()).Times(1).Return([]resources.ID{}, nil, nil) - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).AnyTimes().Return(mocks.db, nil) cr := store.Object{ Metadata: store.Metadata{ @@ -394,7 +390,7 @@ func Test_Render(t *testing.T) { }, Data: testResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) application := datamodel.Application{ BaseResource: v1.BaseResource{ TrackedResource: v1.TrackedResource{ @@ -413,14 +409,14 @@ func Test_Render(t *testing.T) { }, Data: application, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) er := store.Object{ Metadata: store.Metadata{ ID: env.ID, }, Data: env, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) rendererOutput, err := dp.Render(ctx, resourceID, &testResource) require.NoError(t, err) @@ -429,7 +425,7 @@ func Test_Render(t *testing.T) { t.Run("verify render success uppercase resourcetype", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getUpperCaseTestResource() testRendererOutput := getTestRendererOutput() @@ -437,7 +433,6 @@ func Test_Render(t *testing.T) { mocks.renderer.EXPECT().Render(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(testRendererOutput, nil) mocks.renderer.EXPECT().GetDependencyIDs(gomock.Any(), gomock.Any()).Times(1).Return([]resources.ID{}, nil, nil) - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).AnyTimes().Return(mocks.db, nil) cr := store.Object{ Metadata: store.Metadata{ @@ -445,7 +440,7 @@ func Test_Render(t *testing.T) { }, Data: testResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) application := datamodel.Application{ BaseResource: v1.BaseResource{ TrackedResource: v1.TrackedResource{ @@ -464,14 +459,14 @@ func Test_Render(t *testing.T) { }, Data: application, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) er := store.Object{ Metadata: store.Metadata{ ID: env.ID, }, Data: env, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) rendererOutput, err := dp.Render(ctx, resourceID, &testResource) require.NoError(t, err) @@ -480,13 +475,12 @@ func Test_Render(t *testing.T) { t.Run("verify render error", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() resourceID := getTestResourceID(testResource.ID) mocks.renderer.EXPECT().GetDependencyIDs(gomock.Any(), gomock.Any()).Times(1).Return([]resources.ID{}, nil, nil) - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).AnyTimes().Return(mocks.db, nil) mocks.renderer.EXPECT().Render(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(renderers.RendererOutput{}, errors.New("failed to render the resource")) cr := store.Object{ @@ -495,7 +489,7 @@ func Test_Render(t *testing.T) { }, Data: testResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) application := datamodel.Application{ BaseResource: v1.BaseResource{ TrackedResource: v1.TrackedResource{ @@ -514,43 +508,27 @@ func Test_Render(t *testing.T) { }, Data: application, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) er := store.Object{ Metadata: store.Metadata{ ID: env.ID, }, Data: env, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) _, err := dp.Render(ctx, resourceID, &testResource) require.Error(t, err, "failed to render the resource") }) - t.Run("Failure to get storage client", func(t *testing.T) { - mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} - - testResource := getTestResource() - resourceID := getTestResourceID(testResource.ID) - - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Times(1).Return(nil, errors.New("unsupported storage provider")) - - _, err := dp.Render(ctx, resourceID, &testResource) - require.Error(t, err) - require.Equal(t, "failed to fetch the resource \"/subscriptions/test-subscription/resourceGroups/test-resource-group/providers/Applications.Core/containers/test-resource\". Err: unsupported storage provider", err.Error()) - }) - t.Run("Resource not found in data store", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() resourceID := getTestResourceID(testResource.ID) - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Times(1).Return(mocks.db, nil) - - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&store.Object{}, &store.ErrNotFound{ID: testResource.ID}) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&store.Object{}, &store.ErrNotFound{ID: testResource.ID}) _, err := dp.Render(ctx, resourceID, &testResource) require.Error(t, err) @@ -560,14 +538,12 @@ func Test_Render(t *testing.T) { t.Run("Data store access error", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() resourceID := getTestResourceID(testResource.ID) - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Times(1).Return(mocks.db, nil) - - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&store.Object{}, errors.New("failed to connect to data store")) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&store.Object{}, errors.New("failed to connect to data store")) _, err := dp.Render(ctx, resourceID, &testResource) require.Error(t, err) @@ -576,7 +552,7 @@ func Test_Render(t *testing.T) { t.Run("Invalid resource type", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testInvalidResourceID := "/subscriptions/test-sub/resourceGroups/test-group/providers/Applications.foo/foo/foo" testResource := getTestResource() @@ -588,21 +564,19 @@ func Test_Render(t *testing.T) { t.Run("Invalid application id", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() resourceID := getTestResourceID(testResource.ID) testResource.Properties.Application = "invalid-app-id" - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Times(1).Return(mocks.db, nil) - cr := store.Object{ Metadata: store.Metadata{ ID: testResource.ID, }, Data: testResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) _, err := dp.Render(ctx, resourceID, &testResource) require.Error(t, err) @@ -612,21 +586,19 @@ func Test_Render(t *testing.T) { t.Run("Missing application id", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() resourceID := getTestResourceID(testResource.ID) testResource.Properties.Application = "" - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Times(1).Return(mocks.db, nil) - cr := store.Object{ Metadata: store.Metadata{ ID: testResource.ID, }, Data: testResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) _, err := dp.Render(ctx, resourceID, &testResource) require.Error(t, err) @@ -635,21 +607,19 @@ func Test_Render(t *testing.T) { t.Run("Invalid application resource type", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() resourceID := getTestResourceID(testResource.ID) testResource.Properties.Application = "/subscriptions/test-subscription/resourceGroups/test-resource-group/providers/Applications.Core/app/test-application" - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Times(1).Return(mocks.db, nil) - cr := store.Object{ Metadata: store.Metadata{ ID: testResource.ID, }, Data: testResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) _, err := dp.Render(ctx, resourceID, &testResource) require.Error(t, err) @@ -659,7 +629,7 @@ func Test_Render(t *testing.T) { t.Run("Missing output resource provider", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() testRendererOutput := getTestRendererOutput() @@ -669,7 +639,6 @@ func Test_Render(t *testing.T) { mocks.renderer.EXPECT().Render(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(testRendererOutput, nil) mocks.renderer.EXPECT().GetDependencyIDs(gomock.Any(), gomock.Any()).Times(1).Return([]resources.ID{}, nil, nil) - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).AnyTimes().Return(mocks.db, nil) cr := store.Object{ Metadata: store.Metadata{ @@ -677,7 +646,7 @@ func Test_Render(t *testing.T) { }, Data: testResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) application := datamodel.Application{ BaseResource: v1.BaseResource{ TrackedResource: v1.TrackedResource{ @@ -696,14 +665,14 @@ func Test_Render(t *testing.T) { }, Data: application, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) er := store.Object{ Metadata: store.Metadata{ ID: env.ID, }, Data: env, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) _, err := dp.Render(ctx, resourceID, &testResource) require.Error(t, err, "output resource \"Deployment\" does not have a provider specified") @@ -711,7 +680,7 @@ func Test_Render(t *testing.T) { t.Run("Unsupported output resource provider", func(t *testing.T) { mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() testRendererOutput := getTestRendererOutput() @@ -720,7 +689,6 @@ func Test_Render(t *testing.T) { testRendererOutput.Resources[0].CreateResource.ResourceType.Provider = "unknown" mocks.renderer.EXPECT().GetDependencyIDs(gomock.Any(), gomock.Any()).Times(1).Return([]resources.ID{}, nil, nil) - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).AnyTimes().Return(mocks.db, nil) cr := store.Object{ Metadata: store.Metadata{ @@ -728,7 +696,7 @@ func Test_Render(t *testing.T) { }, Data: testResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) application := datamodel.Application{ BaseResource: v1.BaseResource{ TrackedResource: v1.TrackedResource{ @@ -747,14 +715,14 @@ func Test_Render(t *testing.T) { }, Data: application, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) er := store.Object{ Metadata: store.Metadata{ ID: env.ID, }, Data: env, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) mocks.renderer.EXPECT().Render(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(testRendererOutput, nil) @@ -765,14 +733,13 @@ func Test_Render(t *testing.T) { func setupDeployMocks(mocks SharedMocks, simulated bool) { testResource := getTestResource() - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).AnyTimes().Return(mocks.db, nil) cr := store.Object{ Metadata: store.Metadata{ ID: testResource.ID, }, Data: testResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&cr, nil) app := datamodel.Application{ BaseResource: v1.BaseResource{ @@ -793,7 +760,7 @@ func setupDeployMocks(mocks SharedMocks, simulated bool) { }, Data: app, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&ar, nil) env := datamodel.Environment{ BaseResource: v1.BaseResource{ @@ -823,14 +790,14 @@ func setupDeployMocks(mocks SharedMocks, simulated bool) { }, Data: env, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&er, nil) } func Test_Deploy(t *testing.T) { t.Run("Verify deploy success", func(t *testing.T) { ctx := testcontext.New(t) mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() testRendererOutput := getTestRendererOutput() @@ -870,7 +837,7 @@ func Test_Deploy(t *testing.T) { t.Run("Verify deploy success with simulated env", func(t *testing.T) { ctx := testcontext.New(t) mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() testRendererOutput := getTestRendererOutput() @@ -888,7 +855,7 @@ func Test_Deploy(t *testing.T) { t.Run("Verify deploy failure", func(t *testing.T) { ctx := testcontext.New(t) mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() testRendererOutput := getTestRendererOutput() @@ -905,7 +872,7 @@ func Test_Deploy(t *testing.T) { t.Run("Output resource dependency missing local ID", func(t *testing.T) { ctx := testcontext.New(t) mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() testRendererOutput := getTestRendererOutput() @@ -923,7 +890,7 @@ func Test_Deploy(t *testing.T) { t.Run("Invalid output resource type", func(t *testing.T) { ctx := testcontext.New(t) mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() testRendererOutput := getTestRendererOutput() @@ -941,7 +908,7 @@ func Test_Deploy(t *testing.T) { t.Run("Missing output resource identity", func(t *testing.T) { ctx := testcontext.New(t) mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() testRendererOutput := getTestRendererOutput() @@ -967,7 +934,7 @@ func Test_Delete(t *testing.T) { t.Run("Verify delete success", func(t *testing.T) { ctx := testcontext.New(t) mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() resourceID := getTestResourceID(testResource.ID) @@ -981,7 +948,7 @@ func Test_Delete(t *testing.T) { t.Run("Verify delete failure", func(t *testing.T) { ctx := testcontext.New(t) mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() resourceID := getTestResourceID(testResource.ID) @@ -995,7 +962,7 @@ func Test_Delete(t *testing.T) { t.Run("Verify delete with no output resources", func(t *testing.T) { ctx := testcontext.New(t) mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} testResource := getTestResource() resourceID := getTestResourceID(testResource.ID) @@ -1070,11 +1037,9 @@ func Test_getEnvOptions_PublicEndpointOverride(t *testing.T) { func Test_getResourceDataByID(t *testing.T) { ctx := testcontext.New(t) mocks := setup(t) - dp := deploymentProcessor{mocks.model, mocks.dbProvider, nil, nil} + dp := deploymentProcessor{mocks.model, mocks.storageClient, nil, nil} t.Run("Get recipe data from connected mongoDB resources", func(t *testing.T) { - mocks.dbProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Times(1).Return(mocks.db, nil) - depId, _ := resources.ParseResource("/subscriptions/test-subscription/resourceGroups/test-resource-group/providers/Applications.Datastores/mongoDatabases/test-mongo") mongoResource := buildMongoDBWithRecipe() mongoResource.PortableResourceMetadata.RecipeData = portableresources.RecipeData{} @@ -1085,7 +1050,7 @@ func Test_getResourceDataByID(t *testing.T) { Data: mongoResource, } - mocks.db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&mr, nil) + mocks.storageClient.EXPECT().Get(gomock.Any(), gomock.Any()).Times(1).Return(&mr, nil) resourceData, err := dp.getResourceDataByID(ctx, depId) require.NoError(t, err) diff --git a/pkg/corerp/frontend/controller/applications/updatefilter.go b/pkg/corerp/frontend/controller/applications/updatefilter.go index 620b5a97c4..2c254f98aa 100644 --- a/pkg/corerp/frontend/controller/applications/updatefilter.go +++ b/pkg/corerp/frontend/controller/applications/updatefilter.go @@ -63,7 +63,7 @@ func CreateAppScopedNamespace(ctx context.Context, newResource, oldResource *dat kubeNamespace = ext.KubernetesNamespace.Namespace } else { // Construct namespace using the namespace specified by environment resource. - envNamespace, err := rp_kube.FindNamespaceByEnvID(ctx, opt.DataProvider, newResource.Properties.Environment) + envNamespace, err := rp_kube.FindNamespaceByEnvID(ctx, opt.StorageClient, newResource.Properties.Environment) if err != nil { return rest.NewBadRequestResponse(fmt.Sprintf("Environment %s could not be constructed: %s", newResource.Properties.Environment, err.Error())), nil diff --git a/pkg/corerp/frontend/controller/applications/updatefilter_test.go b/pkg/corerp/frontend/controller/applications/updatefilter_test.go index 1f6de70971..0e33bbc401 100644 --- a/pkg/corerp/frontend/controller/applications/updatefilter_test.go +++ b/pkg/corerp/frontend/controller/applications/updatefilter_test.go @@ -45,7 +45,6 @@ func TestCreateAppScopedNamespace_valid_namespace(t *testing.T) { opts := ctrl.Options{ StorageClient: tCtx.MockSC, - DataProvider: tCtx.MockSP, KubeClient: k8sutil.NewFakeKubeClient(nil), } @@ -111,8 +110,6 @@ func TestCreateAppScopedNamespace_valid_namespace(t *testing.T) { }, nil }).Times(2) - tCtx.MockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(tCtx.MockSC, nil).Times(1) - envdm := &datamodel.Environment{ Properties: datamodel.EnvironmentProperties{ Compute: rpv1.EnvironmentCompute{ @@ -155,7 +152,6 @@ func TestCreateAppScopedNamespace_invalid_property(t *testing.T) { opts := ctrl.Options{ StorageClient: tCtx.MockSC, - DataProvider: tCtx.MockSP, KubeClient: k8sutil.NewFakeKubeClient(nil), } @@ -163,8 +159,6 @@ func TestCreateAppScopedNamespace_invalid_property(t *testing.T) { longAppID := "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/radius-test-rg/providers/applications.core/applications/this-is-a-very-long-application-name-that-is-invalid" longEnvID := "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/radius-test-rg/providers/applications.core/environments/this-is-a-very-long-environment-name-that-is-invalid" - tCtx.MockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(tCtx.MockSC, nil).Times(1) - envdm := &datamodel.Environment{ Properties: datamodel.EnvironmentProperties{ Compute: rpv1.EnvironmentCompute{ diff --git a/pkg/corerp/setup/setup_test.go b/pkg/corerp/setup/setup_test.go index 109761d95d..203ef7681a 100644 --- a/pkg/corerp/setup/setup_test.go +++ b/pkg/corerp/setup/setup_test.go @@ -26,13 +26,13 @@ import ( "go.uber.org/mock/gomock" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/armrpc/builder" apictrl "github.com/radius-project/radius/pkg/armrpc/frontend/controller" "github.com/radius-project/radius/pkg/armrpc/rpctest" "github.com/radius-project/radius/pkg/recipes/controllerconfig" "github.com/radius-project/radius/pkg/sdk" - "github.com/radius-project/radius/pkg/ucp/dataprovider" - "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/store/inmemory" app_ctrl "github.com/radius-project/radius/pkg/corerp/frontend/controller/applications" ctr_ctrl "github.com/radius-project/radius/pkg/corerp/frontend/controller/containers" @@ -211,15 +211,6 @@ var handlerTests = []rpctest.HandlerTestSpec{ } func TestRouter(t *testing.T) { - mctrl := gomock.NewController(t) - - mockSP := dataprovider.NewMockDataStorageProvider(mctrl) - mockSC := store.NewMockStorageClient(mctrl) - - mockSC.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(&store.Object{}, nil).AnyTimes() - mockSC.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(store.StorageClient(mockSC), nil).AnyTimes() - conn, err := sdk.NewDirectConnection("http://localhost:9000/apis/api.ucp.dev/v1alpha3") require.NoError(t, err) cfg := &controllerconfig.RecipeControllerConfig{ @@ -232,6 +223,14 @@ func TestRouter(t *testing.T) { r := chi.NewRouter() validator, err := builder.NewOpenAPIValidator(ctx, "/api.ucp.dev", "applications.core") require.NoError(t, err) - return r, nsBuilder.ApplyAPIHandlers(ctx, r, apictrl.Options{PathBase: "/api.ucp.dev", DataProvider: mockSP}, validator) + + options := apictrl.Options{ + Address: "localhost:9000", + PathBase: "/api.ucp.dev", + StorageClient: inmemory.NewClient(), + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), + } + + return r, nsBuilder.ApplyAPIHandlers(ctx, r, options, validator) }) } diff --git a/pkg/daprrp/setup/setup_test.go b/pkg/daprrp/setup/setup_test.go index 9bcdc2efb6..b55cd010d6 100644 --- a/pkg/daprrp/setup/setup_test.go +++ b/pkg/daprrp/setup/setup_test.go @@ -26,13 +26,13 @@ import ( "go.uber.org/mock/gomock" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/armrpc/builder" apictrl "github.com/radius-project/radius/pkg/armrpc/frontend/controller" "github.com/radius-project/radius/pkg/armrpc/rpctest" dapr_ctrl "github.com/radius-project/radius/pkg/daprrp/frontend/controller" "github.com/radius-project/radius/pkg/recipes/controllerconfig" - "github.com/radius-project/radius/pkg/ucp/dataprovider" - "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/store/inmemory" ) var handlerTests = []rpctest.HandlerTestSpec{ @@ -137,15 +137,6 @@ var handlerTests = []rpctest.HandlerTestSpec{ } func TestRouter(t *testing.T) { - mctrl := gomock.NewController(t) - - mockSP := dataprovider.NewMockDataStorageProvider(mctrl) - mockSC := store.NewMockStorageClient(mctrl) - - mockSC.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(&store.Object{}, nil).AnyTimes() - mockSC.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(store.StorageClient(mockSC), nil).AnyTimes() - cfg := &controllerconfig.RecipeControllerConfig{} ns := SetupNamespace(cfg) nsBuilder := ns.GenerateBuilder() @@ -154,6 +145,14 @@ func TestRouter(t *testing.T) { r := chi.NewRouter() validator, err := builder.NewOpenAPIValidator(ctx, "/api.ucp.dev", "applications.dapr") require.NoError(t, err) - return r, nsBuilder.ApplyAPIHandlers(ctx, r, apictrl.Options{PathBase: "/api.ucp.dev", DataProvider: mockSP}, validator) + + options := apictrl.Options{ + Address: "localhost:9000", + PathBase: "/api.ucp.dev", + StorageClient: inmemory.NewClient(), + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), + } + + return r, nsBuilder.ApplyAPIHandlers(ctx, r, options, validator) }) } diff --git a/pkg/datastoresrp/setup/setup_test.go b/pkg/datastoresrp/setup/setup_test.go index 82e994989c..d834cc6fbe 100644 --- a/pkg/datastoresrp/setup/setup_test.go +++ b/pkg/datastoresrp/setup/setup_test.go @@ -26,13 +26,13 @@ import ( "go.uber.org/mock/gomock" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/armrpc/builder" apictrl "github.com/radius-project/radius/pkg/armrpc/frontend/controller" "github.com/radius-project/radius/pkg/armrpc/rpctest" ds_ctrl "github.com/radius-project/radius/pkg/datastoresrp/frontend/controller" "github.com/radius-project/radius/pkg/recipes/controllerconfig" - "github.com/radius-project/radius/pkg/ucp/dataprovider" - "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/store/inmemory" ) var handlerTests = []rpctest.HandlerTestSpec{ @@ -124,15 +124,6 @@ var handlerTests = []rpctest.HandlerTestSpec{ } func TestRouter(t *testing.T) { - mctrl := gomock.NewController(t) - - mockSP := dataprovider.NewMockDataStorageProvider(mctrl) - mockSC := store.NewMockStorageClient(mctrl) - - mockSC.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(&store.Object{}, nil).AnyTimes() - mockSC.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(store.StorageClient(mockSC), nil).AnyTimes() - cfg := &controllerconfig.RecipeControllerConfig{} ns := SetupNamespace(cfg) nsBuilder := ns.GenerateBuilder() @@ -141,6 +132,14 @@ func TestRouter(t *testing.T) { r := chi.NewRouter() validator, err := builder.NewOpenAPIValidator(ctx, "/api.ucp.dev", "applications.datastores") require.NoError(t, err) - return r, nsBuilder.ApplyAPIHandlers(ctx, r, apictrl.Options{PathBase: "/api.ucp.dev", DataProvider: mockSP}, validator) + + options := apictrl.Options{ + Address: "localhost:9000", + PathBase: "/api.ucp.dev", + StorageClient: inmemory.NewClient(), + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), + } + + return r, nsBuilder.ApplyAPIHandlers(ctx, r, options, validator) }) } diff --git a/pkg/dynamicrp/options.go b/pkg/dynamicrp/options.go index ce18389617..109c1a00e8 100644 --- a/pkg/dynamicrp/options.go +++ b/pkg/dynamicrp/options.go @@ -50,7 +50,7 @@ type Options struct { StatusManager statusmanager.StatusManager // StorageProvider provides access to the data storage system. - StorageProvider dataprovider.DataStorageProvider + StorageProvider *dataprovider.DataStorageProvider // UCP is the connection to UCP UCP sdk.Connection @@ -65,14 +65,19 @@ func NewOptions(ctx context.Context, config *Config) (*Options, error) { options.QueueProvider = queueprovider.New(config.Queue) options.SecretProvider = secretprovider.NewSecretProvider(config.Secrets) - options.StorageProvider = dataprovider.NewStorageProvider(config.Storage) + options.StorageProvider = dataprovider.DataStorageProviderFromOptions(config.Storage) + + storageClient, err := options.StorageProvider.GetClient(ctx) + if err != nil { + return nil, err + } queueClient, err := options.QueueProvider.GetClient(ctx) if err != nil { return nil, err } - options.StatusManager = statusmanager.New(options.StorageProvider, queueClient, config.Environment.RoleLocation) + options.StatusManager = statusmanager.New(storageClient, queueClient, config.Environment.RoleLocation) var cfg *kube_rest.Config cfg, err = kubeutil.NewClientConfig(&kubeutil.ConfigOptions{ diff --git a/pkg/messagingrp/setup/setup_test.go b/pkg/messagingrp/setup/setup_test.go index ebcc1e5a34..0aa5af07bb 100644 --- a/pkg/messagingrp/setup/setup_test.go +++ b/pkg/messagingrp/setup/setup_test.go @@ -23,13 +23,13 @@ import ( "go.uber.org/mock/gomock" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/armrpc/builder" apictrl "github.com/radius-project/radius/pkg/armrpc/frontend/controller" "github.com/radius-project/radius/pkg/armrpc/rpctest" msg_ctrl "github.com/radius-project/radius/pkg/messagingrp/frontend/controller" "github.com/radius-project/radius/pkg/recipes/controllerconfig" - "github.com/radius-project/radius/pkg/ucp/dataprovider" - "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/store/inmemory" ) var handlerTests = []rpctest.HandlerTestSpec{ @@ -66,15 +66,6 @@ var handlerTests = []rpctest.HandlerTestSpec{ } func TestRouter(t *testing.T) { - mctrl := gomock.NewController(t) - - mockSP := dataprovider.NewMockDataStorageProvider(mctrl) - mockSC := store.NewMockStorageClient(mctrl) - - mockSC.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(&store.Object{}, nil).AnyTimes() - mockSC.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(store.StorageClient(mockSC), nil).AnyTimes() - cfg := &controllerconfig.RecipeControllerConfig{} ns := SetupNamespace(cfg) nsBuilder := ns.GenerateBuilder() @@ -83,6 +74,14 @@ func TestRouter(t *testing.T) { r := chi.NewRouter() validator, err := builder.NewOpenAPIValidator(ctx, "/api.ucp.dev", "applications.messaging") require.NoError(t, err) - return r, nsBuilder.ApplyAPIHandlers(ctx, r, apictrl.Options{PathBase: "/api.ucp.dev", DataProvider: mockSP}, validator) + + options := apictrl.Options{ + Address: "localhost:9000", + PathBase: "/api.ucp.dev", + StorageClient: inmemory.NewClient(), + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), + } + + return r, nsBuilder.ApplyAPIHandlers(ctx, r, options, validator) }) } diff --git a/pkg/rp/kube/resources.go b/pkg/rp/kube/resources.go index 81d3edab5e..e55cf914ed 100644 --- a/pkg/rp/kube/resources.go +++ b/pkg/rp/kube/resources.go @@ -25,13 +25,13 @@ import ( "github.com/radius-project/radius/pkg/corerp/api/v20231001preview" cdm "github.com/radius-project/radius/pkg/corerp/datamodel" rpv1 "github.com/radius-project/radius/pkg/rp/v1" - "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" ) // FindNamespaceByEnvID finds the environment-scope Kubernetes namespace. If the environment ID is invalid or the environment is not a Kubernetes // environment, an error is returned. -func FindNamespaceByEnvID(ctx context.Context, sp dataprovider.DataStorageProvider, envID string) (namespace string, err error) { +func FindNamespaceByEnvID(ctx context.Context, storageClient store.StorageClient, envID string) (namespace string, err error) { id, err := resources.ParseResource(envID) if err != nil { return @@ -43,12 +43,7 @@ func FindNamespaceByEnvID(ctx context.Context, sp dataprovider.DataStorageProvid } env := &cdm.Environment{} - client, err := sp.GetStorageClient(ctx, id.Type()) - if err != nil { - return - } - - res, err := client.Get(ctx, id.String()) + res, err := storageClient.Get(ctx, id.String()) if err != nil { return } diff --git a/pkg/rp/kube/resources_test.go b/pkg/rp/kube/resources_test.go index fc3e7d8bfb..738cd318e3 100644 --- a/pkg/rp/kube/resources_test.go +++ b/pkg/rp/kube/resources_test.go @@ -26,7 +26,6 @@ import ( "github.com/radius-project/radius/pkg/corerp/datamodel" rpv1 "github.com/radius-project/radius/pkg/rp/v1" "github.com/radius-project/radius/pkg/to" - "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/store" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -87,12 +86,10 @@ func TestFindNamespaceByEnvID(t *testing.T) { }, } - mockSP := dataprovider.NewMockDataStorageProvider(mctrl) mockSC := store.NewMockStorageClient(mctrl) - - mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(store.StorageClient(mockSC), nil).Times(1) mockSC.EXPECT().Get(gomock.Any(), tc.id, gomock.Any()).Return(fakeStoreObject(envdm), nil).Times(1) - ns, err := FindNamespaceByEnvID(context.Background(), mockSP, testEnvID) + + ns, err := FindNamespaceByEnvID(context.Background(), mockSC, testEnvID) require.NoError(t, err) require.Equal(t, tc.out, ns) }) diff --git a/pkg/rp/util/datastore.go b/pkg/rp/util/datastore.go index 5aa2306221..e0ca7f317b 100644 --- a/pkg/rp/util/datastore.go +++ b/pkg/rp/util/datastore.go @@ -23,14 +23,13 @@ import ( "strings" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" - "github.com/radius-project/radius/pkg/ucp/dataprovider" resources "github.com/radius-project/radius/pkg/ucp/resources" "github.com/radius-project/radius/pkg/ucp/store" ) // FetchScopeResource checks if the given scopeID is a valid resource ID for the given resource type, fetches the resource // from the storage client and returns an error if the resource does not exist. -func FetchScopeResource(ctx context.Context, sp dataprovider.DataStorageProvider, scopeID string, resource v1.DataModelInterface) error { +func FetchScopeResource(ctx context.Context, storageClient store.StorageClient, scopeID string, resource v1.DataModelInterface) error { id, err := resources.ParseResource(scopeID) if err != nil { return v1.NewClientErrInvalidRequest(fmt.Sprintf("%s is not a valid resource id for %s.", scopeID, resource.ResourceTypeName())) @@ -39,12 +38,8 @@ func FetchScopeResource(ctx context.Context, sp dataprovider.DataStorageProvider if !strings.EqualFold(id.Type(), resource.ResourceTypeName()) { return v1.NewClientErrInvalidRequest(fmt.Sprintf("linked %q has invalid %s resource type.", scopeID, resource.ResourceTypeName())) } - sc, err := sp.GetStorageClient(ctx, id.Type()) - if err != nil { - return err - } - res, err := sc.Get(ctx, id.String()) + res, err := storageClient.Get(ctx, id.String()) if errors.Is(&store.ErrNotFound{ID: id.String()}, err) { return v1.NewClientErrInvalidRequest(fmt.Sprintf("linked resource %s does not exist", scopeID)) } diff --git a/pkg/server/apiservice.go b/pkg/server/apiservice.go index b92d6fe2f2..698f935ba8 100644 --- a/pkg/server/apiservice.go +++ b/pkg/server/apiservice.go @@ -57,6 +57,11 @@ func (s *APIService) Run(ctx context.Context) error { return err } + storageClient, err := s.StorageProvider.GetClient(ctx) + if err != nil { + return err + } + address := fmt.Sprintf("%s:%d", s.Options.Config.Server.Host, s.Options.Config.Server.Port) return s.Start(ctx, server.Options{ Location: s.Options.Config.Env.RoleLocation, @@ -65,8 +70,9 @@ func (s *APIService) Run(ctx context.Context) error { Configure: func(r chi.Router) error { for _, b := range s.handlerBuilder { opts := apictrl.Options{ + Address: address, PathBase: s.Options.Config.Server.PathBase, - DataProvider: s.StorageProvider, + StorageClient: storageClient, KubeClient: s.KubeClient, StatusManager: s.OperationStatusManager, } diff --git a/pkg/server/asyncworker.go b/pkg/server/asyncworker.go index e5703885ec..fd8a162c3e 100644 --- a/pkg/server/asyncworker.go +++ b/pkg/server/asyncworker.go @@ -68,12 +68,17 @@ func (w *AsyncWorker) Run(ctx context.Context) error { return fmt.Errorf("failed to initialize application model: %w", err) } + storageClient, err := w.StorageProvider.GetClient(ctx) + if err != nil { + return err + } + for _, b := range w.handlerBuilder { opts := ctrl.Options{ - DataProvider: w.StorageProvider, - KubeClient: k8s.RuntimeClient, + StorageClient: storageClient, + KubeClient: k8s.RuntimeClient, GetDeploymentProcessor: func() deployment.DeploymentProcessor { - return deployment.NewDeploymentProcessor(appModel, w.StorageProvider, k8s.RuntimeClient, k8s.ClientSet) + return deployment.NewDeploymentProcessor(appModel, storageClient, k8s.RuntimeClient, k8s.ClientSet) }, } diff --git a/pkg/ucp/backend/service.go b/pkg/ucp/backend/service.go index fc6ecc1270..708dca726b 100644 --- a/pkg/ucp/backend/service.go +++ b/pkg/ucp/backend/service.go @@ -80,8 +80,13 @@ func (w *Service) Run(ctx context.Context) error { } } + storageClient, err := w.StorageProvider.GetClient(ctx) + if err != nil { + return err + } + opts := ctrl.Options{ - DataProvider: w.StorageProvider, + StorageClient: storageClient, } defaultDownstream, err := url.Parse(w.config.Routing.DefaultDownstreamEndpoint) @@ -90,7 +95,7 @@ func (w *Service) Run(ctx context.Context) error { } transport := otelhttp.NewTransport(http.DefaultTransport) - err = RegisterControllers(ctx, w.Controllers, w.Options.UCPConnection, transport, opts, defaultDownstream) + err = RegisterControllers(w.Controllers, w.Options.UCPConnection, transport, opts, defaultDownstream) if err != nil { return err } @@ -99,41 +104,41 @@ func (w *Service) Run(ctx context.Context) error { } // RegisterControllers registers the controllers for the UCP backend. -func RegisterControllers(ctx context.Context, registry *worker.ControllerRegistry, connection sdk.Connection, transport http.RoundTripper, opts ctrl.Options, defaultDownstream *url.URL) error { +func RegisterControllers(registry *worker.ControllerRegistry, connection sdk.Connection, transport http.RoundTripper, opts ctrl.Options, defaultDownstream *url.URL) error { // Tracked resources - err := errors.Join(nil, registry.Register(ctx, v20231001preview.ResourceType, v1.OperationMethod(datamodel.OperationProcess), func(opts ctrl.Options) (ctrl.Controller, error) { + err := errors.Join(nil, registry.Register(v20231001preview.ResourceType, v1.OperationMethod(datamodel.OperationProcess), func(opts ctrl.Options) (ctrl.Controller, error) { return resourcegroups.NewTrackedResourceProcessController(opts, transport, defaultDownstream) }, opts)) // Resource providers and related types - err = errors.Join(err, registry.Register(ctx, datamodel.ResourceProviderResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { + err = errors.Join(err, registry.Register(datamodel.ResourceProviderResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { return &resourceproviders.ResourceProviderPutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.ResourceProviderResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { + err = errors.Join(err, registry.Register(datamodel.ResourceProviderResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { return &resourceproviders.ResourceProviderDeleteController{ BaseController: ctrl.NewBaseAsyncController(opts), Connection: connection, }, nil }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.ResourceTypeResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { + err = errors.Join(err, registry.Register(datamodel.ResourceTypeResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { return &resourceproviders.ResourceTypePutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.ResourceTypeResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { + err = errors.Join(err, registry.Register(datamodel.ResourceTypeResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { return &resourceproviders.ResourceTypeDeleteController{ BaseController: ctrl.NewBaseAsyncController(opts), Connection: connection, }, nil }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.APIVersionResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { + err = errors.Join(err, registry.Register(datamodel.APIVersionResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { return &resourceproviders.APIVersionPutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.APIVersionResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { + err = errors.Join(err, registry.Register(datamodel.APIVersionResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { return &resourceproviders.APIVersionDeleteController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.LocationResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { + err = errors.Join(err, registry.Register(datamodel.LocationResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { return &resourceproviders.LocationPutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.LocationResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { + err = errors.Join(err, registry.Register(datamodel.LocationResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { return &resourceproviders.LocationDeleteController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil }, opts)) diff --git a/pkg/ucp/dataprovider/factory.go b/pkg/ucp/dataprovider/factory.go index 1d0580336a..e879339d2b 100644 --- a/pkg/ucp/dataprovider/factory.go +++ b/pkg/ucp/dataprovider/factory.go @@ -36,7 +36,7 @@ import ( runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" ) -type storageFactoryFunc func(context.Context, StorageProviderOptions, string) (store.StorageClient, error) +type storageFactoryFunc func(ctx context.Context, options StorageProviderOptions) (store.StorageClient, error) var storageClientFactory = map[StorageProviderType]storageFactoryFunc{ TypeAPIServer: initAPIServerClient, @@ -45,7 +45,7 @@ var storageClientFactory = map[StorageProviderType]storageFactoryFunc{ TypePostgreSQL: initPostgreSQLClient, } -func initAPIServerClient(ctx context.Context, opt StorageProviderOptions, _ string) (store.StorageClient, error) { +func initAPIServerClient(ctx context.Context, opt StorageProviderOptions) (store.StorageClient, error) { if opt.APIServer.Namespace == "" { return nil, errors.New("failed to initialize APIServer client: namespace is required") } @@ -83,7 +83,7 @@ func initAPIServerClient(ctx context.Context, opt StorageProviderOptions, _ stri // InitETCDClient checks if the ETCD client is in memory and if the client is not nil, then it initializes the storage // client and returns an ETCDClient. If either of these conditions are not met, an error is returned. -func InitETCDClient(ctx context.Context, opt StorageProviderOptions, _ string) (store.StorageClient, error) { +func InitETCDClient(ctx context.Context, opt StorageProviderOptions) (store.StorageClient, error) { if !opt.ETCD.InMemory { return nil, errors.New("failed to initialize etcd client: inmemory is the only supported mode for now") } @@ -102,12 +102,12 @@ func InitETCDClient(ctx context.Context, opt StorageProviderOptions, _ string) ( } // initInMemoryClient creates a new in-memory store client. -func initInMemoryClient(ctx context.Context, opt StorageProviderOptions, _ string) (store.StorageClient, error) { +func initInMemoryClient(ctx context.Context, opt StorageProviderOptions) (store.StorageClient, error) { return inmemory.NewClient(), nil } // initPostgreSQLClient creates a new PostgreSQL store client. -func initPostgreSQLClient(ctx context.Context, opt StorageProviderOptions, _ string) (store.StorageClient, error) { +func initPostgreSQLClient(ctx context.Context, opt StorageProviderOptions) (store.StorageClient, error) { if opt.PostgreSQL.URL == "" { return nil, errors.New("failed to initialize PostgreSQL client: URL is required") } diff --git a/pkg/ucp/dataprovider/mock_datastorage_provider.go b/pkg/ucp/dataprovider/mock_datastorage_provider.go deleted file mode 100644 index e270701f4a..0000000000 --- a/pkg/ucp/dataprovider/mock_datastorage_provider.go +++ /dev/null @@ -1,80 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/radius-project/radius/pkg/ucp/dataprovider (interfaces: DataStorageProvider) -// -// Generated by this command: -// -// mockgen -typed -destination=./mock_datastorage_provider.go -package=dataprovider -self_package github.com/radius-project/radius/pkg/ucp/dataprovider github.com/radius-project/radius/pkg/ucp/dataprovider DataStorageProvider -// - -// Package dataprovider is a generated GoMock package. -package dataprovider - -import ( - context "context" - reflect "reflect" - - store "github.com/radius-project/radius/pkg/ucp/store" - gomock "go.uber.org/mock/gomock" -) - -// MockDataStorageProvider is a mock of DataStorageProvider interface. -type MockDataStorageProvider struct { - ctrl *gomock.Controller - recorder *MockDataStorageProviderMockRecorder -} - -// MockDataStorageProviderMockRecorder is the mock recorder for MockDataStorageProvider. -type MockDataStorageProviderMockRecorder struct { - mock *MockDataStorageProvider -} - -// NewMockDataStorageProvider creates a new mock instance. -func NewMockDataStorageProvider(ctrl *gomock.Controller) *MockDataStorageProvider { - mock := &MockDataStorageProvider{ctrl: ctrl} - mock.recorder = &MockDataStorageProviderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockDataStorageProvider) EXPECT() *MockDataStorageProviderMockRecorder { - return m.recorder -} - -// GetStorageClient mocks base method. -func (m *MockDataStorageProvider) GetStorageClient(arg0 context.Context, arg1 string) (store.StorageClient, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetStorageClient", arg0, arg1) - ret0, _ := ret[0].(store.StorageClient) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetStorageClient indicates an expected call of GetStorageClient. -func (mr *MockDataStorageProviderMockRecorder) GetStorageClient(arg0, arg1 any) *MockDataStorageProviderGetStorageClientCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStorageClient", reflect.TypeOf((*MockDataStorageProvider)(nil).GetStorageClient), arg0, arg1) - return &MockDataStorageProviderGetStorageClientCall{Call: call} -} - -// MockDataStorageProviderGetStorageClientCall wrap *gomock.Call -type MockDataStorageProviderGetStorageClientCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *MockDataStorageProviderGetStorageClientCall) Return(arg0 store.StorageClient, arg1 error) *MockDataStorageProviderGetStorageClientCall { - c.Call = c.Call.Return(arg0, arg1) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *MockDataStorageProviderGetStorageClientCall) Do(f func(context.Context, string) (store.StorageClient, error)) *MockDataStorageProviderGetStorageClientCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockDataStorageProviderGetStorageClientCall) DoAndReturn(f func(context.Context, string) (store.StorageClient, error)) *MockDataStorageProviderGetStorageClientCall { - c.Call = c.Call.DoAndReturn(f) - return c -} diff --git a/pkg/ucp/dataprovider/storageprovider.go b/pkg/ucp/dataprovider/storageprovider.go index ce90641bd5..0fb89234b7 100644 --- a/pkg/ucp/dataprovider/storageprovider.go +++ b/pkg/ucp/dataprovider/storageprovider.go @@ -18,71 +18,119 @@ package dataprovider import ( "context" - "errors" + "fmt" "sync" "github.com/radius-project/radius/pkg/ucp/store" - "github.com/radius-project/radius/pkg/ucp/util" ) -var ( - ErrUnsupportedStorageProvider = errors.New("unsupported storage provider") - ErrStorageNotFound = errors.New("storage provider not found") -) +// DataStorageProvider acts as a factory for storage clients. +// +// Do not use construct this directly: +// +// - Use DataStorageProviderFromOptions instead for production use. +// - Use DataStorageProviderFromMemory or DataStorageProviderFromClient for testing. +type DataStorageProvider struct { + // options configures the settings of the storage provider. + options StorageProviderOptions + + // factory is factory function to create a new storage client. Can be overridden for testing. + factory storageFactoryFunc + + // init is used to guarantee single-initialization of the storage provider. + init sync.RWMutex -var _ DataStorageProvider = (*storageProvider)(nil) + // result is result of invoking the factory (cached). + result result +} + +type result struct { + client store.StorageClient + err error +} -type storageProvider struct { - clients map[string]store.StorageClient - clientsMu sync.RWMutex - options StorageProviderOptions +// DataStorageProviderFromOptions creates a new instance of the DataStorageProvider struct with the given options. +// +// This will used the known factory functions to instantiate the storage client. +func DataStorageProviderFromOptions(options StorageProviderOptions) *DataStorageProvider { + return &DataStorageProvider{options: options} } -// NewStorageProvider creates a new instance of the "storageProvider" struct with the given -// "StorageProviderOptions" and returns it. -func NewStorageProvider(opts StorageProviderOptions) DataStorageProvider { - return &storageProvider{ - clients: map[string]store.StorageClient{}, - options: opts, +// DataStorageProviderFromMemory creates a new instance of the DataStorageProvider struct using the in-memory client. +// +// This will use the ephemeral in-memory storage client. +func DataStorageProviderFromMemory() *DataStorageProvider { + return &DataStorageProvider{options: StorageProviderOptions{Provider: TypeInMemory}} +} + +// DataStorageProviderFromClient creates a new instance of the DataStorageProvider struct with the given client. +// +// This will always return the given client and will not attempt to create a new one. This can be used for testing +// with mocks. +func DataStorageProviderFromClient(client store.StorageClient) *DataStorageProvider { + return &DataStorageProvider{result: result{client: client}} +} + +// GetStorageClient returns a storage client for the given resource type. +func (p *DataStorageProvider) GetClient(ctx context.Context) (store.StorageClient, error) { + // Guarantee single initialization. + p.init.RLock() + result := p.result + p.init.RUnlock() + + if result.client == nil && result.err == nil { + result = p.initialize(ctx) } + + // Invariant, either result.err is set or result.client is set. + if result.err != nil { + return nil, result.err + } + + if result.client == nil { + panic("invariant violated: p.result.client is nil") + } + + return result.client, nil } -// GetStorageClient checks if a StorageClient for the given resourceType already exists in the map, and -// if so, returns it. If not, it creates a new StorageClient using the storageClientFactory and adds it to the map, -// returning it. If an error occurs, it returns an error. -func (p *storageProvider) GetStorageClient(ctx context.Context, resourceType string) (store.StorageClient, error) { - cn := util.NormalizeStringToLower(resourceType) - - p.clientsMu.RLock() - c, ok := p.clients[cn] - p.clientsMu.RUnlock() - if ok { - return c, nil +func (p *DataStorageProvider) initialize(ctx context.Context) result { + p.init.Lock() + defer p.init.Unlock() + + // Invariant: p.result is set when this function exits. + // Invariant: p.result.client is nil or p.result.err is nil when this function exits. + // Invariant: p.result is returned to the caller, so they don't need to retake the lock. + + // Note: this is a double-checked locking pattern. + // + // It's possible that result was set by another goroutine before we acquired the lock. + if p.result.client != nil || p.result.err != nil { + return p.result } - var err error - if fn, ok := storageClientFactory[p.options.Provider]; ok { - // This write lock ensure that storage init function executes one by one and write client - // to map safely. - // CosmosDBStorageClient Init() calls database and collection creation control plane APIs. - // Ideally, such control plane APIs must be idempotent, but we could see unexpected failures - // by calling control plane API concurrently. Even if such issue rarely happens during release - // time, it could make the short-term downtime of the service. - // We expect that GetStorageClient() will be called during the start time. Thus, having a lock won't - // hurt any runtime performance. - p.clientsMu.Lock() - defer p.clientsMu.Unlock() - - if c, ok := p.clients[cn]; ok { - return c, nil - } + // If we get here we have the exclusive lock and need to initialize the storage client. - if c, err = fn(ctx, p.options, cn); err == nil { - p.clients[cn] = c + factory := p.factory + if factory == nil { + fn, ok := storageClientFactory[p.options.Provider] + if !ok { + p.result = result{nil, fmt.Errorf("unsupported storage provider: %s", p.options.Provider)} + return p.result } - } else { - err = ErrUnsupportedStorageProvider + + factory = fn + } + + client, err := factory(ctx, p.options) + if err != nil { + p.result = result{nil, fmt.Errorf("failed to initialize storage client: %w", err)} + return p.result + } else if client == nil { + p.result = result{nil, fmt.Errorf("failed to initialize storage client: provider returned nil")} + return p.result } - return c, err + p.result = result{client, nil} + return p.result } diff --git a/pkg/ucp/dataprovider/storageprovider_test.go b/pkg/ucp/dataprovider/storageprovider_test.go new file mode 100644 index 0000000000..984c73597f --- /dev/null +++ b/pkg/ucp/dataprovider/storageprovider_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataprovider + +import ( + "context" + "errors" + "testing" + + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/stretchr/testify/require" +) + +func Test_DataStorageProviderFromOptions(t *testing.T) { + options := StorageProviderOptions{Provider: TypeInMemory} + provider := DataStorageProviderFromOptions(options) + + require.NotNil(t, provider) + require.Equal(t, options, provider.options) + + client, err := provider.GetClient(context.Background()) + require.NoError(t, err) + require.NotNil(t, client) +} + +func Test_DataStorageProviderFromMemory(t *testing.T) { + provider := DataStorageProviderFromMemory() + + require.NotNil(t, provider) + require.Equal(t, TypeInMemory, provider.options.Provider) + + client, err := provider.GetClient(context.Background()) + require.NoError(t, err) + require.NotNil(t, client) +} + +func Test_DataStorageProviderFromClient(t *testing.T) { + mockClient := &store.MockStorageClient{} + provider := DataStorageProviderFromClient(mockClient) + + require.NotNil(t, provider) + require.Same(t, mockClient, provider.result.client) + + client, err := provider.GetClient(context.Background()) + require.NoError(t, err) + require.Same(t, client, mockClient) +} + +func Test_GetClient_CachedClient(t *testing.T) { + mockClient := &store.MockStorageClient{} + provider := DataStorageProviderFromOptions(StorageProviderOptions{Provider: "Test"}) + + callCount := 0 + provider.factory = storageFactoryFunc(func(ctx context.Context, options StorageProviderOptions) (store.StorageClient, error) { + callCount++ + return mockClient, nil + }) + + client, err := provider.GetClient(context.Background()) + require.NoError(t, err) + require.Same(t, mockClient, client) + + // Do it twice to ensure the client is cached. + client, err = provider.GetClient(context.Background()) + require.NoError(t, err) + require.Same(t, mockClient, client) + + require.Equal(t, 1, callCount) +} + +func Test_GetClient_CachedError(t *testing.T) { + provider := DataStorageProviderFromOptions(StorageProviderOptions{Provider: "Test"}) + + expectedErr := errors.New("oh noes!") + + callCount := 0 + provider.factory = storageFactoryFunc(func(ctx context.Context, options StorageProviderOptions) (store.StorageClient, error) { + callCount++ + return nil, expectedErr + }) + + client, err := provider.GetClient(context.Background()) + require.Error(t, err) + require.Equal(t, "failed to initialize storage client: oh noes!", err.Error()) + require.Nil(t, client) + + // Do it twice to ensure the client is cached. + client, err = provider.GetClient(context.Background()) + require.Error(t, err) + require.Equal(t, "failed to initialize storage client: oh noes!", err.Error()) + require.Nil(t, client) + + require.Equal(t, 1, callCount) +} + +func TestGetClient_UnsupportedProvider(t *testing.T) { + options := StorageProviderOptions{Provider: "unsupported"} + provider := DataStorageProviderFromOptions(options) + + client, err := provider.GetClient(context.Background()) + + require.Error(t, err) + require.Nil(t, client) + require.Equal(t, "unsupported storage provider: unsupported", err.Error()) +} + +func TestInitialize(t *testing.T) { + options := StorageProviderOptions{Provider: TypeInMemory} + provider := DataStorageProviderFromOptions(options) + + result := provider.initialize(context.Background()) + + require.NoError(t, result.err) + require.NotNil(t, result.client) +} diff --git a/pkg/ucp/dataprovider/types.go b/pkg/ucp/dataprovider/types.go index b6f9e8ce9b..cc577b0d63 100644 --- a/pkg/ucp/dataprovider/types.go +++ b/pkg/ucp/dataprovider/types.go @@ -16,12 +16,6 @@ limitations under the License. package dataprovider -import ( - "context" - - "github.com/radius-project/radius/pkg/ucp/store" -) - // StorageProviderType represents types of storage provider. type StorageProviderType string @@ -38,11 +32,3 @@ const ( // TypePostgreSQL represents the PostgreSQL provider. TypePostgreSQL StorageProviderType = "postgresql" ) - -//go:generate mockgen -typed -destination=./mock_datastorage_provider.go -package=dataprovider -self_package github.com/radius-project/radius/pkg/ucp/dataprovider github.com/radius-project/radius/pkg/ucp/dataprovider DataStorageProvider - -// DataStorageProvider is an interfae to provide storage client. -type DataStorageProvider interface { - // GetStorageClient creates or gets storage client. - GetStorageClient(context.Context, string) (store.StorageClient, error) -} diff --git a/pkg/ucp/frontend/api/routes.go b/pkg/ucp/frontend/api/routes.go index f18a628628..fd84d51aa5 100644 --- a/pkg/ucp/frontend/api/routes.go +++ b/pkg/ucp/frontend/api/routes.go @@ -97,6 +97,7 @@ func Register(ctx context.Context, router chi.Router, planeModules []modules.Ini ParentRouter: router, Path: "/openapi/v2", OperationType: &v1.OperationType{Type: OperationTypeKubernetesOpenAPIV2Doc, Method: v1.OperationGet}, + ResourceType: OperationTypeKubernetesOpenAPIV2Doc, Method: v1.OperationGet, ControllerFactory: kubernetes_ctrl.NewOpenAPIv2Doc, }, @@ -104,6 +105,7 @@ func Register(ctx context.Context, router chi.Router, planeModules []modules.Ini ParentRouter: router, Path: "/openapi/v3", OperationType: &v1.OperationType{Type: OperationTypeKubernetesOpenAPIV3Doc, Method: v1.OperationGet}, + ResourceType: OperationTypeKubernetesOpenAPIV3Doc, Method: v1.OperationGet, ControllerFactory: kubernetes_ctrl.NewOpenAPIv3Doc, }, @@ -111,6 +113,7 @@ func Register(ctx context.Context, router chi.Router, planeModules []modules.Ini ParentRouter: router, Path: options.PathBase, OperationType: &v1.OperationType{Type: OperationTypeKubernetesDiscoveryDoc, Method: v1.OperationGet}, + ResourceType: OperationTypeKubernetesDiscoveryDoc, Method: v1.OperationGet, ControllerFactory: kubernetes_ctrl.NewDiscoveryDoc, }, @@ -134,14 +137,21 @@ func Register(ctx context.Context, router chi.Router, planeModules []modules.Ini ParentRouter: planeCollectionRouter, Method: v1.OperationList, OperationType: &v1.OperationType{Type: OperationTypePlanes, Method: v1.OperationList}, + ResourceType: OperationTypePlanes, ControllerFactory: planes_ctrl.NewListPlanes, }, }...) + storageClient, err := options.DataProvider.GetClient(ctx) + if err != nil { + return err + } + ctrlOptions := controller.Options{ - Address: options.Address, - PathBase: options.PathBase, - DataProvider: options.DataProvider, + Address: options.Address, + PathBase: options.PathBase, + StorageClient: storageClient, + StatusManager: options.StatusManager, } for _, h := range handlerOptions { diff --git a/pkg/ucp/frontend/api/routes_test.go b/pkg/ucp/frontend/api/routes_test.go index d0a7fdc849..32fc581a22 100644 --- a/pkg/ucp/frontend/api/routes_test.go +++ b/pkg/ucp/frontend/api/routes_test.go @@ -23,6 +23,7 @@ import ( "github.com/go-chi/chi/v5" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/armrpc/rpctest" "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/frontend/modules" @@ -78,14 +79,11 @@ func Test_Routes(t *testing.T) { }, } - ctrl := gomock.NewController(t) - dataProvider := dataprovider.NewMockDataStorageProvider(ctrl) - dataProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() - options := modules.Options{ - Address: "localhost", - PathBase: pathBase, - DataProvider: dataProvider, + Address: "localhost", + PathBase: pathBase, + DataProvider: dataprovider.DataStorageProviderFromMemory(), + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), } rpctest.AssertRouters(t, tests, pathBase, "", func(ctx context.Context) (chi.Router, error) { @@ -97,14 +95,11 @@ func Test_Routes(t *testing.T) { func Test_Route_ToModule(t *testing.T) { pathBase := "/some-path-base" - ctrl := gomock.NewController(t) - dataProvider := dataprovider.NewMockDataStorageProvider(ctrl) - dataProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() - options := modules.Options{ - Address: "localhost", - PathBase: pathBase, - DataProvider: dataProvider, + Address: "localhost", + PathBase: pathBase, + DataProvider: dataprovider.DataStorageProviderFromMemory(), + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), } r := chi.NewRouter() diff --git a/pkg/ucp/frontend/api/server.go b/pkg/ucp/frontend/api/server.go index 8a957c2d89..4f121582c7 100644 --- a/pkg/ucp/frontend/api/server.go +++ b/pkg/ucp/frontend/api/server.go @@ -84,7 +84,7 @@ type ServiceOptions struct { // Service implements the hosting.Service interface for the UCP frontend API. type Service struct { options ServiceOptions - storageProvider dataprovider.DataStorageProvider + storageProvider *dataprovider.DataStorageProvider queueProvider *queueprovider.QueueProvider secretProvider *secretprovider.SecretProvider } @@ -118,7 +118,7 @@ func (s *Service) Name() string { func (s *Service) Initialize(ctx context.Context) (*http.Server, error) { r := chi.NewRouter() - s.storageProvider = dataprovider.NewStorageProvider(s.options.StorageProviderOptions) + s.storageProvider = dataprovider.DataStorageProviderFromOptions(s.options.StorageProviderOptions) s.queueProvider = queueprovider.New(s.options.QueueProviderOptions) s.secretProvider = secretprovider.NewSecretProvider(s.options.SecretProviderOptions) @@ -127,12 +127,17 @@ func (s *Service) Initialize(ctx context.Context) (*http.Server, error) { return nil, err } + storageClient, err := s.storageProvider.GetClient(ctx) + if err != nil { + return nil, err + } + queueClient, err := s.queueProvider.GetClient(ctx) if err != nil { return nil, err } - statusManager := statusmanager.New(s.storageProvider, queueClient, s.options.Location) + statusManager := statusmanager.New(storageClient, queueClient, s.options.Location) moduleOptions := modules.Options{ Address: s.options.Address, @@ -217,7 +222,7 @@ func (s *Service) createPlane(ctx context.Context, plane rest.Plane) error { return fmt.Errorf("invalid plane ID: %s", plane.ID) } - db, err := s.storageProvider.GetStorageClient(ctx, "ucp") + db, err := s.storageProvider.GetClient(ctx) if err != nil { return err } diff --git a/pkg/ucp/frontend/aws/routes.go b/pkg/ucp/frontend/aws/routes.go index 6a3154f23d..6713157d50 100644 --- a/pkg/ucp/frontend/aws/routes.go +++ b/pkg/ucp/frontend/aws/routes.go @@ -102,6 +102,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: planeCollectionRouter, Method: v1.OperationList, OperationType: &v1.OperationType{Type: datamodel.AWSPlaneResourceType, Method: v1.OperationList}, + ResourceType: datamodel.AWSPlaneResourceType, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return &planes_ctrl.ListPlanesByType[*datamodel.AWSPlane, datamodel.AWSPlane]{ Operation: controller.NewOperation(opts, planeResourceOptions), @@ -112,6 +113,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: planeResourceRouter, Method: v1.OperationGet, OperationType: &v1.OperationType{Type: datamodel.AWSPlaneResourceType, Method: v1.OperationGet}, + ResourceType: datamodel.AWSPlaneResourceType, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return defaultoperation.NewGetResource(opts, planeResourceOptions) }, @@ -120,6 +122,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: planeResourceRouter, Method: v1.OperationPut, OperationType: &v1.OperationType{Type: datamodel.AWSPlaneResourceType, Method: v1.OperationPut}, + ResourceType: datamodel.AWSPlaneResourceType, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return defaultoperation.NewDefaultSyncPut(opts, planeResourceOptions) }, @@ -128,6 +131,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: planeResourceRouter, Method: v1.OperationDelete, OperationType: &v1.OperationType{Type: datamodel.AWSPlaneResourceType, Method: v1.OperationDelete}, + ResourceType: datamodel.AWSPlaneResourceType, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return defaultoperation.NewDefaultSyncDelete(opts, planeResourceOptions) }, @@ -137,6 +141,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: server.NewSubrouter(baseRouter, operationResultsPath), Method: v1.OperationGet, OperationType: &v1.OperationType{Type: OperationResultsResourceType, Method: v1.OperationGet}, + ResourceType: OperationResultsResourceType, ControllerFactory: func(opt controller.Options) (controller.Controller, error) { return awsproxy_ctrl.NewGetAWSOperationResults(opt, m.AWSClients) }, @@ -146,6 +151,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: server.NewSubrouter(baseRouter, operationStatusesPath), Method: v1.OperationGet, OperationType: &v1.OperationType{Type: OperationStatusResourceType, Method: v1.OperationGet}, + ResourceType: OperationStatusResourceType, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return awsproxy_ctrl.NewGetAWSOperationStatuses(opts, m.AWSClients) }, @@ -159,6 +165,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: resourceCollectionRouter, Method: v1.OperationList, OperationType: &v1.OperationType{Type: OperationTypeAWSResource, Method: v1.OperationList}, + ResourceType: OperationTypeAWSResource, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return awsproxy_ctrl.NewListAWSResources(opts, m.AWSClients) }, @@ -168,6 +175,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { Path: "/{resourceName}", Method: v1.OperationPut, OperationType: &v1.OperationType{Type: OperationTypeAWSResource, Method: v1.OperationPut}, + ResourceType: OperationTypeAWSResource, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return awsproxy_ctrl.NewCreateOrUpdateAWSResource(opts, m.AWSClients) }, @@ -177,6 +185,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { Path: "/{resourceName}", Method: v1.OperationDelete, OperationType: &v1.OperationType{Type: OperationTypeAWSResource, Method: v1.OperationDelete}, + ResourceType: OperationTypeAWSResource, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return awsproxy_ctrl.NewDeleteAWSResource(opts, m.AWSClients) }, @@ -186,6 +195,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { Path: "/{resourceName}", Method: v1.OperationGet, OperationType: &v1.OperationType{Type: OperationTypeAWSResource, Method: v1.OperationGet}, + ResourceType: OperationTypeAWSResource, ControllerFactory: func(opt controller.Options) (controller.Controller, error) { return awsproxy_ctrl.NewGetAWSResource(opt, m.AWSClients) }, @@ -203,6 +213,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { Path: "/:put", Method: v1.OperationPutImperative, OperationType: &v1.OperationType{Type: OperationTypeAWSResource, Method: v1.OperationPutImperative}, + ResourceType: OperationTypeAWSResource, ControllerFactory: func(opt controller.Options) (controller.Controller, error) { return awsproxy_ctrl.NewCreateOrUpdateAWSResourceWithPost(opt, m.AWSClients) }, @@ -212,6 +223,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { Path: "/:get", Method: v1.OperationGetImperative, OperationType: &v1.OperationType{Type: OperationTypeAWSResource, Method: v1.OperationGetImperative}, + ResourceType: OperationTypeAWSResource, ControllerFactory: func(opt controller.Options) (controller.Controller, error) { return awsproxy_ctrl.NewGetAWSResourceWithPost(opt, m.AWSClients) }, @@ -221,6 +233,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { Path: "/:delete", Method: v1.OperationDeleteImperative, OperationType: &v1.OperationType{Type: OperationTypeAWSResource, Method: v1.OperationDeleteImperative}, + ResourceType: OperationTypeAWSResource, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return awsproxy_ctrl.NewDeleteAWSResourceWithPost(opts, m.AWSClients) }, @@ -279,10 +292,16 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { }, }...) + storageClient, err := m.options.DataProvider.GetClient(ctx) + if err != nil { + return nil, err + } + ctrlOpts := controller.Options{ - Address: m.options.Address, - PathBase: m.options.PathBase, - DataProvider: m.options.DataProvider, + Address: m.options.Address, + PathBase: m.options.PathBase, + StorageClient: storageClient, + StatusManager: m.options.StatusManager, } for _, h := range handlerOptions { diff --git a/pkg/ucp/frontend/aws/routes_test.go b/pkg/ucp/frontend/aws/routes_test.go index aacd25032a..5311a646d8 100644 --- a/pkg/ucp/frontend/aws/routes_test.go +++ b/pkg/ucp/frontend/aws/routes_test.go @@ -25,6 +25,7 @@ import ( "go.uber.org/mock/gomock" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/armrpc/rpctest" "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" "github.com/radius-project/radius/pkg/ucp/datamodel" @@ -111,8 +112,6 @@ func Test_Routes(t *testing.T) { } ctrl := gomock.NewController(t) - dataProvider := dataprovider.NewMockDataStorageProvider(ctrl) - dataProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() secretClient := secret.NewMockClient(ctrl) secretProvider := secretprovider.NewSecretProvider(secretprovider.SecretProviderOptions{}) @@ -122,13 +121,18 @@ func Test_Routes(t *testing.T) { Address: "localhost", PathBase: pathBase, Config: &hostoptions.UCPConfig{}, - DataProvider: dataProvider, + DataProvider: dataprovider.DataStorageProviderFromMemory(), SecretProvider: secretProvider, + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), } rpctest.AssertRouters(t, tests, pathBase, "", func(ctx context.Context) (chi.Router, error) { module := NewModule(options) handler, err := module.Initialize(ctx) - return handler.(chi.Router), err + if err != nil { + return nil, err + } + + return handler.(chi.Router), nil }) } diff --git a/pkg/ucp/frontend/azure/routes.go b/pkg/ucp/frontend/azure/routes.go index ca06e44f61..c54cd05fb5 100644 --- a/pkg/ucp/frontend/azure/routes.go +++ b/pkg/ucp/frontend/azure/routes.go @@ -74,6 +74,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: planeCollectionRouter, Method: v1.OperationList, OperationType: &v1.OperationType{Type: datamodel.AzurePlaneResourceType, Method: v1.OperationList}, + ResourceType: datamodel.AzurePlaneResourceType, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return &planes_ctrl.ListPlanesByType[*datamodel.AzurePlane, datamodel.AzurePlane]{ Operation: controller.NewOperation(opts, planeResourceOptions), @@ -84,6 +85,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: planeResourceRouter, Method: v1.OperationGet, OperationType: &v1.OperationType{Type: datamodel.AzurePlaneResourceType, Method: v1.OperationGet}, + ResourceType: datamodel.AzurePlaneResourceType, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return defaultoperation.NewGetResource(opts, planeResourceOptions) }, @@ -92,6 +94,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: planeResourceRouter, Method: v1.OperationPut, OperationType: &v1.OperationType{Type: datamodel.AzurePlaneResourceType, Method: v1.OperationPut}, + ResourceType: datamodel.AzurePlaneResourceType, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return defaultoperation.NewDefaultSyncPut(opts, planeResourceOptions) }, @@ -100,6 +103,7 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: planeResourceRouter, Method: v1.OperationDelete, OperationType: &v1.OperationType{Type: datamodel.AzurePlaneResourceType, Method: v1.OperationDelete}, + ResourceType: datamodel.AzurePlaneResourceType, ControllerFactory: func(opts controller.Options) (controller.Controller, error) { return defaultoperation.NewDefaultSyncDelete(opts, planeResourceOptions) }, @@ -156,14 +160,21 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: planeResourceRouter, Path: server.CatchAllPath, OperationType: &v1.OperationType{Type: OperationTypeUCPAzureProxy, Method: v1.OperationProxy}, + ResourceType: OperationTypeUCPAzureProxy, ControllerFactory: planes_ctrl.NewProxyController, }, } + storageClient, err := m.options.DataProvider.GetClient(ctx) + if err != nil { + return nil, err + } + ctrlOpts := controller.Options{ - Address: m.options.Address, - PathBase: m.options.PathBase, - DataProvider: m.options.DataProvider, + Address: m.options.Address, + PathBase: m.options.PathBase, + StorageClient: storageClient, + StatusManager: m.options.StatusManager, } for _, h := range handlerOptions { diff --git a/pkg/ucp/frontend/azure/routes_test.go b/pkg/ucp/frontend/azure/routes_test.go index 8690166085..1e0e3a5f8c 100644 --- a/pkg/ucp/frontend/azure/routes_test.go +++ b/pkg/ucp/frontend/azure/routes_test.go @@ -25,6 +25,7 @@ import ( "go.uber.org/mock/gomock" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/armrpc/rpctest" "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" "github.com/radius-project/radius/pkg/ucp/datamodel" @@ -85,8 +86,6 @@ func Test_Routes(t *testing.T) { } ctrl := gomock.NewController(t) - dataProvider := dataprovider.NewMockDataStorageProvider(ctrl) - dataProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() secretClient := secret.NewMockClient(ctrl) secretProvider := secretprovider.NewSecretProvider(secretprovider.SecretProviderOptions{}) @@ -96,13 +95,18 @@ func Test_Routes(t *testing.T) { Address: "localhost", PathBase: pathBase, Config: &hostoptions.UCPConfig{}, - DataProvider: dataProvider, + DataProvider: dataprovider.DataStorageProviderFromMemory(), SecretProvider: secretProvider, + StatusManager: statusmanager.NewMockStatusManager(gomock.NewController(t)), } rpctest.AssertRouters(t, tests, pathBase, "", func(ctx context.Context) (chi.Router, error) { module := NewModule(options) handler, err := module.Initialize(ctx) - return handler.(chi.Router), err + if err != nil { + return nil, err + } + + return handler.(chi.Router), nil }) } diff --git a/pkg/ucp/frontend/modules/types.go b/pkg/ucp/frontend/modules/types.go index 6b0a0d6e9a..87abadc2c4 100644 --- a/pkg/ucp/frontend/modules/types.go +++ b/pkg/ucp/frontend/modules/types.go @@ -61,7 +61,7 @@ type Options struct { Location string // DataProvider is the data storage provider. - DataProvider dataprovider.DataStorageProvider + DataProvider *dataprovider.DataStorageProvider // QeueueProvider provides access to the queue for async operations. QueueProvider *queueprovider.QueueProvider diff --git a/pkg/ucp/frontend/radius/routes.go b/pkg/ucp/frontend/radius/routes.go index 2631bd9afd..cee401c79b 100644 --- a/pkg/ucp/frontend/radius/routes.go +++ b/pkg/ucp/frontend/radius/routes.go @@ -64,10 +64,15 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { return handler } + storageClient, err := m.options.DataProvider.GetClient(ctx) + if err != nil { + return nil, err + } + ctrlOptions := controller.Options{ Address: m.options.Address, PathBase: m.options.PathBase, - DataProvider: m.options.DataProvider, + StorageClient: storageClient, StatusManager: m.options.StatusManager, } diff --git a/pkg/ucp/frontend/radius/routes_test.go b/pkg/ucp/frontend/radius/routes_test.go index c47bb9ab1a..6e26e16adf 100644 --- a/pkg/ucp/frontend/radius/routes_test.go +++ b/pkg/ucp/frontend/radius/routes_test.go @@ -185,8 +185,7 @@ func Test_Routes(t *testing.T) { } ctrl := gomock.NewController(t) - dataProvider := dataprovider.NewMockDataStorageProvider(ctrl) - dataProvider.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + dataProvider := dataprovider.DataStorageProviderFromMemory() secretClient := secret.NewMockClient(ctrl) secretProvider := secretprovider.NewSecretProvider(secretprovider.SecretProviderOptions{}) @@ -203,6 +202,10 @@ func Test_Routes(t *testing.T) { rpctest.AssertRouters(t, tests, pathBase, "", func(ctx context.Context) (chi.Router, error) { module := NewModule(options) handler, err := module.Initialize(ctx) - return handler.(chi.Router), err + if err != nil { + return nil, err + } + + return handler.(chi.Router), nil }) } diff --git a/pkg/ucp/integrationtests/radius/proxy_test.go b/pkg/ucp/integrationtests/radius/proxy_test.go index 371a79af0a..0cd69ea5fc 100644 --- a/pkg/ucp/integrationtests/radius/proxy_test.go +++ b/pkg/ucp/integrationtests/radius/proxy_test.go @@ -176,7 +176,7 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { return result, nil } - client, err := ucp.Clients.StorageProvider.GetStorageClient(ctx, "System.Test/testResources") + client, err := ucp.Clients.StorageProvider.GetClient(ctx) require.NoError(t, err) err = client.Delete(ctx, testResourceID) require.NoError(t, err) diff --git a/pkg/ucp/integrationtests/testrp/async.go b/pkg/ucp/integrationtests/testrp/async.go index 144b598c8c..61043b1092 100644 --- a/pkg/ucp/integrationtests/testrp/async.go +++ b/pkg/ucp/integrationtests/testrp/async.go @@ -60,7 +60,7 @@ func AsyncResource(t *testing.T, ts *testserver.TestServer, rootScope string, pu resourceType := "System.Test/testResources" // We can share the storage provider with the test server. - _, err := ts.Clients.StorageProvider.GetStorageClient(ctx, "System.Test/operationStatuses") + storageClient, err := ts.Clients.StorageProvider.GetClient(ctx) require.NoError(t, err) // Do not share the queue. @@ -73,19 +73,19 @@ func AsyncResource(t *testing.T, ts *testserver.TestServer, rootScope string, pu queueClient, err := queueProvider.GetClient(ctx) require.NoError(t, err) - statusManager := statusmanager.New(ts.Clients.StorageProvider, queueClient, v1.LocationGlobal) + statusManager := statusmanager.New(storageClient, queueClient, v1.LocationGlobal) backendOpts := backend_ctrl.Options{ - DataProvider: ts.Clients.StorageProvider, + StorageClient: storageClient, } - registry := worker.NewControllerRegistry(ts.Clients.StorageProvider) - err = registry.Register(ctx, resourceType, v1.OperationPut, func(opts backend_ctrl.Options) (backend_ctrl.Controller, error) { + registry := worker.NewControllerRegistry() + err = registry.Register(resourceType, v1.OperationPut, func(opts backend_ctrl.Options) (backend_ctrl.Controller, error) { return &BackendFuncController{BaseController: backend_ctrl.NewBaseAsyncController(opts), Func: put}, nil }, backendOpts) require.NoError(t, err) - err = registry.Register(ctx, resourceType, v1.OperationDelete, func(opts backend_ctrl.Options) (backend_ctrl.Controller, error) { + err = registry.Register(resourceType, v1.OperationDelete, func(opts backend_ctrl.Options) (backend_ctrl.Controller, error) { return &BackendFuncController{BaseController: backend_ctrl.NewBaseAsyncController(opts), Func: delete}, nil }, backendOpts) require.NoError(t, err) @@ -100,7 +100,8 @@ func AsyncResource(t *testing.T, ts *testserver.TestServer, rootScope string, pu }() frontendOpts := frontend_ctrl.Options{ - DataProvider: ts.Clients.StorageProvider, + Address: "localhost:8080", + StorageClient: storageClient, StatusManager: statusManager, } diff --git a/pkg/ucp/integrationtests/testrp/sync.go b/pkg/ucp/integrationtests/testrp/sync.go index 53ca8b4438..f720a319ed 100644 --- a/pkg/ucp/integrationtests/testrp/sync.go +++ b/pkg/ucp/integrationtests/testrp/sync.go @@ -23,12 +23,14 @@ import ( "github.com/go-chi/chi/v5" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" frontend_ctrl "github.com/radius-project/radius/pkg/armrpc/frontend/controller" "github.com/radius-project/radius/pkg/armrpc/frontend/defaultoperation" "github.com/radius-project/radius/pkg/armrpc/frontend/server" "github.com/radius-project/radius/pkg/armrpc/servicecontext" "github.com/radius-project/radius/pkg/middleware" "github.com/radius-project/radius/pkg/ucp/integrationtests/testserver" + queueprovider "github.com/radius-project/radius/pkg/ucp/queue/provider" "github.com/radius-project/radius/test/testcontext" "github.com/stretchr/testify/require" ) @@ -41,11 +43,29 @@ func SyncResource(t *testing.T, ts *testserver.TestServer, rootScope string) fun r := chi.NewRouter() r.Use(servicecontext.ARMRequestCtx("", v1.LocationGlobal), middleware.LowercaseURLPath) + // We can share the storage provider with the test server. + storageClient, err := ts.Clients.StorageProvider.GetClient(ctx) + require.NoError(t, err) + + // Do not share the queue. + queueOptions := queueprovider.QueueProviderOptions{ + Provider: queueprovider.TypeInmemory, + InMemory: &queueprovider.InMemoryQueueOptions{}, + Name: "System.Test", + } + queueProvider := queueprovider.New(queueOptions) + queueClient, err := queueProvider.GetClient(ctx) + require.NoError(t, err) + + statusManager := statusmanager.New(storageClient, queueClient, v1.LocationGlobal) + ctrlOpts := frontend_ctrl.Options{ - DataProvider: ts.Clients.StorageProvider, + Address: "localhost:8080", + StatusManager: statusManager, + StorageClient: storageClient, } - err := server.ConfigureDefaultHandlers(ctx, r, rootScope, false, "System.Test", nil, ctrlOpts) + err = server.ConfigureDefaultHandlers(ctx, r, rootScope, false, "System.Test", nil, ctrlOpts) require.NoError(t, err) resourceType := "System.Test/testResources" diff --git a/pkg/ucp/integrationtests/testserver/testserver.go b/pkg/ucp/integrationtests/testserver/testserver.go index f1eba83023..218181b40c 100644 --- a/pkg/ucp/integrationtests/testserver/testserver.go +++ b/pkg/ucp/integrationtests/testserver/testserver.go @@ -105,7 +105,7 @@ type TestServerClients struct { SecretProvider *secretprovider.SecretProvider // StorageProvider is the storage client provider. - StorageProvider dataprovider.DataStorageProvider + StorageProvider *dataprovider.DataStorageProvider } // TestServerMocks provides access to mock instances created by the TestServer. @@ -150,11 +150,7 @@ func StartWithMocks(t *testing.T, configureModules func(options modules.Options) ctrl := gomock.NewController(t) dataClient := store.NewMockStorageClient(ctrl) - dataProvider := dataprovider.NewMockDataStorageProvider(ctrl) - dataProvider.EXPECT(). - GetStorageClient(gomock.Any(), gomock.Any()). - Return(dataClient, nil). - AnyTimes() + dataProvider := dataprovider.DataStorageProviderFromClient(dataClient) queueClient := queue.NewMockClient(ctrl) queueProvider := queueprovider.New(queueprovider.QueueProviderOptions{Name: "System.Resources"}) @@ -180,7 +176,7 @@ func StartWithMocks(t *testing.T, configureModules func(options modules.Options) require.NoError(t, err, "failed to load OpenAPI spec") options := modules.Options{ - Address: server.URL, + Address: "localhost:9999", // Will be dynamically populated when server is started PathBase: pathBase, Config: &hostoptions.UCPConfig{}, DataProvider: dataProvider, @@ -275,7 +271,7 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) // Generate a random base path to ensure we're handling it correctly. pathBase := "/" + uuid.New().String() - dataProvider := dataprovider.NewStorageProvider(storageOptions) + dataProvider := dataprovider.DataStorageProviderFromOptions(storageOptions) secretProvider := secretprovider.NewSecretProvider(secretOptions) queueProvider := queueprovider.New(queueOptions) @@ -296,7 +292,10 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) connection, err := sdk.NewDirectConnection(address + pathBase) require.NoError(t, err) - statusManager := statusmanager.New(dataProvider, queueClient, v1.LocationGlobal) + storageClient, err := dataProvider.GetClient(ctx) + require.NoError(t, err) + + statusManager := statusmanager.New(storageClient, queueClient, v1.LocationGlobal) specLoader, err := validator.LoadSpec(ctx, "ucp", swagger.SpecFilesUCP, []string{pathBase}, "") require.NoError(t, err, "failed to load OpenAPI spec") @@ -327,8 +326,8 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) defaultDownstream, err := url.Parse(options.Config.Routing.DefaultDownstreamEndpoint) require.NoError(t, err) - registry := worker.NewControllerRegistry(dataProvider) - err = backend.RegisterControllers(ctx, registry, connection, http.DefaultTransport, backend_ctrl.Options{DataProvider: dataProvider}, defaultDownstream) + registry := worker.NewControllerRegistry() + err = backend.RegisterControllers(registry, connection, http.DefaultTransport, backend_ctrl.Options{StorageClient: storageClient}, defaultDownstream) require.NoError(t, err) w := worker.New(worker.Options{}, statusManager, queueClient, registry) diff --git a/pkg/ucp/secret/provider/factory.go b/pkg/ucp/secret/provider/factory.go index 5608171b52..839fb7b9fa 100644 --- a/pkg/ucp/secret/provider/factory.go +++ b/pkg/ucp/secret/provider/factory.go @@ -44,7 +44,7 @@ func initETCDSecretClient(ctx context.Context, opts SecretProviderOptions) (secr // data provider already creates an etcd process which can be re-used instead of a new process for secret. client, err := dataprovider.InitETCDClient(ctx, dataprovider.StorageProviderOptions{ ETCD: opts.ETCD, - }, "") + }) if err != nil { return nil, err }