美文网首页
Spring Boot集成RabbitMQ-简单示例

Spring Boot集成RabbitMQ-简单示例

作者: alex很累 | 来源:发表于2020-07-23 22:33 被阅读0次

    示例代码地址:
    https://github.com/sushizhendeqiang/springboot-rabbitmq-demo

    一、添加依赖

    <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    

    二、配置文件添加配置

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtual-host: /
        connection-timeout: 10000
        listener:
          simple:
            # 手动应答
            acknowledge-mode: manual 
            auto-startup: true
        # 不重回队列
            default-requeue-rejected: false 
            concurrency: 5
            max-concurrency: 20
            # 每次只处理一个信息
        prefetch: 1 
            retry:
              enabled: false
    

    三、生产者、消费者

    生产者

    package com.rabbitmq.demo.demo;
    
    import com.rabbitmq.demo.entity.User;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Component;
    
    import java.nio.charset.Charset;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    
    /**
     * @Author: sush4
     * @Description:
     * @Date: 2020/7/18
     */
    
    @Component
    public class RabbitProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        /**
         * 发送消息
         */
        @SendTo
        public void sendMessage() {
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //发送简单消息
                IntStream.rangeClosed(1, 5).forEach(num -> {
                    String body = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " : " + num;
                    MessageProperties properties = new MessageProperties();
                    //消息内容的编码格式
                    properties.setContentEncoding("UTF-8");
                    //Delivery mode: 是否持久化,1 - Non-persistent,2 - Persistent
                    properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                    Message message = new Message(body.getBytes(Charset.forName(properties.getContentEncoding())), properties);
                    amqpTemplate.convertAndSend("rabbit-springboot-exchange", "rabbitmq-demo-routingkey", message);
                });
                // 发送java bean 消息
                // 实体要序列化 否则 会发送失败
                IntStream.rangeClosed(1, 5).forEach(num -> {
                    //这里的builder是lombok快速构建实体的一个方法
                    User user = User.builder().userId(num).username("zhangsan:" + num).password("666666").build();
                    //convertAndSend方法参数说明:
                    //参数1:exchange     交换机名称
                    //参数2:routingKey   绑定关系,通过绑定关系,将exchage交换机绑定到queue队列
                    amqpTemplate.convertAndSend("rabbit-springboot-exchange", "rabbitmq-demo-routingkey-bean", user);
                });
            }).start();
        }
    }
    
    

    生产者其实没什么好说的,一个convertAndSend完事~
    要注意的是:如果想直接发送的实体的话,实体要序列化(Serializable)

    消费者

    package com.rabbitmq.demo.demo;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.demo.entity.User;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    
    /**
     * @Author: sush4
     * @Description:
     * @Date: 2020/7/18
     */
    
    @Component
    @Slf4j
    public class RabbitReceiver {
        @RabbitHandler
        @RabbitListener(queues = "rabbitmq-demo")
        public void receiveMessage(Message message, Channel channel) throws UnsupportedEncodingException {
            String encoding = message.getMessageProperties().getContentEncoding();
            log.info("接收到string消息:[{}]", new String(message.getBody(), "UTF-8"));
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
    
        @RabbitHandler
        @RabbitListener(queues = "rabbitmq-demo-bean")
        public void receiveMessage(User user, Message message, Channel channel) {
            log.info("接收到bean消息:[{}]", user);
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
    }
    

    消费者关键是两个注释:
    @RabbitListener:监听队列中的消息
    @RabbitHandler:和@RabbitListener配合使用,@RabbitListener监听到消息,交由@RabbitHandler标注的方法处理

    这里我贴出来的示例可能不能明确感受到这两个注解的作用,可以看下图:
    具体运行哪个方法,要看message的参数类型

    image.png
    图片来源:https://www.jianshu.com/p/911d987b5f11

    另外,关于@RabbitListener的使用,我这里只指明了queue队列,如果在rabbitmq中不对该队列进行创建配置的话,是不会监听到信息的(队列都没有,监听了毛线);
    这里还有另外一种写法:

    @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "rabbitmq-demo", durable = "true"),
                exchange = @Exchange(name = "rabbit-springboot-exchange", durable = "true", type = "topic"),
                key = "rabbitmq-demo-routingkey"
        ))
    

    这么写的话,不存在的情况下,会自动创建~
    Queue、Exchange、routingKey就不用赘述了,分别是队列、交换机、绑定关系;
    durable这个字段表明是否持久化。


    这篇文章只介绍了rabbitmq的简单使用~
    有理解不正确的地方,望指正!

    相关文章

      网友评论

          本文标题:Spring Boot集成RabbitMQ-简单示例

          本文链接:https://www.haomeiwen.com/subject/sxpklktx.html