美文网首页
Mac SpringBoot RocketMQ 整合使用

Mac SpringBoot RocketMQ 整合使用

作者: Hahn_z | 来源:发表于2020-06-01 22:55 被阅读0次

下载构建

// 下载地址
https://rocketmq.apache.org/dowloading/releases
# 我这里是下bin版本 就是二进制版本 只需要解压zip文件
cd 指定解压目录

# 启动 NameServer
nohup sh bin/mqnamesrv &
# 启动 日志
tail -f ~/logs/rocketmqlogs/namesrv.log

启动成功

1.png
# 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 启动 日志
tail -f ~/logs/rocketmqlogs/broker.log

启动成功

2
sh bin/mqshutdown broker    //停止 broker

sh bin/mqshutdown namesrv   //停止 nameserver

Rocketmq管理后台

# 下载地址
https://github.com/apache/rocketmq
# 用idea 打开rocketmq-console
# 下载依赖
# 打开http://localhost:8080/

打开成功

1.png

编写生产者和消费者

写配置

# application.properties

# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 同步生产者的组名
apache.rocketmq.producer.syncProducerGroup=syncProducerGroup
# 异步生产者的组名
apache.rocketmq.producer.asyncProducerGroup=asyncProducerGroup
# NameServer地址
apache.rocketmq.namesrvAddr=localhost:9876

生产者

@Component
public class RocketMQClient {
    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.syncProducerGroup}")
    private String syncProducerGroup;

    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.asyncProducerGroup}")
    private String asyncProducerGroup;


    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    /**
     * 同步发送
     */
    @PostConstruct
    public void SyncProducer() {
        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(syncProducerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);
        // 同步发送消息重试次数,默认为 2
        producer.setRetryTimesWhenSendFailed(3);

        try {
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            producer.start();

            //创建一个消息实例,包含 topic、tag 和 消息体
            //如下:topic 为 "demo",tag 为 "push"
            Message message = new Message("demo", "push", "发送消息----同步信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));

//            目前RocketMQ只支持固定精度级别的定时消息,服务器按照1-N定义了如下级别:
//            “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
//            ;若要发送定时消息,在应用层初始化Message消息对象之后,
//            调用setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s:
            message.setDelayTimeLevel(2);


            SendResult result = producer.send(message);
            System.out.println("发送同步响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }


    /**
     * 异步发送
     */
    @PostConstruct
    public void AsyncProducer() {
        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(asyncProducerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);

        try {

            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);

            Message msg = new Message("demo",
                    "push",
                    "发送消息----异步信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
            //重点在这里 异步发送回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送异步响应:MsgId:" + sendResult.getMsgId() + ",发送状态:" + sendResult.getSendStatus());
                    producer.shutdown();
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                    producer.shutdown();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }


    }


    /**
     * onewag
     */
    @PostConstruct
    public void OnewayProducer() {
        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(syncProducerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);

        try {
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            producer.start();
            
            Message message = new Message("demo", "push", "发送消息----单向信息-----".getBytes(RemotingHelper.DEFAULT_CHARSET));


            producer.sendOneway(message);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }

}
# 消费者
@Component
public class RocketMQServer {
    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;


    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("demo", "push");

            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {

                        System.out.println("messageExt: " + messageExt);//输出消息内容

                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // 接收失败重试
                    if (list.get(0).getReconsumeTimes() == 3){
                        // 重试3次
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
                    }else {
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

rocketmq信息

rocketmq信息

rocketmq管理后台发送主题

mq发送.png

rocketmq管理后台发送主题的状态

mq状态.png

idea 接受mq

mq接收.png

rocketmq管理后台查看管理后台

mq管理.png

docker安装rocketmq

相关文章

网友评论

      本文标题:Mac SpringBoot RocketMQ 整合使用

      本文链接:https://www.haomeiwen.com/subject/ozxdzhtx.html