本篇我们以Transaction类消息来了解客户端消息发送。
备注:
Cat版本:v3.0.0
github:https://github.com/dianping/cat
代码示例:
Transaction t = Cat.getProducer().newTransaction("facePlus", "idCard");
t.setStatus(Transaction.SUCCESS);
OR
t.setStatus(exception);
t.complete();
MessageProducer对业务封装了CAT内部的所有细节,业务方只需要一个MessageProducer对象就可以完成消息的所有操作。
涉及的核心处理类:
image.png
一、消息创建
下面通过 newTransaction 的源码来分析 Transaction消息的创建过程 。
class DefaultMessageProducer {
public Transaction newTransaction(String type, String name) {
if (!m_manager.hasContext()) {
m_manager.setup();
}
DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);
m_manager.start(transaction, false);
return transaction;
}
}
首先通过消息管理者MassageManager判断是否存在消息上下文context,如果不存在则在setup中创建消息上下文。
消息上下文 Context 采用的是线程本地变量。通过ThreadLocal存取Context数据。
public class DefaultMessageManager extends ContainerHolder implements MessageManager {
private ThreadLocal<Context> m_context = new ThreadLocal<Context>();
@Override
public void setup() {
Context ctx;
if (m_domain != null) {
ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
} else {
ctx = new Context("Unknown", m_hostName, "");
}
double samplingRate = m_configManager.getSampleRatio();
if (samplingRate < 1.0 && hitSample(samplingRate)) {
ctx.m_tree.setHitSample(true);
}
m_context.set(ctx);
}
}
在Context构造函数里,我们看到了消息树MessageTree和Transaction栈被创建了,由于Context是线程本地变量,由此可以推断,每个线程都拥有各自的消息树和事务栈,这里所说的线程都是业务线程,Context属于MessageManager的内部类。可以认为MessageManager的其中一个功能是作为context的一个代理,MessageManager的start、add、end等方法,核心都是调用当前线程context的start、add、end方法。
class Context {
public Context(String domain, String hostName, String ipAddress) {
m_tree = new DefaultMessageTree();
m_stack = new Stack<Transaction>();
Thread thread = Thread.currentThread();
String groupName = thread.getThreadGroup().getName();
m_tree.setThreadGroupName(groupName);
m_tree.setThreadId(String.valueOf(thread.getId()));
...
}
}
接着MessageProducer就会创建一个Transation对象,然后将Transaction对象交给 MessageManager启动,我们通过下面源码看看的启动流程,最关键的启动步骤是调用ctx.start(transactionm forked) 完成的。
public class DefaultMessageManager extends ContainerHolder implements MessageManager, Initializable, LogEnabled {
@Override
public void start(Transaction transaction, boolean forked) {
Context ctx = getContext();
if (ctx != null) {
ctx.start(transaction, forked);
...
}
class Context {
private MessageTree m_tree;
private Stack<Transaction> m_stack;
public void start(Transaction transaction, boolean forked) {
if (!m_stack.isEmpty()) {
if (!(transaction instanceof ForkedTransaction)) {
Transaction parent = m_stack.peek();
addTransactionChild(transaction, parent);
}
} else {
m_tree.setMessage(transaction);
}
if (!forked) {
m_stack.push(transaction);
}
}
}
}
}
Context.start()方法逻辑:
如果 m_stack 不为空, 而且 transaction 类型不为 ForkedTransaction,当前 transaction 加到 m_stack 栈顶元素的子消息中去。
如果m_stack为空,就把当前这个Transaction加到MessageTree里面。
最后判断 transaction 是否是forked的事务,不是则将transaction加入 m_stack 。
二、消息发送
Transaction.complete();开始消息发送,上代码:
class DefaultTransaction {
@Override
public void complete() {
if (m_durationInMicro == -1) {
m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L;
}
setCompleted(true);
if (m_manager != null) {
//消息上报
m_manager.end(this);
}
}
}
首先计算失误耗时,m_manager.end()最终调用Context.end()方法
public boolean end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
Transaction current = m_stack.pop();
if (transaction == current) {
m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
} else {
while (transaction != current && !m_stack.empty()) {
m_validator.validate(m_stack.peek(), current);
current = m_stack.pop();
}
}
if (m_stack.isEmpty()) {
MessageTree tree = m_tree.copy();
m_tree.setMessageId(null);
m_tree.setMessage(null);
if (m_totalDurationInMicros > 0) {
adjustForTruncatedTransaction((Transaction) tree.getMessage());
}
manager.flush(tree, true);
return true;
}
}
return false;
}
代码逻辑:
从事务消息栈弹出事务直到匹配到当前结束的事务(主事务在栈底),如果消息栈为空则当前事务为主事务,调用manager.flush发送当前事务消息。
class DefaultTransaction {
class DefaultMessageManager {
public void flush(MessageTree tree, boolean clearContext) {
MessageSender sender = m_transportManager.getSender();
sender.send(tree);
}
}
}
MessageSender将消息放入消息队列异步发送。
网友评论