美文网首页
2019-05-31(RABBIT_MQ)

2019-05-31(RABBIT_MQ)

作者: 威先生2018 | 来源:发表于2020-03-22 15:40 被阅读0次

RABBIT_MQ是消息队列,即messge queue

作用:模块之间的解耦、流量削峰、异步消息和消息通信;高性能、高可用和最终一致性的框架
作用链接:https://www.cnblogs.com/HigginCui/p/6478613.html

简单教程:https://blog.csdn.net/hellozpc/article/details/81436980

api:http://rabbitmq.mr-ping.com/
https://docs.spring.io/spring-amqp/api/overview-summary.html

高级应用:https://blog.csdn.net/weixin_42763504/article/details/81608237
https://blog.csdn.net/zhihuirensheng123/article/details/82805349
https://blog.csdn.net/qq315737546/article/details/54176560
https://www.cnblogs.com/toov5/p/10288260.html
https://blog.csdn.net/qq_38455201/article/details/80308771

应用解耦:订单-》库存,解耦后:订单-》消息通道-》库存
流量削峰:秒杀活动
异步消息:用户注册,需要发送短信和邮箱;直接放进消息队列
消息通信:

rabbitmq模式

公共创建链接

public class RaMqConnCommon {
    public final static String QUEUE_SIMPLE = "test_queue";//简单模式
    public final static String QUEUE_PUBLIC = "test_public_queue";//订阅模式
    public final static String QUEUE_PUBLIC_2 = "test_public_queue2";//订阅模式
    public final static String QUEUE_ROUTE = "test_route_queue";//路由模式队列
    public final static String QUEUE_ROUTE_2 = "test_route_queue2";//路由模式队列
    public final static String EXCHANGE_PUBLIC = "test_public_exchange";//订阅模式交换机
    public final static String EXCHANGE_ROUTE = "test_route_exchange";//路由模式交换机
    public final static String ROUTE_KEY = "routekey";//路由key
    public final static String ROUTE_KEY_2 = "routekey2";//路由key
    /**
     * @Description:  创建RabbitMq链接
     * @Param: []
     * @return: com.rabbitmq.client.Connection
     * @Author: LJW
     * @Date: 16:33
     */
    public static Connection getConnection(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("testmq");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection conn = null;
        try {
            conn = factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return conn;
    }

简单模式:

生产者producer
1.创建链接
2.创建通道
3.声明队列
4.发送

        //创建链接
        Connection connection = RaMqConnCommon.getConnection();
        //创建渠道
        Channel channel = connection.createChannel();
        //声明队列(String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String, Object> arguments)
        channel.queueDeclare(RaMqConnCommon.QUEUE_SIMPLE,false,false,false,null);
        String sendMessge = "Hello World!!";
        //发送消息:交换机、队列、队列特性和内容
        channel.basicPublish("",RaMqConnCommon.QUEUE_SIMPLE,null,sendMessge.getBytes());
        System.out.println("工作模式,多劳多得==========》"+sendMessge);
        channel.close();
        connection.close();

消费者

       //创建链接
        Connection connection = RaMqConnCommon.getConnection();
        //创建渠道
        Channel channel = connection.createChannel();
        //消费
        channel.basicConsume(RaMqConnCommon.QUEUE_SIMPLE,true,new RecevierConsumer(channel));



public class RecevierConsumer extends DefaultConsumer {
    public RecevierConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //消费的消息
        System.out.println(consumerTag+":接收到的消息="+new String(body));
    }
}

工作模式:

生产者和上面一样

消费者

        //创建链接
        Connection connection = RaMqConnCommon.getConnection();
        //创建渠道
        Channel channel = connection.createChannel();
        //同一时刻服务器只发送一条消息
        channel.basicQos(1);
        //消费,需要确认机制(队列名,ack,consumer)
        channel.basicConsume(RaMqConnCommon.QUEUE_SIMPLE,false,new RecevierConsumer(channel));

public class RecevierConsumer extends DefaultConsumer {
    private Channel channel;
    public RecevierConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //消费的消息
        System.out.println(consumerTag+":接收到的消息="+new String(body));
        //消息确认(false为消息确认,true为未确认)
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

订阅模式:

1.创建链接
2.创建通道
3.声明交换机
4.发送到交换机

        //创建链接
        Connection connection = RaMqConnCommon.getConnection();
        //创建渠道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(RaMqConnCommon.EXCHANGE_PUBLIC,"fanout");
        String sendMessge = "Hello World!!";
        int i = 0;
        while(true && i<100){
            String str = sendMessge+(++i);
            //发送消息到交换机(exchange、routingKey)
            channel.basicPublish(RaMqConnCommon.EXCHANGE_PUBLIC,"",null,str.getBytes());
        }
//        //发送消息到交换机
//        channel.basicPublish(RaMqConnCommon.EXCHANGE_PUBLIC,"",null,sendMessge.getBytes());
//        System.out.println("订阅模式==========》"+sendMessge);
        channel.close();
        connection.close();

消费者和工作模式一样

路由模式:

生产者
1.创建链接
2.创建通道
3.声明交换机
4.发送到交换机,并且指定路由

消费者

        //创建链接
        Connection connection = RaMqConnCommon.getConnection();
        //创建渠道
        Channel channel = connection.createChannel();
        //同一时刻服务器只发送一条消息
        channel.basicQos(1);
        //声明队列(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
        channel.queueDeclare(RaMqConnCommon.QUEUE_ROUTE,true,false,false,null);
        //队列绑定交换机(queue,exchange,routekey)
        channel.queueBind(RaMqConnCommon.QUEUE_ROUTE,RaMqConnCommon.EXCHANGE_ROUTE,RaMqConnCommon.ROUTE_KEY);
        //消费(mq,是否自动签收,消费者)
        channel.basicConsume(RaMqConnCommon.QUEUE_ROUTE,false,new RecevierConsumer(channel));

通配符模式:

高级特性:

returnListener:
找不到对应的路由key或交换机,则执行本方法

        channel.addReturnListener(new ReturnListener() {
            /**
             * int replyCode, 响应吗, 路由成没成功
             * String replyText, 回复内容
             * String exchange,
             * String routingKey,
             * AMQP.BasicProperties properties,
             * byte[] body 实际的消息体内容
             *
             * @param replyCode
             * @param replyText
             * @param exchange
             * @param routingKey
             * @param properties
             * @param body
             * @throws IOException 跑出ioexception 异常
             */

            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("handle return");
                System.out.println("replyCode: " + replyCode);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("properties: " + properties.toString());
                System.out.println("body: " + new String(body));
            }
        });

ConfirmListener
判断消息生产者是否发送成功

        // 开启发送方确认模式
        channel.confirmSelect();
        // 6. 添加一个监听
        channel.addConfirmListener(new ConfirmListener() {
            // 成功
            // 关键的唯一的消息标签deliveryTag, multiple :是否批量, 暂时不用管
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("成功 Ack");
            }

            // 失败, 比如磁盘写满了,mq 出现一些异常, key 容量达到上限,也有可能handleAck,handleNack 都没有收到, 进行抓取,和重发
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("失败 Nack");
            }
        });

死信队列
主要是消息过期或消费者拒绝接收,然而此类消息则进入死信队列
主要是接收的队列绑定死信交换机

事物

try {
    channel.txSelect(); // 声明事务
    // 发送消息
    channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    channel.txCommit(); // 提交事务
} catch (Exception e) {
    channel.txRollback();
} finally {
    channel.close();
    conn.close();
}

spring整合rabbitMq

<!--        mq======================-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>

发送者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--    引入druid配置文件-->
    <context:property-placeholder location="classpath:/local/rabbitmq.properties" />
<!--    <context:component-scan base-package="com.jwl.test.mqConfig"/>-->

    <!-- 链接工厂-->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}"
    username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" publisher-confirms="true"
    />
    <!--spring代理rabbit-->
    <rabbit:admin connection-factory="connectionFactory" />
    
    <!--路由交换机-->
    <rabbit:direct-exchange name="test-routekey-exchange" id="test-routekey-exchange"  durable="true"  auto-declare="true"/>
    <!--定义模板--><!-- mandatory必须设置true,return callback才生效 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     mandatory="true"
                     return-callback="messageReturn"
                     confirm-callback="messageConfirm"
    />
    
<!--    <bean id="messageConverter" class="com.jwl.common.util.MqMessageConvert"/>-->
    <!--抵达交换机但是找不到响应的路由-->
    <bean id="messageReturn" class="com.jwl.test.mqConfig.MqReturnHandler"/>
<!--    消息发送者-->
    <bean id="messageConfirm" class="com.jwl.test.mqConfig.MqConfirmListener"/>
    <!--死信交换机-->
    <rabbit:direct-exchange name="dead-letter-exchange" durable="true" auto-declare="true"/>
</beans>

消费者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--    引入druid配置文件-->
    <context:property-placeholder location="classpath:/local/rabbitmq.properties" />
<!--    链接工厂-->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}"
    username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"/>

<!--    spring代理rabbit-->
    <rabbit:admin connection-factory="connectionFactory" />

<!--    声明队列,并且绑定死信交换机,过期时间2秒-->
    <rabbit:queue id="test_direct_queue" durable="true" name="test_direct_queue" auto-declare="true">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl" value="2000" value-type="java.lang.Long"/>
            <entry key="x-dead-letter-exchange" value="dead-letter-exchange" />
            <entry key="x-dead-letter-routing-key" value="deadLetterKey" />
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- 声明死信队列-->
    <rabbit:queue id="test_dead_queue" durable="true" name="test_dead_queue" auto-declare="true"/>

    <!--监听类绑定,设置手动消费,服务器每次只发一条数据-->
    <rabbit:listener-container prefetch="1" connection-factory="connectionFactory" message-converter="messageConverter" acknowledge="manual">
        <rabbit:listener ref="listener" method="listener" queue-names="test_direct_queue" />
        <rabbit:listener ref="deadListener" method="listener" queue-names="test_dead_queue" />
    </rabbit:listener-container>

    <bean id="messageConverter" class="com.jwl.common.util.MqMessageConvert"/>
    <!--监听类-->
    <bean id="listener" class="com.jwl.test.MqListener"/>
    <bean id="deadListener" class="com.jwl.test.MqDeadListener"/>
</beans>

异常

spring整合rabbitmq,命名空间版本不对会引起找不到响应的类

相关文章

  • 2019-05-31(RABBIT_MQ)

    RABBIT_MQ是消息队列,即messge queue 作用:模块之间的解耦、流量削峰、异步消息和消息通信;高性...

  • 时间管理复盘:2019-05-31

    2019-05-31 周五 休息一日。

  • pre.clud.e|免除

    title: precluddate: 2019-05-31 10:12:22NO_sents: 592NO_re...

  • hasten 加速

    title: hastendate: 2019-05-31 10:32:53NO_sents: 112NO_ref...

  • conundrum 难题

    title: conundrumdate: 2019-05-31 10:16:41NO_sents: 54NO_r...

  • stunt 阻碍;矮化病

    title: stuntdate: 2019-05-31 10:55:19NO_sents: 199NO_refe...

  • 2019-06-01

    2019-05-31。 22:42 2019年5月31日 日精进。 体验。吸收 释放。 今日...

  • 四、onActivityResult

    2019-05-31 1.回调函数:启用其他Activity并返回结果 MainActivity中: Select...

  • 2019-5-31晨间日记

    2019-05-31 【践行人员】袁顺娟 【践行天数】212/1000 【今日天气】雨 【昨日早睡】23:00 【...

  • 咏河中石

    裂落出崇峦 磨身逆浪欢 千删浮脆去 润泽质尤完 2019-05-31

网友评论

      本文标题:2019-05-31(RABBIT_MQ)

      本文链接:https://www.haomeiwen.com/subject/mhwitctx.html