美文网首页
SpringBoot整合RabbitMQ(三)

SpringBoot整合RabbitMQ(三)

作者: 程序员小杰 | 来源:发表于2020-11-04 23:29 被阅读0次

上一篇:初识RabbitMQ(maven版本):https://www.jianshu.com/p/568765c0b94b

1.导入依赖

image.png

pom

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

2. 配置配置文件

spring:
  application:
    name: rabbitmq-boot
  rabbitmq:
    host: 47.105.198.54
    port: 5672
    virtual-host: /test-1
    username: test
    password: 123456

SpringBoot中使用RabbitTemplate 简化对RabbitMQ操作,使用时候直接在项目中注入即可使用。

3、使用

hello world模型使用

image.png

1、开发生产者

 @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void testHello(){
        /**
         * 参数一:队列名称
         * 参数二:消息内容
         */
        rabbitTemplate.convertAndSend("hello-boot","hello boot");
    }

注:单独运行生产者并不会产生队列,队列是由消费者创建的
2、开发消费者

@Component
public class HelloCustomer {
 // 通过注解创建 hello-boot 队列
 @RabbitListener(queuesToDeclare = {@Queue(value = "hello-boot",durable = "false",autoDelete = "false")})
    public void consumptionStr(String message){
        System.out.println("消费消息:" + message);
    }
}

然后运行生产者
@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding
@Queue注解可以设置队列属性

work模型使用

image.png

1、开发生产者

    @Test
    void testWork(){
        /**
         * 参数一:队列名称
         * 参数二:消息内容
         */
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work-boot","work boot");
        }
    }

2、开发消费者

@Component
public class WorkCustomer {
 // 通过注解创建 work-boot 队列
    @RabbitListener(queuesToDeclare = @Queue("work-boot"))
    public void work1(String message){
        System.out.println("消费消息-1:" + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work-boot"))
    public void work2(String message){
        System.out.println("消费消息-2:" + message);
    }
}

结果:
消费消息-1:work boot
消费消息-1:work boot
消费消息-2:work boot
消费消息-2:work boot
消费消息-2:work boot
消费消息-2:work boot
消费消息-2:work boot
消费消息-1:work boot
消费消息-1:work boot
消费消息-1:work boot

说明:默认在Spring AMQP中实现Work这种方式就是公平调度,如果需要实现能者多劳需要额外配置

fanout模型使用

image.png

1、开发生产者

 @Test
    void testfanout(){
        /**
         * 参数一:交换机名称
         * 参数二:路由名称(在fanout模式下,该参数无作用)
         * 参数三:消息内容
         * 
         */
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("fanout-boot-exchange","","fanout boot");
        }
    }

2、开发消费者

@RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,  //创建临时队列
                    exchange = @Exchange(value = "fanout-boot-exchange",type = "fanout")  //绑定交换机
            )
    })
    public void fanoutStr(String message){
        System.out.println("消费消息-1:" + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,  //创建临时队列
                    exchange = @Exchange(value = "fanout-boot-exchange",type = "fanout")  //绑定交换机
            )
    })
    public void fanoutStr2(String message){
        System.out.println("消费消息-2:" + message);
    }
结果:
消费消息-2:fanout boot
消费消息-1:fanout boot

@QueueBinding注解用来绑定队列和交换机
@Exchange注解用来声明交换机

Route模型使用

image.png

1、开发生产者

  @Test
    void testDirect(){
        /**
         * 参数一:交换机名称
         * 参数二:路由名称
         * 参数三:消息内容
         *
         */
        rabbitTemplate.convertAndSend("direct-boot-exchange","error","direct boot error");

    }

2、开发消费者

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class directCustomer {

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,  //创建临时队列
                    exchange = @Exchange(value = "direct-boot-exchange",type = "direct"),  //绑定交换机名称和类型,默认类型就为direct
                    key = {"info"})  //路由key

    })
    public void directStr(String message){
        System.out.println("消费消息-1:" + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    exchange = @Exchange(value = "direct-boot-exchange",type = "direct"),
            key={"info","error"})

    })
    public void directStr2(String message){
        System.out.println("消费消息-2:" + message);
    }
}

发送路由key为error

消费消息-2:direct boot error

发送路由key为info

消费消息-1:direct boot info
消费消息-2:direct boot info

Topic模型使用

image.png

1、开发生产者

   @Test
    void testTopic(){
        /**
         * 参数一:交换机名称
         * 参数二:路由名称
         * 参数三:消息内容
         *
         */
        rabbitTemplate.convertAndSend("topic-boot-exchange","user.role.query","topic boot user.role.query");

    }

2、开发消费者

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicCustomer {

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    exchange = @Exchange(value = "topic-boot-exchange",type = "topic"),
                    key ={"user.*"})

    })
    public void topicStr(String message){
        System.out.println("消费消息-1:" + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    exchange = @Exchange(value = "topic-boot-exchange",type = "topic"),
                    key ={"user.#"})

    })
    public void topicStr2(String message){
        System.out.println("消费消息-2:" + message);
    }
}

发送路由key为user.role.query

消费消息-2:topic boot user.role.query

发送路由key为user.query

消费消息-2:topic boot user.query
消费消息-1:topic boot user.query

上面将五种消息模型都简单的使用一遍,但我们生产者每次发送的都是Stirng类型的数据,如果我们要发送对象呢?通过查看rabbitTemplate#convertAndSend的接口定义,我们知道发送的消息可以是Object类型,那么是不是意味着任何对象都可以推送给mq呢?开始尝试一下

加入依赖

 <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
  <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.0</version>
        </dependency>

创建实体

@Data
public class PayDTO {

    private Long payId;
    private String payBank;
    private String money;
    private String payTime;
}

开发生产者

@Component
@Slf4j
public class RabbitReceiver {
   public void send(PayDTO pay){
        log.info("支付发送消息 RabbitSender.send() payDTO = {}", JSON.toJSONString(pay));
        //发送hello world模型的方式消息
        rabbitTemplate.convertAndSend("hello-boot",pay);
    }
}

调用

    @GetMapping("/sendPay")
    public String sendPay(){
        PayDTO payDTO = new PayDTO();
        payDTO.setMoney("8888888");
        payDTO.setPayBank("***银行");
        payDTO.setPayId(123541880889922L);
        payDTO.setPayTime("2029-11-18 14:15:48 654");
        rabbitSender.send(payDTO);
        return "success";
    }

调用之后我们可以看到控制台出现报错信息:http://localhost:8080/sendPay

java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, 
byte[] and Serializable payloads, received: com.gongj.apppay.dto.PayDTO
image.png

为什么会出现这个问题呢?从堆栈分析,我们知道RabbitTemplate默认是利用SimpleMessageConverter来实现封装Message逻辑的,核心代码为
org.springframework.amqp.support.converter.SimpleMessageConverter.createMessage

protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        byte[] bytes = null;
        if (object instanceof byte[]) {
            bytes = (byte[])((byte[])object);
            messageProperties.setContentType("application/octet-stream");
        } else if (object instanceof String) {
            try {
                bytes = ((String)object).getBytes(this.defaultCharset);
            } catch (UnsupportedEncodingException var6) {
                throw new MessageConversionException("failed to convert to Message content", var6);
            }

            messageProperties.setContentType("text/plain");
            messageProperties.setContentEncoding(this.defaultCharset);
        } else if (object instanceof Serializable) {
            try {
                bytes = SerializationUtils.serialize(object);
            } catch (IllegalArgumentException var5) {
                throw new MessageConversionException("failed to convert to serialized Message content", var5);
            }

            messageProperties.setContentType("application/x-java-serialized-object");
        }

        if (bytes != null) {
            messageProperties.setContentLength((long)bytes.length);
            return new Message(bytes, messageProperties);
        } else {
            throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
        }
    }

上面逻辑很明确的指出了,只接受byte数组,String字符串,可序列化对象。

可序列化对象

我们这就演示用可序列化对象实现传递对象,对象需要实现Serializable接口。Stirng和byte[]就不演示了。

@Data
public class PayDTO implements Serializable {

    private Long payId;
    private String payBank;
    private String money;
    private String payTime;
}

然后再次调用接口,http://localhost:8080/sendPay,可以看到生产者已经发送消息成功了

image.png
想要看到我上面的那种效果,需要这条消息没有被消费掉。

开发消费者

@Component
@Slf4j
public class RabbitReceiver {

   @RabbitListener(queuesToDeclare = {@Queue(value = "hello-boot",durable = "false",autoDelete = "false")})
    public void infoConsumption(PayDTO message,@Headers Map<String,Object> headers){
        log.info("RabbitReceiver.infoConsumption() data = {}", JSON.toJSONString(message));
        for(Map.Entry<String, Object> map :headers.entrySet()){
            log.info("headers ===> " + map.getKey() + "=" + map.getValue());
        }
    }
image.png

使用上述这种方式发送的对象必须遵循两个条件

  • bean必须实现Serializable 接口
  • 生产者消费者的bean包名、类名、属性名必须一致

我们已经知道RabbitTemplate默认是利用SimpleMessageConverter来实现封装Message逻辑,它只接受byte数组,String字符串,可序列化对象。那我们会想有没有其他的MessageConverter来友好的支持任何类型的对象

自定义MessageConverter

自定义一个json序列化方式的MessageConverter来解决上面的问题(FastJson方式)

生产者端配置
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitTemplate fastjsonRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new AbstractMessageConverter() {
            @Override
            protected Message createMessage(Object object, MessageProperties messageProperties) {
                messageProperties.setContentType("application/json");
                return new Message(JSON.toJSONBytes(object), messageProperties);
            }

            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                return JSON.parse(message.getBody());
            }
        });
        return rabbitTemplate;
    }
}

PayDTO就可以不需要实现Serializable 接口了
mq内接收到的推送消息如下


image.png

消费者

@RabbitListener(queuesToDeclare = {@Queue(value = "hello-boot",durable = "false",autoDelete = "false")})
    public void infoConsumption(@Payload String data, @Headers Map<String,Object> headers) throws IOException {
        log.info("RabbitReceiver.infoConsumption() data = {}", data);

        for(Map.Entry<String, Object> map :headers.entrySet()){
            log.info("headers ===> " + map.getKey() + "=" + map.getValue());
        }
    }
image.png
ps:修改消费者的接收方式
  @RabbitListener(queuesToDeclare = {@Queue(value = "hello-boot",durable = "false",autoDelete = "false")})
    public void infoConsumption(@Payload JSONObject data, @Headers Map<String,Object> headers) throws IOException {
        log.info("RabbitReceiver.infoConsumption() data = {}", data.toJSONString());
        PayDTO pay = data.toJavaObject(PayDTO.class);
        for(Map.Entry<String, Object> map :headers.entrySet()){
            log.info("headers ===> " + map.getKey() + "=" + map.getValue());
        }
    }

在消费时出现转换异常

Caused by: org.springframework.messaging.converter.MessageConversionException
: Cannot convert from [[B] to [com.alibaba.fastjson.JSONObject] for 
GenericMessage [payload=byte[101], headers={amqp_receivedDeliveryMode=PERSISTENT, 
amqp_receivedRoutingKey=hello-boot, amqp_deliveryTag=1, 
amqp_consumerQueue=hello-boot, amqp_redelivered=false, 
id=85b64ff1-de50-2544-c6f8-e2ce899837cf, 
amqp_consumerTag=amq.ctag-yjs0gj4B7WaHPT43CpwuSQ, 
amqp_lastInBatch=false, contentType=application/json, timestamp=1603377654068}]

在消费端增加配置

import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.context.annotation.Bean;
@Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new AbstractMessageConverter() {
            @Override
            protected Message createMessage(Object object, MessageProperties messageProperties) {
                messageProperties.setContentType("application/json");
                return new Message(JSON.toJSONBytes(object), messageProperties);
            }
            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                return JSON.parse(message.getBody());
            }
        });
        return factory;
    }
image.png

Jackson2JsonMessageConverter

上面虽然实现了Json格式的消息转换,但是比较简陋;而且这么基础通用的功能,按照Spring全家桶的一贯作风,肯定是有现成可用的,没错,这就是Jackson2JsonMessageConverter

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;

//定义RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}

下面是通过Jackson序列化消息后的内容,与我们自定义的有一些不同,多了headers和content_encoding


image.png

开发消费者

 @RabbitListener(queuesToDeclare = {@Queue(value = "hello-boot",durable = "false",autoDelete = "false")})
    public void infoConsumption(@Payload String data, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("RabbitReceiver.infoConsumption() data = {}", data.toString());
        ObjectMapper objectMapper = new ObjectMapper();
        PayDTO payDTO = objectMapper.readValue(data, PayDTO.class);
        log.info("getPayBank = {}====",payDTO.getPayBank());
    }

调用接口,http://localhost:8080/sendPay
完整代码:https://gitee.com/gongjienianq/rabbitmq

相关文章

网友评论

      本文标题:SpringBoot整合RabbitMQ(三)

      本文链接:https://www.haomeiwen.com/subject/chgwuktx.html