Thursday, December 15, 2011

RabbitMQ - Publish/Subscribe

@Grab('com.rabbitmq:amqp-client:2.7.0')
import com.rabbitmq.client.*

/**
 * Minimal RabbitMQ publish/subscribe client.
 *
 * One instance owns one connection and one channel. On construction it declares
 * a "fanout" exchange; {@link #send} publishes to that exchange and
 * {@link #receive} binds a fresh server-named queue to it and prints every
 * message that arrives.
 */
public class SendReceive {
    // NOTE: despite the name, this value is used as the *exchange* name
    // (see exchangeDeclare / basicPublish / queueBind below).
    private String QUEUE
    // Kept as a field so the connection can be released in close().
    private Connection connection
    private Channel channel

    // Encode/decode explicitly so publisher and subscriber agree on the byte
    // representation regardless of each JVM's platform default charset.
    private static final String CHARSET = "UTF-8"

    /**
     * Connects to the broker and declares the fanout exchange.
     *
     * @param hostName broker host to connect to (default "localhost")
     * @param queue    name of the fanout exchange to declare and use (default "hello")
     */
    public SendReceive( String hostName = "localhost", String queue = "hello" ) {
        QUEUE = queue
        ConnectionFactory factory = new ConnectionFactory()
//      factory.setUsername(userName)
//      factory.setPassword(password)
//      factory.setVirtualHost(virtualHost)
        factory.setHost(hostName)
//      factory.setPort(portNumber)
        connection = factory.newConnection()
        try {
            channel = connection.createChannel()
            channel.exchangeDeclare(QUEUE, "fanout")
        } catch (Exception e) {
            // Do not leak the open connection when channel setup fails.
            try {
                connection.close()
            } catch (Exception ignored) {
                // Best effort: the original failure is the one worth reporting.
            }
            throw e
        }
    }

    /**
     * Publishes one message to the fanout exchange with an empty routing key.
     *
     * @param message text to publish; encoded as UTF-8
     * @throws java.io.IOException if publishing fails
     */
    public void send( String message )
            throws java.io.IOException {
        channel.basicPublish(QUEUE, "", null, message.getBytes(CHARSET))
        println(" [x] Sent: $message")
    }

    /**
     * Binds a new server-named queue to the exchange and prints every message
     * delivered to it. This method loops forever and only ends by exception.
     *
     * @throws java.io.IOException if declaring, binding or consuming fails
     * @throws java.lang.InterruptedException if interrupted while waiting for a delivery
     */
    public void receive()
            throws java.io.IOException, java.lang.InterruptedException {
        // Argument-less queueDeclare() lets the broker pick the queue name.
        String queueName = channel.queueDeclare().getQueue()
        channel.queueBind(queueName, QUEUE, "")

        QueueingConsumer consumer = new QueueingConsumer(channel)
        // Second argument 'true' = auto-acknowledge deliveries.
        channel.basicConsume(queueName, true, consumer)
        println(" [*] Waiting for messages. To exit press CTRL+C")

        while (true) { // loop forever!
            QueueingConsumer.Delivery delivery = consumer.nextDelivery() // blocks for next message!
            String message = new String(delivery.getBody(), CHARSET)
            println(" [x] Received: $message")
        }
    }

    /**
     * Closes the broker connection if it is still open. Safe to call more than
     * once.
     *
     * @throws java.io.IOException if the broker reports an error while closing
     */
    public void close()
            throws java.io.IOException {
        // Per the RabbitMQ client docs, closing a connection closes its channels too.
        if (connection?.isOpen()) {
            connection.close()
        }
    }
}

// System.console() returns null when the JVM is not attached to an interactive
// terminal (piped stdin, many IDEs), so fall back to reading System.in directly.
def console = System.console()
def stdin = console ? null : new BufferedReader(new InputStreamReader(System.in))

// Prompts the user and returns the line typed, or null at end of input.
def input = { String prompt ->
    if (console) {
        // Pass the prompt as an argument so a literal '%' cannot be misread as a format spec.
        return console.readLine('%s', prompt)
    }
    print prompt
    System.out.flush()
    return stdin.readLine()
}

// Blank (or missing) answers fall back to the same defaults the constructor declares.
def client = new SendReceive(
    input('\nEnter rabbitMQ host location: ') ?: 'localhost',
    input('\nEnter rabbitMQ queue name: ') ?: 'hello'
)

String userType = input('\n[s]end or [r]eceive?: ')

// Anything other than 'r' (ignoring case and surrounding whitespace) means "send".
if (userType?.trim()?.toLowerCase() == 'r') {
    client.receive()
}
else {
    while (true) {
        String msg = input('\nType a message to send -- To exit press CTRL+C: ')
        if (msg == null) {
            // End of input (e.g. CTRL+D or exhausted pipe): stop instead of sending null.
            break
        }
        client.send(msg)
    }
}

No comments:

Post a Comment