//source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
//a sample code build process //
//source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
//a sample code build process //
import static groovyx.gpars.dataflow.Dataflow.operator
import groovyx.gpars.dataflow.*
def _checkout = { String a ->
Thread.sleep( (Long)Math.random() * 5 ); // a bit of random delay for fun
println ">checking out...$a"
return "checkout-$a"
}
def _compileSources = { a ->
println ">compiling...$a"
return "compileSources-$a"
}
def _generateAPIDoc = { a ->
println ">api Docs...$a"
return "generateAPIDoc-$a"
}
def _generateUserDocumentation = { a ->
println ">user Docs...$a"
return "generateUserDocumentation-$a"
}
def _packageProject = { project, api, doc ->
println ">packaging...$project $api $doc"
return "packageProject-$project $api $doc"
}
def _deploy = { a ->
println ">deploying...$a"
return (!!Math.round(Math.random()))
}
/* We need a Broadcaster and Queues to wire elements together */
def checkedOutProjects_B = new DataflowBroadcast()
def urls_Q = new DataflowQueue()
def compiledProjects_Q = new DataflowQueue()
def apiDocs_Q = new DataflowQueue()
def userDocs_Q = new DataflowQueue()
def packages_Q = new DataflowQueue()
def done_Q = new DataflowQueue()
/* Here's the composition of individual build steps into a process */
operator(inputs: [urls_Q], outputs: [checkedOutProjects_B], maxForks: 3) { url ->
bindAllOutputs _checkout(url)
}
operator([checkedOutProjects_B.createReadChannel()], [compiledProjects_Q]) { projectRoot ->
bindOutput _compileSources(projectRoot)
}
operator(checkedOutProjects_B.createReadChannel(), apiDocs_Q) { projectRoot ->
bindOutput _generateAPIDoc(projectRoot)
}
operator(checkedOutProjects_B.createReadChannel(), userDocs_Q) { projectRoot ->
bindOutput _generateUserDocumentation(projectRoot)
}
operator([compiledProjects_Q, apiDocs_Q, userDocs_Q], [packages_Q]) { project, api, doc ->
bindOutput _packageProject(project, api, doc)
}
def deployer = operator(packages_Q, done_Q) { packagedProject ->
boolean ok = _deploy(packagedProject)
println "! Deployed? $ok"
bindOutput (ok)
}
/* add data, start the machine a rollin! */
5.times { urls_Q << "url #$it" }
5.times { urls_Q << "url #${++it*5}" }
/* Now we're set up, and can just wait for the build to finish */
println "==Starting the build process. This line MIGHT NOT be printed out first ...=="
//deployer.join() //Wait for the last operator in the network to finish??
Showing posts with label dataflow. Show all posts
Showing posts with label dataflow. Show all posts
Sunday, July 15, 2012
GPars DataflowQueues
Subscribe to:
Comments (Atom)