美文网首页RabbitMQ学习RabbitMQ
RabbitMQ笔记二十三 :异步RPC之二(使用Spring

RabbitMQ笔记二十三 :异步RPC之二(使用Spring

作者: 二月_春风 | 来源:发表于2017-11-05 18:07 被阅读210次

    使用Spring AMQP实现RPC异步调用

    示列

    服务器端

    应用启动类代码,

    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            System.out.println("===server startup======");
            TimeUnit.SECONDS.sleep(120);
            context.close();
        }
    }
    

    配置类:
    监听了sms队列,这个队列将会是客户端请求消息发送到的队列,配置了适配器,适配器中去调用服务,适配器返回的值就是服务端返回给客户端的RPC调用的结果

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("sms");
            container.setAcknowledgeMode(AcknowledgeMode.NONE);
            //使用适配器的方式
            container.setMessageListener(new MessageListenerAdapter(new SendSMSHandler()));
            return container;
        }
    }
    

    处理器,处理器中调用具体的服务,我们此列子中处理器方法返回的值是boolean类型

    import java.util.concurrent.TimeUnit;
    
    public class SendSMSHandler {
    
        public boolean handleMessage(byte[] body){
            String _body = new String(body);
            System.out.println(_body);
            String[] sms = _body.split(":");
            String phone = sms[0];
            String content = sms[1];
    
            boolean is = SendSMSTool.sendSMS(phone,content);
    
            try {
                TimeUnit.SECONDS.sleep(6);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return is;
        }
    }
    

    服务接口

    public class SendSMSTool {
    
        public static boolean sendSMS(String phone,String content){
            System.out.println("发送短信内容:【"+content+"】到手机号:"+phone);
            return phone.length() > 6;
        }
    }
    

    服务端步骤

    1. 消息处理方法,一定要有返回值,这个返回值就是就是server回复客户端的结果。比如我们SendSMSHandler.handleMessage方法返回的值。

    客户端
    应用启动类:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
    
            //设置超时时间,单位是ms
            rabbitTemplate.setReplyTimeout(10000);
    
            String phone = "15634344321";
            String content ="周年庆,五折优惠";
    
            MessageProperties messageProperties = new MessageProperties();
            Message message = new Message((phone+":"+content).getBytes(),messageProperties);
    
            //rabbitTemplate.send("","sms",message);
    
            Message reply = rabbitTemplate.sendAndReceive("","sms",message,
                    new CorrelationData(UUID.randomUUID().toString()));
    
            System.out.println(reply);
            System.out.println("message,body:"+new String(reply.getBody()));
            System.out.println("message,properties:"+reply.getMessageProperties());
    
            TimeUnit.SECONDS.sleep(30);
            context.close();
        }
    }
    

    配置类代码:

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    }
    

    如果服务端睡眠6s,则客户端通过sendAndReceive方法接收到的Message对象为空,怎样设置呢?
    客户端通过设置rabbitTemplate.setReplyTimeout(10000);就可以了。

    客户端步骤

    1. 使用sendAndReceive方法发送消息,该方法返回一个Message对象,该对象就是server返回的结果
    2. sendAndReceive如果超过5s还没有收到结果,则返回null,这个超时时间可以通过rabbitTemplate.setReplyTimeout()来进行设置
    3. server端返回的结果一定要注意,和MessageConverter有关,默认的org.springframework.amqp.support.converter.SimpleMessageConverter会把基本的数据类型转换成Serializable对象,这样的话,client端接收的也是序列化的java对象,所以,需要合理设置MessageConverter

    示列代码中服务端返回给客户端的是Boolean类型,

    启动服务端客户端代码:
    服务器打印控制台打印:

    15634344321:周年庆,五折优惠
    发送短信内容:【周年庆,五折优惠】到手机号:15634344321
    

    客户端控制台打印:

    message,body:����sr��java.lang.Boolean� r�՜�����Z��valuexp�
    message,properties:MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/x-java-serialized-object, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkAB5yYWJiaXRAaVpicDFqY3d4N3NmYjFud3pyZWh5NloAAFu0AAAABwI=.kHL9zxtdQmtcxl0mQF8zrg==, receivedDelay=null, deliveryTag=1, messageCount=null, consumerTag=null, consumerQueue=null]
    

    我们发现客户端接收到的数据乱码,将服务端的处理器的返回值改写成String类型的,

    import java.util.concurrent.TimeUnit;
    
    public class SendSMSHandler {
    
        public String handleMessage(byte[] body){
            String _body = new String(body);
            System.out.println(_body);
            String[] sms = _body.split(":");
            String phone = sms[0];
            String content = sms[1];
    
            boolean is = SendSMSTool.sendSMS(phone,content);
    
            try {
                TimeUnit.SECONDS.sleep(6);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return is ? "success":"false";
        }
    }
    

    此时发现客户端接收的消息数据没有乱码,原因何在?我们总结一下就是服务器端处理器返回给客户端boolean类型,那么返回的消息数据就乱码,如果返回的是String类型,那么返回的消息数据就不会乱码。

    之前我们学习了org.springframework.amqp.support.converter.MessageConverter接口,当客户端向服务端发送消息的时候会进行消息类型转换,调用了fromMessage方法,而当服务器返回给客户端的时候会将服务端的对象转换成Message对象,很明显调用的是toMessage方法。

    我们知道org.springframework.amqp.support.converter.MessageConverter接口的默认实现是org.springframework.amqp.support.converter.SimpleMessageConverter,而toMessage方法的实现是在其继承的对象AbstractMessageConverter中,

    我们看到其AbstractMessageConverter.toMessage方法的实现逻辑是:

        @Override
        public final Message toMessage(Object object, MessageProperties messageProperties)
                throws MessageConversionException {
            if (messageProperties == null) {
                messageProperties = new MessageProperties();
            }
            //将对象转换成Message对象
            Message message = createMessage(object, messageProperties);
            messageProperties = message.getMessageProperties();
            if (this.createMessageIds && messageProperties.getMessageId() == null) {
                messageProperties.setMessageId(UUID.randomUUID().toString());
            }
            return message;
        }
    

    createMessage方法就是将对象转换成Message对象,

        /**
         * Creates an AMQP Message from the provided Object.
         */
        @Override
        protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            byte[] bytes = null;
            if (object instanceof byte[]) {
                bytes = (byte[]) object;
                messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
            }
            else if (object instanceof String) {
                try {
                    bytes = ((String) object).getBytes(this.defaultCharset);
                }
                catch (UnsupportedEncodingException e) {
                    throw new MessageConversionException(
                            "failed to convert to Message content", e);
                }
                messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
                messageProperties.setContentEncoding(this.defaultCharset);
            }
            //因为boolean类型实现Serializable接口,所以会将其序列化
            else if (object instanceof Serializable) {
                try {
                    bytes = SerializationUtils.serialize(object);
                }
                catch (IllegalArgumentException e) {
                    throw new MessageConversionException(
                            "failed to convert to serialized Message content", e);
                }
                messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
            }
            if (bytes != null) {
                messageProperties.setContentLength(bytes.length);
            }
            return new Message(bytes, messageProperties);
        }
    

    我们在程序中将序列化对象直接转换成字符串所以乱码,而返回的是String类型的情形的时候先将字符串转换成相应的字节数组,然后返回new Message(bytes, messageProperties);就不会乱码。

    继续探讨,当我们服务端返回的是一个对象的时候,客户端会返回空
    返回的对象:

    public class SendStatus {
    
        private String phone;
    
        private String result;
    
        public String getPhone() {
            return phone;
        }
    
        public void setPhone(String phone) {
            this.phone = phone;
        }
    
        public String getResult() {
            return result;
        }
    
        public void setResult(String result) {
            this.result = result;
        }
    }
    

    将该对象返回:

    public class SendSMSHandler {
    
        public SendStatus handleMessage(byte[] body){
            String _body = new String(body);
            System.out.println(_body);
            String[] sms = _body.split(":");
            String phone = sms[0];
            String content = sms[1];
    
            boolean is = SendSMSTool.sendSMS(phone,content);
            SendStatus sendStatus = new SendStatus();
            sendStatus.setPhone(phone);
            sendStatus.setResult(is ? "SUCCESS":"FAILURE");
            return sendStatus;
        }
    }
    

    服务端控制台:

    15634344321:周年庆,五折优惠
    发送短信内容:【周年庆,五折优惠】到手机号:15634344321
    

    客户端:

    message,body:
    message,properties:MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/octet-stream, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkAB5yYWJiaXRAaVpicDFqY3d4N3NmYjFud3pyZWh5NloAAFxBAAAABwI=.01fOGW/nvS2nz6gKza+cjg==, receivedDelay=null, deliveryTag=1, messageCount=null, consumerTag=null, consumerQueue=null]
    

    原因何在,因为我们定义的SendStatus不走createMessage中的所有if分支,最后返回的是null,怎么解决呢,要么自己去定义一个org.springframework.amqp.support.converter.MessageConverter实现,要么换一个默认的org.springframework.amqp.support.converter.MessageConverter实现。

    改造后的示列

    使用AMQP自带的消息类型转换器Jackson2JsonMessageConverter
    服务端

    应用启动类,

    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            System.out.println("===server startup======");
            TimeUnit.SECONDS.sleep(120);
            context.close();
        }
    }
    

    配置类,添加自定义的消息转换器

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("sms");
            container.setAcknowledgeMode(AcknowledgeMode.NONE);
            //使用适配器的方式
            container.setMessageListener(new MessageListenerAdapter(new SendSMSHandler(),new Jackson2JsonMessageConverter()));
            return container;
        }
    }
    

    处理器handler,返回自定义的SendStatus类型

    import java.util.concurrent.TimeUnit;
    
    public class SendSMSHandler {
    
        public SendStatus handleMessage(byte[] body){
            String _body = new String(body);
            System.out.println(_body);
            String[] sms = _body.split(":");
            String phone = sms[0];
            String content = sms[1];
    
            boolean is = SendSMSTool.sendSMS(phone,content);
            SendStatus sendStatus = new SendStatus();
            sendStatus.setPhone(phone);
            sendStatus.setResult(is ? "SUCCESS":"FAILURE");
    
            try {
                TimeUnit.SECONDS.sleep(6);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return sendStatus;
        }
    }
    

    接口服务,

    public class SendSMSTool {
    
        public static boolean sendSMS(String phone,String content){
            System.out.println("发送短信内容:【"+content+"】到手机号:"+phone);
            return phone.length() > 6;
        }
    }
    
    

    服务端返回的对象,

    public class SendStatus {
        private String phone;
    
        private String result;
    
        public String getPhone() {
            return phone;
        }
    
        public void setPhone(String phone) {
            this.phone = phone;
        }
    
        public String getResult() {
            return result;
        }
    
        public void setResult(String result) {
            this.result = result;
        }
    }
    

    客户端
    应用启动类,

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
    
            //设置超时时间,单位是ms
            rabbitTemplate.setReplyTimeout(10000);
    
            String phone = "15634344321";
            String content ="周年庆,五折优惠";
    
            MessageProperties messageProperties = new MessageProperties();
            Message message = new Message((phone+":"+content).getBytes(),messageProperties);
    
            //rabbitTemplate.send("","sms",message);
    
            Message reply = rabbitTemplate.sendAndReceive("","sms",message,
                    new CorrelationData(UUID.randomUUID().toString()));
    
            System.out.println(reply);
            System.out.println("message,body:"+new String(reply.getBody()));
            System.out.println("message,properties:"+reply.getMessageProperties());
    
            TimeUnit.SECONDS.sleep(30);
            context.close();
        }
    }
    

    配置类

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    }
    

    启动服务器客户端,客户端返回

    message,body:{"phone":"15634344321","result":"SUCCESS"}
    message,properties:MessageProperties [headers={__TypeId__=rpc.server.SendStatus}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to
    

    返回了SendStatus的JSON格式,因为使用了Jackson2JsonMessageConverter消息类型转换器。

    相关文章

      网友评论

        本文标题:RabbitMQ笔记二十三 :异步RPC之二(使用Spring

        本文链接:https://www.haomeiwen.com/subject/sczypxtx.html