Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify database interface #8126

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions pkg/armrpc/asyncoperation/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package controller

import (
"context"
"errors"

"github.com/radius-project/radius/pkg/corerp/backend/deployment"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
"github.com/radius-project/radius/pkg/ucp/store"

runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -31,9 +31,6 @@ type Options struct {
// StorageClient is the data storage client.
StorageClient store.StorageClient

// DataProvider is the data storage provider.
DataProvider dataprovider.DataStorageProvider

// KubeClient is the Kubernetes controller runtime client.
KubeClient runtimeclient.Client

Expand All @@ -44,6 +41,22 @@ type Options struct {
GetDeploymentProcessor func() deployment.DeploymentProcessor
}

// Validate validates that required fields are set on the options.
func (o Options) Validate() error {
var err error
if o.StorageClient == nil {
err = errors.Join(err, errors.New("StorageClient is required"))
}
if o.ResourceType == "" {
err = errors.Join(err, errors.New("ResourceType is required"))
}

// KubeClient and GetDeploymentProcessor are not used by the majority of the code, so they
// are not validated here.

return err
}

rynowak marked this conversation as resolved.
Show resolved Hide resolved
// Controller is an interface to implement async operation controller.
type Controller interface {
// Run runs async request operation.
Expand All @@ -68,11 +81,6 @@ func (b *BaseController) StorageClient() store.StorageClient {
return b.options.StorageClient
}

// DataProvider gets data storage provider for this controller.
func (b *BaseController) DataProvider() dataprovider.DataStorageProvider {
return b.options.DataProvider
}

// KubeClient gets Kubernetes client for this controller.
func (b *BaseController) KubeClient() runtimeclient.Client {
return b.options.KubeClient
Expand Down
42 changes: 9 additions & 33 deletions pkg/armrpc/asyncoperation/statusmanager/statusmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller"
"github.com/radius-project/radius/pkg/metrics"
"github.com/radius-project/radius/pkg/trace"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
queue "github.com/radius-project/radius/pkg/ucp/queue/client"
"github.com/radius-project/radius/pkg/ucp/resources"
"github.com/radius-project/radius/pkg/ucp/store"
Expand All @@ -37,7 +36,7 @@ import (

// statusManager includes the necessary functions to manage asynchronous operations.
type statusManager struct {
storeProvider dataprovider.DataStorageProvider
storageClient store.StorageClient
queue queue.Client
location string
}
Expand Down Expand Up @@ -65,9 +64,9 @@ type StatusManager interface {
}

// New creates statusManager instance.
func New(dataProvider dataprovider.DataStorageProvider, q queue.Client, location string) StatusManager {
func New(storageClient store.StorageClient, q queue.Client, location string) StatusManager {
return &statusManager{
storeProvider: dataProvider,
storageClient: storageClient,
queue: q,
location: location,
}
Expand All @@ -78,10 +77,6 @@ func (aom *statusManager) operationStatusResourceID(id resources.ID, operationID
return fmt.Sprintf("%s/providers/%s/locations/%s/operationstatuses/%s", id.PlaneScope(), strings.ToLower(id.ProviderNamespace()), aom.location, operationID)
}

func (aom *statusManager) getClient(ctx context.Context, id resources.ID) (store.StorageClient, error) {
return aom.storeProvider.GetStorageClient(ctx, id.ProviderNamespace()+"/operationstatuses")
}

// QueueAsyncOperation creates and saves a new status resource with the given parameters in datastore, and queues
// a request message. If an error occurs, the status is deleted using the storeClient.
func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMRequestContext, options QueueOperationOptions) error {
Expand Down Expand Up @@ -111,12 +106,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR
ClientObjectID: sCtx.ClientObjectID,
}

storeClient, err := aom.getClient(ctx, sCtx.ResourceID)
if err != nil {
return err
}

err = storeClient.Save(ctx, &store.Object{
err := aom.storageClient.Save(ctx, &store.Object{
Metadata: store.Metadata{ID: opID},
Data: aos,
})
Expand All @@ -126,7 +116,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR
}

if err = aom.queueRequestMessage(ctx, sCtx, aos, options.OperationTimeout); err != nil {
delErr := storeClient.Delete(ctx, opID)
delErr := aom.storageClient.Delete(ctx, opID)
if delErr != nil {
return delErr
}
Expand All @@ -140,12 +130,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR

// Get gets a status object from the datastore or an error if the retrieval fails.
func (aom *statusManager) Get(ctx context.Context, id resources.ID, operationID uuid.UUID) (*Status, error) {
storeClient, err := aom.getClient(ctx, id)
if err != nil {
return nil, err
}

obj, err := storeClient.Get(ctx, aom.operationStatusResourceID(id, operationID))
obj, err := aom.storageClient.Get(ctx, aom.operationStatusResourceID(id, operationID))
if err != nil {
return nil, err
}
Expand All @@ -162,12 +147,7 @@ func (aom *statusManager) Get(ctx context.Context, id resources.ID, operationID
// given parameters, and saves it back to the store.
func (aom *statusManager) Update(ctx context.Context, id resources.ID, operationID uuid.UUID, state v1.ProvisioningState, endTime *time.Time, opError *v1.ErrorDetails) error {
opID := aom.operationStatusResourceID(id, operationID)
storeClient, err := aom.getClient(ctx, id)
if err != nil {
return err
}

obj, err := storeClient.Get(ctx, opID)
obj, err := aom.storageClient.Get(ctx, opID)
if err != nil {
return err
}
Expand All @@ -190,17 +170,13 @@ func (aom *statusManager) Update(ctx context.Context, id resources.ID, operation

obj.Data = s

return storeClient.Save(ctx, obj, store.WithETag(obj.ETag))
return aom.storageClient.Save(ctx, obj, store.WithETag(obj.ETag))
}

// Delete deletes the operation status resource associated with the given ID and
// operationID, and returns an error if unsuccessful.
func (aom *statusManager) Delete(ctx context.Context, id resources.ID, operationID uuid.UUID) error {
storeClient, err := aom.getClient(ctx, id)
if err != nil {
return err
}
return storeClient.Delete(ctx, aom.operationStatusResourceID(id, operationID))
return aom.storageClient.Delete(ctx, aom.operationStatusResourceID(id, operationID))
}

// queueRequestMessage function is to put the async operation message to the queue to be worked on.
Expand Down
14 changes: 5 additions & 9 deletions pkg/armrpc/asyncoperation/statusmanager/statusmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/google/uuid"
v1 "github.com/radius-project/radius/pkg/armrpc/api/v1"
"github.com/radius-project/radius/pkg/armrpc/rpctest"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
queue "github.com/radius-project/radius/pkg/ucp/queue/client"
"github.com/radius-project/radius/pkg/ucp/resources"
"github.com/radius-project/radius/pkg/ucp/store"
Expand All @@ -35,10 +34,9 @@ import (
)

type asyncOperationsManagerTest struct {
manager StatusManager
storeProvider *dataprovider.MockDataStorageProvider
storeClient *store.MockStorageClient
queue *queue.MockClient
manager StatusManager
storeClient *store.MockStorageClient
queue *queue.MockClient
}

const (
Expand All @@ -54,12 +52,10 @@ const (

func setup(tb testing.TB) (asyncOperationsManagerTest, *gomock.Controller) {
ctrl := gomock.NewController(tb)
dp := dataprovider.NewMockDataStorageProvider(ctrl)
sc := store.NewMockStorageClient(ctrl)
dp.EXPECT().GetStorageClient(gomock.Any(), "Applications.Core/operationstatuses").Return(sc, nil)
enq := queue.NewMockClient(ctrl)
aom := New(dp, enq, "test-location")
return asyncOperationsManagerTest{manager: aom, storeProvider: dp, storeClient: sc, queue: enq}, ctrl
aom := New(sc, enq, "test-location")
return asyncOperationsManagerTest{manager: aom, storeClient: sc, queue: enq}, ctrl
}

var reqCtx = &v1.ARMRequestContext{
Expand Down
38 changes: 17 additions & 21 deletions pkg/armrpc/asyncoperation/worker/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ limitations under the License.
package worker

import (
"context"
"fmt"
"sync"

v1 "github.com/radius-project/radius/pkg/armrpc/api/v1"
ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
)

// ControllerFactoryFunc is a factory function to create a controller.
Expand All @@ -32,40 +31,38 @@ type ControllerFactoryFunc func(opts ctrl.Options) (ctrl.Controller, error)
type ControllerRegistry struct {
ctrlMap map[string]ctrl.Controller
ctrlMapMu sync.RWMutex
sp dataprovider.DataStorageProvider

defaultFactory ControllerFactoryFunc
defaultOpts ctrl.Options
}

// NewControllerRegistry creates an ControllerRegistry instance.
func NewControllerRegistry(sp dataprovider.DataStorageProvider) *ControllerRegistry {
func NewControllerRegistry() *ControllerRegistry {
return &ControllerRegistry{
ctrlMap: map[string]ctrl.Controller{},
sp: sp,
}
}

// Register registers a controller for a specific resource type and operation method.
//
// Controllers registered using Register will be cached by the registry and the same instance will be reused.
func (h *ControllerRegistry) Register(ctx context.Context, resourceType string, method v1.OperationMethod, factoryFn ControllerFactoryFunc, opts ctrl.Options) error {
func (h *ControllerRegistry) Register(resourceType string, method v1.OperationMethod, factoryFn ControllerFactoryFunc, opts ctrl.Options) error {
h.ctrlMapMu.Lock()
defer h.ctrlMapMu.Unlock()

ot := v1.OperationType{Type: resourceType, Method: method}
storageClient, err := h.sp.GetStorageClient(ctx, resourceType)
opts.ResourceType = resourceType

err := opts.Validate()
if err != nil {
return err
return fmt.Errorf("invalid controller options: %w", err)
}
opts.StorageClient = storageClient
opts.ResourceType = resourceType

ctrl, err := factoryFn(opts)
if err != nil {
return err
}

ot := v1.OperationType{Type: resourceType, Method: method}
h.ctrlMap[ot.String()] = ctrl
return nil
}
Expand All @@ -74,41 +71,40 @@ func (h *ControllerRegistry) Register(ctx context.Context, resourceType string,
//
// The default controller will be used when Get is called with an operation type that has no registered controller.
// The default controller will not be cached by the registry.
func (h *ControllerRegistry) RegisterDefault(ctx context.Context, factoryFn ControllerFactoryFunc, opts ctrl.Options) error {
func (h *ControllerRegistry) RegisterDefault(factoryFn ControllerFactoryFunc, opts ctrl.Options) error {
h.ctrlMapMu.Lock()
defer h.ctrlMapMu.Unlock()

// Note: we can't call opts.Validate() here because we don't know the resource type yet.
if opts.StorageClient == nil {
return fmt.Errorf("invalid controller options: .StorageClient is required")
rynowak marked this conversation as resolved.
Show resolved Hide resolved
}

h.defaultFactory = factoryFn
h.defaultOpts = opts
return nil
}

// Get gets the registered async controller instance.
func (h *ControllerRegistry) Get(ctx context.Context, operationType v1.OperationType) (ctrl.Controller, error) {
func (h *ControllerRegistry) Get(operationType v1.OperationType) (ctrl.Controller, error) {
h.ctrlMapMu.RLock()
defer h.ctrlMapMu.RUnlock()

if h, ok := h.ctrlMap[operationType.String()]; ok {
return h, nil
}

return h.getDefault(ctx, operationType)
return h.getDefault(operationType)
}

func (h *ControllerRegistry) getDefault(ctx context.Context, operationType v1.OperationType) (ctrl.Controller, error) {
func (h *ControllerRegistry) getDefault(operationType v1.OperationType) (ctrl.Controller, error) {
if h.defaultFactory == nil {
return nil, nil
}

storageClient, err := h.sp.GetStorageClient(ctx, operationType.Type)
if err != nil {
return nil, err
}

// Copy the options so we can update it.
opts := h.defaultOpts

opts.StorageClient = storageClient
opts.ResourceType = operationType.Type

return h.defaultFactory(opts)
Expand Down
Loading
Loading