美文网首页Java 核心技术
生产级rocketMQ延时消息+redis去重+最好的序列化

生产级rocketMQ延时消息+redis去重+最好的序列化

作者: rs汀 | 来源:发表于2020-01-03 10:57 被阅读0次

    1.话不多说,先提问题(某互联网公司实际需求~~~~)

    一生成订单后如果一个小时没有打款,就自动撤单,并做出惩罚措施。
    • 本文所涉及技术RocketMQ版本:4.3.1 ,JDK1.8,protostuff版本1.1.3

    2.pom文件(部分)

            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.3.1</version>
            </dependency>
            <dependency>
                <groupId>com.dyuproject.protostuff</groupId>
                <artifactId>protostuff-core</artifactId>
                <version>1.1.3</version>
            </dependency>
            <dependency>
                <groupId>com.dyuproject.protostuff</groupId>
                <artifactId>protostuff-runtime</artifactId>
                <version>1.1.3</version>
            </dependency>
    

    3.直接上代码!生产者

        @Autowired
        private NoticeService noticeService; // 封装的一个mq service类
        private RuntimeSchema<String> timeSchema = RuntimeSchema.createFrom(String.class);//序列化需要使用
        public void test() {
          String messages = buildMQMessage(merchantOrder.getOrderNo(), p.getInvoke(), p.getMethod(), Datas.BORROW);
          this.noticeService.delayNotice(messages, this.timeSchema, "OtcTimer", "timer", p.getTimeLevel());
        }
    ///
      //构建JSON消息体,有orderNo,需要定时结束执行的反射方法,当前方法(方便日志),类型(区分业务)
       protected String buildMQMessage(String orderNo, String invoke, String method, String type) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("orderNo", orderNo);
            jsonObject.put("invoke", invoke);
            jsonObject.put("method", method);
            jsonObject.put("type", type);
            return jsonObject.toJSONString();
        }
    //重点讲讲这个方法
    /**
    *messages:消息体
    schema:加一个缓冲区,加快序列化速度
    topic:根据topic找消费者
    tags:标签
    timeLevel:延迟等级messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (取小标为获取对应的延迟时间,也可自定义)
    */
        public <T> void delayNotice(T messages, RuntimeSchema<T> schema, String topic, String tags, Integer timeLevel) {
            String key = OtcUtil.createUUId();
            try {
                byte[] bytes = ProtostuffIOUtil.toByteArray(messages, schema,
                        LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));//加一个缓冲区,加快序列化速度
                Message message = new Message(topic, tags, key, bytes);//组建消息体
                message.setDelayTimeLevel(timeLevel);//设置等级(下标)
                SendResult sendResult = this.defaultMQProducer.send(message);//(发送)
                if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
    
                }
            } catch (Exception e) {
                // TODO: handle exception
                logger.error("sendMq onException , key : " + key, e);
            }
          }
           
                     
    
    

    4.直接上代码!消费者

    @Component
    public class OtcTimerConsumer {
    
        private final static Logger logger = LoggerFactory.getLogger(OtcTimerConsumer.class);
    
        private RuntimeSchema<String> schema = RuntimeSchema.createFrom(String.class);
    
        @Autowired
        private RedisService redisService;//redis去重,防止重复消费
    
        @Qualifier("borrowProcessTimerService")
        @Autowired
        private ProcessTimerService borrowProcessTimerService;
    
        @EventListener(condition = "#event.topic == 'OtcTimer'")
        public void rocketmqMsgListen(DefaultMQCustomerEvent event) throws Exception {
            try {
                // 判断key是否存在,去重
                String key = event.getMsg().getKeys();
                Set<Object> set = this.redisService.getRepeat(key);
                if (set.size() > 0) {
                    return;
                }
                // 参数解析,反序列化解析参数
                String paramter = schema.newMessage();
                ProtostuffIOUtil.mergeFrom(event.getMsg().getBody(), paramter, schema);
                if (StringUtils.isEmpty(paramter)) {
                    throw new BusinessException(Codes.CODE_500, Messages.OTC_MQ_MESSAGE_ISNULL);
                }
                JSONObject jsonObject = JSONObject.parseObject(paramter);
                String orderNo = jsonObject.getString("orderNo");
                String method = jsonObject.getString("method");
                String invoke = jsonObject.getString("invoke");//需要反射的方法
                String type = jsonObject.getString("type");
                // 重点讲解
                //getMethod(需要执行的反射方法,方法里的参数类型)
                //invoke(需要执行的反射类,方法里的参数)
               if (type.equals(Datas.BORROW)) {
                    this.borrowProcessTimerService.getClass().getMethod(invoke, String.class, String.class, String.class)
                            .invoke(this.borrowProcessTimerService, orderNo, method, invoke);
                }
                // 将消费完的key放入缓存,去重
                this.redisService.setRepeat("Otc:Timer:" + orderNo + ":" + invoke, paramter,
                        Double.valueOf(System.nanoTime()));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new BusinessException(Codes.CODE_500, e.getMessage());
            }
        }
    
    }
    

    5.直接上代码!反射方法,处理撤单,惩罚相关逻辑

    @Service
    public class BorrowProcessTimerService extends BorrowSuperService implements ProcessTimerService {
        @Transactional(rollbackFor = Exception.class)
        public void orderTimer(String orderNo, String method, String invoke) {
                // 具体逻辑
                // 我这里用到了事务,分布式锁==保证安全
                // 处理异常日志
            }
    }
    

    总结:
    (1)无需再轮询全部订单,效率高
    (2)一个订单,任务只执行一次
    (3)时效性好

    IMG_2423.JPG

    相关文章

      网友评论

        本文标题:生产级rocketMQ延时消息+redis去重+最好的序列化

        本文链接:https://www.haomeiwen.com/subject/swqzoctx.html