

Author: Vanes丶 | Published 2018-11-07 13:38


    RabbitMQ is a message broker (message middleware):
    • It accepts and forwards messages.
      • You can think of it as a post office: when you put the mail that you want to post in a post box, you can be sure that Mr. Postman will eventually deliver it to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a postman.
      • The major difference between RabbitMQ and the post office is that it doesn't deal with paper. Instead, it accepts, stores, and forwards binary blobs of data: messages.
      • RabbitMQ, and messaging in general, uses some jargon.
    • Source: "Hello world in RabbitMQ with Java"

    Brief model (producer-consumer model)

    Producing means nothing more than sending. A program that sends messages is a producer.

    • A queue is the name for a post box that lives inside RabbitMQ. Although messages flow through RabbitMQ and your applications, they can only be stored inside a queue. A queue is bound only by the host's memory and disk limits; it is essentially a large message buffer. Many producers can send messages to one queue, and many consumers can try to receive data from one queue. This is how we represent a queue:
    // queue_name
    // Sorry, I am not good at drawing :(

    Consuming has a similar meaning to receiving. A consumer is a program that mostly waits to receive messages.

    • Note that the producer, consumer, and broker do not have to reside on the same host; indeed in most applications they don't.
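    The producer, queue, and consumer roles described above can be tried out without a broker. The following is a minimal in-memory sketch, not RabbitMQ itself: a java.util.concurrent.BlockingQueue stands in for the queue, and the class name QueueModelSketch and the "msg-N" payloads are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy only: the BlockingQueue plays the role of the broker's queue.
final class QueueModelSketch {

    // Sends `count` messages through the queue and returns what the consumer received, in order.
    static List<String> run(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        // Producer: does nothing more than send.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                queue.add("msg-" + i);
            }
        });

        // Consumer: mostly waits (take() blocks) until a message is available.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the sketch to finish", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [msg-0, msg-1, msg-2]
    }
}
```

    The consumer is started first on purpose: it simply blocks until the producer puts something into the queue, which mirrors a receiver that waits for messages.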

    Start with "Hello, RabbitMQ"

    • In this part of the tutorial we'll write two programs in Java; a producer that sends a single message, and a consumer that receives messages and prints them out. We'll gloss over some of the detail in the Java API, concentrating on this very simple thing just to get started. It's a "Hello World" of messaging.
    1. Simple Send And Receive with Named Queue
    • SimpleSender.java
    package tech.shunzi.mq.demo;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    // QUEUE_NAME comes from the author's MQConstants class, which is not shown here
    import static tech.shunzi.mq.demo.MQConstants.QUEUE_NAME;

    public class SimpleSender {
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            // connect to a broker by its host name or IP address
            // (assumption: a broker running locally; "localhost" is also the client default)
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            // create a channel, which is where most of the API for getting things done resides
            Channel channel = connection.createChannel();
            // Declaring a queue is idempotent: it will only be created if it doesn't exist already.
            // Parameters: String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // The message content is a byte array, so you can encode whatever you like there.
            // Parameters: String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
            // close the channel and the connection so the program can exit
            channel.close();
            connection.close();
        }
    }
    • SimpleReceiver.java
    package tech.shunzi.mq.demo;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.io.UnsupportedEncodingException;

    import static tech.shunzi.mq.demo.MQConstants.QUEUE_NAME;

    public class SimpleReceiver {
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            // assumption: a broker running locally ("localhost" is also the client default)
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // declare the same queue as the sender, in case the receiver starts first
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException, UnsupportedEncodingException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                }
            };
            // autoAck = true: messages count as acknowledged as soon as they are delivered
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    • Key Config: ConnectionFactory, Host, Connection, Channel, Queue
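    Both programs treat the message body as a byte array: the sender encodes a String and the receiver decodes it. Here is a small sketch of that round trip using only the JDK. The class and method names are hypothetical, and it swaps the "UTF-8" string literal for StandardCharsets.UTF_8, which avoids the checked UnsupportedEncodingException.

```java
import java.nio.charset.StandardCharsets;

// Sketch of the encode/decode step around basicPublish and handleDelivery; no broker involved.
final class MessageBodySketch {

    // What the sender does before publishing.
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // What the receiver does with the delivered body.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello World!");
        System.out.println(body.length);   // prints 12
        System.out.println(decode(body));  // prints Hello World!
    }
}
```

    As long as both sides agree on the charset, any text survives the trip, including non-ASCII characters.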
    2. Distribute time-consuming tasks among multiple workers
    • NewTask.java
    package tech.shunzi.mq.demo.multi.consumer;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;

    public class NewTask {
        private static final String TASK_QUEUE_NAME = "task_queue";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            // assumption: a broker running locally ("localhost" is also the client default)
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // durable = true: the queue survives a broker restart
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            // build the message from the command-line arguments, joined with spaces, e.g.: xxx xxx xxx
            String message = getMessage(argv);
            // MessageProperties.PERSISTENT_TEXT_PLAIN marks the message as persistent.
            // Marking messages as persistent doesn't fully guarantee that a message won't be lost.
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
            channel.close();
            connection.close();
        }

        private static String getMessage(String[] strings) {
            if (strings.length < 1) {
                return "Hello World!";
            }
            return joinStrings(strings, " ");
        }

        private static String joinStrings(String[] strings, String delimiter) {
            int length = strings.length;
            if (length == 0) {
                return "";
            }
            StringBuilder words = new StringBuilder(strings[0]);
            for (int i = 1; i < length; i++) {
                words.append(delimiter).append(strings[i]);
            }
            return words.toString();
        }
    }
    • Worker.java
    package tech.shunzi.mq.demo.multi.consumer;

    import com.rabbitmq.client.*;

    import java.io.IOException;

    public class Worker {
        private static final String TASK_QUEUE_NAME = "task_queue";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            // assumption: a broker running locally ("localhost" is also the client default)
            factory.setHost("localhost");
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            // durable = true: the queue survives a broker restart
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            // deliver at most one unacknowledged message to this worker at a time, so the load stays balanced
            channel.basicQos(1);
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                    try {
                        // simulate a task that takes some time
                        doWork(message);
                    } finally {
                        System.out.println(" [x] Done");
                        // multiple = false: acknowledge only this delivery
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // autoAck = false: the worker must acknowledge each message explicitly
            channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
        }

        private static void doWork(String task) {
            for (char ch : task.toCharArray()) {
                if (ch == '.') {
                    try {
                        // each dot in the message stands for one second of work
                        Thread.sleep(1000);
                    } catch (InterruptedException _ignored) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
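    The worker acknowledges each task only after it has finished. The reason is that a message delivered without an acknowledgement is not lost if the worker dies: RabbitMQ puts it back in the queue for redelivery. The toy model below illustrates the idea with plain collections. It is not the RabbitMQ client API, and the class ManualAckSketch and its method names are invented for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of manual acknowledgements; a simplification, not the broker's real bookkeeping.
final class ManualAckSketch {
    private final Deque<String> ready = new ArrayDeque<>();   // waiting for delivery
    private final List<String> unacked = new ArrayList<>();   // delivered, not yet acknowledged

    void publish(String message) {
        ready.addLast(message);
    }

    // Hands the next ready message to a consumer and tracks it until it is acknowledged.
    // Returns null when nothing is ready.
    String deliver() {
        String message = ready.pollFirst();
        if (message != null) {
            unacked.add(message);
        }
        return message;
    }

    void ack(String message) {
        unacked.remove(message);
    }

    // Models a consumer whose connection drops: its unacknowledged messages go back to the queue.
    void consumerDied() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            ready.addFirst(unacked.get(i));
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        ManualAckSketch queue = new ManualAckSketch();
        queue.publish("task-1");
        queue.publish("task-2");

        String first = queue.deliver();
        queue.ack(first);                      // task-1 finished and acknowledged

        queue.deliver();                       // task-2 delivered, but the worker crashes before acking
        queue.consumerDied();

        System.out.println(queue.deliver());   // prints task-2 (redelivered)
    }
}
```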
    • Details of some key functions (interface Channel.java)
        /**
         * Declare a queue
         * @see com.rabbitmq.client.AMQP.Queue.Declare
         * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
         * @param queue the name of the queue
         * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
         * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
         * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
         * @param arguments other properties (construction arguments) for the queue
         * @return a declaration-confirm method to indicate the queue was successfully declared
         * @throws java.io.IOException if an error is encountered
         */
        Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments) throws IOException;

        /**
         * Publish a message.
         * Publishing to a non-existent exchange will result in a channel-level
         * protocol exception, which closes the channel.
         * Invocations of <code>Channel#basicPublish</code> will eventually block if a
         * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
         * @see com.rabbitmq.client.AMQP.Basic.Publish
         * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
         * @param exchange the exchange to publish the message to
         * @param routingKey the routing key
         * @param props other properties for the message - routing headers etc
         * @param body the message body
         * @throws java.io.IOException if an error is encountered
         */
        void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

        /**
         * Request a specific prefetchCount "quality of service" settings
         * for this channel.
         * @see #basicQos(int, int, boolean)
         * @param prefetchCount maximum number of messages that the server
         * will deliver, 0 if unlimited
         * @throws java.io.IOException if an error is encountered
         */
        void basicQos(int prefetchCount) throws IOException;

        /**
         * Acknowledge one or several received
         * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
         * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
         * containing the received message being acknowledged.
         * @see com.rabbitmq.client.AMQP.Basic.Ack
         * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
         * @param multiple true to acknowledge all messages up to and
         * including the supplied delivery tag; false to acknowledge just
         * the supplied delivery tag.
         * @throws java.io.IOException if an error is encountered
         */
        void basicAck(long deliveryTag, boolean multiple) throws IOException;

        /**
         * Start a non-nolocal, non-exclusive consumer, with
         * a server-generated consumerTag.
         * @param queue the name of the queue
         * @param autoAck true if the server should consider messages
         * acknowledged once delivered; false if the server should expect
         * explicit acknowledgements
         * @param callback an interface to the consumer object
         * @return the consumerTag generated by the server
         * @throws java.io.IOException if an error is encountered
         * @see com.rabbitmq.client.AMQP.Basic.Consume
         * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
         * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
         */
        String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
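    The multiple flag of basicAck is easy to misread, so here is a small sketch of its documented meaning: true acknowledges every outstanding delivery up to and including the given tag, false acknowledges only that tag. The class AckMultipleSketch is hypothetical and only models the set of unacknowledged delivery tags with a TreeSet; it does not talk to a broker.

```java
import java.util.TreeSet;

// Models the unacknowledged delivery tags on one channel; illustration only.
final class AckMultipleSketch {
    private final TreeSet<Long> unacked = new TreeSet<>();

    // Records that a message with this delivery tag was handed to the consumer.
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    // Mirrors the documented semantics of basicAck(deliveryTag, multiple).
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // everything up to and including the supplied tag
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // just the supplied tag
            unacked.remove(deliveryTag);
        }
    }

    TreeSet<Long> outstanding() {
        return unacked;
    }

    public static void main(String[] args) {
        AckMultipleSketch single = new AckMultipleSketch();
        AckMultipleSketch batch = new AckMultipleSketch();
        for (long tag = 1; tag <= 5; tag++) {
            single.delivered(tag);
            batch.delivered(tag);
        }
        single.ack(3, false);
        batch.ack(3, true);
        System.out.println(single.outstanding()); // prints [1, 2, 4, 5]
        System.out.println(batch.outstanding());  // prints [4, 5]
    }
}
```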
    • autoAck:
      • true - The message is considered acknowledged as soon as it is delivered.
      • false - The server expects explicit acknowledgements, so the consumer must acknowledge each message manually by calling basicAck().
    • durable (set by both the producer and the consumer):
      • true - The queue survives a RabbitMQ restart. The flag only takes effect when the queue is first declared; RabbitMQ does not allow an existing queue to be redeclared with different parameters.
    • BasicProperties props:
      • MessageProperties.PERSISTENT_TEXT_PLAIN - Marks the message as persistent. Note: the persistence guarantees aren't strong, but they are more than enough for our simple task queue. For example, there is a short window in which a message has been accepted and cached but not yet written to disk.
    • prefetchCount: maximum number of messages that the server will deliver
      • 1 - This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
      • ...
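    To see why prefetchCount = 1 matters, it helps to compare it with plain round-robin dispatch on an uneven workload. The simulation below is a deliberately simplified model and not broker code: task costs are abstract time units, and the class DispatchSketch and its methods are invented for this example. Each method returns the time at which every worker finishes its share.

```java
import java.util.Arrays;

// Simplified dispatch simulation; costs are abstract time units.
final class DispatchSketch {

    // Round-robin: the n-th task goes to worker n % workers, no matter how busy that worker is.
    static int[] roundRobin(int[] taskCosts, int workers) {
        int[] busyUntil = new int[workers];
        for (int i = 0; i < taskCosts.length; i++) {
            busyUntil[i % workers] += taskCosts[i];
        }
        return busyUntil;
    }

    // prefetchCount = 1: the next task goes to whichever worker finishes (acknowledges) first.
    static int[] prefetchOne(int[] taskCosts, int workers) {
        int[] busyUntil = new int[workers];
        for (int cost : taskCosts) {
            int next = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[next]) {
                    next = w;
                }
            }
            busyUntil[next] += cost;
        }
        return busyUntil;
    }

    public static void main(String[] args) {
        int[] costs = {5, 1, 5, 1, 5, 1}; // heavy and light tasks alternate
        System.out.println(Arrays.toString(roundRobin(costs, 2)));  // prints [15, 3]
        System.out.println(Arrays.toString(prefetchOne(costs, 2))); // prints [11, 7]
    }
}
```

    With round-robin, one worker receives every heavy task and finishes at time 15 while the other is idle after time 3. With one message in flight per worker, the same tasks are done by time 11.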



