Showing posts with label gpars. Show all posts
Showing posts with label gpars. Show all posts

Wednesday, May 28, 2014

GPars Actors

import static groovyx.gpars.actor.Actors.actor

/**
 * A demo showing two cooperating actors. The decryptor decrypts received messages and replies them back.
 * The console actor sends a message to decrypt, prints out the reply and terminates both actors.
 * The main thread waits on both actors to finish using the join() method to prevent premature exit,
 * since both actors use the default actor group, which uses a daemon thread pool.
 * 
 * code derrived from: http://gpars.codehaus.org/Actor
 */

def decryptor = actor {
    loop {
        react {message ->
            Thread.sleep(1000);
            
            if (!(message instanceof String)) {
                stop()
                return
            }
            
            reply message.reverse()
        }
    }
}

def console = actor {
    List strs = ['abc','zyx','Vvv',false];
    int i = -1
    
    loop {
        if (!strs[++i]) stop()
        
        decryptor << strs[i]
        
        println "Sent: ${strs[i]}"
        
        react {msg ->
           println "Decrypted msg: $msg"
        }
    }
}

[decryptor, console]*.join()

Sunday, July 15, 2012

GPars DataflowQueues

//source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
//a sample code build process //

//source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
//a sample code build process //

import static groovyx.gpars.dataflow.Dataflow.operator
import groovyx.gpars.dataflow.*


def _checkout = { String a ->
    Thread.sleep( (Long)Math.random() * 5 ); // a bit of random delay for fun
    println ">checking out...$a"
    return "checkout-$a"
}
def _compileSources = { a ->
    println ">compiling...$a"
    return "compileSources-$a"
}
def _generateAPIDoc = { a ->
    println ">api Docs...$a"
    return "generateAPIDoc-$a"
}
def _generateUserDocumentation = { a ->
    println ">user Docs...$a"
    return "generateUserDocumentation-$a"
}
def _packageProject = { project, api, doc ->
    println ">packaging...$project $api $doc"
    return "packageProject-$project $api $doc"
}
def _deploy = { a ->
    println ">deploying...$a"
    return (!!Math.round(Math.random()))
}


/* We need a Broadcaster and Queues to wire elements together */
def checkedOutProjects_B = new DataflowBroadcast()
def urls_Q = new DataflowQueue()
def compiledProjects_Q = new DataflowQueue()
def apiDocs_Q = new DataflowQueue()
def userDocs_Q = new DataflowQueue()
def packages_Q = new DataflowQueue()
def done_Q = new DataflowQueue()



/* Here's the composition of individual build steps into a process */
operator(inputs: [urls_Q], outputs: [checkedOutProjects_B], maxForks: 3) { url ->
    bindAllOutputs _checkout(url)
}
operator([checkedOutProjects_B.createReadChannel()], [compiledProjects_Q]) { projectRoot ->
    bindOutput _compileSources(projectRoot)
}
operator(checkedOutProjects_B.createReadChannel(), apiDocs_Q) { projectRoot ->
    bindOutput _generateAPIDoc(projectRoot)
}
operator(checkedOutProjects_B.createReadChannel(), userDocs_Q) { projectRoot ->
    bindOutput _generateUserDocumentation(projectRoot)
}
operator([compiledProjects_Q, apiDocs_Q, userDocs_Q], [packages_Q]) { project, api, doc ->
    bindOutput _packageProject(project, api, doc)
}

def deployer = operator(packages_Q, done_Q) { packagedProject ->
    boolean ok = _deploy(packagedProject)
    println "! Deployed? $ok"
    bindOutput (ok)
}


/* add data, start the machine a rollin! */
5.times { urls_Q << "url #$it" }
5.times { urls_Q << "url #${++it*5}" }


/* Now we're set up, and can just wait for the build to finish */
println "==Starting the build process. This line MIGHT NOT be printed out first ...=="

//deployer.join()  //Wait for the last operator in the network to finish??

Monday, October 17, 2011

Parallel GETs (throttle test)

import static groovyx.gpars.GParsPool.withPool as parallel

def list = ['http://google.com','http://bing.com','http://dogpile.com','http://ask.com','http://hbsecurity.com']

int maxThreads = list.size()
Range range = list.size()+1..1 // +1 for warmup round
boolean randomizeUrl = true

Long ts // start time for each thread size

println "WARMUP " + "*" * 40
(range).each { throttle ->
ts = Calendar.instance.timeInMillis

parallel( Math.min(maxThreads, throttle) ) { // restrict the thread pool size
list.eachParallel {
Long t = Calendar.instance.timeInMillis
if (randomizeUrl) it += "/?" + Math.random()*10
it.toURL().text // blockin call

println Thread.currentThread().toString() + 
" [$it] DONE >> seconds: " + ((Calendar.instance.timeInMillis - t)  /1000)
}
}

println "total time: " + ((Calendar.instance.timeInMillis - ts) / 1000)
println "*" * 40
}
println "==DONE=="

source: (here)

Friday, October 14, 2011

GPars: Find the greatest product of five consecutive digits in the 1000-digit number

This post was inspired by this article.

/* calc the maximum product of $len consecutive integers $loops times within $bigNum */

String bigNum = '73167176531330624919225119674426574742355349194934969835203127745063262395783180169848018694788518438586156078911294949545950173795833195285320880551112540698747158523863050715693290963295227443043557668966489504452445231617318564030987111217223831136222989342338030813533627661428280644448664523874930358907296290491560440772390713810515859307960866701724271218839987979087922749219016997208880937766572733300105336788122023542180975125454059475224352584907711670556013604839586446706324415722155397536978179778461740649551492908625693219784686224828397224137565705605749026140797296865241453510047482166370484403199890008895243450658541227588666881164271714799244429282308634656748139191231628245861786645835912456652947654568284891288314260769004224219022671055626321111109370544217506941658960408071984038509624554443629812309878799272442849091888458015616609791913387549920052406368991256071760605886116467109407754100225698315520005593572972571636269561882670428252483600823257530420752963450'
int len = 5
int loops = 2000

int poolSize = 2 // for testing

def ts // start time
Integer r // calc result

println "*" * 40

////normal////
ts = Calendar.instance.time.time
loops.times {
r = (0..bigNum.size()-len).collect { bigNum[it..<(it+len)].toList().inject(1) { p,n -> p*n.toInteger() } }.max { it }
}
println ("normal=$r\ntime:" + (Calendar.instance.time.time - ts)/1000)

println "*" * 40

////gpars////
ts = Calendar.instance.time.time
groovyx.gpars.GParsPool.withPool(poolSize) {
loops.times {
r = (0..bigNum.size()-len).collectParallel { bigNum[it..<(it+len)].toList().inject(1) { p,n -> p*n.toInteger() } }.maxParallel { it }
}
}
println ("gpars=$r\ntime:" + (Calendar.instance.time.time - ts)/1000)

println "*" * 40


source posted: here (crashes the console!)

Tuesday, March 29, 2011

GPars Concurrent/Parallel

@Grab(group='org.codehaus.gpars', module='gpars', version='0.11') //not needed in Groovy 1.8.x
import java.util.concurrent.Future
import groovyx.gpars.*
import groovyx.gpars.actor.*
import groovyx.gpars.agent.*

//COLLECTIONS//
List images = ['0001.jpg','2.jpeg','3.png']
GParsPool.withPool {
    def selfPortraits = images.findAllParallel{it.contains('j')}.collectParallel{it.toUpperCase()}
    assert selfPortraits.join(', ') == '0001.JPG, 2.JPEG'
    println selfPortraits.join(', ')

    //a map-reduce functional style
    def smallestSelfPortrait = images.parallel
        .filter{it.contains('j')}
        .map{it.toUpperCase()}
        .min{it.size()}
    assert smallestSelfPortrait == '2.JPEG'
    println smallestSelfPortrait
}

println '*' * 20

//FUTURES//
GParsPool.withPool {
    Closure longLastingCalculation =  { BigInteger a = 1; 2000001.times {i-> a += i }; a }
    //create a new closure, which starts the original closure on a thread pool
    Closure fastCalculation = longLastingCalculation.async()
    //returns almost immediately
    Future result = fastCalculation()
    //do stuff while long calculation performs …
    10.times { print it + '.. '}
    println '\nBLOCKED!'
    //finally ask for the result, blocking if not yet available
    println 'Long result: ' + result.get()
    println 'FINALLY DONE!'
}

println '*' * 20

//ACTORS//
class GameMaster extends DefaultActor {
    int secretNum
    int upto = 20

    void afterStart() {
        secretNum = new Random().nextInt(upto)
    }

    void act() {
        loop {
            react { int num ->
                if (num > secretNum)
                    reply 'too large'
                else if (num < secretNum)
                    reply 'too small'
                else {
                    reply 'you win'
                    terminate()
                }
            }
        }
    }
}

class Player extends DefaultActor {
    String name
    Actor server
    int myNum
    int upto = 20
    int guessCounter = 0

    void act() {
        loop {
            myNum = new Random().nextInt(upto)
            server.send myNum
            react { String msg ->
                guessCounter++
                
                switch (msg) {
                    case 'too large':
                        println "$name: (guess #${guessCounter}) $myNum was too large"
                        break
                    case 'too small':
                        println "$name: (guess #${guessCounter}) $myNum was too small"
                        break
                    case 'you win':
                        println "$name: (guess #${guessCounter}) I won: $myNum"
                        terminate()
                }
            }
        }
    }
}

def master = new GameMaster().start()
def player = new Player(name: 'Player', server: master).start()
[master, player]*.join()
println 'GAME OVER!'

println '*' * 20

//AGENTS//
/**
 * Conference stores number of registrations and allows parties to register and unregister.
 * It inherits from the Agent class and adds the register() and unregister() private methods,
 * which callers may use it the commands they submit to the Conference.
 */
class Conference extends Agent {
    def Conference() { super(0) }
    private def registered() { return data }
    private def register(long num) { data += num }
    private def unregister(long num) { data -= num }
}
final Agent conference = new Conference()  //new Conference created
/**
 * Three external parties will try to register/unregister concurrently
 */
final Thread t1 = Thread.start {
    conference << {register(10L)}               //send a command to register 10 attendees
}
final Thread t4 = Thread.start {
    conference << {register(5L); println "!"+registered()}    //send a command to view registered number
}
final Thread t2 = Thread.start {
    conference << {register(5L)}                //send a command to register 5 attendees
}
final Thread t3 = Thread.start {
    conference << {unregister(13L)}              //send a command to unregister 3 attendees
}

[t1, t2, t3, t4]*.join()

assert 7L == conference.val
println conference.val

println '*' * 20
http://groovyconsole.appspot.com/script/452001