Skip to content

Commit

Permalink
add service
Browse files Browse the repository at this point in the history
  • Loading branch information
lakshmimsft committed Dec 11, 2024
1 parent 9d19b27 commit 4cef599
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 396 deletions.
56 changes: 27 additions & 29 deletions cmd/ucpd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package cmd
import (
"context"
"fmt"
"os"
"time"

"github.com/go-logr/logr"
"github.com/spf13/cobra"
Expand All @@ -31,7 +29,6 @@ import (
"github.com/radius-project/radius/pkg/ucp/dataprovider"
"github.com/radius-project/radius/pkg/ucp/hosting"
"github.com/radius-project/radius/pkg/ucp/server"
"github.com/radius-project/radius/pkg/ucp/ucpclient"
"github.com/radius-project/radius/pkg/ucp/ucplog"
)

Expand Down Expand Up @@ -69,32 +66,33 @@ var rootCmd = &cobra.Command{
if err != nil {
return err
}

// Discuss if there is a better way to check if the server is listening..
// Start RegisterManifests in a goroutine after 15 seconds
go func() {
time.Sleep(15 * time.Second)
// Register manifests
manifestDir := options.Config.Manifests.ManifestDirectory
if _, err := os.Stat(manifestDir); os.IsNotExist(err) {
logger.Error(err, "Manifest directory does not exist", "directory", manifestDir)
return
} else if err != nil {
logger.Error(err, "Error checking manifest directory", "directory", manifestDir)
return
}

ucpclient, err := ucpclient.NewUCPClient(options.UCPConnection)
if err != nil {
logger.Error(err, "Failed to create UCP client")
}

if err := ucpclient.RegisterManifests(cmd.Context(), manifestDir); err != nil {
logger.Error(err, "Failed to register manifests")
} else {
logger.Info("Successfully registered manifests", "directory", manifestDir)
}
}()
/*
// Discuss if there is a better way to check if the server is listening..
// Start RegisterManifests in a goroutine after 15 seconds
go func() {
time.Sleep(15 * time.Second)
// Register manifests
manifestDir := options.Config.Manifests.ManifestDirectory
if _, err := os.Stat(manifestDir); os.IsNotExist(err) {
logger.Error(err, "Manifest directory does not exist", "directory", manifestDir)
return
} else if err != nil {
logger.Error(err, "Error checking manifest directory", "directory", manifestDir)
return
}
ucpclient, err := ucpclient.NewUCPClient(options.UCPConnection)
if err != nil {
logger.Error(err, "Failed to create UCP client")
}
if err := ucpclient.RegisterManifests(cmd.Context(), manifestDir); err != nil {
logger.Error(err, "Failed to register manifests")
} else {
logger.Info("Successfully registered manifests", "directory", manifestDir)
}
}()
*/

ctx := logr.NewContext(cmd.Context(), logger)
return hosting.RunWithInterrupts(ctx, host)
Expand Down
235 changes: 76 additions & 159 deletions pkg/ucp/manifestservice/service.go
Original file line number Diff line number Diff line change
@@ -1,113 +1,56 @@
package manifestservice

// Import your logger, server, ucpclient, etc.
// "github.com/yourorg/yourproject/logger"
// "github.com/yourorg/yourproject/server"
// "github.com/yourorg/yourproject/ucpclient"

/*
// Service is a service to run AsyncReqeustProcessWorker.
type Service struct {
worker.Service
config ucpoptions.UCPConfig
}
// NewService creates new service instance to run AsyncRequestProcessWorker.
func NewService(options hostoptions.HostOptions, config ucpoptions.UCPConfig) *Service {
return &Service{
Service: worker.Service{
ProviderName: UCPProviderName,
Options: options,
},
config: config,
}
}
// Name returns a string containing the UCPProviderName and the text "async worker".
func (w *Service) Name() string {
return fmt.Sprintf("%s async worker", UCPProviderName)
}
Copyright 2023 The Radius Authors.
// Run starts the service and worker. It initializes the service and sets the worker options based on the configuration,
// then starts the service with the given worker options. It returns an error if the initialization fails.
func (w *Service) Run(ctx context.Context) error {
if err := w.Init(ctx); err != nil {
return err
}
workerOpts := worker.Options{}
if w.Options.Config.WorkerServer != nil {
if w.Options.Config.WorkerServer.MaxOperationConcurrency != nil {
workerOpts.MaxOperationConcurrency = *w.Options.Config.WorkerServer.MaxOperationConcurrency
}
if w.Options.Config.WorkerServer.MaxOperationRetryCount != nil {
workerOpts.MaxOperationRetryCount = *w.Options.Config.WorkerServer.MaxOperationRetryCount
}
}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
opts := ctrl.Options{
DataProvider: w.StorageProvider,
}
http://www.apache.org/licenses/LICENSE-2.0
defaultDownstream, err := url.Parse(w.config.Routing.DefaultDownstreamEndpoint)
if err != nil {
return err
}
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

transport := otelhttp.NewTransport(http.DefaultTransport)
err = RegisterControllers(ctx, w.Controllers, w.Options.UCPConnection, transport, opts, defaultDownstream)
if err != nil {
return err
}
package manifestservice

return w.Start(ctx, workerOpts)
import (
"context"
"fmt"
"net"
"os"
"os/signal"
"syscall"
"time"

"github.com/radius-project/radius/pkg/sdk"
"github.com/radius-project/radius/pkg/ucp/hosting"
ucpoptions "github.com/radius-project/radius/pkg/ucp/hostoptions"
"github.com/radius-project/radius/pkg/ucp/ucpclient"
"github.com/radius-project/radius/pkg/ucp/ucplog"
)

// Service implements the hosting.Service interface for registering manifests.
type Service struct {
ucpConnection sdk.Connection
options ucpoptions.UCPConfig
}

// RegisterControllers registers the controllers for the UCP backend.
func RegisterControllers(ctx context.Context, registry *worker.ControllerRegistry, connection sdk.Connection, transport http.RoundTripper, opts ctrl.Options, defaultDownstream *url.URL) error {
// Tracked resources
err := errors.Join(nil, registry.Register(ctx, v20231001preview.ResourceType, v1.OperationMethod(datamodel.OperationProcess), func(opts ctrl.Options) (ctrl.Controller, error) {
return resourcegroups.NewTrackedResourceProcessController(opts, transport, defaultDownstream)
}, opts))
// Resource providers and related types
err = errors.Join(err, registry.Register(ctx, datamodel.ResourceProviderResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) {
return &resourceproviders.ResourceProviderPutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil
}, opts))
err = errors.Join(err, registry.Register(ctx, datamodel.ResourceProviderResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) {
return &resourceproviders.ResourceProviderDeleteController{
BaseController: ctrl.NewBaseAsyncController(opts),
Connection: connection,
}, nil
}, opts))
err = errors.Join(err, registry.Register(ctx, datamodel.ResourceTypeResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) {
return &resourceproviders.ResourceTypePutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil
}, opts))
err = errors.Join(err, registry.Register(ctx, datamodel.ResourceTypeResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) {
return &resourceproviders.ResourceTypeDeleteController{
BaseController: ctrl.NewBaseAsyncController(opts),
Connection: connection,
}, nil
}, opts))
err = errors.Join(err, registry.Register(ctx, datamodel.APIVersionResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) {
return &resourceproviders.APIVersionPutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil
}, opts))
err = errors.Join(err, registry.Register(ctx, datamodel.APIVersionResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) {
return &resourceproviders.APIVersionDeleteController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil
}, opts))
err = errors.Join(err, registry.Register(ctx, datamodel.LocationResourceType, v1.OperationPut, func(opts ctrl.Options) (ctrl.Controller, error) {
return &resourceproviders.LocationPutController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil
}, opts))
err = errors.Join(err, registry.Register(ctx, datamodel.LocationResourceType, v1.OperationDelete, func(opts ctrl.Options) (ctrl.Controller, error) {
return &resourceproviders.LocationDeleteController{BaseController: ctrl.NewBaseAsyncController(opts)}, nil
}, opts))
var _ hosting.Service = (*Service)(nil)

if err != nil {
return err
// NewService creates a server to register manifests.
func NewService(connection sdk.Connection, options ucpoptions.UCPConfig) *Service {
return &Service{
ucpConnection: connection,
options: options,
}
}

return nil
// Name gets this service name.
func (s *Service) Name() string {
return "manifestservice"
}

func waitForServer(ctx context.Context, host, port string, retryInterval time.Duration, timeout time.Duration) error {
Expand All @@ -134,76 +77,50 @@ func waitForServer(ctx context.Context, host, port string, retryInterval time.Du
}
}

func main() {
// Start the server in a separate goroutine
go func() {
if err := host.Start(); err != nil {
logger.Error(err, "Server failed to start")
os.Exit(1)
}
}()
func (w *Service) Run(ctx context.Context) error {
logger := ucplog.FromContextOrDiscard(ctx)

// Set up signal handling for graceful shutdown
stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)

// Start the manifest registration in a goroutine
go func() {
// Define connection parameters
hostName := host.HostName() // Replace with actual method
port := host.Port() // Replace with actual method
// Define context with timeout for the connection attempts
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Attempt to connect to the server
err := waitForServer(ctx, hostName, port, 500*time.Millisecond, 30*time.Second)
if err != nil {
logger.Error(err, "Server is not available for manifest registration")
return
}
// Server is up, proceed to register manifests
manifestDir := options.Config.Manifests.ManifestDirectory
if _, err := os.Stat(manifestDir); os.IsNotExist(err) {
logger.Error(err, "Manifest directory does not exist", "directory", manifestDir)
return
} else if err != nil {
logger.Error(err, "Error checking manifest directory", "directory", manifestDir)
return
}
//go func() {
// Define connection parameters
hostName := "localhost" //w.ucpConnection.Endpoint()/split to get host? // Replace with actual method
port := "9000" // extract from endpoint Replace with actual method

ucpclient, err := ucpclient.NewUCPClient(options.UCPConnection)
if err != nil {
logger.Error(err, "Failed to create UCP client")
return
}
// Proceed with registering manifests
if err := ucpclient.RegisterManifests(ctx, manifestDir); err != nil {
logger.Error(err, "Failed to register manifests")
return
}
logger.Info("Successfully registered manifests", "directory", manifestDir)
}()
// Attempt to connect to the server
err := waitForServer(ctx, hostName, port, 500*time.Millisecond, 5*time.Second)
if err != nil {
logger.Error(err, "Server is not available for manifest registration")
return nil
}

// Wait for a termination signal
<-stopChan
logger.Info("Received termination signal. Shutting down...")
// Server is up, proceed to register manifests
manifestDir := w.options.Manifests.ManifestDirectory
if _, err := os.Stat(manifestDir); os.IsNotExist(err) {
logger.Error(err, "Manifest directory does not exist", "directory", manifestDir)
return nil
} else if err != nil {
logger.Error(err, "Error checking manifest directory", "directory", manifestDir)
return nil
}

// Gracefully shut down the server
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
ucpclient, err := ucpclient.NewUCPClient(w.ucpConnection)
if err != nil {
logger.Error(err, "Failed to create UCP client")
return nil
}

if err := host.Shutdown(shutdownCtx); err != nil {
logger.Error(err, "Server shutdown failed")
} else {
logger.Info("Server gracefully stopped")
// Proceed with registering manifests
if err := ucpclient.RegisterManifests(ctx, manifestDir); err != nil {
logger.Error(err, "Failed to register manifests")
return nil
}

// Additional cleanup if necessary
logger.Info("Successfully registered manifests", "directory", manifestDir)
//}()

return nil
}
*/
5 changes: 3 additions & 2 deletions pkg/ucp/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/radius-project/radius/pkg/ucp/hosting"
"github.com/radius-project/radius/pkg/ucp/hostoptions"

//"github.com/radius-project/radius/pkg/ucp/manifestservice"
"github.com/radius-project/radius/pkg/ucp/manifestservice"
qprovider "github.com/radius-project/radius/pkg/ucp/queue/provider"
"github.com/radius-project/radius/pkg/ucp/rest"
"github.com/radius-project/radius/pkg/ucp/secret/provider"
Expand Down Expand Up @@ -206,7 +206,8 @@ func NewServer(options *Options) (*hosting.Host, error) {
options.TracerProviderOptions.ServiceName = "ucp"
hostingServices = append(hostingServices, &trace.Service{Options: options.TracerProviderOptions})

//hostingServices = append(hostingServices, &manifestservice.Service{Options: options.LoggingOptions})
hostingServices = append(hostingServices, manifestservice.NewService(options.UCPConnection, *options.Config))

return &hosting.Host{
Services: hostingServices,
}, nil
Expand Down
Loading

0 comments on commit 4cef599

Please sign in to comment.