Skip to content

Commit

Permalink
Simplify database interface
Browse files Browse the repository at this point in the history
This change simplifes our database provider interface by removing the 'name' parameter.

Prior to this change, each database client was used to store and retrieve a single resource type's data. Every piece of code had to be aware of the resource type associated with the client, and make sure to get a different client for each resource type it interacted with.

This was complicated further by the fact that not all of our database providers implemented this restriction.

---

After removing this restriction, our code can be simplified all over the place. A lot of code that used to need the `dataprovider.DataStoreProvider` now just needs the `store.StorageClient`.

This change pushes the simplification as far as possible. I also added validation to places where we initialize controllers, because it was too easy to initialize a controller with missing data.

Signed-off-by: Ryan Nowak <[email protected]>
  • Loading branch information
rynowak committed Dec 16, 2024
1 parent 3383692 commit afc96c6
Show file tree
Hide file tree
Showing 52 changed files with 571 additions and 639 deletions.
26 changes: 17 additions & 9 deletions pkg/armrpc/asyncoperation/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package controller

import (
"context"
"errors"

"github.com/radius-project/radius/pkg/corerp/backend/deployment"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
"github.com/radius-project/radius/pkg/ucp/store"

runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -31,9 +31,6 @@ type Options struct {
// StorageClient is the data storage client.
StorageClient store.StorageClient

// DataProvider is the data storage provider.
DataProvider dataprovider.DataStorageProvider

// KubeClient is the Kubernetes controller runtime client.
KubeClient runtimeclient.Client

Expand All @@ -44,6 +41,22 @@ type Options struct {
GetDeploymentProcessor func() deployment.DeploymentProcessor
}

// Validate validates that required fields are set on the options.
func (o Options) Validate() error {
var err error
if o.StorageClient == nil {
err = errors.Join(err, errors.New("StorageClient is required"))
}
if o.ResourceType == "" {
err = errors.Join(err, errors.New("ResourceType is required"))
}

// KubeClient and GetDeploymentProcessor are not used by the majority of the code, so they
// are not validated here.

return err
}

// Controller is an interface to implement async operation controller.
type Controller interface {
// Run runs async request operation.
Expand All @@ -68,11 +81,6 @@ func (b *BaseController) StorageClient() store.StorageClient {
return b.options.StorageClient
}

// DataProvider gets data storage provider for this controller.
func (b *BaseController) DataProvider() dataprovider.DataStorageProvider {
return b.options.DataProvider
}

// KubeClient gets Kubernetes client for this controller.
func (b *BaseController) KubeClient() runtimeclient.Client {
return b.options.KubeClient
Expand Down
42 changes: 9 additions & 33 deletions pkg/armrpc/asyncoperation/statusmanager/statusmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller"
"github.com/radius-project/radius/pkg/metrics"
"github.com/radius-project/radius/pkg/trace"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
queue "github.com/radius-project/radius/pkg/ucp/queue/client"
"github.com/radius-project/radius/pkg/ucp/resources"
"github.com/radius-project/radius/pkg/ucp/store"
Expand All @@ -37,7 +36,7 @@ import (

// statusManager includes the necessary functions to manage asynchronous operations.
type statusManager struct {
storeProvider dataprovider.DataStorageProvider
storageClient store.StorageClient
queue queue.Client
location string
}
Expand Down Expand Up @@ -65,9 +64,9 @@ type StatusManager interface {
}

// New creates statusManager instance.
func New(dataProvider dataprovider.DataStorageProvider, q queue.Client, location string) StatusManager {
func New(storageClient store.StorageClient, q queue.Client, location string) StatusManager {
return &statusManager{
storeProvider: dataProvider,
storageClient: storageClient,
queue: q,
location: location,
}
Expand All @@ -78,10 +77,6 @@ func (aom *statusManager) operationStatusResourceID(id resources.ID, operationID
return fmt.Sprintf("%s/providers/%s/locations/%s/operationstatuses/%s", id.PlaneScope(), strings.ToLower(id.ProviderNamespace()), aom.location, operationID)
}

func (aom *statusManager) getClient(ctx context.Context, id resources.ID) (store.StorageClient, error) {
return aom.storeProvider.GetStorageClient(ctx, id.ProviderNamespace()+"/operationstatuses")
}

// QueueAsyncOperation creates and saves a new status resource with the given parameters in datastore, and queues
// a request message. If an error occurs, the status is deleted using the storeClient.
func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMRequestContext, options QueueOperationOptions) error {
Expand Down Expand Up @@ -111,12 +106,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR
ClientObjectID: sCtx.ClientObjectID,
}

storeClient, err := aom.getClient(ctx, sCtx.ResourceID)
if err != nil {
return err
}

err = storeClient.Save(ctx, &store.Object{
err := aom.storageClient.Save(ctx, &store.Object{
Metadata: store.Metadata{ID: opID},
Data: aos,
})
Expand All @@ -126,7 +116,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR
}

if err = aom.queueRequestMessage(ctx, sCtx, aos, options.OperationTimeout); err != nil {
delErr := storeClient.Delete(ctx, opID)
delErr := aom.storageClient.Delete(ctx, opID)
if delErr != nil {
return delErr
}
Expand All @@ -140,12 +130,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR

// Get gets a status object from the datastore or an error if the retrieval fails.
func (aom *statusManager) Get(ctx context.Context, id resources.ID, operationID uuid.UUID) (*Status, error) {
storeClient, err := aom.getClient(ctx, id)
if err != nil {
return nil, err
}

obj, err := storeClient.Get(ctx, aom.operationStatusResourceID(id, operationID))
obj, err := aom.storageClient.Get(ctx, aom.operationStatusResourceID(id, operationID))
if err != nil {
return nil, err
}
Expand All @@ -162,12 +147,7 @@ func (aom *statusManager) Get(ctx context.Context, id resources.ID, operationID
// given parameters, and saves it back to the store.
func (aom *statusManager) Update(ctx context.Context, id resources.ID, operationID uuid.UUID, state v1.ProvisioningState, endTime *time.Time, opError *v1.ErrorDetails) error {
opID := aom.operationStatusResourceID(id, operationID)
storeClient, err := aom.getClient(ctx, id)
if err != nil {
return err
}

obj, err := storeClient.Get(ctx, opID)
obj, err := aom.storageClient.Get(ctx, opID)
if err != nil {
return err
}
Expand All @@ -190,17 +170,13 @@ func (aom *statusManager) Update(ctx context.Context, id resources.ID, operation

obj.Data = s

return storeClient.Save(ctx, obj, store.WithETag(obj.ETag))
return aom.storageClient.Save(ctx, obj, store.WithETag(obj.ETag))
}

// Delete deletes the operation status resource associated with the given ID and
// operationID, and returns an error if unsuccessful.
func (aom *statusManager) Delete(ctx context.Context, id resources.ID, operationID uuid.UUID) error {
storeClient, err := aom.getClient(ctx, id)
if err != nil {
return err
}
return storeClient.Delete(ctx, aom.operationStatusResourceID(id, operationID))
return aom.storageClient.Delete(ctx, aom.operationStatusResourceID(id, operationID))
}

// queueRequestMessage function is to put the async operation message to the queue to be worked on.
Expand Down
14 changes: 5 additions & 9 deletions pkg/armrpc/asyncoperation/statusmanager/statusmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/google/uuid"
v1 "github.com/radius-project/radius/pkg/armrpc/api/v1"
"github.com/radius-project/radius/pkg/armrpc/rpctest"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
queue "github.com/radius-project/radius/pkg/ucp/queue/client"
"github.com/radius-project/radius/pkg/ucp/resources"
"github.com/radius-project/radius/pkg/ucp/store"
Expand All @@ -35,10 +34,9 @@ import (
)

type asyncOperationsManagerTest struct {
manager StatusManager
storeProvider *dataprovider.MockDataStorageProvider
storeClient *store.MockStorageClient
queue *queue.MockClient
manager StatusManager
storeClient *store.MockStorageClient
queue *queue.MockClient
}

const (
Expand All @@ -54,12 +52,10 @@ const (

func setup(tb testing.TB) (asyncOperationsManagerTest, *gomock.Controller) {
ctrl := gomock.NewController(tb)
dp := dataprovider.NewMockDataStorageProvider(ctrl)
sc := store.NewMockStorageClient(ctrl)
dp.EXPECT().GetStorageClient(gomock.Any(), "Applications.Core/operationstatuses").Return(sc, nil)
enq := queue.NewMockClient(ctrl)
aom := New(dp, enq, "test-location")
return asyncOperationsManagerTest{manager: aom, storeProvider: dp, storeClient: sc, queue: enq}, ctrl
aom := New(sc, enq, "test-location")
return asyncOperationsManagerTest{manager: aom, storeClient: sc, queue: enq}, ctrl
}

var reqCtx = &v1.ARMRequestContext{
Expand Down
38 changes: 17 additions & 21 deletions pkg/armrpc/asyncoperation/worker/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ limitations under the License.
package worker

import (
"context"
"fmt"
"sync"

v1 "github.com/radius-project/radius/pkg/armrpc/api/v1"
ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
)

// ControllerFactoryFunc is a factory function to create a controller.
Expand All @@ -32,40 +31,38 @@ type ControllerFactoryFunc func(opts ctrl.Options) (ctrl.Controller, error)
type ControllerRegistry struct {
ctrlMap map[string]ctrl.Controller
ctrlMapMu sync.RWMutex
sp dataprovider.DataStorageProvider

defaultFactory ControllerFactoryFunc
defaultOpts ctrl.Options
}

// NewControllerRegistry creates an ControllerRegistry instance.
func NewControllerRegistry(sp dataprovider.DataStorageProvider) *ControllerRegistry {
func NewControllerRegistry() *ControllerRegistry {
return &ControllerRegistry{
ctrlMap: map[string]ctrl.Controller{},
sp: sp,
}
}

// Register registers a controller for a specific resource type and operation method.
//
// Controllers registered using Register will be cached by the registry and the same instance will be reused.
func (h *ControllerRegistry) Register(ctx context.Context, resourceType string, method v1.OperationMethod, factoryFn ControllerFactoryFunc, opts ctrl.Options) error {
func (h *ControllerRegistry) Register(resourceType string, method v1.OperationMethod, factoryFn ControllerFactoryFunc, opts ctrl.Options) error {
h.ctrlMapMu.Lock()
defer h.ctrlMapMu.Unlock()

ot := v1.OperationType{Type: resourceType, Method: method}
storageClient, err := h.sp.GetStorageClient(ctx, resourceType)
opts.ResourceType = resourceType

err := opts.Validate()
if err != nil {
return err
return fmt.Errorf("invalid controller options: %w", err)
}
opts.StorageClient = storageClient
opts.ResourceType = resourceType

ctrl, err := factoryFn(opts)
if err != nil {
return err
}

ot := v1.OperationType{Type: resourceType, Method: method}
h.ctrlMap[ot.String()] = ctrl
return nil
}
Expand All @@ -74,41 +71,40 @@ func (h *ControllerRegistry) Register(ctx context.Context, resourceType string,
//
// The default controller will be used when Get is called with an operation type that has no registered controller.
// The default controller will not be cached by the registry.
func (h *ControllerRegistry) RegisterDefault(ctx context.Context, factoryFn ControllerFactoryFunc, opts ctrl.Options) error {
func (h *ControllerRegistry) RegisterDefault(factoryFn ControllerFactoryFunc, opts ctrl.Options) error {
h.ctrlMapMu.Lock()
defer h.ctrlMapMu.Unlock()

// Note: we can't call opts.Validate() here because we don't know the resource type yet.
if opts.StorageClient == nil {
return fmt.Errorf("invalid controller options: .StorageClient is required")
}

h.defaultFactory = factoryFn
h.defaultOpts = opts
return nil
}

// Get gets the registered async controller instance.
func (h *ControllerRegistry) Get(ctx context.Context, operationType v1.OperationType) (ctrl.Controller, error) {
func (h *ControllerRegistry) Get(operationType v1.OperationType) (ctrl.Controller, error) {
h.ctrlMapMu.RLock()
defer h.ctrlMapMu.RUnlock()

if h, ok := h.ctrlMap[operationType.String()]; ok {
return h, nil
}

return h.getDefault(ctx, operationType)
return h.getDefault(operationType)
}

func (h *ControllerRegistry) getDefault(ctx context.Context, operationType v1.OperationType) (ctrl.Controller, error) {
func (h *ControllerRegistry) getDefault(operationType v1.OperationType) (ctrl.Controller, error) {
if h.defaultFactory == nil {
return nil, nil
}

storageClient, err := h.sp.GetStorageClient(ctx, operationType.Type)
if err != nil {
return nil, err
}

// Copy the options so we can update it.
opts := h.defaultOpts

opts.StorageClient = storageClient
opts.ResourceType = operationType.Type

return h.defaultFactory(opts)
Expand Down
Loading

0 comments on commit afc96c6

Please sign in to comment.