Multithread calcite Server
gathering calcite server changes

Move to a pooled server
Correct default user data to only use current user
Add algebra test
Add multithread support test
Start tidy-up

add Multithreaded junit test

Add threaded test to junit
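
As a rough illustration of what a threaded JUnit test for the parser can look like (the class name, thread count, and iteration count below are made up, and this is not necessarily the test added by this commit; it calls the protected parseStmt, so it has to live in the com.mapd.calcite.parser package):

package com.mapd.calcite.parser;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;

public class MapDParserConcurrencyTest {

  @Test
  public void parsesFromManyThreads() throws Exception {
    final int threads = 16;        // illustrative values only
    final int iterations = 100;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    List<Future<Void>> results = new ArrayList<Future<Void>>();
    for (int t = 0; t < threads; t++) {
      results.add(executor.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          // Each worker gets its own parser; the pooled server hands these out instead.
          MapDParser parser = new MapDParser();
          for (int i = 0; i < iterations; i++) {
            assertNotNull(parser.parseStmt(
                "SELECT origin_lon, origin_lat FROM flights GROUP BY origin_lon, origin_lat"));
          }
          return null;
        }
      }));
    }
    for (Future<Void> f : results) {
      f.get();                     // rethrows any failure from a worker thread
    }
    executor.shutdown();
  }
}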

NOTE: with a high thread count and many cycles, connections left in TIME_WAIT may need to be reused. On Linux, do the following:
echo "1" > /proc/sys/net/ipv4/tcp_tw_recycle
echo "1" > /proc/sys/net/ipv4/tcp_tw_reuse

Pool Parsers Perfectly

Move to pooled model
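
A minimal sketch of the pooled parser model (assuming Apache Commons Pool 2; the factory and caller classes below are illustrative, not necessarily what the server code in this commit does):

package com.mapd.calcite.parser;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;

// Builds parser instances for the pool; constructing a MapDParser (type factory,
// catalog reader, validator, converter) is the expensive part worth reusing.
class MapDParserFactory extends BasePooledObjectFactory<MapDParser> {
  @Override
  public MapDParser create() {
    return new MapDParser();
  }

  @Override
  public PooledObject<MapDParser> wrap(MapDParser parser) {
    return new DefaultPooledObject<MapDParser>(parser);
  }
}

// Illustrative caller: each request borrows a parser and always hands it back,
// so concurrent requests never share a parser instance.
class PooledParserExample {
  private final GenericObjectPool<MapDParser> parserPool =
      new GenericObjectPool<MapDParser>(new MapDParserFactory());

  String toRelAlgebra(String sql, MapDUser mapDUser) throws Exception {
    MapDParser parser = parserPool.borrowObject();
    try {
      return parser.getRelAlgebra(sql, true, mapDUser);
    } finally {
      parserPool.returnObject(parser);
    }
  }
}

Borrowing and returning a parser per request keeps concurrent queries from sharing a parser instance, which is what makes a multithreaded server safe.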

Code cleanup, add overrides

Lazy table loading in calcite

Some code cleanup
dwayneberry committed Jan 19, 2016
1 parent 459c434 commit e371a50
Showing 13 changed files with 645 additions and 449 deletions.
297 changes: 78 additions & 219 deletions java/src/main/java/com/mapd/calcite/parser/MapDCatalogReader.java

Large diffs are not rendered by default.

@@ -3,17 +3,11 @@
*/
package com.mapd.calcite.parser;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.externalize.MapDRelJsonWriter;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
@@ -32,8 +26,6 @@
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
import org.apache.calcite.sql.validate.SqlValidatorImpl;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.sql2rel.StandardConvertletTable;
import org.slf4j.Logger;
@@ -43,114 +35,71 @@
*
* @author michael
*/
class MapDSerializer {

static String toString(final RelNode rel) {
if (rel == null) {
return null;
}
final MapDRelJsonWriter planWriter = new MapDRelJsonWriter();
rel.explain(planWriter);
return planWriter.asString();
}
}

public class CalciteParser {

final static Logger logger = LoggerFactory.getLogger(CalciteParser.class);

Quoting quoting = Quoting.DOUBLE_QUOTE;
Casing unquotedCasing = Casing.UNCHANGED;
Casing quotedCasing = Casing.UNCHANGED;

private RelDataTypeFactory typeFactory;
private Prepare.CatalogReader catalogReader;
private SqlValidator validator;
private SqlToRelConverter converter;

private SqlOperatorTable opTab;
private RelOptPlanner planner;
public final class MapDParser {

public static void main(String[] args) throws UnsupportedEncodingException, FileNotFoundException, IOException, SqlParseException {
logger.info("Hello, World -- CalciteParser here");
final static Logger MAPDLOGGER = LoggerFactory.getLogger(MapDParser.class);

CalciteParser x = new CalciteParser();
x.doWork(args);
}

protected void doWork(String[] args) throws UnsupportedEncodingException, FileNotFoundException, IOException, SqlParseException {
logger.debug("In doWork");

logger.info(getRelAlgebra("SELECT origin_lon, origin_lat FROM flights group by origin_lon, origin_lat", false));
private final Quoting quoting = Quoting.DOUBLE_QUOTE;
private final Casing unquotedCasing = Casing.UNCHANGED;
private final Casing quotedCasing = Casing.UNCHANGED;

//logger.info(getRelAlgebra("Select * from (SELECT a.deptime*1.4 as delay, a.foodrequest, b.plane_engine_type, "
// + "b.weatherdelay FROM flights b join food a on a.deptime=b.deptime"
// + " where (a.deptime * 1.413) =1234343)"));/
//logger.info(getRelAlgebra("SELECT * FROM SALES.EMP"));
long timer = System.currentTimeMillis();
// for (int i = 0; i < 100; i++) {
// getRelAlgebra("select empno from emp");
// }
private final RelDataTypeFactory typeFactory;
private final MapDCatalogReader catalogReader;
private final SqlValidator validator;
private final SqlToRelConverter converter;

logger.info("time for 100 parses is " + (System.currentTimeMillis() - timer) + " ms");
public MapDParser(){
typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
catalogReader = new MapDCatalogReader(typeFactory);
validator = new MapDValidator(
createOperatorTable(),
catalogReader,
typeFactory,
SqlConformance.DEFAULT);
final RexBuilder rexBuilder = new RexBuilder(typeFactory);
final RelOptCluster cluster = RelOptCluster.create(new MapDRelOptPlanner(), rexBuilder);
converter = new SqlToRelConverter(null, validator, catalogReader, cluster,
StandardConvertletTable.INSTANCE);
}

public String getRelAlgebra(String sql, final boolean legacy_syntax) throws SqlParseException {
long timer = System.currentTimeMillis();
public String getRelAlgebra(String sql, final boolean legacy_syntax, final MapDUser mapDUser)
throws SqlParseException {
SqlNode node = processSQL(sql, legacy_syntax);

typeFactory = getTypeFactory();

final Prepare.CatalogReader catalogReader
= createCatalogReader(typeFactory);

final SqlValidator validator
= createValidator(
catalogReader, typeFactory);

boolean is_select_star = isSelectStar(node);

catalogReader.setCurrentMapDUser(mapDUser);
SqlNode validate = validator.validate(node);

SqlSelect validate_select = getSelectChild(validate);

// Hide rowid from select * queries
if (legacy_syntax && is_select_star && validate_select != null) {
SqlNodeList proj_exprs = ((SqlSelect) validate).getSelectList();
SqlNodeList new_proj_exprs = new SqlNodeList(proj_exprs.getParserPosition());
for (SqlNode proj_expr : proj_exprs) {
if (proj_expr instanceof SqlIdentifier &&
(((SqlIdentifier) proj_expr).toString().toLowerCase()).endsWith(".rowid")) {
if (proj_expr instanceof SqlIdentifier
&& (((SqlIdentifier) proj_expr).toString().toLowerCase()).endsWith(".rowid")) {
continue;
}
new_proj_exprs.add(proj_expr);
}
validate_select.setSelectList(new_proj_exprs);
}

final SqlToRelConverter converter
= createSqlToRelConverter(
validator,
catalogReader,
typeFactory);

final RelRoot sqlRel = converter.convertQuery(node, true, true);
//final RelNode sqlRel = converter.convertSelect((SqlSelect)node, true);
//RexNode convertExpression = converter.convertExpression(node);

//logger.debug("After convert relNode is "+ convertExpression.toString());
//logger.debug("After convert relRoot kind is " + sqlRel.kind);

//logger.debug("After convert relRoot project is " + sqlRel.project().toString());

//logger.debug("After convert relalgebra is \n" + RelOptUtil.toString(sqlRel.project()));

RelNode project = sqlRel.project();

String res = MapDSerializer.toString(project);

//logger.info("After convert relalgebra is \n" + res);

return res;
}

@@ -188,9 +137,9 @@ private SqlNode processSQL(String sql, final boolean legacy_syntax) throws SqlPa
SqlParser sqlp = getSqlParser(sql);
try {
node = sqlp.parseStmt();
logger.debug(" node is \n" + node.toString());
MAPDLOGGER.debug(" node is \n" + node.toString());
} catch (SqlParseException ex) {
logger.error("failed to process SQL '" + sql + "' \n" + ex.toString());
MAPDLOGGER.error("failed to process SQL '" + sql + "' \n" + ex.toString());
throw ex;
}
if (!legacy_syntax) {
@@ -238,7 +187,7 @@ private static void desugar(SqlSelect select_node) {
}

private static SqlNode expandAliases(final SqlNode node,
final java.util.Map<String, SqlNode> id_to_expr) {
final java.util.Map<String, SqlNode> id_to_expr) {
if (node instanceof SqlIdentifier && id_to_expr.containsKey(node.toString())) {
return id_to_expr.get(node.toString());
}
@@ -268,48 +217,6 @@ private static SqlNodeList expandAliases(final SqlNodeList group_by_list, final
}
return new_group_by_list;
}
protected final RelDataTypeFactory getTypeFactory() {
if (typeFactory == null) {
typeFactory = createTypeFactory();
}
return typeFactory;
}

protected RelDataTypeFactory createTypeFactory() {
return new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
}

protected Prepare.CatalogReader createCatalogReader(
RelDataTypeFactory typeFactory) {
if (catalogReader == null) {
catalogReader = new MapDCatalogReader(typeFactory, true).init();
}
return catalogReader;
}

protected SqlValidator createValidator(
SqlValidatorCatalogReader catalogReader,
RelDataTypeFactory typeFactory) {
if (validator == null) {
validator = new MapDTestValidator(
getOperatorTable(),
createCatalogReader(typeFactory),
typeFactory,
getConformance());
}
return validator;
}

protected SqlConformance getConformance() {
return SqlConformance.DEFAULT;
}

protected final SqlOperatorTable getOperatorTable() {
if (opTab == null) {
opTab = createOperatorTable();
}
return opTab;
}

/**
* Creates an operator table.
@@ -325,31 +232,6 @@ protected SqlOperatorTable createOperatorTable() {
return tempOpTab;
}

protected SqlToRelConverter createSqlToRelConverter(
final SqlValidator validator,
final Prepare.CatalogReader catalogReader,
final RelDataTypeFactory typeFactory) {
if (converter == null) {
final RexBuilder rexBuilder = new RexBuilder(typeFactory);
final RelOptCluster cluster
= RelOptCluster.create(getPlanner(), rexBuilder);
converter = new SqlToRelConverter(null, validator, catalogReader, cluster,
StandardConvertletTable.INSTANCE);
}
return converter;
}

protected final RelOptPlanner getPlanner() {
if (planner == null) {
planner = createPlanner();
}
return planner;
}

protected RelOptPlanner createPlanner() {
return new MapDRelOptPlanner();
}

protected SqlNode parseStmt(String sql) throws SqlParseException {
return getSqlParser(sql).parseStmt();
}
@@ -362,21 +244,4 @@ protected SqlParser getSqlParser(String sql) {
.setQuotedCasing(quotedCasing)
.build());
}

private static class MapDTestValidator extends SqlValidatorImpl {

public MapDTestValidator(
SqlOperatorTable opTab,
SqlValidatorCatalogReader catalogReader,
RelDataTypeFactory typeFactory,
SqlConformance conformance) {
super(opTab, catalogReader, typeFactory, conformance);
}

// override SqlValidator
@Override
public boolean shouldExpandIdentifiers() {
return true;
}
}
}
}
11 changes: 11 additions & 0 deletions java/src/main/java/com/mapd/calcite/parser/MapDRelOptPlanner.java
@@ -57,11 +57,13 @@ public MapDRelOptPlanner() {
}

// implement RelOptPlanner
@Override
public void setRoot(RelNode rel) {
this.root = rel;
}

// implement RelOptPlanner
@Override
public RelNode getRoot() {
return root;
}
@@ -72,23 +74,27 @@ public void clear() {
this.rule = null;
}

@Override
public boolean addRule(RelOptRule rule) {
assert this.rule == null : "MapDRelOptPlanner only supports a single rule";
this.rule = rule;

return false;
}

@Override
public boolean removeRule(RelOptRule rule) {
return false;
}

// implement RelOptPlanner
@Override
public RelNode changeTraits(RelNode rel, RelTraitSet toTraits) {
return rel;
}

// implement RelOptPlanner
@Override
public RelNode findBestExp() {
if (rule != null) {
matchRecursive(root, null, -1);
@@ -176,18 +182,21 @@ private boolean match(
}

// implement RelOptPlanner
@Override
public RelNode register(
RelNode rel,
RelNode equivRel) {
return rel;
}

// implement RelOptPlanner
@Override
public RelNode ensureRegistered(RelNode rel, RelNode equivRel) {
return rel;
}

// implement RelOptPlanner
@Override
public boolean isRegistered(RelNode rel) {
return true;
}
@@ -199,6 +208,7 @@ public long getRelMetadataTimestamp(RelNode rel) {

/**
* Allow tests to tweak the timestamp.
* @param metadataTimestamp
*/
public void setRelMetadataTimestamp(long metadataTimestamp) {
this.metadataTimestamp = metadataTimestamp;
@@ -229,6 +239,7 @@ private class MapDRuleCall extends RelOptRuleCall {
}

// implement RelOptRuleCall
@Override
public void transformTo(RelNode rel, Map<RelNode, RelNode> equiv) {
transformationResult = rel;
}
2 changes: 1 addition & 1 deletion java/src/main/java/com/mapd/calcite/parser/MapDSchema.java
@@ -17,7 +17,7 @@
public class MapDSchema {

private final List<String> tableNames = Lists.newArrayList();
private String name;
private final String name;

public MapDSchema(String name) {
this.name = name;