Showing posts with label parallel. Show all posts
Showing posts with label parallel. Show all posts

Wednesday, May 28, 2014

GPars Actors

import static groovyx.gpars.actor.Actors.actor

/**
 * A demo showing two cooperating actors. The decryptor decrypts received messages and replies with the result.
 * The console actor sends messages to decrypt, prints out each reply and finally terminates both actors.
 * The main thread waits on both actors to finish using the join() method to prevent premature exit,
 * since both actors use the default actor group, which uses a daemon thread pool.
 * 
 * code derived from: http://gpars.codehaus.org/Actor
 */

// Decryptor actor: after a one-second pause, answers every String message with
// its reversed text; any non-String message makes the actor stop instead.
def decryptor = actor {
    loop {
        react { message ->
            // simulated work before answering
            Thread.sleep(1000)

            if (message instanceof String) {
                reply message.reverse()
            } else {
                stop()
            }
        }
    }
}

// Console actor: sends each entry of strs to the decryptor in turn, prints what
// was sent and then prints the reply it receives.
def console = actor {
    // The trailing `false` is falsy (triggers stop() below) and is not a String
    // (so the decryptor takes its own stop() branch when it receives it).
    List strs = ['abc','zyx','Vvv',false];
    int i = -1   // pre-incremented in the loop, so the first index used is 0
    
    loop {
        // NOTE(review): nothing here leaves the closure after stop(), so the
        // statements below presumably still run for the `false` entry, which is
        // how the sentinel reaches the decryptor - confirm against GPars stop() semantics.
        if (!strs[++i]) stop()
        
        decryptor << strs[i]
        
        println "Sent: ${strs[i]}"
        
        react {msg ->
           println "Decrypted msg: $msg"
        }
    }
}

// Block the main thread until both actors have finished.
[decryptor, console]*.join()

Sunday, July 15, 2012

GPars DataflowQueues

//source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
//a sample code build process //

//source: http://jaxenter.com/tutorial-gpars-making-parallel-systems-groovy-and-java-friendly-43529-2.html
//a sample code build process //

import static groovyx.gpars.dataflow.Dataflow.operator
import groovyx.gpars.dataflow.*


/*
 * Build step: pretends to check out the project identified by the given URL.
 * Sleeps for a random 0-4 ms, prints a progress line and returns the
 * "checkout-<url>" label passed on to the downstream steps.
 */
def _checkout = { String a ->
    // Multiply before truncating. A cast binds tighter than `*`, so the former
    // `(Long)Math.random() * 5` truncated the [0,1) value to 0 and never slept.
    Thread.sleep((long) (Math.random() * 5)) // a bit of random delay for fun
    println ">checking out...$a"
    return "checkout-$a"
}
// Build step: prints a progress line and returns the "compileSources-<input>" label.
def _compileSources = { projectRoot ->
    println(">compiling...${projectRoot}")
    "compileSources-${projectRoot}"
}
// Build step: prints a progress line and returns the "generateAPIDoc-<input>" label.
def _generateAPIDoc = { projectRoot ->
    println(">api Docs...${projectRoot}")
    "generateAPIDoc-${projectRoot}"
}
// Build step: prints a progress line and returns the "generateUserDocumentation-<input>" label.
def _generateUserDocumentation = { projectRoot ->
    println(">user Docs...${projectRoot}")
    "generateUserDocumentation-${projectRoot}"
}
// Build step: combines the compiled project with both documentation results,
// prints a progress line and returns the "packageProject-<project> <api> <doc>" label.
def _packageProject = { compiled, apiDocs, userDocs ->
    println(">packaging...${compiled} ${apiDocs} ${userDocs}")
    "packageProject-${compiled} ${apiDocs} ${userDocs}"
}
// Build step: prints a progress line and reports a coin-flip outcome
// (true when the random value rounds to 1, false when it rounds to 0).
def _deploy = { packaged ->
    println(">deploying...${packaged}")
    long coin = Math.round(Math.random())
    coin != 0L
}


/* We need a Broadcaster and Queues to wire elements together */
// Channels, listed in the order data travels through the build pipeline.
def urls_Q = new DataflowQueue()                    // incoming project URLs
def checkedOutProjects_B = new DataflowBroadcast()  // checkout results, read by three operators
def compiledProjects_Q = new DataflowQueue()        // compile results
def apiDocs_Q = new DataflowQueue()                 // API documentation results
def userDocs_Q = new DataflowQueue()                // user documentation results
def packages_Q = new DataflowQueue()                // packaged projects
def done_Q = new DataflowQueue()                    // deployment outcomes



/* Here's the composition of individual build steps into a process */
// Checkout: reads a URL from urls_Q and publishes the checkout result on the broadcast channel.
// NOTE(review): maxForks: 3 presumably allows up to three checkouts to run at once - confirm in the GPars docs.
operator(inputs: [urls_Q], outputs: [checkedOutProjects_B], maxForks: 3) { url ->
    bindAllOutputs _checkout(url)
}
// Compile: reads its own read channel of the broadcast and writes to compiledProjects_Q.
operator([checkedOutProjects_B.createReadChannel()], [compiledProjects_Q]) { projectRoot ->
    bindOutput _compileSources(projectRoot)
}
// API docs: reads its own read channel of the broadcast and writes to apiDocs_Q.
operator(checkedOutProjects_B.createReadChannel(), apiDocs_Q) { projectRoot ->
    bindOutput _generateAPIDoc(projectRoot)
}
// User docs: reads its own read channel of the broadcast and writes to userDocs_Q.
operator(checkedOutProjects_B.createReadChannel(), userDocs_Q) { projectRoot ->
    bindOutput _generateUserDocumentation(projectRoot)
}
// Package: takes one value from each of the three result queues per invocation and writes to packages_Q.
// NOTE(review): nothing here checks that the three values belong to the same project - verify ordering guarantees.
operator([compiledProjects_Q, apiDocs_Q, userDocs_Q], [packages_Q]) { project, api, doc ->
    bindOutput _packageProject(project, api, doc)
}

// Deploy: last operator in the network. Reads a packaged project from packages_Q,
// prints the outcome returned by _deploy and forwards that boolean to done_Q.
def deployer = operator(packages_Q, done_Q) { packagedProject ->
    boolean ok = _deploy(packagedProject)
    println "! Deployed? $ok"
    bindOutput (ok)
}


/* add data, start the machine a rollin! */
// Feed ten URLs into the pipeline: "url #0".."url #4", then "url #5", "#10" .. "#25".
int submitted = 0
5.times { urls_Q << "url #$it"; submitted++ }
5.times { urls_Q << "url #${++it*5}"; submitted++ }


/* Now we're set up, and can just wait for the build to finish */
println "==Starting the build process. This line MIGHT NOT be printed out first ...=="

// Operators never finish on their own, so deployer.join() would block forever.
// Take one deployment outcome per submitted URL from done_Q instead; each read
// blocks until a value arrives, which keeps the script alive until every
// project has gone through the whole pipeline.
submitted.times { done_Q.val }

Sunday, March 4, 2012

Thread Pools

import java.util.concurrent.Callable
import java.util.concurrent.Executors

long ts // timer
// The pool size must be defined before the pool is built; it used to be assigned
// only further down, so this line failed with a MissingPropertyException.
THREADS = 3
def pool = Executors.newFixedThreadPool(THREADS)
// Submits a closure to the pool as a Callable and returns its Future.
// The explicit coercion picks submit(Callable) over submit(Runnable).
def defer = { c -> pool.submit(c as Callable) }

//PLAIN//////////////////////////////////////////

// Baseline: run the "left" and "right" naps one after the other on the calling
// thread, for n = 1..3, then print the elapsed wall-clock time in milliseconds.
ts = System.currentTimeMillis()
doit = { n ->
  // Sleeps 500 ms plus a random extra of up to n seconds, then reports thread, marker and duration.
  def napAndReport = { String marker ->
    def slp = (Math.random() * n * 1000 + 500l) as Long
    Thread.sleep(slp)
    println Thread.currentThread().name + marker + slp
  }
  napAndReport('!!left!!')
  napAndReport('!!right!!')
}
(1..3).each { n ->
  println "n=$n => ${doit(n)}"
}
println(System.currentTimeMillis() - ts)

//POOLED//////////////////////////////////////////////

// Same naps as the plain variant, but each one is handed to the thread pool via
// defer; the closure returns the Future of the "right" nap. Afterwards the pool
// is shut down and polled until idle before the elapsed time is printed.
ts = System.currentTimeMillis()
THREADS = 3
fib = { n ->
  // Sleeps 500 ms plus a random extra of up to n seconds, then reports thread, marker and duration.
  def napAndReport = { String marker ->
    def slp = (Math.random() * n * 1000 + 500l) as Long
    Thread.sleep(slp)
    println Thread.currentThread().name + marker + slp
  }
  defer { napAndReport('!!left!!') }
  defer { napAndReport('!!right!!') }
}
(1..3).each { n ->
  println "n=$n => ${fib(n)}"
}
pool.shutdown()
// poll every 100 ms until all submitted naps have completed
while (!pool.isTerminated()) {
  Thread.sleep(100)
}
println(System.currentTimeMillis() - ts)

//FIB//////////////////////////////////////////////////

println "Calculating Fibonacci sequence in parallel..."
CUTOFF = 12    // not worth parallelizing for small n
THREADS = 10   // no longer sizes a pool (see below); kept as a script variable

// The pool used by the POOLED section has already been shut down, so submitting
// to it is rejected. Start a fresh one; `defer` reads the `pool` variable at call
// time and therefore picks up the replacement.
// A cached (unbounded) pool is used on purpose: every fib task blocks in get()
// on its two children, so a fixed-size pool can end up with all threads waiting
// on tasks that are still queued.
pool = Executors.newCachedThreadPool()

// Plain recursive Fibonacci, used below the cutoff.
serialFib = { n -> (n < 2) ? n : serialFib(n-1) + serialFib(n-2) }
// Parallel Fibonacci: both sub-problems are computed on pool threads and summed.
fib = { n ->
  if (n < CUTOFF) return serialFib(n)
  def left = defer{ println Thread.currentThread().name; fib(n-1) }
  def right = defer{ println Thread.currentThread().name; fib(n-2) }
  left.get() + right.get()
}

(11..16).each{ n -> println "n=$n => ${fib(n)}" }
pool.shutdown()

Tuesday, March 29, 2011

GPars Concurrent/Parallel

@Grab(group='org.codehaus.gpars', module='gpars', version='0.11') //not needed in Groovy 1.8.x
import java.util.concurrent.Future
import groovyx.gpars.*
import groovyx.gpars.actor.*
import groovyx.gpars.agent.*

//COLLECTIONS//
List images = ['0001.jpg','2.jpeg','3.png']
GParsPool.withPool {
    // Parallel collection methods: keep the names containing 'j', then upper-case them.
    def withJ = images.findAllParallel { name -> name.contains('j') }
    def selfPortraits = withJ.collectParallel { name -> name.toUpperCase() }
    def listing = selfPortraits.join(', ')
    assert listing == '0001.JPG, 2.JPEG'
    println listing

    // The same selection in map-reduce style, reduced to the shortest name.
    def smallestSelfPortrait = images.parallel
        .filter { name -> name.contains('j') }
        .map { name -> name.toUpperCase() }
        .min { name -> name.size() }
    assert smallestSelfPortrait == '2.JPEG'
    println smallestSelfPortrait
}

println('*' * 20)

//FUTURES//
GParsPool.withPool {
    // Starts at 1 and adds every integer from 0 to 2000000.
    Closure longLastingCalculation = {
        BigInteger total = 1
        2000001.times { step -> total += step }
        total
    }
    // async() wraps the closure so that calling it runs the work on the pool
    Closure fastCalculation = longLastingCalculation.async()
    // the call hands back a Future almost immediately
    Future result = fastCalculation()
    // keep the calling thread busy while the sum is being computed
    10.times { n -> print "${n}.. " }
    println '\nBLOCKED!'
    // get() blocks until the result is available
    println 'Long result: ' + result.get()
    println 'FINALLY DONE!'
}

println('*' * 20)

//ACTORS//
/**
 * Holds a secret number below {@code upto} and answers every guess it receives
 * with 'too large', 'too small' or 'you win'; terminates after a correct guess.
 */
class GameMaster extends DefaultActor {
    int secretNum
    int upto = 20

    /** Draws the secret number once the actor has been started. */
    void afterStart() {
        secretNum = new Random().nextInt(upto)
    }

    /** Replies to each incoming guess until the secret has been hit. */
    void act() {
        loop {
            react { int num ->
                if (num == secretNum) {
                    reply 'you win'
                    terminate()
                } else {
                    reply(num > secretNum ? 'too large' : 'too small')
                }
            }
        }
    }
}

/**
 * Sends random guesses below {@code upto} to the {@code server} actor, prints
 * the verdict for each guess and terminates once the server answers 'you win'.
 */
class Player extends DefaultActor {
    String name
    Actor server
    int myNum
    int upto = 20
    int guessCounter = 0

    /** One round per loop iteration: pick a number, send it, report the reply. */
    void act() {
        loop {
            myNum = new Random().nextInt(upto)
            server.send myNum
            react { String msg ->
                guessCounter++

                if (msg == 'too large') {
                    println "$name: (guess #${guessCounter}) $myNum was too large"
                } else if (msg == 'too small') {
                    println "$name: (guess #${guessCounter}) $myNum was too small"
                } else if (msg == 'you win') {
                    println "$name: (guess #${guessCounter}) I won: $myNum"
                    terminate()
                }
            }
        }
    }
}

// Start the game master first so the player has a server to send guesses to.
def master = new GameMaster().start()
def player = new Player(name: 'Player', server: master).start()
// Wait for both actors to finish before announcing the end of the game.
[master, player].each { it.join() }
println 'GAME OVER!'

println('*' * 20)

//AGENTS//
/**
 * Conference stores the number of registrations and allows parties to register and unregister.
 * It inherits from the Agent class and adds the registered(), register() and unregister() private methods,
 * which callers may use in the commands they submit to the Conference.
 */
class Conference extends Agent {
    // the agent's state is the registration count, starting at zero
    Conference() {
        super(0)
    }

    // current number of registrations
    private def registered() {
        data
    }

    // adds num registrations
    private def register(long num) {
        data = data + num
    }

    // removes num registrations
    private def unregister(long num) {
        data = data - num
    }
}
final Agent conference = new Conference()  //new Conference created
/**
 * Four external parties (threads t1..t4) send register/unregister commands concurrently.
 */
final Thread t1 = Thread.start {
    conference << {register(10L)}               //send a command to register 10 attendees
}
final Thread t4 = Thread.start {
    conference << {register(5L); println "!"+registered()}    //send a command to register 5 attendees and print the count at that moment
}
final Thread t2 = Thread.start {
    conference << {register(5L)}                //send a command to register 5 attendees
}
final Thread t3 = Thread.start {
    conference << {unregister(13L)}              //send a command to unregister 13 attendees
}

// wait until every thread has sent its command
[t1, t2, t3, t4]*.join()

// 10 + 5 + 5 - 13 = 7
// NOTE(review): relies on val being read only after the queued commands have run - confirm against the GPars Agent docs
assert 7L == conference.val
println conference.val

println '*' * 20
http://groovyconsole.appspot.com/script/452001