美文网首页
RabbitMQ/安装/队列消息

RabbitMQ/安装/队列消息

作者: 米刀灵 | 来源:发表于2016-08-28 20:47 被阅读458次

    当在很短的HTTP请求间需要执行复杂的任务时,队列的主要任务是:避免立刻执行资源密集型任务,这样的概念在web应用中极其有用。使用任务队列的另一个好处是能够很容易的并行工作。如果我们积压了很多工作,我们仅仅通过增加更多的工作者就可以解决问题,使系统的伸缩性更加容易。

    安装:
    1.安装Erlang
    配置环境变量 ERLANG_HOME C:\Program Files \erl8.0
    添加到PATH %ERLANG_HOME%\bin;
    2.安装RabbitMQ
    默认安装的Rabbit MQ 监听端口是5672。使用Rabbit MQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器,已管理员身份在RabbitMQ的sbin目录下执行rabbitmq-plugins.bat" enable rabbitmq_management然后重启服务net stop RabbitMQ && net start RabbitMQ。访问http://localhost:15672 ,默认的登陆账号:guest,密码:guest。查看运行状态rabbitmqctl status
    3.添加用户

    • 列出RabbitMQ的用户,rabbitmqctl.bat list_users
    • 添加用户,rabbitmqctl.bat add_user geffzhang zsy@2014
    • 设置标签,RabbitMQ支持一些有权限意义的标签,如 administrator,monitoring,policymaker,management。rabbitmqctl.bat set_user_tags admin administrator
    • 赋予权限rabbitmqctl.bat set_permissions -p / geffzhang ".*" ".*" ".*"

    发送队列消息:
    添加依赖:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>${rabbitmq.version}</version>
    </dependency>
    

    发送队列消息:

    package testrabbitmq;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Created by zzhblh on 2016/8/28.
     */
    public class Sender {
        //队列名称
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws java.io.IOException, TimeoutException {
            /**
             * 创建连接连接到MabbitMQ
             */
            ConnectionFactory factory = new ConnectionFactory();
            //设置MabbitMQ所在主机ip或者主机名
            factory.setHost("localhost");
            //创建一个连接
            Connection connection = factory.newConnection();
            //创建一个频道
            Channel channel = connection.createChannel();
            //指定一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //发送的消息
            String message = "hello world!";
            //往队列中发出一条消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            //关闭频道和连接
            channel.close();
            connection.close();
        }
    }
    

    接收队列消息:

    package testrabbitmq;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Created by zzhblh on 2016/8/28.
     */
    public class Recevicer {
        //队列名称
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws java.io.IOException,
                java.lang.InterruptedException, TimeoutException {
            //打开连接和创建频道,与发送端一样
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            //创建队列消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //指定消费队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true)
            {
                //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
            }
    
        }
    }
    

    消息应答:
    为了保证消息永远不会丢失,RabbitMQ也支持消息应答(message acknowledgments)。消费者发送应答给RabbitMQ,告诉它信息已经被接收和处理,然后RabbitMQ进行信息删除。如果消费者被杀死而没有发送应答,RabbitMQ会重新转发给别的消费者。这种机制并没有超时时间这么一说,RabbitMQ只有在消费者连接断开是重新转发此信息。如果消费者处理一个信息需要耗费特别特别长的时间是允许的。

    import com.rabbitmq.client.Channel;  
    import com.rabbitmq.client.Connection;  
    import com.rabbitmq.client.ConnectionFactory;  
    import com.rabbitmq.client.QueueingConsumer;  
      
    public class Work  
    {  
        //队列名称  
        private final static String QUEUE_NAME = "workqueue";  
      
        public static void main(String[] argv) throws java.io.IOException,  
                java.lang.InterruptedException  
        {  
            //区分不同工作进程的输出  
            int hashCode = Work.class.hashCode();  
            //创建连接和频道  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            Connection connection = factory.newConnection();  
            Channel channel = connection.createChannel();  
            //声明队列  
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
            System.out.println(hashCode  
                    + " [*] Waiting for messages. To exit press CTRL+C");  
            QueueingConsumer consumer = new QueueingConsumer(channel);  
            // 指定消费队列  
            //关闭自动应答(分发后立即自动应答,不关心后续任务成功与否),打开应答机制
            boolean ack = false ; 
            channel.basicConsume(QUEUE_NAME, ack, consumer);  
            while (true)  
            {  
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
                String message = new String(delivery.getBody());  
                System.out.println(hashCode + " [x] Received '" + message + "'");  
                //发送应答  
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
                System.out.println(hashCode + " [x] Done"); 
      
            }  
      
        }  
    }  
    

    要确认消息, 或者拒绝消息, 使用对应的 basic_ack 或者 baskc_reject 方法。区别是 reject 方法 它只能处理一条消息,但是 Consuming 可以是一次性提取多条信息的,所以 RabbitMQ 为此做了扩展, 提供了 basic_nack 方法,支持一次性拒绝多条消息。在 reject 和 nack 中还有一个 requeue 参数, 表示被拒绝的消息是否可以被重新分配. 默认是 True . 如果消息被 reject 之后, 不希望再被其它的 Consuming 得到, 可以把 requeue 参数设置成 False。

    消息持久化:
    当RabbitMQ退出或者异常退出,将会丢失所有的队列和信息。我们需要做两件事来确保信息不会被丢失:我们需要给所有的队列和消息设置持久化的标志。

    • 声明队列为持久化的
      在Producer端设置:
    boolean durable = true;
    channel.queueDeclare("task_queue", durable, false, false, null);
    
    • 声明信息为持久化的。
      在Producer端通过设置MessageProperties的值为PERSISTENT_TEXT_PLAIN:
    channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    

    分发机制:

    • Round-robin分发:
      默认的分发机制,RabbitMQ会一个一个的发送信息给下一个消费者,而不考虑每个任务的时长等等,且是一次性分配,并非一个一个分配。平均的每个消费者将会获得相等数量的消息。这样分发消息的方式叫做round-robin。如果有10个任务,3个消费者,则消费者A获得第1,4,7个任务,消费者A获得第2,5,8个任务,消费者A获得第3,6,8个任务。
    • 公平分发(Fair dispatch):
      有些情况下,默认分发机制(Round-robin)并非是我们想要的。我们可以使用basicQos方法,传递参数为prefetchCount = 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。换句话说,只有在消费者空闲的时候会发送下一条信息。在Comsumer端设置:channel.basicQos(prefetchCount)
      且这种模式下支持动态增加消费者,因为消息并没有发送出去,动态增加了消费者马上投入工作。而默认的转发机制会造成,即使动态增加了消费者,此时的消息已经分配完毕,无法立即加入工作,即使有很多未完成的任务。

    相关文章

      网友评论

          本文标题:RabbitMQ/安装/队列消息

          本文链接:https://www.haomeiwen.com/subject/yoztettx.html