美文网首页
1-5 交换机HelloWord

1-5 交换机HelloWord

作者: Finlay_Li | 来源:发表于2020-07-09 12:30 被阅读0次

    交换机属性

    1. Name:交换机名称
    2. Type:交换机类型direct、topic、 fanout、 headers
    3. Durability:是否需要持久化,true为持久化
    4. Auto Delete:当最后一个绑定到Exchange.上的队列删除后,自动删除该Exchange
    5. Internal:当前Exchange是否用于RabbitMQ内部使用, 默认为False
    6. Arguments:扩展参数,用于扩展AMQP协议自制定化使用

    Direct Exchange

    简介

    所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue

    注意: Direct模式可以使用RabbitMQ自带的Exchange: default Exchange,所以不需要将Exchange进行任何绑定(binding)操作。消息传 递时,RouteKey必须完全匹配才会被队列接收,消费者才能读取到消息,否则该消息会被抛弃

    结构图

    image.png

    示例

    
    package com.dodou.liwh.amqp.boot.direct;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class DirectConfig {
    
        @Bean
        DirectExchange directExchange() {
            DirectExchange direct_ex = new DirectExchange("direct.ex");
            return direct_ex;
        }
    
        @Bean
        Queue directQueue() {
            Queue direct_que = new Queue("direct.que");
            return direct_que;
        }
    
        /*队列绑定交换机:Boot自动绑定
          1)每一个队列的名字不能相同
          2)并且转入的参数名要匹配*/
        @Bean
        Binding binding(DirectExchange directExchange,Queue directQueue) {
            Binding binding = BindingBuilder.bind(queue).to(directExchange).with("direct.rout.key");
            return binding;
        }
    }
    
    
    package com.dodou.liwh.amqp.boot.direct;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DirectSender {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
        private String RoutingKey = "direct.rout.key";
        private String EX_NAME = "direct.ex";
        private String msg = "交换机类型是:direct,用以支持RoutingKey";
    
        //发送消息到Exchange
        public void Send() {
            amqpTemplate.convertAndSend(EX_NAME, RoutingKey, msg);
            System.out.println(msg);
        }
    
    }
    
    
    package com.dodou.liwh.amqp.boot.direct;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "direct.que")
    public class DirectReceiver {
    
        @RabbitHandler
        public void consume(String msg) {
            System.out.println("Receiver消费:队列是:direct" + ":消息:" + msg);
        }
    }
    
    

    Topic Exchange

    简介

    所有发送到Topic Exchange的消息被转发到,模糊匹配RouteKey的Queue
    如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。

    匹配规则

    • “ # ” 表示0个或若干个关键字。
    • “ * ” 表示一个关键字。
    • “ . “ 用于分割关键字,如topic.warn.timeout


      image.png

    示例

    
    package com.dodou.liwh.amqp.boot.topic;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    @Configuration
    public class TopicConfig {
    
        @Bean
        public TopicExchange topicExchange() {
            // 参数1 name :交换机名称
            // 参数2 durable :是否持久化
            // 参数3 autoDelete :当所有消费客户端连接断开后,是否自动删除队列
            TopicExchange exchange = new TopicExchange("topic.ex", false, false);
            return exchange;
        }
    
        @Bean
        public Queue topicQueue() {
            // 参数1 name :队列名
            // 参数2 durable :是否持久化
            // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
            // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
            Queue queue = new Queue("topic.que",false,false,false);
            return queue;
        }
    
        @Bean
        public Binding binding(TopicExchange topicExchange, Queue topicQueue) {
            //绑定消费规则
            Binding binding = BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.#");
            return binding;
        }
    }
    
    package com.quanwugou.mall.mq;
    
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.math.BigDecimal;
    
    @Component
    public class TopicSender {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
        private String TOPIC_EX = "topic.ex";
        //生产者发送RoutingKey
        private String RoutingKey = "topic.hello.fast";
        private String msg = "交换机类型是:topic,模糊匹配RoutingKey";
    
        public void send() throws Exception {
            Hehe hehe = new Hehe();
            hehe.setI(1);
            Hei hei = new Hei();
            hei.setNum(BigDecimal.ONE);
            hehe.setO(hei);
            //SimpleMessageConverter only supports String, byte[] and Serializable payloads : 实体类有没有序列化? 默认的转换器是SimpleMessageConverter,它适用于String、Serializable实例和字节数组。
            //仅推荐JSONString、Serializable实例
            amqpTemplate.convertAndSend(TOPIC_EX, RoutingKey, hehe);
            System.out.println(msg);
        }
    }
    
    package com.quanwugou.mall.mq;
    
    
    import cn.hutool.core.util.ObjectUtil;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    public class TopicReceiver {
    
        /*msg : 消费的消息
         *channel : 当前操作通道
         *@Header : 可以获取到所有的头部信息
         * */
        @RabbitListener(queues = "topic.que")
        @RabbitHandler
        public void rec(Message msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            try {
                channel.basicQos(1);
                Hehe hehe = (Hehe) ObjectUtil.deserialize(msg.getBody());
                System.out.println("receiver: " + hehe.toString());
                channel.basicAck(tag, false);
            } catch (Exception e) {
                // TODO 消费失败,那么我们可以进行容错处理,比如转移当前消息进入其它队列
    //                channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息
    //                而basicReject一次只能拒绝一条消息
    
    //                tag:消息标识
    //                false:是否批量.true:将一次性拒绝所有小于deliveryTag的消息
    //                true:重新排队
                channel.basicNack(tag, false, true);//重排并不是放在最后
            }
        }
    }
    

    Fanout Exchange

    简介

    1. 不关心路由键,只需要将队列绑定到交换机上即可,因此转发消息是最快的
    2. 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上

    结构图

    image.png

    示例

    放在2-1 RabbitTemplate实现

    相关文章

      网友评论

          本文标题:1-5 交换机HelloWord

          本文链接:https://www.haomeiwen.com/subject/dkvjcktx.html