美文网首页
Mac SpringBoot RocketMQ 整合使用

Mac SpringBoot RocketMQ 整合使用

作者: Hahn_z | 来源:发表于2020-06-01 22:55 被阅读0次

    下载构建

    // 下载地址
    https://rocketmq.apache.org/dowloading/releases
    
    # 我这里是下bin版本 就是二进制版本 只需要解压zip文件
    cd 指定解压目录
    
    # 启动 NameServer
    nohup sh bin/mqnamesrv &
    # 启动 日志
    tail -f ~/logs/rocketmqlogs/namesrv.log
    
    

    启动成功

    1.png
    # 启动 Broker
    nohup sh bin/mqbroker -n localhost:9876 &
    # 启动 日志
    tail -f ~/logs/rocketmqlogs/broker.log
    
    

    启动成功

    2
    sh bin/mqshutdown broker    //停止 broker
    
    sh bin/mqshutdown namesrv   //停止 nameserver
    
    

    Rocketmq管理后台

    # 下载地址
    https://github.com/apache/rocketmq
    # 用idea 打开rocketmq-console
    # 下载依赖
    # 打开http://localhost:8080/
    

    打开成功

    1.png

    编写生产者和消费者

    写配置

    # application.properties
    
    # 消费者的组名
    apache.rocketmq.consumer.PushConsumer=PushConsumer
    # 同步生产者的组名
    apache.rocketmq.producer.syncProducerGroup=syncProducerGroup
    # 异步生产者的组名
    apache.rocketmq.producer.asyncProducerGroup=asyncProducerGroup
    # NameServer地址
    apache.rocketmq.namesrvAddr=localhost:9876
    

    生产者

    @Component
    public class RocketMQClient {
        /**
         * 生产者的组名
         */
        @Value("${apache.rocketmq.producer.syncProducerGroup}")
        private String syncProducerGroup;
    
        /**
         * 生产者的组名
         */
        @Value("${apache.rocketmq.producer.asyncProducerGroup}")
        private String asyncProducerGroup;
    
    
        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        /**
         * 同步发送
         */
        @PostConstruct
        public void SyncProducer() {
            //生产者的组名
            DefaultMQProducer producer = new DefaultMQProducer(syncProducerGroup);
            //指定NameServer地址,多个地址以 ; 隔开
            producer.setNamesrvAddr(namesrvAddr);
            // 同步发送消息重试次数,默认为 2
            producer.setRetryTimesWhenSendFailed(3);
    
            try {
                /**
                 * Producer对象在使用之前必须要调用start初始化,初始化一次即可
                 * 注意:切记不可以在每次发送消息时,都调用start方法
                 */
                producer.start();
    
                //创建一个消息实例,包含 topic、tag 和 消息体
                //如下:topic 为 "demo",tag 为 "push"
                Message message = new Message("demo", "push", "发送消息----同步信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
    
    //            目前RocketMQ只支持固定精度级别的定时消息,服务器按照1-N定义了如下级别:
    //            “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
    //            ;若要发送定时消息,在应用层初始化Message消息对象之后,
    //            调用setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s:
                message.setDelayTimeLevel(2);
    
    
                SendResult result = producer.send(message);
                System.out.println("发送同步响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
    
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.shutdown();
            }
        }
    
    
        /**
         * 异步发送
         */
        @PostConstruct
        public void AsyncProducer() {
            //生产者的组名
            DefaultMQProducer producer = new DefaultMQProducer(asyncProducerGroup);
            //指定NameServer地址,多个地址以 ; 隔开
            producer.setNamesrvAddr(namesrvAddr);
    
            try {
    
                producer.start();
                producer.setRetryTimesWhenSendAsyncFailed(0);
    
                Message msg = new Message("demo",
                        "push",
                        "发送消息----异步信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
                //重点在这里 异步发送回调
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("发送异步响应:MsgId:" + sendResult.getMsgId() + ",发送状态:" + sendResult.getSendStatus());
                        producer.shutdown();
                    }
    
                    @Override
                    public void onException(Throwable e) {
                        e.printStackTrace();
                        producer.shutdown();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
    
    
        }
    
    
        /**
         * onewag
         */
        @PostConstruct
        public void OnewayProducer() {
            //生产者的组名
            DefaultMQProducer producer = new DefaultMQProducer(syncProducerGroup);
            //指定NameServer地址,多个地址以 ; 隔开
            producer.setNamesrvAddr(namesrvAddr);
    
            try {
                /**
                 * Producer对象在使用之前必须要调用start初始化,初始化一次即可
                 * 注意:切记不可以在每次发送消息时,都调用start方法
                 */
                producer.start();
                
                Message message = new Message("demo", "push", "发送消息----单向信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
    
    
                producer.sendOneway(message);
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.shutdown();
            }
        }
    
    }
    
    # 消费者
    @Component
    public class RocketMQServer {
        /**
         * 消费者的组名
         */
        @Value("${apache.rocketmq.consumer.PushConsumer}")
        private String consumerGroup;
    
    
        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        @PostConstruct
        public void defaultMQPushConsumer() {
            //消费者的组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
    
            //指定NameServer地址,多个地址以 ; 隔开
            consumer.setNamesrvAddr(namesrvAddr);
            try {
                //订阅PushTopic下Tag为push的消息
                consumer.subscribe("demo", "push");
    
                //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
                //如果非第一次启动,那么按照上次消费的位置继续消费
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                    try {
                        for (MessageExt messageExt : list) {
    
                            System.out.println("messageExt: " + messageExt);//输出消息内容
    
                            String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
    
                            System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        // 接收失败重试
                        if (list.get(0).getReconsumeTimes() == 3){
                            // 重试3次
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
                        }else {
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
                });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    rocketmq信息

    rocketmq信息

    rocketmq管理后台发送主题

    mq发送.png

    rocketmq管理后台发送主题的状态

    mq状态.png

    idea 接受mq

    mq接收.png

    rocketmq管理后台查看管理后台

    mq管理.png

    docker安装rocketmq

    相关文章

      网友评论

          本文标题:Mac SpringBoot RocketMQ 整合使用

          本文链接:https://www.haomeiwen.com/subject/ozxdzhtx.html