美文网首页
RocketMQ-分布式消息

RocketMQ-分布式消息

作者: 快点给我想个名 | 来源:发表于2019-06-24 23:04 被阅读0次
  • rocketMQ分布式事务架构设计


    rocketMQ分布式事务架构.png
    • 首先发送消息并异步执行本地事务(发送成功后,消息对消费端不可见)

    • 本地事务执行成功后,返回一个标识符
      COMMIT_MESSAGE,
      ROLLBACK_MESSAGE,
      UNKNOW;

    • 如果返回COMMIT_MESSAGE则改变MQ集群中的消息对消费端可见

    • 消费端消费消息

    • 如果第二步返回的消息为UNKNOW,则MQ集群不断重试,回调生产者的checkLocalTransaction方法。

    • producer

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        TransactionMQProducer producer = new TransactionMQProducer("test-transaction-producer");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("transaction-thread");
                return thread;
            }
        });

        producer.setNamesrvAddr("192.168.6.129:9876");
        producer.setExecutorService(executorService);
        //异步执行本地事务,异步本地事务回查
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);

        producer.start();
        
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", "TagA", "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, "executeLocalTransaction方法的参数二");
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 本地事务入库
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println(arg);
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 回调消息检查
     * @param msg Check message
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

相关文章

  • RocketMQ-分布式消息

    rocketMQ分布式事务架构设计rocketMQ分布式事务架构.png首先发送消息并异步执行本地事务(发送成功后...

  • RocketMQ-消息发送

    简介 本文通过问题入手,介绍下RocketMQ的消息发送逻辑是怎么样的。消息发送的大体逻辑图如下: 问题 首先我们...

  • RocketMQ-延时消息

    一、延时消息的使用 使用比较简单,指定message的DelayTimeLevel即可。示例代码如下: 目前roc...

  • RocketMQ-事务消息

    一、事务消息的引出 以购物场景为例,张三购买物品,账户扣款 100 元的同时,需要保证在下游的会员服务中给该账户增...

  • RocketMQ-异步消息

    异步消息 生产者

  • RocketMQ-广播消息

    广播消息 生产者 消费者

  • RocketMQ-顺序消息

    敬请期待!!

  • RocketMQ-普通消息

    一、摘要 默认消息发送超时时间为3s 默认消息发送是同步的发送模式,同步发送会发送1+重试次数,默认重试2,一共3...

  • RocketMQ-延迟消息

    延迟消息 消息发送到Broker后,要特定的事件才会被Consumer消费。 生产者 默认配置级别

  • RocketMQ-发送消息

    运行NameServer和Broker后,我们可以尝试用代码示例写一下:消息发送的三种方式 同步发送,适用场景:对...

网友评论

      本文标题:RocketMQ-分布式消息

      本文链接:https://www.haomeiwen.com/subject/qtwaqctx.html