Skip to content

Commit

Permalink
Refactor proService's updateSubscriptions in anticipation of Landscape
Browse files Browse the repository at this point in the history
  • Loading branch information
EduardGomezEscandell committed Oct 24, 2023
1 parent 9cbf554 commit cf579bd
Showing 1 changed file with 92 additions and 45 deletions.
137 changes: 92 additions & 45 deletions windows-agent/internal/proservices/proservices.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/canonical/ubuntu-pro-for-windows/common"
"github.com/canonical/ubuntu-pro-for-windows/windows-agent/internal/config"
"github.com/canonical/ubuntu-pro-for-windows/windows-agent/internal/distros/database"
"github.com/canonical/ubuntu-pro-for-windows/windows-agent/internal/distros/task"
"github.com/canonical/ubuntu-pro-for-windows/windows-agent/internal/grpc/interceptorschain"
log "github.com/canonical/ubuntu-pro-for-windows/windows-agent/internal/grpc/logstreamer"
"github.com/canonical/ubuntu-pro-for-windows/windows-agent/internal/proservices/landscape"
Expand Down Expand Up @@ -102,7 +103,7 @@ func New(ctx context.Context, args ...Option) (s Manager, err error) {
}()

go func() {
err := updateSubscriptions(ctx, opts.cacheDir, conf, db)
err := updateRegistrySettings(ctx, opts.cacheDir, conf, db)
if err != nil {
log.Warningf(ctx, "Could not update subscriptions: %v", err)
}
Expand Down Expand Up @@ -154,71 +155,117 @@ func (m Manager) RegisterGRPCServices(ctx context.Context) *grpc.Server {
return grpcServer
}

// updateSubscriptions checks if the subscription has changed since the last time it was called. If so, the new subscription
// is pushed to all distros as a deferred task.
func updateSubscriptions(ctx context.Context, cacheDir string, conf *config.Config, db *database.DistroDB) error {
proToken, _, err := conf.Subscription(ctx)
if err != nil {
return fmt.Errorf("could not retrieve current subscription: %v", err)
}
// updateRegistrySettings checks if any of the registry settings have changed since this function was last called.
// If so, updated settings are pushed to the distros.
func updateRegistrySettings(ctx context.Context, cacheDir string, conf *config.Config, db *database.DistroDB) error {
type getTask = func(context.Context, string, *config.Config, *database.DistroDB) (task.Task, error)

if !subscriptionIsNew(ctx, cacheDir, proToken) {
return nil
// Collect tasks for updated settings
var acc error
var taskList []task.Task
for _, f := range []getTask{getNewSubscription} {
task, err := f(ctx, cacheDir, conf, db)
if err != nil {
errors.Join(acc, err)
continue
}
if task != nil {
taskList = append(taskList, task)
}
}

task := tasks.ProAttachment{Token: proToken}
if acc != nil {
log.Warningf(ctx, "Could not obtain some updated registry settings: %v", acc)
}

// Apply tasks for updated settings
acc = nil
for _, d := range db.GetAll() {
err = errors.Join(err, d.SubmitDeferredTasks(task))
acc = errors.Join(acc, d.SubmitDeferredTasks(taskList...))
}

if err != nil {
return fmt.Errorf("could not submit new token to certain distros: %v", err)
if acc != nil {
return fmt.Errorf("could not submit new token to certain distros: %v", acc)
}

return nil
}

// subscriptionIsNew detects if the current subscription is different from the last time it was called.
func subscriptionIsNew(ctx context.Context, cacheDir string, newSubscription string) (new bool) {
cachePath := filepath.Join(cacheDir, "subscription.csum")
newCheckSum := sha512.Sum512([]byte(newSubscription))
// getNewSubscription checks if the subscription has changed since the last time it was called. If so, the new subscription
// is returned in the form of a task.
func getNewSubscription(ctx context.Context, cacheDir string, conf *config.Config, db *database.DistroDB) (task.Task, error) {
proToken, _, err := conf.Subscription(ctx)
if err != nil {
return nil, fmt.Errorf("could not retrieve current subscription: %v", err)
}

// Update cache on exit
defer func() {
if newSubscription == "" {
// If there is no subscription, we remove the file.
// This preserves this function's idempotency.
err := os.Remove(cachePath)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Warningf(ctx, "Could not write new subscription to cache: %v", err)
}
return
}
isNew, err := valueIsNew(filepath.Join(cacheDir, "subscription.csum"), []byte(proToken))
if err != nil {
log.Warningf(ctx, "could not update checksum for Ubuntu Pro subscription: %v", err)
}
if isNew {
return nil, nil
}

if !new {
return
}
log.Debug(ctx, "New Ubuntu Pro subscription settings detected in registry")
return tasks.ProAttachment{Token: proToken}, nil
}

err := os.WriteFile(cachePath, newCheckSum[:], 0600)
if err != nil {
log.Warningf(ctx, "Could not write new subscription to cache: %v", err)
}
}()
// valueIsNew detects if the current value is different from the last time it was used.
// The return value is usable even if error is returned.
func valueIsNew(cachePath string, newValue []byte) (new bool, err error) {
var newCheckSum []byte
if len(newValue) != 0 {
tmp := sha512.Sum512(newValue)
newCheckSum = tmp[:]
}

defer decorateUpdateCache(&new, &err, cachePath, newCheckSum)

oldChecksum, err := os.ReadFile(cachePath)
if errors.Is(err, fs.ErrNotExist) {
// File not found: there was no subscription before
// (Lack of) subscription is new only if the new subscription non-empty.
return newSubscription != ""
// File not found: there was no value before
oldChecksum = nil
} else if err != nil {
log.Warningf(ctx, "Could not read old subscription, assuming subscription is new. Error: %v", err)
return true
return true, fmt.Errorf("could not read old value: %v", err)
}

if slices.Equal(oldChecksum, newCheckSum[:]) {
return false
if slices.Equal(oldChecksum, newCheckSum) {
return false, nil
}

return true
return true, nil
}

// decorateUpdateCache acts depending on caller's return values (hence decorate).
// It stores the new checksum to the cachefile. Any errors are joined to *err.
func decorateUpdateCache(new *bool, err *error, cachePath string, newCheckSum []byte) {
writeCacheErr := func() error {
// If the value is empty, we remove the file.
// This preserves this function's idempotency.
if len(newCheckSum) == 0 {
err := os.Remove(cachePath)
if errors.Is(err, fs.ErrNotExist) {
return nil
}
if err != nil {
return fmt.Errorf("could not remove old checksum: %v", err)
}
return nil
}

// Value is unchanged: don't write to file
if !*new {
return nil
}

// Update to file
if err := os.WriteFile(cachePath, newCheckSum[:], 0600); err != nil {
return fmt.Errorf("could not write checksum to cache: %v", err)
}

return nil
}()

*err = errors.Join(*err, writeCacheErr)
}

0 comments on commit cf579bd

Please sign in to comment.