美文网首页
2019-05-31(RABBIT_MQ)

2019-05-31(RABBIT_MQ)

作者: 威先生2018 | 来源:发表于2020-03-22 15:40 被阅读0次

    RABBIT_MQ是消息队列,即messge queue

    作用:模块之间的解耦、流量削峰、异步消息和消息通信;高性能、高可用和最终一致性的框架
    作用链接:https://www.cnblogs.com/HigginCui/p/6478613.html

    简单教程:https://blog.csdn.net/hellozpc/article/details/81436980

    api:http://rabbitmq.mr-ping.com/
    https://docs.spring.io/spring-amqp/api/overview-summary.html

    高级应用:https://blog.csdn.net/weixin_42763504/article/details/81608237
    https://blog.csdn.net/zhihuirensheng123/article/details/82805349
    https://blog.csdn.net/qq315737546/article/details/54176560
    https://www.cnblogs.com/toov5/p/10288260.html
    https://blog.csdn.net/qq_38455201/article/details/80308771

    应用解耦:订单-》库存,解耦后:订单-》消息通道-》库存
    流量削峰:秒杀活动
    异步消息:用户注册,需要发送短信和邮箱;直接放进消息队列
    消息通信:

    rabbitmq模式

    公共创建链接

    public class RaMqConnCommon {
        public final static String QUEUE_SIMPLE = "test_queue";//简单模式
        public final static String QUEUE_PUBLIC = "test_public_queue";//订阅模式
        public final static String QUEUE_PUBLIC_2 = "test_public_queue2";//订阅模式
        public final static String QUEUE_ROUTE = "test_route_queue";//路由模式队列
        public final static String QUEUE_ROUTE_2 = "test_route_queue2";//路由模式队列
        public final static String EXCHANGE_PUBLIC = "test_public_exchange";//订阅模式交换机
        public final static String EXCHANGE_ROUTE = "test_route_exchange";//路由模式交换机
        public final static String ROUTE_KEY = "routekey";//路由key
        public final static String ROUTE_KEY_2 = "routekey2";//路由key
        /**
         * @Description:  创建RabbitMq链接
         * @Param: []
         * @return: com.rabbitmq.client.Connection
         * @Author: LJW
         * @Date: 16:33
         */
        public static Connection getConnection(){
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("testmq");
            factory.setUsername("guest");
            factory.setPassword("guest");
            Connection conn = null;
            try {
                conn = factory.newConnection();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return conn;
        }
    

    简单模式:

    生产者producer
    1.创建链接
    2.创建通道
    3.声明队列
    4.发送

            //创建链接
            Connection connection = RaMqConnCommon.getConnection();
            //创建渠道
            Channel channel = connection.createChannel();
            //声明队列(String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String, Object> arguments)
            channel.queueDeclare(RaMqConnCommon.QUEUE_SIMPLE,false,false,false,null);
            String sendMessge = "Hello World!!";
            //发送消息:交换机、队列、队列特性和内容
            channel.basicPublish("",RaMqConnCommon.QUEUE_SIMPLE,null,sendMessge.getBytes());
            System.out.println("工作模式,多劳多得==========》"+sendMessge);
            channel.close();
            connection.close();
    

    消费者

           //创建链接
            Connection connection = RaMqConnCommon.getConnection();
            //创建渠道
            Channel channel = connection.createChannel();
            //消费
            channel.basicConsume(RaMqConnCommon.QUEUE_SIMPLE,true,new RecevierConsumer(channel));
    
    
    
    public class RecevierConsumer extends DefaultConsumer {
        public RecevierConsumer(Channel channel) {
            super(channel);
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            //消费的消息
            System.out.println(consumerTag+":接收到的消息="+new String(body));
        }
    }
    

    工作模式:

    生产者和上面一样

    消费者

            //创建链接
            Connection connection = RaMqConnCommon.getConnection();
            //创建渠道
            Channel channel = connection.createChannel();
            //同一时刻服务器只发送一条消息
            channel.basicQos(1);
            //消费,需要确认机制(队列名,ack,consumer)
            channel.basicConsume(RaMqConnCommon.QUEUE_SIMPLE,false,new RecevierConsumer(channel));
    
    public class RecevierConsumer extends DefaultConsumer {
        private Channel channel;
        public RecevierConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            //消费的消息
            System.out.println(consumerTag+":接收到的消息="+new String(body));
            //消息确认(false为消息确认,true为未确认)
            channel.basicAck(envelope.getDeliveryTag(),false);
        }
    }
    

    订阅模式:

    1.创建链接
    2.创建通道
    3.声明交换机
    4.发送到交换机

            //创建链接
            Connection connection = RaMqConnCommon.getConnection();
            //创建渠道
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(RaMqConnCommon.EXCHANGE_PUBLIC,"fanout");
            String sendMessge = "Hello World!!";
            int i = 0;
            while(true && i<100){
                String str = sendMessge+(++i);
                //发送消息到交换机(exchange、routingKey)
                channel.basicPublish(RaMqConnCommon.EXCHANGE_PUBLIC,"",null,str.getBytes());
            }
    //        //发送消息到交换机
    //        channel.basicPublish(RaMqConnCommon.EXCHANGE_PUBLIC,"",null,sendMessge.getBytes());
    //        System.out.println("订阅模式==========》"+sendMessge);
            channel.close();
            connection.close();
    

    消费者和工作模式一样

    路由模式:

    生产者
    1.创建链接
    2.创建通道
    3.声明交换机
    4.发送到交换机,并且指定路由

    消费者

            //创建链接
            Connection connection = RaMqConnCommon.getConnection();
            //创建渠道
            Channel channel = connection.createChannel();
            //同一时刻服务器只发送一条消息
            channel.basicQos(1);
            //声明队列(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
            channel.queueDeclare(RaMqConnCommon.QUEUE_ROUTE,true,false,false,null);
            //队列绑定交换机(queue,exchange,routekey)
            channel.queueBind(RaMqConnCommon.QUEUE_ROUTE,RaMqConnCommon.EXCHANGE_ROUTE,RaMqConnCommon.ROUTE_KEY);
            //消费(mq,是否自动签收,消费者)
            channel.basicConsume(RaMqConnCommon.QUEUE_ROUTE,false,new RecevierConsumer(channel));
    

    通配符模式:

    高级特性:

    returnListener:
    找不到对应的路由key或交换机,则执行本方法

            channel.addReturnListener(new ReturnListener() {
                /**
                 * int replyCode, 响应吗, 路由成没成功
                 * String replyText, 回复内容
                 * String exchange,
                 * String routingKey,
                 * AMQP.BasicProperties properties,
                 * byte[] body 实际的消息体内容
                 *
                 * @param replyCode
                 * @param replyText
                 * @param exchange
                 * @param routingKey
                 * @param properties
                 * @param body
                 * @throws IOException 跑出ioexception 异常
                 */
    
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("handle return");
                    System.out.println("replyCode: " + replyCode);
                    System.out.println("exchange: " + exchange);
                    System.out.println("routingKey: " + routingKey);
                    System.out.println("properties: " + properties.toString());
                    System.out.println("body: " + new String(body));
                }
            });
    

    ConfirmListener
    判断消息生产者是否发送成功

            // 开启发送方确认模式
            channel.confirmSelect();
            // 6. 添加一个监听
            channel.addConfirmListener(new ConfirmListener() {
                // 成功
                // 关键的唯一的消息标签deliveryTag, multiple :是否批量, 暂时不用管
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("成功 Ack");
                }
    
                // 失败, 比如磁盘写满了,mq 出现一些异常, key 容量达到上限,也有可能handleAck,handleNack 都没有收到, 进行抓取,和重发
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("失败 Nack");
                }
            });
    

    死信队列
    主要是消息过期或消费者拒绝接收,然而此类消息则进入死信队列
    主要是接收的队列绑定死信交换机

    事物

    try {
        channel.txSelect(); // 声明事务
        // 发送消息
        channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
        channel.txCommit(); // 提交事务
    } catch (Exception e) {
        channel.txRollback();
    } finally {
        channel.close();
        conn.close();
    }
    

    spring整合rabbitMq

    <!--        mq======================-->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
            </dependency>
    

    发送者

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        <!--    引入druid配置文件-->
        <context:property-placeholder location="classpath:/local/rabbitmq.properties" />
    <!--    <context:component-scan base-package="com.jwl.test.mqConfig"/>-->
    
        <!-- 链接工厂-->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}"
        username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" publisher-confirms="true"
        />
        <!--spring代理rabbit-->
        <rabbit:admin connection-factory="connectionFactory" />
        
        <!--路由交换机-->
        <rabbit:direct-exchange name="test-routekey-exchange" id="test-routekey-exchange"  durable="true"  auto-declare="true"/>
        <!--定义模板--><!-- mandatory必须设置true,return callback才生效 -->
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                         mandatory="true"
                         return-callback="messageReturn"
                         confirm-callback="messageConfirm"
        />
        
    <!--    <bean id="messageConverter" class="com.jwl.common.util.MqMessageConvert"/>-->
        <!--抵达交换机但是找不到响应的路由-->
        <bean id="messageReturn" class="com.jwl.test.mqConfig.MqReturnHandler"/>
    <!--    消息发送者-->
        <bean id="messageConfirm" class="com.jwl.test.mqConfig.MqConfirmListener"/>
        <!--死信交换机-->
        <rabbit:direct-exchange name="dead-letter-exchange" durable="true" auto-declare="true"/>
    </beans>
    

    消费者

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        <!--    引入druid配置文件-->
        <context:property-placeholder location="classpath:/local/rabbitmq.properties" />
    <!--    链接工厂-->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}"
        username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"/>
    
    <!--    spring代理rabbit-->
        <rabbit:admin connection-factory="connectionFactory" />
    
    <!--    声明队列,并且绑定死信交换机,过期时间2秒-->
        <rabbit:queue id="test_direct_queue" durable="true" name="test_direct_queue" auto-declare="true">
            <rabbit:queue-arguments>
                <entry key="x-message-ttl" value="2000" value-type="java.lang.Long"/>
                <entry key="x-dead-letter-exchange" value="dead-letter-exchange" />
                <entry key="x-dead-letter-routing-key" value="deadLetterKey" />
            </rabbit:queue-arguments>
        </rabbit:queue>
    
        <!-- 声明死信队列-->
        <rabbit:queue id="test_dead_queue" durable="true" name="test_dead_queue" auto-declare="true"/>
    
        <!--监听类绑定,设置手动消费,服务器每次只发一条数据-->
        <rabbit:listener-container prefetch="1" connection-factory="connectionFactory" message-converter="messageConverter" acknowledge="manual">
            <rabbit:listener ref="listener" method="listener" queue-names="test_direct_queue" />
            <rabbit:listener ref="deadListener" method="listener" queue-names="test_dead_queue" />
        </rabbit:listener-container>
    
        <bean id="messageConverter" class="com.jwl.common.util.MqMessageConvert"/>
        <!--监听类-->
        <bean id="listener" class="com.jwl.test.MqListener"/>
        <bean id="deadListener" class="com.jwl.test.MqDeadListener"/>
    </beans>
    

    异常

    spring整合rabbitmq,命名空间版本不对会引起找不到响应的类

    相关文章

      网友评论

          本文标题:2019-05-31(RABBIT_MQ)

          本文链接:https://www.haomeiwen.com/subject/mhwitctx.html