Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
[5.5.x] package dependency/pull updates (#633)
Browse files Browse the repository at this point in the history
* Enable parallel pull for application packages.
* Update package conflict handling during pulls.
* Address review comments
  • Loading branch information
a-palchikov authored Sep 10, 2019
1 parent cb95636 commit 7506c27
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 62 deletions.
2 changes: 1 addition & 1 deletion e
Submodule e updated from a73b24 to ae9a4d
34 changes: 13 additions & 21 deletions lib/app/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func GetDependencies(req GetDependenciesRequest) (result *Dependencies, err erro
return nil, trace.Wrap(err)
}
state := &state{
packages: make(map[loc.Locator]struct{}),
apps: make(map[loc.Locator]struct{}),
visited: make(map[loc.Locator]struct{}),
}
if err = req.getDependencies(req.App, state); err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -132,13 +131,13 @@ func (r GetDependenciesRequest) getDependencies(app Application, state *state) e
}
// collect application dependencies, including those of the base application
var appDeps []loc.Locator
baseLocator := app.Manifest.Base()
if baseLocator != nil {
appDeps = append(appDeps, *baseLocator)
baseApp := app.Manifest.Base()
if baseApp != nil {
appDeps = append(appDeps, *baseApp)
}
appDeps = append(appDeps, app.Manifest.Dependencies.GetApps()...)
for _, dependency := range appDeps {
if state.hasApp(dependency) {
if state.hasPackage(dependency) {
continue
}
app, err := r.Apps.GetApp(dependency)
Expand All @@ -161,32 +160,25 @@ func (r GetDependenciesRequest) getDependencies(app Application, state *state) e
return nil
}

func (r *state) hasPackage(pkg loc.Locator) bool {
_, ok := r.packages[pkg]
return ok
}

func (r *state) hasApp(pkg loc.Locator) bool {
_, ok := r.apps[pkg]
func (r *state) hasPackage(loc loc.Locator) bool {
_, ok := r.visited[loc]
return ok
}

func (r *state) addPackage(pkg pack.PackageEnvelope) {
r.packages[pkg.Locator] = struct{}{}
r.deps.Packages = append(r.deps.Packages, pkg)
func (r *state) addPackage(env pack.PackageEnvelope) {
r.visited[env.Locator] = struct{}{}
r.deps.Packages = append(r.deps.Packages, env)
}

func (r *state) addApp(app Application) {
r.apps[app.Package] = struct{}{}
r.visited[app.Package] = struct{}{}
r.deps.Apps = append(r.deps.Apps, app)
}

type state struct {
deps Dependencies
// packages lists collected package dependencies
packages map[loc.Locator]struct{}
// apps lists collected application dependencies
apps map[loc.Locator]struct{}
// visited lists already visited package dependencies
visited map[loc.Locator]struct{}
// runtimePackage is the runtime package dependency.
//
// The runtime package is computed bottom-up - from dependencies to the top-level application.
Expand Down
122 changes: 96 additions & 26 deletions lib/app/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,14 @@ func (r Puller) PullApp(ctx context.Context, loc loc.Locator) error {
if err != nil {
return trace.Wrap(err)
}
deps.Apps = append(deps.Apps, *app)
return r.Pull(ctx, *deps)
r.onConflict = onConflictDependencies(r.Upsert)
err = r.pull(ctx, *deps)
if err != nil {
return trace.Wrap(err)
}
// Pull the application
r.onConflict = onConflict(r.Upsert)
return r.pullAppWithRetries(ctx, app.Package)
}

// PullPackage pulls the package specified with loc
Expand Down Expand Up @@ -86,23 +92,26 @@ func (r Puller) Pull(ctx context.Context, deps Dependencies) error {
}

func (r Puller) pull(ctx context.Context, deps Dependencies) error {
if err := r.pullPackages(ctx, deps.Packages); err != nil {
return trace.Wrap(err)
}
return r.pullApps(ctx, deps.Apps)
}

func (r Puller) pullPackages(ctx context.Context, packages []pack.PackageEnvelope) error {
group, ctx := run.WithContext(ctx, run.WithParallel(r.Parallel))
for _, env := range deps.Packages {
for _, env := range packages {
group.Go(ctx, r.pullPackageHandler(ctx, env.Locator))
}
if err := group.Wait(); err != nil {
return trace.Wrap(err)
}
// Do not pull application in parallel as the application packages are ordered
// (with dependent packages in the front)
// TODO(dmitri): would be ideal to group applications such that to make them
// pull-friendly in parallel
for _, app := range deps.Apps {
if err := r.pullAppWithRetries(ctx, app.Package); err != nil {
return trace.Wrap(err)
}
return group.Wait()
}

func (r Puller) pullApps(ctx context.Context, apps []Application) error {
group, ctx := run.WithContext(ctx, run.WithParallel(r.Parallel))
for _, app := range apps {
group.Go(ctx, r.pullAppHandler(ctx, app.Package))
}
return nil
return group.Wait()
}

// Puller pulls packages from one service to another
Expand All @@ -124,21 +133,24 @@ type Puller struct {
// Upsert is whether to create or upsert the application or package.
// The flag is applied to all dependencies
Upsert bool
// SkipIfExists indicates whether to avoid pulling if an application or package exists.
// The flag is applied to all dependencies with Upsert taking precedence
SkipIfExists bool
// MetadataOnly allows to pull only app metadata without body
// MetadataOnly specifies whether to only pull package metadata (w/o contents)
MetadataOnly bool
// Parallel defines the number of tasks to run in parallel.
// If < 0, the number of tasks is unrestricted.
// If in [0,1], the tasks are executed sequentially.
Parallel int
// onConflict specifies the package conflict handler for when the package already
// exists in DstPack.
onConflict conflictHandler
}

func (r *Puller) checkAndSetDefaults() error {
if r.FieldLogger == nil {
r.FieldLogger = logrus.WithField(trace.Component, "pull")
}
if r.onConflict == nil {
r.onConflict = onConflict(r.Upsert)
}
return nil
}

Expand All @@ -164,12 +176,14 @@ func (r Puller) pullPackage(loc loc.Locator) error {
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
if err == nil && !r.Upsert {
if r.SkipIfExists {
if err == nil {
err = r.onConflict(loc)
if utils.IsAbortError(err) {
return nil
}
logger.Info("Package already exists.")
return trace.AlreadyExists("package %v already exists", loc)
if err != nil {
return trace.Wrap(err)
}
}
logger.Info("Pull package.")
reader := ioutil.NopCloser(utils.NopReader())
Expand Down Expand Up @@ -205,6 +219,12 @@ func (r Puller) pullPackage(loc loc.Locator) error {
return trace.Wrap(err)
}

func (r Puller) pullAppHandler(ctx context.Context, loc loc.Locator) func() error {
return func() error {
return r.pullAppWithRetries(ctx, loc)
}
}

func (r Puller) pullAppWithRetries(ctx context.Context, loc loc.Locator) error {
ctx, cancel := context.WithTimeout(ctx, defaults.TransientErrorTimeout)
defer cancel()
Expand Down Expand Up @@ -232,11 +252,13 @@ func (r Puller) pullApp(loc loc.Locator) error {
}
logger := r.WithField("app", loc)
if app != nil && !upsert {
if r.SkipIfExists {
err = r.onConflict(loc)
if utils.IsAbortError(err) {
return nil
}
logger.Info("Application already exists.")
return trace.AlreadyExists("application %v already exists", loc)
if err != nil {
return trace.Wrap(err)
}
}
logger.Info("Pull application.")
var env *pack.PackageEnvelope
Expand Down Expand Up @@ -266,3 +288,51 @@ func (r Puller) pullApp(loc loc.Locator) error {
}
return trace.Wrap(err)
}

// onConflictDependencies returns the conflict handler for dealing with package
// conflicts in application dependencies. When an application is pulled (or pushed)
// from a service, the behavior regarding the conflicts is as following:
// * if a dependent (application) package already exists in the destination service,
// the operation does nothing or upserts the package (subject to upsert attribute)
// * if the top-level application package already exists in the destination service,
// the operation will ether fail with the corresponding error or upsert the package
// (subject to upsert attribute)
func onConflictDependencies(upsert bool) conflictHandler {
if upsert {
return onConflictContinue
}
return onConflictSkip
}

func onConflict(upsert bool) conflictHandler {
if upsert {
return onConflictContinue
}
return onConflictAbort
}

// onConflictAbort is a conflict handler that aborts the pull operation
// with an error
func onConflictAbort(loc loc.Locator) error {
return trace.AlreadyExists("package %v already exists", loc)
}

// onConflictContinue is a conflict handler that continues the pull operation
// if a package already exists in the destination package service
func onConflictContinue(loc loc.Locator) error {
return nil
}

// onConflictSkip is a conflict handler that aborts the pull operation
// w/o error
func onConflictSkip(loc loc.Locator) error {
return utils.Abort(nil)
}

// conflictHandler defines a functional handler to decide whether the active
// pull operation is aborted if the specified package already exists in the
// destination package service.
// If the return is nil, the pull operation continues.
// If the return is a special utils.Abort error, the pull operation is aborted without error.
// If the return is any other error, the pull operation is aborted with said error.
type conflictHandler func(loc.Locator) error
26 changes: 12 additions & 14 deletions lib/builder/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,12 @@ func (s *s3Syncer) Sync(ctx context.Context, builder *Builder, runtimeVersion se
return trace.Wrap(err)
}
puller := libapp.Puller{
FieldLogger: builder.FieldLogger,
SrcPack: env.Packages,
SrcApp: tarballApps,
DstPack: builder.Env.Packages,
DstApp: cacheApps,
Parallel: builder.VendorReq.Parallel,
SkipIfExists: true,
FieldLogger: builder.FieldLogger,
SrcPack: env.Packages,
SrcApp: tarballApps,
DstPack: builder.Env.Packages,
DstApp: cacheApps,
Parallel: builder.VendorReq.Parallel,
}
return puller.PullAppDeps(ctx, builder.appForRuntime(runtimeVersion))
}
Expand Down Expand Up @@ -134,13 +133,12 @@ func (s *packSyncer) Sync(ctx context.Context, builder *Builder, runtimeVersion
return trace.Wrap(err)
}
puller := libapp.Puller{
FieldLogger: builder.FieldLogger,
SrcPack: s.pack,
SrcApp: s.apps,
DstPack: builder.Env.Packages,
DstApp: cacheApps,
Parallel: builder.VendorReq.Parallel,
SkipIfExists: true,
FieldLogger: builder.FieldLogger,
SrcPack: s.pack,
SrcApp: s.apps,
DstPack: builder.Env.Packages,
DstApp: cacheApps,
Parallel: builder.VendorReq.Parallel,
}
err = puller.PullAppDeps(ctx, builder.appForRuntime(runtimeVersion))
if err != nil {
Expand Down

0 comments on commit 7506c27

Please sign in to comment.