Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename database APIs #8143

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build/generate.mk
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ generate-controller-gen-installed:
.PHONY: generate-ucp-crd
generate-ucp-crd: generate-controller-gen-installed ## Generates the CRDs for UCP APIServer store.
@echo "$(ARROW) Generating CRDs for ucp.dev..."
controller-gen object:headerFile=./boilerplate.go.txt paths=./pkg/ucp/store/apiserverstore/api/ucp.dev/v1alpha1/...
controller-gen crd paths=./pkg/ucp/store/apiserverstore/api/ucp.dev/v1alpha1/... output:crd:dir=./deploy/Chart/crds/ucpd
controller-gen object:headerFile=./boilerplate.go.txt paths=./pkg/ucp/database/apiserverstore/api/ucp.dev/v1alpha1/...
controller-gen crd paths=./pkg/ucp/database/apiserverstore/api/ucp.dev/v1alpha1/... output:crd:dir=./deploy/Chart/crds/ucpd

.PHONY: generate-controller
generate-controller: generate-controller-gen-installed ## Generates the CRDs for the Radius controller.
Expand Down
8 changes: 4 additions & 4 deletions cmd/applications-rp/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/radius-project/radius/pkg/trace"

"github.com/radius-project/radius/pkg/ucp/data"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
"github.com/radius-project/radius/pkg/ucp/databaseprovider"
"github.com/radius-project/radius/pkg/ucp/hosting"
"github.com/radius-project/radius/pkg/ucp/ucplog"

Expand Down Expand Up @@ -79,14 +79,14 @@ var rootCmd = &cobra.Command{
// Must set the logger before using controller-runtime.
runtimelog.SetLogger(logger)

if options.Config.StorageProvider.Provider == dataprovider.TypeETCD &&
options.Config.StorageProvider.ETCD.InMemory {
if options.Config.DatabaseProvider.Provider == databaseprovider.TypeETCD &&
options.Config.DatabaseProvider.ETCD.InMemory {
// For in-memory etcd we need to register another service to manage its lifecycle.
//
// The client will be initialized asynchronously.
logger.Info("Enabled in-memory etcd")
client := hosting.NewAsyncValue[etcdclient.Client]()
options.Config.StorageProvider.ETCD.Client = client
options.Config.DatabaseProvider.ETCD.Client = client
options.Config.SecretProvider.ETCD.Client = client

hostingSvc = append(hostingSvc, data.NewEmbeddedETCDService(data.EmbeddedETCDServiceOptions{ClientConfigSink: client}))
Expand Down
8 changes: 4 additions & 4 deletions cmd/ucpd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
runtimelog "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/radius-project/radius/pkg/armrpc/hostoptions"
"github.com/radius-project/radius/pkg/ucp/dataprovider"
"github.com/radius-project/radius/pkg/ucp/databaseprovider"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have both database and databaseprovider folders under ucp? I think this is the best practice, right? I am asking because I would create database under ucp and then provider under database.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm planning to rearrange these in the next PR 👍

I think it's a big improvement to name the package databaseprovider instead of provider. You'll notice that everywhere we reference these, they get aliased to something else.

"github.com/radius-project/radius/pkg/ucp/hosting"
"github.com/radius-project/radius/pkg/ucp/server"
"github.com/radius-project/radius/pkg/ucp/ucplog"
Expand All @@ -52,13 +52,13 @@ var rootCmd = &cobra.Command{
// Must set the logger before using controller-runtime.
runtimelog.SetLogger(logger)

if options.StorageProviderOptions.Provider == dataprovider.TypeETCD &&
options.StorageProviderOptions.ETCD.InMemory {
if options.DatabaseProviderOptions.Provider == databaseprovider.TypeETCD &&
options.DatabaseProviderOptions.ETCD.InMemory {
// For in-memory etcd we need to register another service to manage its lifecycle.
//
// The client will be initialized asynchronously.
clientconfigSource := hosting.NewAsyncValue[etcdclient.Client]()
options.StorageProviderOptions.ETCD.Client = clientconfigSource
options.DatabaseProviderOptions.ETCD.Client = clientconfigSource
options.SecretProviderOptions.ETCD.Client = clientconfigSource
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/armrpc/asyncoperation/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (
"errors"

"github.com/radius-project/radius/pkg/corerp/backend/deployment"
"github.com/radius-project/radius/pkg/ucp/store"
"github.com/radius-project/radius/pkg/ucp/database"

runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// Options represents controller options.
type Options struct {
// StorageClient is the data storage client.
StorageClient store.StorageClient
// DatabaseClient is the database client.
DatabaseClient database.Client

// KubeClient is the Kubernetes controller runtime client.
KubeClient runtimeclient.Client
Expand All @@ -44,11 +44,11 @@ type Options struct {
// Validate validates that required fields are set on the options.
func (o Options) Validate() error {
var err error
if o.StorageClient == nil {
err = errors.Join(err, errors.New("StorageClient is required"))
if o.DatabaseClient == nil {
err = errors.Join(err, errors.New(".DatabaseClient is required"))
}
if o.ResourceType == "" {
err = errors.Join(err, errors.New("ResourceType is required"))
err = errors.Join(err, errors.New(".ResourceType is required"))
}

// KubeClient and GetDeploymentProcessor are not used by the majority of the code, so they
Expand All @@ -62,8 +62,8 @@ type Controller interface {
// Run runs async request operation.
Run(ctx context.Context, request *Request) (Result, error)

// StorageClient gets the storage client for resource type.
StorageClient() store.StorageClient
// DatabaseClient gets the database client for resource type.
DatabaseClient() database.Client
}

// BaseController is the base struct of async operation controller.
Expand All @@ -76,9 +76,9 @@ func NewBaseAsyncController(options Options) BaseController {
return BaseController{options}
}

// StorageClient gets storage client for this controller.
func (b *BaseController) StorageClient() store.StorageClient {
return b.options.StorageClient
// DatabaseClient gets database client for this controller.
func (b *BaseController) DatabaseClient() database.Client {
return b.options.DatabaseClient
}

// KubeClient gets Kubernetes client for this controller.
Expand Down
32 changes: 16 additions & 16 deletions pkg/armrpc/asyncoperation/statusmanager/statusmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ import (
ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller"
"github.com/radius-project/radius/pkg/metrics"
"github.com/radius-project/radius/pkg/trace"
"github.com/radius-project/radius/pkg/ucp/database"
queue "github.com/radius-project/radius/pkg/ucp/queue/client"
"github.com/radius-project/radius/pkg/ucp/resources"
"github.com/radius-project/radius/pkg/ucp/store"

"github.com/google/uuid"
)

// statusManager includes the necessary functions to manage asynchronous operations.
type statusManager struct {
storageClient store.StorageClient
queue queue.Client
location string
databaseClient database.Client
queue queue.Client
location string
}

// QueueOperationOptions is the options type provided when queueing an async operation.
Expand All @@ -64,11 +64,11 @@ type StatusManager interface {
}

// New creates statusManager instance.
func New(storageClient store.StorageClient, q queue.Client, location string) StatusManager {
func New(databaseClient database.Client, queueClient queue.Client, location string) StatusManager {
return &statusManager{
storageClient: storageClient,
queue: q,
location: location,
databaseClient: databaseClient,
queue: queueClient,
location: location,
}
}

Expand All @@ -78,7 +78,7 @@ func (aom *statusManager) operationStatusResourceID(id resources.ID, operationID
}

// QueueAsyncOperation creates and saves a new status resource with the given parameters in datastore, and queues
// a request message. If an error occurs, the status is deleted using the storeClient.
// a request message. If an error occurs, the status is deleted using the databaseClient.
func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMRequestContext, options QueueOperationOptions) error {
ctx, span := trace.StartProducerSpan(ctx, "statusmanager.QueueAsyncOperation publish", trace.FrontendTracerName)
defer span.End()
Expand Down Expand Up @@ -106,8 +106,8 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR
ClientObjectID: sCtx.ClientObjectID,
}

err := aom.storageClient.Save(ctx, &store.Object{
Metadata: store.Metadata{ID: opID},
err := aom.databaseClient.Save(ctx, &database.Object{
Metadata: database.Metadata{ID: opID},
Data: aos,
})

Expand All @@ -116,7 +116,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR
}

if err = aom.queueRequestMessage(ctx, sCtx, aos, options.OperationTimeout); err != nil {
delErr := aom.storageClient.Delete(ctx, opID)
delErr := aom.databaseClient.Delete(ctx, opID)
if delErr != nil {
return delErr
}
Expand All @@ -130,7 +130,7 @@ func (aom *statusManager) QueueAsyncOperation(ctx context.Context, sCtx *v1.ARMR

// Get gets a status object from the datastore or an error if the retrieval fails.
func (aom *statusManager) Get(ctx context.Context, id resources.ID, operationID uuid.UUID) (*Status, error) {
obj, err := aom.storageClient.Get(ctx, aom.operationStatusResourceID(id, operationID))
obj, err := aom.databaseClient.Get(ctx, aom.operationStatusResourceID(id, operationID))
if err != nil {
return nil, err
}
Expand All @@ -147,7 +147,7 @@ func (aom *statusManager) Get(ctx context.Context, id resources.ID, operationID
// given parameters, and saves it back to the store.
func (aom *statusManager) Update(ctx context.Context, id resources.ID, operationID uuid.UUID, state v1.ProvisioningState, endTime *time.Time, opError *v1.ErrorDetails) error {
opID := aom.operationStatusResourceID(id, operationID)
obj, err := aom.storageClient.Get(ctx, opID)
obj, err := aom.databaseClient.Get(ctx, opID)
if err != nil {
return err
}
Expand All @@ -170,13 +170,13 @@ func (aom *statusManager) Update(ctx context.Context, id resources.ID, operation

obj.Data = s

return aom.storageClient.Save(ctx, obj, store.WithETag(obj.ETag))
return aom.databaseClient.Save(ctx, obj, database.WithETag(obj.ETag))
}

// Delete deletes the operation status resource associated with the given ID and
// operationID, and returns an error if unsuccessful.
func (aom *statusManager) Delete(ctx context.Context, id resources.ID, operationID uuid.UUID) error {
return aom.storageClient.Delete(ctx, aom.operationStatusResourceID(id, operationID))
return aom.databaseClient.Delete(ctx, aom.operationStatusResourceID(id, operationID))
}

// queueRequestMessage function is to put the async operation message to the queue to be worked on.
Expand Down
38 changes: 19 additions & 19 deletions pkg/armrpc/asyncoperation/statusmanager/statusmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ import (
"github.com/google/uuid"
v1 "github.com/radius-project/radius/pkg/armrpc/api/v1"
"github.com/radius-project/radius/pkg/armrpc/rpctest"
"github.com/radius-project/radius/pkg/ucp/database"
queue "github.com/radius-project/radius/pkg/ucp/queue/client"
"github.com/radius-project/radius/pkg/ucp/resources"
"github.com/radius-project/radius/pkg/ucp/store"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

type asyncOperationsManagerTest struct {
manager StatusManager
storeClient *store.MockStorageClient
queue *queue.MockClient
manager StatusManager
databaseClient *database.MockClient
queueClient *queue.MockClient
}

const (
Expand All @@ -52,10 +52,10 @@ const (

func setup(tb testing.TB) (asyncOperationsManagerTest, *gomock.Controller) {
ctrl := gomock.NewController(tb)
sc := store.NewMockStorageClient(ctrl)
sc := database.NewMockClient(ctrl)
enq := queue.NewMockClient(ctrl)
aom := New(sc, enq, "test-location")
return asyncOperationsManagerTest{manager: aom, storeClient: sc, queue: enq}, ctrl
return asyncOperationsManagerTest{manager: aom, databaseClient: sc, queueClient: enq}, ctrl
}

var reqCtx = &v1.ARMRequestContext{
Expand Down Expand Up @@ -151,16 +151,16 @@ func TestCreateAsyncOperationStatus(t *testing.T) {
aomTest, mctrl := setup(t)
defer mctrl.Finish()

aomTest.storeClient.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.SaveErr)
aomTest.databaseClient.EXPECT().Save(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.SaveErr)

// We can't expect an async operation to be queued if it is not saved to the DB.
if tt.SaveErr == nil {
aomTest.queue.EXPECT().Enqueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.EnqueueErr)
aomTest.queueClient.EXPECT().Enqueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.EnqueueErr)
}

// If there is an error when enqueuing the message, the async operation should be deleted.
if tt.EnqueueErr != nil {
aomTest.storeClient.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.DeleteErr)
aomTest.databaseClient.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.DeleteErr)
}

options := QueueOperationOptions{
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestDeleteAsyncOperationStatus(t *testing.T) {
aomTest, mctrl := setup(t)
defer mctrl.Finish()

aomTest.storeClient.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.DeleteErr)
aomTest.databaseClient.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.DeleteErr)
rid, err := resources.ParseResource(azureEnvResourceID)
require.NoError(t, err)
err = aomTest.manager.Delete(context.TODO(), rid, uuid.New())
Expand All @@ -226,13 +226,13 @@ func TestGetAsyncOperationStatus(t *testing.T) {
getCases := []struct {
Desc string
GetErr error
Obj *store.Object
Obj *database.Object
}{
{
Desc: "get_success",
GetErr: nil,
Obj: &store.Object{
Metadata: store.Metadata{ID: opID.String(), ETag: "etag"},
Obj: &database.Object{
Metadata: database.Metadata{ID: opID.String(), ETag: "etag"},
Data: testAos,
},
},
Expand All @@ -248,7 +248,7 @@ func TestGetAsyncOperationStatus(t *testing.T) {
aomTest, mctrl := setup(t)
defer mctrl.Finish()

aomTest.storeClient.
aomTest.databaseClient.
EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.Any()).
Return(tt.Obj, tt.GetErr)
Expand All @@ -275,14 +275,14 @@ func TestUpdateAsyncOperationStatus(t *testing.T) {
updateCases := []struct {
Desc string
GetErr error
Obj *store.Object
Obj *database.Object
SaveErr error
}{
{
Desc: "update_success",
GetErr: nil,
Obj: &store.Object{
Metadata: store.Metadata{ID: opID.String(), ETag: "etag"},
Obj: &database.Object{
Metadata: database.Metadata{ID: opID.String(), ETag: "etag"},
Data: testAos,
},
SaveErr: nil,
Expand All @@ -294,13 +294,13 @@ func TestUpdateAsyncOperationStatus(t *testing.T) {
aomTest, mctrl := setup(t)
defer mctrl.Finish()

aomTest.storeClient.
aomTest.databaseClient.
EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.Any()).
Return(tt.Obj, tt.GetErr)

if tt.GetErr == nil {
aomTest.storeClient.
aomTest.databaseClient.
EXPECT().
Save(gomock.Any(), gomock.Any(), gomock.Any()).
Return(tt.SaveErr)
Expand Down
4 changes: 2 additions & 2 deletions pkg/armrpc/asyncoperation/worker/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (h *ControllerRegistry) RegisterDefault(factoryFn ControllerFactoryFunc, op
defer h.ctrlMapMu.Unlock()

// Note: we can't call opts.Validate() here because we don't know the resource type yet.
if opts.StorageClient == nil {
return fmt.Errorf("invalid controller options: .StorageClient is required")
if opts.DatabaseClient == nil {
return fmt.Errorf("invalid controller options: .DatabaseClient is required")
}

h.defaultFactory = factoryFn
Expand Down
6 changes: 3 additions & 3 deletions pkg/armrpc/asyncoperation/worker/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
v1 "github.com/radius-project/radius/pkg/armrpc/api/v1"
ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller"
"github.com/radius-project/radius/pkg/corerp/backend/deployment"
"github.com/radius-project/radius/pkg/ucp/store/inmemory"
"github.com/radius-project/radius/pkg/ucp/database/inmemory"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)
Expand All @@ -38,7 +38,7 @@ func TestRegister_Get(t *testing.T) {
opPut := v1.OperationType{Type: "Applications.Core/environments", Method: v1.OperationPut}

ctrlOpts := ctrl.Options{
StorageClient: inmemory.NewClient(),
DatabaseClient: inmemory.NewClient(),
GetDeploymentProcessor: func() deployment.DeploymentProcessor { return nil },
}

Expand Down Expand Up @@ -79,7 +79,7 @@ func TestRegister_Get_WithDefault(t *testing.T) {
opGet := v1.OperationType{Type: "Applications.Core/environments", Method: v1.OperationGet}

ctrlOpts := ctrl.Options{
StorageClient: inmemory.NewClient(),
DatabaseClient: inmemory.NewClient(),
GetDeploymentProcessor: func() deployment.DeploymentProcessor { return nil },
}

Expand Down
Loading
Loading