diff --git a/src/main/java/io/mycat/route/impl/DruidMycatRouteStrategy.java b/src/main/java/io/mycat/route/impl/DruidMycatRouteStrategy.java index ba33f1e0c..9f6a1684f 100644 --- a/src/main/java/io/mycat/route/impl/DruidMycatRouteStrategy.java +++ b/src/main/java/io/mycat/route/impl/DruidMycatRouteStrategy.java @@ -1,756 +1,767 @@ -package io.mycat.route.impl; - -import java.sql.SQLNonTransientException; -import java.sql.SQLSyntaxErrorException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.druid.sql.SQLUtils; -import com.alibaba.druid.sql.ast.SQLObject; -import com.alibaba.druid.sql.ast.SQLStatement; -import com.alibaba.druid.sql.ast.expr.SQLAllExpr; -import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; -import com.alibaba.druid.sql.ast.expr.SQLExistsExpr; -import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; -import com.alibaba.druid.sql.ast.expr.SQLInSubQueryExpr; -import com.alibaba.druid.sql.ast.expr.SQLQueryExpr; -import com.alibaba.druid.sql.ast.statement.SQLDeleteStatement; -import com.alibaba.druid.sql.ast.statement.SQLExprTableSource; -import com.alibaba.druid.sql.ast.statement.SQLInsertStatement; -import com.alibaba.druid.sql.ast.statement.SQLSelect; -import com.alibaba.druid.sql.ast.statement.SQLSelectQuery; -import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; -import com.alibaba.druid.sql.ast.statement.SQLTableSource; -import com.alibaba.druid.sql.ast.statement.SQLUpdateStatement; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlReplaceStatement; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock; -import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; -import com.alibaba.druid.sql.parser.SQLStatementParser; -import com.alibaba.druid.stat.TableStat.Relationship; -import com.google.common.base.Strings; - -import io.mycat.MycatServer; -import io.mycat.backend.mysql.nio.handler.MiddlerQueryResultHandler; -import io.mycat.backend.mysql.nio.handler.MiddlerResultHandler; -import io.mycat.backend.mysql.nio.handler.SecondHandler; -import io.mycat.cache.LayerCachePool; -import io.mycat.config.ErrorCode; -import io.mycat.config.model.SchemaConfig; -import io.mycat.config.model.TableConfig; -import io.mycat.config.model.rule.RuleConfig; -import io.mycat.route.RouteResultset; -import io.mycat.route.RouteResultsetNode; -import io.mycat.route.function.SlotFunction; -import io.mycat.route.impl.middlerResultStrategy.BinaryOpResultHandler; -import io.mycat.route.impl.middlerResultStrategy.InSubQueryResultHandler; -import io.mycat.route.impl.middlerResultStrategy.RouteMiddlerReaultHandler; -import io.mycat.route.impl.middlerResultStrategy.SQLAllResultHandler; -import io.mycat.route.impl.middlerResultStrategy.SQLExistsResultHandler; -import io.mycat.route.impl.middlerResultStrategy.SQLQueryResultHandler; -import io.mycat.route.parser.druid.DruidParser; -import io.mycat.route.parser.druid.DruidParserFactory; -import io.mycat.route.parser.druid.DruidShardingParseInfo; -import io.mycat.route.parser.druid.MycatSchemaStatVisitor; -import io.mycat.route.parser.druid.MycatStatementParser; -import io.mycat.route.parser.druid.RouteCalculateUnit; -import io.mycat.route.parser.util.ParseUtil; -import io.mycat.route.util.RouterUtil; -import io.mycat.server.NonBlockingSession; -import io.mycat.server.ServerConnection; -import io.mycat.server.parser.ServerParse; - -public class DruidMycatRouteStrategy extends AbstractRouteStrategy { - - public static final Logger LOGGER = LoggerFactory.getLogger(DruidMycatRouteStrategy.class); - - private static Map,RouteMiddlerReaultHandler> middlerResultHandler = new HashMap<>(); - - static{ - middlerResultHandler.put(SQLQueryExpr.class, new SQLQueryResultHandler()); - middlerResultHandler.put(SQLBinaryOpExpr.class, new BinaryOpResultHandler()); - middlerResultHandler.put(SQLInSubQueryExpr.class, new InSubQueryResultHandler()); - middlerResultHandler.put(SQLExistsExpr.class, new SQLExistsResultHandler()); - middlerResultHandler.put(SQLAllExpr.class, new SQLAllResultHandler()); - } - - - @Override - public RouteResultset routeNormalSqlWithAST(SchemaConfig schema, - String stmt, RouteResultset rrs,String charset, - LayerCachePool cachePool,int sqlType,ServerConnection sc) throws SQLNonTransientException { - - /** - * 只有mysql时只支持mysql语法 - */ - SQLStatementParser parser = null; - if (schema.isNeedSupportMultiDBType()) { - parser = new MycatStatementParser(stmt); - } else { - parser = new MySqlStatementParser(stmt); - } - - MycatSchemaStatVisitor visitor = null; - SQLStatement statement; - - /** - * 解析出现问题统一抛SQL语法错误 - */ - try { - statement = parser.parseStatement(); - visitor = new MycatSchemaStatVisitor(); - } catch (Exception t) { - LOGGER.error("DruidMycatRouteStrategyError", t); - throw new SQLSyntaxErrorException(t); - } - - /** - * 检验unsupported statement - */ - checkUnSupportedStatement(statement); - - DruidParser druidParser = DruidParserFactory.create(schema, statement, visitor); - druidParser.parser(schema, rrs, statement, stmt,cachePool,visitor); - DruidShardingParseInfo ctx= druidParser.getCtx() ; - rrs.setTables(ctx.getTables()); - - if(visitor.isSubqueryRelationOr()){ - String err = "In subQuery,the or condition is not supported."; - LOGGER.error(err); - throw new SQLSyntaxErrorException(err); - } - - /* 按照以下情况路由 - 1.2.1 可以直接路由. - 1.2.2 两个表夸库join的sql.调用calat - 1.2.3 需要先执行subquery 的sql.把subquery拆分出来.获取结果后,与outerquery - */ - - //add huangyiming 分片规则不一样的且表中带查询条件的则走Catlet - List tables = ctx.getTables(); - SchemaConfig schemaConf = MycatServer.getInstance().getConfig().getSchemas().get(schema.getName()); - int index = 0; - RuleConfig firstRule = null; - boolean directRoute = true; - Set firstDataNodes = new HashSet(); - Map tconfigs = schemaConf==null?null:schemaConf.getTables(); - - Map rulemap = new HashMap<>(); - if(tconfigs!=null){ - for(String tableName : tables){ - TableConfig tc = tconfigs.get(tableName); - if(tc == null){ - //add 别名中取 - Map tableAliasMap = ctx.getTableAliasMap(); - if(tableAliasMap !=null && tableAliasMap.get(tableName) !=null){ - tc = schemaConf.getTables().get(tableAliasMap.get(tableName)); - } - } - - if(index == 0){ - if(tc !=null){ - firstRule= tc.getRule(); - //没有指定分片规则时,不做处理 - if(firstRule==null){ - continue; - } - firstDataNodes.addAll(tc.getDataNodes()); - rulemap.put(tc.getName(), firstRule); - } - }else{ - if(tc !=null){ - //ER关系表的时候是可能存在字表中没有tablerule的情况,所以加上判断 - RuleConfig ruleCfg = tc.getRule(); - if(ruleCfg==null){ //没有指定分片规则时,不做处理 - continue; - } - Set dataNodes = new HashSet(); - dataNodes.addAll(tc.getDataNodes()); - rulemap.put(tc.getName(), ruleCfg); - //如果匹配规则不相同或者分片的datanode不相同则需要走子查询处理 - if(firstRule!=null&&((ruleCfg !=null && !ruleCfg.getRuleAlgorithm().equals(firstRule.getRuleAlgorithm()) )||( !dataNodes.equals(firstDataNodes)))){ - directRoute = false; - break; - } - } - } - index++; - } - } - - RouteResultset rrsResult = rrs; - if(directRoute){ //直接路由 - if(!RouterUtil.isAllGlobalTable(ctx, schemaConf)){ - if(rulemap.size()>1&&!checkRuleField(rulemap,visitor)){ - String err = "In case of slice table,there is no rule field in the relationship condition!"; - LOGGER.error(err); - throw new SQLSyntaxErrorException(err); - } - } - rrsResult = directRoute(rrs,ctx,schema,druidParser,statement,cachePool); - }else{ - int subQuerySize = visitor.getSubQuerys().size(); - if(subQuerySize==0&&ctx.getTables().size()==2){ //两表关联,考虑使用catlet - if(!visitor.getRelationships().isEmpty()){ - rrs.setCacheAble(false); - rrs.setFinishedRoute(true); - rrsResult = catletRoute(schema,ctx.getSql(),charset,sc); - }else{ - rrsResult = directRoute(rrs,ctx,schema,druidParser,statement,cachePool); - } - }else if(subQuerySize==1){ //只涉及一张表的子查询,使用 MiddlerResultHandler 获取中间结果后,改写原有 sql 继续执行 TODO 后期可能会考虑多个子查询的情况. - SQLSelect sqlselect = visitor.getSubQuerys().iterator().next(); - if(!visitor.getRelationships().isEmpty()){ // 当 inner query 和 outer query 有关联条件时,暂不支持 - String err = "In case of slice table,sql have different rules,the relationship condition is not supported."; - LOGGER.error(err); - throw new SQLSyntaxErrorException(err); - }else{ - SQLSelectQuery sqlSelectQuery = sqlselect.getQuery(); - if(((MySqlSelectQueryBlock)sqlSelectQuery).getFrom() instanceof SQLExprTableSource) { - rrs.setCacheAble(false); - rrs.setFinishedRoute(true); - rrsResult = middlerResultRoute(schema,charset,sqlselect,sqlType,statement,sc); - } - } - }else if(subQuerySize >=2){ - String err = "In case of slice table,sql has different rules,currently only one subQuery is supported."; - LOGGER.error(err); - throw new SQLSyntaxErrorException(err); - } - } - return rrsResult; - } - - /** - * 子查询中存在关联查询的情况下,检查关联字段是否是分片字段 - * @param rulemap - * @param ships - * @return - */ - private boolean checkRuleField(Map rulemap,MycatSchemaStatVisitor visitor){ - - if(!MycatServer.getInstance().getConfig().getSystem().isSubqueryRelationshipCheck()){ - return true; - } - - Set ships = visitor.getRelationships(); - Iterator iter = ships.iterator(); - while(iter.hasNext()){ - Relationship ship = iter.next(); - String lefttable = ship.getLeft().getTable().toUpperCase(); - String righttable = ship.getRight().getTable().toUpperCase(); - // 如果是同一个表中的关联条件,不做处理 - if(lefttable.equals(righttable)){ - return true; - } - RuleConfig leftconfig = rulemap.get(lefttable); - RuleConfig rightconfig = rulemap.get(righttable); - - if(null!=leftconfig&&null!=rightconfig - &&leftconfig.equals(rightconfig) - &&leftconfig.getColumn().equals(ship.getLeft().getName().toUpperCase()) - &&rightconfig.getColumn().equals(ship.getRight().getName().toUpperCase())){ - return true; - } - } - return false; - } - - private RouteResultset middlerResultRoute(final SchemaConfig schema,final String charset,final SQLSelect sqlselect, - final int sqlType,final SQLStatement statement,final ServerConnection sc){ - - final String middlesql = SQLUtils.toMySqlString(sqlselect); - - MiddlerResultHandler middlerResultHandler = new MiddlerQueryResultHandler<>(new SecondHandler() { - @Override - public void doExecute(List param) { - sc.getSession2().setMiddlerResultHandler(null); - String sqls = null; - // 路由计算 - RouteResultset rrs = null; - try { - - sqls = buildSql(statement,sqlselect,param); - rrs = MycatServer - .getInstance() - .getRouterservice() - .route(MycatServer.getInstance().getConfig().getSystem(), - schema, sqlType,sqls.toLowerCase(), charset,sc ); - - } catch (Exception e) { - StringBuilder s = new StringBuilder(); - LOGGER.warn(s.append(this).append(sqls).toString() + " err:" + e.toString(),e); - String msg = e.getMessage(); - sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg); - return; - } - NonBlockingSession noBlockSession = new NonBlockingSession(sc.getSession2().getSource()); - noBlockSession.setMiddlerResultHandler(null); - //session的预编译标示传递 - noBlockSession.setPrepared(sc.getSession2().isPrepared()); - if (rrs != null) { - noBlockSession.setCanClose(false); - noBlockSession.execute(rrs, ServerParse.SELECT); - } - } - } ); - sc.getSession2().setMiddlerResultHandler(middlerResultHandler); - sc.getSession2().setCanClose(false); - - // 路由计算 - RouteResultset rrs = null; - try { - rrs = MycatServer - .getInstance() - .getRouterservice() - .route(MycatServer.getInstance().getConfig().getSystem(), - schema, ServerParse.SELECT, middlesql, charset, sc); - - } catch (Exception e) { - StringBuilder s = new StringBuilder(); - LOGGER.warn(s.append(this).append(middlesql).toString() + " err:" + e.toString(),e); - String msg = e.getMessage(); - sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg); - return null; - } - - if(rrs!=null){ - rrs.setCacheAble(false); - } - return rrs; - } - - /** - * 获取子查询执行结果后,改写原始sql 继续执行. - * @param statement - * @param sqlselect - * @param param - * @return - */ - private String buildSql(SQLStatement statement,SQLSelect sqlselect,List param){ - - SQLObject parent = sqlselect.getParent(); - RouteMiddlerReaultHandler handler = middlerResultHandler.get(parent.getClass()); - if(handler==null){ - throw new UnsupportedOperationException(parent.getClass()+" current is not supported "); - } - return handler.dohandler(statement, sqlselect, parent, param); - } - - /** - * 两个表的情况,catlet - * @param schema - * @param stmt - * @param charset - * @param sc - * @return - */ - private RouteResultset catletRoute(SchemaConfig schema,String stmt,String charset,ServerConnection sc){ - RouteResultset rrs = null; - try { - rrs = MycatServer - .getInstance() - .getRouterservice() - .route(MycatServer.getInstance().getConfig().getSystem(), - schema, ServerParse.SELECT, "/*!mycat:catlet=io.mycat.catlets.ShareJoin */ "+stmt, charset, sc); - - }catch(Exception e){ - - } - return rrs; - } - - /** - * 直接结果路由 - * @param rrs - * @param ctx - * @param schema - * @param druidParser - * @param statement - * @param cachePool - * @return - * @throws SQLNonTransientException - */ - private RouteResultset directRoute(RouteResultset rrs,DruidShardingParseInfo ctx,SchemaConfig schema, - DruidParser druidParser,SQLStatement statement,LayerCachePool cachePool) throws SQLNonTransientException{ - - //改写sql:如insert语句主键自增长, 在直接结果路由的情况下,进行sql 改写处理 - druidParser.changeSql(schema, rrs, statement,cachePool); - - /** - * DruidParser 解析过程中已完成了路由的直接返回 - */ - if ( rrs.isFinishedRoute() ) { - return rrs; - } - - /** - * 没有from的select语句或其他 - */ - if((ctx.getTables() == null || ctx.getTables().size() == 0)&&(ctx.getTableAliasMap()==null||ctx.getTableAliasMap().isEmpty())) - { - return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), druidParser.getCtx().getSql()); - } - - if(druidParser.getCtx().getRouteCalculateUnits().size() == 0) { - RouteCalculateUnit routeCalculateUnit = new RouteCalculateUnit(); - druidParser.getCtx().addRouteCalculateUnit(routeCalculateUnit); - } - - SortedSet nodeSet = new TreeSet(); - boolean isAllGlobalTable = RouterUtil.isAllGlobalTable(ctx, schema); - for(RouteCalculateUnit unit: druidParser.getCtx().getRouteCalculateUnits()) { - RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, druidParser.getCtx(), unit, rrs, isSelect(statement), cachePool); - if(rrsTmp != null&&rrsTmp.getNodes()!=null) { - for(RouteResultsetNode node :rrsTmp.getNodes()) { - nodeSet.add(node); - } - } - if(isAllGlobalTable) {//都是全局表时只计算一遍路由 - break; - } - } - - RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()]; - int i = 0; - for (RouteResultsetNode aNodeSet : nodeSet) { - nodes[i] = aNodeSet; - if(statement instanceof MySqlInsertStatement &&ctx.getTables().size()==1&&schema.getTables().containsKey(ctx.getTables().get(0))) { - RuleConfig rule = schema.getTables().get(ctx.getTables().get(0)).getRule(); - if(rule!=null&& rule.getRuleAlgorithm() instanceof SlotFunction){ - aNodeSet.setStatement(ParseUtil.changeInsertAddSlot(aNodeSet.getStatement(),aNodeSet.getSlot())); - } - } - i++; - } - rrs.setNodes(nodes); - - //分表 - /** - * subTables="t_order$1-2,t_order3" - *目前分表 1.6 开始支持 幵丏 dataNode 在分表条件下只能配置一个,分表条件下不支持join。 - */ - if(rrs.isDistTable()){ - return this.routeDisTable(statement,rrs); - } - return rrs; - } - - private SQLExprTableSource getDisTable(SQLTableSource tableSource,RouteResultsetNode node) throws SQLSyntaxErrorException{ - if(node.getSubTableName()==null){ - String msg = " sub table not exists for " + node.getName() + " on " + tableSource; - LOGGER.error("DruidMycatRouteStrategyError " + msg); - throw new SQLSyntaxErrorException(msg); - } - - SQLIdentifierExpr sqlIdentifierExpr = new SQLIdentifierExpr(); - sqlIdentifierExpr.setParent(tableSource.getParent()); - sqlIdentifierExpr.setName(node.getSubTableName()); - SQLExprTableSource from2 = new SQLExprTableSource(sqlIdentifierExpr); - return from2; - } - - private RouteResultset routeDisTable(SQLStatement statement, RouteResultset rrs) throws SQLSyntaxErrorException{ - SQLTableSource tableSource = null; - if(statement instanceof SQLInsertStatement) { - SQLInsertStatement insertStatement = (SQLInsertStatement) statement; - tableSource = insertStatement.getTableSource(); - for (RouteResultsetNode node : rrs.getNodes()) { - SQLExprTableSource from2 = getDisTable(tableSource, node); - insertStatement.setTableSource(from2); - node.setStatement(insertStatement.toString()); - } - } - if(statement instanceof SQLDeleteStatement) { - SQLDeleteStatement deleteStatement = (SQLDeleteStatement) statement; - tableSource = deleteStatement.getTableSource(); - for (RouteResultsetNode node : rrs.getNodes()) { - SQLExprTableSource from2 = getDisTable(tableSource, node); - deleteStatement.setTableSource(from2); - node.setStatement(deleteStatement.toString()); - } - } - if(statement instanceof SQLUpdateStatement) { - SQLUpdateStatement updateStatement = (SQLUpdateStatement) statement; - tableSource = updateStatement.getTableSource(); - for (RouteResultsetNode node : rrs.getNodes()) { - SQLExprTableSource from2 = getDisTable(tableSource, node); - updateStatement.setTableSource(from2); - node.setStatement(updateStatement.toString()); - } - } - - return rrs; - } - - /** - * SELECT 语句 - */ - private boolean isSelect(SQLStatement statement) { - if(statement instanceof SQLSelectStatement) { - return true; - } - return false; - } - - /** - * 检验不支持的SQLStatement类型 :不支持的类型直接抛SQLSyntaxErrorException异常 - * @param statement - * @throws SQLSyntaxErrorException - */ - private void checkUnSupportedStatement(SQLStatement statement) throws SQLSyntaxErrorException { - //不支持replace语句 - if(statement instanceof MySqlReplaceStatement) { - throw new SQLSyntaxErrorException(" ReplaceStatement can't be supported,use insert into ...on duplicate key update... instead "); - } - } - - /** - * 分析 SHOW SQL - */ - @Override - public RouteResultset analyseShowSQL(SchemaConfig schema, - RouteResultset rrs, String stmt) throws SQLSyntaxErrorException { - - String upStmt = stmt.toUpperCase(); - int tabInd = upStmt.indexOf(" TABLES"); - if (tabInd > 0) {// show tables - int[] nextPost = RouterUtil.getSpecPos(upStmt, 0); - if (nextPost[0] > 0) {// remove db info - int end = RouterUtil.getSpecEndPos(upStmt, tabInd); - if (upStmt.indexOf(" FULL") > 0) { - stmt = "SHOW FULL TABLES" + stmt.substring(end); - } else { - stmt = "SHOW TABLES" + stmt.substring(end); - } - } - String defaultNode= schema.getDataNode(); - if(!Strings.isNullOrEmpty(defaultNode)) - { - return RouterUtil.routeToSingleNode(rrs, defaultNode, stmt); - } - return RouterUtil.routeToMultiNode(false, rrs, schema.getMetaDataNodes(), stmt); - } - - /** - * show index or column - */ - int[] indx = RouterUtil.getSpecPos(upStmt, 0); - if (indx[0] > 0) { - /** - * has table - */ - int[] repPos = { indx[0] + indx[1], 0 }; - String tableName = RouterUtil.getShowTableName(stmt, repPos); - /** - * IN DB pattern - */ - int[] indx2 = RouterUtil.getSpecPos(upStmt, indx[0] + indx[1] + 1); - if (indx2[0] > 0) {// find LIKE OR WHERE - repPos[1] = RouterUtil.getSpecEndPos(upStmt, indx2[0] + indx2[1]); - - } - stmt = stmt.substring(0, indx[0]) + " FROM " + tableName + stmt.substring(repPos[1]); - RouterUtil.routeForTableMeta(rrs, schema, tableName, stmt); - return rrs; - - } - - /** - * show create table tableName - */ - int[] createTabInd = RouterUtil.getCreateTablePos(upStmt, 0); - if (createTabInd[0] > 0) { - int tableNameIndex = createTabInd[0] + createTabInd[1]; - if (upStmt.length() > tableNameIndex) { - String tableName = stmt.substring(tableNameIndex).trim(); - int ind2 = tableName.indexOf('.'); - if (ind2 > 0) { - tableName = tableName.substring(ind2 + 1); - } - RouterUtil.routeForTableMeta(rrs, schema, tableName, stmt); - return rrs; - } - } - - return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), stmt); - } - - -// /** -// * 为一个表进行条件路由 -// * @param schema -// * @param tablesAndConditions -// * @param tablesRouteMap -// * @throws SQLNonTransientException -// */ -// private static RouteResultset findRouteWithcConditionsForOneTable(SchemaConfig schema, RouteResultset rrs, -// Map> conditions, String tableName, String sql) throws SQLNonTransientException { -// boolean cache = rrs.isCacheAble(); -// //为分库表找路由 -// tableName = tableName.toUpperCase(); -// TableConfig tableConfig = schema.getTables().get(tableName); -// //全局表或者不分库的表略过(全局表后面再计算) -// if(tableConfig.isGlobalTable()) { -// return null; -// } else {//非全局表 -// Set routeSet = new HashSet(); -// String joinKey = tableConfig.getJoinKey(); -// for(Map.Entry> condition : conditions.entrySet()) { -// String colName = condition.getKey(); -// //条件字段是拆分字段 -// if(colName.equals(tableConfig.getPartitionColumn())) { -// Set columnPairs = condition.getValue(); -// -// for(ColumnRoutePair pair : columnPairs) { -// if(pair.colValue != null) { -// Integer nodeIndex = tableConfig.getRule().getRuleAlgorithm().calculate(pair.colValue); -// if(nodeIndex == null) { -// String msg = "can't find any valid datanode :" + tableConfig.getName() -// + " -> " + tableConfig.getPartitionColumn() + " -> " + pair.colValue; -// LOGGER.warn(msg); -// throw new SQLNonTransientException(msg); -// } -// String node = tableConfig.getDataNodes().get(nodeIndex); -// if(node != null) {//找到一个路由节点 -// routeSet.add(node); -// } -// } -// if(pair.rangeValue != null) { -// Integer[] nodeIndexs = tableConfig.getRule().getRuleAlgorithm() -// .calculateRange(pair.rangeValue.beginValue.toString(), pair.rangeValue.endValue.toString()); -// for(Integer idx : nodeIndexs) { -// String node = tableConfig.getDataNodes().get(idx); -// if(node != null) {//找到一个路由节点 -// routeSet.add(node); -// } -// } -// } -// } -// } else if(joinKey != null && joinKey.equals(colName)) { -// Set dataNodeSet = RouterUtil.ruleCalculate( -// tableConfig.getParentTC(), condition.getValue()); -// if (dataNodeSet.isEmpty()) { -// throw new SQLNonTransientException( -// "parent key can't find any valid datanode "); -// } -// if (LOGGER.isDebugEnabled()) { -// LOGGER.debug("found partion nodes (using parent partion rule directly) for child table to update " -// + Arrays.toString(dataNodeSet.toArray()) + " sql :" + sql); -// } -// if (dataNodeSet.size() > 1) { -// return RouterUtil.routeToMultiNode(rrs.isCacheAble(), rrs, schema.getAllDataNodes(), sql); -// } else { -// rrs.setCacheAble(true); -// return RouterUtil.routeToSingleNode(rrs, dataNodeSet.iterator().next(), sql); -// } -// } else {//条件字段不是拆分字段也不是join字段,略过 -// continue; -// -// } -// } -// return RouterUtil.routeToMultiNode(cache, rrs, routeSet, sql); -// -// } -// -// } - - public RouteResultset routeSystemInfo(SchemaConfig schema, int sqlType, - String stmt, RouteResultset rrs) throws SQLSyntaxErrorException { - switch(sqlType){ - case ServerParse.SHOW:// if origSQL is like show tables - return analyseShowSQL(schema, rrs, stmt); - case ServerParse.SELECT://if origSQL is like select @@ - int index = stmt.indexOf("@@"); - if(index > 0 && "SELECT".equals(stmt.substring(0, index).trim().toUpperCase())){ - return analyseDoubleAtSgin(schema, rrs, stmt); - } - break; - case ServerParse.DESCRIBE:// if origSQL is meta SQL, such as describe table - int ind = stmt.indexOf(' '); - stmt = stmt.trim(); - return analyseDescrSQL(schema, rrs, stmt, ind + 1); - } - return null; - } - - /** - * 对Desc语句进行分析 返回数据路由集合 - * * - * @param schema 数据库名 - * @param rrs 数据路由集合 - * @param stmt 执行语句 - * @param ind 第一个' '的位置 - * @return RouteResultset (数据路由集合) - * @author mycat - */ - private static RouteResultset analyseDescrSQL(SchemaConfig schema, - RouteResultset rrs, String stmt, int ind) { - - final String MATCHED_FEATURE = "DESCRIBE "; - final String MATCHED2_FEATURE = "DESC "; - int pos = 0; - while (pos < stmt.length()) { - char ch = stmt.charAt(pos); - // 忽略处理注释 /* */ BEN - if(ch == '/' && pos+4 < stmt.length() && stmt.charAt(pos+1) == '*') { - if(stmt.substring(pos+2).indexOf("*/") != -1) { - pos += stmt.substring(pos+2).indexOf("*/")+4; - continue; - } else { - // 不应该发生这类情况。 - throw new IllegalArgumentException("sql 注释 语法错误"); - } - } else if(ch == 'D'||ch == 'd') { - // 匹配 [describe ] - if(pos+MATCHED_FEATURE.length() < stmt.length() && (stmt.substring(pos).toUpperCase().indexOf(MATCHED_FEATURE) != -1)) { - pos = pos + MATCHED_FEATURE.length(); - break; - } else if(pos+MATCHED2_FEATURE.length() < stmt.length() && (stmt.substring(pos).toUpperCase().indexOf(MATCHED2_FEATURE) != -1)) { - pos = pos + MATCHED2_FEATURE.length(); - break; - } else { - pos++; - } - } - } - - // 重置ind坐标。BEN GONG - ind = pos; - int[] repPos = { ind, 0 }; - String tableName = RouterUtil.getTableName(stmt, repPos); - - stmt = stmt.substring(0, ind) + tableName + stmt.substring(repPos[1]); - RouterUtil.routeForTableMeta(rrs, schema, tableName, stmt); - return rrs; - } - - /** - * 根据执行语句判断数据路由 - * - * @param schema 数据库名 - * @param rrs 数据路由集合 - * @param stmt 执行sql - * @return RouteResultset 数据路由集合 - * @throws SQLSyntaxErrorException - * @author mycat - */ - private RouteResultset analyseDoubleAtSgin(SchemaConfig schema, - RouteResultset rrs, String stmt) throws SQLSyntaxErrorException { - String upStmt = stmt.toUpperCase(); - int atSginInd = upStmt.indexOf(" @@"); - if (atSginInd > 0) { - return RouterUtil.routeToMultiNode(false, rrs, schema.getMetaDataNodes(), stmt); - } - return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), stmt); - } +package io.mycat.route.impl; + +import java.sql.SQLNonTransientException; +import java.sql.SQLSyntaxErrorException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.druid.sql.SQLUtils; +import com.alibaba.druid.sql.ast.SQLObject; +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.expr.SQLAllExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; +import com.alibaba.druid.sql.ast.expr.SQLExistsExpr; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLInSubQueryExpr; +import com.alibaba.druid.sql.ast.expr.SQLQueryExpr; +import com.alibaba.druid.sql.ast.statement.SQLDeleteStatement; +import com.alibaba.druid.sql.ast.statement.SQLExprTableSource; +import com.alibaba.druid.sql.ast.statement.SQLInsertStatement; +import com.alibaba.druid.sql.ast.statement.SQLSelect; +import com.alibaba.druid.sql.ast.statement.SQLSelectQuery; +import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; +import com.alibaba.druid.sql.ast.statement.SQLTableSource; +import com.alibaba.druid.sql.ast.statement.SQLUpdateStatement; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlReplaceStatement; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock; +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.alibaba.druid.stat.TableStat.Relationship; +import com.google.common.base.Strings; + +import io.mycat.MycatServer; +import io.mycat.backend.mysql.nio.handler.MiddlerQueryResultHandler; +import io.mycat.backend.mysql.nio.handler.MiddlerResultHandler; +import io.mycat.backend.mysql.nio.handler.SecondHandler; +import io.mycat.cache.LayerCachePool; +import io.mycat.config.ErrorCode; +import io.mycat.config.model.SchemaConfig; +import io.mycat.config.model.TableConfig; +import io.mycat.config.model.rule.RuleConfig; +import io.mycat.route.RouteResultset; +import io.mycat.route.RouteResultsetNode; +import io.mycat.route.function.SlotFunction; +import io.mycat.route.impl.middlerResultStrategy.BinaryOpResultHandler; +import io.mycat.route.impl.middlerResultStrategy.InSubQueryResultHandler; +import io.mycat.route.impl.middlerResultStrategy.RouteMiddlerReaultHandler; +import io.mycat.route.impl.middlerResultStrategy.SQLAllResultHandler; +import io.mycat.route.impl.middlerResultStrategy.SQLExistsResultHandler; +import io.mycat.route.impl.middlerResultStrategy.SQLQueryResultHandler; +import io.mycat.route.parser.druid.DruidParser; +import io.mycat.route.parser.druid.DruidParserFactory; +import io.mycat.route.parser.druid.DruidShardingParseInfo; +import io.mycat.route.parser.druid.MycatSchemaStatVisitor; +import io.mycat.route.parser.druid.MycatStatementParser; +import io.mycat.route.parser.druid.RouteCalculateUnit; +import io.mycat.route.parser.util.ParseUtil; +import io.mycat.route.util.RouterUtil; +import io.mycat.server.NonBlockingSession; +import io.mycat.server.ServerConnection; +import io.mycat.server.parser.ServerParse; + +public class DruidMycatRouteStrategy extends AbstractRouteStrategy { + + public static final Logger LOGGER = LoggerFactory.getLogger(DruidMycatRouteStrategy.class); + + private static Map,RouteMiddlerReaultHandler> middlerResultHandler = new HashMap<>(); + + static{ + middlerResultHandler.put(SQLQueryExpr.class, new SQLQueryResultHandler()); + middlerResultHandler.put(SQLBinaryOpExpr.class, new BinaryOpResultHandler()); + middlerResultHandler.put(SQLInSubQueryExpr.class, new InSubQueryResultHandler()); + middlerResultHandler.put(SQLExistsExpr.class, new SQLExistsResultHandler()); + middlerResultHandler.put(SQLAllExpr.class, new SQLAllResultHandler()); + } + + + @Override + public RouteResultset routeNormalSqlWithAST(SchemaConfig schema, + String stmt, RouteResultset rrs,String charset, + LayerCachePool cachePool,int sqlType,ServerConnection sc) throws SQLNonTransientException { + + /** + * 只有mysql时只支持mysql语法 + */ + SQLStatementParser parser = null; + if (schema.isNeedSupportMultiDBType()) { + parser = new MycatStatementParser(stmt); + } else { + parser = new MySqlStatementParser(stmt); + } + + MycatSchemaStatVisitor visitor = null; + SQLStatement statement; + + /** + * 解析出现问题统一抛SQL语法错误 + */ + try { + statement = parser.parseStatement(); + visitor = new MycatSchemaStatVisitor(); + } catch (Exception t) { + LOGGER.error("DruidMycatRouteStrategyError", t); + throw new SQLSyntaxErrorException(t); + } + + /** + * 检验unsupported statement + */ + checkUnSupportedStatement(statement); + + DruidParser druidParser = DruidParserFactory.create(schema, statement, visitor); + druidParser.parser(schema, rrs, statement, stmt,cachePool,visitor); + DruidShardingParseInfo ctx= druidParser.getCtx() ; + rrs.setTables(ctx.getTables()); + + if(visitor.isSubqueryRelationOr()){ + String err = "In subQuery,the or condition is not supported."; + LOGGER.error(err); + throw new SQLSyntaxErrorException(err); + } + + /* 按照以下情况路由 + 1.2.1 可以直接路由. + 1.2.2 两个表夸库join的sql.调用calat + 1.2.3 需要先执行subquery 的sql.把subquery拆分出来.获取结果后,与outerquery + */ + + //add huangyiming 分片规则不一样的且表中带查询条件的则走Catlet + List tables = ctx.getTables(); + SchemaConfig schemaConf = MycatServer.getInstance().getConfig().getSchemas().get(schema.getName()); + int index = 0; + RuleConfig firstRule = null; + boolean directRoute = true; + Set firstDataNodes = new HashSet(); + Map tconfigs = schemaConf==null?null:schemaConf.getTables(); + + Map rulemap = new HashMap<>(); + if(tconfigs!=null){ + for(String tableName : tables){ + TableConfig tc = tconfigs.get(tableName); + if(tc == null){ + //add 别名中取 + Map tableAliasMap = ctx.getTableAliasMap(); + if(tableAliasMap !=null && tableAliasMap.get(tableName) !=null){ + tc = schemaConf.getTables().get(tableAliasMap.get(tableName)); + } + } + + if(index == 0){ + if(tc !=null){ + firstRule= tc.getRule(); + //没有指定分片规则时,不做处理 + if(firstRule==null){ + continue; + } + firstDataNodes.addAll(tc.getDataNodes()); + rulemap.put(tc.getName(), firstRule); + } + }else{ + if(tc !=null){ + //ER关系表的时候是可能存在字表中没有tablerule的情况,所以加上判断 + RuleConfig ruleCfg = tc.getRule(); + if(ruleCfg==null){ //没有指定分片规则时,不做处理 + continue; + } + Set dataNodes = new HashSet(); + dataNodes.addAll(tc.getDataNodes()); + rulemap.put(tc.getName(), ruleCfg); + //如果匹配规则不相同或者分片的datanode不相同则需要走子查询处理 + if(firstRule!=null&&((ruleCfg !=null && !ruleCfg.getRuleAlgorithm().equals(firstRule.getRuleAlgorithm()) )||( !dataNodes.equals(firstDataNodes)))){ + directRoute = false; + break; + } + } + } + index++; + } + } + + RouteResultset rrsResult = rrs; + if(directRoute){ //直接路由 + if(!RouterUtil.isAllGlobalTable(ctx, schemaConf)){ + if(rulemap.size()>1&&!checkRuleField(rulemap,visitor)){ + String err = "In case of slice table,there is no rule field in the relationship condition!"; + LOGGER.error(err); + throw new SQLSyntaxErrorException(err); + } + } + rrsResult = directRoute(rrs,ctx,schema,druidParser,statement,cachePool); + }else{ + int subQuerySize = visitor.getSubQuerys().size(); + if(subQuerySize==0&&ctx.getTables().size()==2){ //两表关联,考虑使用catlet + if(!visitor.getRelationships().isEmpty()){ + rrs.setCacheAble(false); + rrs.setFinishedRoute(true); + rrsResult = catletRoute(schema,ctx.getSql(),charset,sc); + }else{ + rrsResult = directRoute(rrs,ctx,schema,druidParser,statement,cachePool); + } + }else if(subQuerySize==1){ //只涉及一张表的子查询,使用 MiddlerResultHandler 获取中间结果后,改写原有 sql 继续执行 TODO 后期可能会考虑多个子查询的情况. + SQLSelect sqlselect = visitor.getSubQuerys().iterator().next(); + if(!visitor.getRelationships().isEmpty()){ // 当 inner query 和 outer query 有关联条件时,暂不支持 + String err = "In case of slice table,sql have different rules,the relationship condition is not supported."; + LOGGER.error(err); + throw new SQLSyntaxErrorException(err); + }else{ + SQLSelectQuery sqlSelectQuery = sqlselect.getQuery(); + if(((MySqlSelectQueryBlock)sqlSelectQuery).getFrom() instanceof SQLExprTableSource) { + rrs.setCacheAble(false); + rrs.setFinishedRoute(true); + rrsResult = middlerResultRoute(schema,charset,sqlselect,sqlType,statement,sc); + } + } + }else if(subQuerySize >=2){ + String err = "In case of slice table,sql has different rules,currently only one subQuery is supported."; + LOGGER.error(err); + throw new SQLSyntaxErrorException(err); + } + } + return rrsResult; + } + + /** + * 子查询中存在关联查询的情况下,检查关联字段是否是分片字段 + * @param rulemap + * @param ships + * @return + */ + private boolean checkRuleField(Map rulemap,MycatSchemaStatVisitor visitor){ + + if(!MycatServer.getInstance().getConfig().getSystem().isSubqueryRelationshipCheck()){ + return true; + } + + Set ships = visitor.getRelationships(); + Iterator iter = ships.iterator(); + while(iter.hasNext()){ + Relationship ship = iter.next(); + String lefttable = ship.getLeft().getTable().toUpperCase(); + String righttable = ship.getRight().getTable().toUpperCase(); + // 如果是同一个表中的关联条件,不做处理 + if(lefttable.equals(righttable)){ + return true; + } + RuleConfig leftconfig = rulemap.get(lefttable); + RuleConfig rightconfig = rulemap.get(righttable); + + if(null!=leftconfig&&null!=rightconfig + &&leftconfig.equals(rightconfig) + &&leftconfig.getColumn().equals(ship.getLeft().getName().toUpperCase()) + &&rightconfig.getColumn().equals(ship.getRight().getName().toUpperCase())){ + return true; + } + } + return false; + } + + private RouteResultset middlerResultRoute(final SchemaConfig schema,final String charset,final SQLSelect sqlselect, + final int sqlType,final SQLStatement statement,final ServerConnection sc){ + + final String middlesql = SQLUtils.toMySqlString(sqlselect); + + MiddlerResultHandler middlerResultHandler = new MiddlerQueryResultHandler<>(new SecondHandler() { + @Override + public void doExecute(List param) { + sc.getSession2().setMiddlerResultHandler(null); + String sqls = null; + // 路由计算 + RouteResultset rrs = null; + try { + + sqls = buildSql(statement,sqlselect,param); + rrs = MycatServer + .getInstance() + .getRouterservice() + .route(MycatServer.getInstance().getConfig().getSystem(), + schema, sqlType,sqls.toLowerCase(), charset,sc ); + + } catch (Exception e) { + StringBuilder s = new StringBuilder(); + LOGGER.warn(s.append(this).append(sqls).toString() + " err:" + e.toString(),e); + String msg = e.getMessage(); + sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg); + return; + } + NonBlockingSession noBlockSession = new NonBlockingSession(sc.getSession2().getSource()); + noBlockSession.setMiddlerResultHandler(null); + //session的预编译标示传递 + noBlockSession.setPrepared(sc.getSession2().isPrepared()); + if (rrs != null) { + noBlockSession.setCanClose(false); + noBlockSession.execute(rrs, ServerParse.SELECT); + } + } + } ); + sc.getSession2().setMiddlerResultHandler(middlerResultHandler); + sc.getSession2().setCanClose(false); + + // 路由计算 + RouteResultset rrs = null; + try { + rrs = MycatServer + .getInstance() + .getRouterservice() + .route(MycatServer.getInstance().getConfig().getSystem(), + schema, ServerParse.SELECT, middlesql, charset, sc); + + } catch (Exception e) { + StringBuilder s = new StringBuilder(); + LOGGER.warn(s.append(this).append(middlesql).toString() + " err:" + e.toString(),e); + String msg = e.getMessage(); + sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg); + return null; + } + + if(rrs!=null){ + rrs.setCacheAble(false); + } + return rrs; + } + + /** + * 获取子查询执行结果后,改写原始sql 继续执行. + * @param statement + * @param sqlselect + * @param param + * @return + */ + private String buildSql(SQLStatement statement,SQLSelect sqlselect,List param){ + + SQLObject parent = sqlselect.getParent(); + RouteMiddlerReaultHandler handler = middlerResultHandler.get(parent.getClass()); + if(handler==null){ + throw new UnsupportedOperationException(parent.getClass()+" current is not supported "); + } + return handler.dohandler(statement, sqlselect, parent, param); + } + + /** + * 两个表的情况,catlet + * @param schema + * @param stmt + * @param charset + * @param sc + * @return + */ + private RouteResultset catletRoute(SchemaConfig schema,String stmt,String charset,ServerConnection sc){ + RouteResultset rrs = null; + try { + rrs = MycatServer + .getInstance() + .getRouterservice() + .route(MycatServer.getInstance().getConfig().getSystem(), + schema, ServerParse.SELECT, "/*!mycat:catlet=io.mycat.catlets.ShareJoin */ "+stmt, charset, sc); + + }catch(Exception e){ + + } + return rrs; + } + + /** + * 直接结果路由 + * @param rrs + * @param ctx + * @param schema + * @param druidParser + * @param statement + * @param cachePool + * @return + * @throws SQLNonTransientException + */ + private RouteResultset directRoute(RouteResultset rrs,DruidShardingParseInfo ctx,SchemaConfig schema, + DruidParser druidParser,SQLStatement statement,LayerCachePool cachePool) throws SQLNonTransientException{ + + //改写sql:如insert语句主键自增长, 在直接结果路由的情况下,进行sql 改写处理 + druidParser.changeSql(schema, rrs, statement,cachePool); + + /** + * DruidParser 解析过程中已完成了路由的直接返回 + */ + if ( rrs.isFinishedRoute() ) { + return rrs; + } + + /** + * 没有from的select语句或其他 + */ + if((ctx.getTables() == null || ctx.getTables().size() == 0)&&(ctx.getTableAliasMap()==null||ctx.getTableAliasMap().isEmpty())) + { + return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), druidParser.getCtx().getSql()); + } + + if(druidParser.getCtx().getRouteCalculateUnits().size() == 0) { + RouteCalculateUnit routeCalculateUnit = new RouteCalculateUnit(); + druidParser.getCtx().addRouteCalculateUnit(routeCalculateUnit); + } + + SortedSet nodeSet = new TreeSet(); + boolean isAllGlobalTable = RouterUtil.isAllGlobalTable(ctx, schema); + for(RouteCalculateUnit unit: druidParser.getCtx().getRouteCalculateUnits()) { + RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, druidParser.getCtx(), unit, rrs, isSelect(statement), cachePool); + if(rrsTmp != null&&rrsTmp.getNodes()!=null) { + for(RouteResultsetNode node :rrsTmp.getNodes()) { + nodeSet.add(node); + } + } + if(isAllGlobalTable) {//都是全局表时只计算一遍路由 + break; + } + } + + RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()]; + int i = 0; + for (RouteResultsetNode aNodeSet : nodeSet) { + nodes[i] = aNodeSet; + if(statement instanceof MySqlInsertStatement &&ctx.getTables().size()==1&&schema.getTables().containsKey(ctx.getTables().get(0))) { + RuleConfig rule = schema.getTables().get(ctx.getTables().get(0)).getRule(); + if(rule!=null&& rule.getRuleAlgorithm() instanceof SlotFunction){ + aNodeSet.setStatement(ParseUtil.changeInsertAddSlot(aNodeSet.getStatement(),aNodeSet.getSlot())); + } + } + i++; + } + rrs.setNodes(nodes); + + //分表 + /** + * subTables="t_order$1-2,t_order3" + *目前分表 1.6 开始支持 幵丏 dataNode 在分表条件下只能配置一个,分表条件下不支持join。 + */ + if(rrs.isDistTable()){ + return this.routeDisTable(statement,rrs); + } + return rrs; + } + + private SQLExprTableSource getDisTable(SQLTableSource tableSource,RouteResultsetNode node) throws SQLSyntaxErrorException{ + if(node.getSubTableName()==null){ + String msg = " sub table not exists for " + node.getName() + " on " + tableSource; + LOGGER.error("DruidMycatRouteStrategyError " + msg); + throw new SQLSyntaxErrorException(msg); + } + + SQLIdentifierExpr sqlIdentifierExpr = new SQLIdentifierExpr(); + sqlIdentifierExpr.setParent(tableSource.getParent()); + sqlIdentifierExpr.setName(node.getSubTableName()); + SQLExprTableSource from2 = new SQLExprTableSource(sqlIdentifierExpr); + return from2; + } + + private RouteResultset routeDisTable(SQLStatement statement, RouteResultset rrs) throws SQLSyntaxErrorException{ + SQLTableSource tableSource = null; + if(statement instanceof SQLInsertStatement) { + SQLInsertStatement insertStatement = (SQLInsertStatement) statement; + tableSource = insertStatement.getTableSource(); + for (RouteResultsetNode node : rrs.getNodes()) { + SQLExprTableSource from2 = getDisTable(tableSource, node); + insertStatement.setTableSource(from2); + node.setStatement(insertStatement.toString()); + } + } + if(statement instanceof SQLDeleteStatement) { + SQLDeleteStatement deleteStatement = (SQLDeleteStatement) statement; + tableSource = deleteStatement.getTableSource(); + SQLTableSource from = deleteStatement.getFrom(); + for (RouteResultsetNode node : rrs.getNodes()) { + SQLExprTableSource from2 = getDisTable(tableSource, node); + + if (from == null) { + from2.setAlias(tableSource.toString()); + deleteStatement.setFrom(from2); + } else { + String alias = from.getAlias(); + from2.setAlias(alias); + deleteStatement.setFrom(from2); + } + + node.setStatement(deleteStatement.toString()); + } + } + if(statement instanceof SQLUpdateStatement) { + SQLUpdateStatement updateStatement = (SQLUpdateStatement) statement; + tableSource = updateStatement.getTableSource(); + for (RouteResultsetNode node : rrs.getNodes()) { + SQLExprTableSource from2 = getDisTable(tableSource, node); + from2.setAlias(updateStatement.getTableSource().getAlias()); + updateStatement.setTableSource(from2); + node.setStatement(updateStatement.toString()); + } + } + + return rrs; + } + + /** + * SELECT 语句 + */ + private boolean isSelect(SQLStatement statement) { + if(statement instanceof SQLSelectStatement) { + return true; + } + return false; + } + + /** + * 检验不支持的SQLStatement类型 :不支持的类型直接抛SQLSyntaxErrorException异常 + * @param statement + * @throws SQLSyntaxErrorException + */ + private void checkUnSupportedStatement(SQLStatement statement) throws SQLSyntaxErrorException { + //不支持replace语句 + if(statement instanceof MySqlReplaceStatement) { + throw new SQLSyntaxErrorException(" ReplaceStatement can't be supported,use insert into ...on duplicate key update... instead "); + } + } + + /** + * 分析 SHOW SQL + */ + @Override + public RouteResultset analyseShowSQL(SchemaConfig schema, + RouteResultset rrs, String stmt) throws SQLSyntaxErrorException { + + String upStmt = stmt.toUpperCase(); + int tabInd = upStmt.indexOf(" TABLES"); + if (tabInd > 0) {// show tables + int[] nextPost = RouterUtil.getSpecPos(upStmt, 0); + if (nextPost[0] > 0) {// remove db info + int end = RouterUtil.getSpecEndPos(upStmt, tabInd); + if (upStmt.indexOf(" FULL") > 0) { + stmt = "SHOW FULL TABLES" + stmt.substring(end); + } else { + stmt = "SHOW TABLES" + stmt.substring(end); + } + } + String defaultNode= schema.getDataNode(); + if(!Strings.isNullOrEmpty(defaultNode)) + { + return RouterUtil.routeToSingleNode(rrs, defaultNode, stmt); + } + return RouterUtil.routeToMultiNode(false, rrs, schema.getMetaDataNodes(), stmt); + } + + /** + * show index or column + */ + int[] indx = RouterUtil.getSpecPos(upStmt, 0); + if (indx[0] > 0) { + /** + * has table + */ + int[] repPos = { indx[0] + indx[1], 0 }; + String tableName = RouterUtil.getShowTableName(stmt, repPos); + /** + * IN DB pattern + */ + int[] indx2 = RouterUtil.getSpecPos(upStmt, indx[0] + indx[1] + 1); + if (indx2[0] > 0) {// find LIKE OR WHERE + repPos[1] = RouterUtil.getSpecEndPos(upStmt, indx2[0] + indx2[1]); + + } + stmt = stmt.substring(0, indx[0]) + " FROM " + tableName + stmt.substring(repPos[1]); + RouterUtil.routeForTableMeta(rrs, schema, tableName, stmt); + return rrs; + + } + + /** + * show create table tableName + */ + int[] createTabInd = RouterUtil.getCreateTablePos(upStmt, 0); + if (createTabInd[0] > 0) { + int tableNameIndex = createTabInd[0] + createTabInd[1]; + if (upStmt.length() > tableNameIndex) { + String tableName = stmt.substring(tableNameIndex).trim(); + int ind2 = tableName.indexOf('.'); + if (ind2 > 0) { + tableName = tableName.substring(ind2 + 1); + } + RouterUtil.routeForTableMeta(rrs, schema, tableName, stmt); + return rrs; + } + } + + return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), stmt); + } + + +// /** +// * 为一个表进行条件路由 +// * @param schema +// * @param tablesAndConditions +// * @param tablesRouteMap +// * @throws SQLNonTransientException +// */ +// private static RouteResultset findRouteWithcConditionsForOneTable(SchemaConfig schema, RouteResultset rrs, +// Map> conditions, String tableName, String sql) throws SQLNonTransientException { +// boolean cache = rrs.isCacheAble(); +// //为分库表找路由 +// tableName = tableName.toUpperCase(); +// TableConfig tableConfig = schema.getTables().get(tableName); +// //全局表或者不分库的表略过(全局表后面再计算) +// if(tableConfig.isGlobalTable()) { +// return null; +// } else {//非全局表 +// Set routeSet = new HashSet(); +// String joinKey = tableConfig.getJoinKey(); +// for(Map.Entry> condition : conditions.entrySet()) { +// String colName = condition.getKey(); +// //条件字段是拆分字段 +// if(colName.equals(tableConfig.getPartitionColumn())) { +// Set columnPairs = condition.getValue(); +// +// for(ColumnRoutePair pair : columnPairs) { +// if(pair.colValue != null) { +// Integer nodeIndex = tableConfig.getRule().getRuleAlgorithm().calculate(pair.colValue); +// if(nodeIndex == null) { +// String msg = "can't find any valid datanode :" + tableConfig.getName() +// + " -> " + tableConfig.getPartitionColumn() + " -> " + pair.colValue; +// LOGGER.warn(msg); +// throw new SQLNonTransientException(msg); +// } +// String node = tableConfig.getDataNodes().get(nodeIndex); +// if(node != null) {//找到一个路由节点 +// routeSet.add(node); +// } +// } +// if(pair.rangeValue != null) { +// Integer[] nodeIndexs = tableConfig.getRule().getRuleAlgorithm() +// .calculateRange(pair.rangeValue.beginValue.toString(), pair.rangeValue.endValue.toString()); +// for(Integer idx : nodeIndexs) { +// String node = tableConfig.getDataNodes().get(idx); +// if(node != null) {//找到一个路由节点 +// routeSet.add(node); +// } +// } +// } +// } +// } else if(joinKey != null && joinKey.equals(colName)) { +// Set dataNodeSet = RouterUtil.ruleCalculate( +// tableConfig.getParentTC(), condition.getValue()); +// if (dataNodeSet.isEmpty()) { +// throw new SQLNonTransientException( +// "parent key can't find any valid datanode "); +// } +// if (LOGGER.isDebugEnabled()) { +// LOGGER.debug("found partion nodes (using parent partion rule directly) for child table to update " +// + Arrays.toString(dataNodeSet.toArray()) + " sql :" + sql); +// } +// if (dataNodeSet.size() > 1) { +// return RouterUtil.routeToMultiNode(rrs.isCacheAble(), rrs, schema.getAllDataNodes(), sql); +// } else { +// rrs.setCacheAble(true); +// return RouterUtil.routeToSingleNode(rrs, dataNodeSet.iterator().next(), sql); +// } +// } else {//条件字段不是拆分字段也不是join字段,略过 +// continue; +// +// } +// } +// return RouterUtil.routeToMultiNode(cache, rrs, routeSet, sql); +// +// } +// +// } + + public RouteResultset routeSystemInfo(SchemaConfig schema, int sqlType, + String stmt, RouteResultset rrs) throws SQLSyntaxErrorException { + switch(sqlType){ + case ServerParse.SHOW:// if origSQL is like show tables + return analyseShowSQL(schema, rrs, stmt); + case ServerParse.SELECT://if origSQL is like select @@ + int index = stmt.indexOf("@@"); + if(index > 0 && "SELECT".equals(stmt.substring(0, index).trim().toUpperCase())){ + return analyseDoubleAtSgin(schema, rrs, stmt); + } + break; + case ServerParse.DESCRIBE:// if origSQL is meta SQL, such as describe table + int ind = stmt.indexOf(' '); + stmt = stmt.trim(); + return analyseDescrSQL(schema, rrs, stmt, ind + 1); + } + return null; + } + + /** + * 对Desc语句进行分析 返回数据路由集合 + * * + * @param schema 数据库名 + * @param rrs 数据路由集合 + * @param stmt 执行语句 + * @param ind 第一个' '的位置 + * @return RouteResultset (数据路由集合) + * @author mycat + */ + private static RouteResultset analyseDescrSQL(SchemaConfig schema, + RouteResultset rrs, String stmt, int ind) { + + final String MATCHED_FEATURE = "DESCRIBE "; + final String MATCHED2_FEATURE = "DESC "; + int pos = 0; + while (pos < stmt.length()) { + char ch = stmt.charAt(pos); + // 忽略处理注释 /* */ BEN + if(ch == '/' && pos+4 < stmt.length() && stmt.charAt(pos+1) == '*') { + if(stmt.substring(pos+2).indexOf("*/") != -1) { + pos += stmt.substring(pos+2).indexOf("*/")+4; + continue; + } else { + // 不应该发生这类情况。 + throw new IllegalArgumentException("sql 注释 语法错误"); + } + } else if(ch == 'D'||ch == 'd') { + // 匹配 [describe ] + if(pos+MATCHED_FEATURE.length() < stmt.length() && (stmt.substring(pos).toUpperCase().indexOf(MATCHED_FEATURE) != -1)) { + pos = pos + MATCHED_FEATURE.length(); + break; + } else if(pos+MATCHED2_FEATURE.length() < stmt.length() && (stmt.substring(pos).toUpperCase().indexOf(MATCHED2_FEATURE) != -1)) { + pos = pos + MATCHED2_FEATURE.length(); + break; + } else { + pos++; + } + } + } + + // 重置ind坐标。BEN GONG + ind = pos; + int[] repPos = { ind, 0 }; + String tableName = RouterUtil.getTableName(stmt, repPos); + + stmt = stmt.substring(0, ind) + tableName + stmt.substring(repPos[1]); + RouterUtil.routeForTableMeta(rrs, schema, tableName, stmt); + return rrs; + } + + /** + * 根据执行语句判断数据路由 + * + * @param schema 数据库名 + * @param rrs 数据路由集合 + * @param stmt 执行sql + * @return RouteResultset 数据路由集合 + * @throws SQLSyntaxErrorException + * @author mycat + */ + private RouteResultset analyseDoubleAtSgin(SchemaConfig schema, + RouteResultset rrs, String stmt) throws SQLSyntaxErrorException { + String upStmt = stmt.toUpperCase(); + int atSginInd = upStmt.indexOf(" @@"); + if (atSginInd > 0) { + return RouterUtil.routeToMultiNode(false, rrs, schema.getMetaDataNodes(), stmt); + } + return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), stmt); + } } \ No newline at end of file diff --git a/src/main/java/io/mycat/route/parser/druid/impl/DruidSelectParser.java b/src/main/java/io/mycat/route/parser/druid/impl/DruidSelectParser.java index 66770a8db..846939051 100644 --- a/src/main/java/io/mycat/route/parser/druid/impl/DruidSelectParser.java +++ b/src/main/java/io/mycat/route/parser/druid/impl/DruidSelectParser.java @@ -1,764 +1,812 @@ -package io.mycat.route.parser.druid.impl; - -import java.sql.SQLNonTransientException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import com.alibaba.druid.sql.SQLUtils; -import com.alibaba.druid.sql.ast.SQLExpr; -import com.alibaba.druid.sql.ast.SQLName; -import com.alibaba.druid.sql.ast.SQLOrderingSpecification; -import com.alibaba.druid.sql.ast.SQLStatement; -import com.alibaba.druid.sql.ast.expr.SQLAggregateExpr; -import com.alibaba.druid.sql.ast.expr.SQLAllColumnExpr; -import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; -import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator; -import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; -import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr; -import com.alibaba.druid.sql.ast.expr.SQLMethodInvokeExpr; -import com.alibaba.druid.sql.ast.expr.SQLNumericLiteralExpr; -import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; -import com.alibaba.druid.sql.ast.expr.SQLTextLiteralExpr; -import com.alibaba.druid.sql.ast.statement.SQLExprTableSource; -import com.alibaba.druid.sql.ast.statement.SQLSelectGroupByClause; -import com.alibaba.druid.sql.ast.statement.SQLSelectItem; -import com.alibaba.druid.sql.ast.statement.SQLSelectOrderByItem; -import com.alibaba.druid.sql.ast.statement.SQLSelectQuery; -import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; -import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; -import com.alibaba.druid.sql.ast.statement.SQLTableSource; -import com.alibaba.druid.sql.dialect.db2.ast.stmt.DB2SelectQueryBlock; -import com.alibaba.druid.sql.dialect.db2.visitor.DB2OutputVisitor; -import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlOrderingExpr; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock.Limit; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUnionQuery; -import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlOutputVisitor; -import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlSchemaStatVisitor; -import com.alibaba.druid.sql.dialect.oracle.ast.stmt.OracleSelectQueryBlock; -import com.alibaba.druid.sql.dialect.oracle.visitor.OracleOutputVisitor; -import com.alibaba.druid.sql.dialect.postgresql.ast.stmt.PGSelectQueryBlock; -import com.alibaba.druid.sql.dialect.postgresql.visitor.PGOutputVisitor; -import com.alibaba.druid.sql.dialect.sqlserver.ast.SQLServerSelectQueryBlock; -import com.alibaba.druid.sql.visitor.SQLASTOutputVisitor; -import com.alibaba.druid.util.JdbcConstants; -import com.alibaba.druid.wall.spi.WallVisitorUtils; - -import io.mycat.MycatServer; -import io.mycat.cache.LayerCachePool; -import io.mycat.config.ErrorCode; -import io.mycat.config.model.SchemaConfig; -import io.mycat.config.model.TableConfig; -import io.mycat.route.RouteResultset; -import io.mycat.route.RouteResultsetNode; -import io.mycat.route.parser.druid.MycatSchemaStatVisitor; -import io.mycat.route.parser.druid.RouteCalculateUnit; -import io.mycat.route.util.RouterUtil; -import io.mycat.sqlengine.mpp.ColumnRoutePair; -import io.mycat.sqlengine.mpp.HavingCols; -import io.mycat.sqlengine.mpp.MergeCol; -import io.mycat.sqlengine.mpp.OrderCol; -import io.mycat.util.ObjectUtil; -import io.mycat.util.StringUtil; - -public class DruidSelectParser extends DefaultDruidParser { - - - protected boolean isNeedParseOrderAgg=true; - - @Override - public void statementParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt) { - SQLSelectStatement selectStmt = (SQLSelectStatement)stmt; - SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery(); - if(sqlSelectQuery instanceof MySqlSelectQueryBlock) { - MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock)selectStmt.getSelect().getQuery(); - - parseOrderAggGroupMysql(schema, stmt,rrs, mysqlSelectQuery); - //更改canRunInReadDB属性 - if ((mysqlSelectQuery.isForUpdate() || mysqlSelectQuery.isLockInShareMode()) && rrs.isAutocommit() == false) - { - rrs.setCanRunInReadDB(false); - } - - } else if (sqlSelectQuery instanceof MySqlUnionQuery) { -// MySqlUnionQuery unionQuery = (MySqlUnionQuery)sqlSelectQuery; -// MySqlSelectQueryBlock left = (MySqlSelectQueryBlock)unionQuery.getLeft(); -// MySqlSelectQueryBlock right = (MySqlSelectQueryBlock)unionQuery.getLeft(); -// System.out.println(); - } - } - protected void parseOrderAggGroupMysql(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery) - { - MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor(); - stmt.accept(visitor); -// rrs.setGroupByCols((String[])visitor.getGroupByColumns().toArray()); - if(!isNeedParseOrderAgg) - { - return; - } - Map aliaColumns = parseAggGroupCommon(schema, stmt, rrs, mysqlSelectQuery); - - //setOrderByCols - if(mysqlSelectQuery.getOrderBy() != null) { - List orderByItems = mysqlSelectQuery.getOrderBy().getItems(); - rrs.setOrderByCols(buildOrderByCols(orderByItems,aliaColumns)); - } - isNeedParseOrderAgg=false; - } - protected Map parseAggGroupCommon(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs, SQLSelectQueryBlock mysqlSelectQuery) - { - Map aliaColumns = new HashMap(); - Map aggrColumns = new HashMap(); - // Added by winbill, 20160314, for having clause, Begin ==> - List havingColsName = new ArrayList(); - // Added by winbill, 20160314, for having clause, End <== - List selectList = mysqlSelectQuery.getSelectList(); - boolean isNeedChangeSql=false; - int size = selectList.size(); - boolean isDistinct=mysqlSelectQuery.getDistionOption()==2; - for (int i = 0; i < size; i++) - { - SQLSelectItem item = selectList.get(i); - - if (item.getExpr() instanceof SQLAggregateExpr) - { - SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr(); - String method = expr.getMethodName(); - boolean isHasArgument=!expr.getArguments().isEmpty(); - if(isHasArgument) - { - String aggrColName = method + "(" + expr.getArguments().get(0) + ")"; // Added by winbill, 20160314, for having clause - havingColsName.add(aggrColName); // Added by winbill, 20160314, for having clause - } - //只处理有别名的情况,无别名添加别名,否则某些数据库会得不到正确结果处理 - int mergeType = MergeCol.getMergeType(method); - if (MergeCol.MERGE_AVG == mergeType&&isRoutMultiNode(schema,rrs)) - { //跨分片avg需要特殊处理,直接avg结果是不对的 - String colName = item.getAlias() != null ? item.getAlias() : method + i; - SQLSelectItem sum =new SQLSelectItem(); - String sumColName = colName + "SUM"; - sum.setAlias(sumColName); - SQLAggregateExpr sumExp =new SQLAggregateExpr("SUM"); - ObjectUtil.copyProperties(expr,sumExp); - sumExp.getArguments().addAll(expr.getArguments()); - sumExp.setMethodName("SUM"); - sum.setExpr(sumExp); - selectList.set(i, sum); - aggrColumns.put(sumColName, MergeCol.MERGE_SUM); - havingColsName.add(sumColName); // Added by winbill, 20160314, for having clause - havingColsName.add(item.getAlias() != null ? item.getAlias() : ""); // Added by winbill, 20160314, two aliases for AVG - - SQLSelectItem count =new SQLSelectItem(); - String countColName = colName + "COUNT"; - count.setAlias(countColName); - SQLAggregateExpr countExp = new SQLAggregateExpr("COUNT"); - ObjectUtil.copyProperties(expr,countExp); - countExp.getArguments().addAll(expr.getArguments()); - countExp.setMethodName("COUNT"); - count.setExpr(countExp); - selectList.add(count); - aggrColumns.put(countColName, MergeCol.MERGE_COUNT); - - isNeedChangeSql=true; - aggrColumns.put(colName, mergeType); - rrs.setHasAggrColumn(true); - } else if (MergeCol.MERGE_UNSUPPORT != mergeType){ - String aggColName = null; - StringBuilder sb = new StringBuilder(); - if(mysqlSelectQuery instanceof MySqlSelectQueryBlock) { - expr.accept(new MySqlOutputVisitor(sb)); - } else if(mysqlSelectQuery instanceof OracleSelectQueryBlock) { - expr.accept(new OracleOutputVisitor(sb)); - } else if(mysqlSelectQuery instanceof PGSelectQueryBlock){ - expr.accept(new PGOutputVisitor(sb)); - } else if(mysqlSelectQuery instanceof SQLServerSelectQueryBlock) { - expr.accept(new SQLASTOutputVisitor(sb)); - } else if(mysqlSelectQuery instanceof DB2SelectQueryBlock) { - expr.accept(new DB2OutputVisitor(sb)); - } - aggColName = sb.toString(); - - if (item.getAlias() != null && item.getAlias().length() > 0) - { - aggrColumns.put(item.getAlias(), mergeType); - aliaColumns.put(aggColName,item.getAlias()); - } else - { //如果不加,jdbc方式时取不到正确结果 ;修改添加别名 - item.setAlias(method + i); - aggrColumns.put(method + i, mergeType); - aliaColumns.put(aggColName, method + i); - isNeedChangeSql=true; - } - rrs.setHasAggrColumn(true); - havingColsName.add(item.getAlias()); // Added by winbill, 20160314, for having clause - havingColsName.add(""); // Added by winbill, 20160314, one alias for non-AVG - } - } else - { - if (!(item.getExpr() instanceof SQLAllColumnExpr)) - { - String alia = item.getAlias(); - String field = getFieldName(item); - if (alia == null) - { - alia = field; - } - aliaColumns.put(field, alia); - } - } - - } - if(aggrColumns.size() > 0) { - rrs.setMergeCols(aggrColumns); - } - - //通过优化转换成group by来实现 - if(isDistinct) - { - mysqlSelectQuery.setDistionOption(0); - SQLSelectGroupByClause groupBy=new SQLSelectGroupByClause(); - for (String fieldName : aliaColumns.keySet()) - { - groupBy.addItem(new SQLIdentifierExpr(fieldName)); - } - mysqlSelectQuery.setGroupBy(groupBy); - isNeedChangeSql=true; - } - - - //setGroupByCols - if(mysqlSelectQuery.getGroupBy() != null) { - List groupByItems = mysqlSelectQuery.getGroupBy().getItems(); - String[] groupByCols = buildGroupByCols(groupByItems,aliaColumns); - rrs.setGroupByCols(groupByCols); - rrs.setHavings(buildGroupByHaving(mysqlSelectQuery.getGroupBy().getHaving(),aliaColumns)); - rrs.setHasAggrColumn(true); - rrs.setHavingColsName(havingColsName.toArray()); // Added by winbill, 20160314, for having clause - } - - - if (isNeedChangeSql) - { - String sql = stmt.toString(); - rrs.changeNodeSqlAfterAddLimit(schema,getCurentDbType(),sql,0,-1, false); - getCtx().setSql(sql); - } - return aliaColumns; - } - - private HavingCols buildGroupByHaving(SQLExpr having,Map aliaColumns ){ - if (having == null) { - return null; - } - - SQLBinaryOpExpr expr = ((SQLBinaryOpExpr) having); - SQLExpr left = expr.getLeft(); - SQLBinaryOperator operator = expr.getOperator(); - SQLExpr right = expr.getRight(); - - String leftValue = null;; - if (left instanceof SQLAggregateExpr) { - leftValue = ((SQLAggregateExpr) left).getMethodName() + "(" - + ((SQLAggregateExpr) left).getArguments().get(0) + ")"; - String aggrColumnAlias = getAliaColumn(aliaColumns,leftValue); - if(aggrColumnAlias != null) { // having聚合函数存在别名 - expr.setLeft(new SQLIdentifierExpr(aggrColumnAlias)); - leftValue = aggrColumnAlias; - } - } else if (left instanceof SQLIdentifierExpr) { - leftValue = ((SQLIdentifierExpr) left).getName(); - } - - String rightValue = null; - if (right instanceof SQLNumericLiteralExpr) { - rightValue = right.toString(); - }else if(right instanceof SQLTextLiteralExpr){ - rightValue = StringUtil.removeBackquote(right.toString()); - } - - return new HavingCols(leftValue,rightValue,operator.getName()); - } - - private boolean isRoutMultiNode(SchemaConfig schema, RouteResultset rrs) - { - if(rrs.getNodes()!=null&&rrs.getNodes().length>1) - { - return true; - } - LayerCachePool tableId2DataNodeCache = (LayerCachePool) MycatServer.getInstance().getCacheService().getCachePool("TableID2DataNodeCache"); - try - { - tryRoute(schema, rrs, tableId2DataNodeCache); - if(rrs.getNodes()!=null&&rrs.getNodes().length>1) - { - return true; - } - } catch (SQLNonTransientException e) - { - throw new RuntimeException(e); - } - return false; - } - - private String getFieldName(SQLSelectItem item){ - if ((item.getExpr() instanceof SQLPropertyExpr)||(item.getExpr() instanceof SQLMethodInvokeExpr) - || (item.getExpr() instanceof SQLIdentifierExpr) || item.getExpr() instanceof SQLBinaryOpExpr) { - return item.getExpr().toString();//字段别名 - } - else { - return item.toString(); - } - } - - /** - * 现阶段目标为 有一个只涉及到一张表的子查询时,先执行子查询,获得返回结果后,改写原有sql继续执行,得到最终结果. - * 在这种情况下,原sql不需要继续解析. - * 使用catlet 的情况也不再继续解析. - */ - @Override - public boolean afterVisitorParser(RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor) { - int subQuerySize = visitor.getSubQuerys().size(); - - if(subQuerySize==0&&ctx.getTables().size()==2){ //两表关联,考虑使用catlet - if(ctx.getVisitor().getConditions() !=null && ctx.getVisitor().getConditions().size()>0){ - return true; - } - }else if(subQuerySize==1){ //只涉及一张表的子查询,使用 MiddlerResultHandler 获取中间结果后,改写原有 sql 继续执行 TODO 后期可能会考虑多个. - SQLSelectQuery sqlSelectQuery = visitor.getSubQuerys().iterator().next().getQuery(); - if(((MySqlSelectQueryBlock)sqlSelectQuery).getFrom() instanceof SQLExprTableSource) { - return true; - } - } - - return super.afterVisitorParser(rrs, stmt, visitor); - } - - /** - * 改写sql:需要加limit的加上 - */ - @Override - public void changeSql(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt,LayerCachePool cachePool) throws SQLNonTransientException { - - tryRoute(schema, rrs, cachePool); - - rrs.copyLimitToNodes(); - - SQLSelectStatement selectStmt = (SQLSelectStatement)stmt; - SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery(); - if(sqlSelectQuery instanceof MySqlSelectQueryBlock) { - MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock)selectStmt.getSelect().getQuery(); - int limitStart = 0; - int limitSize = schema.getDefaultMaxLimit(); - - //clear group having - SQLSelectGroupByClause groupByClause = mysqlSelectQuery.getGroupBy(); - // Modified by winbill, 20160614, do NOT include having clause when routing to multiple nodes - if(groupByClause != null && groupByClause.getHaving() != null && isRoutMultiNode(schema,rrs)){ - groupByClause.setHaving(null); - } - - Map>> allConditions = getAllConditions(); - boolean isNeedAddLimit = isNeedAddLimit(schema, rrs, mysqlSelectQuery, allConditions); - if(isNeedAddLimit) { - Limit limit = new Limit(); - limit.setRowCount(new SQLIntegerExpr(limitSize)); - mysqlSelectQuery.setLimit(limit); - rrs.setLimitSize(limitSize); - String sql= getSql(rrs, stmt, isNeedAddLimit); - rrs.changeNodeSqlAfterAddLimit(schema, getCurentDbType(), sql, 0, limitSize, true); - - } - Limit limit = mysqlSelectQuery.getLimit(); - if(limit != null&&!isNeedAddLimit) { - SQLIntegerExpr offset = (SQLIntegerExpr)limit.getOffset(); - SQLIntegerExpr count = (SQLIntegerExpr)limit.getRowCount(); - if(offset != null) { - limitStart = offset.getNumber().intValue(); - rrs.setLimitStart(limitStart); - } - if(count != null) { - limitSize = count.getNumber().intValue(); - rrs.setLimitSize(limitSize); - } - - if(isNeedChangeLimit(rrs)) { - Limit changedLimit = new Limit(); - changedLimit.setRowCount(new SQLIntegerExpr(limitStart + limitSize)); - - if(offset != null) { - if(limitStart < 0) { - String msg = "You have an error in your SQL syntax; check the manual that " + - "corresponds to your MySQL server version for the right syntax to use near '" + limitStart + "'"; - throw new SQLNonTransientException(ErrorCode.ER_PARSE_ERROR + " - " + msg); - } else { - changedLimit.setOffset(new SQLIntegerExpr(0)); - - } - } - - mysqlSelectQuery.setLimit(changedLimit); - - String sql= getSql(rrs, stmt, isNeedAddLimit); - rrs.changeNodeSqlAfterAddLimit(schema,getCurentDbType(),sql,0, limitStart + limitSize, true); - - //设置改写后的sql - ctx.setSql(sql); - - } else - { - - rrs.changeNodeSqlAfterAddLimit(schema,getCurentDbType(),getCtx().getSql(),rrs.getLimitStart(), rrs.getLimitSize(), true); - // ctx.setSql(nativeSql); - - } - - - } - - if(rrs.isDistTable()){ - SQLTableSource from = mysqlSelectQuery.getFrom(); - - for (RouteResultsetNode node : rrs.getNodes()) { - SQLIdentifierExpr sqlIdentifierExpr = new SQLIdentifierExpr(); - sqlIdentifierExpr.setParent(from); - sqlIdentifierExpr.setName(node.getSubTableName()); - SQLExprTableSource from2 = new SQLExprTableSource(sqlIdentifierExpr); - from2.setAlias(from.getAlias()); - mysqlSelectQuery.setFrom(from2); - node.setStatement(stmt.toString()); - } - } - - rrs.setCacheAble(isNeedCache(schema, rrs, mysqlSelectQuery, allConditions)); - } - - } - - /** - * 获取所有的条件:因为可能被or语句拆分成多个RouteCalculateUnit,条件分散了 - * @return - */ - private Map>> getAllConditions() { - Map>> map = new HashMap>>(); - for(RouteCalculateUnit unit : ctx.getRouteCalculateUnits()) { - if(unit != null && unit.getTablesAndConditions() != null) { - map.putAll(unit.getTablesAndConditions()); - } - } - - return map; - } - - private void tryRoute(SchemaConfig schema, RouteResultset rrs, LayerCachePool cachePool) throws SQLNonTransientException { - if(rrs.isFinishedRoute()) - { - return;//避免重复路由 - } - - //无表的select语句直接路由带任一节点 - if((ctx.getTables() == null || ctx.getTables().size() == 0)&&(ctx.getTableAliasMap()==null||ctx.getTableAliasMap().isEmpty())) { - rrs = RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), ctx.getSql()); - rrs.setFinishedRoute(true); - return; - } -// RouterUtil.tryRouteForTables(schema, ctx, rrs, true, cachePool); - SortedSet nodeSet = new TreeSet(); - boolean isAllGlobalTable = RouterUtil.isAllGlobalTable(ctx, schema); - for (RouteCalculateUnit unit : ctx.getRouteCalculateUnits()) { - RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, ctx, unit, rrs, true, cachePool); - if (rrsTmp != null&&rrsTmp.getNodes()!=null) { - for (RouteResultsetNode node : rrsTmp.getNodes()) { - nodeSet.add(node); - } - } - if(isAllGlobalTable) {//都是全局表时只计算一遍路由 - break; - } - } - - if(nodeSet.size() == 0) { - - Collection stringCollection= ctx.getTableAliasMap().values() ; - for (String table : stringCollection) - { - if(table!=null&&table.toLowerCase().contains("information_schema.")) - { - rrs = RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), ctx.getSql()); - rrs.setFinishedRoute(true); - return; - } - } - String msg = " find no Route:" + ctx.getSql(); - LOGGER.warn(msg); - throw new SQLNonTransientException(msg); - } - - RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()]; - int i = 0; - for (Iterator iterator = nodeSet.iterator(); iterator.hasNext();) { - nodes[i] = (RouteResultsetNode) iterator.next(); - i++; - - } - - rrs.setNodes(nodes); - rrs.setFinishedRoute(true); - } - - - protected String getCurentDbType() - { - return JdbcConstants.MYSQL; - } - - - - - protected String getSql( RouteResultset rrs,SQLStatement stmt, boolean isNeedAddLimit) - { - if(getCurentDbType().equalsIgnoreCase("mysql")&&(isNeedChangeLimit(rrs)||isNeedAddLimit)) - { - - return stmt.toString(); - - } - - return getCtx().getSql(); - } - - - - protected boolean isNeedChangeLimit(RouteResultset rrs) { - if(rrs.getNodes() == null) { - return false; - } else { - if(rrs.getNodes().length > 1) { - return true; - } - return false; - - } - } - - private boolean isNeedCache(SchemaConfig schema, RouteResultset rrs, - MySqlSelectQueryBlock mysqlSelectQuery, Map>> allConditions) { - if(ctx.getTables() == null || ctx.getTables().size() == 0 ) { - return false; - } - TableConfig tc = schema.getTables().get(ctx.getTables().get(0)); - if(tc==null ||(ctx.getTables().size() == 1 && tc.isGlobalTable()) - ) {//|| (ctx.getTables().size() == 1) && tc.getRule() == null && tc.getDataNodes().size() == 1 - return false; - } else { - //单表主键查询 - if(ctx.getTables().size() == 1) { - String tableName = ctx.getTables().get(0); - String primaryKey = schema.getTables().get(tableName).getPrimaryKey(); -// schema.getTables().get(ctx.getTables().get(0)).getParentKey() != null; - if(ctx.getRouteCalculateUnit().getTablesAndConditions().get(tableName) != null - && ctx.getRouteCalculateUnit().getTablesAndConditions().get(tableName).get(primaryKey) != null - && tc.getDataNodes().size() > 1) {//有主键条件 - return false; - } - //全局表不缓存 - }else if(RouterUtil.isAllGlobalTable(ctx, schema)){ - return false; - } - return true; - } - } - - /** - * 单表且是全局表 - * 单表且rule为空且nodeNodes只有一个 - * @param schema - * @param rrs - * @param mysqlSelectQuery - * @return - */ - private boolean isNeedAddLimit(SchemaConfig schema, RouteResultset rrs, - MySqlSelectQueryBlock mysqlSelectQuery, Map>> allConditions) { -// ctx.getTablesAndConditions().get(key)) - if(rrs.getLimitSize()>-1) - { - return false; - }else - if(schema.getDefaultMaxLimit() == -1) { - return false; - } else if (mysqlSelectQuery.getLimit() != null) {//语句中已有limit - return false; - } else if(ctx.getTables().size() == 1) { - String tableName = ctx.getTables().get(0); - TableConfig tableConfig = schema.getTables().get(tableName); - if(tableConfig==null) - { - return schema.getDefaultMaxLimit() > -1; // 找不到则取schema的配置 - } - - boolean isNeedAddLimit= tableConfig.isNeedAddLimit(); - if(!isNeedAddLimit) - { - return false;//优先从配置文件取 - } - - if(schema.getTables().get(tableName).isGlobalTable()) { - return true; - } - - String primaryKey = schema.getTables().get(tableName).getPrimaryKey(); - -// schema.getTables().get(ctx.getTables().get(0)).getParentKey() != null; - if(allConditions.get(tableName) == null) {//无条件 - return true; - } - - if (allConditions.get(tableName).get(primaryKey) != null) {//条件中带主键 - return false; - } - - return true; - } else if(rrs.hasPrimaryKeyToCache() && ctx.getTables().size() == 1){//只有一个表且条件中有主键,不需要limit了,因为主键只能查到一条记录 - return false; - } else {//多表或无表 - return false; - } - - } - private String getAliaColumn(Map aliaColumns,String column ){ - String alia=aliaColumns.get(column); - if (alia==null){ - if(column.indexOf(".") < 0) { - String col = "." + column; - String col2 = ".`" + column+"`"; - //展开aliaColumns,将之类的键值对展开成 - for(Map.Entry entry : aliaColumns.entrySet()) { - if(entry.getKey().endsWith(col)||entry.getKey().endsWith(col2)) { - if(entry.getValue() != null && entry.getValue().indexOf(".") > 0) { - return column; - } - return entry.getValue(); - } - } - } - - return column; - } - else { - return alia; - } - } - - private String[] buildGroupByCols(List groupByItems,Map aliaColumns) { - String[] groupByCols = new String[groupByItems.size()]; - for(int i= 0; i < groupByItems.size(); i++) { - SQLExpr sqlExpr = groupByItems.get(i); - String column = null; - if(sqlExpr instanceof SQLIdentifierExpr ) - { - column=((SQLIdentifierExpr) sqlExpr).getName(); - } else if(sqlExpr instanceof SQLMethodInvokeExpr){ - column = ((SQLMethodInvokeExpr) sqlExpr).toString(); - } else if(sqlExpr instanceof MySqlOrderingExpr){ - //todo czn - SQLExpr expr = ((MySqlOrderingExpr) sqlExpr).getExpr(); - - if (expr instanceof SQLName) - { - column = StringUtil.removeBackquote(((SQLName) expr).getSimpleName());//不要转大写 2015-2-10 sohudo StringUtil.removeBackquote(expr.getSimpleName().toUpperCase()); - } else - { - column = StringUtil.removeBackquote(expr.toString()); - } - } else if(sqlExpr instanceof SQLPropertyExpr){ - /** - * 针对子查询别名,例如select id from (select h.id from hotnews h union select h.title from hotnews h ) as t1 group by t1.id; - */ - column = sqlExpr.toString(); - } - if(column == null){ - column = sqlExpr.toString(); - } - int dotIndex=column.indexOf(".") ; - int bracketIndex=column.indexOf("(") ; - //通过判断含有括号来决定是否为函数列 - if(dotIndex!=-1&&bracketIndex==-1) - { - //此步骤得到的column必须是不带.的,有别名的用别名,无别名的用字段名 - column=column.substring(dotIndex+1) ; - } - groupByCols[i] = getAliaColumn(aliaColumns,column);//column; - } - return groupByCols; - } - - protected LinkedHashMap buildOrderByCols(List orderByItems,Map aliaColumns) { - LinkedHashMap map = new LinkedHashMap(); - for(int i= 0; i < orderByItems.size(); i++) { - SQLOrderingSpecification type = orderByItems.get(i).getType(); - //orderColumn只记录字段名称,因为返回的结果集是不带表名的。 - SQLExpr expr = orderByItems.get(i).getExpr(); - String col; - if (expr instanceof SQLName) { - col = ((SQLName)expr).getSimpleName(); - } - else { - col =expr.toString(); - } - if(type == null) { - type = SQLOrderingSpecification.ASC; - } - col=getAliaColumn(aliaColumns,col);//此步骤得到的col必须是不带.的,有别名的用别名,无别名的用字段名 - map.put(col, type == SQLOrderingSpecification.ASC ? OrderCol.COL_ORDER_TYPE_ASC : OrderCol.COL_ORDER_TYPE_DESC); - } - return map; - } - - private boolean isConditionAlwaysTrue(SQLStatement statement) { - SQLSelectStatement selectStmt = (SQLSelectStatement)statement; - SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery(); - if(sqlSelectQuery instanceof MySqlSelectQueryBlock) { - MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock)selectStmt.getSelect().getQuery(); - SQLExpr expr = mysqlSelectQuery.getWhere(); - - Object o = WallVisitorUtils.getValue(expr); - if(Boolean.TRUE.equals(o)) { - return true; - } - return false; - } else {//union - return false; - } - - } - - protected void setLimitIFChange(SQLStatement stmt, RouteResultset rrs, SchemaConfig schema, SQLBinaryOpExpr one, int firstrownum, int lastrownum) - { - rrs.setLimitStart(firstrownum); - rrs.setLimitSize(lastrownum - firstrownum); - LayerCachePool tableId2DataNodeCache = (LayerCachePool) MycatServer.getInstance().getCacheService().getCachePool("TableID2DataNodeCache"); - try - { - tryRoute(schema, rrs, tableId2DataNodeCache); - } catch (SQLNonTransientException e) - { - throw new RuntimeException(e); - } - if (isNeedChangeLimit(rrs)) - { - one.setRight(new SQLIntegerExpr(0)); - String curentDbType ="db2".equalsIgnoreCase(this.getCurentDbType())?"oracle":getCurentDbType(); - String sql = SQLUtils.toSQLString(stmt, curentDbType);; - rrs.changeNodeSqlAfterAddLimit(schema,getCurentDbType(), sql,0,lastrownum, false); - //设置改写后的sql - getCtx().setSql(sql); - } - } -} +package io.mycat.route.parser.druid.impl; + +import java.sql.SQLNonTransientException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.druid.sql.SQLUtils; +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.SQLName; +import com.alibaba.druid.sql.ast.SQLOrderingSpecification; +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.expr.SQLAggregateExpr; +import com.alibaba.druid.sql.ast.expr.SQLAllColumnExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr; +import com.alibaba.druid.sql.ast.expr.SQLMethodInvokeExpr; +import com.alibaba.druid.sql.ast.expr.SQLNumericLiteralExpr; +import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; +import com.alibaba.druid.sql.ast.expr.SQLTextLiteralExpr; +import com.alibaba.druid.sql.ast.statement.SQLExprTableSource; +import com.alibaba.druid.sql.ast.statement.SQLJoinTableSource; +import com.alibaba.druid.sql.ast.statement.SQLSelectGroupByClause; +import com.alibaba.druid.sql.ast.statement.SQLSelectItem; +import com.alibaba.druid.sql.ast.statement.SQLSelectOrderByItem; +import com.alibaba.druid.sql.ast.statement.SQLSelectQuery; +import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; +import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; +import com.alibaba.druid.sql.ast.statement.SQLSubqueryTableSource; +import com.alibaba.druid.sql.ast.statement.SQLTableSource; +import com.alibaba.druid.sql.dialect.db2.ast.stmt.DB2SelectQueryBlock; +import com.alibaba.druid.sql.dialect.db2.visitor.DB2OutputVisitor; +import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlOrderingExpr; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock.Limit; +import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUnionQuery; +import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlOutputVisitor; +import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlSchemaStatVisitor; +import com.alibaba.druid.sql.dialect.oracle.ast.stmt.OracleSelectQueryBlock; +import com.alibaba.druid.sql.dialect.oracle.visitor.OracleOutputVisitor; +import com.alibaba.druid.sql.dialect.postgresql.ast.stmt.PGSelectQueryBlock; +import com.alibaba.druid.sql.dialect.postgresql.visitor.PGOutputVisitor; +import com.alibaba.druid.sql.dialect.sqlserver.ast.SQLServerSelectQueryBlock; +import com.alibaba.druid.sql.visitor.SQLASTOutputVisitor; +import com.alibaba.druid.util.JdbcConstants; +import com.alibaba.druid.wall.spi.WallVisitorUtils; + +import io.mycat.MycatServer; +import io.mycat.cache.LayerCachePool; +import io.mycat.config.ErrorCode; +import io.mycat.config.model.SchemaConfig; +import io.mycat.config.model.TableConfig; +import io.mycat.route.RouteResultset; +import io.mycat.route.RouteResultsetNode; +import io.mycat.route.parser.druid.MycatSchemaStatVisitor; +import io.mycat.route.parser.druid.RouteCalculateUnit; +import io.mycat.route.util.RouterUtil; +import io.mycat.sqlengine.mpp.ColumnRoutePair; +import io.mycat.sqlengine.mpp.HavingCols; +import io.mycat.sqlengine.mpp.MergeCol; +import io.mycat.sqlengine.mpp.OrderCol; +import io.mycat.util.ObjectUtil; +import io.mycat.util.StringUtil; + +public class DruidSelectParser extends DefaultDruidParser { + private final Logger logger = LoggerFactory.getLogger(getClass()); + + + protected boolean isNeedParseOrderAgg=true; + + @Override + public void statementParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt) { + SQLSelectStatement selectStmt = (SQLSelectStatement)stmt; + SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery(); + if(sqlSelectQuery instanceof MySqlSelectQueryBlock) { + MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock)selectStmt.getSelect().getQuery(); + + parseOrderAggGroupMysql(schema, stmt,rrs, mysqlSelectQuery); + //更改canRunInReadDB属性 + if ((mysqlSelectQuery.isForUpdate() || mysqlSelectQuery.isLockInShareMode()) && rrs.isAutocommit() == false) + { + rrs.setCanRunInReadDB(false); + } + + } else if (sqlSelectQuery instanceof MySqlUnionQuery) { +// MySqlUnionQuery unionQuery = (MySqlUnionQuery)sqlSelectQuery; +// MySqlSelectQueryBlock left = (MySqlSelectQueryBlock)unionQuery.getLeft(); +// MySqlSelectQueryBlock right = (MySqlSelectQueryBlock)unionQuery.getLeft(); +// System.out.println(); + } + } + protected void parseOrderAggGroupMysql(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery) + { + MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor(); + stmt.accept(visitor); +// rrs.setGroupByCols((String[])visitor.getGroupByColumns().toArray()); + if(!isNeedParseOrderAgg) + { + return; + } + Map aliaColumns = parseAggGroupCommon(schema, stmt, rrs, mysqlSelectQuery); + + //setOrderByCols + if(mysqlSelectQuery.getOrderBy() != null) { + List orderByItems = mysqlSelectQuery.getOrderBy().getItems(); + rrs.setOrderByCols(buildOrderByCols(orderByItems,aliaColumns)); + } + isNeedParseOrderAgg=false; + } + protected Map parseAggGroupCommon(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs, SQLSelectQueryBlock mysqlSelectQuery) + { + Map aliaColumns = new HashMap(); + Map aggrColumns = new HashMap(); + // Added by winbill, 20160314, for having clause, Begin ==> + List havingColsName = new ArrayList(); + // Added by winbill, 20160314, for having clause, End <== + List selectList = mysqlSelectQuery.getSelectList(); + boolean isNeedChangeSql=false; + int size = selectList.size(); + boolean isDistinct=mysqlSelectQuery.getDistionOption()==2; + for (int i = 0; i < size; i++) + { + SQLSelectItem item = selectList.get(i); + + if (item.getExpr() instanceof SQLAggregateExpr) + { + SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr(); + String method = expr.getMethodName(); + boolean isHasArgument=!expr.getArguments().isEmpty(); + if(isHasArgument) + { + String aggrColName = method + "(" + expr.getArguments().get(0) + ")"; // Added by winbill, 20160314, for having clause + havingColsName.add(aggrColName); // Added by winbill, 20160314, for having clause + } + //只处理有别名的情况,无别名添加别名,否则某些数据库会得不到正确结果处理 + int mergeType = MergeCol.getMergeType(method); + if (MergeCol.MERGE_AVG == mergeType&&isRoutMultiNode(schema,rrs)) + { //跨分片avg需要特殊处理,直接avg结果是不对的 + String colName = item.getAlias() != null ? item.getAlias() : method + i; + SQLSelectItem sum =new SQLSelectItem(); + String sumColName = colName + "SUM"; + sum.setAlias(sumColName); + SQLAggregateExpr sumExp =new SQLAggregateExpr("SUM"); + ObjectUtil.copyProperties(expr,sumExp); + sumExp.getArguments().addAll(expr.getArguments()); + sumExp.setMethodName("SUM"); + sum.setExpr(sumExp); + selectList.set(i, sum); + aggrColumns.put(sumColName, MergeCol.MERGE_SUM); + havingColsName.add(sumColName); // Added by winbill, 20160314, for having clause + havingColsName.add(item.getAlias() != null ? item.getAlias() : ""); // Added by winbill, 20160314, two aliases for AVG + + SQLSelectItem count =new SQLSelectItem(); + String countColName = colName + "COUNT"; + count.setAlias(countColName); + SQLAggregateExpr countExp = new SQLAggregateExpr("COUNT"); + ObjectUtil.copyProperties(expr,countExp); + countExp.getArguments().addAll(expr.getArguments()); + countExp.setMethodName("COUNT"); + count.setExpr(countExp); + selectList.add(count); + aggrColumns.put(countColName, MergeCol.MERGE_COUNT); + + isNeedChangeSql=true; + aggrColumns.put(colName, mergeType); + rrs.setHasAggrColumn(true); + } else if (MergeCol.MERGE_UNSUPPORT != mergeType){ + String aggColName = null; + StringBuilder sb = new StringBuilder(); + if(mysqlSelectQuery instanceof MySqlSelectQueryBlock) { + expr.accept(new MySqlOutputVisitor(sb)); + } else if(mysqlSelectQuery instanceof OracleSelectQueryBlock) { + expr.accept(new OracleOutputVisitor(sb)); + } else if(mysqlSelectQuery instanceof PGSelectQueryBlock){ + expr.accept(new PGOutputVisitor(sb)); + } else if(mysqlSelectQuery instanceof SQLServerSelectQueryBlock) { + expr.accept(new SQLASTOutputVisitor(sb)); + } else if(mysqlSelectQuery instanceof DB2SelectQueryBlock) { + expr.accept(new DB2OutputVisitor(sb)); + } + aggColName = sb.toString(); + + if (item.getAlias() != null && item.getAlias().length() > 0) + { + aggrColumns.put(item.getAlias(), mergeType); + aliaColumns.put(aggColName,item.getAlias()); + } else + { //如果不加,jdbc方式时取不到正确结果 ;修改添加别名 + item.setAlias(method + i); + aggrColumns.put(method + i, mergeType); + aliaColumns.put(aggColName, method + i); + isNeedChangeSql=true; + } + rrs.setHasAggrColumn(true); + havingColsName.add(item.getAlias()); // Added by winbill, 20160314, for having clause + havingColsName.add(""); // Added by winbill, 20160314, one alias for non-AVG + } + } else + { + if (!(item.getExpr() instanceof SQLAllColumnExpr)) + { + String alia = item.getAlias(); + String field = getFieldName(item); + if (alia == null) + { + alia = field; + } + aliaColumns.put(field, alia); + } + } + + } + if(aggrColumns.size() > 0) { + rrs.setMergeCols(aggrColumns); + } + + //通过优化转换成group by来实现 + if(isDistinct) + { + mysqlSelectQuery.setDistionOption(0); + SQLSelectGroupByClause groupBy=new SQLSelectGroupByClause(); + for (String fieldName : aliaColumns.keySet()) + { + groupBy.addItem(new SQLIdentifierExpr(fieldName)); + } + mysqlSelectQuery.setGroupBy(groupBy); + isNeedChangeSql=true; + } + + + //setGroupByCols + if(mysqlSelectQuery.getGroupBy() != null) { + List groupByItems = mysqlSelectQuery.getGroupBy().getItems(); + String[] groupByCols = buildGroupByCols(groupByItems,aliaColumns); + rrs.setGroupByCols(groupByCols); + rrs.setHavings(buildGroupByHaving(mysqlSelectQuery.getGroupBy().getHaving(),aliaColumns)); + rrs.setHasAggrColumn(true); + rrs.setHavingColsName(havingColsName.toArray()); // Added by winbill, 20160314, for having clause + } + + + if (isNeedChangeSql) + { + String sql = stmt.toString(); + rrs.changeNodeSqlAfterAddLimit(schema,getCurentDbType(),sql,0,-1, false); + getCtx().setSql(sql); + } + return aliaColumns; + } + + private HavingCols buildGroupByHaving(SQLExpr having,Map aliaColumns ){ + if (having == null) { + return null; + } + + SQLBinaryOpExpr expr = ((SQLBinaryOpExpr) having); + SQLExpr left = expr.getLeft(); + SQLBinaryOperator operator = expr.getOperator(); + SQLExpr right = expr.getRight(); + + String leftValue = null;; + if (left instanceof SQLAggregateExpr) { + leftValue = ((SQLAggregateExpr) left).getMethodName() + "(" + + ((SQLAggregateExpr) left).getArguments().get(0) + ")"; + String aggrColumnAlias = getAliaColumn(aliaColumns,leftValue); + if(aggrColumnAlias != null) { // having聚合函数存在别名 + expr.setLeft(new SQLIdentifierExpr(aggrColumnAlias)); + leftValue = aggrColumnAlias; + } + } else if (left instanceof SQLIdentifierExpr) { + leftValue = ((SQLIdentifierExpr) left).getName(); + } + + String rightValue = null; + if (right instanceof SQLNumericLiteralExpr) { + rightValue = right.toString(); + }else if(right instanceof SQLTextLiteralExpr){ + rightValue = StringUtil.removeBackquote(right.toString()); + } + + return new HavingCols(leftValue,rightValue,operator.getName()); + } + + private boolean isRoutMultiNode(SchemaConfig schema, RouteResultset rrs) + { + if(rrs.getNodes()!=null&&rrs.getNodes().length>1) + { + return true; + } + LayerCachePool tableId2DataNodeCache = (LayerCachePool) MycatServer.getInstance().getCacheService().getCachePool("TableID2DataNodeCache"); + try + { + tryRoute(schema, rrs, tableId2DataNodeCache); + if(rrs.getNodes()!=null&&rrs.getNodes().length>1) + { + return true; + } + } catch (SQLNonTransientException e) + { + throw new RuntimeException(e); + } + return false; + } + + private String getFieldName(SQLSelectItem item){ + if ((item.getExpr() instanceof SQLPropertyExpr)||(item.getExpr() instanceof SQLMethodInvokeExpr) + || (item.getExpr() instanceof SQLIdentifierExpr) || item.getExpr() instanceof SQLBinaryOpExpr) { + return item.getExpr().toString();//字段别名 + } + else { + return item.toString(); + } + } + + /** + * 现阶段目标为 有一个只涉及到一张表的子查询时,先执行子查询,获得返回结果后,改写原有sql继续执行,得到最终结果. + * 在这种情况下,原sql不需要继续解析. + * 使用catlet 的情况也不再继续解析. + */ + @Override + public boolean afterVisitorParser(RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor) { + int subQuerySize = visitor.getSubQuerys().size(); + + if(subQuerySize==0&&ctx.getTables().size()==2){ //两表关联,考虑使用catlet + if(ctx.getVisitor().getConditions() !=null && ctx.getVisitor().getConditions().size()>0){ + return true; + } + }else if(subQuerySize==1){ //只涉及一张表的子查询,使用 MiddlerResultHandler 获取中间结果后,改写原有 sql 继续执行 TODO 后期可能会考虑多个. + SQLSelectQuery sqlSelectQuery = visitor.getSubQuerys().iterator().next().getQuery(); + if(((MySqlSelectQueryBlock)sqlSelectQuery).getFrom() instanceof SQLExprTableSource) { + return true; + } + } + + return super.afterVisitorParser(rrs, stmt, visitor); + } + + /** + * 改写sql:需要加limit的加上 + */ + @Override + public void changeSql(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt,LayerCachePool cachePool) throws SQLNonTransientException { + + tryRoute(schema, rrs, cachePool); + + rrs.copyLimitToNodes(); + + SQLSelectStatement selectStmt = (SQLSelectStatement)stmt; + SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery(); + if(sqlSelectQuery instanceof MySqlSelectQueryBlock) { + MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock)selectStmt.getSelect().getQuery(); + int limitStart = 0; + int limitSize = schema.getDefaultMaxLimit(); + + //clear group having + SQLSelectGroupByClause groupByClause = mysqlSelectQuery.getGroupBy(); + // Modified by winbill, 20160614, do NOT include having clause when routing to multiple nodes + if(groupByClause != null && groupByClause.getHaving() != null && isRoutMultiNode(schema,rrs)){ + groupByClause.setHaving(null); + } + + Map>> allConditions = getAllConditions(); + boolean isNeedAddLimit = isNeedAddLimit(schema, rrs, mysqlSelectQuery, allConditions); + if(isNeedAddLimit) { + Limit limit = new Limit(); + limit.setRowCount(new SQLIntegerExpr(limitSize)); + mysqlSelectQuery.setLimit(limit); + rrs.setLimitSize(limitSize); + String sql= getSql(rrs, stmt, isNeedAddLimit); + rrs.changeNodeSqlAfterAddLimit(schema, getCurentDbType(), sql, 0, limitSize, true); + + } + Limit limit = mysqlSelectQuery.getLimit(); + if(limit != null&&!isNeedAddLimit) { + SQLIntegerExpr offset = (SQLIntegerExpr)limit.getOffset(); + SQLIntegerExpr count = (SQLIntegerExpr)limit.getRowCount(); + if(offset != null) { + limitStart = offset.getNumber().intValue(); + rrs.setLimitStart(limitStart); + } + if(count != null) { + limitSize = count.getNumber().intValue(); + rrs.setLimitSize(limitSize); + } + + if(isNeedChangeLimit(rrs)) { + Limit changedLimit = new Limit(); + changedLimit.setRowCount(new SQLIntegerExpr(limitStart + limitSize)); + + if(offset != null) { + if(limitStart < 0) { + String msg = "You have an error in your SQL syntax; check the manual that " + + "corresponds to your MySQL server version for the right syntax to use near '" + limitStart + "'"; + throw new SQLNonTransientException(ErrorCode.ER_PARSE_ERROR + " - " + msg); + } else { + changedLimit.setOffset(new SQLIntegerExpr(0)); + + } + } + + mysqlSelectQuery.setLimit(changedLimit); + + String sql= getSql(rrs, stmt, isNeedAddLimit); + rrs.changeNodeSqlAfterAddLimit(schema,getCurentDbType(),sql,0, limitStart + limitSize, true); + + //设置改写后的sql + ctx.setSql(sql); + + } else + { + + rrs.changeNodeSqlAfterAddLimit(schema,getCurentDbType(),getCtx().getSql(),rrs.getLimitStart(), rrs.getLimitSize(), true); + // ctx.setSql(nativeSql); + + } + + + } + + if(rrs.isDistTable()){ + MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery(); + SQLTableSource from2 = query.getFrom(); + if (from2 instanceof SQLSubqueryTableSource) { + SQLSubqueryTableSource from = (SQLSubqueryTableSource) from2; + MySqlSelectQueryBlock query2 = (MySqlSelectQueryBlock) from.getSelect().getQuery(); + sqlRoute(rrs, selectStmt, mysqlSelectQuery, query2); + } else { + sqlRoute(rrs, selectStmt, mysqlSelectQuery, query); + } + logger.info("执行sql: " + selectStmt.toString()); + } + + rrs.setCacheAble(isNeedCache(schema, rrs, mysqlSelectQuery, allConditions)); + } + + } + private void sqlRoute(RouteResultset rrs, SQLSelectStatement selectStmt, MySqlSelectQueryBlock mysqlSelectQuery, + MySqlSelectQueryBlock query) throws SQLNonTransientException { + SQLTableSource from2 = query.getFrom(); + SQLExprTableSource left2 = (SQLExprTableSource) getExpr(from2); + String alias = left2.getAlias(); + + left2.setAlias(alias); + + SQLTableSource from1 = mysqlSelectQuery.getFrom(); + + for (RouteResultsetNode node : rrs.getNodes()) { + /* + SQLExprTableSource from2 = new SQLExprTableSource(sqlIdentifierExpr); + from2.setAlias(alias); + */ + SQLIdentifierExpr sqlIdentifierExpr = new SQLIdentifierExpr(); + sqlIdentifierExpr.setParent(from1); + sqlIdentifierExpr.setName(node.getSubTableName()); + left2.setExpr(sqlIdentifierExpr); + + //mysqlSelectQuery.setFrom(left2); + node.setStatement(selectStmt.toString()); + } + } + + private SQLTableSource getExpr(SQLTableSource source) throws SQLNonTransientException { + if (source instanceof SQLExprTableSource) { + return source; + } else if (source instanceof SQLJoinTableSource) { + SQLJoinTableSource joinsource = (SQLJoinTableSource) source; + SQLTableSource right = joinsource.getRight(); + + if (right instanceof SQLJoinTableSource) { + + } else { + + } + + return getExpr(joinsource.getLeft()); + } else { + throw new SQLNonTransientException(ErrorCode.ER_PARSE_ERROR + " - " + "sql不支持"); + } + } + + /** + * 获取所有的条件:因为可能被or语句拆分成多个RouteCalculateUnit,条件分散了 + * @return + */ + private Map>> getAllConditions() { + Map>> map = new HashMap>>(); + for(RouteCalculateUnit unit : ctx.getRouteCalculateUnits()) { + if(unit != null && unit.getTablesAndConditions() != null) { + map.putAll(unit.getTablesAndConditions()); + } + } + + return map; + } + + private void tryRoute(SchemaConfig schema, RouteResultset rrs, LayerCachePool cachePool) throws SQLNonTransientException { + if(rrs.isFinishedRoute()) + { + return;//避免重复路由 + } + + //无表的select语句直接路由带任一节点 + if((ctx.getTables() == null || ctx.getTables().size() == 0)&&(ctx.getTableAliasMap()==null||ctx.getTableAliasMap().isEmpty())) { + rrs = RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), ctx.getSql()); + rrs.setFinishedRoute(true); + return; + } +// RouterUtil.tryRouteForTables(schema, ctx, rrs, true, cachePool); + SortedSet nodeSet = new TreeSet(); + boolean isAllGlobalTable = RouterUtil.isAllGlobalTable(ctx, schema); + for (RouteCalculateUnit unit : ctx.getRouteCalculateUnits()) { + RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, ctx, unit, rrs, true, cachePool); + if (rrsTmp != null&&rrsTmp.getNodes()!=null) { + for (RouteResultsetNode node : rrsTmp.getNodes()) { + nodeSet.add(node); + } + } + if(isAllGlobalTable) {//都是全局表时只计算一遍路由 + break; + } + } + + if(nodeSet.size() == 0) { + + Collection stringCollection= ctx.getTableAliasMap().values() ; + for (String table : stringCollection) + { + if(table!=null&&table.toLowerCase().contains("information_schema.")) + { + rrs = RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), ctx.getSql()); + rrs.setFinishedRoute(true); + return; + } + } + String msg = " find no Route:" + ctx.getSql(); + LOGGER.warn(msg); + throw new SQLNonTransientException(msg); + } + + RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()]; + int i = 0; + for (Iterator iterator = nodeSet.iterator(); iterator.hasNext();) { + nodes[i] = (RouteResultsetNode) iterator.next(); + i++; + + } + + rrs.setNodes(nodes); + rrs.setFinishedRoute(true); + } + + + protected String getCurentDbType() + { + return JdbcConstants.MYSQL; + } + + + + + protected String getSql( RouteResultset rrs,SQLStatement stmt, boolean isNeedAddLimit) + { + if(getCurentDbType().equalsIgnoreCase("mysql")&&(isNeedChangeLimit(rrs)||isNeedAddLimit)) + { + + return stmt.toString(); + + } + + return getCtx().getSql(); + } + + + + protected boolean isNeedChangeLimit(RouteResultset rrs) { + if(rrs.getNodes() == null) { + return false; + } else { + if(rrs.getNodes().length > 1) { + return true; + } + return false; + + } + } + + private boolean isNeedCache(SchemaConfig schema, RouteResultset rrs, + MySqlSelectQueryBlock mysqlSelectQuery, Map>> allConditions) { + if(ctx.getTables() == null || ctx.getTables().size() == 0 ) { + return false; + } + TableConfig tc = schema.getTables().get(ctx.getTables().get(0)); + if(tc==null ||(ctx.getTables().size() == 1 && tc.isGlobalTable()) + ) {//|| (ctx.getTables().size() == 1) && tc.getRule() == null && tc.getDataNodes().size() == 1 + return false; + } else { + //单表主键查询 + if(ctx.getTables().size() == 1) { + String tableName = ctx.getTables().get(0); + String primaryKey = schema.getTables().get(tableName).getPrimaryKey(); +// schema.getTables().get(ctx.getTables().get(0)).getParentKey() != null; + if(ctx.getRouteCalculateUnit().getTablesAndConditions().get(tableName) != null + && ctx.getRouteCalculateUnit().getTablesAndConditions().get(tableName).get(primaryKey) != null + && tc.getDataNodes().size() > 1) {//有主键条件 + return false; + } + //全局表不缓存 + }else if(RouterUtil.isAllGlobalTable(ctx, schema)){ + return false; + } + return true; + } + } + + /** + * 单表且是全局表 + * 单表且rule为空且nodeNodes只有一个 + * @param schema + * @param rrs + * @param mysqlSelectQuery + * @return + */ + private boolean isNeedAddLimit(SchemaConfig schema, RouteResultset rrs, + MySqlSelectQueryBlock mysqlSelectQuery, Map>> allConditions) { +// ctx.getTablesAndConditions().get(key)) + if(rrs.getLimitSize()>-1) + { + return false; + }else + if(schema.getDefaultMaxLimit() == -1) { + return false; + } else if (mysqlSelectQuery.getLimit() != null) {//语句中已有limit + return false; + } else if(ctx.getTables().size() == 1) { + String tableName = ctx.getTables().get(0); + TableConfig tableConfig = schema.getTables().get(tableName); + if(tableConfig==null) + { + return schema.getDefaultMaxLimit() > -1; // 找不到则取schema的配置 + } + + boolean isNeedAddLimit= tableConfig.isNeedAddLimit(); + if(!isNeedAddLimit) + { + return false;//优先从配置文件取 + } + + if(schema.getTables().get(tableName).isGlobalTable()) { + return true; + } + + String primaryKey = schema.getTables().get(tableName).getPrimaryKey(); + +// schema.getTables().get(ctx.getTables().get(0)).getParentKey() != null; + if(allConditions.get(tableName) == null) {//无条件 + return true; + } + + if (allConditions.get(tableName).get(primaryKey) != null) {//条件中带主键 + return false; + } + + return true; + } else if(rrs.hasPrimaryKeyToCache() && ctx.getTables().size() == 1){//只有一个表且条件中有主键,不需要limit了,因为主键只能查到一条记录 + return false; + } else {//多表或无表 + return false; + } + + } + private String getAliaColumn(Map aliaColumns,String column ){ + String alia=aliaColumns.get(column); + if (alia==null){ + if(column.indexOf(".") < 0) { + String col = "." + column; + String col2 = ".`" + column+"`"; + //展开aliaColumns,将之类的键值对展开成 + for(Map.Entry entry : aliaColumns.entrySet()) { + if(entry.getKey().endsWith(col)||entry.getKey().endsWith(col2)) { + if(entry.getValue() != null && entry.getValue().indexOf(".") > 0) { + return column; + } + return entry.getValue(); + } + } + } + + return column; + } + else { + return alia; + } + } + + private String[] buildGroupByCols(List groupByItems,Map aliaColumns) { + String[] groupByCols = new String[groupByItems.size()]; + for(int i= 0; i < groupByItems.size(); i++) { + SQLExpr sqlExpr = groupByItems.get(i); + String column = null; + if(sqlExpr instanceof SQLIdentifierExpr ) + { + column=((SQLIdentifierExpr) sqlExpr).getName(); + } else if(sqlExpr instanceof SQLMethodInvokeExpr){ + column = ((SQLMethodInvokeExpr) sqlExpr).toString(); + } else if(sqlExpr instanceof MySqlOrderingExpr){ + //todo czn + SQLExpr expr = ((MySqlOrderingExpr) sqlExpr).getExpr(); + + if (expr instanceof SQLName) + { + column = StringUtil.removeBackquote(((SQLName) expr).getSimpleName());//不要转大写 2015-2-10 sohudo StringUtil.removeBackquote(expr.getSimpleName().toUpperCase()); + } else + { + column = StringUtil.removeBackquote(expr.toString()); + } + } else if(sqlExpr instanceof SQLPropertyExpr){ + /** + * 针对子查询别名,例如select id from (select h.id from hotnews h union select h.title from hotnews h ) as t1 group by t1.id; + */ + column = sqlExpr.toString(); + } + if(column == null){ + column = sqlExpr.toString(); + } + int dotIndex=column.indexOf(".") ; + int bracketIndex=column.indexOf("(") ; + //通过判断含有括号来决定是否为函数列 + if(dotIndex!=-1&&bracketIndex==-1) + { + //此步骤得到的column必须是不带.的,有别名的用别名,无别名的用字段名 + column=column.substring(dotIndex+1) ; + } + groupByCols[i] = getAliaColumn(aliaColumns,column);//column; + } + return groupByCols; + } + + protected LinkedHashMap buildOrderByCols(List orderByItems,Map aliaColumns) { + LinkedHashMap map = new LinkedHashMap(); + for(int i= 0; i < orderByItems.size(); i++) { + SQLOrderingSpecification type = orderByItems.get(i).getType(); + //orderColumn只记录字段名称,因为返回的结果集是不带表名的。 + SQLExpr expr = orderByItems.get(i).getExpr(); + String col; + if (expr instanceof SQLName) { + col = ((SQLName)expr).getSimpleName(); + } + else { + col =expr.toString(); + } + if(type == null) { + type = SQLOrderingSpecification.ASC; + } + col=getAliaColumn(aliaColumns,col);//此步骤得到的col必须是不带.的,有别名的用别名,无别名的用字段名 + map.put(col, type == SQLOrderingSpecification.ASC ? OrderCol.COL_ORDER_TYPE_ASC : OrderCol.COL_ORDER_TYPE_DESC); + } + return map; + } + + private boolean isConditionAlwaysTrue(SQLStatement statement) { + SQLSelectStatement selectStmt = (SQLSelectStatement)statement; + SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery(); + if(sqlSelectQuery instanceof MySqlSelectQueryBlock) { + MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock)selectStmt.getSelect().getQuery(); + SQLExpr expr = mysqlSelectQuery.getWhere(); + + Object o = WallVisitorUtils.getValue(expr); + if(Boolean.TRUE.equals(o)) { + return true; + } + return false; + } else {//union + return false; + } + + } + + protected void setLimitIFChange(SQLStatement stmt, RouteResultset rrs, SchemaConfig schema, SQLBinaryOpExpr one, int firstrownum, int lastrownum) + { + rrs.setLimitStart(firstrownum); + rrs.setLimitSize(lastrownum - firstrownum); + LayerCachePool tableId2DataNodeCache = (LayerCachePool) MycatServer.getInstance().getCacheService().getCachePool("TableID2DataNodeCache"); + try + { + tryRoute(schema, rrs, tableId2DataNodeCache); + } catch (SQLNonTransientException e) + { + throw new RuntimeException(e); + } + if (isNeedChangeLimit(rrs)) + { + one.setRight(new SQLIntegerExpr(0)); + String curentDbType ="db2".equalsIgnoreCase(this.getCurentDbType())?"oracle":getCurentDbType(); + String sql = SQLUtils.toSQLString(stmt, curentDbType);; + rrs.changeNodeSqlAfterAddLimit(schema,getCurentDbType(), sql,0,lastrownum, false); + //设置改写后的sql + getCtx().setSql(sql); + } + } +}