From 7506c2702d3af7c8609647a699d6d249eae6ec92 Mon Sep 17 00:00:00 2001 From: a-palchikov Date: Tue, 10 Sep 2019 19:10:51 +0200 Subject: [PATCH] [5.5.x] package dependency/pull updates (#633) * Enable parallel pull for application packages. * Update package conflict handling during pulls. * Address review comments --- e | 2 +- lib/app/dependency.go | 34 +++++------- lib/app/pull.go | 122 +++++++++++++++++++++++++++++++++--------- lib/builder/syncer.go | 26 +++++---- 4 files changed, 122 insertions(+), 62 deletions(-) diff --git a/e b/e index a73b242564..ae9a4d8df1 160000 --- a/e +++ b/e @@ -1 +1 @@ -Subproject commit a73b242564105499f9689ff30aab395d18680745 +Subproject commit ae9a4d8df10771fcbe81e01a61074c323ead72a1 diff --git a/lib/app/dependency.go b/lib/app/dependency.go index dee81ba6c8..4a771638d5 100644 --- a/lib/app/dependency.go +++ b/lib/app/dependency.go @@ -44,8 +44,7 @@ func GetDependencies(req GetDependenciesRequest) (result *Dependencies, err erro return nil, trace.Wrap(err) } state := &state{ - packages: make(map[loc.Locator]struct{}), - apps: make(map[loc.Locator]struct{}), + visited: make(map[loc.Locator]struct{}), } if err = req.getDependencies(req.App, state); err != nil { return nil, trace.Wrap(err) @@ -132,13 +131,13 @@ func (r GetDependenciesRequest) getDependencies(app Application, state *state) e } // collect application dependencies, including those of the base application var appDeps []loc.Locator - baseLocator := app.Manifest.Base() - if baseLocator != nil { - appDeps = append(appDeps, *baseLocator) + baseApp := app.Manifest.Base() + if baseApp != nil { + appDeps = append(appDeps, *baseApp) } appDeps = append(appDeps, app.Manifest.Dependencies.GetApps()...) 
for _, dependency := range appDeps { - if state.hasApp(dependency) { + if state.hasPackage(dependency) { continue } app, err := r.Apps.GetApp(dependency) @@ -161,32 +160,25 @@ func (r GetDependenciesRequest) getDependencies(app Application, state *state) e return nil } -func (r *state) hasPackage(pkg loc.Locator) bool { - _, ok := r.packages[pkg] - return ok -} - -func (r *state) hasApp(pkg loc.Locator) bool { - _, ok := r.apps[pkg] +func (r *state) hasPackage(loc loc.Locator) bool { + _, ok := r.visited[loc] return ok } -func (r *state) addPackage(pkg pack.PackageEnvelope) { - r.packages[pkg.Locator] = struct{}{} - r.deps.Packages = append(r.deps.Packages, pkg) +func (r *state) addPackage(env pack.PackageEnvelope) { + r.visited[env.Locator] = struct{}{} + r.deps.Packages = append(r.deps.Packages, env) } func (r *state) addApp(app Application) { - r.apps[app.Package] = struct{}{} + r.visited[app.Package] = struct{}{} r.deps.Apps = append(r.deps.Apps, app) } type state struct { deps Dependencies - // packages lists collected package dependencies - packages map[loc.Locator]struct{} - // apps lists collected application dependencies - apps map[loc.Locator]struct{} + // visited lists already visited package dependencies + visited map[loc.Locator]struct{} // runtimePackage is the runtime package dependency. // // The runtime package is computed bottom-up - from dependencies to the top-level application. 
diff --git a/lib/app/pull.go b/lib/app/pull.go index e97de7ff6f..763c8a7054 100644 --- a/lib/app/pull.go +++ b/lib/app/pull.go @@ -49,8 +49,14 @@ func (r Puller) PullApp(ctx context.Context, loc loc.Locator) error { if err != nil { return trace.Wrap(err) } - deps.Apps = append(deps.Apps, *app) - return r.Pull(ctx, *deps) + r.onConflict = onConflictDependencies(r.Upsert) + err = r.pull(ctx, *deps) + if err != nil { + return trace.Wrap(err) + } + // Pull the application + r.onConflict = onConflict(r.Upsert) + return r.pullAppWithRetries(ctx, app.Package) } // PullPackage pulls the package specified with loc @@ -86,23 +92,26 @@ func (r Puller) Pull(ctx context.Context, deps Dependencies) error { } func (r Puller) pull(ctx context.Context, deps Dependencies) error { + if err := r.pullPackages(ctx, deps.Packages); err != nil { + return trace.Wrap(err) + } + return r.pullApps(ctx, deps.Apps) +} + +func (r Puller) pullPackages(ctx context.Context, packages []pack.PackageEnvelope) error { group, ctx := run.WithContext(ctx, run.WithParallel(r.Parallel)) - for _, env := range deps.Packages { + for _, env := range packages { group.Go(ctx, r.pullPackageHandler(ctx, env.Locator)) } - if err := group.Wait(); err != nil { - return trace.Wrap(err) - } - // Do not pull application in parallel as the application packages are ordered - // (with dependent packages in the front) - // TODO(dmitri): would be ideal to group applications such that to make them - // pull-friendly in parallel - for _, app := range deps.Apps { - if err := r.pullAppWithRetries(ctx, app.Package); err != nil { - return trace.Wrap(err) - } + return group.Wait() +} + +func (r Puller) pullApps(ctx context.Context, apps []Application) error { + group, ctx := run.WithContext(ctx, run.WithParallel(r.Parallel)) + for _, app := range apps { + group.Go(ctx, r.pullAppHandler(ctx, app.Package)) } - return nil + return group.Wait() } // Puller pulls packages from one service to another @@ -124,21 +133,24 @@ type Puller 
struct { // Upsert is whether to create or upsert the application or package. // The flag is applied to all dependencies Upsert bool - // SkipIfExists indicates whether to avoid pulling if an application or package exists. - // The flag is applied to all dependencies with Upsert taking precedence - SkipIfExists bool - // MetadataOnly allows to pull only app metadata without body + // MetadataOnly specifies whether to only pull package metadata (w/o contents) MetadataOnly bool // Parallel defines the number of tasks to run in parallel. // If < 0, the number of tasks is unrestricted. // If in [0,1], the tasks are executed sequentially. Parallel int + // onConflict specifies the package conflict handler for when the package already + // exists in DstPack. + onConflict conflictHandler } func (r *Puller) checkAndSetDefaults() error { if r.FieldLogger == nil { r.FieldLogger = logrus.WithField(trace.Component, "pull") } + if r.onConflict == nil { + r.onConflict = onConflict(r.Upsert) + } return nil } @@ -164,12 +176,14 @@ func (r Puller) pullPackage(loc loc.Locator) error { if err != nil && !trace.IsNotFound(err) { return trace.Wrap(err) } - if err == nil && !r.Upsert { - if r.SkipIfExists { + if err == nil { + err = r.onConflict(loc) + if utils.IsAbortError(err) { return nil } - logger.Info("Package already exists.") - return trace.AlreadyExists("package %v already exists", loc) + if err != nil { + return trace.Wrap(err) + } } logger.Info("Pull package.") reader := ioutil.NopCloser(utils.NopReader()) @@ -205,6 +219,12 @@ func (r Puller) pullPackage(loc loc.Locator) error { return trace.Wrap(err) } +func (r Puller) pullAppHandler(ctx context.Context, loc loc.Locator) func() error { + return func() error { + return r.pullAppWithRetries(ctx, loc) + } +} + func (r Puller) pullAppWithRetries(ctx context.Context, loc loc.Locator) error { ctx, cancel := context.WithTimeout(ctx, defaults.TransientErrorTimeout) defer cancel() @@ -232,11 +252,13 @@ func (r Puller) pullApp(loc 
loc.Locator) error { } logger := r.WithField("app", loc) if app != nil && !upsert { - if r.SkipIfExists { + err = r.onConflict(loc) + if utils.IsAbortError(err) { return nil } - logger.Info("Application already exists.") - return trace.AlreadyExists("application %v already exists", loc) + if err != nil { + return trace.Wrap(err) + } } logger.Info("Pull application.") var env *pack.PackageEnvelope @@ -266,3 +288,51 @@ func (r Puller) pullApp(loc loc.Locator) error { } return trace.Wrap(err) } + +// onConflictDependencies returns the conflict handler for dealing with package +// conflicts in application dependencies. When an application is pulled (or pushed) +// from a service, the behavior regarding the conflicts is as follows: +// * if a dependent (application) package already exists in the destination service, +// the operation does nothing or upserts the package (subject to upsert attribute) +// * if the top-level application package already exists in the destination service, +// the operation will either fail with the corresponding error or upsert the package +// (subject to upsert attribute) +func onConflictDependencies(upsert bool) conflictHandler { + if upsert { + return onConflictContinue + } + return onConflictSkip +} + +func onConflict(upsert bool) conflictHandler { + if upsert { + return onConflictContinue + } + return onConflictAbort +} + +// onConflictAbort is a conflict handler that aborts the pull operation +// with an error +func onConflictAbort(loc loc.Locator) error { + return trace.AlreadyExists("package %v already exists", loc) +} + +// onConflictContinue is a conflict handler that continues the pull operation +// if a package already exists in the destination package service +func onConflictContinue(loc loc.Locator) error { + return nil +} + +// onConflictSkip is a conflict handler that aborts the pull operation +// w/o error +func onConflictSkip(loc loc.Locator) error { + return utils.Abort(nil) +} + +// conflictHandler defines a functional 
handler to decide whether the active +// pull operation is aborted if the specified package already exists in the +// destination package service. +// If the return is nil, the pull operation continues. +// If the return is a special utils.Abort error, the pull operation is aborted without error. +// If the return is any other error, the pull operation is aborted with said error. +type conflictHandler func(loc.Locator) error diff --git a/lib/builder/syncer.go b/lib/builder/syncer.go index f5e8ba08aa..d67f6c6da6 100644 --- a/lib/builder/syncer.go +++ b/lib/builder/syncer.go @@ -100,13 +100,12 @@ func (s *s3Syncer) Sync(ctx context.Context, builder *Builder, runtimeVersion se return trace.Wrap(err) } puller := libapp.Puller{ - FieldLogger: builder.FieldLogger, - SrcPack: env.Packages, - SrcApp: tarballApps, - DstPack: builder.Env.Packages, - DstApp: cacheApps, - Parallel: builder.VendorReq.Parallel, - SkipIfExists: true, + FieldLogger: builder.FieldLogger, + SrcPack: env.Packages, + SrcApp: tarballApps, + DstPack: builder.Env.Packages, + DstApp: cacheApps, + Parallel: builder.VendorReq.Parallel, } return puller.PullAppDeps(ctx, builder.appForRuntime(runtimeVersion)) } @@ -134,13 +133,12 @@ func (s *packSyncer) Sync(ctx context.Context, builder *Builder, runtimeVersion return trace.Wrap(err) } puller := libapp.Puller{ - FieldLogger: builder.FieldLogger, - SrcPack: s.pack, - SrcApp: s.apps, - DstPack: builder.Env.Packages, - DstApp: cacheApps, - Parallel: builder.VendorReq.Parallel, - SkipIfExists: true, + FieldLogger: builder.FieldLogger, + SrcPack: s.pack, + SrcApp: s.apps, + DstPack: builder.Env.Packages, + DstApp: cacheApps, + Parallel: builder.VendorReq.Parallel, } err = puller.PullAppDeps(ctx, builder.appForRuntime(runtimeVersion)) if err != nil {