美文网首页
RocketMQ-分布式消息

RocketMQ-分布式消息

作者: 快点给我想个名 | 来源:发表于2019-06-24 23:04 被阅读0次
    • rocketMQ分布式事务架构设计


      rocketMQ分布式事务架构.png
      • 首先发送消息并异步执行本地事务(发送成功后,消息对消费端不可见)

      • 本地事务执行成功后,返回一个标识符
        COMMIT_MESSAGE,
        ROLLBACK_MESSAGE,
        UNKNOW;

      • 如果返回COMMIT_MESSAGE则改变MQ集群中的消息对消费端可见

      • 消费端消费消息

      • 如果第二步返回的消息为UNKNOW,则MQ集群不断重试,回调生产者的checkLocalTransaction方法。

      • producer

    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
    
            TransactionMQProducer producer = new TransactionMQProducer("test-transaction-producer");
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("transaction-thread");
                    return thread;
                }
            });
    
            producer.setNamesrvAddr("192.168.6.129:9876");
            producer.setExecutorService(executorService);
            //异步执行本地事务,异步本地事务回查
            TransactionListener transactionListener = new TransactionListenerImpl();
            producer.setTransactionListener(transactionListener);
    
            producer.start();
            
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg =
                        new Message("TopicTest1234", "TagA", "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.sendMessageInTransaction(msg, "executeLocalTransaction方法的参数二");
                    System.out.printf("%s%n", sendResult);
    
                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
    
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
    }
    
    public class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
    
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
        /**
         * 本地事务入库
         * @param msg Half(prepare) message
         * @param arg Custom business parameter
         * @return
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println(arg);
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }
    
        /**
         * 回调消息检查
         * @param msg Check message
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    

    相关文章

      网友评论

          本文标题:RocketMQ-分布式消息

          本文链接:https://www.haomeiwen.com/subject/qtwaqctx.html