RocketMQ distributed transaction architecture design
[Figure: RocketMQ distributed transaction architecture]
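1. The producer first sends the message to the MQ cluster as a half (prepare) message; once the send succeeds, it executes the local transaction. Until the transaction is resolved, the message is invisible to consumers.
2. After the local transaction has run, the producer returns one of three states: COMMIT_MESSAGE, ROLLBACK_MESSAGE, or UNKNOW.
3. If COMMIT_MESSAGE is returned, the message in the MQ cluster becomes visible to consumers. If ROLLBACK_MESSAGE is returned, the half message is discarded.
4. The consumer consumes the message.
5. If step 2 returned UNKNOW, the MQ cluster repeatedly calls back the producer's checkLocalTransaction method to ask for the transaction's state.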
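The five steps can be traced with a small in-memory model. This is only a sketch of the flow, not RocketMQ code: `HalfMessageFlowSketch`, `ToyBroker`, and all of its methods are hypothetical names invented for illustration, and the real broker stores half messages in an internal topic rather than a map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy in-memory model of the half-message flow; not RocketMQ code. */
final class HalfMessageFlowSketch {

    /** Mirrors the three states named above (spelling follows RocketMQ's UNKNOW). */
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Hypothetical stand-in for the MQ cluster. */
    static final class ToyBroker {
        private final Map<String, String> halfMessages = new LinkedHashMap<>();
        private final List<String> visible = new ArrayList<>();

        /** Step 1: store the message where consumers cannot see it. */
        void sendHalf(String id, String body) {
            halfMessages.put(id, body);
        }

        /** Steps 2-3: apply the state reported by the producer. */
        void endTransaction(String id, State state) {
            if (state == State.UNKNOW) {
                return; // stays a half message until a later check resolves it
            }
            String body = halfMessages.remove(id);
            if (body != null && state == State.COMMIT_MESSAGE) {
                visible.add(body); // now deliverable to consumers (step 4)
            }
        }

        /** Step 5: ask the producer about every unresolved half message. */
        void checkPending(Function<String, State> checkLocalTransaction) {
            for (String id : new ArrayList<>(halfMessages.keySet())) {
                endTransaction(id, checkLocalTransaction.apply(id));
            }
        }

        List<String> visibleMessages() { return visible; }

        int pendingCount() { return halfMessages.size(); }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.sendHalf("tx-1", "order created");
        broker.endTransaction("tx-1", State.UNKNOW);
        System.out.println(broker.visibleMessages()); // prints []
        broker.checkPending(id -> State.COMMIT_MESSAGE);
        System.out.println(broker.visibleMessages()); // prints [order created]
    }
}
```

The point of the model is that an UNKNOW answer changes nothing on the broker side: the message simply remains pending until a later check returns a definite state.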
Producer
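// TransactionProducer.java
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("test-transaction-producer");
        // Thread pool the producer uses to serve the broker's transaction check callbacks.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("transaction-thread");
                return thread;
            }
        });
        producer.setNamesrvAddr("192.168.6.129:9876");
        producer.setExecutorService(executorService);
        // The listener executes the local transaction and answers transaction checks.
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest1234", "TagA", "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // The second argument is passed through to executeLocalTransaction as `arg`.
                SendResult sendResult = producer.sendMessageInTransaction(msg,
                        "second argument of executeLocalTransaction");
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        // Keep the process alive so the broker can call back checkLocalTransaction.
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}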
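// TransactionListenerImpl.java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    private final AtomicInteger transactionIndex = new AtomicInteger(0);
    // Simulated local transaction outcome per transaction ID: 0, 1 or 2.
    private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Runs the local transaction and records its outcome.
     * @param msg Half (prepare) message
     * @param arg Custom business parameter
     * @return always UNKNOW in this demo, so every message is resolved by a later check
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println(arg);
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Transaction check callback invoked by the broker.
     * @param msg Check message
     * @return the state mapped from the recorded outcome
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        // No outcome recorded for this transaction ID: the demo commits by default.
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}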
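To see what the demo listener does to the ten messages sent by the producer, the modulo rule can be replayed without RocketMQ. This is a sketch under two assumptions: messages reach executeLocalTransaction in send order (the producer loop is single-threaded), and the broker eventually checks each one. `ListenerOutcomeSketch`, `outcome`, and `committed` are illustrative names, not part of any RocketMQ API.

```java
import java.util.ArrayList;
import java.util.List;

/** Replays the demo listener's modulo rule; illustrative only, no RocketMQ involved. */
final class ListenerOutcomeSketch {

    /** Same mapping as checkLocalTransaction: 0 = UNKNOW, 1 = COMMIT, 2 = ROLLBACK. */
    static String outcome(int sendIndex) {
        switch (sendIndex % 3) {
            case 1:
                return "COMMIT_MESSAGE";
            case 2:
                return "ROLLBACK_MESSAGE";
            default:
                return "UNKNOW";
        }
    }

    /** Indexes of the messages whose check returns COMMIT_MESSAGE. */
    static List<Integer> committed(int messageCount) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            if (outcome(i).equals("COMMIT_MESSAGE")) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            System.out.println("Hello RocketMQ " + i + " -> " + outcome(i));
        }
        // prints "visible to consumers: [1, 4, 7]"
        System.out.println("visible to consumers: " + committed(10));
    }
}
```

Under those assumptions a consumer subscribed to TopicTest1234 would only receive messages 1, 4 and 7. Messages 2, 5 and 8 are rolled back, and messages 0, 3, 6 and 9 stay half messages for as long as the check keeps answering UNKNOW.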