本文分两部分:
第一部分讲解 从NIO获取sql命令到开始路由解析 的这段流程。该部分只是粗略讲解,以便调试代码的时候大约知道“我从何处来?”;
第二部分详细讲解路由解析的流程。
一、从NIO获取sql命令到开始路由解析
1、前端处理器的消费者
启动Skynet,SkynetServer类会开启4个前端处理器线程FrontEndHandlerRunner.java,4个是CPU核数。
for (int i = 0; i < system.getProcessorExecutor(); i++) {
businessExecutor.execute(new FrontEndHandlerRunnable(frontHandlerQueue));
}
从下图可以看出,前端处理器线程用来处理一个阻塞队列FrontHandlerQueue中的元素,是一个消费者,run()方法中是一个while(true)的死循环,当阻塞队列FrontHandlerQueue中没有值时,代码阻塞,有值时,向下执行,主要业务由handler.handle();去处理。
public class FrontEndHandlerRunnable implements Runnable {
private final BlockingQueue<FrontendCommandHandler> frontHandlerQueue;
public FrontEndHandlerRunnable(BlockingQueue<FrontendCommandHandler> frontHandlerQueue) {
this.frontHandlerQueue = frontHandlerQueue;
}
@Override
public void run() {
FrontendCommandHandler handler;
while (true) {
try {
handler = frontHandlerQueue.take();
.........
handler.handle();
.........
2、前端处理器的生产者
消费者有了,接下来我们去找生产者。回到SkynetServer.java中,显而易见,这是一个饿汉式单例,它有很多成员变量,在这里我们可以看到我们需要的前端处理器队列:
private BlockingQueue<FrontendCommandHandler> frontHandlerQueue;
结合前面的代码,我们可以发现,项目中任何给frontHandlerQueue队列添加元素的地方都是生产者,我们在代码中一直追寻frontHandlerQueue变量,最终到了FrontCommandHandler.handler(byte[] data)方法中,该方法的最后一行代码如下:
SkynetServer.getInstance().getFrontHandlerQueue().offer(this);
this为FrontCommandHandler的实例,显而易见,此处就是生产者生产对象,并放入前端处理队列的地方。
由此方法向上追溯,会找到NIOSocketWR.java的asyncRead()方法,它是负责读数据的方法,由RW.java这个负责读写数据的线程来调用。
管中窥豹,可见一斑。其实Skynet中几乎所有的生产者消费者模式都是如此,在SkynetServer.java这个单例中统一定义,根据它里面的引用都能找到其生产者,然后由多线程的处理器去消费。
3、获取sql语句和sql类型
上文提到FrontEndHandlerRunnable.java中的run()方法中调用了 handler.handle(),几乎所有的业务逻辑都由该方法处理,我们点进去继续看这个方法,一直到FrontendCommandHandler.java的handleData(byte[] data)方法,里面有许多的switch—case判断,如下图:
protected void handleData(byte[] data) {
.........
switch (data[4]) {
case MySQLPacket.COM_INIT_DB:
commands.doInitDB();
source.initDB(data);
break;
case MySQLPacket.COM_QUERY:
commands.doQuery();
source.query(data);
break;
.........
handleData(byte[] data)方法的入参就是一个完成的数据包,我们取了data中的第5个字节来判断命令类型,之所以取第5个字节,这是根据mysql协议的约定的报文结构来的,在这里对mysql的报文结构做一个极简单回顾:
MySQL报文结构
如图,Mysql的报文结构前4个字节为消息头,其中前3个是字节合在一起说明本条消息的长度,第4个字节是序号,用于保证消息的顺序。从第5个字节往后为客户端正式的命令请求报文,而命令请求报文的第1个字节又为当前命令类型,如下图:
MySQL客户端命令请求报文
上述代码中有多个switch—case判断,我们只需关注case MySQLPacket.COM_QUERY,我们发出的大部分sql命令都会走这个分支,我们进入source.query(data)中,发现该方法中首次将数据解析成了sql语句:
public void query(byte[] data) {
// 取得语句
String sql = null;
try {
MySQLMessage mm = new MySQLMessage(data);
mm.position(5);
sql = mm.readString(charsetName.getClient());
} catch (UnsupportedEncodingException e) {
.........
this.query(sql);
}
追踪代码中 this.query(sql)方法,一直到ServerQueryHandler.java中的query(String sql):
public void query(String sql) {
.........
//通过字符串解析,获取sql语句的类型
int rs = ServerParse.parse(sql);
.........
int sqlType = rs & 0xff;
//处理注释
if (isWithHint) {
.........
c.execute(sql, rs & 0xff);
} else {
//根据sql类型,分发给不同的handler进行处理
switch (sqlType) {
case ServerParse.EXPLAIN:
ExplainHandler.handle(sql, c, rs >>> 8);
break;
case ServerParse.EXPLAIN2:
Explain2Handler.handle(sql, c, rs >>> 8);
break;
case ServerParse.DESCRIBE:
DescribeHandler.handle(sql, c);
break;
case ServerParse.SET:
SetHandler.handle(sql, c, rs >>> 8);
break;
case ServerParse.SHOW:
ShowHandler.handle(sql, c, rs >>> 8);
break;
case ServerParse.SELECT:
SelectHandler.handle(sql, c, rs >>> 8);
break;
.........
default:
.........
c.execute(sql, rs & 0xff);
}
}
}
该方法较长,篇幅原因,上面只截取了相对重要的部分,注释其实已经很清楚了,最终会交由各个sql类型的handler去处理,以常用的 select 类型为例,调用了SelectHandler.handle(sql, c, rs >>> 8),该方法最终还是调用ServerConnection.java的execute(String sql, int type)方法,殊途同归,其它种类的解析器也会除了特殊情况直接返回的,都会调用该方法去做路由解析。
网友评论