美文网首页
SpringBoot整合RabbitMQ(消息中间件)

SpringBoot整合RabbitMQ(消息中间件)

作者: 索性流年 | 来源:发表于2020-05-14 17:42 被阅读0次

    消息中间件概述

    消息队列中间件是分布式架构中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性框架目前使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RockerMQ。

    RabbitMQ 作用

    生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,这和我们之前学习Nginx有点类似。

    交换机的作用

    根据具体的路由策略分发到不同的队列中。

    交换机的四种类型

    1.Direct exchange (直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的

    2.Fanout exchange (扇型交换机)将消息路由给绑定到它身上的所有队列

    3.Topic exchange ( 主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路
    由给一个或多个绑定队列

    4.Headers exchange (头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。
    通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

    VirtualHost

    像MySQL服务器中可以添加多个数据库一样,可以指定用户对指定库表等操作权限,RabbitMQ中这种管理权限就是VirtualHost,每个VirtualHost相当于一个独立的服务器,每个VirtualHost之前相互隔离message、queue不能互通,目的是为了解耦合

    RabbitMQ消息队列的类型

    1.点对点模式: 一对一模式一个生产者投递消息给队列,只能允许有一个消费者进行消费。如若消费者集群,会进行均摊消费

    2.工作模式:又称公平消费模式,采用能者多劳的原则,哪个消费者应答的快,哪个就能多消费消息,当消费者没有应答之前,队列将不会再发送新的消息给消费者

    3.发布订阅模式:一个生产者发送消息,多个消费者获取同样的消息,包括一个生产者,一个交换机,多个队列,多个消费者。

    4.路由模式(RoutingKey):

    5.通配符模式(topic):生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
    "*"表示匹配一个词语,"#"表示匹配多个词语

    自动应答

    不在乎消费者对这个消息处理是否成功,都会告诉队列删除该消息。如果处理消息失败情况下,实现自动补偿。

    手动应答

    消费处理完业务逻辑,手动返回ack(通知),告诉队列服务器是否删除消息

    pom导入依赖

    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    

    application.yml中配置

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: suoxingliunian
        password: a1234560
        virtual-host: /admin_host
    

    消息生产者

    创建.class配置类

    /*发布订阅模式配置交换机类型为Fanou*/
    @Configuration
    public class FanoutConfig {
        //邮件队列
        private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";
        //短信队列
        private String FANOUT_SMS_QUEUE = " fanout_sms_queue" ;
        //交换机名称
        private String EXCHANGE_NAME = "fanoutExchange";
    
    
    //  定义邮件队列
        @Bean
        public Queue fanoutEamilQueue(){
            return new Queue(FANOUT_EMAIL_QUEUE);
        }
    
        //    定义短信队列
        @Bean
        public Queue fanoutSmsQueue(){
            return new Queue(FANOUT_SMS_QUEUE);
        }
    
        //    定义交换机名称 Fanout类型  其他类型例如TopicExchange
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange(EXCHANGE_NAME);
        }
    
        //    邮件队列和交换机进行绑定(参数名称一定要和队列方法、交换机名称一致)
        @Bean
        Binding bindingExchangeEamil(Queue fanoutEamilQueue,FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutEamilQueue).to(fanoutExchange);
        }
    
        //    短息队列和交换机进行绑定(参数名称一定要和队列方法、交换机名称一致)
        @Bean
        Binding bindingExchangeSms(Queue fanoutSmsQueue,FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutSmsQueue).to(fanoutExchange);
        }
    }
    
    

    创建.class消息发布消息

    
    @Component
    public class FanoutProducer {
        @Autowired
        public AmqpTemplate amqpTemplate;
        public void send(String queueName) {
            String msg = "sendmsg"+new Date();
    //        发送消息
            amqpTemplate.convertAndSend(queueName,msg);
        }
    }
    
    
    

    创建Controller模拟消息发布接口

    
    @RestController
    public class ProducerController {
        @Autowired
        private FanoutProducer producer;
    
        @GetMapping("/sendMsg")
        public void sendMsg(String queueName) {
            producer.send(queueName);
        }
    }
    
    

    消息消费者

    工厂方法

    
    /*工厂方法*/
    public class Producer {
        private static final String queueName = "sanshengsanshishilitaohua";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    //        获取新连接
            Connection connection = RabbitMQUtils.newConntction();
    //        创建通道
            Channel channel = connection.createChannel();
    //        创建队列
            channel.queueDeclare(queueName, false, false, false, null);
    //        发送消息
            channel.basicPublish("", queueName, null, "suoxingliunian".getBytes());
    //        关闭资源
            channel.close();
            connection.close();
        }
    }
    
    

    工具类

    public class RabbitMQUtils {
    
        //    创建RabbitMQ连接
        public static Connection newConntction() throws IOException, TimeoutException {
            //        创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //        设置连接地址
            connectionFactory.setHost("127.0.0.1");
            //        设置连接用户名
            connectionFactory.setUsername("admin");
            //        设置密码
            connectionFactory.setPassword("123456");
            //        设置端口号
            connectionFactory.setPort(5672);
            //        设置VirtualHost地址
            connectionFactory.setVirtualHost("/admin_host");
            Connection connection = connectionFactory.newConnection();
            return connection;
        }
    
    }
    
    

    消费者

    /*消息消费者*/
    public class Consumer {
        private static final String queueName = "sanshengsanshishilitaohua";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    //        获取新连接
            Connection connection = RabbitMQUtils.newConntction();
    //        创建通道
             Channel channel = connection.createChannel();
    //        消费者关联队列
    //        channel.queueDeclare(queueName, false, false, false, null);
    //        消费者获取消息
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    //            监听获取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    String result = new String(body,"utf-8");
                    System.err.println("消费者获取生产者发送消息"+result);
                    //手动应答
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
    //        设置应答模式
            channel.basicConsume(queueName,true,defaultConsumer);
        }
    }

    相关文章

      网友评论

          本文标题:SpringBoot整合RabbitMQ(消息中间件)

          本文链接:https://www.haomeiwen.com/subject/ieuhactx.html