美文网首页
SpringBoot整合RocketMQ

SpringBoot整合RocketMQ

作者: 青春留不住_72dc | 来源:发表于2020-03-30 23:03 被阅读0次

    消息中间件

    什么是消息中间件

    消息中间件:是利用高效可靠的消息传递机制进行异步的数据传输,并基于数据通信进行分布式系统的集成。通过提供消息队列模型和消息传递机制,可以在分布式环境下扩展进程间的通信

    消息中间件可以做什么

    应用程序之间不采取直接通信,而是使用消息中间作为中介,做到数据的异步通信。开发人员不需要考虑网络协议和远程调用的问题,只需要通过各消息中间件所提供的api,就可以简单的完成消息推送,和消息接收的业务功能。

    消息的生产者将消息存储到队列中,消息的消费者不一定马上消费消息,可以等到自己想要用到这个消息的时候,再从相应的队列中去获取消息。这样的设计可以很好的解决,大数据量数据传递所占用的资源,使数据传递和平台分开,不再需要分资源用于数据传输,可以将这些资源用去其他想要做的事情上。

    关于为什么使用RocketMQ

    1 . 支持严格的消息顺序
    2 .支持Topic与Queue两种模式
    3 .亿级消息堆积能力
    4 .比较友好的分布式特性
    5 .同时支持Push与Pull方式消费消息
    6 .历经多次天猫双十一海量消息考验
    7 .RocketMQ是纯java编写,基于通信框架Netty。

    SpringBoot整合

    看一下pom.xml的文件内容

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
     
        <groupId>com.suning.mq</groupId>
        <artifactId>rocketmq</artifactId>
        <version>1.0-SNAPSHOT</version>
     
        <dependencies>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.2.0</version>
            </dependency>
     
            <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.44</version>
            </dependency>
     
     
        </dependencies>
     
    </project>
    

    先定义一个消息保存的载体:

    public class Producer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("test-group");
            producer.setNamesrvAddr("localhost:9876");
            producer.setInstanceName("rmq-instance");
            producer.start();
            try {
                for (int i=0;i<100;i++){
                    User user = new User();
                    user.setLoginName("abc"+i);
                    user.setPwd(String.valueOf(i));
                    Message message = new Message("log-topic", "user-tag",JSON.toJSONString(user).getBytes());
                    System.out.println("生产者发送消息:"+JSON.toJSONString(user));
                    producer.send(message);
                }
     
     
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }
     
        /**
         * 发送用户消息
         */
        static  class User implements Serializable{
            private String loginName;
            private String pwd;
     
            public String getLoginName() {
                return loginName;
            }
     
            public void setLoginName(String loginName) {
                this.loginName = loginName;
            }
     
            public String getPwd() {
                return pwd;
            }
     
            public void setPwd(String pwd) {
                this.pwd = pwd;
            }
        }
    

    定义消息的发送者:

    @Component
    public class UserProducer {
        /**
         * 生产者的组名
         */
        @Value("${suning.rocketmq.producerGroup}")
        private String producerGroup;
     
        /**
         * NameServer 地址
         */
        @Value("${suning.rocketmq.namesrvaddr}")
        private String namesrvAddr;
     
        @PostConstruct
        public void produder() {
             DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
             producer.setNamesrvAddr(namesrvAddr);
            try {
                producer.start();
                for (int i = 0; i < 100; i++) {
                    UserContent userContent = new UserContent(String.valueOf(i),"abc"+i);
                    String jsonstr = JSON.toJSONString(userContent);
                    System.out.println("发送消息:"+jsonstr);
                    Message message = new Message("user-topic", "user-tag", jsonstr.getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult result = producer.send(message);
                    System.err.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
                }
     
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.shutdown();
            }
        }
    }
    

    定义消息的消费者:

    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
     
            consumer.setNamesrvAddr("localhost:9876");
            consumer.setInstanceName("rmq-instance");
            consumer.subscribe("log-topic", "user-tag");
            
            consumer.registerMessageListener(new MessageListenerConcurrently() {
     
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println("消费者消费数据:"+new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
         }
    }
    

    相关文章

      网友评论

          本文标题:SpringBoot整合RocketMQ

          本文链接:https://www.haomeiwen.com/subject/qbgbuhtx.html