@Grapes(
@Grab(group='org.pcollections', module='pcollections', version='2.1.2')
)
import org.pcollections.*
// Start from the library's empty persistent set.
PSet words = HashTreePSet.empty()
// plus() hands back a new set rather than changing the receiver, so rebind the variable.
words = words.plus("something")
println words
// The two-element set printed here is a temporary; `words` itself is not rebound.
println(words.plus("something else"))
println words // immutable
Showing posts with label thread. Show all posts
Thursday, May 29, 2014
Persistent (Immutable) Collections
Sunday, July 15, 2012
GPars DataflowQueues
// Source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
// A sample build process written as a GPars dataflow network.
import static groovyx.gpars.dataflow.Dataflow.operator
import groovyx.gpars.dataflow.*
/**
 * Simulated checkout step: pauses for a few random milliseconds, logs the
 * item being checked out and returns a "checkout-..." tag for it.
 *
 * @param a identifier (here: a URL string) of the project to check out
 * @return the tag "checkout-<a>"
 */
def _checkout = { String a ->
    // Multiply first, then cast. The cast binds tighter than `*`, so
    // `(Long)Math.random() * 5` truncates the [0,1) value to 0 and the
    // delay would always be 0 ms.
    Thread.sleep((long) (Math.random() * 5)) // a bit of random delay for fun
    println ">checking out...$a"
    return "checkout-$a"
}
/**
 * Simulated compile step: logs its input and returns it tagged with
 * "compileSources-".
 */
def _compileSources = { projectRoot ->
    println(">compiling...${projectRoot}")
    "compileSources-${projectRoot}"
}
/**
 * Simulated API-documentation step: logs its input and returns it tagged
 * with "generateAPIDoc-".
 */
def _generateAPIDoc = { projectRoot ->
    println(">api Docs...${projectRoot}")
    "generateAPIDoc-${projectRoot}"
}
/**
 * Simulated user-documentation step: logs its input and returns it tagged
 * with "generateUserDocumentation-".
 */
def _generateUserDocumentation = { projectRoot ->
    println(">user Docs...${projectRoot}")
    "generateUserDocumentation-${projectRoot}"
}
/**
 * Simulated packaging step: logs the three inputs and returns them joined
 * by single spaces behind the "packageProject-" tag.
 */
def _packageProject = { project, api, doc ->
    println(">packaging...${project} ${api} ${doc}")
    "packageProject-${project} ${api} ${doc}"
}
/**
 * Simulated deploy step: logs its input and returns a random boolean
 * (true when the rounded random number is 1, false when it is 0).
 */
def _deploy = { a ->
    println(">deploying...${a}")
    long coin = Math.round(Math.random())
    coin != 0L
}
/* Channels that wire the build steps together. */

// Entry point: URLs of the projects to build.
DataflowQueue urls_Q = new DataflowQueue()

// Broadcast channel carrying checkout results; each consumer reads through its own read channel.
DataflowBroadcast checkedOutProjects_B = new DataflowBroadcast()

// One queue per intermediate product of the pipeline.
DataflowQueue compiledProjects_Q = new DataflowQueue()
DataflowQueue apiDocs_Q = new DataflowQueue()
DataflowQueue userDocs_Q = new DataflowQueue()
DataflowQueue packages_Q = new DataflowQueue()

// Final deployment outcomes.
DataflowQueue done_Q = new DataflowQueue()
/* Here's the composition of individual build steps into a process */
// Checkout stage: takes each URL from urls_Q, runs _checkout on it and passes
// the result to bindAllOutputs (the only output is the broadcast channel).
// NOTE(review): maxForks: 3 presumably allows up to three checkouts to run
// concurrently — confirm against the GPars operator documentation.
operator(inputs: [urls_Q], outputs: [checkedOutProjects_B], maxForks: 3) { url ->
bindAllOutputs _checkout(url)
}
// Compile stage: reads checkout results through its own read channel of the
// broadcast and writes the _compileSources result to compiledProjects_Q.
operator([checkedOutProjects_B.createReadChannel()], [compiledProjects_Q]) { projectRoot ->
bindOutput _compileSources(projectRoot)
}
// API-doc stage: same input pattern (a separate read channel of the broadcast),
// output goes to apiDocs_Q.
operator(checkedOutProjects_B.createReadChannel(), apiDocs_Q) { projectRoot ->
bindOutput _generateAPIDoc(projectRoot)
}
// User-doc stage: same input pattern, output goes to userDocs_Q.
operator(checkedOutProjects_B.createReadChannel(), userDocs_Q) { projectRoot ->
bindOutput _generateUserDocumentation(projectRoot)
}
// Packaging stage: takes one value from each of the three product queues per
// invocation (the closure has three parameters) and writes the package to packages_Q.
operator([compiledProjects_Q, apiDocs_Q, userDocs_Q], [packages_Q]) { project, api, doc ->
bindOutput _packageProject(project, api, doc)
}
// Deploy stage: runs _deploy on each package, logs the boolean outcome and
// forwards it to done_Q. The operator is kept in `deployer` so the script can
// refer to the last stage of the network.
def deployer = operator(packages_Q, done_Q) { packagedProject ->
boolean ok = _deploy(packagedProject)
println "! Deployed? $ok"
bindOutput (ok)
}
/* add data, start the machine a rollin! */
// Same ten inputs as before: "url #0".."url #4" followed by "url #5", "url #10", ... "url #25".
def urls = (0..<5).collect { "url #$it" } + (1..5).collect { "url #${it * 5}" }
urls.each { urls_Q << it }
/* Now we're set up, and can just wait for the build to finish */
println "==Starting the build process. This line MIGHT NOT be printed out first ...=="
// Actually wait: the deploy stage writes exactly one result to done_Q per
// package, so reading one value per submitted URL blocks until every project
// has gone through the whole pipeline.
// NOTE(review): deployer.join() is not used because nothing terminates the
// operators, so it would presumably never return; and without any wait the
// script can end before the (by default daemon — confirm) GPars worker
// threads have produced their output.
urls.size().times { done_Q.val }
Sunday, March 4, 2012
Thread Pools
import java.util.concurrent.Callable
import java.util.concurrent.Executors
long ts // timer
// The pool size has to exist before the pool is built; reading an unassigned
// script variable throws MissingPropertyException.
THREADS = 3
def pool = Executors.newFixedThreadPool(THREADS)
// Submits a closure to the pool as a Callable and returns the resulting Future.
def defer = { c -> pool.submit(c as Callable) }
//PLAIN//////////////////////////////////////////
// Baseline: both sleepers run one after the other on the calling thread.
ts = System.currentTimeMillis()
doit = { n ->
    // Sleeps between 500 ms and (n * 1000 + 500) ms, then prints the thread name,
    // the given marker and the number of milliseconds slept.
    def snooze = { String marker ->
        Long slp = (Math.random() * n * 1000 + 500L) as Long
        Thread.sleep(slp)
        println Thread.currentThread().name + marker + slp
    }
    snooze('!!left!!')
    snooze('!!right!!')
}
(1..3).each { n -> println "n=$n => ${doit(n)}" }
// Elapsed wall-clock time of the sequential run, in milliseconds.
println(System.currentTimeMillis() - ts)
//POOLED//////////////////////////////////////////////
// Same sleepers as the PLAIN section, but submitted to the thread pool.
ts = (new Date().time)
// NOTE(review): the pool was already created further up, so assigning THREADS
// here does not change its size.
THREADS = 3
// Submits a "left" and a "right" sleeper through `defer` and returns without
// waiting for either. (Named after what it does; it computes no Fibonacci numbers.)
doitPooled = { n ->
    def left = defer { def slp = Math.random()*n*1000+500l as Long; Thread.sleep(slp); println Thread.currentThread().name + '!!left!!'+slp }
    def right = defer { def slp = Math.random()*n*1000+500l as Long; Thread.sleep(slp); println Thread.currentThread().name + '!!right!!'+slp }
}
(1..3).each { n -> println "n=$n => ${doitPooled(n)}" }
// Stop accepting work, then block until the queued sleepers have finished.
// awaitTermination returns as soon as the pool is done, instead of polling
// isTerminated() every 100 ms.
pool.shutdown()
pool.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS)
println ((new Date().time)-ts)
//FIB//////////////////////////////////////////////////
println "Calculating Fibonacci sequence in parallel..."
CUTOFF = 12 // not worth parallelizing for small n
// The pool used so far was shut down at the end of the POOLED section and
// rejects new tasks, so build a fresh one. `defer` picks it up because Groovy
// closures capture the `pool` variable itself, not the value it had when the
// closure was defined.
// A cached (unbounded) pool is used on purpose: every fib task blocks on its
// two sub-tasks, and with a fixed number of workers all of them can end up
// waiting on tasks that are still sitting in the queue (fib(16) needs more
// than ten simultaneously blocked tasks), which deadlocks.
pool.shutdown() // no-op when the pool is already shut down
pool = Executors.newCachedThreadPool()
// Plain recursive Fibonacci, used below the cutoff.
serialFib = { n -> (n < 2) ? n : serialFib(n-1) + serialFib(n-2) }
// Parallel Fibonacci: below CUTOFF compute serially, otherwise run both
// recursive calls as pool tasks (each prints its worker thread's name) and
// add their results.
fib = { n ->
    if (n < CUTOFF) return serialFib(n)
    def left = defer { println Thread.currentThread().name; fib(n-1) }
    def right = defer { println Thread.currentThread().name; fib(n-2) }
    left.get() + right.get()
}
(11..16).each { n -> println "n=$n => ${fib(n)}" }
// Release the worker threads so the script can exit.
pool.shutdown()
Subscribe to:
Comments (Atom)