From 4c242d71d5536dc5c45b00710d63ce3104fd540c Mon Sep 17 00:00:00 2001 From: lakshmimsft Date: Wed, 11 Dec 2024 09:16:38 -0800 Subject: [PATCH] add service --- cmd/ucpd/cmd/root.go | 56 ++--- pkg/ucp/manifestservice/service.go | 235 ++++++------------ pkg/ucp/server/server.go | 5 +- .../ucpclient_test_deletethisfile.go | 206 --------------- 4 files changed, 106 insertions(+), 396 deletions(-) delete mode 100644 pkg/ucp/ucpclient/ucpclient_test_deletethisfile.go diff --git a/cmd/ucpd/cmd/root.go b/cmd/ucpd/cmd/root.go index aa1ad71738..b19e064fbd 100644 --- a/cmd/ucpd/cmd/root.go +++ b/cmd/ucpd/cmd/root.go @@ -19,8 +19,6 @@ package cmd import ( "context" "fmt" - "os" - "time" "github.com/go-logr/logr" "github.com/spf13/cobra" @@ -31,7 +29,6 @@ import ( "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/hosting" "github.com/radius-project/radius/pkg/ucp/server" - "github.com/radius-project/radius/pkg/ucp/ucpclient" "github.com/radius-project/radius/pkg/ucp/ucplog" ) @@ -69,32 +66,33 @@ var rootCmd = &cobra.Command{ if err != nil { return err } - - // Discuss if there is a better way to check if the server is listening.. - // Start RegisterManifests in a goroutine after 15 seconds - go func() { - time.Sleep(15 * time.Second) - // Register manifests - manifestDir := options.Config.Manifests.ManifestDirectory - if _, err := os.Stat(manifestDir); os.IsNotExist(err) { - logger.Error(err, "Manifest directory does not exist", "directory", manifestDir) - return - } else if err != nil { - logger.Error(err, "Error checking manifest directory", "directory", manifestDir) - return - } - - ucpclient, err := ucpclient.NewUCPClient(options.UCPConnection) - if err != nil { - logger.Error(err, "Failed to create UCP client") - } - - if err := ucpclient.RegisterManifests(cmd.Context(), manifestDir); err != nil { - logger.Error(err, "Failed to register manifests") - } else { - logger.Info("Successfully registered manifests", "directory", manifestDir) - } - }() + /* + // Discuss if there is a better way to check if the server is listening.. + // Start RegisterManifests in a goroutine after 15 seconds + go func() { + time.Sleep(15 * time.Second) + // Register manifests + manifestDir := options.Config.Manifests.ManifestDirectory + if _, err := os.Stat(manifestDir); os.IsNotExist(err) { + logger.Error(err, "Manifest directory does not exist", "directory", manifestDir) + return + } else if err != nil { + logger.Error(err, "Error checking manifest directory", "directory", manifestDir) + return + } + + ucpclient, err := ucpclient.NewUCPClient(options.UCPConnection) + if err != nil { + logger.Error(err, "Failed to create UCP client") + } + + if err := ucpclient.RegisterManifests(cmd.Context(), manifestDir); err != nil { + logger.Error(err, "Failed to register manifests") + } else { + logger.Info("Successfully registered manifests", "directory", manifestDir) + } + }() + */ ctx := logr.NewContext(cmd.Context(), logger) return hosting.RunWithInterrupts(ctx, host) diff --git a/pkg/ucp/manifestservice/service.go b/pkg/ucp/manifestservice/service.go index 034a720102..d38f092408 100644 --- a/pkg/ucp/manifestservice/service.go +++ b/pkg/ucp/manifestservice/service.go @@ -1,113 +1,56 @@ -package manifestservice - -// Import your logger, server, ucpclient, etc. -// "github.com/yourorg/yourproject/logger" -// "github.com/yourorg/yourproject/server" -// "github.com/yourorg/yourproject/ucpclient" - /* -// Service is a service to run AsyncReqeustProcessWorker. -type Service struct { - worker.Service - - config ucpoptions.UCPConfig -} - -// NewService creates new service instance to run AsyncRequestProcessWorker. -func NewService(options hostoptions.HostOptions, config ucpoptions.UCPConfig) *Service { - return &Service{ - Service: worker.Service{ - ProviderName: UCPProviderName, - Options: options, - }, - config: config, - } -} - -// Name returns a string containing the UCPProviderName and the text "async worker". -func (w *Service) Name() string { - return fmt.Sprintf("%s async worker", UCPProviderName) -} +Copyright 2023 The Radius Authors. -// Run starts the service and worker. It initializes the service and sets the worker options based on the configuration, -// then starts the service with the given worker options. It returns an error if the initialization fails. -func (w *Service) Run(ctx context.Context) error { - if err := w.Init(ctx); err != nil { - return err - } - - workerOpts := worker.Options{} - if w.Options.Config.WorkerServer != nil { - if w.Options.Config.WorkerServer.MaxOperationConcurrency != nil { - workerOpts.MaxOperationConcurrency = *w.Options.Config.WorkerServer.MaxOperationConcurrency - } - if w.Options.Config.WorkerServer.MaxOperationRetryCount != nil { - workerOpts.MaxOperationRetryCount = *w.Options.Config.WorkerServer.MaxOperationRetryCount - } - } +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - opts := ctrl.Options{ - DataProvider: w.StorageProvider, - } + http://www.apache.org/licenses/LICENSE-2.0 - defaultDownstream, err := url.Parse(w.config.Routing.DefaultDownstreamEndpoint) - if err != nil { - return err - } +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ - transport := otelhttp.NewTransport(http.DefaultTransport) - err = RegisterControllers(ctx, w.Controllers, w.Options.UCPConnection, transport, opts, defaultDownstream) - if err != nil { - return err - } +package manifestservice - return w.Start(ctx, workerOpts) +import ( + "context" + "fmt" + "net" + "os" + "os/signal" + "syscall" + "time" + + "github.com/radius-project/radius/pkg/sdk" + "github.com/radius-project/radius/pkg/ucp/hosting" + ucpoptions "github.com/radius-project/radius/pkg/ucp/hostoptions" + "github.com/radius-project/radius/pkg/ucp/ucpclient" + "github.com/radius-project/radius/pkg/ucp/ucplog" +) + +// Service implements the hosting.Service interface for registering manifests. +type Service struct { + ucpConnection sdk.Connection + options ucpoptions.UCPConfig } -// RegisterControllers registers the controllers for the UCP backend. -func RegisterControllers(ctx context.Context, registry *worker.ControllerRegistry, connection sdk.Connection, transport http.RoundTripper, opts ctrl.Options, defaultDownstream *url.URL) error { - // Tracked resources - err := errors.Join(nil, registry.Register(ctx, v20231001preview.ResourceType, v1.OperationMethod(datamodel.OperationProcess), func(opts ctrl.Options) (ctrl.Controller, error) { - return resourcegroups.NewTrackedResourceProcessController(opts, transport, defaultDownstream) - }, opts)) - - // Resource providers and related types - err = errors.Join(err, registry.Register(ctx, datamodel.ResourceProviderResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { - return &resourceproviders.ResourceProviderPutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil - }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.ResourceProviderResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { - return &resourceproviders.ResourceProviderDeleteController{ - BaseController: ctrl.NewBaseAsyncController(opts), - Connection: connection, - }, nil - }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.ResourceTypeResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { - return &resourceproviders.ResourceTypePutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil - }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.ResourceTypeResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { - return &resourceproviders.ResourceTypeDeleteController{ - BaseController: ctrl.NewBaseAsyncController(opts), - Connection: connection, - }, nil - }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.APIVersionResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { - return &resourceproviders.APIVersionPutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil - }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.APIVersionResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { - return &resourceproviders.APIVersionDeleteController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil - }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.LocationResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) { - return &resourceproviders.LocationPutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil - }, opts)) - err = errors.Join(err, registry.Register(ctx, datamodel.LocationResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) { - return &resourceproviders.LocationDeleteController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil - }, opts)) +var _ hosting.Service = (*Service)(nil) - if err != nil { - return err +// NewService creates a server to register manifests. +func NewService(connection sdk.Connection, options ucpoptions.UCPConfig) *Service { + return &Service{ + ucpConnection: connection, + options: options, } +} - return nil +// Name gets this service name. +func (s *Service) Name() string { + return "manifestservice" } func waitForServer(ctx context.Context, host, port string, retryInterval time.Duration, timeout time.Duration) error { @@ -134,76 +77,50 @@ func waitForServer(ctx context.Context, host, port string, retryInterval time.Du } } -func main() { - - // Start the server in a separate goroutine - go func() { - if err := host.Start(); err != nil { - logger.Error(err, "Server failed to start") - os.Exit(1) - } - }() +func (w *Service) Run(ctx context.Context) error { + logger := ucplog.FromContextOrDiscard(ctx) // Set up signal handling for graceful shutdown stopChan := make(chan os.Signal, 1) signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM) // Start the manifest registration in a goroutine - go func() { - // Define connection parameters - hostName := host.HostName() // Replace with actual method - port := host.Port() // Replace with actual method - - // Define context with timeout for the connection attempts - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // Attempt to connect to the server - err := waitForServer(ctx, hostName, port, 500*time.Millisecond, 30*time.Second) - if err != nil { - logger.Error(err, "Server is not available for manifest registration") - return - } - - // Server is up, proceed to register manifests - manifestDir := options.Config.Manifests.ManifestDirectory - if _, err := os.Stat(manifestDir); os.IsNotExist(err) { - logger.Error(err, "Manifest directory does not exist", "directory", manifestDir) - return - } else if err != nil { - logger.Error(err, "Error checking manifest directory", "directory", manifestDir) - return - } + //go func() { + // Define connection parameters + hostName := "localhost" //w.ucpConnection.Endpoint()/split to get host? // Replace with actual method + port := "9000" // extract from endpoint Replace with actual method - ucpclient, err := ucpclient.NewUCPClient(options.UCPConnection) - if err != nil { - logger.Error(err, "Failed to create UCP client") - return - } - - // Proceed with registering manifests - if err := ucpclient.RegisterManifests(ctx, manifestDir); err != nil { - logger.Error(err, "Failed to register manifests") - return - } - - logger.Info("Successfully registered manifests", "directory", manifestDir) - }() + // Attempt to connect to the server + err := waitForServer(ctx, hostName, port, 500*time.Millisecond, 5*time.Second) + if err != nil { + logger.Error(err, "Server is not available for manifest registration") + return nil + } - // Wait for a termination signal - <-stopChan - logger.Info("Received termination signal. Shutting down...") + // Server is up, proceed to register manifests + manifestDir := w.options.Manifests.ManifestDirectory + if _, err := os.Stat(manifestDir); os.IsNotExist(err) { + logger.Error(err, "Manifest directory does not exist", "directory", manifestDir) + return nil + } else if err != nil { + logger.Error(err, "Error checking manifest directory", "directory", manifestDir) + return nil + } - // Gracefully shut down the server - shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer shutdownCancel() + ucpclient, err := ucpclient.NewUCPClient(w.ucpConnection) + if err != nil { + logger.Error(err, "Failed to create UCP client") + return nil + } - if err := host.Shutdown(shutdownCtx); err != nil { - logger.Error(err, "Server shutdown failed") - } else { - logger.Info("Server gracefully stopped") + // Proceed with registering manifests + if err := ucpclient.RegisterManifests(ctx, manifestDir); err != nil { + logger.Error(err, "Failed to register manifests") + return nil } - // Additional cleanup if necessary + logger.Info("Successfully registered manifests", "directory", manifestDir) + //}() + + return nil } -*/ diff --git a/pkg/ucp/server/server.go b/pkg/ucp/server/server.go index 6d9d0cb068..5d414e0357 100644 --- a/pkg/ucp/server/server.go +++ b/pkg/ucp/server/server.go @@ -39,7 +39,7 @@ import ( "github.com/radius-project/radius/pkg/ucp/hosting" "github.com/radius-project/radius/pkg/ucp/hostoptions" - //"github.com/radius-project/radius/pkg/ucp/manifestservice" + "github.com/radius-project/radius/pkg/ucp/manifestservice" qprovider "github.com/radius-project/radius/pkg/ucp/queue/provider" "github.com/radius-project/radius/pkg/ucp/rest" "github.com/radius-project/radius/pkg/ucp/secret/provider" @@ -206,7 +206,8 @@ func NewServer(options *Options) (*hosting.Host, error) { options.TracerProviderOptions.ServiceName = "ucp" hostingServices = append(hostingServices, &trace.Service{Options: options.TracerProviderOptions}) - //hostingServices = append(hostingServices, &manifestservice.Service{Options: options.LoggingOptions}) + hostingServices = append(hostingServices, manifestservice.NewService(options.UCPConnection, *options.Config)) + return &hosting.Host{ Services: hostingServices, }, nil diff --git a/pkg/ucp/ucpclient/ucpclient_test_deletethisfile.go b/pkg/ucp/ucpclient/ucpclient_test_deletethisfile.go deleted file mode 100644 index 61ba93a7e6..0000000000 --- a/pkg/ucp/ucpclient/ucpclient_test_deletethisfile.go +++ /dev/null @@ -1,206 +0,0 @@ -/* -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package ucpclient - -import ( - "context" - "fmt" - "net/http" - - armpolicy "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/policy" - azfake "github.com/Azure/azure-sdk-for-go/sdk/azcore/fake" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" - "github.com/radius-project/radius/pkg/to" - "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" - ucpfake "github.com/radius-project/radius/pkg/ucp/api/v20231001preview/fake" -) - -type FakeUCPClient struct { - ResourceProvidersClient v20231001preview.ResourceProvidersClient - ResourceTypesClient v20231001preview.ResourceTypesClient - APIVersionsClient v20231001preview.APIVersionsClient - LocationsClient v20231001preview.LocationsClient -} - -func NewFakeUCPClient() (*FakeUCPClient, error) { - - // Create fake servers for each client - resourceProvidersServer := &ucpfake.ResourceProvidersServer{ - BeginCreateOrUpdate: func( - ctx context.Context, - planeName string, - resourceProviderName string, - resource v20231001preview.ResourceProviderResource, - options *v20231001preview.ResourceProvidersClientBeginCreateOrUpdateOptions, - ) (resp azfake.PollerResponder[v20231001preview.ResourceProvidersClientCreateOrUpdateResponse], errResp azfake.ErrorResponder) { - // Simulate successful creation - result := v20231001preview.ResourceProvidersClientCreateOrUpdateResponse{ - ResourceProviderResource: resource, - } - resp.AddNonTerminalResponse(http.StatusAccepted, nil) - resp.SetTerminalResponse(http.StatusOK, result, nil) - - return - }, - Get: func( - ctx context.Context, - planeName string, - resourceProviderName string, - options *v20231001preview.ResourceProvidersClientGetOptions, - ) (resp azfake.Responder[v20231001preview.ResourceProvidersClientGetResponse], errResp azfake.ErrorResponder) { - response := v20231001preview.ResourceProvidersClientGetResponse{ - ResourceProviderResource: v20231001preview.ResourceProviderResource{ - Name: to.Ptr(resourceProviderName), - }, - } - resp.SetResponse(http.StatusOK, response, nil) - return - }, - } - - // Create other fake servers similarly - resourceTypesServer := &ucpfake.ResourceTypesServer{ - BeginCreateOrUpdate: func( - ctx context.Context, - planeName string, - resourceProviderName string, - resourceTypeName string, - resource v20231001preview.ResourceTypeResource, - options *v20231001preview.ResourceTypesClientBeginCreateOrUpdateOptions, - ) (resp azfake.PollerResponder[v20231001preview.ResourceTypesClientCreateOrUpdateResponse], errResp azfake.ErrorResponder) { - result := v20231001preview.ResourceTypesClientCreateOrUpdateResponse{ - ResourceTypeResource: resource, - } - - // Create a PollerResponder to simulate a long-running operation (LRO) - resp.SetTerminalResponse(http.StatusOK, result, nil) - - return - }, - Get: func( - ctx context.Context, - planeName string, - resourceProviderName string, - resourceTypeName string, - options *v20231001preview.ResourceTypesClientGetOptions, - ) (resp azfake.Responder[v20231001preview.ResourceTypesClientGetResponse], errResp azfake.ErrorResponder) { - response := v20231001preview.ResourceTypesClientGetResponse{ - ResourceTypeResource: v20231001preview.ResourceTypeResource{ - Name: to.Ptr(resourceTypeName), - }, - } - resp.SetResponse(http.StatusOK, response, nil) - return - }, - } - - apiVersionsServer := &ucpfake.APIVersionsServer{ - BeginCreateOrUpdate: func( - ctx context.Context, - planeName string, - resourceProviderName string, - resourceTypeName string, - apiVersionName string, // Added missing parameter - resource v20231001preview.APIVersionResource, - options *v20231001preview.APIVersionsClientBeginCreateOrUpdateOptions, - ) (resp azfake.PollerResponder[v20231001preview.APIVersionsClientCreateOrUpdateResponse], errResp azfake.ErrorResponder) { - // Simulate successful creation - result := v20231001preview.APIVersionsClientCreateOrUpdateResponse{ - APIVersionResource: resource, - } - resp.AddNonTerminalResponse(http.StatusAccepted, nil) - resp.SetTerminalResponse(http.StatusOK, result, nil) - return - }, - } - - locationsServer := &ucpfake.LocationsServer{ - BeginCreateOrUpdate: func( - ctx context.Context, - planeName string, - resourceProviderName string, - locationName string, - resource v20231001preview.LocationResource, - options *v20231001preview.LocationsClientBeginCreateOrUpdateOptions, - ) (resp azfake.PollerResponder[v20231001preview.LocationsClientCreateOrUpdateResponse], errResp azfake.ErrorResponder) { - // Simulate successful creation - result := v20231001preview.LocationsClientCreateOrUpdateResponse{ - LocationResource: resource, - } - resp.AddNonTerminalResponse(http.StatusAccepted, nil) - resp.SetTerminalResponse(http.StatusOK, result, nil) - - return - }, - } - - // Create individual transports for each fake server - resourceProvidersTransport := ucpfake.NewResourceProvidersServerTransport(resourceProvidersServer) - resourceTypesTransport := ucpfake.NewResourceTypesServerTransport(resourceTypesServer) - apiVersionsTransport := ucpfake.NewAPIVersionsServerTransport(apiVersionsServer) - locationsTransport := ucpfake.NewLocationsServerTransport(locationsServer) - - // Configure client options with respective transports - resourceProvidersOptions := &armpolicy.ClientOptions{ - ClientOptions: policy.ClientOptions{ - Transport: resourceProvidersTransport, - }, - } - - resourceTypesOptions := &armpolicy.ClientOptions{ - ClientOptions: policy.ClientOptions{ - Transport: resourceTypesTransport, - }, - } - - apiVersionsOptions := &armpolicy.ClientOptions{ - ClientOptions: policy.ClientOptions{ - Transport: apiVersionsTransport, - }, - } - - locationsOptions := &armpolicy.ClientOptions{ - ClientOptions: policy.ClientOptions{ - Transport: locationsTransport, - }, - } - - credential := &azfake.TokenCredential{} - - resourceProvidersClient, err := v20231001preview.NewResourceProvidersClient(credential, resourceProvidersOptions) - if err != nil { - return nil, fmt.Errorf("failed to create fake ResourceProvidersClient: %w", err) - } - - resourceTypesClient, err := v20231001preview.NewResourceTypesClient(credential, resourceTypesOptions) - if err != nil { - return nil, fmt.Errorf("failed to create fake ResourceTypesClient: %w", err) - } - - apiVersionsClient, err := v20231001preview.NewAPIVersionsClient(credential, apiVersionsOptions) - if err != nil { - return nil, fmt.Errorf("failed to create fake APIVersionsClient: %w", err) - } - - locationsClient, err := v20231001preview.NewLocationsClient(credential, locationsOptions) - if err != nil { - return nil, fmt.Errorf("failed to create fake LocationsClient: %w", err) - } - - // Return the FakeUCPClient instance - return &FakeUCPClient{ - ResourceProvidersClient: *resourceProvidersClient, - ResourceTypesClient: *resourceTypesClient, - APIVersionsClient: *apiVersionsClient, - LocationsClient: *locationsClient, - }, nil -}