import static groovyx.gpars.actor.Actors.actor
/**
* A demo showing two cooperating actors. The decryptor decrypts received messages and replies them back.
* The console actor sends a message to decrypt, prints out the reply and terminates both actors.
* The main thread waits on both actors to finish using the join() method to prevent premature exit,
* since both actors use the default actor group, which uses a daemon thread pool.
*
* code derrived from: http://gpars.codehaus.org/Actor
*/
def decryptor = actor {
loop {
react {message ->
Thread.sleep(1000);
if (!(message instanceof String)) {
stop()
return
}
reply message.reverse()
}
}
}
def console = actor {
List strs = ['abc','zyx','Vvv',false];
int i = -1
loop {
if (!strs[++i]) stop()
decryptor << strs[i]
println "Sent: ${strs[i]}"
react {msg ->
println "Decrypted msg: $msg"
}
}
}
[decryptor, console]*.join()
Showing posts with label gpars. Show all posts
Showing posts with label gpars. Show all posts
Wednesday, May 28, 2014
GPars Actors
Sunday, July 15, 2012
GPars DataflowQueues
//source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
//a sample code build process //
//source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
//a sample code build process //
import static groovyx.gpars.dataflow.Dataflow.operator
import groovyx.gpars.dataflow.*
def _checkout = { String a ->
Thread.sleep( (Long)Math.random() * 5 ); // a bit of random delay for fun
println ">checking out...$a"
return "checkout-$a"
}
def _compileSources = { a ->
println ">compiling...$a"
return "compileSources-$a"
}
def _generateAPIDoc = { a ->
println ">api Docs...$a"
return "generateAPIDoc-$a"
}
def _generateUserDocumentation = { a ->
println ">user Docs...$a"
return "generateUserDocumentation-$a"
}
def _packageProject = { project, api, doc ->
println ">packaging...$project $api $doc"
return "packageProject-$project $api $doc"
}
def _deploy = { a ->
println ">deploying...$a"
return (!!Math.round(Math.random()))
}
/* We need a Broadcaster and Queues to wire elements together */
def checkedOutProjects_B = new DataflowBroadcast()
def urls_Q = new DataflowQueue()
def compiledProjects_Q = new DataflowQueue()
def apiDocs_Q = new DataflowQueue()
def userDocs_Q = new DataflowQueue()
def packages_Q = new DataflowQueue()
def done_Q = new DataflowQueue()
/* Here's the composition of individual build steps into a process */
operator(inputs: [urls_Q], outputs: [checkedOutProjects_B], maxForks: 3) { url ->
bindAllOutputs _checkout(url)
}
operator([checkedOutProjects_B.createReadChannel()], [compiledProjects_Q]) { projectRoot ->
bindOutput _compileSources(projectRoot)
}
operator(checkedOutProjects_B.createReadChannel(), apiDocs_Q) { projectRoot ->
bindOutput _generateAPIDoc(projectRoot)
}
operator(checkedOutProjects_B.createReadChannel(), userDocs_Q) { projectRoot ->
bindOutput _generateUserDocumentation(projectRoot)
}
operator([compiledProjects_Q, apiDocs_Q, userDocs_Q], [packages_Q]) { project, api, doc ->
bindOutput _packageProject(project, api, doc)
}
def deployer = operator(packages_Q, done_Q) { packagedProject ->
boolean ok = _deploy(packagedProject)
println "! Deployed? $ok"
bindOutput (ok)
}
/* add data, start the machine a rollin! */
5.times { urls_Q << "url #$it" }
5.times { urls_Q << "url #${++it*5}" }
/* Now we're set up, and can just wait for the build to finish */
println "==Starting the build process. This line MIGHT NOT be printed out first ...=="
//deployer.join() //Wait for the last operator in the network to finish??
Monday, October 17, 2011
Parallel GETs (throttle test)
import static groovyx.gpars.GParsPool.withPool as parallel
def list = ['http://google.com','http://bing.com','http://dogpile.com','http://ask.com','http://hbsecurity.com']
int maxThreads = list.size()
Range range = list.size()+1..1 // +1 for warmup round
boolean randomizeUrl = true
Long ts // start time for each thread size
println "WARMUP " + "*" * 40
(range).each { throttle ->
ts = Calendar.instance.timeInMillis
parallel( Math.min(maxThreads, throttle) ) { // restrict the thread pool size
list.eachParallel {
Long t = Calendar.instance.timeInMillis
if (randomizeUrl) it += "/?" + Math.random()*10
it.toURL().text // blockin call
println Thread.currentThread().toString() +
" [$it] DONE >> seconds: " + ((Calendar.instance.timeInMillis - t) /1000)
}
}
println "total time: " + ((Calendar.instance.timeInMillis - ts) / 1000)
println "*" * 40
}
println "==DONE=="
source: (here)
Friday, October 14, 2011
GPars: Find the greatest product of five consecutive digits in the 1000-digit number
This post was inspired by this article.
source posted: here (crashes the console!)
/* calc the maximum product of $len consecutive integers $loops times within $bigNum */
String bigNum = '73167176531330624919225119674426574742355349194934969835203127745063262395783180169848018694788518438586156078911294949545950173795833195285320880551112540698747158523863050715693290963295227443043557668966489504452445231617318564030987111217223831136222989342338030813533627661428280644448664523874930358907296290491560440772390713810515859307960866701724271218839987979087922749219016997208880937766572733300105336788122023542180975125454059475224352584907711670556013604839586446706324415722155397536978179778461740649551492908625693219784686224828397224137565705605749026140797296865241453510047482166370484403199890008895243450658541227588666881164271714799244429282308634656748139191231628245861786645835912456652947654568284891288314260769004224219022671055626321111109370544217506941658960408071984038509624554443629812309878799272442849091888458015616609791913387549920052406368991256071760605886116467109407754100225698315520005593572972571636269561882670428252483600823257530420752963450'
int len = 5
int loops = 2000
int poolSize = 2 // for testing
def ts // start time
Integer r // calc result
println "*" * 40
////normal////
ts = Calendar.instance.time.time
loops.times {
r = (0..bigNum.size()-len).collect { bigNum[it..<(it+len)].toList().inject(1) { p,n -> p*n.toInteger() } }.max { it }
}
println ("normal=$r\ntime:" + (Calendar.instance.time.time - ts)/1000)
println "*" * 40
////gpars////
ts = Calendar.instance.time.time
groovyx.gpars.GParsPool.withPool(poolSize) {
loops.times {
r = (0..bigNum.size()-len).collectParallel { bigNum[it..<(it+len)].toList().inject(1) { p,n -> p*n.toInteger() } }.maxParallel { it }
}
}
println ("gpars=$r\ntime:" + (Calendar.instance.time.time - ts)/1000)
println "*" * 40
source posted: here (crashes the console!)
Tuesday, March 29, 2011
GPars Concurrent/Parallel
@Grab(group='org.codehaus.gpars', module='gpars', version='0.11') //not needed in Groovy 1.8.x
import java.util.concurrent.Future
import groovyx.gpars.*
import groovyx.gpars.actor.*
import groovyx.gpars.agent.*
//COLLECTIONS//
List images = ['0001.jpg','2.jpeg','3.png']
GParsPool.withPool {
def selfPortraits = images.findAllParallel{it.contains('j')}.collectParallel{it.toUpperCase()}
assert selfPortraits.join(', ') == '0001.JPG, 2.JPEG'
println selfPortraits.join(', ')
//a map-reduce functional style
def smallestSelfPortrait = images.parallel
.filter{it.contains('j')}
.map{it.toUpperCase()}
.min{it.size()}
assert smallestSelfPortrait == '2.JPEG'
println smallestSelfPortrait
}
println '*' * 20
//FUTURES//
GParsPool.withPool {
Closure longLastingCalculation = { BigInteger a = 1; 2000001.times {i-> a += i }; a }
//create a new closure, which starts the original closure on a thread pool
Closure fastCalculation = longLastingCalculation.async()
//returns almost immediately
Future result = fastCalculation()
//do stuff while long calculation performs …
10.times { print it + '.. '}
println '\nBLOCKED!'
//finally ask for the result, blocking if not yet available
println 'Long result: ' + result.get()
println 'FINALLY DONE!'
}
println '*' * 20
//ACTORS//
class GameMaster extends DefaultActor {
int secretNum
int upto = 20
void afterStart() {
secretNum = new Random().nextInt(upto)
}
void act() {
loop {
react { int num ->
if (num > secretNum)
reply 'too large'
else if (num < secretNum)
reply 'too small'
else {
reply 'you win'
terminate()
}
}
}
}
}
class Player extends DefaultActor {
String name
Actor server
int myNum
int upto = 20
int guessCounter = 0
void act() {
loop {
myNum = new Random().nextInt(upto)
server.send myNum
react { String msg ->
guessCounter++
switch (msg) {
case 'too large':
println "$name: (guess #${guessCounter}) $myNum was too large"
break
case 'too small':
println "$name: (guess #${guessCounter}) $myNum was too small"
break
case 'you win':
println "$name: (guess #${guessCounter}) I won: $myNum"
terminate()
}
}
}
}
}
def master = new GameMaster().start()
def player = new Player(name: 'Player', server: master).start()
[master, player]*.join()
println 'GAME OVER!'
println '*' * 20
//AGENTS//
/**
* Conference stores number of registrations and allows parties to register and unregister.
* It inherits from the Agent class and adds the register() and unregister() private methods,
* which callers may use it the commands they submit to the Conference.
*/
class Conference extends Agent {
def Conference() { super(0) }
private def registered() { return data }
private def register(long num) { data += num }
private def unregister(long num) { data -= num }
}
final Agent conference = new Conference() //new Conference created
/**
* Three external parties will try to register/unregister concurrently
*/
final Thread t1 = Thread.start {
conference << {register(10L)} //send a command to register 10 attendees
}
final Thread t4 = Thread.start {
conference << {register(5L); println "!"+registered()} //send a command to view registered number
}
final Thread t2 = Thread.start {
conference << {register(5L)} //send a command to register 5 attendees
}
final Thread t3 = Thread.start {
conference << {unregister(13L)} //send a command to unregister 3 attendees
}
[t1, t2, t3, t4]*.join()
assert 7L == conference.val
println conference.val
println '*' * 20
http://groovyconsole.appspot.com/script/452001
Subscribe to:
Comments (Atom)