RabbitMQ-Tutorials-One

Author: Vanes丶 | Published: 2018-11-07 13:38

    Introduction

    RabbitMQ is a message broker (message middleware):
    • it accepts and forwards messages.
      • You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that Mr. Postman will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office and a postman.
      • The major difference between RabbitMQ and the post office is that RabbitMQ doesn't deal with paper. Instead, it accepts, stores and forwards binary blobs of data: messages.
      • RabbitMQ, and messaging in general, uses some jargon.
    • The notes in this post follow the tutorial "Hello world in RabbitMQ with Java".

    Brief model (Producer-Consumer Model)

    Producing means nothing more than sending. A program that sends messages is a producer:

    [Figure: Produce]
    • A queue is the name for a post box which lives inside RabbitMQ. Although messages flow through RabbitMQ and your applications, they can only be stored inside a queue. A queue is bound only by the host's memory and disk limits; it is essentially a large message buffer. Many producers can send messages that go to one queue, and many consumers can try to receive data from one queue. This is how we represent a queue:
    // queue_name
    口口口口口口口口口口...... 
    // Sorry, I am not good at drawing :(
    

    Consuming has a similar meaning to receiving. A consumer is a program that mostly waits to receive messages:

    [Figure: Consume]
    • Note that the producer, consumer, and broker do not have to reside on the same host; indeed in most applications they don't.
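The model above can be tried out without a broker. The following is a minimal sketch that uses a `java.util.concurrent.BlockingQueue` as a stand-in for the RabbitMQ queue; the class name `InMemoryModel` and its methods are made up for illustration, and nothing here touches the RabbitMQ client. It only shows the shape of the idea: several producers put byte arrays into one buffer, and a consumer waits for them.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for producer -> queue -> consumer. Not the RabbitMQ API.
class InMemoryModel {

    // Producer side: encode the text and store it in the queue.
    // Assumes an unbounded queue, so add() never fails for lack of capacity.
    static void produce(BlockingQueue<byte[]> queue, String text) {
        queue.add(text.getBytes(StandardCharsets.UTF_8));
    }

    // Consumer side: block until `count` messages have arrived and decode each one.
    static List<String> consume(BlockingQueue<byte[]> queue, int count) {
        List<String> received = new ArrayList<>();
        try {
            while (received.size() < count) {
                received.add(new String(queue.take(), StandardCharsets.UTF_8));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop early, keep what arrived
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(); // the "post box"

        // Two producers send to the same queue.
        Thread p1 = new Thread(() -> produce(queue, "from producer 1"));
        Thread p2 = new Thread(() -> produce(queue, "from producer 2"));
        p1.start();
        p2.start();

        // One consumer waits for both messages; their arrival order is not fixed.
        System.out.println(consume(queue, 2));
        p1.join();
        p2.join();
    }
}
```

Unlike this sketch, a real broker runs as a separate process, which is why producer and consumer can live on different hosts.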

    Start with "Hello, RabbitMQ"

    • In this part of the tutorial we'll write two programs in Java; a producer that sends a single message, and a consumer that receives messages and prints them out. We'll gloss over some of the detail in the Java API, concentrating on this very simple thing just to get started. It's a "Hello World" of messaging.
    1. Simple Send And Receive with Named Queue
    • SimpleSender.java
    package tech.shunzi.mq.demo;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import static tech.shunzi.mq.demo.MQConstants.QUEUE_NAME;
    
    public class SimpleSender {
    
        public static void main(String[] argv) throws Exception
        {
            ConnectionFactory factory = new ConnectionFactory();
    
            // connect to a broker with its name or ip address
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
    
            // create a channel, which is where most of the API for getting things done resides.
            Channel channel = connection.createChannel();
    
            // Declaring a queue is idempotent - it will only be created if it doesn't exist already.
            // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            String message = "Hello World!";
            // The message content is a byte array, so you can encode whatever you like there.
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
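SimpleSender statically imports `QUEUE_NAME` from an `MQConstants` class that is not listed in this post. A minimal guess at what such a class might look like is given here; the queue name `"hello"` is an assumption, not the author's value. Any name works as long as sender and receiver use the same one.

```java
// Hypothetical reconstruction: the original MQConstants class is not shown.
// To use it with the listings in this post, add the line
//     package tech.shunzi.mq.demo;
// at the top of the file so that the static import resolves.
final class MQConstants {

    // Assumed value; sender and receiver only need to agree on it.
    static final String QUEUE_NAME = "hello";

    private MQConstants() {
        // constants holder, not meant to be instantiated
    }
}
```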
    
    • SimpleReceiver.java
    package tech.shunzi.mq.demo;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    
    import static tech.shunzi.mq.demo.MQConstants.QUEUE_NAME;
    
    public class SimpleReceiver {
    
        public static void main(String[] argv) throws Exception
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException, UnsupportedEncodingException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    
    • Key Config: ConnectionFactory, Host, Connection, Channel, Queue
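Both programs treat the message body as a plain byte array: the sender encodes a String, the receiver decodes it. As a small sketch of that round trip, the helper below (the class name `MessageCodec` is invented for illustration) uses `StandardCharsets.UTF_8` instead of the charset name `"UTF-8"`, which avoids the checked `UnsupportedEncodingException` that the name-based overloads declare.

```java
import java.nio.charset.StandardCharsets;

// Illustrative helper, not part of the original project.
class MessageCodec {

    // What the sender does before basicPublish: text -> bytes.
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // What the receiver does in handleDelivery: bytes -> text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello World!");
        System.out.println(body.length);  // 12, one byte per ASCII character
        System.out.println(decode(body)); // Hello World!
    }
}
```

The broker never interprets these bytes, so sender and receiver must agree on the encoding themselves.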
    2. Distribute time-consuming tasks among multiple workers
    • NewTask.java
    package tech.shunzi.mq.demo.multi.consumer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    public class NewTask {
    
        private static final String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    
            // Join the command-line arguments into one space-separated message, e.g. "xxx xxx xxx";
            // fall back to "Hello World!" when no arguments are given.
            String message = getMessage(argv);
    
            // MessageProperties.PERSISTENT_TEXT_PLAIN marks the message as persistent (delivery mode 2).
            // Marking messages as persistent doesn't fully guarantee that a message won't be lost.
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    
        private static String getMessage(String[] strings) {
            if (strings.length < 1) {
                return "Hello World!";
            }
            return joinStrings(strings, " ");
        }
    
        private static String joinStrings(String[] strings, String delimiter) {
            int length = strings.length;
            if (length == 0)
                return "";
            StringBuilder words = new StringBuilder(strings[0]);
            for (int i = 1; i < length; i++) {
                words.append(delimiter).append(strings[i]);
            }
            return words.toString();
        }
    }
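The hand-written `joinStrings` helper in NewTask does the same job as `String.join`, which has been in the standard library since Java 8. A shorter variant of `getMessage` could look like the sketch below; the class name `JoinDemo` is only for illustration.

```java
// Illustrative alternative to getMessage/joinStrings in NewTask.
class JoinDemo {

    // Same contract as NewTask.getMessage: default text when no arguments are given.
    static String getMessage(String[] args) {
        return args.length < 1 ? "Hello World!" : String.join(" ", args);
    }

    public static void main(String[] args) {
        System.out.println(getMessage(new String[] {"First", "task..."})); // First task...
        System.out.println(getMessage(new String[0]));                      // Hello World!
    }
}
```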
    
    
    • Worker.java
    package tech.shunzi.mq.demo.multi.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Worker {
    
        private static final String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            // durable = true: the queue survives a broker restart
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            // prefetchCount = 1: deliver at most one unacknowledged message to this worker at a time (fair dispatch)
            channel.basicQos(1);
    
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
    
                    System.out.println(" [x] Received '" + message + "'");
                    try {
                        // Simulate a time-consuming task: one second of work per '.' in the message.
                        doWork(message);
                    } finally {
                        System.out.println(" [x] Done");
                        // multiple = false: acknowledge only this delivery tag, not all earlier ones
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // autoAck = false: the worker acknowledges each message explicitly with basicAck
            channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
        }
    
        private static void doWork(String task) {
            for (char ch : task.toCharArray()) {
                if (ch == '.') {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException _ignored) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    
    • Details of some key methods, from the Channel interface (Channel.java):
        /**
         * Declare a queue
         * @see com.rabbitmq.client.AMQP.Queue.Declare
         * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
         * @param queue the name of the queue
         * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
         * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
         * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
         * @param arguments other properties (construction arguments) for the queue
         * @return a declaration-confirm method to indicate the queue was successfully declared
         * @throws java.io.IOException if an error is encountered
         */
        Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments) throws IOException;
                                     
        /**
         * Publish a message.
         *
         * Publishing to a non-existent exchange will result in a channel-level
         * protocol exception, which closes the channel.
         *
         * Invocations of <code>Channel#basicPublish</code> will eventually block if a
         * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
         *
         * @see com.rabbitmq.client.AMQP.Basic.Publish
         * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
         * @param exchange the exchange to publish the message to
         * @param routingKey the routing key
         * @param props other properties for the message - routing headers etc
         * @param body the message body
         * @throws java.io.IOException if an error is encountered
         */
        void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; 
        
        /**
         * Request a specific prefetchCount "quality of service" settings
         * for this channel.
         *
         * @see #basicQos(int, int, boolean)
         * @param prefetchCount maximum number of messages that the server
         * will deliver, 0 if unlimited
         * @throws java.io.IOException if an error is encountered
         */
        void basicQos(int prefetchCount) throws IOException;
        
        /**
         * Acknowledge one or several received
         * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
         * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
         * containing the received message being acknowledged.
         * @see com.rabbitmq.client.AMQP.Basic.Ack
         * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
         * @param multiple true to acknowledge all messages up to and
         * including the supplied delivery tag; false to acknowledge just
         * the supplied delivery tag.
         * @throws java.io.IOException if an error is encountered
         */
        void basicAck(long deliveryTag, boolean multiple) throws IOException;  
        
        /**
         * Start a non-nolocal, non-exclusive consumer, with
         * a server-generated consumerTag.
         * @param queue the name of the queue
         * @param autoAck true if the server should consider messages
         * acknowledged once delivered; false if the server should expect
         * explicit acknowledgements
         * @param callback an interface to the consumer object
         * @return the consumerTag generated by the server
         * @throws java.io.IOException if an error is encountered
         * @see com.rabbitmq.client.AMQP.Basic.Consume
         * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
         * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
         */
        String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
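The `multiple` flag of `basicAck` documented above is easy to misread. The toy model below keeps a sorted set of unacknowledged delivery tags and applies the two documented behaviours to it. It is a sketch of the semantics only: the class `AckModel` is invented for illustration and does not use or reproduce the RabbitMQ client.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Toy model of a channel's unacknowledged deliveries. Not the RabbitMQ client.
class AckModel {

    private final SortedSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1; // delivery tags grow by one per delivery on a channel

    // Record a delivery and return its tag.
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Mirrors the documented meaning of basicAck(deliveryTag, multiple).
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headSet is a live view of all tags below deliveryTag + 1,
            // so clearing it acknowledges everything up to and including deliveryTag.
            unacked.headSet(deliveryTag + 1).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    // Snapshot of the tags still waiting for an acknowledgement.
    SortedSet<Long> unacked() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        AckModel channel = new AckModel();
        for (int i = 0; i < 4; i++) {
            channel.deliver(); // tags 1..4
        }
        channel.ack(2, false);
        System.out.println(channel.unacked()); // [1, 3, 4]
        channel.ack(3, true);
        System.out.println(channel.unacked()); // [4]
    }
}
```

The Worker listing passes `false`, so each call acknowledges exactly the message that was just processed.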
    
    • autoAck:
      • true - The server considers a message acknowledged as soon as it has been delivered.
      • false - The server expects explicit acknowledgements, so the consumer must call basicAck() itself.
    • durable: (set by both producer and consumer)
      • true - The queue survives a RabbitMQ restart. The flag only takes effect when the queue is first created; an existing queue cannot be redeclared with a different durable setting.
    • BasicProperties props :
      • MessageProperties.PERSISTENT_TEXT_PLAIN - Marks the message as persistent. Attention: the persistence guarantees aren't strong, but they are more than enough for our simple task queue. For example, a message may still be held in a cache and not yet written to disk when the broker stops.
    • prefetchCount: maximum number of messages that the server will deliver
      • 1 - This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
      • ...
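To see why `prefetchCount = 1` balances load, the simulation below compares two dispatch rules on the same task list. It is a simplified sketch under stated assumptions: all tasks are already queued, dispatch takes no time, each '.' costs one second as in `Worker.doWork`, and the baseline is round-robin delivery, which is RabbitMQ's default when no prefetch limit is set. The class `DispatchSim` is invented for illustration and does not talk to a broker.

```java
import java.util.Arrays;

// Simulation only: compares per-worker busy time under two dispatch rules.
class DispatchSim {

    // Seconds of simulated work: one per '.' in the task, as in Worker.doWork.
    static int seconds(String task) {
        int n = 0;
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                n++;
            }
        }
        return n;
    }

    // Round-robin: task i goes to worker i % workers, regardless of how busy it is.
    static int[] roundRobin(String[] tasks, int workers) {
        int[] busy = new int[workers];
        for (int i = 0; i < tasks.length; i++) {
            busy[i % workers] += seconds(tasks[i]);
        }
        return busy;
    }

    // prefetchCount = 1: the next task goes to the worker that becomes free first.
    static int[] fairDispatch(String[] tasks, int workers) {
        int[] busy = new int[workers];
        for (String task : tasks) {
            int idle = 0;
            for (int w = 1; w < workers; w++) {
                if (busy[w] < busy[idle]) {
                    idle = w;
                }
            }
            busy[idle] += seconds(task);
        }
        return busy;
    }

    public static void main(String[] args) {
        String[] tasks = {"a.....", "b.", "c.....", "d."}; // 5s, 1s, 5s, 1s
        System.out.println(Arrays.toString(roundRobin(tasks, 2)));   // [10, 2]
        System.out.println(Arrays.toString(fairDispatch(tasks, 2))); // [6, 6]
    }
}
```

With round-robin one worker receives both heavy tasks, while with the prefetch limit the total work is split evenly in this example.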

    Article link: https://www.haomeiwen.com/subject/shfvxqtx.html