Skip to content

Commit

Permalink
Rename database APIs
Browse files Browse the repository at this point in the history
This change renames the database APIs to have clearer names.

Signed-off-by: Ryan Nowak <[email protected]>
  • Loading branch information
rynowak committed Dec 17, 2024
1 parent 5549e1e commit 757e5b3
Show file tree
Hide file tree
Showing 192 changed files with 2,121 additions and 2,139 deletions.
8 changes: 4 additions & 4 deletions cmd/applications-rp/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/radius-project/radius/pkg/trace"

"github.com/radius-project/radius/pkg/ucp/data"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
"github.com/radius-project/radius/pkg/ucp/databaseprovider"
"github.com/radius-project/radius/pkg/ucp/hosting"
"github.com/radius-project/radius/pkg/ucp/ucplog"

Expand Down Expand Up @@ -79,14 +79,14 @@ var rootCmd = &cobra.Command{
// Must set the logger before using controller-runtime.
runtimelog.SetLogger(logger)

if options.Config.StorageProvider.Provider == dataprovider.TypeETCD &&
options.Config.StorageProvider.ETCD.InMemory {
if options.Config.DatabaseProvider.Provider == databaseprovider.TypeETCD &&
options.Config.DatabaseProvider.ETCD.InMemory {
// For in-memory etcd we need to register another service to manage its lifecycle.
//
// The client will be initialized asynchronously.
logger.Info("Enabled in-memory etcd")
client := hosting.NewAsyncValue[etcdclient.Client]()
options.Config.StorageProvider.ETCD.Client = client
options.Config.DatabaseProvider.ETCD.Client = client
options.Config.SecretProvider.ETCD.Client = client

hostingSvc = append(hostingSvc, data.NewEmbeddedETCDService(data.EmbeddedETCDServiceOptions{ClientConfigSink: client}))
Expand Down
8 changes: 4 additions & 4 deletions cmd/ucpd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
runtimelog "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/radius-project/radius/pkg/armrpc/hostoptions"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
"github.com/radius-project/radius/pkg/ucp/databaseprovider"
"github.com/radius-project/radius/pkg/ucp/hosting"
"github.com/radius-project/radius/pkg/ucp/server"
"github.com/radius-project/radius/pkg/ucp/ucplog"
Expand All @@ -52,13 +52,13 @@ var rootCmd = &cobra.Command{
// Must set the logger before using controller-runtime.
runtimelog.SetLogger(logger)

if options.StorageProviderOptions.Provider == dataprovider.TypeETCD &&
options.StorageProviderOptions.ETCD.InMemory {
if options.DatabaseProviderOptions.Provider == databaseprovider.TypeETCD &&
options.DatabaseProviderOptions.ETCD.InMemory {
// For in-memory etcd we need to register another service to manage its lifecycle.
//
// The client will be initialized asynchronously.
clientconfigSource := hosting.NewAsyncValue[etcdclient.Client]()
options.StorageProviderOptions.ETCD.Client = clientconfigSource
options.DatabaseProviderOptions.ETCD.Client = clientconfigSource
options.SecretProviderOptions.ETCD.Client = clientconfigSource
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/armrpc/asyncoperation/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (
"errors"

"github.com/radius-project/radius/pkg/corerp/backend/deployment"
"github.com/radius-project/radius/pkg/ucp/store"
"github.com/radius-project/radius/pkg/ucp/database"

runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// Options represents controller options.
type Options struct {
// StorageClient is the data storage client.
StorageClient store.StorageClient
// DatabaseClient is the database client.
DatabaseClient database.Client

// KubeClient is the Kubernetes controller runtime client.
KubeClient runtimeclient.Client
Expand All @@ -44,11 +44,11 @@ type Options struct {
// Validate validates that required fields are set on the options.
func (o Options) Validate() error {
var err error
if o.StorageClient == nil {
err = errors.Join(err, errors.New("StorageClient is required"))
if o.DatabaseClient == nil {
err = errors.Join(err, errors.New(".DatabaseClient is required"))
}
if o.ResourceType == "" {
err = errors.Join(err, errors.New("ResourceType is required"))
err = errors.Join(err, errors.New(".ResourceType is required"))
}

// KubeClient and GetDeploymentProcessor are not used by the majority of the code, so they
Expand All @@ -62,8 +62,8 @@ type Controller interface {
// Run runs async request operation.
Run(ctx context.Context, request *Request) (Result, error)

// StorageClient gets the storage client for resource type.
StorageClient() store.StorageClient
// DatabaseClient gets the database client for resource type.
DatabaseClient() database.Client
}

// BaseController is the base struct of async operation controller.
Expand All @@ -76,9 +76,9 @@ func NewBaseAsyncController(options Options) BaseController {
return BaseController{options}
}

// StorageClient gets storage client for this controller.
func (b *BaseController) StorageClient() store.StorageClient {
return b.options.StorageClient
// DatabaseClient gets database client for this controller.
func (b *BaseController) DatabaseClient() database.Client {
return b.options.DatabaseClient
}

// KubeClient gets Kubernetes client for this controller.
Expand Down
32 changes: 16 additions & 16 deletions pkg/armrpc/asyncoperation/statusmanager/statusmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ import (
ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller"
"github.com/radius-project/radius/pkg/metrics"
"github.com/radius-project/radius/pkg/trace"
"github.com/radius-project/radius/pkg/ucp/database"
queue "github.com/radius-project/radius/pkg/ucp/queue/client"
"github.com/radius-project/radius/pkg/ucp/resources"
"github.com/radius-project/radius/pkg/ucp/store"

"github.com/google/uuid"
)

// statusManager includes the necessary functions to manage asynchronous operations.
type statusManager struct {
storageClient store.StorageClient
queue queue.Client
location string
databaseClient database.Client
queue queue.Client
location string
}

// QueueOperationOptions is the options type provided when queueing an async operation.
Expand All @@ -64,11 +64,11 @@ type StatusManager interface {
}

// New creates statusManager instance.
func New(storageClient store.StorageClient, q queue.Client, location string) StatusManager {
func New(databaseClient database.Client, queueClient queue.Client, location string) StatusManager {
return &statusManager{
storageClient: storageClient,
queue: q,
location: location,
databaseClient: databaseClient,
queue: queueClient,
location: location,
}
}

Expand All @@ -78,7 +78,7 @@ func (aom *statusManager) operationStatusResourceID(id resources.ID, operationID
}

// QueueAsyncOperation creates and saves a new status resource with the given parameters in datastore, and queues
// a request message. If an error occurs, the status is deleted using the storeClient.
// a request message. If an error occurs, the status is deleted using the databaseClient.
func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMRequestContext, options QueueOperationOptions) error {
ctx, span := trace.StartProducerSpan(ctx, "statusmanager.QueueAsyncOperation publish", trace.FrontendTracerName)
defer span.End()
Expand Down Expand Up @@ -106,8 +106,8 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR
ClientObjectID: sCtx.ClientObjectID,
}

err := aom.storageClient.Save(ctx, &store.Object{
Metadata: store.Metadata{ID: opID},
err := aom.databaseClient.Save(ctx, &database.Object{
Metadata: database.Metadata{ID: opID},
Data: aos,
})

Expand All @@ -116,7 +116,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR
}

if err = aom.queueRequestMessage(ctx, sCtx, aos, options.OperationTimeout); err != nil {
delErr := aom.storageClient.Delete(ctx, opID)
delErr := aom.databaseClient.Delete(ctx, opID)
if delErr != nil {
return delErr
}
Expand All @@ -130,7 +130,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR

// Get gets a status object from the datastore or an error if the retrieval fails.
func (aom *statusManager) Get(ctx context.Context, id resources.ID, operationID uuid.UUID) (*Status, error) {
obj, err := aom.storageClient.Get(ctx, aom.operationStatusResourceID(id, operationID))
obj, err := aom.databaseClient.Get(ctx, aom.operationStatusResourceID(id, operationID))
if err != nil {
return nil, err
}
Expand All @@ -147,7 +147,7 @@ func (aom *statusManager) Get(ctx context.Context, id resources.ID, operationID
// given parameters, and saves it back to the store.
func (aom *statusManager) Update(ctx context.Context, id resources.ID, operationID uuid.UUID, state v1.ProvisioningState, endTime *time.Time, opError *v1.ErrorDetails) error {
opID := aom.operationStatusResourceID(id, operationID)
obj, err := aom.storageClient.Get(ctx, opID)
obj, err := aom.databaseClient.Get(ctx, opID)
if err != nil {
return err
}
Expand All @@ -170,13 +170,13 @@ func (aom *statusManager) Update(ctx context.Context, id resources.ID, operation

obj.Data = s

return aom.storageClient.Save(ctx, obj, store.WithETag(obj.ETag))
return aom.databaseClient.Save(ctx, obj, database.WithETag(obj.ETag))
}

// Delete deletes the operation status resource associated with the given ID and
// operationID, and returns an error if unsuccessful.
func (aom *statusManager) Delete(ctx context.Context, id resources.ID, operationID uuid.UUID) error {
return aom.storageClient.Delete(ctx, aom.operationStatusResourceID(id, operationID))
return aom.databaseClient.Delete(ctx, aom.operationStatusResourceID(id, operationID))
}

// queueRequestMessage function is to put the async operation message to the queue to be worked on.
Expand Down
38 changes: 19 additions & 19 deletions pkg/armrpc/asyncoperation/statusmanager/statusmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ import (
"github.com/google/uuid"
v1 "github.com/radius-project/radius/pkg/armrpc/api/v1"
"github.com/radius-project/radius/pkg/armrpc/rpctest"
"github.com/radius-project/radius/pkg/ucp/database"
queue "github.com/radius-project/radius/pkg/ucp/queue/client"
"github.com/radius-project/radius/pkg/ucp/resources"
"github.com/radius-project/radius/pkg/ucp/store"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

type asyncOperationsManagerTest struct {
manager StatusManager
storeClient *store.MockStorageClient
queue *queue.MockClient
manager StatusManager
databaseClient *database.MockClient
queueClient *queue.MockClient
}

const (
Expand All @@ -52,10 +52,10 @@ const (

func setup(tb testing.TB) (asyncOperationsManagerTest, *gomock.Controller) {
ctrl := gomock.NewController(tb)
sc := store.NewMockStorageClient(ctrl)
sc := database.NewMockClient(ctrl)
enq := queue.NewMockClient(ctrl)
aom := New(sc, enq, "test-location")
return asyncOperationsManagerTest{manager: aom, storeClient: sc, queue: enq}, ctrl
return asyncOperationsManagerTest{manager: aom, databaseClient: sc, queueClient: enq}, ctrl
}

var reqCtx = &v1.ARMRequestContext{
Expand Down Expand Up @@ -151,16 +151,16 @@ func TestCreateAsyncOperationStatus(t *testing.T) {
aomTest, mctrl := setup(t)
defer mctrl.Finish()

aomTest.storeClient.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.SaveErr)
aomTest.databaseClient.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.SaveErr)

// We can't expect an async operation to be queued if it is not saved to the DB.
if tt.SaveErr == nil {
aomTest.queue.EXPECT().Enqueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.EnqueueErr)
aomTest.queueClient.EXPECT().Enqueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.EnqueueErr)
}

// If there is an error when enqueuing the message, the async operation should be deleted.
if tt.EnqueueErr != nil {
aomTest.storeClient.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.DeleteErr)
aomTest.databaseClient.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.DeleteErr)
}

options := QueueOperationOptions{
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestDeleteAsyncOperationStatus(t *testing.T) {
aomTest, mctrl := setup(t)
defer mctrl.Finish()

aomTest.storeClient.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.DeleteErr)
aomTest.databaseClient.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.DeleteErr)
rid, err := resources.ParseResource(azureEnvResourceID)
require.NoError(t, err)
err = aomTest.manager.Delete(context.TODO(), rid, uuid.New())
Expand All @@ -226,13 +226,13 @@ func TestGetAsyncOperationStatus(t *testing.T) {
getCases := []struct {
Desc string
GetErr error
Obj *store.Object
Obj *database.Object
}{
{
Desc: "get_success",
GetErr: nil,
Obj: &store.Object{
Metadata: store.Metadata{ID: opID.String(), ETag: "etag"},
Obj: &database.Object{
Metadata: database.Metadata{ID: opID.String(), ETag: "etag"},
Data: testAos,
},
},
Expand All @@ -248,7 +248,7 @@ func TestGetAsyncOperationStatus(t *testing.T) {
aomTest, mctrl := setup(t)
defer mctrl.Finish()

aomTest.storeClient.
aomTest.databaseClient.
EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.Any()).
Return(tt.Obj, tt.GetErr)
Expand All @@ -275,14 +275,14 @@ func TestUpdateAsyncOperationStatus(t *testing.T) {
updateCases := []struct {
Desc string
GetErr error
Obj *store.Object
Obj *database.Object
SaveErr error
}{
{
Desc: "update_success",
GetErr: nil,
Obj: &store.Object{
Metadata: store.Metadata{ID: opID.String(), ETag: "etag"},
Obj: &database.Object{
Metadata: database.Metadata{ID: opID.String(), ETag: "etag"},
Data: testAos,
},
SaveErr: nil,
Expand All @@ -294,13 +294,13 @@ func TestUpdateAsyncOperationStatus(t *testing.T) {
aomTest, mctrl := setup(t)
defer mctrl.Finish()

aomTest.storeClient.
aomTest.databaseClient.
EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.Any()).
Return(tt.Obj, tt.GetErr)

if tt.GetErr == nil {
aomTest.storeClient.
aomTest.databaseClient.
EXPECT().
Save(gomock.Any(), gomock.Any(), gomock.Any()).
Return(tt.SaveErr)
Expand Down
4 changes: 2 additions & 2 deletions pkg/armrpc/asyncoperation/worker/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (h *ControllerRegistry) RegisterDefault(factoryFn ControllerFactoryFunc, op
defer h.ctrlMapMu.Unlock()

// Note: we can't call opts.Validate() here because we don't know the resource type yet.
if opts.StorageClient == nil {
return fmt.Errorf("invalid controller options: .StorageClient is required")
if opts.DatabaseClient == nil {
return fmt.Errorf("invalid controller options: .DatabaseClient is required")
}

h.defaultFactory = factoryFn
Expand Down
6 changes: 3 additions & 3 deletions pkg/armrpc/asyncoperation/worker/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
v1 "github.com/radius-project/radius/pkg/armrpc/api/v1"
ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller"
"github.com/radius-project/radius/pkg/corerp/backend/deployment"
"github.com/radius-project/radius/pkg/ucp/store/inmemory"
"github.com/radius-project/radius/pkg/ucp/database/inmemory"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)
Expand All @@ -38,7 +38,7 @@ func TestRegister_Get(t *testing.T) {
opPut := v1.OperationType{Type: "Applications.Core/environments", Method: v1.OperationPut}

ctrlOpts := ctrl.Options{
StorageClient: inmemory.NewClient(),
DatabaseClient: inmemory.NewClient(),
GetDeploymentProcessor: func() deployment.DeploymentProcessor { return nil },
}

Expand Down Expand Up @@ -79,7 +79,7 @@ func TestRegister_Get_WithDefault(t *testing.T) {
opGet := v1.OperationType{Type: "Applications.Core/environments", Method: v1.OperationGet}

ctrlOpts := ctrl.Options{
StorageClient: inmemory.NewClient(),
DatabaseClient: inmemory.NewClient(),
GetDeploymentProcessor: func() deployment.DeploymentProcessor { return nil },
}

Expand Down
Loading

0 comments on commit 757e5b3

Please sign in to comment.