美文网首页RocketMQ
RocketMQ实战入门

RocketMQ实战入门

作者: 肥兔子爱豆畜子 | 来源:发表于2021-12-31 14:10 被阅读0次
(一)安装与启动

下载rocketmq-4.8.0二进制包,启动name server和broker:

nohup sh /usr/rocketmq-4.8.0/bin/mqnamesrv > /dev/null 2>&1 & 
nohup sh /usr/rocketmq-4.8.0/bin/mqbroker -n localhost:9876 -c conf/broker.conf > /dev/null 2>&1 &

注:RocketMQ默认需求的jvm内存比较大,可以酌情减少,修改runserver.shrunbroker.sh两个启动脚本里边关于jvm内存参数的设置。

修改broker.conf配置文件,添加:

namesrvAddr=外网IP:9876
brokerIP1=外网IP

这样是为了能够使客户端能够远程的访问mq

停止mq:

./mqshutdown broker
./mqshutdown namesrv
(二)添加后台管理端

修改/etc/profile,添加环境变量:

export NAMESRV_ADDR=localhost:9876

然后source /etc/profile使配置环境变量生效。

https://github.com/apache/rocketmq-externals下载rocketmq-console项目源代码,修改配置文件application.properties启用web端登陆密码验证,并在users.properties里边配置ACL的用户和密码:

rocketmq.config.loginRequired=true #开启web端登录密码验证
# Define Admin
admin=password,1

# Define Users
#user1=user1
#user2=user2
user1=query

maven编译打包:

mvn clean package -Dmaven.test.skip=true

启动应用:

nohup java -jar rocketmq-console-ng-2.0.0.jar --server.port=19876 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > /dev/null 2>&1 & 

访问地址: http://122.51.112.187:19876/#/login

(三)ACL访问控制列表

默认搞好的RocketMQ是任意客户端只要网络通的话都可以进行发布订阅的,而ACL机制可以使得mq对客户端的访问进行校验。mq服务端需要配置broker.confplain_acl.yml两个配置文件,前者是配置开启acl=true,后者是配置acl具体的账户与权限信息,权限信息支持设置的很细,具体到topic和group级别。

broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=访问IP:9876
brokerIP1=访问IP
aclEnable=true

plain_acl.yml

globalWhiteRemoteAddresses:

accounts:
- accessKey: wanganmq
  secretKey: rocket008
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: PUB|SUB
  defaultGroupPerm: PUB|SUB
  topicPerms:

  groupPerms:
  # the group should convert to retry topic

- accessKey: mqadmin
  secretKey: password
  whiteRemoteAddress:
  # if it is admin, it could access all resources
  admin: true
(四)实践心得

1、推荐开启ACL,然后admin账户交给管理员通过console和cli工具进行管理使用,应用中配置普通账户,这样方便进行管理。

2、一定要开发规范中约定好topic和tag,以及producer/consumer group这些个条目的命名规范,以及约定代表的含义。

例如,topic和tag首字母大写驼峰,分别表示业务域和业务域所发生的事件。用“应用名-producer-group”来命名生产者group名字,"业务处理名-consumer-group" 等等。 见名知意的命名能够提高团队开发效率。

3、跟其他开源中间件一样,了解底层实现原理永远都是对的。不仅出现问题能够搞定和排查,应用实践上也能更给出最佳实践。

4、关于每个Consumer启动多少个线程并发去消费。RocketMQ源码里写死默认是20个线程,用的是jdk线程池。可以调整。DefaultMQPushConsumer.setConsumeThreadMin(4)

附: 搞acl时的一个小插曲,RocketMQ消息发布报错:No accessKey is configured

No accessKey is configured 本来这个错误是很简单个错误,本意是mq上边开启了acl验证机制,而客户端没有配置accessKey。但是笔者这次遇到的这个问题比较有意思,废了我半天时间去研究。在这里记录一下。

当时笔者是先行自己封装好了一个rocket的工具库,考虑到安全问题,决定引入acl机制,然后主要是参照了网上的两个文章。配置mq的broker和plain_acl两个配置文件,客户端加入acl相关代码,主要是加入了AclRPCHook,在发送消息之前插入accessKey和签名供mq进行校验。之后用测试工程引用本地maven库上安装的rocket工具库进行测试,结果之前能用的代码在用了acl之后一直报错:提示accessKey没配置。

然后发现直接弄个带main方法的类运行例子里的代码是可以的,一样的代码copy到测试工程里就不行了。至此确认应该是客户端问题,mq的acl配置应该是对的。
接下来1天各种实验无果,开始回到起点,“源码之下无秘密”,潜心来看源代码,搞清楚客户端的mq发送消息以及acl的源码是怎么写的。

当调用DefaultMQProducer的send方法,同步的向mq投递消息的时候,实际上是defaultMQProducerImpl.send(msg),也即DefaultMQProducerImpl的sendKernelImpl方法,关键代码:sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage,里边其实是MQClientAPIImpl的如下方法:

private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response,addr);
    }

其中remotingClient是接口,其实现是netty NettyRemotingClient invokeSync(),后者会doBeforeRpcHooks(addr, request);

if (rpcHooks.size() > 0) {
            for (RPCHook rpcHook: rpcHooks) {
                rpcHook.doBeforeRequest(addr, request);
            }
        }

rpcHooks这里相当于是NettyRemotingClient的成员变量。

对于acl的DefaultMQProducer
DefaultMQProducer实例化的时候,是把rpcHook都给到了自己的defaultMQProducerImpl。

factory是在defaultMQProducerImpl.start()也就是DefaultMQProducer.start()的时候实例化的。

MQClientInstance是factory的实现,实例化的时候内部,

this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

然后factory提供这个mQClientAPIImpl实例。

defaultMQProducerImpl的start方法中实例化factory时,会判断如果生成过,就不再生成了。也就是说整个进程只会有一个factory实例。关键代码:

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

到这里,笔者搞清楚问题了,刚好工程里有另外一个transactionProducer的代码,是笔者用来测试事务消息的,没加acl代码,所以一旦先transactionProducer.start()创建了factory,那之后一直用这个无acl的,所以发送消息到mq的时候一直提示没有accessKey就是这个原因。

(五)Spring整合RocketMQ开发

编写自己的rocketutil开发包,然后maven install到本地maven库上。需要使用的gradle项目在repositories添加mavenLocal()

整体思路:使用集群消息模式,然后每个应用服务使用1个单例的GeneralMqProducer工具类,用于发送消息。再提供一个MQMsgHandler接口,用于应用服务自己编写接口实现:声明订阅消息topic和tag,收到订阅消息之后需要回调执行的业务逻辑。MQMsgHandler接口的实现类、也可以称为是消费者添加自定义注解MsgConsumer、通过Spring BeanPostProcessor机制加载到Spring容器。

消息Pojo:

/**
 *  用于在MQ中进行传输的事件消息
 * */
@Data
public class EventMessage {
    private String msgId; //消息的唯一标识
    private String producerGroup; //消息的生产者所属的分组
    private String topic;   //消息所属的主题(业务域)
    private String tag;     //消息tag,即该主题下的什么事件
    private String msgBody; //消息体, 一般为json string
    private String publishTime; //发布时间
    
}

生产者工具类:

/**
 *  每个应用有一个producer单例。
 *  producergroup表示消息由哪个应用集群投递。
 *  topic代表消息属于哪个业务域的消息,tag代表该业务域下的哪种事件发生了。
 *  msgId是消息的唯一标识
 * */
@Slf4j
@Component
public class GeneralMqProducer {
    
    @Value("${rocketmq.url}")
    private String mqurl;
    
    @Value("${rocketmq.accessKey}")
    private String accessKey;
    
    @Value("${rocketmq.secretKey}")
    private String secretKey;
    
    @Value("${rocketmq.producergroup.name}")
    private String producerGroupName;
    
    private DefaultMQProducer producer;
    
    /**
     *  同步发布消息
     * */
    public SendResult syncPublish(EventMessage eventMsg) {
        try {
            Message msg = new Message(eventMsg.getTopic(), eventMsg.getTag(), eventMsg.getMsgId(), eventMsg.getMsgBody().getBytes("utf-8"));
            
            SendResult sendResult = producer.send(msg);
            return sendResult;
        } catch (Exception e) {
            log.error("向RocketMQ发布消息失败:" + e.getMessage(), e);
            e.printStackTrace();
        }
        return null;
    }
    
    /**
     *  异步发布消息
     *  SendCallback为消息发送完毕后的回调方法
     * */
    public void asyncPublish(EventMessage eventMsg, SendCallback callBack) {
        try {
            eventMsg.setMsgId(UUID.randomUUID().toString().replaceAll("-",""));
            eventMsg.setProducerGroup(producerGroupName);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
            eventMsg.setPublishTime(LocalDateTime.now().format(formatter));
            
            Message msg = new Message(eventMsg.getTopic(),
                                      eventMsg.getTag(), 
                                      eventMsg.getMsgId(), 
                                      JSON.toJSONString(eventMsg).getBytes("utf-8"));
            
            producer.send(msg, callBack);
            log.info("消息已发布 msgID:{}, producer group:{}, topic:{}, tag:{}", eventMsg.getMsgId(), producerGroupName, eventMsg.getTopic(), eventMsg.getTag());
        } catch (Exception e) {
            log.error("向RocketMQ发布消息失败:" + e.getMessage(), e);
            e.printStackTrace();
        }
    }
    
    
    @PostConstruct
    public void init() {
        producer = new DefaultMQProducer(getAclRPCHook());
        producer.setProducerGroup(producerGroupName);
        producer.setNamesrvAddr(mqurl);
        try {
            producer.start();
            log.info("RocketMQ客户端producer初始化...");
        } catch (MQClientException e) {
            log.error("RocketMQ客户端producer初始化失败:" + e.getErrorMessage(), e);
        }
    }
    
    private RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }
}

消费者Handler接口:

/**
 *  消息消费者interface
 *  支持批量消费
 * */
public interface MQMsgHandler {
    public void handleMsg(List<EventMessage> eventMessages);
}

消费者注解:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface MsgConsumer {
    public String consumerGroup();
    public String topic();
    public String tag();
}

最后是将消费者Handler实现注入到Spring:

/**
 * regist a consumer to MQ for all MQMsgHandler implementations
 * 
 * */
@Slf4j
@Component
public class MQConsumersRegister implements BeanPostProcessor{
    
    @Value("${rocketmq.url}")
    private String mqurl;
    
    @Value("${rocketmq.accessKey}")
    private String accessKey;
    
    @Value("${rocketmq.secretKey}")
    private String secretKey;
    
    @Value("${rocketmq.consumeThreadCorePoolSize:20}")
    private int consumeThreadCorePoolSize;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        MsgConsumer msgConsumer = bean.getClass().getAnnotation(MsgConsumer.class);
        if(null != msgConsumer) { //如果有@MsgConsumer注解
            String consumerGroup = msgConsumer.consumerGroup();
            String topic = msgConsumer.topic();
            String tag = msgConsumer.tag();
            MQMsgHandler msgHandler = (MQMsgHandler) bean; //当前bean应是MQMsgHandler接口的实现
            
            registConsumer(msgHandler, consumerGroup, topic, tag);
            log.info("消费者组{}已订阅主题{}下{}事件" , consumerGroup, topic , tag );
        }
        return bean;
    }
    
    //注册consumer,并使其订阅相应的topic、tag
    private void registConsumer(MQMsgHandler msgHandler, String consumerGroup, String topic, String tag) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup, getAclRPCHook(), new AllocateMessageQueueAveragely());
        try {
            consumer.setNamesrvAddr(mqurl);
            consumer.setConsumeThreadMin(consumeThreadCorePoolSize);
            consumer.setConsumeThreadMax(consumeThreadCorePoolSize);
            consumer.setPullBatchSize(32); //一次长轮询最多从mq里拿多少个消息,默认32
            consumer.subscribe(topic, tag);
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    
                    List<EventMessage> eventMsgs = new ArrayList<>();
                    String msgContent = null;
                    
                    try {
                        for(MessageExt msg : msgs) {
                            msgContent = new String(msg.getBody(), "utf-8");
                            EventMessage eventMsg = JSON.parseObject(msgContent, EventMessage.class);
                            log.debug(JSON.toJSONString(eventMsg));
                            eventMsgs.add(eventMsg);
                        }
                        
                        msgHandler.handleMsg(eventMsgs);    //批量处理本次拉取的消息,执行业务逻辑
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        log.error("消息编码错误:" + e.getMessage(), e);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            consumer.start();
        }catch(Exception e) {
            log.error("注册消费者出错" + e.getMessage(), e);
            
        }
    }
    
    //Access Control List控制
    private RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

}

具体使用的话,笔者在使用OpenResty+Redis+RocketMQ构建秒杀系统 - 简书 (jianshu.com)一文里实际已经用到了这个RocketMQ小工具库。用来做订单的异步入库。这里再贴一下代码:

/**
 * 异步写预约订单记录
 * */
private void sendAppointmentToMq(AppointmentDetail appointDetail) {
    String appointJson = JSON.toJSONString(appointDetail);
    EventMessage eventMsg = new EventMessage();
    eventMsg.setTopic("order");
    eventMsg.setTag("newOrder");
    eventMsg.setMsgBody(appointJson);
    
    generalMqProducer.asyncPublish(eventMsg, new SendCallback() {

        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("预约订单入库消息写入rocketmq成功,消息ID:{}", sendResult.getMsgId());
            
        }

        @Override
        public void onException(Throwable e) {
            //如果与mq通信故障了,那么可以从日志文件里找到预约记录,手工执行写入mysql
            log.error("预约订单写入rocketmq失败:{}, exception detail:{}" , appointJson , e.getMessage());
        }
        
    });
}

然后消费者是可以批量的对消息进行消费的:

@Slf4j
@Component
@MsgConsumer(consumerGroup = "newOrder-consumer-group", tag = "newOrder", topic = "order")
public class NewOrderMsgHandler implements MQMsgHandler{
    
    @Autowired
    private AppointmentDetailRepository appointmentDetailRepository;

    @Override
    public void handleMsg(List<EventMessage> eventMessages) {
        
        log.debug("收到mq消息: {}", JSON.toJSONString(eventMessages));
        
        List<AppointmentDetail> appointmentDetails = new ArrayList<>();
        for(EventMessage eventMsg : eventMessages) {
            AppointmentDetail appointmentDetail = JSON.parseObject(eventMsg.getMsgBody(), AppointmentDetail.class);
            appointmentDetails.add(appointmentDetail);
        }

        appointmentDetailRepository.saveAll(appointmentDetails); //批量入库
    }

}

总结一下,这个封装库就是典型的消息发布订阅模式:发消息用生产者工具类GeneralMqProducer的相应方法直接发就行了,订阅的话需要实现MQMsgHandler接口、在接口实现类上使用@MsgConsumer注解来声明订阅的topic和tag、重写handleMsg(eventMessages)方法来编写收到消息后的业务回调逻辑即可。

相关文章

  • rocketmq总目录

    实战 rocketmq最简单的入门demo rocketmq的常用概念,接口和方法 rocketmq的正式部署 高...

  • Apache RocketMQ从入门到实战

    《Apache RocketMQ从入门到实战》在 2017 年听到阿里巴巴将 RocketMQ 捐赠给 Apach...

  • RocketMQ

    RocketMQ实战(一)RocketMQ实战(二)RocketMQ实战(三):分布式事务RocketMQ实战(四...

  • RocketMQ实战入门

    (一)安装与启动 下载rocketmq-4.8.0二进制包,启动name server和broker: 注:Roc...

  • RocketMQ事务消息与分布式事务

    在《RocketMQ实战入门》[https://www.jianshu.com/p/1e91225c41a9]里我...

  • RocketMQ实战 - 快速入门

    RocketMQ 是阿里开源的(目前已捐赠给Apache了)一款高性能、高吞吐量的分布式消息中间件。 参考资料 十...

  • RocketMQ实战(三):分布式事务

    接 《RocketMQ实战(一)》,《RocketMQ实战(二)》,本篇博客主要讨论的话题是:顺序消费、RMQ在分...

  • RocketMQ实战(四)

    前言 这将是RocketMQ实战系列的最后一篇文章,该系列的文章列表如下: 《RocketMQ实战(一)》 《Ro...

  • RocketMQ 实战之快速入门

    最近 RocketMQ 刚刚上生产环境,闲暇之时在这里做一些分享,主要目的是让初学者能快速上手RocketMQ。 ...

  • RocketMQ入门

    RocketMQ入门 1. RocketMQ简介 RocketMQ是阿里开源的消息中间件,它是纯java开发,具有...

网友评论

    本文标题:RocketMQ实战入门

    本文链接:https://www.haomeiwen.com/subject/rkbgqrtx.html