美文网首页
MyCAT源码分析(三)请求处理流程

MyCAT源码分析(三)请求处理流程

作者: 李亚林1990 | 来源:发表于2019-08-10 19:19 被阅读0次

    本篇将按请求处理的流程来分析MyCAT NIO网络通信框架。
    从请求的接收开始:
    1、NIOAcceptor 接收请求

        private void accept() {
            SocketChannel channel = null;
            try {
                channel = serverChannel.accept();
                channel.configureBlocking(false);
                FrontendConnection c = factory.make(channel);
                c.setAccepted(true);
                c.setId(ID_GENERATOR.getId());
                NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
                        .nextProcessor();
                c.setProcessor(processor);
                
                NIOReactor reactor = reactorPool.getNextReactor();
                reactor.postRegister(c);
    
            } catch (Exception e) {
                LOGGER.warn(getName(), e);
                closeChannel(channel);
            }
        }
    

    拆解:
    FrontendConnection c = factory.make(channel);
    调用ServerConnectionFactory.make封装SocketChannel为ServerConnection, 设置数据包handler处理器,创建当前连接的NonBlockingSession

    NIOReactor reactor = reactorPool.getNextReactor();
    reactor.postRegister(c);
    将前端连接分发到某一个reactor线程处理

    2、NIOReactor注册SelectionKey.OP_READ事件到selector,监听并处理

            private void register(Selector selector) {
                AbstractConnection c = null;
                if (registerQueue.isEmpty()) {
                    return;
                }
                while ((c = registerQueue.poll()) != null) {
                    try {
                        ((NIOSocketWR) c.getSocketWR()).register(selector);
                        c.register();
                    } catch (Exception e) {
                        c.close("register err" + e.toString());
                    }
                }
            }
    
            public void run() {
                final Selector selector = this.selector;
                Set<SelectionKey> keys = null;
                for (;;) {
                        selector.select(500L);
                        register(selector);
                        keys = selector.selectedKeys();
                        for (SelectionKey key : keys) {
                            AbstractConnection con = null;
                            Object att = key.attachment();
                            if (att != null) {
                                con = (AbstractConnection) att;
                                if (key.isValid() && key.isReadable()) {
                                        con.asynRead();
                                                            }
                                                     }
                                              }
                              }
                    }
    

    拆解:
    ((NIOSocketWR) c.getSocketWR()).register(selector);
    注册SelectionKey.OP_READ事件到selector

    c.register();
    同步发送握手数据包,并交给FrontendAuthenticator做前端认证处理;认证成功则设置下一个handler处理器FrontendCommandHandler,每一个前端handler都持有前端连接FrontendConnection

    con.asynRead();
    接收数据包,并交给FrontendCommandHandler处理

    3、FrontendCommandHandler

        @Override
        public void handle(byte[] data)
        {
            if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
            {
                MySQLMessage mm = new MySQLMessage(data);
                int  packetLength = mm.readUB3();
                if(packetLength+4==data.length)
                {
                    source.loadDataInfileData(data);
                }
                return;
            }
            switch (data[4])
            {
                case MySQLPacket.COM_INIT_DB:
                    commands.doInitDB();
                    source.initDB(data);
                    break;
                case MySQLPacket.COM_QUERY:
                    commands.doQuery();
                    source.query(data);
                    break;
                    。。。
                    。。。
             }
      }
    

    拆解:
    if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
    or
    switch (data[4])

    根据不同的请求类型进入不同的处理分支;
    以MySQLPacket.COM_QUERY为例调用FrontendQueryHandler处理

    4、ServerQueryHandler处理查询请求

        public void query(String sql) {
            
            ServerConnection c = this.source;
            int rs = ServerParse.parse(sql);
            int sqlType = rs & 0xff;
            
            switch (sqlType) {
            //explain sql
            case ServerParse.EXPLAIN:
                ExplainHandler.handle(sql, c, rs >>> 8);
                break;
            //explain2 datanode=? sql=?
            case ServerParse.EXPLAIN2:
                Explain2Handler.handle(sql, c, rs >>> 8);
                break;
            case ServerParse.SET:
                SetHandler.handle(sql, c, rs >>> 8);
                break;
            case ServerParse.SHOW:
                ShowHandler.handle(sql, c, rs >>> 8);
                break;
            case ServerParse.SELECT:
                SelectHandler.handle(sql, c, rs >>> 8);
                break;
                    。。。
                    。。。
             }
    

    拆解:
    根据不同的sql查询类型,调用不同的handler;
    以ServerParse.SELECT为例,调用SelectHandler,并传入ServerConnection

        public static void handle(String stmt, ServerConnection c, int offs) {
            int offset = offs;
            switch (ServerParseSelect.parse(stmt, offs)) {
            case ServerParseSelect.VERSION_COMMENT:
                SelectVersionComment.response(c);
                break;
            case ServerParseSelect.DATABASE:
                SelectDatabase.response(c);
                break;
            case ServerParseSelect.USER:
                SelectUser.response(c);
                break;
            default:
                c.execute(stmt, ServerParse.SELECT);
            }
            }
    

    拆解:
    查询请求进入分支c.execute(stmt, ServerParse.SELECT);
    最终调用ServerConnection. routeEndExecuteSQL;

    5、ServerConnection. routeEndExecuteSQL;

        public void routeEndExecuteSQL(String sql, int type, SchemaConfig schema) {
            // 路由计算
            RouteResultset rrs = null;
            try {
                rrs = MycatServer
                        .getInstance()
                        .getRouterservice()
                        .route(MycatServer.getInstance().getConfig().getSystem(),
                                schema, type, sql, this.charset, this);
    
            } catch (Exception e) {
                StringBuilder s = new StringBuilder();
                String msg = e.getMessage();
                writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
                return;
            }
            if (rrs != null) {
                // session执行
                session.execute(rrs, type);
            }
        }
    

    拆解:
    获取路由节点信息,进入session执行

    6、NonBlockingSession.execute

        public void execute(RouteResultset rrs, int type) {
            RouteResultsetNode[] nodes = rrs.getNodes();
            if (nodes.length == 1) {
                singleNodeHandler = new SingleNodeHandler(rrs, this);
                singleNodeHandler.execute();
            } else {
                multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this);
                multiNodeHandler.execute();
            }
        }
    

    拆解:
    根据路由信息分别进入单分片节点处理器和多分片节点处理器
    以SingleNodeHandler为例向下分析

    7、SingleNodeHandler.execute

        public void execute() throws Exception {
            startTime=System.currentTimeMillis();
            ServerConnection sc = session.getSource();
            this.isRunning = true;
            this.packetId = 0;
    //      final BackendConnection conn = session.getTarget(node);
            BackendConnection tConn = session.getTarget(node);
            if (session.getSource().isLocked()) {
                if (tConn == null) {
                    tConn = session.getLockedTarget(node); 
                }
            }
            final BackendConnection conn = tConn;
            node.setRunOnSlave(rrs.getRunOnSlave());    // 实现 master/slave注解        
            _execute(conn);
            }
    
        private void _execute(BackendConnection conn) {
            if (session.closed()) {
                endRunning();
                session.clearResources(true);
                return;
            }
            conn.setResponseHandler(this);
            try {
                conn.execute(node, session.getSource(), session.getSource()
                        .isAutocommit());
            } catch (Exception e1) {
                executeException(conn, e1);
                return;
            }
        }
    

    拆解:
    conn.setResponseHandler(this);
    设置MysqlConnection的SelectionKey.OP_READ事件处理器

    conn.execute(node, session.getSource(), session.getSource()
    .isAutocommit());
    调用MysqlConnection.synAndDoExecute同步发送查询数据包

    8、从MysqlConnection.setResponseHandler(this);逆向分析对分片节点返回数据包的处理;
    核心代码如下:

        public boolean setResponseHandler(ResponseHandler queryHandler) {
                ((MySQLConnectionHandler) handler).setResponseHandler(queryHandler);
                respHandler = queryHandler;
            }
    

    MySQLConnectionAuthenticator.handle;
    分片节点认证成功则设置handler为MySQLConnectionHandler

        @Override
        public void handle(byte[] data) {
                switch (data[4]) {
                case OkPacket.FIELD_COUNT:
                    HandshakePacket packet = source.getHandshake();
                    if (packet == null) {
                        processHandShakePacket(data);
                        // 发送认证数据包
                        source.authenticate();
                        break;
                    }
                    // 处理认证结果
                    source.setHandler(new MySQLConnectionHandler(source));
                    source.setAuthenticated(true);
                                    。。。
                                    。。。
                            }
            }
    

    MySQLConnectionFactory.make

        public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,
                String schema) throws IOException {
    
            DBHostConfig dsc = pool.getConfig();
            NetworkChannel channel = openSocketChannel(MycatServer.getInstance()
                    .isAIO());
    
            MySQLConnection c = new MySQLConnection(channel, pool.isReadNode());
            MycatServer.getInstance().getConfig().setSocketParams(c, false);
            c.setHost(dsc.getIp());
            c.setPort(dsc.getPort());
            c.setUser(dsc.getUser());
            c.setPassword(dsc.getPassword());
            c.setSchema(schema);
            c.setHandler(new MySQLConnectionAuthenticator(c, handler));
            c.setPool(pool);
            c.setIdleTimeout(pool.getConfig().getIdleTimeout());
            if (channel instanceof AsynchronousSocketChannel) {
                ((AsynchronousSocketChannel) channel).connect(
                        new InetSocketAddress(dsc.getIp(), dsc.getPort()), c,
                        (CompletionHandler) MycatServer.getInstance()
                                .getConnector());
            } else {
                ((NIOConnector) MycatServer.getInstance().getConnector())
                        .postConnect(c);
    
            }
            return c;
        }
    

    拆解:
    c.setHandler(new MySQLConnectionAuthenticator(c, handler));
    设置MySQLConnectionAuthenticator为认证处理器;

    ((NIOConnector) MycatServer.getInstance().getConnector())
    .postConnect(c);
    将MySQLConnection注册到NIOConnector,并设置attr为c,如下:
    channel.register(selector, SelectionKey.OP_CONNECT, c);

    收到分片节点的查询结果的回调顺序为:

    MySQLConnectionHandler.handle --> ResponseHandler.okResponse || errorResponse
    --> SingleNodeHandler.session.getSource(); OkPacket.write(source)

    直到这里才算理清了请求的处理流程,以及前后端连接的关联模式。

    转载请备注原文链接。

    相关文章

      网友评论

          本文标题:MyCAT源码分析(三)请求处理流程

          本文链接:https://www.haomeiwen.com/subject/mpnrjctx.html