美文网首页我爱编程
数据库中间件 MyCAT 源码分析 —— SQL ON Mong

数据库中间件 MyCAT 源码分析 —— SQL ON Mong

作者: dagailv | 来源:发表于2017-07-20 11:41 被阅读0次

1. 概述

2. 主流程

3. 查询操作

4. 插入操作

5. 彩蛋

1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。

吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

总体流程,让你有个整体的认识

查询操作

插入操作

彩蛋,😈彩蛋,🙂彩蛋

建议你看过这两篇文章(非必须):

《MyCAT 源码分析 —— 【单库单表】插入》

《MyCAT 源码分析 —— 【单库单表】查询》

2. 主流程

MyCAT Server接收MySQL Client基于MySQL协议的请求,翻译SQLMongoDB操作发送给MongoDB Server。

MyCAT Server接收MongoDB Server返回的MongoDB数据,翻译成MySQL数据结果返回给MySQL Client。

这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。

Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。

MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。

是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

SELECTid,nameFROMuserWHEREname>''ORDERBY_idDESC;

看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

1、查询 MongoDB

// MongoSQLParser.javapublicMongoDataquery()throwsMongoSQLException{if(!(statementinstanceofSQLSelectStatement)) {//return null;thrownewIllegalArgumentException("not a query sql statement");  }  MongoData mongo =newMongoData();  DBCursor c =null;  SQLSelectStatement selectStmt = (SQLSelectStatement) statement;  SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();inticount =0;if(sqlSelectQueryinstanceofMySqlSelectQueryBlock) {      MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();      BasicDBObject fields =newBasicDBObject();// 显示(返回)的字段for(SQLSelectItem item : mysqlSelectQuery.getSelectList()) {//System.out.println(item.toString());if(!(item.getExpr()instanceofSQLAllColumnExpr)) {if(item.getExpr()instanceofSQLAggregateExpr) {                  SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();if(expr.getMethodName().equals("COUNT")) {// TODO 待读:count(*)icount =1;                      mongo.setField(getExprFieldName(expr), Types.BIGINT);                  }                  fields.put(getExprFieldName(expr),1);              }else{                  fields.put(getFieldName(item),1);              }          }      }// 表名SQLTableSource table = mysqlSelectQuery.getFrom();      DBCollection coll =this._db.getCollection(table.toString());      mongo.setTable(table.toString());// WHERESQLExpr expr = mysqlSelectQuery.getWhere();      DBObject query = parserWhere(expr);// GROUP BYSQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();      BasicDBObject gbkey =newBasicDBObject();if(groupby !=null) {for(SQLExpr gbexpr : groupby.getItems()) {if(gbexprinstanceofSQLIdentifierExpr) {                  String name = ((SQLIdentifierExpr) gbexpr).getName();                  gbkey.put(name, Integer.valueOf(1));              }          }          icount =2;      }// SKIP / LIMITintlimitoff =0;intlimitnum =0;if(mysqlSelectQuery.getLimit() !=null) {          limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());          limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());      }if(icount ==1) {// COUNT(*)mongo.setCount(coll.count(query));      }elseif(icount ==2) {// MapReduceBasicDBObject initial =newBasicDBObject();          initial.put("num",0);          String reduce ="function (obj, prev) { "+"  prev.num++}";          mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));      }else{if((limitoff >0) || (limitnum >0)) {              c = coll.find(query, fields).skip(limitoff).limit(limitnum);          }else{              c = coll.find(query, fields);          }// order bySQLOrderBy orderby = mysqlSelectQuery.getOrderBy();if(orderby !=null) {              BasicDBObject order =newBasicDBObject();for(inti =0; i < orderby.getItems().size(); i++) {                  SQLSelectOrderByItem orderitem = orderby.getItems().get(i);                  order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));              }              c.sort(order);// System.out.println(order);}      }      mongo.setCursor(c);  }returnmongo;}

2、查询条件

// MongoSQLParser.javaprivatevoidparserWhere(SQLExpr aexpr, BasicDBObject o){if(aexprinstanceofSQLBinaryOpExpr) {      SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;      SQLExpr exprL = expr.getLeft();if(!(exprLinstanceofSQLBinaryOpExpr)) {if(expr.getOperator().getName().equals("=")) {              o.put(exprL.toString(), getExpValue(expr.getRight()));          }else{              String op ="";if(expr.getOperator().getName().equals("<")) {                  op ="$lt";              }elseif(expr.getOperator().getName().equals("<=")) {                  op ="$lte";              }elseif(expr.getOperator().getName().equals(">")) {                  op ="$gt";              }elseif(expr.getOperator().getName().equals(">=")) {                  op ="$gte";              }elseif(expr.getOperator().getName().equals("!=")) {                  op ="$ne";              }elseif(expr.getOperator().getName().equals("<>")) {                  op ="$ne";              }              parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));          }      }else{if(expr.getOperator().getName().equals("AND")) {              parserWhere(exprL, o);              parserWhere(expr.getRight(), o);          }elseif(expr.getOperator().getName().equals("OR")) {              orWhere(exprL, expr.getRight(), o);          }else{thrownewRuntimeException("Can't identify the operation of  of where");          }      }  }}privatevoidorWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob){  BasicDBObject xo =newBasicDBObject();  BasicDBObject yo =newBasicDBObject();  parserWhere(exprL, xo);  parserWhere(exprR, yo);  ob.put("$or",newObject[]{xo, yo});}

3、解析 MongoDB 数据

// MongoResultSet.javapublicMongoResultSet(MongoData mongo, String schema)throwsSQLException{this._cursor = mongo.getCursor();this._schema = schema;this._table = mongo.getTable();this.isSum = mongo.getCount() >0;this._sum = mongo.getCount();this.isGroupBy = mongo.getType();if(this.isGroupBy) {      dblist = mongo.getGrouyBys();this.isSum =true;  }if(this._cursor !=null) {      select = _cursor.getKeysWanted().keySet().toArray(newString[0]);// 解析 fieldsif(this._cursor.hasNext()) {          _cur = _cursor.next();if(_cur !=null) {if(select.length ==0) {                  SetFields(_cur.keySet());              }              _row =1;          }      }// 设置 fields 类型if(select.length ==0) {          select =newString[]{"_id"};          SetFieldType(true);      }else{          SetFieldType(false);      }  }else{      SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};SetFieldType(mongo.getFields());  }}

当使用SELECT *查询字段时,fields 使用第一条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

4、返回数据给 MySQL Client

// JDBCConnection.javaprivatevoidouputResultSet(ServerConnection sc, String sql)throwsSQLException{  ResultSet rs =null;  Statement stmt =null;try{      stmt = con.createStatement();      rs = stmt.executeQuery(sql);// headerList fieldPks =newLinkedList<>();      ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs,this.isSpark);intcolunmCount = fieldPks.size();      ByteBuffer byteBuf = sc.allocate();      ResultSetHeaderPacket headerPkg =newResultSetHeaderPacket();      headerPkg.fieldCount = fieldPks.size();      headerPkg.packetId = ++packetId;      byteBuf = headerPkg.write(byteBuf, sc,true);      byteBuf.flip();byte[] header =newbyte[byteBuf.limit()];      byteBuf.get(header);      byteBuf.clear();      List fields =newArrayList(fieldPks.size());for(FieldPacket curField : fieldPks) {          curField.packetId = ++packetId;          byteBuf = curField.write(byteBuf, sc,false);          byteBuf.flip();byte[] field =newbyte[byteBuf.limit()];          byteBuf.get(field);          byteBuf.clear();          fields.add(field);      }// header eofEOFPacket eofPckg =newEOFPacket();      eofPckg.packetId = ++packetId;      byteBuf = eofPckg.write(byteBuf, sc,false);      byteBuf.flip();byte[] eof =newbyte[byteBuf.limit()];      byteBuf.get(eof);      byteBuf.clear();this.respHandler.fieldEofResponse(header, fields, eof,this);// rowwhile(rs.next()) {          RowDataPacket curRow =newRowDataPacket(colunmCount);for(inti =0; i < colunmCount; i++) {intj = i +1;if(MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {                  curRow.add(rs.getBytes(j));              }elseif(fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||                      fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL -256)) {// field type is unsigned byte// ensure that do not use scientific notation formatBigDecimal val = rs.getBigDecimal(j);                  curRow.add(StringUtil.encode(val !=null? val.toPlainString() :null, sc.getCharset()));              }else{                  curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));              }          }          curRow.packetId = ++packetId;          byteBuf = curRow.write(byteBuf, sc,false);          byteBuf.flip();byte[] row =newbyte[byteBuf.limit()];          byteBuf.get(row);          byteBuf.clear();this.respHandler.rowResponse(row,this);      }      fieldPks.clear();// row eofeofPckg =newEOFPacket();      eofPckg.packetId = ++packetId;      byteBuf = eofPckg.write(byteBuf, sc,false);      byteBuf.flip();      eof =newbyte[byteBuf.limit()];      byteBuf.get(eof);      sc.recycle(byteBuf);this.respHandler.rowEofResponse(eof,this);  }finally{if(rs !=null) {try{              rs.close();          }catch(SQLException e) {          }      }if(stmt !=null) {try{              stmt.close();          }catch(SQLException e) {          }      }  }}// MongoResultSet.java@OverridepublicStringgetString(String columnLabel)throwsSQLException{  Object x = getObject(columnLabel);if(x ==null) {returnnull;  }returnx.toString();}

当返回字段值是 Object 时,返回该对象.toString()。例如:

mysql>select*fromuserorderby_idasc;+--------------------------+------+-------------------------------+| _id                      | name | profile                      |+--------------------------+------+-------------------------------+|1|123| {"age":1,"height":100} |

4. 插入操作

// MongoSQLParser.javapublicintexecuteUpdate()throwsMongoSQLException{if(statementinstanceofSQLInsertStatement) {returnInsertData((SQLInsertStatement) statement);  }if(statementinstanceofSQLUpdateStatement) {returnUpData((SQLUpdateStatement) statement);  }if(statementinstanceofSQLDropTableStatement) {returndropTable((SQLDropTableStatement) statement);  }if(statementinstanceofSQLDeleteStatement) {returnDeleteDate((SQLDeleteStatement) statement);  }if(statementinstanceofSQLCreateTableStatement) {return1;  }return1;}privateintInsertData(SQLInsertStatement state){if(state.getValues().getValues().size() ==0) {thrownewRuntimeException("number of  columns error");  }if(state.getValues().getValues().size() != state.getColumns().size()) {thrownewRuntimeException("number of values and columns have to match");  }  SQLTableSource table = state.getTableSource();  BasicDBObject o =newBasicDBObject();inti =0;for(SQLExpr col : state.getColumns()) {      o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));      i++;  }  DBCollection coll =this._db.getCollection(table.toString());  coll.insert(o);return1;}

5. 彩蛋

1、支持多 MongoDB ,并使用 MyCAT 进行分片。

MyCAT 配置:multi_mongodb

2、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。

查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。

MyCAT 配置:single_mongodb_mysql

3、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。

MyCAT 配置:single_mongodb

相关文章

网友评论

    本文标题:数据库中间件 MyCAT 源码分析 —— SQL ON Mong

    本文链接:https://www.haomeiwen.com/subject/zfpdkxtx.html