Skip to content

Commit

Permalink
[FIXED JENKINS-28994] implement optional concurrency limit for parall…
Browse files Browse the repository at this point in the history
…el blocks

The closure parameters to a parallel block call can now be pre-pended by a
number which is interpreted as a limit on the number of closure blocks to be
executed in parallel.

Example: parallel(2, {build("job1")}, {build("job2")}, {build("job3")})
The above would execute up to two of the specified jobs simultaneously.
  • Loading branch information
jcarrothers-sap committed Jun 20, 2015
1 parent bd179ac commit 99044be
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 10 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,19 @@ compared to join plugin, parallel can be used for more complex workflows where t
}
)

it is possible to limit the concurrency of parallel blocks, preventing overwhelming concurrency and/or a very large queue :

parallel ( 2,
// jobs 1-6 will be scheduled in parallel,
// but only two will be executing and/or queued at any given time
{ build("job1") },
{ build("job2") },
{ build("job3") }
{ build("job4") }
{ build("job5") }
{ build("job6") }
)

you also can "name" parallel executions, so you can later use reference to extract parameters / status :

join = parallel ([
Expand Down
22 changes: 13 additions & 9 deletions src/main/groovy/com/cloudbees/plugins/flow/FlowDSL.groovy
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/*
* The MIT License
*
* Copyright (c) 2013, CloudBees, Inc., Nicolas De Loof.
* Cisco Systems, Inc., a California corporation
* Copyright (c) 2013-2015, CloudBees, Inc., Nicolas De Loof.
* Cisco Systems, Inc., a California corporation
* SAP AG
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -386,34 +387,37 @@ public class FlowDelegate {
}

// allows syntax like : parallel(["Kohsuke","Nicolas"].collect { name -> return { build("job1", param1:name) } })
def List<FlowState> parallel(Collection<? extends Closure> closures) {
parallel(closures as Closure[])
def List<FlowState> parallel(int maxThreads = 0, Collection<? extends Closure> closures) {
parallel(maxThreads, closures as Closure[])
}

// allows collecting job status by name rather than by index
// inspired by https://github.com/caolan/async#parallel
def Map<?, FlowState> parallel(Map<?, ? extends Closure> args) {
def Map<?, FlowState> parallel(int maxThreads = 0, Map<?, ? extends Closure> args) {
def keys = new ArrayList<?>()
def closures = new ArrayList<? extends Closure>()
args.entrySet().each { e ->
keys.add(e.key)
closures.add(e.value)
}
def results = new LinkedHashMap<?, FlowState>()
def flowStates = parallel(closures) // as List<FlowState>
def flowStates = parallel(maxThreads, closures) // as List<FlowState>
flowStates.eachWithIndex { v, i -> results[keys[i]] = v }
results
}

def List<FlowState> parallel(Closure ... closures) {
def List<FlowState> parallel(int maxThreads = 0, Closure ... closures) {
statusCheck()
ExecutorService pool = Executors.newCachedThreadPool()
ExecutorService pool = (maxThreads <= 0) ?
Executors.newCachedThreadPool() : Executors.newFixedThreadPool(maxThreads)
Set<Run> upstream = flowRun.state.lastCompleted
Set<Run> lastCompleted = Collections.synchronizedSet(new HashSet<Run>())
def results = new CopyOnWriteArrayList<FlowState>()
def tasks = new ArrayList<Future<FlowState>>()

println("parallel {")
def startMsg = "parallel"
if ( maxThreads > 0 ) startMsg += "( "+maxThreads+" )"
println(startMsg + " {")
++indent

def current_state = flowRun.state
Expand Down
39 changes: 38 additions & 1 deletion src/test/groovy/com/cloudbees/plugins/flow/ParallelTest.groovy
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/*
* The MIT License
*
* Copyright (c) 2013, CloudBees, Inc., Nicolas De Loof.
* Copyright (c) 2013-2015, CloudBees, Inc., Nicolas De Loof.
* SAP AG
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -45,6 +46,21 @@ class ParallelTest extends DSLTestCase {
println flow.jobsGraph.edgeSet()
}

public void testParallelLimitConcurrency() {
def jobs = createJobs(["job1", "job2", "job3", "job4"])
def flow = run("""
parallel( 2,
{ build("job1") },
{ build("job2") },
{ build("job3") },
{ build("job4") },
)
""")
assertAllSuccess(jobs)
assert SUCCESS == flow.result
println flow.jobsGraph.edgeSet()
}

public void testFailOnParallelFailed() {
createJobs(["job1", "job2"])
createFailJob("willFail")
Expand Down Expand Up @@ -128,4 +144,25 @@ class ParallelTest extends DSLTestCase {
println flow.jobsGraph.edgeSet()
}

public void testParallelMapLimitConcurrency() {
def jobs = createJobs(["job1", "job2", "job3"])
def job4 = createJob("job4")
def flow = run("""
join = parallel ( 2, [
first: { build("job1") },
second: { build("job2") },
third: { build("job3") }
])
build("job4",
r1: join.first.result.name,
r2: join.second.lastBuild.parent.name)
""")
assertAllSuccess(jobs)
assertSuccess(job4)
assertHasParameter(job4, "r1", "SUCCESS")
assertHasParameter(job4, "r2", "job2")
assert SUCCESS == flow.result
println flow.jobsGraph.edgeSet()
}

}

0 comments on commit 99044be

Please sign in to comment.