@Grab('com.rabbitmq:amqp-client:2.7.0')
import com.rabbitmq.client.*

/**
 * Minimal publish/subscribe client built on the RabbitMQ Java client.
 *
 * Every instance opens one connection and one channel and declares a
 * "fanout" exchange. {@link #send} publishes to that exchange and
 * {@link #receive} binds a freshly declared queue to it and prints each
 * message that arrives.
 */
public class SendReceive {

    /** Text encoding used on both the publishing and the consuming side. */
    private static final String CHARSET = "UTF-8"

    /** Name of the fanout exchange (the constructor's "queue" argument). */
    private String exchangeName
    /** Kept as a field so that {@link #close} can release it. */
    private Connection connection
    private Channel channel

    /**
     * Connects to the broker and declares the fanout exchange.
     *
     * @param hostName broker host; null or blank falls back to "localhost"
     * @param queue    name of the fanout exchange to use; null or blank
     *                 falls back to "hello"
     */
    public SendReceive( String hostName = "localhost", String queue = "hello" ) {
        // Interactive callers pass "" when the user just presses Enter, which
        // bypasses the parameter defaults; apply the defaults here as well.
        exchangeName = queue?.trim() ?: "hello"

        ConnectionFactory factory = new ConnectionFactory()
        // Username, password, virtual host and port are not set here, so the
        // factory's own defaults are used for them.
        factory.setHost(hostName?.trim() ?: "localhost")

        connection = factory.newConnection()
        channel = connection.createChannel()
        channel.exchangeDeclare(exchangeName, "fanout")
    }

    /**
     * Publishes one text message to the exchange with an empty routing key.
     *
     * @param message text to publish
     * @throws java.io.IOException if publishing fails
     */
    public void send( String message ) throws java.io.IOException {
        channel.basicPublish(exchangeName, "", null, message.getBytes(CHARSET))
        println(" [x] Sent: $message")
    }

    /**
     * Declares a new queue, binds it to the exchange and prints every message
     * delivered to it. This method loops forever and only leaves by exception.
     *
     * @throws java.io.IOException if declaring, binding or consuming fails
     * @throws java.lang.InterruptedException if interrupted while waiting
     */
    public void receive() throws java.io.IOException, java.lang.InterruptedException {
        // No-argument queueDeclare(): the queue name is obtained from the result.
        String queueName = channel.queueDeclare().getQueue()
        channel.queueBind(queueName, exchangeName, "")

        QueueingConsumer consumer = new QueueingConsumer(channel)
        channel.basicConsume(queueName, true, consumer) // second argument: auto-acknowledge

        println(" [*] Waiting for messages. To exit press CTRL+C")
        while (true) {
            // Blocks until the next message is delivered.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery()
            String message = new String(delivery.getBody(), CHARSET)
            println(" [x] Received: $message")
        }
    }

    /**
     * Closes the channel and then the connection, skipping whichever is
     * already closed.
     *
     * @throws java.io.IOException if closing fails
     */
    public void close() throws java.io.IOException {
        try {
            if (channel?.isOpen()) {
                channel.close()
            }
        } finally {
            if (connection?.isOpen()) {
                connection.close()
            }
        }
    }
}

// ---------------------------------------------------------------------------
// Interactive driver
// ---------------------------------------------------------------------------

// System.console() is null when stdin/stdout is redirected; fall back to
// reading standard input directly in that case.
def console = System.console()
def stdin = console ? null : new BufferedReader(new InputStreamReader(System.in))

// Prompts the user and returns the line typed, or null at end of input.
def input = { String prompt ->
    if (console) {
        return console.readLine(prompt)
    }
    print prompt
    System.out.flush()
    return stdin.readLine()
}

// The second prompt says "queue name"; the value is used as the exchange name.
def client = new SendReceive(
    input('\nEnter rabbitMQ host location: '),
    input('\nEnter rabbitMQ queue name: ')
)

try {
    String userType = input('\n[s]end or [r]eceive?: ')
    if (userType?.trim() == 'r') {
        client.receive()
    } else {
        while (true) {
            String msg = input('\nType a message to send -- To exit press CTRL+C: ')
            if (msg == null) {
                break // end of input: stop instead of publishing null
            }
            client.send(msg)
        }
    }
} finally {
    client.close()
}
Thursday, December 15, 2011
RabbitMQ - Publish/Subscribe
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment