美文网首页
springboot rabbitmq入门使用

springboot rabbitmq入门使用

作者: dayue_ | 来源:发表于2021-02-26 23:13 被阅读0次

RabbitMQ作为一款能实现高性能存储分发消息的分布式中间件,具有异步通信、服务解耦、接口限流、消息分发和业务延迟处理等功能,在实际生产环境中具有很广泛的应用。

为了能在项目中使用RabbitMQ,需要在本地安装RabbitMQ并能进行简单的使用。可参考改教程安装RabbitMQ:安装教程

一、总的代码目录结构如下:
rabbitmq使用代码目录结构.png
  • entity包实体类order,作为消息载体;
  • ackmodel包是写消息确认消费机制,自动确认消费和手工确认消费;
  • delay包是写延迟队列,在业务延迟处理时可用到,如订单30分钟内未付款自动取消订单相关业务;
  • exchangemodel包是写不同类型交换机下的消息模型,常见的三种消息模型是:direct-直接传输、fanout-广播、topic-主题消息模型;
  • springevent包是spring事件驱动模型的demo,要了解消息队列,先了解一下spring的事件驱动会更好;
  • 数据库是用mysql,项目启动需要连接本地数据库,新建一个exercisegroup数据库;
  • appilication.yaml配置rabbitmq地址,pom文件引入amqp starter即可。
二、spring事件驱动

(1)spring事件驱动模型由三部分构成,生产者、事件(消息)、消费者,即生成者采用异步的方式把事件(消息)发送给消费者,消费者监听到事件(消息)再进一步处理。


spring事件驱动.jpeg

(2)代码目录结构如下:


springevent.png
(3)示例代码
  • OrderEvent类,订单事件,继承ApplicationEvent,:
public class OrderEvent extends ApplicationEvent {
    public OrderEvent(Order source) {
        super(source);
    }
}
  • OrderPublisher类,生产者,异步发送事件:
@Component
public class OrderPublisher {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    public void sendMsg() {
        Order order = new Order();
        order.setOrdernum("123456");
        OrderEvent orderEvent = new OrderEvent(order);
        //发送消息
        applicationEventPublisher.publishEvent(orderEvent);
    }
}
  • OrderConsumer类,消费者,监听生产者发送过来的事件
@Component//加入Spring的IOC容器
@EnableAsync//允许异步执行
@Slf4j
public class OrderConsumer implements ApplicationListener<OrderEvent> {
    @Override
    @Async
    public void onApplicationEvent(OrderEvent event) {
        log.info("监听到订单,订单号:{}", ((Order) event.getSource()).getOrdernum());
    }
}

(4)执行MessageQueueApplicationTests的test方法即可看结果,RabbitMQ本质也是异步通信,消息在生产者端进行发送,在消费者端进行监听,对监听到的消息进一步处理,其功能更加强大。


test-springevent.png
三、RabbitMQ一些专有名词

Producer/Publisher生产者,投递消息的一方。
Consumer消费者,接收消息的一方。
Message消息:实际的数据,如demo中的order订单消息载体。
Queue队列:是RabbitMQ的内部对象,用于存储消息,最终将消息传输到消费者。
Exchange交换机:在RabbitMQ中,生产者发送消息到交换机,由交换机将消息路由到一个或者多个队列中
RoutingKey路由键:生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。
Binding绑定:RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列。

四、RabbitMQ使用

(1)简单的demo


消息队列流程图.jpeg 简单的demo.png

(2)示例代码

  • RabbitmqConfig配置类,SimpleRabbitListenerContainerFactory Bean是消息监听容器,服务于监听者;RabbitTemplate是RabbitMQ发送消息的操作组件RabbitTemplate,此外配置类还有三个Bean,一个是队列basicQueue用于存储消息最终消息会被消费者监听到,basicExchange是交换机,生产者发送消息到交换机根据路由规则发送到相应的队列basicQueue上,basicBinding是负责绑定交换机basicExchange和队列basicQueue,根据路由规则绑定起来。创建队列、交换机的名词以及路由规则我都放到常量类RabbitMqConstants里面。
@Slf4j
@Configuration
public class RabbitmqConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    //自动装配消息监听器所在的容器工厂配置类实例
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    /**
     * 下面为单一消费者实例的配置
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用JSON的格式进行传输
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置并发消费者实例的初始数量。在这里为1个
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量。在这里为1个
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        return factory;
    }
    //自定义配置RabbitMQ发送消息的操作组件RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //设置“发送消息后进行确认”
        connectionFactory.setPublisherConfirms(true);
        //设置“发送消息后返回确认信息”
        connectionFactory.setPublisherReturns(true);
        //构造发送消息组件实例对象
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //发送消息后,如果发送成功,则输出“消息发送成功”的反馈信息
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData,ack,cause));
        //发送消息后,如果发送失败,则输出“消息发送失败-消息丢失”的反馈信息
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message));
        //定义消息传输的格式为JSON字符串格式
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //最终返回RabbitMQ的操作组件实例RabbitTemplate
        return rabbitTemplate;
    }
    //创建队列
    @Bean
    public Queue basicQueue(){
        return new Queue(RabbitMqConstants.BASIC_QUEUE,true);
    }
    //创建交换机:在这里以DirectExchange为例
    @Bean
    public DirectExchange basicExchange(){
        return new DirectExchange(RabbitMqConstants.BASIC_EXCHANGE,true,false);
    }
    //创建绑定
    @Bean
    public Binding basicBinding(){
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RabbitMqConstants.BASICE_ROUTING_KEY);
    }
}
  • RabbitMqConstants常量类,存放创建队列、交换机的名词以及路由规则。
@Data
public class RabbitMqConstants {
    //队列名词
    public static final String BASIC_QUEUE = "mq.basic.info.queue";
    //交换机名词
    public static final String BASIC_EXCHANGE = "mq.basic.info.exchange";
    //路由规则,实际为字符串
    public static final String BASICE_ROUTING_KEY = "mq.basic.info.routing.key";
}
  • BasicPublisher 类,生产者,异步发送消息
@Component
@Slf4j
public class BasicPublisher {
    //定义RabbitMQ消息操作组件RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送消息
     * @param message 待发送的消息
     */
    public void sendMsg(Order message){
        try {
            //指定消息模型中的交换机
            rabbitTemplate.setExchange(RabbitMqConstants.BASIC_EXCHANGE);
            //指定消息模型中的路由
            rabbitTemplate.setRoutingKey(RabbitMqConstants.BASICE_ROUTING_KEY);
            //转化并发送消息
            rabbitTemplate.convertAndSend(message);
            log.info("rabbitmq demo-生产者-发送消息:{} ", JSONUtil.toJsonStr(message));
        } catch (Exception e) {
            log.error("rabbitmq demo-生产者-发送消息发生异常:{} ", message, e.fillInStackTrace());
        }
    }
}
  • BasicConsumer 类,消费者,监听到消息时对消息进行处理,需要为消费者设置监听的队列mq.basic.info.queue以及监听容器singleListenerContainer。
@Component
@Slf4j
public class BasicConsumer {
    /**
     * 监听并接收消费队列中的消息-在这里采用单一容器工厂实例即可
     */
    @RabbitListener(queues = RabbitMqConstants.BASIC_QUEUE, containerFactory = "singleListenerContainer")
    public void consumeMsg(Order message) {
        try {
            log.info("rabbitmq demo-消费者-监听消费到消息:{} ", JSONUtil.toJsonStr(message));
        } catch (Exception e) {
            log.error("rabbitmq demo-消费者-发生异常:", e.fillInStackTrace());
        }
    }
}

(3)安装好erlang语言以及rabbitmq之后,项目启动,访问http://127.0.0.1:15672,输入默认账号密码,可以看到:


消息队列.png 交换机.png 交换机绑定关系.png

(4)运行test方法:

    @Test
    public void testBasicPublish() {
        Order order = new Order();
        order.setOrdernum("123456");
        basicPublisher.sendMsg(order);
    }
生产者发送消息.png 消费者监听消息.png

下一篇:springboot rabbitmq不同交换机类型实战

下一篇:springboot rabbitmq高可用消息确认消费实战

参考资料:
《分布式中间件实战》
《rabbitmq实战指南》

相关文章

网友评论

      本文标题:springboot rabbitmq入门使用

      本文链接:https://www.haomeiwen.com/subject/vtjfnktx.html