-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java | 222
-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java | 374
-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java | 689
-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java | 42
-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java | 2
-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java | 12
-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java | 50
-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java | 3
-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java | 198
-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java | 237
-rw-r--r--  contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java | 184
-rw-r--r--  contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java | 171
-rw-r--r--  contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java | 1715
-rw-r--r--  contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java | 176
-rw-r--r--  contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGenBase.java | 186
-rw-r--r--  contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/StatisticsTest.java | 115
-rw-r--r--  contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java | 127
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java | 53
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java | 12
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java | 17
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java | 49
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/CoveringPlanNoFilterGenerator.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java | 53
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java | 17
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java | 28
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java | 31
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java | 28
31 files changed, 4746 insertions(+), 87 deletions(-)
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java
new file mode 100644
index 000000000..a57f5b5ec
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.CloneVisitor;
+import org.apache.drill.exec.physical.base.DbGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.index.IndexProperties;
+import org.apache.drill.exec.store.mapr.PluginConstants;
+import org.apache.drill.exec.util.EncodedSchemaPathSet;
+import org.apache.drill.common.expression.LogicalExpression;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+public class MapRDBIndexDescriptor extends DrillIndexDescriptor {
+
+ protected final Object desc;
+ protected final Set<LogicalExpression> allFields;
+ protected final Set<LogicalExpression> indexedFields;
+ protected MapRDBFunctionalIndexInfo functionalInfo;
+ protected PluginCost pluginCost;
+
+ public MapRDBIndexDescriptor(List<LogicalExpression> indexCols,
+ CollationContext indexCollationContext,
+ List<LogicalExpression> nonIndexCols,
+ List<LogicalExpression> rowKeyColumns,
+ String indexName,
+ String tableName,
+ IndexType type,
+ Object desc,
+ DbGroupScan scan,
+ NullDirection nullsDirection) {
+ super(indexCols, indexCollationContext, nonIndexCols, rowKeyColumns, indexName, tableName, type, nullsDirection);
+ this.desc = desc;
+ this.indexedFields = ImmutableSet.copyOf(indexColumns);
+ this.allFields = new ImmutableSet.Builder<LogicalExpression>()
+ .add(PluginConstants.DOCUMENT_SCHEMA_PATH)
+ .addAll(indexColumns)
+ .addAll(nonIndexColumns)
+ .build();
+ this.pluginCost = scan.getPluginCostModel();
+ }
+
+ public Object getOriginalDesc(){
+ return desc;
+ }
+
+ @Override
+ public boolean isCoveringIndex(List<LogicalExpression> expressions) {
+ List<LogicalExpression> decodedCols = new DecodePathinExpr().parseExpressions(expressions);
+ return columnsInIndexFields(decodedCols, allFields);
+ }
+
+ @Override
+ public boolean allColumnsIndexed(Collection<LogicalExpression> expressions) {
+ List<LogicalExpression> decodedCols = new DecodePathinExpr().parseExpressions(expressions);
+ return columnsInIndexFields(decodedCols, indexedFields);
+ }
+
+ @Override
+ public boolean someColumnsIndexed(Collection<LogicalExpression> columns) {
+ return columnsIndexed(columns, false);
+ }
+
+ private boolean columnsIndexed(Collection<LogicalExpression> expressions, boolean allColsIndexed) {
+ List<LogicalExpression> decodedCols = new DecodePathinExpr().parseExpressions(expressions);
+ if (allColsIndexed) {
+ return columnsInIndexFields(decodedCols, indexedFields);
+ } else {
+ return someColumnsInIndexFields(decodedCols, indexedFields);
+ }
+ }
+
+ public FunctionalIndexInfo getFunctionalInfo() {
+ if (this.functionalInfo == null) {
+ this.functionalInfo = new MapRDBFunctionalIndexInfo(this);
+ }
+ return this.functionalInfo;
+ }
+
+ /**
+ * Search through a LogicalExpression, finding all referenced schema paths
+ * and replace them with decoded paths.
+ * If one encoded path could be decoded to multiple paths, add these decoded paths to
+ * the end of returned list of expressions from parseExpressions.
+ */
+ private class DecodePathinExpr extends CloneVisitor {
+ Set<SchemaPath> schemaPathSet = Sets.newHashSet();
+
+ public List<LogicalExpression> parseExpressions(Collection<LogicalExpression> expressions) {
+ List<LogicalExpression> allCols = Lists.newArrayList();
+ Collection<SchemaPath> decoded;
+
+ for(LogicalExpression expr : expressions) {
+ LogicalExpression nonDecoded = expr.accept(this, null);
+ if(nonDecoded != null) {
+ allCols.add(nonDecoded);
+ }
+ }
+
+ if (schemaPathSet.size() > 0) {
+ decoded = EncodedSchemaPathSet.decode(schemaPathSet);
+ allCols.addAll(decoded);
+ }
+ return allCols;
+ }
+
+ @Override
+ public LogicalExpression visitSchemaPath(SchemaPath path, Void value) {
+ if (EncodedSchemaPathSet.isEncodedSchemaPath(path)) {
+ // An encoded path may decode to multiple paths, so collect it here for batch decoding
+ // in parseExpressions(). Users won't pass in encoded fields directly, so no cast or
+ // other function can be applied on top of it and it is safe to return null.
+ schemaPathSet.add(path);
+ return null;
+ } else {
+ return path;
+ }
+ }
+
+ }
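+
+ // Illustrative usage sketch (not part of the patch itself): isCoveringIndex()
+ // above relies on this visitor as follows -- non-encoded expressions are
+ // returned unchanged, encoded paths are collected and expanded via
+ // EncodedSchemaPathSet.decode(), and the decoded result is checked against
+ // the index fields:
+ //
+ // List<LogicalExpression> decoded = new DecodePathinExpr().parseExpressions(exprs);
+ // boolean covering = columnsInIndexFields(decoded, allFields);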
+
+ @Override
+ public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
+ int numProjectedFields, GroupScan primaryTableGroupScan) {
+ Preconditions.checkArgument(primaryTableGroupScan instanceof DbGroupScan);
+ DbGroupScan dbGroupScan = (DbGroupScan) primaryTableGroupScan;
+ DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+ double totalRows = indexProps.getTotalRows();
+ double leadRowCount = indexProps.getLeadingSelectivity() * totalRows;
+ double avgRowSize = indexProps.getAvgRowSize();
+ if (indexProps.isCovering()) { // covering index
+ // int numIndexCols = allFields.size();
+ // for disk i/o, all index columns are going to be read into memory
+ double numBlocks = Math.ceil((leadRowCount * avgRowSize)/pluginCost.getBlockSize(primaryTableGroupScan));
+ double diskCost = numBlocks * pluginCost.getSequentialBlockReadCost(primaryTableGroupScan);
+ // cpu cost is cost of filter evaluation for the remainder condition
+ double cpuCost = 0.0;
+ if (indexProps.getTotalRemainderFilter() != null) {
+ cpuCost = leadRowCount * DrillCostBase.COMPARE_CPU_COST;
+ }
+ double networkCost = 0.0; // TODO: add network cost once full table scan also considers network cost
+ return costFactory.makeCost(leadRowCount, cpuCost, diskCost, networkCost);
+
+ } else { // non-covering index
+ // int numIndexCols = allFields.size();
+ double numBlocksIndex = Math.ceil((leadRowCount * avgRowSize)/pluginCost.getBlockSize(primaryTableGroupScan));
+ double diskCostIndex = numBlocksIndex * pluginCost.getSequentialBlockReadCost(primaryTableGroupScan);
+ // for the primary table join-back each row may belong to a different block, so in general num_blocks = num_rows;
+ // however, num_blocks cannot exceed the total number of blocks of the table
+ double totalBlocksPrimary = Math.ceil((dbGroupScan.getColumns().size() *
+ pluginCost.getAverageColumnSize(primaryTableGroupScan) * totalRows)/
+ pluginCost.getBlockSize(primaryTableGroupScan));
+ double diskBlocksPrimary = Math.min(totalBlocksPrimary, leadRowCount);
+ double diskCostPrimary = diskBlocksPrimary * pluginCost.getRandomBlockReadCost(primaryTableGroupScan);
+ double diskCostTotal = diskCostIndex + diskCostPrimary;
+
+ // cpu cost of remainder condition evaluation over the selected rows
+ double cpuCost = 0.0;
+ if (indexProps.getTotalRemainderFilter() != null) {
+ cpuCost = leadRowCount * DrillCostBase.COMPARE_CPU_COST;
+ }
+ double networkCost = 0.0; // TODO: add network cost once full table scan also considers network cost
+ return costFactory.makeCost(leadRowCount, cpuCost, diskCostTotal, networkCost);
+ }
+ }
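+
+ // Worked example of the covering-index arithmetic above, using assumed
+ // (illustrative) values rather than actual plugin cost constants:
+ // leadRowCount = 10,000 rows, avgRowSize = 100 bytes, blockSize = 8,192 bytes
+ // numBlocks = ceil(10,000 * 100 / 8,192) = 123
+ // diskCost = 123 * sequentialBlockReadCost
+ // In the non-covering case the join-back adds min(totalBlocksPrimary, leadRowCount)
+ // random block reads on top of the sequential index read, which is why a
+ // non-covering plan costs more for the same leading row count.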
+
+ // Future use once full table scan also includes network cost
+ private double getNetworkCost(double leadRowCount, int numProjectedFields, boolean isCovering,
+ GroupScan primaryTableGroupScan) {
+ if (isCovering) {
+ // db server will send only the projected columns to the db client for the selected
+ // number of rows, so network cost is based on the number of actual projected columns
+ double networkCost = leadRowCount * numProjectedFields * pluginCost.getAverageColumnSize(primaryTableGroupScan);
+ return networkCost;
+ } else {
+ // only the rowkey column is projected from the index and sent over the network
+ double networkCostIndex = leadRowCount * 1 * pluginCost.getAverageColumnSize(primaryTableGroupScan);
+
+ // after join-back to primary table, all projected columns are sent over the network
+ double networkCostPrimary = leadRowCount * numProjectedFields * pluginCost.getAverageColumnSize(primaryTableGroupScan);
+
+ return networkCostIndex + networkCostPrimary;
+ }
+
+ }
+
+ @Override
+ public PluginCost getPluginCostModel() {
+ return pluginCost;
+ }
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
new file mode 100644
index 000000000..e1b8a61fc
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.index;
+
+import com.google.common.collect.Maps;
+import com.mapr.db.Admin;
+import com.mapr.db.MapRDB;
+import com.mapr.db.exceptions.DBException;
+import com.mapr.db.index.IndexDesc;
+import com.mapr.db.index.IndexDesc.MissingAndNullOrdering;
+import com.mapr.db.index.IndexFieldDesc;
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.parser.ExprLexer;
+import org.apache.drill.common.expression.parser.ExprParser;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatMatcher;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan;
+import org.apache.drill.exec.store.mapr.db.json.FieldPathHelper;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.ojai.FieldPath;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDiscover {
+
+ static final String DEFAULT_STRING_CAST_LEN_STR = "256";
+
+ public MapRDBIndexDiscover(GroupScan inScan, DrillScanRelBase scanRel) {
+ super((AbstractDbGroupScan) inScan, scanRel);
+ }
+
+ public MapRDBIndexDiscover(GroupScan inScan, ScanPrel scanRel) {
+ super((AbstractDbGroupScan) inScan, scanRel);
+ }
+
+ @Override
+ public IndexCollection getTableIndex(String tableName) {
+ //return getTableIndexFromCommandLine(tableName);
+ return getTableIndexFromMFS(tableName);
+ }
+
+ /**
+ * Get the indexes defined on the given table by querying the MapR file system.
+ * @param tableName name (path) of the primary table
+ * @return the collection of discovered indexes, or null if no usable index was found
+ */
+ private IndexCollection getTableIndexFromMFS(String tableName) {
+ try {
+ Set<DrillIndexDescriptor> idxSet = new HashSet<>();
+ Collection<IndexDesc> indexes = admin().getTableIndexes(new Path(tableName));
+ if (indexes.size() == 0) {
+ logger.error("No index returned from Admin.getTableIndexes for table {}", tableName);
+ return null;
+ }
+ for (IndexDesc idx : indexes) {
+ DrillIndexDescriptor hbaseIdx = buildIndexDescriptor(tableName, idx);
+ if (hbaseIdx == null) {
+ //not able to build a valid index based on the index info from MFS
+ logger.error("Not able to build index for {}", idx.toString());
+ continue;
+ }
+ idxSet.add(hbaseIdx);
+ }
+ if (idxSet.size() == 0) {
+ logger.error("No index found for table {}.", tableName);
+ return null;
+ }
+ return new DrillIndexCollection(getOriginalScanRel(), idxSet);
+ } catch (DBException ex) {
+ logger.error("Could not get table index from the file system.", ex);
+ } catch (InvalidIndexDefinitionException ex) {
+ logger.error("Invalid index definition detected.", ex);
+ }
+ return null;
+ }
+
+ FileSelection deriveFSSelection(DrillFileSystem fs, IndexDescriptor idxDesc) throws IOException {
+ String tableName = idxDesc.getTableName();
+ String[] tablePath = tableName.split("/");
+ String tableParent = tableName.substring(0, tableName.lastIndexOf("/"));
+
+ return FileSelection.create(fs, tableParent, tablePath[tablePath.length - 1], false);
+ }
+
+ @Override
+ public DrillTable getNativeDrillTable(IndexDescriptor idxDescriptor) {
+
+ try {
+ final AbstractDbGroupScan origScan = getOriginalScan();
+ if (!(origScan instanceof MapRDBGroupScan)) {
+ return null;
+ }
+ MapRDBFormatPlugin maprFormatPlugin = ((MapRDBGroupScan) origScan).getFormatPlugin();
+ FileSystemPlugin fsPlugin = (FileSystemPlugin) (((MapRDBGroupScan) origScan).getStoragePlugin());
+
+ DrillFileSystem fs = ImpersonationUtil.createFileSystem(origScan.getUserName(), fsPlugin.getFsConf());
+ MapRDBFormatMatcher matcher = (MapRDBFormatMatcher) (maprFormatPlugin.getMatcher());
+ FileSelection fsSelection = deriveFSSelection(fs, idxDescriptor);
+ return matcher.isReadableIndex(fs, fsSelection, fsPlugin, fsPlugin.getName(),
+ origScan.getUserName(), idxDescriptor);
+
+ } catch (Exception e) {
+ logger.error("Failed to get native DrillTable.", e);
+ }
+ return null;
+ }
+
+ private SchemaPath fieldName2SchemaPath(String fieldName) {
+ if (fieldName.contains(":")) {
+ fieldName = fieldName.split(":")[1];
+ }
+ if (fieldName.contains(".")) {
+ return FieldPathHelper.fieldPath2SchemaPath(FieldPath.parseFrom(fieldName));
+ }
+ return SchemaPath.getSimplePath(fieldName);
+ }
+
+ String getDrillTypeStr(String maprdbTypeStr) {
+ String typeStr = maprdbTypeStr.toUpperCase();
+ String[] typeTokens = typeStr.split("[)(]");
+ String typeData = DEFAULT_STRING_CAST_LEN_STR;
+ if (typeTokens.length > 1) {
+ typeStr = typeTokens[0];
+ typeData = typeTokens[1];
+ }
+ switch (typeStr) {
+ case "STRING":
+ // set default width since it is not specified
+ return "VARCHAR("+typeData+")";
+ case "LONG":
+ return "BIGINT";
+ case "INT":
+ case "INTEGER":
+ return "INT";
+ case "FLOAT":
+ return "FLOAT4";
+ case "DOUBLE":
+ return "FLOAT8";
+ case "INTERVAL_YEAR_MONTH":
+ return "INTERVALYEAR";
+ case "INTERVAL_DAY_TIME":
+ return "INTERVALDAY";
+ case "BOOLEAN":
+ return "BIT";
+ case "BINARY":
+ return "VARBINARY";
+ case "ANY":
+ case "DECIMAL":
+ return null;
+ default: return typeStr;
+ }
+
+ }
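+
+ // Examples of the mapping above (illustrative): "string(128)" -> "VARCHAR(128)",
+ // "STRING" -> "VARCHAR(256)" (the default width), "long" -> "BIGINT",
+ // "decimal" -> null, i.e. no usable cast type.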
+
+ TypeProtos.MajorType getDrillType(String typeStr) {
+ switch (typeStr) {
+ case "VARCHAR":
+ case "CHAR":
+ case "STRING":
+ // set default width since it is not specified
+ return Types.required(TypeProtos.MinorType.VARCHAR).toBuilder()
+ .setWidth(getOriginalScanRel().getCluster().getTypeFactory().createSqlType(SqlTypeName.VARCHAR).getPrecision())
+ .build();
+ case "LONG":
+ case "BIGINT":
+ return Types.required(TypeProtos.MinorType.BIGINT);
+ case "INT":
+ case "INTEGER":
+ return Types.required(TypeProtos.MinorType.INT);
+ case "FLOAT":
+ return Types.required(TypeProtos.MinorType.FLOAT4);
+ case "DOUBLE":
+ return Types.required(TypeProtos.MinorType.FLOAT8);
+ case "INTERVAL_YEAR_MONTH":
+ return Types.required(TypeProtos.MinorType.INTERVALYEAR);
+ case "INTERVAL_DAY_TIME":
+ return Types.required(TypeProtos.MinorType.INTERVALDAY);
+ case "BOOLEAN":
+ return Types.required(TypeProtos.MinorType.BIT);
+ case "BINARY":
+ return Types.required(TypeProtos.MinorType.VARBINARY);
+ case "ANY":
+ case "DECIMAL":
+ return null;
+ default: return Types.required(TypeProtos.MinorType.valueOf(typeStr));
+ }
+ }
+
+ private LogicalExpression castFunctionSQLSyntax(String field, String type) throws InvalidIndexDefinitionException {
+ //get castTypeStr so we can construct SQL syntax string before MapRDB could provide such syntax
+ String castTypeStr = getDrillTypeStr(type);
+ if (castTypeStr == null) { // no cast possible for this type
+ throw new InvalidIndexDefinitionException("cast function type not recognized: " + type + " for field " + field);
+ }
+ try {
+ String castFunc = String.format("cast( %s as %s)", field, castTypeStr);
+ final ExprLexer lexer = new ExprLexer(new ANTLRStringStream(castFunc));
+ final CommonTokenStream tokens = new CommonTokenStream(lexer);
+ final ExprParser parser = new ExprParser(tokens);
+ final ExprParser.parse_return ret = parser.parse();
+ logger.trace("{}, {}", tokens, ret);
+ return ret.e;
+ } catch (Exception ex) {
+ logger.error("parse failed", ex);
+ }
+ return null;
+ }
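+
+ // E.g. for field "a" indexed with a cast to STRING(128), this builds the string
+ // "cast( a as VARCHAR(128))" and runs it through Drill's expression parser
+ // (ExprLexer/ExprParser) to obtain the equivalent LogicalExpression.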
+
+ private LogicalExpression getIndexExpression(IndexFieldDesc desc) throws InvalidIndexDefinitionException {
+ final String fieldName = desc.getFieldPath().asPathString();
+ final String functionDef = desc.getFunctionName();
+ if (functionDef != null) { // this is a function
+ String[] tokens = functionDef.split("\\s+");
+ if (tokens[0].equalsIgnoreCase("cast")) {
+ if (tokens.length != 3) {
+ throw new InvalidIndexDefinitionException("cast function definition not recognized: " + functionDef);
+ }
+ LogicalExpression idxExpr = castFunctionSQLSyntax(fieldName, tokens[2]);
+ if (idxExpr == null) {
+ throw new InvalidIndexDefinitionException("got null expression for function definition: " + functionDef);
+ }
+ return idxExpr;
+ } else {
+ throw new InvalidIndexDefinitionException("function definition is not supported for indexing: " + functionDef);
+ }
+ }
+ //else it is a schemaPath
+ return fieldName2SchemaPath(fieldName);
+ }
+
+ private List<LogicalExpression> field2SchemaPath(Collection<IndexFieldDesc> descCollection)
+ throws InvalidIndexDefinitionException {
+ List<LogicalExpression> listSchema = new ArrayList<>();
+ for (IndexFieldDesc field : descCollection) {
+ listSchema.add(getIndexExpression(field));
+ }
+ return listSchema;
+ }
+
+ private List<RelFieldCollation> getFieldCollations(IndexDesc desc, Collection<IndexFieldDesc> descCollection) {
+ List<RelFieldCollation> fieldCollations = new ArrayList<>();
+ int i = 0;
+ for (IndexFieldDesc field : descCollection) {
+ RelFieldCollation.Direction direction = (field.getSortOrder() == IndexFieldDesc.Order.Asc) ?
+ RelFieldCollation.Direction.ASCENDING : (field.getSortOrder() == IndexFieldDesc.Order.Desc ?
+ RelFieldCollation.Direction.DESCENDING : null);
+ if (direction != null) {
+ // assume null direction of NULLS UNSPECIFIED for now until MapR-DB adds that to the APIs
+ RelFieldCollation.NullDirection nulldir =
+ desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullFirst ? NullDirection.FIRST :
+ (desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullLast ?
+ NullDirection.LAST : NullDirection.UNSPECIFIED);
+ RelFieldCollation c = new RelFieldCollation(i++, direction, nulldir);
+ fieldCollations.add(c);
+ } else {
+ // if the direction is not present for a field, no need to examine remaining fields
+ break;
+ }
+ }
+ return fieldCollations;
+ }
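+
+ // For example (illustrative): an index sorted on (a ASC, b DESC) with
+ // MissingAndNullOrdering.MissingAndNullFirst yields
+ // [RelFieldCollation(0, ASCENDING, FIRST), RelFieldCollation(1, DESCENDING, FIRST)];
+ // the first field without a sort order ends the collation list.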
+
+ private CollationContext buildCollationContext(List<LogicalExpression> indexFields,
+ List<RelFieldCollation> indexFieldCollations) {
+ assert indexFieldCollations.size() <= indexFields.size();
+ Map<LogicalExpression, RelFieldCollation> collationMap = Maps.newHashMap();
+ for (int i = 0; i < indexFieldCollations.size(); i++) {
+ collationMap.put(indexFields.get(i), indexFieldCollations.get(i));
+ }
+ CollationContext collationContext = new CollationContext(collationMap, indexFieldCollations);
+ return collationContext;
+ }
+
+ private DrillIndexDescriptor buildIndexDescriptor(String tableName, IndexDesc desc)
+ throws InvalidIndexDefinitionException {
+ if (desc.isExternal()) {
+ // XXX: external indexes are not supported yet
+ return null;
+ }
+
+ IndexDescriptor.IndexType idxType = IndexDescriptor.IndexType.NATIVE_SECONDARY_INDEX;
+ List<LogicalExpression> indexFields = field2SchemaPath(desc.getIndexedFields());
+ List<LogicalExpression> coveringFields = field2SchemaPath(desc.getIncludedFields());
+ coveringFields.add(SchemaPath.getSimplePath("_id"));
+ CollationContext collationContext = null;
+ if (!desc.isHashed()) { // hash index has no collation property
+ List<RelFieldCollation> indexFieldCollations = getFieldCollations(desc, desc.getIndexedFields());
+ collationContext = buildCollationContext(indexFields, indexFieldCollations);
+ }
+
+ DrillIndexDescriptor idx = new MapRDBIndexDescriptor(
+ indexFields,
+ collationContext,
+ coveringFields,
+ null,
+ desc.getIndexName(),
+ tableName,
+ idxType,
+ desc,
+ this.getOriginalScan(),
+ desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullFirst ? NullDirection.FIRST :
+ (desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullLast ?
+ NullDirection.LAST : NullDirection.UNSPECIFIED));
+
+ String storageName = this.getOriginalScan().getStoragePlugin().getName();
+ materializeIndex(storageName, idx);
+ return idx;
+ }
+
+ @SuppressWarnings("deprecation")
+ private Admin admin() {
+ assert getOriginalScan() instanceof MapRDBGroupScan;
+
+ final MapRDBGroupScan dbGroupScan = (MapRDBGroupScan) getOriginalScan();
+ final UserGroupInformation currentUser = ImpersonationUtil.createProxyUgi(dbGroupScan.getUserName());
+ final Configuration conf = dbGroupScan.getFormatPlugin().getFsConf();
+
+ final Admin admin;
+ try {
+ admin = currentUser.doAs(new PrivilegedExceptionAction<Admin>() {
+ public Admin run() throws Exception {
+ return MapRDB.getAdmin(conf);
+ }
+ });
+ } catch (Exception e) {
+ throw new DrillRuntimeException("Failed to get Admin instance for user: " + currentUser.getUserName(), e);
+ }
+ return admin;
+ }
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java
index 3b8de349a..e129b968b 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java
@@ -17,20 +17,49 @@
*/
package org.apache.drill.exec.planner.index;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.google.common.collect.Maps;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMdUtil;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.physical.base.DbGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.hbase.HBaseRegexParser;
+import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+import org.apache.hadoop.hbase.HConstants;
import org.ojai.store.QueryCondition;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
public class MapRDBStatistics implements Statistics {
@@ -260,16 +289,332 @@ public class MapRDBStatistics implements Statistics {
}
public boolean initialize(RexNode condition, DrillScanRelBase scanRel, IndexCallContext context) {
- //XXX to implement for complete secondary index framework
+ GroupScan scan = IndexPlanUtils.getGroupScan(scanRel);
+
+ PlannerSettings settings = PrelUtil.getPlannerSettings(scanRel.getCluster().getPlanner());
+ rowKeyJoinBackIOFactor = settings.getIndexRowKeyJoinCostFactor();
+ if (scan instanceof DbGroupScan) {
+ String conditionAsStr = convertRexToString(condition, scanRel.getRowType());
+ if (statsCache.get(conditionAsStr) == null) {
+ IndexCollection indexes = ((DbGroupScan)scan).getSecondaryIndexCollection(scanRel);
+ populateStats(condition, indexes, scanRel, context);
+ logger.info("index_plan_info: initialize: scanRel #{} and groupScan {} got fulltable {}, statsCache: {}, fiStatsCache: {}",
+ scanRel.getId(), System.identityHashCode(scan), fullTableScanPayload, statsCache, fIStatsCache);
+ return true;
+ }
+ }
return false;
}
+ /**
+ * This function computes statistics when there is no query condition
+ * @param jTabGrpScan - The current group scan
+ * @param indexes - The collection of indexes to use for getting statistics
+ * @param scanRel - The current scanRel
+ * @param context - The index plan call context
+ */
+ private void populateStatsForNoFilter(JsonTableGroupScan jTabGrpScan, IndexCollection indexes, RelNode scanRel,
+ IndexCallContext context) {
+ // Get the stats payload for full table (has total rows in the table)
+ StatisticsPayload ftsPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, null, scanRel);
+ addToCache(null, null, context, ftsPayload, jTabGrpScan, scanRel, scanRel.getRowType());
+ addToCache(null, jTabGrpScan.getAverageRowSizeStats(null), ftsPayload);
+ // Get the stats for all indexes
+ for (IndexDescriptor idx: indexes) {
+ StatisticsPayload idxPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, idx, scanRel);
+ StatisticsPayload idxRowSizePayload = jTabGrpScan.getAverageRowSizeStats(idx);
+ RelDataType newRowType;
+ FunctionalIndexInfo functionInfo = idx.getFunctionalInfo();
+ if (functionInfo.hasFunctional()) {
+ newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo);
+ } else {
+ newRowType = scanRel.getRowType();
+ }
+ addToCache(null, idx, context, idxPayload, jTabGrpScan, scanRel, newRowType);
+ addToCache(idx, idxRowSizePayload, ftsPayload);
+ }
+ }
+
+ /**
+ * This is the core statistics function for populating the statistics. The statistics populated correspond to the
+ * query condition. Based on the plan type, we need statistics for different combinations of predicates. Currently,
+ * we do not have a tree-walker for {@link QueryCondition}, so instead of constructing the stats for the overall
+ * predicates from the individual predicate stats, we rely on the final predicates; this makes the approach
+ * susceptible to predicate modification after stats generation. The statistics computed/stored are row counts,
+ * leading row counts, and average row sizes. Row counts and leading row counts (i.e. those corresponding to
+ * predicates on the leading index columns) are stored in the statsCache. Average row sizes are stored in the
+ * fIStatsCache (FI stands for Filter Independent).
+ *
+ * @param condition - The condition for which to obtain statistics
+ * @param indexes - The collection of indexes to use for getting statistics
+ * @param scanRel - The current scanRel
+ * @param context - The index plan call context
+ */
+ private void populateStats(RexNode condition, IndexCollection indexes, DrillScanRelBase scanRel,
+ IndexCallContext context) {
+ JsonTableGroupScan jTabGrpScan;
+ Map<IndexDescriptor, IndexConditionInfo> firstKeyIdxConditionMap;
+ Map<IndexDescriptor, IndexConditionInfo> idxConditionMap;
+ /* Map containing the individual base conditions of an ANDed/ORed condition and their selectivities.
+ * This is used to compute the overall selectivity of a complex ANDed/ORed condition using its base
+ * conditions. Helps prevent over/under estimates and guessed selectivity for ORed predicates.
+ */
+ Map<String, Double> baseConditionMap;
+ GroupScan grpScan = IndexPlanUtils.getGroupScan(scanRel);
+
+ if ((scanRel instanceof DrillScanRel || scanRel instanceof ScanPrel) &&
+ grpScan instanceof JsonTableGroupScan) {
+ jTabGrpScan = (JsonTableGroupScan) grpScan;
+ } else {
+ logger.debug("Statistics: populateStats exit early - not an instance of JsonTableGroupScan!");
+ return;
+ }
+ if (condition == null) {
+ populateStatsForNoFilter(jTabGrpScan, indexes, scanRel, context);
+ statsAvailable = true;
+ return;
+ }
+
+ RexBuilder builder = scanRel.getCluster().getRexBuilder();
+ PlannerSettings settings = PrelUtil.getSettings(scanRel.getCluster());
+ // Get the stats payload for full table (has total rows in the table)
+ StatisticsPayload ftsPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, null, scanRel);
+
+ // Get the average row size for table and all indexes
+ addToCache(null, jTabGrpScan.getAverageRowSizeStats(null), ftsPayload);
+ if (ftsPayload == null || ftsPayload.getRowCount() == 0) {
+ return;
+ }
+ for (IndexDescriptor idx : indexes) {
+ StatisticsPayload idxRowSizePayload = jTabGrpScan.getAverageRowSizeStats(idx);
+ addToCache(idx, idxRowSizePayload, ftsPayload);
+ }
+
+ /* Only use indexes with distinct first key */
+ IndexCollection distFKeyIndexes = distinctFKeyIndexes(indexes, scanRel);
+ IndexConditionInfo.Builder infoBuilder = IndexConditionInfo.newBuilder(condition,
+ distFKeyIndexes, builder, scanRel);
+ idxConditionMap = infoBuilder.getIndexConditionMap();
+ firstKeyIdxConditionMap = infoBuilder.getFirstKeyIndexConditionMap();
+ baseConditionMap = new HashMap<>();
+ for (IndexDescriptor idx : firstKeyIdxConditionMap.keySet()) {
+ if(IndexPlanUtils.conditionIndexed(context.getOrigMarker(), idx) == IndexPlanUtils.ConditionIndexed.NONE) {
+ continue;
+ }
+ RexNode idxCondition = firstKeyIdxConditionMap.get(idx).indexCondition;
+ /* Use the pre-processed condition only for getting actual statistic from MapR-DB APIs. Use the
+ * original condition everywhere else (cache store/lookups) since the RexNode condition and its
+ * corresponding QueryCondition will be used to get statistics. e.g. we convert LIKE into RANGE
+ * condition to get statistics. However, statistics are always asked for LIKE and NOT the RANGE
+ */
+ RexNode preProcIdxCondition = convertToStatsCondition(idxCondition, idx, context, scanRel,
+ Arrays.asList(SqlKind.CAST, SqlKind.LIKE));
+ RelDataType newRowType;
+ FunctionalIndexInfo functionInfo = idx.getFunctionalInfo();
+ if (functionInfo.hasFunctional()) {
+ newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo);
+ } else {
+ newRowType = scanRel.getRowType();
+ }
+
+ QueryCondition queryCondition = jTabGrpScan.convertToQueryCondition(
+ convertToLogicalExpression(preProcIdxCondition, newRowType, settings, builder));
+ // Cap rows/size at total rows in case of issues with DB APIs
+ StatisticsPayload idxPayload = jTabGrpScan.getFirstKeyEstimatedStats(queryCondition, idx, scanRel);
+ double rowCount = Math.min(idxPayload.getRowCount(), ftsPayload.getRowCount());
+ double leadingRowCount = Math.min(idxPayload.getLeadingRowCount(), rowCount);
+ double avgRowSize = Math.min(idxPayload.getAvgRowSize(), ftsPayload.getAvgRowSize());
+ StatisticsPayload payload = new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize);
+ addToCache(idxCondition, idx, context, payload, jTabGrpScan, scanRel, newRowType);
+ addBaseConditions(idxCondition, payload, false, baseConditionMap, scanRel.getRowType());
+ }
+ /* Add the row count for index conditions on all indexes. Stats are only computed for leading
+ * keys but index conditions can be pushed and would be required for access path costing
+ */
+ for (IndexDescriptor idx : idxConditionMap.keySet()) {
+ if(IndexPlanUtils.conditionIndexed(context.getOrigMarker(), idx) == IndexPlanUtils.ConditionIndexed.NONE) {
+ continue;
+ }
+ Map<LogicalExpression, RexNode> leadingPrefixMap = Maps.newHashMap();
+ double rowCount, leadingRowCount, avgRowSize;
+ RexNode idxCondition = idxConditionMap.get(idx).indexCondition;
+ // Ignore conditions which always evaluate to true
+ if (idxCondition.isAlwaysTrue()) {
+ continue;
+ }
+ RexNode idxIncColCondition = idxConditionMap.get(idx).remainderCondition;
+ RexNode idxRemColCondition = IndexPlanUtils.getLeadingPrefixMap(leadingPrefixMap, idx.getIndexColumns(), infoBuilder, idxCondition);
+ RexNode idxLeadColCondition = IndexPlanUtils.getLeadingColumnsFilter(
+ IndexPlanUtils.getLeadingFilters(leadingPrefixMap, idx.getIndexColumns()), builder);
+ RexNode idxTotRemColCondition = IndexPlanUtils.getTotalRemainderFilter(idxRemColCondition, idxIncColCondition, builder);
+ RexNode idxTotColCondition = IndexPlanUtils.getTotalFilter(idxLeadColCondition, idxTotRemColCondition, builder);
+ FunctionalIndexInfo functionInfo = idx.getFunctionalInfo();
+ RelDataType newRowType = scanRel.getRowType();
+ if (functionInfo.hasFunctional()) {
+ newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo);
+ }
+ /* For non-covering plans we would need the index leading condition */
+ rowCount = ftsPayload.getRowCount() * computeSelectivity(idxLeadColCondition, idx,
+ ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
+ leadingRowCount = rowCount;
+ avgRowSize = fIStatsCache.get(buildUniqueIndexIdentifier(idx)).getAvgRowSize();
+ addToCache(idxLeadColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize),
+ jTabGrpScan, scanRel, newRowType);
+ /* For covering plans we would need the full condition */
+ rowCount = ftsPayload.getRowCount() * computeSelectivity(idxTotColCondition, idx,
+ ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
+ addToCache(idxTotColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize),
+ jTabGrpScan, scanRel, newRowType);
+ /* For intersect plans we would need the index condition */
+ rowCount = ftsPayload.getRowCount() * computeSelectivity(idxCondition, idx,
+ ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
+ addToCache(idxCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize),
+ jTabGrpScan, scanRel, newRowType);
+ /* Add the rowCount for condition on only included columns - no leading columns here! */
+ if (idxIncColCondition != null) {
+ rowCount = ftsPayload.getRowCount() * computeSelectivity(idxIncColCondition, null,
+ ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
+ addToCache(idxIncColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, rowCount, avgRowSize),
+ jTabGrpScan, scanRel, newRowType);
+ }
+ }
+
+ // Add the rowCount for the complete condition - based on table
+ double rowCount = ftsPayload.getRowCount() * computeSelectivity(condition, null,
+ ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
+ // Here, ftsLeadingKey rowcount is based on _id predicates
+ StatisticsPayload ftsLeadingKeyPayload = jTabGrpScan.getFirstKeyEstimatedStats(jTabGrpScan.convertToQueryCondition(
+ convertToLogicalExpression(condition, scanRel.getRowType(), settings, builder)), null, scanRel);
+ addToCache(condition, null, null, new MapRDBStatisticsPayload(rowCount, ftsLeadingKeyPayload.getRowCount(),
+ ftsPayload.getAvgRowSize()), jTabGrpScan, scanRel, scanRel.getRowType());
+ // Add the full table rows while we are at it - represented by <NULL> RexNode, <NULL> QueryCondition.
+ // No ftsLeadingKey so leadingKeyRowcount = totalRowCount
+ addToCache(null, null, null, new MapRDBStatisticsPayload(ftsPayload.getRowCount(), ftsPayload.getRowCount(),
+ ftsPayload.getAvgRowSize()), jTabGrpScan, scanRel, scanRel.getRowType());
+ // mark stats has been statsAvailable
+ statsAvailable = true;
+ }
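+
+ // Sketch of the resulting cache layout (keys shown are illustrative):
+ // statsCache : condition string -> { unique index identifier -> {rowCount, leadingRowCount, avgRowSize} }
+ // fIStatsCache : unique index identifier -> filter-independent payload (average row size)
+ // conditionRexNodeMap : QueryCondition string -> condition string
+ // After populateStats for e.g. "a > 10", statsCache holds one entry per costed
+ // index condition plus entries for the total condition and the full table scan.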
+
+ private boolean addBaseConditions(RexNode condition, StatisticsPayload payload, boolean redundant,
+ Map<String, Double> baseConditionMap, RelDataType rowType) {
+ boolean res = redundant;
+ if (condition.getKind() == SqlKind.AND) {
+ for(RexNode pred : RelOptUtil.conjunctions(condition)) {
+ res = addBaseConditions(pred, payload, res, baseConditionMap, rowType);
+ }
+ } else if (condition.getKind() == SqlKind.OR) {
+ for(RexNode pred : RelOptUtil.disjunctions(condition)) {
+ res = addBaseConditions(pred, payload, res, baseConditionMap, rowType);
+ }
+ } else {
+ // base condition
+ String conditionAsStr = convertRexToString(condition, rowType);
+ if (!redundant) {
+ baseConditionMap.put(conditionAsStr, payload.getRowCount());
+ return true;
+ } else {
+ baseConditionMap.put(conditionAsStr, -1.0);
+ return false;
+ }
+ }
+ return res;
+ }
/*
- * Convert the given RexNode to a String representation while also replacing the RexInputRef references
- * to actual column names. Since, we compare String representations of RexNodes, two equivalent RexNode
- * expressions may differ in the RexInputRef positions but otherwise the same.
- * e.g. $1 = 'CA' projection (State, Country) , $2 = 'CA' projection (Country, State)
- */
+ * Adds the statistic (row count) to the cache. Also adds the corresponding QueryCondition->RexNode
+ * condition mapping.
+ */
+ private void addToCache(RexNode condition, IndexDescriptor idx, IndexCallContext context,
+ StatisticsPayload payload, JsonTableGroupScan jTabGrpScan, RelNode scanRel, RelDataType rowType) {
+ if (condition != null
+ && !condition.isAlwaysTrue()) {
+ RexBuilder builder = scanRel.getCluster().getRexBuilder();
+ PlannerSettings settings = PrelUtil.getSettings(scanRel.getCluster());
+ String conditionAsStr = convertRexToString(condition, scanRel.getRowType());
+ if (statsCache.get(conditionAsStr) == null
+ && payload.getRowCount() != Statistics.ROWCOUNT_UNKNOWN) {
+ Map<String, StatisticsPayload> payloadMap = new HashMap<>();
+ payloadMap.put(buildUniqueIndexIdentifier(idx), payload);
+ statsCache.put(conditionAsStr, payloadMap);
+ logger.debug("Statistics: StatsCache:<{}, {}>",conditionAsStr, payload);
+ // Always pre-process CAST conditions - Otherwise queryCondition will not be generated correctly
+ RexNode preProcIdxCondition = convertToStatsCondition(condition, idx, context, scanRel,
+ Arrays.asList(SqlKind.CAST));
+ QueryCondition queryCondition =
+ jTabGrpScan.convertToQueryCondition(convertToLogicalExpression(preProcIdxCondition,
+ rowType, settings, builder));
+ if (queryCondition != null) {
+ String queryConditionAsStr = queryCondition.toString();
+ if (conditionRexNodeMap.get(queryConditionAsStr) == null) {
+ conditionRexNodeMap.put(queryConditionAsStr, conditionAsStr);
+ logger.debug("Statistics: QCRNCache:<{}, {}>",queryConditionAsStr, conditionAsStr);
+ }
+ } else {
+ logger.debug("Statistics: QCRNCache: Unable to generate QueryCondition for {}", conditionAsStr);
+ logger.debug("Statistics: QCRNCache: Unable to generate QueryCondition for {}", conditionAsStr);
+ }
+ } else {
+ Map<String, StatisticsPayload> payloadMap = statsCache.get(conditionAsStr);
+ if (payloadMap != null) {
+ if (payloadMap.get(buildUniqueIndexIdentifier(idx)) == null) {
+ payloadMap.put(buildUniqueIndexIdentifier(idx), payload);
+
+ // rowCount for the same condition should be the same on primary table or index,
+ // let us sync them to the smallest since currently both are over-estimated.
+ // DO NOT sync the leading rowCount since it is based on the leading condition and not the
+ // condition (key for this cache). Hence, for the same condition the leading condition and
+ // consequently the leading rowCount will vary with the index. Syncing them may lead to
+ // unintended side-effects e.g. given a covering index and full table scan and a condition
+ // on a non-id field which happens to be the leading key in the index, the leading rowcount
+ // for the full table scan should be the full table rowcount. Syncing them would incorrectly
+ // make the full table scan cheaper! If required, syncing should be only done based on
+ // leading condition and NOT the condition
+ double minimalRowCount = payload.getRowCount();
+ for (StatisticsPayload existing : payloadMap.values()) {
+ if (existing.getRowCount() < minimalRowCount) {
+ minimalRowCount = existing.getRowCount();
+ }
+ }
+ for (StatisticsPayload existing : payloadMap.values()) {
+ if (existing instanceof MapRDBStatisticsPayload) {
+ ((MapRDBStatisticsPayload)existing).rowCount = minimalRowCount;
+ }
+ }
+ } else {
+ logger.debug("Statistics: Filter row count already exists for filter: {}. Skip!", conditionAsStr);
+ }
+ } else {
+ logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr);
+ }
+ }
+ } else if (condition == null && idx == null) {
+ fullTableScanPayload = new MapRDBStatisticsPayload(payload.getRowCount(),
+ payload.getLeadingRowCount(), payload.getAvgRowSize());
+ logger.debug("Statistics: StatsCache:<{}, {}>","NULL", fullTableScanPayload);
+ }
+ }
+
+ private void addToCache(IndexDescriptor idx, StatisticsPayload payload, StatisticsPayload ftsPayload) {
+ String tabIdxIdentifier = buildUniqueIndexIdentifier(idx);
+ if (fIStatsCache.get(tabIdxIdentifier) == null) {
+ if (ftsPayload.getAvgRowSize() >= payload.getAvgRowSize()) {
+ fIStatsCache.put(tabIdxIdentifier, payload);
+ logger.debug("Statistics: fIStatsCache:<{}, {}>",tabIdxIdentifier, payload);
+ } else {
+ StatisticsPayload cappedPayload =
+ new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, ftsPayload.getAvgRowSize());
+ fIStatsCache.put(tabIdxIdentifier,cappedPayload);
+ logger.debug("Statistics: fIStatsCache:<{}, {}> (Capped)",tabIdxIdentifier, cappedPayload);
+ }
+ } else {
+ logger.debug("Statistics: Average row size already exists for :<{}, {}>. Skip!",tabIdxIdentifier, payload);
+ }
+ }
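+
+ // E.g. if an index reports avgRowSize = 120 bytes while the full table reports
+ // 80 bytes, the index entry is capped to the table's 80-byte average (with row
+ // counts marked UNKNOWN), so an index is never costed as wider than its table.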
+
+ /*
+ * Convert the given RexNode to a String representation while also replacing the RexInputRef references
+ * to actual column names. Since, we compare String representations of RexNodes, two equivalent RexNode
+ * expressions may differ in the RexInputRef positions but otherwise the same.
+ * e.g. $1 = 'CA' projection (State, Country) , $2 = 'CA' projection (Country, State)
+ */
private String convertRexToString(RexNode condition, RelDataType rowType) {
StringBuilder sb = new StringBuilder();
if (condition == null) {
@@ -320,11 +665,11 @@ public class MapRDBStatistics implements Statistics {
}
/*
- * Generate the input reference to column mapping for reference replacement. Please
- * look at the usage in convertRexToString() to understand why this mapping is required.
- */
+ * Generate the input reference to column mapping for reference replacement. Please
+ * look at the usage in convertRexToString() to understand why this mapping is required.
+ */
private void getInputRefMapping(RexNode condition, RelDataType rowType,
- HashMap<String, String> mapping) {
+ HashMap<String, String> mapping) {
if (condition instanceof RexCall) {
for (RexNode op : ((RexCall) condition).getOperands()) {
getInputRefMapping(op, rowType, mapping);
@@ -334,4 +679,328 @@ public class MapRDBStatistics implements Statistics {
rowType.getFieldNames().get(condition.hashCode()));
}
}
+
+ /*
+ * Additional pre-processing may be required for LIKE/CAST predicates in order to compute statistics.
+ * e.g. A LIKE predicate should be converted to a RANGE predicate for statistics computation. MapR-DB
+ * does not yet support computing statistics for LIKE predicates.
+ */
+ private RexNode convertToStatsCondition(RexNode condition, IndexDescriptor index,
+ IndexCallContext context, RelNode scanRel, List<SqlKind> typesToProcess) {
+ RexBuilder builder = scanRel.getCluster().getRexBuilder();
+ if (condition.getKind() == SqlKind.AND) {
+ final List<RexNode> conditions = Lists.newArrayList();
+ for(RexNode pred : RelOptUtil.conjunctions(condition)) {
+ conditions.add(convertToStatsCondition(pred, index, context, scanRel, typesToProcess));
+ }
+ return RexUtil.composeConjunction(builder, conditions, false);
+ } else if (condition.getKind() == SqlKind.OR) {
+ final List<RexNode> conditions = Lists.newArrayList();
+ for(RexNode pred : RelOptUtil.disjunctions(condition)) {
+ conditions.add(convertToStatsCondition(pred, index, context, scanRel, typesToProcess));
+ }
+ return RexUtil.composeDisjunction(builder, conditions, false);
+ } else if (condition instanceof RexCall) {
+ // LIKE operator - convert to a RANGE predicate, if possible
+ if (typesToProcess.contains(SqlKind.LIKE)
+ && ((RexCall) condition).getOperator().getKind() == SqlKind.LIKE) {
+ return convertLikeToRange((RexCall)condition, builder);
+ } else if (typesToProcess.contains(SqlKind.CAST)
+ && hasCastExpression(condition)) {
+ return convertCastForFIdx(((RexCall) condition), index, context, scanRel);
+ }
+ else {
+ return condition;
+ }
+ }
+ return condition;
+ }
+
+ /*
+ * Determines whether the given expression contains a CAST expression. Assumes that the
+ * given expression is a valid expression.
+ * Returns TRUE, if it finds at least one instance of CAST operator.
+ */
+ private boolean hasCastExpression(RexNode condition) {
+ if (condition instanceof RexCall) {
+ if (((RexCall) condition).getOperator().getKind() == SqlKind.CAST) {
+ return true;
+ }
+ for (RexNode op : ((RexCall) condition).getOperands()) {
+ if (hasCastExpression(op)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ /*
+ * CAST expressions are not understood by MAPR-DB as-is. Hence, we must convert them before passing them
+ * onto MAPR-DB for statistics. Given a functional index, the given expression is converted into an
+ * expression on the `expression` column of the functional index.
+ */
+ private RexNode convertCastForFIdx(RexCall condition, IndexDescriptor index,
+ IndexCallContext context, RelNode origScan) {
+ if (index == null) {
+ return condition;
+ }
+ FunctionalIndexInfo functionInfo = index.getFunctionalInfo();
+ if (!functionInfo.hasFunctional()) {
+ return condition;
+ }
+ // The functional index has a different row-type than the original scan. Use the index row-type when
+ // converting the condition
+ RelDataType newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(origScan, context, functionInfo);
+ RexBuilder builder = origScan.getCluster().getRexBuilder();
+ return FunctionalIndexHelper.convertConditionForIndexScan(condition,
+ origScan, newRowType, builder, functionInfo);
+ }
+
+ /*
+ * Helper function to perform additional pre-processing for LIKE predicates
+ */
+ private RexNode convertLikeToRange(RexCall condition, RexBuilder builder) {
+ Preconditions.checkArgument(condition.getOperator().getKind() == SqlKind.LIKE,
+ "Unable to convertLikeToRange: argument is not a LIKE condition!");
+ HBaseRegexParser parser = null;
+ RexNode arg = null;
+ RexLiteral pattern = null, escape = null;
+ String patternStr = null, escapeStr = null;
+ if (condition.getOperands().size() == 2) {
+ // No escape character specified
+ for (RexNode op : condition.getOperands()) {
+ if (op.getKind() == SqlKind.LITERAL) {
+ pattern = (RexLiteral) op;
+ } else {
+ arg = op;
+ }
+ }
+ // Get the PATTERN strings from the corresponding RexLiteral
+ if (pattern.getTypeName() == SqlTypeName.DECIMAL ||
+ pattern.getTypeName() == SqlTypeName.INTEGER) {
+ patternStr = pattern.getValue().toString();
+ } else if (pattern.getTypeName() == SqlTypeName.CHAR) {
+ patternStr = pattern.getValue2().toString();
+ }
+ if (patternStr != null) {
+ parser = new HBaseRegexParser(patternStr);
+ }
+ } else if (condition.getOperands().size() == 3) {
+ // Escape character specified
+ for (RexNode op : condition.getOperands()) {
+ if (op.getKind() == SqlKind.LITERAL) {
+ // Assume first literal specifies PATTERN and the second literal specifies the ESCAPE char
+ if (pattern == null) {
+ pattern = (RexLiteral) op;
+ } else {
+ escape = (RexLiteral) op;
+ }
+ } else {
+ arg = op;
+ }
+ }
+ // Get the PATTERN and ESCAPE strings from the corresponding RexLiteral
+ if (pattern.getTypeName() == SqlTypeName.DECIMAL ||
+ pattern.getTypeName() == SqlTypeName.INTEGER) {
+ patternStr = pattern.getValue().toString();
+ } else if (pattern.getTypeName() == SqlTypeName.CHAR) {
+ patternStr = pattern.getValue2().toString();
+ }
+ if (escape.getTypeName() == SqlTypeName.DECIMAL ||
+ escape.getTypeName() == SqlTypeName.INTEGER) {
+ escapeStr = escape.getValue().toString();
+ } else if (escape.getTypeName() == SqlTypeName.CHAR) {
+ escapeStr = escape.getValue2().toString();
+ }
+ if (patternStr != null && escapeStr != null) {
+ parser = new HBaseRegexParser(patternStr, escapeStr.toCharArray()[0]);
+ }
+ }
+ if (parser != null) {
+ parser.parse();
+ String prefix = parser.getPrefixString();
+ /*
+ * If there is a literal prefix, convert it into an EQUALITY or RANGE predicate
+ */
+ if (prefix != null) {
+ if (prefix.equals(parser.getLikeString())) {
+ // No WILDCARD present. This turns the LIKE predicate to EQUALITY predicate
+ if (arg != null) {
+ return builder.makeCall(SqlStdOperatorTable.EQUALS, arg, pattern);
+ }
+ } else {
+ // WILDCARD present. This turns the LIKE predicate to RANGE predicate
+ byte[] startKey = prefix.getBytes(Charsets.UTF_8);
+ byte[] stopKey = startKey.clone();
+ boolean isMaxVal = true;
+ for (int i = stopKey.length - 1; i >= 0 ; --i) {
+ int nextByteValue = (0xff & stopKey[i]) + 1;
+ if (nextByteValue < 0xff) {
+ stopKey[i] = (byte) nextByteValue;
+ isMaxVal = false;
+ break;
+ } else {
+ stopKey[i] = 0;
+ }
+ }
+ if (isMaxVal) {
+ stopKey = HConstants.EMPTY_END_ROW;
+ }
+ try {
+ // TODO: This may be a potential bug since we assume UTF-8 encoding. However, we follow the
+ // current DB implementation. See HBaseFilterBuilder.createHBaseScanSpec "like" CASE statement
+ RexLiteral startKeyLiteral = builder.makeLiteral(new String(startKey,
+ Charsets.UTF_8.toString()));
+ RexLiteral stopKeyLiteral = builder.makeLiteral(new String(stopKey,
+ Charsets.UTF_8.toString()));
+ if (arg != null) {
+ RexNode startPred = builder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+ arg, startKeyLiteral);
+ RexNode stopPred = builder.makeCall(SqlStdOperatorTable.LESS_THAN, arg, stopKeyLiteral);
+ return builder.makeCall(SqlStdOperatorTable.AND, startPred, stopPred);
+ }
+ } catch (UnsupportedEncodingException ex) {
+ // Encoding not supported - Do nothing!
+ logger.debug("Statistics: convertLikeToRange: Unsupported Encoding Exception -> {}", ex.getMessage());
+ }
+ }
+ }
+ }
+ // Could not convert - return condition as-is.
+ return condition;
+ }
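+
+ // Worked example: LIKE 'abc%' has the literal prefix "abc", so it becomes the
+ // range predicate (arg >= 'abc' AND arg < 'abd') -- the stop key is the prefix
+ // with its last byte incremented. LIKE 'abc' (no wildcard) becomes arg = 'abc'.
+ // A trailing byte of 0xfe or 0xff rolls over to 0x00 with carry; if every byte
+ // carries, the stop key falls back to the open-ended EMPTY_END_ROW.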
+
+ /*
+ * Compute the selectivity of the given rowCondition. Retrieve the selectivity
+ * for index conditions from the cache
+ */
+ private Pair<Double, Boolean> computeSelectivity(RexNode condition, IndexDescriptor idx, double totalRows,
+ RelNode scanRel, Map<String, Double> baseConditionMap) {
+ double selectivity;
+ boolean guess = false;
+ if (totalRows <= 0) {
+ return new Pair<>(1.0, true);
+ }
+ String conditionAsStr = convertRexToString(condition, scanRel.getRowType());
+ if (condition.getKind() == SqlKind.AND) {
+ selectivity = 1.0;
+ for (RexNode pred : RelOptUtil.conjunctions(condition)) {
+ Pair<Double, Boolean> selPayload = computeSelectivity(pred, idx, totalRows, scanRel, baseConditionMap);
+ if (selPayload.left > 0) {
+ // At least one AND branch is a guess
+ if (selPayload.right) {
+ guess = true;
+ }
+ selectivity *= selPayload.left;
+ }
+ }
+ } else if (condition.getKind() == SqlKind.OR) {
+ selectivity = 0.0;
+ for (RexNode pred : RelOptUtil.disjunctions(condition)) {
+ Pair<Double, Boolean> selPayload = computeSelectivity(pred, idx, totalRows, scanRel, baseConditionMap);
+ if (selPayload.left > 0.0) {
+ // At least one OR branch is a guess
+ if (selPayload.right) {
+ guess = true;
+ }
+ selectivity += selPayload.left;
+ }
+ }
+ // Cap selectivity of OR'ed predicates at 0.25 if at least one predicate is a guess (Calcite does the same)
+ if (guess && selectivity > 0.25) {
+ selectivity = 0.25;
+ }
+ } else {
+ guess = false;
+ Double rowCount = baseConditionMap.get(conditionAsStr);
+ if (rowCount != null) {
+ if (rowCount != -1.0) {
+ selectivity = rowCount / totalRows;
+ } else {
+ // No valid stats for this condition - fall back to a guess
+ selectivity = -1.0;
+ guess = true;
+ }
+ } else {
+ selectivity = RelMdUtil.guessSelectivity(condition);
+ guess = true;
+ }
+ return new Pair<>(selectivity, guess);
+ }
+ // Cap selectivity to be between 0.0 and 1.0
+ selectivity = Math.min(1.0, selectivity);
+ selectivity = Math.max(0.0, selectivity);
+ logger.debug("Statistics: computeSelectivity: Cache MISS: Computed {} -> {}", conditionAsStr, selectivity);
+ return new Pair<>(selectivity, guess);
+ }
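+
+ /*
+ * Worked example (illustrative, with assumed numbers): for a condition a AND b where the
+ * cache yields selectivity(a) = 0.1 and b is uncached (guessed at 0.15), the combined
+ * selectivity is 0.1 * 0.15 = 0.015 with guess = true. For a OR b with the same inputs,
+ * 0.1 + 0.15 = 0.25, which is exactly the cap applied when any OR branch is a guess.
+ */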
+
+ /*
+ * Filters out indexes from the given collection based on their leading column, i.e. after filtering,
+ * the returned collection contains only one index for each distinct leading column in the collection
+ */
+ private IndexCollection distinctFKeyIndexes(IndexCollection indexes, RelNode scanRel) {
+ IndexCollection distinctIdxCollection = new DrillIndexCollection(scanRel, new HashSet<DrillIndexDescriptor>());
+ Iterator<IndexDescriptor> iterator = indexes.iterator();
+ Map<String, List<IndexDescriptor>> firstColIndexMap = new HashMap<>();
+ while (iterator.hasNext()) {
+ IndexDescriptor index = iterator.next();
+ // If index has columns - the first column is the leading column for the index
+ if (index.getIndexColumns() != null) {
+ String firstCol = convertLExToStr(index.getIndexColumns().get(0));
+ List<IndexDescriptor> idxList = firstColIndexMap.get(firstCol);
+ if (idxList == null) {
+ idxList = new ArrayList<>();
+ firstColIndexMap.put(firstCol, idxList);
+ }
+ idxList.add(index);
+ }
+ }
+ for (String firstCol : firstColIndexMap.keySet()) {
+ List<IndexDescriptor> indexesSameFirstCol = firstColIndexMap.get(firstCol);
+ double maxAvgRowSize = -1.0;
+ IndexDescriptor selectedIdx = null;
+ for (IndexDescriptor idx : indexesSameFirstCol) {
+ String tabIdxIdentifier = buildUniqueIndexIdentifier(idx);
+ double idxRowSize = fIStatsCache.get(tabIdxIdentifier).getAvgRowSize();
+ // Prefer index with largest average row-size, breaking ties lexicographically
+ if (idxRowSize > maxAvgRowSize
+ || (idxRowSize == maxAvgRowSize
+ && (selectedIdx == null || idx.getIndexName().compareTo(selectedIdx.getIndexName()) < 0))) {
+ maxAvgRowSize = idxRowSize;
+ selectedIdx = idx;
+ }
+ }
+ assert (selectedIdx != null);
+ distinctIdxCollection.addIndex(selectedIdx);
+ }
+ return distinctIdxCollection;
+ }
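+
+ /*
+ * Illustrative example: if indexes i_state_city and i_state_age_phone both lead with
+ * address.state, only one of them is kept - the one whose cached average row size is
+ * larger, with ties broken by index name - so costing later considers at most one
+ * index per distinct leading column.
+ */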
+
+ /*
+ * Returns the String representation for the given Logical Expression
+ */
+ private String convertLExToStr(LogicalExpression lex) {
+ StringBuilder sb = new StringBuilder();
+ ExpressionStringBuilder esb = new ExpressionStringBuilder();
+ lex.accept(esb, sb);
+ return sb.toString();
+ }
+
+ /*
+ * Converts the given RexNode condition into a Drill logical expression.
+ */
+ private LogicalExpression convertToLogicalExpression(RexNode condition,
+ RelDataType type, PlannerSettings settings, RexBuilder builder) {
+ LogicalExpression conditionExp;
+ try {
+ conditionExp = DrillOptiq.toDrill(new DrillParseContext(settings), type, builder, condition);
+ } catch (ClassCastException e) {
+ return null;
+ }
+ return conditionExp;
+ }
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
index ee35a68e4..f982278e7 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
@@ -20,7 +20,10 @@ package org.apache.drill.exec.store.mapr.db;
import java.io.IOException;
import com.mapr.fs.MapRFileStatus;
+import com.mapr.db.index.IndexDesc;
import com.mapr.fs.tables.TableProperties;
+import org.apache.drill.exec.planner.index.IndexDescriptor;
+import org.apache.drill.exec.planner.index.MapRDBIndexDescriptor;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.store.SchemaConfig;
@@ -33,6 +36,7 @@ import org.apache.drill.exec.store.mapr.TableFormatMatcher;
import org.apache.drill.exec.store.mapr.TableFormatPlugin;
import org.apache.drill.exec.store.mapr.db.binary.MapRDBBinaryTable;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
public class MapRDBFormatMatcher extends TableFormatMatcher {
@@ -49,6 +53,44 @@ public class MapRDBFormatMatcher extends TableFormatMatcher {
.getIsMarlinTable();
}
+
+ /**
+ * Get an instance of DrillTable for a particular native secondary index
+ * @param fs the file system used to check readability of the selection
+ * @param selection the file selection identifying the index table
+ * @param fsPlugin the file system storage plugin
+ * @param storageEngineName name of the storage engine
+ * @param userName the user on whose behalf the table is accessed
+ * @param secondaryIndexDesc descriptor of the secondary index to read
+ * @return a DrillTable for the index, or null if the selection is not readable
+ * @throws IOException if the file system check fails
+ */
+ public DrillTable isReadableIndex(DrillFileSystem fs,
+ FileSelection selection, FileSystemPlugin fsPlugin,
+ String storageEngineName, String userName,
+ IndexDescriptor secondaryIndexDesc) throws IOException {
+ FileStatus status = selection.getFirstPath(fs);
+
+ if (!isFileReadable(fs, status)) {
+ return null;
+ }
+
+ MapRDBFormatPlugin fp = (MapRDBFormatPlugin) getFormatPlugin();
+ DrillTable dt = new DynamicDrillTable(fsPlugin,
+ storageEngineName,
+ userName,
+ new FormatSelection(fp.getConfig(),
+ selection));
+
+ // TODO: Create groupScan using index descriptor
+ dt.setGroupScan(fp.getGroupScan(userName,
+ selection,
+ null /* columns */,
+ (IndexDesc) ((MapRDBIndexDescriptor) secondaryIndexDesc).getOriginalDesc()));
+
+ return dt;
+ }
+
@Override
public DrillTable isReadable(DrillFileSystem fs,
FileSelection selection, FileSystemPlugin fsPlugin,
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
index da4829f03..0d1bf04c6 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -108,7 +108,7 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT,
MapRDBPushProjectIntoScan.PROJECT_ON_SCAN, MapRDBPushLimitIntoScan.LIMIT_ON_PROJECT,
- MapRDBPushLimitIntoScan.LIMIT_ON_SCAN);
+ MapRDBPushLimitIntoScan.LIMIT_ON_SCAN, MapRDBPushLimitIntoScan.LIMIT_ON_RKJOIN);
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
index 1e6bcec4a..422a269b0 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
@@ -39,6 +39,9 @@ import org.apache.calcite.rel.RelNode;
import org.apache.drill.exec.planner.index.IndexCollection;
import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.index.IndexDiscover;
+import org.apache.drill.exec.planner.index.IndexDiscoverFactory;
+import org.apache.drill.exec.planner.index.MapRDBIndexDiscover;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -307,8 +310,13 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan {
@Override
public IndexCollection getSecondaryIndexCollection(RelNode scanRel) {
- //XXX to implement for complete secondary index framework
- return null;
+ IndexDiscover discover = IndexDiscoverFactory.getIndexDiscover(
+ getStorageConfig(), this, scanRel, MapRDBIndexDiscover.class);
+
+ if (discover == null) {
+ logger.error("Null IndexDiscover was found for {}!", scanRel);
+ return null;
+ }
+ return discover.getTableIndex(getTableName());
}
@JsonIgnore
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
index 1f4b8c9a0..a26bc808c 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
@@ -26,11 +26,13 @@ import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.LimitPrel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.RowKeyJoinPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hbase.HBaseScanSpec;
import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+import org.apache.drill.exec.store.mapr.db.json.RestrictedJsonTableGroupScan;
public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushLimitIntoScan.class);
@@ -59,8 +61,8 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
if (scan.getGroupScan().supportsLimitPushdown()
&& !limit.isPushDown() && limit.getFetch() != null) {
if ((scan.getGroupScan() instanceof JsonTableGroupScan
- && ((JsonTableGroupScan) scan.getGroupScan()).isIndexScan()) ) {
- //|| (scan.getGroupScan() instanceof RestrictedJsonTableGroupScan)) {
+ && ((JsonTableGroupScan) scan.getGroupScan()).isIndexScan())
+ || (scan.getGroupScan() instanceof RestrictedJsonTableGroupScan)) {
return true;
}
}
@@ -111,6 +113,26 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
}
};
+ public static final StoragePluginOptimizerRule LIMIT_ON_RKJOIN =
+ new MapRDBPushLimitIntoScan(RelOptHelper.some(LimitPrel.class, RelOptHelper.any(RowKeyJoinPrel.class)),
+ "MapRDBPushLimitIntoScan:Limit_On_RKJoin") {
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final RowKeyJoinPrel join = call.rel(1);
+ final LimitPrel limit = call.rel(0);
+ doPushLimitIntoRowKeyJoin(call, limit, null, join);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final LimitPrel limit = call.rel(0);
+ // We do not fire this rule if fetch() is null (indicating we have to fetch all the
+ // remaining rows starting from offset).
+ return !limit.isPushDown() && limit.getFetch() != null;
+ }
+ };
+
protected void doPushLimitIntoGroupScan(RelOptRuleCall call,
LimitPrel limit, final ProjectPrel project, ScanPrel scan, GroupScan groupScan) {
try {
@@ -153,5 +175,29 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
}
return null;
}
+
+ protected void doPushLimitIntoRowKeyJoin(RelOptRuleCall call,
+ LimitPrel limit, final ProjectPrel project, RowKeyJoinPrel join) {
+ final RelNode newChild;
+ try {
+ RelNode left = join.getLeft();
+ RelNode right = join.getRight();
+ final RelNode limitOnLeft = new LimitPrel(left.getCluster(), left.getTraitSet(), left,
+ limit.getOffset(), limit.getFetch());
+ RowKeyJoinPrel newJoin = new RowKeyJoinPrel(join.getCluster(), join.getTraitSet(), limitOnLeft, right,
+ join.getCondition(), join.getJoinType());
+ if (project != null) {
+ final ProjectPrel newProject = new ProjectPrel(project.getCluster(), project.getTraitSet(), newJoin,
+ project.getProjects(), project.getRowType());
+ newChild = newProject;
+ } else {
+ newChild = newJoin;
+ }
+ call.transformTo(newChild);
+ logger.debug("pushLimitIntoRowKeyJoin: Pushed limit on left side of Join " + join.toString());
+ } catch (Exception e) {
+ logger.warn("pushLimitIntoRowKeyJoin: Exception while trying limit pushdown!", e);
+ }
+ }
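+
+ /*
+ * Plan-shape sketch (illustrative): the rewrite above turns
+ * Limit(fetch=n) over RowKeyJoin(left, right)
+ * into
+ * RowKeyJoin(Limit(fetch=n) over left, right)
+ * so at most n row keys are produced on the left (index) side before joining
+ * back to the primary table, instead of limiting only after the join.
+ */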
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
index 2eb84e7ef..521586808 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
@@ -30,8 +30,9 @@ import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil.ProjectPushInfo;
import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
index a269256cf..b54526232 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -44,10 +44,14 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.planner.index.IndexDescriptor;
+import org.apache.drill.exec.planner.index.MapRDBIndexDescriptor;
+import org.apache.drill.exec.planner.index.MapRDBStatisticsPayload;
import org.apache.drill.exec.planner.index.Statistics;
import org.apache.drill.exec.planner.index.MapRDBStatistics;
import org.apache.drill.exec.planner.cost.PluginCost;
import org.apache.drill.exec.planner.physical.PartitionFunction;
+import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
@@ -296,6 +300,9 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
@Override
public ScanStats getScanStats() {
+ if (isIndexScan()) {
+ return indexScanStats();
+ }
return fullTableScanStats();
}
@@ -359,6 +366,57 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
}
}
+ private ScanStats indexScanStats() {
+ if (!this.getIndexHint().isEmpty() &&
+ this.getIndexHint().equals(getIndexDesc().getIndexName())) {
+ logger.debug("JsonIndexGroupScan:{} forcing index {} by making tiny cost", this, this.getIndexHint());
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 1, 1, 0);
+ }
+
+ int totalColNum = STAR_COLS;
+ PluginCost pluginCostModel = formatPlugin.getPluginCostModel();
+ final int avgColumnSize = pluginCostModel.getAverageColumnSize(this);
+ boolean filterPushed = (scanSpec.getSerializedFilter() != null);
+ // scanSpec has already been dereferenced above, so only the index descriptor needs a null check
+ if (scanSpec.getIndexDesc() != null) {
+ totalColNum = scanSpec.getIndexDesc().getIncludedFields().size()
+ + scanSpec.getIndexDesc().getIndexedFields().size() + 1;
+ }
+ int numColumns = (columns == null || columns.isEmpty()) ? totalColNum : columns.size();
+ String idxIdentifier = stats.buildUniqueIndexIdentifier(scanSpec.getIndexDesc().getPrimaryTablePath(),
+ scanSpec.getIndexDesc().getIndexName());
+ double rowCount = stats.getRowCount(scanSpec.getCondition(), idxIdentifier);
+ // rowcount based on index leading columns predicate.
+ double leadingRowCount = stats.getLeadingRowCount(scanSpec.getCondition(), idxIdentifier);
+ double avgRowSize = stats.getAvgRowSize(idxIdentifier, false);
+ // If UNKNOWN, use defaults
+ if (rowCount == ROWCOUNT_UNKNOWN || rowCount == 0) {
+ rowCount = (filterPushed ? 0.0005f : 0.001f) * fullTableRowCount / scanSpec.getIndexDesc().getIndexedFields().size();
+ }
+ // If limit pushdown has occurred - factor it in the rowcount
+ if (this.maxRecordsToRead > 0) {
+ rowCount = Math.min(rowCount, this.maxRecordsToRead);
+ }
+ if (leadingRowCount == ROWCOUNT_UNKNOWN || leadingRowCount == 0) {
+ leadingRowCount = rowCount;
+ }
+ if (avgRowSize == AVG_ROWSIZE_UNKNOWN || avgRowSize == 0) {
+ avgRowSize = avgColumnSize * numColumns;
+ }
+ double rowsFromDisk = leadingRowCount;
+ if (!filterPushed) {
+ // both start and stop rows are empty, indicating this is a full scan so
+ // use the total rows for calculating disk i/o
+ rowsFromDisk = fullTableRowCount;
+ }
+ double totalBlocks = Math.ceil((avgRowSize * fullTableRowCount)/pluginCostModel.getBlockSize(this));
+ double numBlocks = Math.ceil(((avgRowSize * rowsFromDisk)/pluginCostModel.getBlockSize(this)));
+ numBlocks = Math.min(totalBlocks, numBlocks);
+ double diskCost = numBlocks * pluginCostModel.getSequentialBlockReadCost(this);
+ logger.debug("index_plan_info: JsonIndexGroupScan:{} - indexName:{}: rowCount:{}, avgRowSize:{}, blocks:{}, totalBlocks:{}, rowsFromDisk {}, diskCost:{}",
+ System.identityHashCode(this), scanSpec.getIndexDesc().getIndexName(), rowCount, avgRowSize, numBlocks, totalBlocks, rowsFromDisk, diskCost);
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
+ }
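+
+ /*
+ * Worked costing example (illustrative, with assumed values): for an index with
+ * avgRowSize = 100 bytes, leadingRowCount = 10,000, a pushed filter, and an 8 KB
+ * block size, rowsFromDisk = 10,000 and numBlocks = ceil(100 * 10,000 / 8192) = 123,
+ * giving diskCost = 123 * sequentialBlockReadCost. Without a pushed filter the full
+ * table row count drives numBlocks instead, capped at totalBlocks.
+ */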
+
@Override
@JsonIgnore
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
@@ -412,6 +470,142 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
return true;
}
+
+ @Override
+ public RestrictedJsonTableGroupScan getRestrictedScan(List<SchemaPath> columns) {
+ RestrictedJsonTableGroupScan newScan =
+ new RestrictedJsonTableGroupScan(this.getUserName(),
+ (FileSystemPlugin) this.getStoragePlugin(),
+ this.getFormatPlugin(),
+ this.getScanSpec(),
+ this.getColumns(),
+ this.getStatistics());
+ newScan.columns = columns;
+ return newScan;
+ }
+
+ /**
+ * Get the estimated average row size. DO NOT call this API directly.
+ * Call the stats API instead, which adjusts the counts based on preference options.
+ * @param index the index to use for generating the estimate
+ * @return a {@link MapRDBStatisticsPayload} carrying the estimated average row size
+ */
+ public MapRDBStatisticsPayload getAverageRowSizeStats(IndexDescriptor index) {
+ IndexDesc indexDesc = null;
+ double avgRowSize = AVG_ROWSIZE_UNKNOWN;
+
+ if (index != null) {
+ indexDesc = (IndexDesc)((MapRDBIndexDescriptor)index).getOriginalDesc();
+ }
+ // If no index is specified, the estimate must come from the primary table;
+ // reject the call if this scan is itself on a secondary index
+ if (indexDesc == null && scanSpec.isSecondaryIndex()) {
+ throw new UnsupportedOperationException("getAverageRowSizeStats should be invoked on primary table");
+ }
+
+ // Get the index table or primary table and use the DB API to get the estimated number of rows. For size estimates,
+ // we assume that all the columns would be read from the disk.
+ final Table table = this.formatPlugin.getJsonTableCache().getTable(scanSpec.getTableName(), indexDesc, getUserName());
+
+ if (table != null) {
+ final MetaTable metaTable = table.getMetaTable();
+ if (metaTable != null) {
+ avgRowSize = metaTable.getAverageRowSize();
+ }
+ }
+ logger.debug("index_plan_info: getEstimatedRowCount obtained from DB Client for {}: indexName: {}, indexInfo: {}, " +
+ "avgRowSize: {}, estimatedSize {}", this, (indexDesc == null ? "null" : indexDesc.getIndexName()),
+ (indexDesc == null ? "null" : indexDesc.getIndexInfo()), avgRowSize);
+ return new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, avgRowSize);
+ }
+
+ /**
+ * Get the estimated statistics after applying the {@link QueryCondition} condition. DO NOT call this API directly.
+ * Call the stats API instead, which adjusts the counts based on preference options.
+ * @param condition the filter to apply
+ * @param index the index to use for generating the estimate
+ * @param scanRel the scan rel node providing planner context
+ * @return a {@link MapRDBStatisticsPayload} with the estimated row counts and average row size
+ */
+ public MapRDBStatisticsPayload getFirstKeyEstimatedStats(QueryCondition condition, IndexDescriptor index, RelNode scanRel) {
+ IndexDesc indexDesc = null;
+ if (index != null) {
+ indexDesc = (IndexDesc)((MapRDBIndexDescriptor)index).getOriginalDesc();
+ }
+ return getFirstKeyEstimatedStatsInternal(condition, indexDesc, scanRel);
+ }
+
+ /**
+ * Get the estimated statistics after applying the {@link QueryCondition} condition
+ * @param condition the filter to apply
+ * @param index the index to use for generating the estimate
+ * @param scanRel the scan rel node providing planner context
+ * @return {@link MapRDBStatisticsPayload} statistics
+ */
+ private MapRDBStatisticsPayload getFirstKeyEstimatedStatsInternal(QueryCondition condition, IndexDesc index, RelNode scanRel) {
+ // If no index is specified, the estimate must come from the primary table;
+ // reject the call if this scan is itself on a secondary index
+ if (index == null && scanSpec.isSecondaryIndex()) {
+ throw new UnsupportedOperationException("getFirstKeyEstimatedStats should be invoked on primary table");
+ }
+
+ // Get the index table or primary table and use the DB API to get the estimated number of rows. For size estimates,
+ // we assume that all the columns would be read from the disk.
+ final Table table = this.formatPlugin.getJsonTableCache().getTable(scanSpec.getTableName(), index, getUserName());
+
+ if (table != null) {
+ // Factor reflecting confidence in the DB estimates. If a table has few tablets, the tablet-level stats
+ // might be off. The decay scalingFactor will reduce estimates when one tablet represents a significant percentage
+ // of the entire table.
+ double scalingFactor = 1.0;
+ boolean isFullScan = false;
+ final MetaTable metaTable = table.getMetaTable();
+ com.mapr.db.scan.ScanStats stats = (condition == null)
+ ? metaTable.getScanStats() : metaTable.getScanStats(condition);
+ if (index == null && condition != null) {
+ // Given table condition might not be on leading column. Check if the rowcount matches full table rows.
+ // In that case either no leading-key condition is present or it does not prune enough; treat it as a full scan.
+ com.mapr.db.scan.ScanStats noConditionPTabStats = metaTable.getScanStats();
+ if (stats.getEstimatedNumRows() == noConditionPTabStats.getEstimatedNumRows()) {
+ isFullScan = true;
+ }
+ }
+ // Use the scalingFactor only when a condition filters out rows from the table. If no condition is present, all rows
+ // should be selected. So the scalingFactor should not reduce the returned rows
+ if (condition != null && !isFullScan) {
+ double forcedScalingFactor = PrelUtil.getSettings(scanRel.getCluster()).getIndexStatsRowCountScalingFactor();
+ // Accuracy is defined as 1 - Error where Error = # Boundary Tablets (2) / # Total Matching Tablets.
+ // For 2 or fewer matching tablets, the error is assumed to be 50%. The Sqrt gives the decaying scalingFactor
+ if (stats.getTabletCount() > 2) {
+ double accuracy = 1.0 - (2.0/stats.getTabletCount());
+ scalingFactor = Math.min(1.0, 1.0 / Math.sqrt(1.0 / accuracy));
+ } else {
+ scalingFactor = 0.5;
+ }
+ if (forcedScalingFactor < 1.0
+ && metaTable.getScanStats().getTabletCount() < PluginConstants.JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT) {
+ // User forced confidence scalingFactor for small tables (assumed as less than 32 tablets (~512 MB))
+ scalingFactor = forcedScalingFactor;
+ }
+ }
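+ // Worked example (illustrative): with 10 matching tablets, accuracy = 1 - 2/10 = 0.8
+ // and scalingFactor = min(1.0, 1/sqrt(1/0.8)) = sqrt(0.8) ~= 0.894, discounting the
+ // DB row estimate by roughly 11%; with 2 or fewer tablets it is halved outright.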
+ logger.info("index_plan_info: getEstimatedRowCount obtained from DB Client for {}: indexName: {}, indexInfo: {}, " +
+ "condition: {} rowCount: {}, avgRowSize: {}, estimatedSize {}, tabletCount {}, totalTabletCount {}, " +
+ "scalingFactor {}",
+ this, (index == null ? "null" : index.getIndexName()), (index == null ? "null" : index.getIndexInfo()),
+ (condition == null ? "null" : condition.toString()), stats.getEstimatedNumRows(),
+ (stats.getEstimatedNumRows() == 0 ? 0 : stats.getEstimatedSize()/stats.getEstimatedNumRows()),
+ stats.getEstimatedSize(), stats.getTabletCount(), metaTable.getScanStats().getTabletCount(), scalingFactor);
+ return new MapRDBStatisticsPayload(scalingFactor * stats.getEstimatedNumRows(), scalingFactor * stats.getEstimatedNumRows(),
+ ((stats.getEstimatedNumRows() == 0 ? 0 : (double)stats.getEstimatedSize()/stats.getEstimatedNumRows())));
+ } else {
+ logger.info("index_plan_info: getEstimatedRowCount: {} indexName: {}, indexInfo: {}, " +
+ "condition: {} rowCount: UNKNOWN, avgRowSize: UNKNOWN", this, (index == null ? "null" : index.getIndexName()),
+ (index == null ? "null" : index.getIndexInfo()), (condition == null ? "null" : condition.toString()));
+ return new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, AVG_ROWSIZE_UNKNOWN);
+ }
+ }
+
+
/**
* Set the row count resulting from applying the {@link RexNode} condition. Forced row counts will take
* precedence over stats row counts
@@ -518,9 +712,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
@Override
@JsonIgnore
public PartitionFunction getRangePartitionFunction(List<FieldReference> refList) {
-
- return null;
- //new JsonTableRangePartitionFunction(refList, scanSpec.getTableName(), this.getUserName(), this.getFormatPlugin());
+ return new JsonTableRangePartitionFunction(refList, scanSpec.getTableName(), this.getUserName(), this.getFormatPlugin());
}
/**
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
new file mode 100644
index 000000000..acaa6cacb
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.planner.physical.AbstractRangePartitionFunction;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.vector.ValueVector;
+import org.ojai.store.QueryCondition;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.mapr.db.Table;
+import com.mapr.db.impl.ConditionImpl;
+import com.mapr.db.impl.IdCodec;
+import com.mapr.db.impl.ConditionNode.RowkeyRange;
+import com.mapr.db.scan.ScanRange;
+import com.mapr.fs.jni.MapRConstants;
+import com.mapr.org.apache.hadoop.hbase.util.Bytes;
+
+@JsonTypeName("jsontable-range-partition-function")
+public class JsonTableRangePartitionFunction extends AbstractRangePartitionFunction {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonTableRangePartitionFunction.class);
+
+ @JsonProperty("refList")
+ protected List<FieldReference> refList;
+
+ @JsonProperty("tableName")
+ protected String tableName;
+
+ @JsonIgnore
+ protected String userName;
+
+ @JsonIgnore
+ protected ValueVector partitionKeyVector = null;
+
+ // List of start keys of the scan ranges for the table.
+ @JsonProperty
+ protected List<byte[]> startKeys = null;
+
+ // List of stop keys of the scan ranges for the table.
+ @JsonProperty
+ protected List<byte[]> stopKeys = null;
+
+ @JsonCreator
+ public JsonTableRangePartitionFunction(
+ @JsonProperty("refList") List<FieldReference> refList,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("startKeys") List<byte[]> startKeys,
+ @JsonProperty("stopKeys") List<byte[]> stopKeys) {
+ this.refList = refList;
+ this.tableName = tableName;
+ this.startKeys = startKeys;
+ this.stopKeys = stopKeys;
+ }
+
+ public JsonTableRangePartitionFunction(List<FieldReference> refList,
+ String tableName, String userName, MapRDBFormatPlugin formatPlugin) {
+ this.refList = refList;
+ this.tableName = tableName;
+ this.userName = userName;
+ initialize(formatPlugin);
+ }
+
+ @JsonProperty("refList")
+ @Override
+ public List<FieldReference> getPartitionRefList() {
+ return refList;
+ }
+
+ @Override
+ public void setup(List<VectorWrapper<?>> partitionKeys) {
+ if (partitionKeys.size() != 1) {
+ throw new UnsupportedOperationException(
+ "Range partitioning function supports exactly one partition column; encountered " + partitionKeys.size());
+ }
+
+ VectorWrapper<?> v = partitionKeys.get(0);
+
+ partitionKeyVector = v.getValueVector();
+
+ Preconditions.checkArgument(partitionKeyVector != null, "Found null partitionKeyVector.");
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof JsonTableRangePartitionFunction) {
+ JsonTableRangePartitionFunction rpf = (JsonTableRangePartitionFunction) obj;
+ List<FieldReference> thisPartRefList = this.getPartitionRefList();
+ List<FieldReference> otherPartRefList = rpf.getPartitionRefList();
+ if (thisPartRefList.size() != otherPartRefList.size()) {
+ return false;
+ }
+ for (int refIdx = 0; refIdx < thisPartRefList.size(); refIdx++) {
+ if (!thisPartRefList.get(refIdx).equals(otherPartRefList.get(refIdx))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public int eval(int index, int numPartitions) {
+
+ String key = partitionKeyVector.getAccessor().getObject(index).toString();
+ byte[] encodedKey = IdCodec.encodeAsBytes(key);
+
+ int tabletId = -1;
+
+ // Do a 'range' binary search through the list of start-stop keys to find nearest range. Assumption is
+ // that the list of start keys is ordered (this is ensured by MetaTable.getScanRanges())
+ // TODO: This search should ideally be delegated to MapR-DB once an appropriate API is available
+ // to optimize this
+ int low = 0, high = startKeys.size() - 1;
+ while (low <= high) {
+ int mid = low + (high-low)/2;
+
+ byte[] start = startKeys.get(mid);
+ byte[] stop = stopKeys.get(mid);
+
+ // Check if key is present in the mid interval of [start, stop].
+ // Account for empty byte array start/stop
+ if ( (Bytes.compareTo(encodedKey, start) >= 0 ||
+ Bytes.equals(start, MapRConstants.EMPTY_BYTE_ARRAY)
+ ) &&
+ (Bytes.compareTo(encodedKey, stop) < 0 ||
+ Bytes.equals(stop, MapRConstants.EMPTY_BYTE_ARRAY)
+ )
+ ) {
+ tabletId = mid;
+ break;
+ }
+
+ if (Bytes.compareTo(encodedKey, start) >= 0) {
+ // key is greater, ignore left side
+ low = mid + 1;
+ } else {
+ // key is smaller, ignore right side
+ high = mid - 1;
+ }
+ }
+
+ if (tabletId < 0) {
+ tabletId = 0;
+ logger.warn("Key {} was not found in any of the start-stop ranges. Using default tabletId {}", key, tabletId);
+ }
+
+ int partitionId = tabletId % numPartitions;
+
+ logger.trace("Key = {}, tablet id = {}, partition id = {}", key, tabletId, partitionId);
+
+ return partitionId;
+ }
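+
+ /*
+ * Worked example (illustrative): with startKeys = [empty, "g", "p"] and
+ * stopKeys = ["g", "p", empty], an encoded key "k" falls in the middle range
+ * ["g", "p"), so tabletId = 1; with numPartitions = 2 the row is routed to
+ * partition 1 % 2 = 1.
+ */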
+
+
+ public void initialize(MapRDBFormatPlugin plugin) {
+
+ // get the table handle from the table cache
+ Table table = plugin.getJsonTableCache().getTable(tableName, userName);
+
+ // Set the condition to null such that all scan ranges are retrieved for the primary table.
+ // The reason is that the row keys could belong to any of the tablets of the table, so
+ // there is no use trying to get only a limited set of scan ranges.
+ // NOTE: here we use the restrictedScanRangeSizeMB because the range partitioning should be parallelized
+ // based on the number of scan ranges on the RestrictedJsonTableGroupScan.
+ List<ScanRange> ranges = table.getMetaTable().getScanRanges(plugin.getRestrictedScanRangeSizeMB());
+
+ this.startKeys = Lists.newArrayList();
+ this.stopKeys = Lists.newArrayList();
+
+ logger.debug("Num scan ranges for table {} = {}", table.getName(), ranges.size());
+
+ int count = 0;
+ for (ScanRange r : ranges) {
+ QueryCondition condition = r.getCondition();
+ List<RowkeyRange> rowkeyRanges = ((ConditionImpl)condition).getRowkeyRanges();
+ byte[] start = rowkeyRanges.get(0).getStartRow();
+ byte[] stop = rowkeyRanges.get(rowkeyRanges.size() - 1).getStopRow();
+
+ Preconditions.checkNotNull(start, String.format("Encountered a null start key at position %d for scan range condition %s.", count, condition.toString()));
+ Preconditions.checkNotNull(stop, String.format("Encountered a null stop key at position %d for scan range condition %s.", count, condition.toString()));
+
+ if (count > 0) {
+ // after the first start key, rest should be non-empty
+ Preconditions.checkState(!Bytes.equals(start, MapRConstants.EMPTY_BYTE_ARRAY), String.format("Encountered an empty start key at position %d", count));
+ }
+
+ if (count < ranges.size() - 1) {
+ // except for the last stop key, rest should be non-empty
+ Preconditions.checkState(!Bytes.equals(stop, MapRConstants.EMPTY_BYTE_ARRAY), String.format("Encountered an empty stop key at position %d", count));
+ }
+
+ startKeys.add(start);
+ stopKeys.add(stop);
+ count++;
+ }
+
+ // check validity; only need to check one of the lists since they are populated together
+ Preconditions.checkArgument(startKeys.size() > 0, "Found empty list of start/stopKeys.");
+
+ Preconditions.checkState(startKeys.size() == ranges.size(),
+ String.format("Mismatch between the lengths: num start keys = %d, num scan ranges = %d", startKeys.size(), ranges.size()));
+
+ Preconditions.checkState(stopKeys.size() == ranges.size(),
+ String.format("Mismatch between the lengths: num stop keys = %d, num scan ranges = %d", stopKeys.size(), ranges.size()));
+
+ }
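+
+ /*
+ * Illustrative shape of the result: for a table with three scan ranges the lists are
+ * startKeys = [ empty, k1, k2 ]
+ * stopKeys = [ k1, k2, empty ]
+ * i.e. adjacent ranges share boundary keys, only the first start key and the last
+ * stop key may be empty, and both lists line up one-to-one with the scan ranges.
+ */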
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
new file mode 100644
index 000000000..48ad96d81
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import java.util.List;
+import java.util.NavigableMap;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.index.MapRDBStatistics;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.index.Statistics;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScan;
+import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScanSpec;
+import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
+
+/**
+ * A RestrictedJsonTableGroupScan encapsulates (along with a subscan) the functionality
+ * for doing a restricted (i.e. skip) scan rather than a sequential scan. The skipping is based
+ * on a supplied set of row keys (primary keys) from a join operator.
+ */
+@JsonTypeName("restricted-json-scan")
+public class RestrictedJsonTableGroupScan extends JsonTableGroupScan {
+
+ @JsonCreator
+ public RestrictedJsonTableGroupScan(@JsonProperty("userName") String userName,
+ @JsonProperty("storage") FileSystemPlugin storagePlugin,
+ @JsonProperty("format") MapRDBFormatPlugin formatPlugin,
+ @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("")MapRDBStatistics statistics) {
+ super(userName, storagePlugin, formatPlugin, scanSpec, columns, statistics);
+ }
+
+ // TODO: this method needs to be fully implemented
+ protected RestrictedMapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
+ JsonScanSpec spec = scanSpec;
+ RestrictedMapRDBSubScanSpec subScanSpec =
+ new RestrictedMapRDBSubScanSpec(
+ spec.getTableName(),
+ getRegionsToScan().get(tfi), spec.getSerializedFilter(), getUserName());
+ return subScanSpec;
+ }
+
+ protected NavigableMap<TabletFragmentInfo, String> getRegionsToScan() {
+ return getRegionsToScan(formatPlugin.getRestrictedScanRangeSizeMB());
+ }
+
+ @Override
+ public MapRDBSubScan getSpecificScan(int minorFragmentId) {
+ assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+ "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+ minorFragmentId);
+ RestrictedMapRDBSubScan subscan =
+ new RestrictedMapRDBSubScan(getUserName(), formatPlugin,
+ getEndPointFragmentMapping(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON);
+
+ return subscan;
+ }
+
+ private List<RestrictedMapRDBSubScanSpec> getEndPointFragmentMapping(int minorFragmentId) {
+ List<RestrictedMapRDBSubScanSpec> restrictedSubScanSpecList = Lists.newArrayList();
+ List<MapRDBSubScanSpec> subScanSpecList = endpointFragmentMapping.get(minorFragmentId);
+ for (MapRDBSubScanSpec s : subScanSpecList) {
+ restrictedSubScanSpecList.add((RestrictedMapRDBSubScanSpec) s);
+ }
+ return restrictedSubScanSpecList;
+ }
+
+ /**
+ * Private constructor, used for cloning.
+ * @param that The RestrictedJsonTableGroupScan to clone
+ */
+ private RestrictedJsonTableGroupScan(RestrictedJsonTableGroupScan that) {
+ super(that);
+ }
+
+ @Override
+ public GroupScan clone(JsonScanSpec scanSpec) {
+ RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this);
+ newScan.scanSpec = scanSpec;
+ newScan.resetRegionsToScan(); // resetting will force recalculation
+ return newScan;
+ }
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this);
+ newScan.columns = columns;
+ return newScan;
+ }
+
+ @Override
+ @JsonIgnore
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new RestrictedJsonTableGroupScan(this);
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ // TODO: ideally we should use the row count from the index scan and multiply by a restricted-scan factor
+ double rowCount;
+ PluginCost pluginCostModel = formatPlugin.getPluginCostModel();
+ final int avgColumnSize = pluginCostModel.getAverageColumnSize(this);
+ int numColumns = (columns == null || columns.isEmpty()) ? STAR_COLS : columns.size();
+ // Get the restricted group scan row count - same as the right side index rows
+ rowCount = computeRestrictedScanRowcount();
+ // Get the average row size of the primary table
+ double avgRowSize = stats.getAvgRowSize(null, true);
+ if (avgRowSize == Statistics.AVG_ROWSIZE_UNKNOWN || avgRowSize == 0) {
+ avgRowSize = avgColumnSize * numColumns;
+ }
+ // restricted scan does random lookups and each row may belong to a different block, with the number
+ // of blocks upper bounded by the total num blocks in the primary table
+ double totalBlocksPrimary = Math.ceil((avgRowSize * fullTableRowCount)/pluginCostModel.getBlockSize(this));
+ double numBlocks = Math.min(totalBlocksPrimary, rowCount);
+ double diskCost = numBlocks * pluginCostModel.getRandomBlockReadCost(this);
+ // For non-covering plans, the dominating cost would be of the join back. Reduce it using the factor
+ // for biasing towards non-covering plans.
+ diskCost *= stats.getRowKeyJoinBackIOFactor();
+ logger.debug("RestrictedJsonGroupScan:{} rowCount:{}, avgRowSize:{}, blocks:{}, totalBlocks:{}, diskCost:{}",
+ System.identityHashCode(this), rowCount, avgRowSize, numBlocks, totalBlocksPrimary, diskCost);
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
+ }
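+
+ /*
+ * Worked costing example (illustrative, with assumed values): with rowCount = 1,000
+ * joined-back rows, avgRowSize = 200 bytes, fullTableRowCount = 1,000,000 and an
+ * 8 KB block size, totalBlocksPrimary = ceil(200 * 1,000,000 / 8192) = 24,415, so
+ * numBlocks = min(24,415, 1,000) = 1,000 random block reads, which is then scaled
+ * down by the rowkey-join-back I/O factor to bias toward non-covering plans.
+ */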
+
+ private double computeRestrictedScanRowcount() {
+ double rowCount = Statistics.ROWCOUNT_UNKNOWN;
+ // The rowcount should be the same as that of the build side, which was forced by putting it in forcedRowCountMap
+ if (forcedRowCountMap.get(null) != null) {
+ rowCount = forcedRowCountMap.get(null);
+ }
+ // If no forced rowcount is available, fall back to a small fraction of the full table
+ if (rowCount == Statistics.ROWCOUNT_UNKNOWN || rowCount == 0) {
+ rowCount = (0.001f * fullTableRowCount);
+ }
+ // If limit pushdown has occurred - factor it into the rowcount
+ if (this.maxRecordsToRead > 0) {
+ rowCount = Math.min(rowCount, this.maxRecordsToRead);
+ }
+ return rowCount;
+ }
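+
+ /*
+ * Worked example (illustrative): with no forced rowcount, a fullTableRowCount of
+ * 1,000,000 yields 0.001 * 1,000,000 = 1,000 rows; a pushed-down LIMIT 100 then
+ * caps the estimate at min(1,000, 100) = 100.
+ */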
+
+ @Override
+ public boolean isRestrictedScan() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "RestrictedJsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns
+ + ", rowcount=" + computeRestrictedScanRowcount()
+ + (maxRecordsToRead > 0 ? ", limit=" + maxRecordsToRead : "")
+ + (getMaxParallelizationWidth() > 0 ? ", maxwidth=" + getMaxParallelizationWidth() : "") + "]";
+ }
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java
new file mode 100644
index 000000000..9ac27b47c
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+import com.mapr.drill.maprdb.tests.json.BaseJsonTest;
+import com.mapr.tests.annotations.ClusterTest;
+import org.apache.drill.PlanTestBase;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Category(ClusterTest.class)
+public class IndexHintPlanTest extends IndexPlanTest {
+
+ private static final String defaultHavingIndexPlan = "alter session reset `planner.enable_index_planning`";
+
+ @Test
+ // A simple testcase with an index hint on a table that has only one index on column t.id.ssn;
+ // this should pick the i_ssn index for the query
+ public void testSimpleIndexHint() throws Exception {
+ String hintquery = "SELECT t.id.ssn as ssn FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_ssn')) as t " +
+ " where t.id.ssn = '100007423'";
+
+ String query = "SELECT t.id.ssn as ssn FROM hbase.`index_test_primary` as t where t.id.ssn = '100007423'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(hintquery,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+ new String[]{"RowKeyJoin"}
+ );
+
+ // default plan picked by the optimizer.
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+ new String[]{"RowKeyJoin"}
+ );
+ testBuilder()
+ .sqlQuery(hintquery)
+ .ordered()
+ .baselineColumns("ssn").baselineValues("100007423")
+ .go();
+
+ }
+
+
+ @Test
+ // A testcase where there are multiple indexes to pick from, but only the index provided as the hint is picked.
+ // The hinted index is valid and useful during index selection, hence it will be selected.
+ public void testHintCaseWithMultipleIndexes_1() throws Exception {
+
+ String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_state_city')) as t " +
+ " where t.address.state = 'pc'";
+
+ String query = "SELECT t.`address`.`state` AS `state` FROM hbase.`index_test_primary` as t where t.address.state = 'pc'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(hintquery,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_city"},
+ new String[]{"RowKeyJoin"}
+ );
+
+ // default plan picked by the optimizer
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+ new String[]{"RowKeyJoin"}
+ );
+ }
+
+ @Test
+ // A testcase where there are multiple indexes to pick from, but only the index provided as the hint is picked.
+ // The hinted index is valid and useful during index selection, hence it will be selected.
+ // The difference from the previous testcase is that the index name is switched, showing that the
+ // index hint selects exactly the one valid index specified as the hint.
+ public void testHintCaseWithMultipleIndexes_2() throws Exception {
+
+ String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_state_age_phone')) as t " +
+ " where t.address.state = 'pc'";
+
+ String query = "SELECT t.`address`.`state` AS `state` FROM hbase.`index_test_primary` as t where t.address.state = 'pc'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(hintquery,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+ new String[]{"RowKeyJoin"}
+ );
+
+ // default plan picked by the query optimizer.
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+ new String[]{"RowKeyJoin"}
+ );
+ }
+
+ // Negative cases
+
+ @Test
+ // A testcase where there are multiple indexes to pick from, but none of them matches the index provided as the hint (the hint is wrong).
+ // The hinted index is not present in the table at all, so planning falls back to the case where no hint is given.
+ // Hence one of i_state_city or i_state_age_phone will be selected depending upon the cost.
+ public void testWithMultipleIndexesButNoIndexWithHint() throws Exception {
+
+ String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_state_and_city')) as t " +
+ " where t.address.state = 'pc'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(hintquery,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+ new String[]{"RowKeyJoin"}
+ );
+ }
+
+ @Test
+ // A testcase where there are multiple indexes to pick from, but none of them is the index provided as the hint, and the hinted index is valid.
+ // Here the index name given is valid (i.e. it is present on the table) but it is not useful for the filter.
+ // This case falls back to a full table scan.
+ public void testWithMultipleIndexesButNoIndexWithValidHint() throws Exception {
+
+ String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_ssn')) as t " +
+ " where t.address.state = 'pc'";
+
+ String query = "SELECT t.`address`.`state` AS `state` FROM hbase.`index_test_primary` as t where t.address.state = 'pc'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(hintquery,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary"},
+ new String[]{"RowKeyJoin", "indexName="}
+ );
+
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+ new String[]{"RowKeyJoin"}
+ );
+ }
+
+
+ @Test
+ // A covering index plan should be generated for a simple query instead of a RowKeyJoin.
+ public void testSimpleNoRowKeyJoin() throws Exception {
+ String query = "SELECT `reverseid` from table(hbase.`index_test_primary`(type => 'maprdb', index => 'hash_i_reverseid')) " +
+ "where `reverseid` = 1234";
+
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=hash_i_reverseid"},
+ new String[]{"RowKeyJoin"}
+ );
+ }
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java
new file mode 100644
index 000000000..c0ea2a006
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java
@@ -0,0 +1,1715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+import com.mapr.db.Admin;
+import com.mapr.drill.maprdb.tests.MaprDBTestsSuite;
+import com.mapr.drill.maprdb.tests.json.BaseJsonTest;
+import com.mapr.tests.annotations.ClusterTest;
+import org.apache.drill.PlanTestBase;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.apache.drill.common.config.DrillConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+import java.util.Properties;
+
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Category(ClusterTest.class)
+public class IndexPlanTest extends BaseJsonTest {
+
+ final static String PRIMARY_TABLE_NAME = "/tmp/index_test_primary";
+
+ final static int PRIMARY_TABLE_SIZE = 10000;
+ private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1";
+ private static final String sliceTargetDefault = "alter session reset `planner.slice_target`";
+ private static final String noIndexPlan = "alter session set `planner.enable_index_planning` = false";
+ private static final String defaultHavingIndexPlan = "alter session reset `planner.enable_index_planning`";
+ private static final String disableHashAgg = "alter session set `planner.enable_hashagg` = false";
+ private static final String enableHashAgg = "alter session set `planner.enable_hashagg` = true";
+ private static final String defaultnonCoveringSelectivityThreshold = "alter session set `planner.index.noncovering_selectivity_threshold` = 0.025";
+ private static final String incrnonCoveringSelectivityThreshold = "alter session set `planner.index.noncovering_selectivity_threshold` = 0.25";
+ private static final String disableFTS = "alter session set `planner.disable_full_table_scan` = true";
+ private static final String enableFTS = "alter session reset `planner.disable_full_table_scan`";
+ private static final String preferIntersectPlans = "alter session set `planner.index.prefer_intersect_plans` = true";
+ private static final String defaultIntersectPlans = "alter session reset `planner.index.prefer_intersect_plans`";
+ private static final String lowRowKeyJoinBackIOFactor
+ = "alter session set `planner.index.rowkeyjoin_cost_factor` = 0.01";
+ private static final String defaultRowKeyJoinBackIOFactor
+ = "alter session reset `planner.index.rowkeyjoin_cost_factor`";
+
+ /**
+ * A sample row of this 10K table:
+ * | 1012 | {"city":"pfrrs","state":"pc"} | {"email":"KfFzKUZwNk@gmail.com","phone":"6500005471"} |
+ * | {"ssn":"100007423"} | {"fname":"KfFzK","lname":"UZwNk"} | {"age":53.0,"income":45.0} | 1012 |
+ *
+ * This test suite generates random content to fill all the rows. Since the random function always starts
+ * from the same seed on different runs, the data in the table is always the same as long as the row count
+ * is unchanged, so the query results can be predicted and verified.
+ */
+
+ @BeforeClass
+ public static void setupTableIndexes() throws Exception {
+
+ Properties overrideProps = new Properties();
+ overrideProps.setProperty("format-maprdb.json.useNumRegionsForDistribution", "true");
+ updateTestCluster(1, DrillConfig.create(overrideProps));
+
+ MaprDBTestsSuite.setupTests();
+ MaprDBTestsSuite.createPluginAndGetConf(getDrillbitContext());
+
+ test(incrnonCoveringSelectivityThreshold);
+
+    System.out.println("setupTableIndexes begins");
+ Admin admin = MaprDBTestsSuite.getAdmin();
+ if (admin != null) {
+ if (admin.tableExists(PRIMARY_TABLE_NAME)) {
+ admin.deleteTable(PRIMARY_TABLE_NAME);
+ }
+ }
+
+ LargeTableGen gen = new LargeTableGen(MaprDBTestsSuite.getAdmin());
+    /**
+     * indexDef is an array of strings; LargeTableGen.generateTableWithIndex takes it as a parameter
+     * to generate the indexes for the primary table:
+     *   indexDef[3*i]   defines the i-th index's name. NOTE: if the name begins with "hash", it is a hash index.
+     *   indexDef[3*i+1] defines the i-th index's indexed field(s),
+     *   indexDef[3*i+2] defines the i-th index's non-indexed (included) fields.
+     */
+    // Set indexDef to null to skip index creation here (e.g. when the indexes already exist).
+    final String[] indexDef = {
+        "i_ssn", "id.ssn", "contact.phone",
+ "i_state_city", "address.state,address.city", "name.fname,name.lname",//mainly for composite key test
+ "i_age", "personal.age", "",
+ "i_income", "personal.income", "",
+ "i_lic", "driverlicense", "reverseid",
+ "i_state_city_dl", "address.state,address.city", "driverlicense",
+ "i_cast_int_ssn", "$CAST(id.ssn@INT)", "contact.phone",
+ "i_cast_vchar_lic", "$CAST(driverlicense@STRING)","contact.email",
+ "i_state_age_phone", "address.state,personal.age,contact.phone", "name.fname",
+ "i_cast_age_income_phone", "$CAST(personal.age@INT),$CAST(personal.income@INT),contact.phone", "name.fname",
+ "i_age_with_fname", "personal.age", "name.fname",
+ "hash_i_reverseid", "reverseid", "",
+ "hash_i_cast_timestamp_firstlogin", "$CAST(activity.irs.firstlogin@TIMESTAMP)", "id.ssn"
+ };
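+    // For illustration, the first triplet above ("i_ssn", "id.ssn", "contact.phone") is turned by
+    // LargeTableGen.createIndex into roughly the following admin command:
+    //   maprcli table index add -path /tmp/index_test_primary -index i_ssn
+    //       -indexedfields 'id.ssn' -includedfields 'contact.phone'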
+ gen.generateTableWithIndex(PRIMARY_TABLE_NAME, PRIMARY_TABLE_SIZE, indexDef);
+ }
+
+ @AfterClass
+ public static void cleanupTableIndexes() throws Exception {
+ Admin admin = MaprDBTestsSuite.getAdmin();
+ if (admin != null) {
+ if (admin.tableExists(PRIMARY_TABLE_NAME)) {
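+        // Deletion is intentionally skipped (note the commented-out call below);
+        // setupTableIndexes drops and recreates the table at the start of the next run.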
+ // admin.deleteTable(PRIMARY_TABLE_NAME);
+ }
+ }
+ test(defaultnonCoveringSelectivityThreshold);
+ }
+
+ @Test
+ public void CTASTestTable() throws Exception {
+ String ctasQuery = "CREATE TABLE hbase.tmp.`backup_index_test_primary` " +
+ "AS SELECT * FROM hbase.`index_test_primary` as t ";
+ test(ctasQuery);
+ test("DROP TABLE IF EXISTS hbase.tmp.`backup_index_test_primary`");
+ }
+
+ @Test
+ public void CoveringPlanWithNonIndexedField() throws Exception {
+
+ String query = "SELECT t.`contact`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn = '100007423'";
+ test(defaultHavingIndexPlan);
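+    // testPlanMatchingPatterns: regexes in the first array must appear in the plan text (here, a
+    // covering scan on index i_ssn); regexes in the second must be absent (here, no RowKeyJoin,
+    // i.e. no join back to the primary table).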
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+ new String[]{"RowKeyJoin"}
+ );
+
+ System.out.println("Covering Plan Verified!");
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("phone").baselineValues("6500005471")
+ .go();
+ }
+
+ @Test
+ public void CoveringPlanWithOnlyIndexedField() throws Exception {
+ String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn = '100007423'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+ new String[]{"RowKeyJoin"}
+ );
+
+ System.out.println("Covering Plan Verified!");
+
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("ssn").baselineValues("100007423")
+ .go();
+ }
+
+ @Test
+ public void NoIndexPlanForNonIndexField() throws Exception {
+
+ String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where t.contact.phone = '6500005471'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary"},
+ new String[]{"RowKeyJoin", "indexName="}
+ );
+
+ System.out.println("No Index Plan Verified!");
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100007632")
+ .go();
+ }
+
+ @Test
+ public void NonCoveringPlan() throws Exception {
+
+ String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn = '100007423'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*tableName=.*index_test_primary,",
+ ".*JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=i_ssn"},
+ new String[]{}
+ );
+
+ System.out.println("Non-Covering Plan Verified!");
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("fname").baselineValues("KfFzK")
+ .go();
+ }
+
+ @Test
+ public void RangeConditionIndexPlan() throws Exception {
+ String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+ " where t.personal.age > 52 AND t.name.fname='KfFzK'";
+ try {
+ test(defaultHavingIndexPlan + ";" + lowRowKeyJoinBackIOFactor + ";");
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*tableName=.*index_test_primary,",
+ ".*JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=(i_age|i_age_with_fname)"},
+ new String[]{}
+ );
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .optionSettingQueriesForTestQuery(lowRowKeyJoinBackIOFactor)
+ .optionSettingQueriesForBaseline(noIndexPlan)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+
+ testBuilder()
+ .optionSettingQueriesForTestQuery(sliceTargetSmall)
+ .optionSettingQueriesForBaseline(sliceTargetDefault)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally {
+ test(defaultRowKeyJoinBackIOFactor);
+ }
+ }
+
+ @Test
+ public void CoveringWithSimpleFieldsOnly() throws Exception {
+
+ String query = "SELECT t._id AS `tid` FROM hbase.`index_test_primary` as t " +
+ " where t.driverlicense = 100007423";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=i_lic"},
+ new String[]{"RowKeyJoin"}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("tid").baselineValues("1012")
+ .go();
+ }
+
+ @Test
+ public void NonCoveringWithSimpleFieldsOnly() throws Exception {
+
+ String query = "SELECT t.rowid AS `rowid` FROM hbase.`index_test_primary` as t " +
+ " where t.driverlicense = 100007423";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"RowKeyJoin(.*[\n\r])+.*" +
+ "RestrictedJsonTableGroupScan.*tableName=.*index_test_primary(.*[\n\r])+.*" +
+ "JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=i_lic"},
+ new String[]{}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("rowid").baselineValues("1012")
+ .go();
+ }
+
+ @Test
+ public void NonCoveringWithExtraConditonOnPrimary() throws Exception {
+
+ String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " +
+ " where t.personal.age = 53 AND t.name.lname='UZwNk'";
+ try {
+ test(defaultHavingIndexPlan + ";" + lowRowKeyJoinBackIOFactor + ";");
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan",
+ ".*JsonTableGroupScan.*indexName=i_age",},
+ new String[]{}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("fname").baselineValues("KfFzK")
+ .go();
+ } finally {
+ test(defaultRowKeyJoinBackIOFactor);
+ }
+ }
+
+ @Test
+ public void Intersect2indexesPlan() throws Exception {
+
+ String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+ " where t.personal.age = 53 AND t.personal.income=45";
+ try {
+ test(defaultHavingIndexPlan);
+ test(preferIntersectPlans + ";" + disableFTS);
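+      // The "(.*[\n\r])+.*" fragments let a single regex span plan lines, so the pattern below
+      // asserts the operator order: RowKeyJoin over a restricted primary-table scan over a
+      // HashJoin of two index scans (i_age and i_income, in either order).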
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*HashJoin(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)"},
+ new String[]{}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("lname").baselineValues("UZwNk")
+ .baselineColumns("lname").baselineValues("foNwtze")
+ .baselineColumns("lname").baselineValues("qGZVfY")
+ .go();
+ testBuilder()
+ .optionSettingQueriesForTestQuery(sliceTargetSmall)
+ .optionSettingQueriesForBaseline(sliceTargetDefault)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally {
+ test(defaultIntersectPlans + ";" + enableFTS);
+ }
+ }
+
+ @Test
+ public void CompositeIndexNonCoveringPlan() throws Exception {
+
+ String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'pc' AND t.address.city='pfrrs'";
+ try {
+ test(defaultHavingIndexPlan + ";" + lowRowKeyJoinBackIOFactor + ";");
+
+      // Either i_state_city or i_state_age_phone may be picked depending on the cost model;
+      // both are fine for testing the composite-index non-covering plan.
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_state_"},
+ new String[]{}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
+ .go();
+
+ testBuilder()
+ .optionSettingQueriesForTestQuery(sliceTargetSmall)
+ .optionSettingQueriesForBaseline(sliceTargetDefault)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally {
+ test(defaultRowKeyJoinBackIOFactor);
+ }
+ }
+
+  @Test // filter covers indexed fields, included fields, and fields not in the index at all
+ public void CompositeIndexNonCoveringFilterWithAllFieldsPlan() throws Exception {
+
+ String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'pc' AND t.address.city='pfrrs' AND t.driverlicense IN (100007423, 100007424)";
+ test(defaultHavingIndexPlan+";"+lowRowKeyJoinBackIOFactor+";");
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan.*condition=.*state.*city.*driverlicense.*or.*driverlicense.*(.*[\n\r])+.*JsonTableGroupScan.*indexName="},
+ new String[]{}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("ssn").baselineValues("100007423")
+ .go();
+
+ testBuilder()
+ .optionSettingQueriesForTestQuery(sliceTargetSmall)
+ .optionSettingQueriesForBaseline(sliceTargetDefault)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+  }
+
+  @Test
+ public void CompositeIndexCoveringPlan() throws Exception {
+
+ String query = "SELECT t.`address`.`city` AS `city` FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'pc' AND t.address.city='pfrrs'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*indexName=i_state_city"},
+ new String[]{"RowKeyJoin", "Filter"}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("city").baselineValues("pfrrs")
+ .baselineColumns("city").baselineValues("pfrrs")
+ .go();
+
+ testBuilder()
+ .optionSettingQueriesForTestQuery(sliceTargetSmall)
+ .optionSettingQueriesForBaseline(sliceTargetDefault)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void TestNonCoveringRangePartition_1() throws Exception {
+
+ String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+ " where t.personal.age = 53";
+ String[] expectedPlan = new String[] {"RowKeyJoin(.*[\n\r])+.*" +
+ "RestrictedJsonTableGroupScan.*tableName=.*index_test_primary(.*[\n\r])+.*" +
+ "RangePartitionExchange(.*[\n\r])+.*" +
+ "JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=(i_age|i_age_with_fname)"};
+ test(defaultHavingIndexPlan+";"+sliceTargetSmall+";");
+ PlanTestBase.testPlanMatchingPatterns(query,
+ expectedPlan, new String[]{});
+
+ try {
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .optionSettingQueriesForBaseline(noIndexPlan)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally {
+ test(defaultHavingIndexPlan);
+ test(sliceTargetDefault);
+ }
+ }
+
+ @Test
+ public void TestCastVarCharCoveringPlan() throws Exception {
+    // length 255 exactly matches the length of the casted indexed field
+ String query = "SELECT t._id as tid, cast(t.driverlicense as varchar(255)) as driverlicense FROM hbase.`index_test_primary` as t " +
+ " where cast(t.driverlicense as varchar(255))='100007423'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_cast_vchar_lic"},
+ new String[]{"RowKeyJoin"}
+ );
+
+ System.out.println("TestCastCoveringPlan Plan Verified!");
+
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("tid", "driverlicense").baselineValues("1012", "100007423")
+ .go();
+ }
+
+ @Test
+ public void TestCastINTCoveringPlan() throws Exception {
+ String query = "SELECT t._id as tid, CAST(t.id.ssn as INT) as ssn, t.contact.phone AS `phone` FROM hbase.`index_test_primary` as t " +
+ " where CAST(t.id.ssn as INT) = 100007423";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_cast_int_ssn"},
+ new String[]{"RowKeyJoin"}
+ );
+
+ System.out.println("TestCastCoveringPlan Plan Verified!");
+
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("tid", "ssn", "phone").baselineValues("1012", 100007423, "6500005471")
+ .go();
+ }
+
+ @Test
+ public void TestCastNonCoveringPlan() throws Exception {
+ String query = "SELECT t.id.ssn AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where CAST(t.id.ssn as INT) = 100007423";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_cast_int_ssn"},
+ new String[]{}
+ );
+
+ System.out.println("TestCastNonCoveringPlan Plan Verified!");
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("ssn").baselineValues("100007423")
+ .go();
+ }
+
+ @Test
+ public void TestCastVarchar_ConvertToRangePlan() throws Exception {
+ String query = "SELECT t.id.ssn AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where CAST(driverlicense as VARCHAR(10)) = '100007423'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*MATCHES \"\\^.*100007423.*E.*\\$\".*indexName=i_cast_vchar_lic"},
+ new String[]{}
+ );
+
+ System.out.println("TestCastVarchar_ConvertToRangePlan Verified!");
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("ssn").baselineValues("100007423")
+ .go();
+ }
+
+ @Test // cast expression in filter is not indexed, but the same field casted to different type was indexed (CAST id.ssn as INT)
+ public void TestCastNoIndexPlan() throws Exception {
+ String query = "select t.id.ssn from hbase.`index_test_primary` t where cast(t.id.ssn as varchar(10)) = '100007423'";
+
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{},
+ new String[]{"indexName"}
+ );
+
+ }
+
+ @Test
+ public void TestLongerCastVarCharNoIndex() throws Exception {
+    // length 500 does not match the casted indexed field's length (255), so no index can be used
+ String query = "SELECT t._id as tid, cast(t.driverlicense as varchar(500)) as driverlicense FROM hbase.`index_test_primary` as t " +
+ " where cast(t.driverlicense as varchar(500))='100007423'";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {},
+ new String[]{"RowKeyJoin", "indexName="}
+ );
+
+ System.out.println("TestLongerCastVarCharNoIndex Plan Verified!");
+ }
+
+ @Test
+ public void TestCoveringPlanSortRemoved() throws Exception {
+ String query = "SELECT t.`contact`.`phone` as phone FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn <'100000003' order by t.id.ssn";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+ new String[]{"Sort"}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("phone").baselineValues("6500008069")
+ .baselineColumns("phone").baselineValues("6500001411")
+ .baselineColumns("phone").baselineValues("6500001595")
+ .go();
+ }
+
+ @Test
+ public void TestCoveringPlanSortNotRemoved() throws Exception {
+ String query = "SELECT t.`contact`.`phone` as phone FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn <'100000003' order by t.contact.phone";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+ new String[]{"RowkeyJoin"}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("phone").baselineValues("6500001411")
+ .baselineColumns("phone").baselineValues("6500001595")
+ .baselineColumns("phone").baselineValues("6500008069")
+ .go();
+ }
+
+ @Test
+ public void TestCoveringPlanSortRemovedWithSimpleFields() throws Exception {
+ String query = "SELECT t.driverlicense as l FROM hbase.`index_test_primary` as t " +
+ " where t.driverlicense < 100000003 order by t.driverlicense";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_lic"},
+ new String[]{"Sort"}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("l").baselineValues(100000000l)
+ .baselineColumns("l").baselineValues(100000001l)
+ .baselineColumns("l").baselineValues(100000002l)
+ .go();
+ }
+
+ @Test
+ public void TestNonCoveringPlanSortRemoved() throws Exception {
+ String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+ " where t.driverlicense < 100000003 order by t.driverlicense";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_lic"},
+ new String[]{"Sort"}
+ );
+ String query2 = "SELECT t.name.fname as fname FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn < '100000003' order by t.id.ssn";
+ PlanTestBase.testPlanMatchingPatterns(query2,
+ new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="},
+ new String[]{"Sort"}
+ );
+
+ //simple field, driverlicense
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("phone").baselineValues("6500008069")
+ .baselineColumns("phone").baselineValues("6500001411")
+ .baselineColumns("phone").baselineValues("6500001595")
+ .go();
+
+    // query on a field accessed through an ITEM expression (has a capping Project): the non-simple field t.id.ssn
+ testBuilder()
+ .sqlQuery(query2)
+ .ordered()
+ .baselineColumns("fname").baselineValues("VcFahj")
+ .baselineColumns("fname").baselineValues("WbKVK")
+ .baselineColumns("fname").baselineValues("vSAEsyFN")
+ .go();
+
+ test(sliceTargetSmall);
+ try {
+ PlanTestBase.testPlanMatchingPatterns(query2,
+ new String[]{"SingleMergeExchange(.*[\n\r])+.*"
+ + "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_ssn"},
+ new String[]{"Sort"}
+ );
+ } finally {
+ test(sliceTargetDefault);
+ }
+ }
+
+  // Test cases mirror TestNonCoveringPlanSortRemoved, where the Sort was removed under the default
+  // force_sort_noncovering=false; with the option set to true, the Sort must remain.
+ @Test
+ public void TestNonCoveringPlanWithNoRemoveSortOption() throws Exception {
+ try {
+ test("alter session set `planner.index.force_sort_noncovering`=true");
+ test(defaultHavingIndexPlan);
+
+ String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+ " where t.driverlicense < 100000003 order by t.driverlicense";
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"Sort", "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_lic"},
+ new String[]{}
+ );
+
+ String query2 = "SELECT t.name.fname as fname FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn < '100000003' order by t.id.ssn";
+ PlanTestBase.testPlanMatchingPatterns(query2,
+ new String[]{"Sort", "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="},
+ new String[]{}
+ );
+
+ //simple field, driverlicense
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("phone").baselineValues("6500008069")
+ .baselineColumns("phone").baselineValues("6500001411")
+ .baselineColumns("phone").baselineValues("6500001595")
+ .go();
+
+      // query on a field accessed through an ITEM expression (has a capping Project): the non-simple field t.id.ssn
+ testBuilder()
+ .sqlQuery(query2)
+ .ordered()
+ .baselineColumns("fname").baselineValues("VcFahj")
+ .baselineColumns("fname").baselineValues("WbKVK")
+ .baselineColumns("fname").baselineValues("vSAEsyFN")
+ .go();
+
+ test(sliceTargetSmall);
+ try {
+ PlanTestBase.testPlanMatchingPatterns(query2,
+ new String[]{"Sort", "SingleMergeExchange(.*[\n\r])+.*"
+ + "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_ssn"},
+ new String[]{}
+ );
+ } finally {
+ test(sliceTargetDefault);
+ }
+ }
+ finally {
+ test("alter session reset `planner.index.force_sort_noncovering`");
+ }
+ }
+
+ @Test // 2 table join, each table has local predicate on top-level column
+ public void TestCoveringPlanJoin_1() throws Exception {
+ String query = "SELECT count(*) as cnt FROM hbase.`index_test_primary` as t1 " +
+ " inner join hbase.`index_test_primary` as t2 on t1.driverlicense = t2.driverlicense " +
+ " where t1.driverlicense < 100000003 and t2.driverlicense < 100000003" ;
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=",
+ ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="},
+ new String[]{}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("cnt").baselineValues(3L)
+ .go();
+ }
+
+ @Test // 2 table join, each table has local predicate on nested column
+ public void TestCoveringPlanJoin_2() throws Exception {
+ String query = "SELECT count(*) as cnt FROM hbase.`index_test_primary` as t1 " +
+ " inner join hbase.`index_test_primary` as t2 on t1.contact.phone = t2.contact.phone " +
+ " where t1.id.ssn < '100000003' and t2.id.ssn < '100000003' ";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=",
+ ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="},
+ new String[]{}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("cnt").baselineValues(3L)
+ .go();
+ }
+
+ @Test // leading prefix of index has Equality conditions and ORDER BY last column; Sort SHOULD be dropped
+ public void TestCoveringPlanSortPrefix_1() throws Exception {
+ String query = "SELECT t.contact.phone FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'wo' and t.personal.age = 35 and t.contact.phone < '6500003000' order by t.contact.phone";
+ test(defaultHavingIndexPlan);
+
+    // we pin the plan to index i_state_age_phone to make sure we exercise the targeted prefix-construction code path
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+ new String[]{"Sort"}
+ );
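+    // With equality on the leading prefix (state, age), the index collation on the trailing
+    // phone column already satisfies ORDER BY phone, which is why no Sort appears in the plan.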
+
+ // compare the results of index plan with the no-index plan
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .optionSettingQueriesForBaseline(noIndexPlan)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ }
+
+ @Test // leading prefix of index has Non-Equality conditions and ORDER BY last column; Sort SHOULD NOT be dropped
+ public void TestCoveringPlanSortPrefix_2() throws Exception {
+ String query = "SELECT t.contact.phone FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.contact.phone";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+ new String[]{}
+ );
+
+ // compare the results of index plan with the no-index plan
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .optionSettingQueriesForBaseline(noIndexPlan)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ }
+
+ @Test //ORDER BY last two columns not in the indexed order; Sort SHOULD NOT be dropped
+ public void TestCoveringPlanSortPrefix_3() throws Exception {
+ String query = "SELECT CAST(t.personal.age as VARCHAR) as age, t.contact.phone FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.contact.phone, t.personal.age";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+ new String[]{}
+ );
+
+ // compare the results of index plan with the no-index plan
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .optionSettingQueriesForBaseline(noIndexPlan)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ }
+
+ @Test // last two index fields in non-Equality conditions, ORDER BY last two fields; Sort SHOULD be dropped
+ public void TestCoveringPlanSortPrefix_4() throws Exception {
+ String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.personal.age, t.contact.phone";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+ new String[]{"Sort"}
+ );
+
+ // compare the results of index plan with the no-index plan
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .optionSettingQueriesForBaseline(noIndexPlan)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ }
+
+ @Test // index field in two or more equality conditions, it is not leading prefix, Sort SHOULD NOT be dropped
+ public void TestCoveringPlanSortPrefix_5() throws Exception {
+ String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'wo' and t.personal.age IN (31, 32, 33, 34) and t.contact.phone < '6500003000' order by t.contact.phone";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+ new String[]{}
+ );
+
+ // compare the results of index plan with the no-index plan
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .optionSettingQueriesForBaseline(noIndexPlan)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ }
+
+ @Test // last two index fields in non-Equality conditions, ORDER BY last two fields NULLS FIRST; Sort SHOULD NOT be dropped
+ public void TestCoveringPlanSortPrefix_6() throws Exception {
+ String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.personal.age, t.contact.phone NULLS FIRST";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+ new String[]{}
+ );
+
+ // compare the results of index plan with the no-index plan
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .optionSettingQueriesForBaseline(noIndexPlan)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ }
+
+ @Test // last two index fields in non-Equality conditions, ORDER BY last two fields NULLS LAST; Sort SHOULD be dropped
+ public void TestCoveringPlanSortPrefix_7() throws Exception {
+ String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.personal.age, t.contact.phone NULLS LAST";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+ new String[]{"Sort"}
+ );
+
+ // compare the results of index plan with the no-index plan
+ testBuilder()
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .optionSettingQueriesForBaseline(noIndexPlan)
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void orderByCastCoveringPlan() throws Exception {
+ String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+ " where CAST(t.id.ssn as INT) < 100000003 order by CAST(t.id.ssn as INT)";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="},
+ new String[]{"Sort"}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("phone").baselineValues("6500008069")
+ .baselineColumns("phone").baselineValues("6500001411")
+ .baselineColumns("phone").baselineValues("6500001595")
+ .go();
+ }
+
+ @Test // non-covering plan. sort by the only indexed field, sort SHOULD be removed
+ public void orderByNonCoveringPlan() throws Exception {
+ String query = "SELECT t.name.lname as lname FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn < '100000003' order by t.id.ssn";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="},
+ new String[]{"Sort"}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("lname").baselineValues("iuMG")
+ .baselineColumns("lname").baselineValues("KpFq")
+ .baselineColumns("lname").baselineValues("bkkAvz")
+ .go();
+ }
+
+ @Test //non-covering plan. order by cast indexed field, sort SHOULD be removed
+ public void orderByCastNonCoveringPlan() throws Exception {
+ String query = "SELECT t.name.lname as lname FROM hbase.`index_test_primary` as t " +
+ " where CAST(t.id.ssn as INT) < 100000003 order by CAST(t.id.ssn as INT)";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="},
+ new String[]{"Sort"}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("lname").baselineValues("iuMG")
+ .baselineColumns("lname").baselineValues("KpFq")
+ .baselineColumns("lname").baselineValues("bkkAvz")
+ .go();
+ }
+
+
+  @Ignore // in statsCache the condition state+city has rowcount 1250 while state alone has 1000, so the planner picks i_state_age_phone instead
+  @Test // non-covering, ORDER BY a non-leading field while the leading fields are not in equality conditions; Sort SHOULD NOT be removed
+ public void NonCoveringPlan_SortPrefix_1() throws Exception {
+
+ String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where t.address.state > 'pc' AND t.address.city>'pfrrr' AND t.address.city<'pfrrt' order by t.adddress.city";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"Sort",
+ "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_state_city"},
+ new String[]{}
+ );
+ }
+
+  @Test // non-covering, ORDER BY a non-leading field while the leading fields are in equality conditions; Sort SHOULD be removed
+ public void NonCoveringPlan_SortPrefix_2() throws Exception {
+
+ String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where t.address.state = 'pc' AND t.address.city>'pfrrr' AND t.address.city<'pfrrt' order by t.address.city";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {
+ "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_state_city"},
+ new String[]{"Sort"}
+ );
+ }
+
+ @Ignore ("Should be modified to get an index plan; not very useful since most covering plan filters get pushed")
+ @Test //Correct projection and results when filter on non-indexed column in covering plan.
+ public void nonIndexedColumnFilterCoveringPlan() throws Exception {
+ String query = "SELECT t.name.fname as fname FROM hbase.`index_test_primary` as t " +
+ " where t.personal.age > 68 and t.name.fname IN ('CnGobfR', 'THOHP')";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {".*Filter.*CnGobfR.*THOHP.*",
+ ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="},
+ new String[] {".*Filter.*ITEM*CnGobfR.*THOHP.*"});
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("fname").baselineValues("CnGobfR")
+ .baselineColumns("fname").baselineValues("THOHP")
+ .baselineColumns("fname").baselineValues("CnGobfR")
+ .go();
+ }
+
+ @Test
+ @Ignore ("Fix after MEP 5.0")
+ public void orderByLimitNonCoveringPlan() throws Exception {
+ String query = "SELECT t.name.lname as lname FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn < '100000003' order by t.id.ssn limit 2";
+ try {
+ test(defaultHavingIndexPlan);
+ test(sliceTargetSmall);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"Limit(.*[\n\r])+.*SingleMergeExchange(.*[\n\r])+.*Limit(.*[\n\r])+.*indexName="},
+ new String[]{"Sort"}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("lname").baselineValues("iuMG")
+ .baselineColumns("lname").baselineValues("KpFq")
+ .go();
+ } finally {
+ test(sliceTargetDefault);
+ }
+ }
+
+ @Test
+ public void orderByLimitCoveringPlan() throws Exception {
+ String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn < '100000003' order by t.id.ssn limit 2";
+ test(defaultHavingIndexPlan);
+
+    // When the index table has only one tablet, the SingleMergeExchange between the two Limits is removed.
+    // The lower Limit gets pushed into the scan.
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"Limit(.*[\n\r])+.*indexName=.*limit=2"},
+ new String[]{"Sort"}
+ );
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("phone").baselineValues("6500008069")
+ .baselineColumns("phone").baselineValues("6500001411")
+ .go();
+ }
+
+ @Test
+ public void pickAnyIndexWithFTSDisabledPlan() throws Exception {
+ String lowCoveringSel = "alter session set `planner.index.covering_selectivity_threshold` = 0.025";
+ String defaultCoveringSel = "alter session reset `planner.index.covering_selectivity_threshold`";
+ String query = "SELECT t.`contact`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn = '100007423'";
+ try {
+ test(defaultHavingIndexPlan + ";" + lowCoveringSel + ";");
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{".*JsonTableGroupScan.*tableName=.*index_test_primary"},
+ new String[]{".*indexName=i_ssn"}
+ );
+      // Must not throw CannotPlanException
+ test(defaultHavingIndexPlan + ";" + lowCoveringSel + ";" + disableFTS + ";");
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+ new String[]{"RowKeyJoin"}
+ );
+ } finally {
+ test(defaultCoveringSel+";"+enableFTS+";");
+ }
+ }
+
+ @Test
+ public void testCaseSensitive() throws Exception {
+ String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+ " where t.id.SSN = '100000003' ";
+ test(defaultHavingIndexPlan);
+
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {""},
+ new String[]{"indexName"}
+ );
+
+ }
+
+ @Test
+ public void testCaseSensitiveIncludedField() throws Exception {
+
+ String query = "SELECT t.`CONTACT`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn = '100007423'";
+ test(defaultHavingIndexPlan);
+
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"RowKeyJoin",
+ ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+ new String[]{}
+ );
+ }
+
+
+ @Test
+ public void testHashIndexNoRemovingSort() throws Exception {
+ String query = "SELECT t.`contact`.`phone` as phone FROM hbase.`index_test_primary` as t " +
+ " where t.reverseid <'10' order by t.reverseid";
+ test(defaultHavingIndexPlan);
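+    // A hash index keeps no key ordering, so the Sort cannot be removed even though reverseid is
+    // the indexed field; the index itself is still used (non-covering, hence the RowKeyJoin).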
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"Sort", "indexName=hash_i_reverseid", "RowKeyJoin"},
+ new String[]{}
+ );
+ }
+
+ @Test
+ public void testCastTimestampPlan() throws Exception {
+ String query = "SELECT t.id.ssn as ssn FROM hbase.`index_test_primary` as t " +
+ " where cast(t.activity.irs.firstlogin as timestamp)=to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"indexName=hash_i_cast_timestamp_firstlogin"},
+ new String[]{"RowKeyJoin"}
+ );
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("ssn").baselineValues("100007423")
+ .go();
+
+ }
+
+ @Test
+ public void testNotConditionNoIndexPlan() throws Exception {
+ String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where NOT t.id.ssn = '100007423'";
+
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {},
+ new String[]{"indexName="}
+ );
+
+
+ String notInQuery = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn NOT IN ('100007423', '100007424')";
+ PlanTestBase.testPlanMatchingPatterns(notInQuery,
+ new String[] {},
+ new String[]{"indexName="}
+ );
+
+ String notLikeQuery = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+ " where t.id.ssn NOT LIKE '100007423'";
+ PlanTestBase.testPlanMatchingPatterns(notLikeQuery,
+ new String[] {},
+ new String[]{"indexName="}
+ );
+
+ }
+
+ @Test
+ public void testNoFilterOrderByCoveringPlan() throws Exception {
+ String query = "SELECT t.`id`.`ssn` AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+ "order by t.id.ssn limit 2";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"indexName=i_ssn"},
+ new String[]{"Sort", "TopN", "RowKeyJoin"}
+ );
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .baselineColumns("ssn", "phone").baselineValues("100000000", "6500008069")
+ .baselineColumns("ssn", "phone").baselineValues("100000001", "6500001411")
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testNoFilterAndLimitOrderByCoveringPlan() throws Exception {
+ String query = "SELECT t.`id`.`ssn` AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+ "order by t.id.ssn";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"Sort"},
+ new String[]{"indexName=*", "RowKeyJoin", "TopN"}
+ );
+ }
+
+ @Test
+ public void testNoFilterOrderByCast() throws Exception {
+ String query = "SELECT CAST(t.id.ssn as INT) AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+ "order by CAST(t.id.ssn as INT) limit 2";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"indexName=i_cast_int_ssn"},
+ new String[]{"TopN", "Sort", "RowKeyJoin"}
+ );
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .baselineColumns("ssn", "phone").baselineValues(100000000, "6500008069")
+ .baselineColumns("ssn", "phone").baselineValues(100000001, "6500001411")
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testNoFilterAndLimitOrderByCast() throws Exception {
+ String query = "SELECT CAST(t.id.ssn as INT) AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+ "order by CAST(t.id.ssn as INT)";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] { "Sort"},
+ new String[]{"indexName=*","TopN", "RowKeyJoin"}
+ );
+ }
+
+ @Test
+ public void testNoFilterOrderByHashIndex() throws Exception {
+ String query = "SELECT cast(t.activity.irs.firstlogin as timestamp) AS `firstlogin`, t.id.ssn as ssn FROM hbase.`index_test_primary` as t " +
+ "order by cast(t.activity.irs.firstlogin as timestamp), t.id.ssn limit 2";
+ test(defaultHavingIndexPlan);
+    // a hash index provides no collation, so Sort or TopN must be preserved
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"(Sort|TopN)"},
+ new String[]{"indexName="}
+ );
+ DateTime date = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
+ .parseDateTime("2010-01-21 00:12:24");
+
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .baselineColumns("firstlogin", "ssn").baselineValues(date, "100005592")
+ .baselineColumns("firstlogin", "ssn").baselineValues(date, "100005844")
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testNoFilterOrderBySimpleField() throws Exception {
+ String query = "SELECT t.reverseid as rid, t.driverlicense as lic FROM hbase.`index_test_primary` as t " +
+ "order by t.driverlicense limit 2";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"indexName=i_lic"},
+ new String[]{"Sort", "TopN"}
+ );
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .baselineColumns("rid", "lic").baselineValues("4539", 100000000L)
+ .baselineColumns("rid", "lic").baselineValues("943", 100000001L)
+ .build()
+ .run();
+ }
+
+ @Test //negative case for no filter plan
+ public void testNoFilterOrderByNoIndexMatch() throws Exception {
+ String query = "SELECT t.`id`.`ssn` AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+ "order by t.name.fname limit 2";
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"(Sort|TopN)"},
+ new String[]{"indexName="}
+ );
+ }
+
+// Enable this testcase once MD-2848 is fixed.
+// @Test
+// public void IntersectPlanWithOneSideNoRows() throws Exception {
+// try {
+// String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+// " where t.personal.age = 53 AND t.personal.income=111145";
+// test(defaultHavingIndexPlan);
+// test(preferIntersectPlans + ";" + disableFTS);
+// PlanTestBase.testPlanMatchingPatterns(query,
+// new String[]{"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*HashJoin(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)"},
+// new String[]{}
+// );
+//
+// testNoResult(query);
+//
+// } finally {
+// test(defaultIntersectPlans + ";" + enableFTS);
+// }
+// }
+
+ //"i_cast_age_state_phone", "$CAST(personal.age@STRING),address.state,contact.phone", "name.fname",
+ @Test
+ public void testTrailingFieldIndexCovering() throws Exception {
+ String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " +
+ " where cast(t.personal.age as INT)=53 AND t.contact.phone='6500005471' ";
+
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"indexName=i_cast_age_income_phone"},
+ new String[]{"RowKeyJoin"}
+ );
+
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .baselineColumns("fname").baselineValues("KfFzK")
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testIncludedFieldCovering() throws Exception {
+ String query = "SELECT t.`contact`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " +
+ " where cast(t.personal.age as INT)=53 AND t.name.fname='KfFzK' ";
+
+ test(defaultHavingIndexPlan);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"indexName=i_cast_age_income_phone"},
+ new String[]{"RowKeyJoin"}
+ );
+
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .baselineColumns("phone").baselineValues("6500005471")
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testWithFilterGroupBy() throws Exception {
+ String query = " select t1.driverlicense from hbase.`index_test_primary` t1" +
+ " where t1.driverlicense > 100000001 group by t1.driverlicense limit 2";
+ try {
+ test(defaultHavingIndexPlan);
+ test(disableHashAgg);
+      // i_lic provides the collation needed by the streaming aggregate, so no Sort or TopN is expected
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"indexName=i_lic", "StreamAgg"},
+ new String[]{"(Sort|TopN)"}
+ );
+
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery(disableHashAgg)
+ .baselineColumns("driverlicense").baselineValues(100000002L)
+ .baselineColumns("driverlicense").baselineValues(100000003L)
+ .build()
+ .run();
+ } finally {
+ test(enableHashAgg);
+ }
+ }
+
+ @Test
+ public void testNoFilterOrderByDesc() throws Exception {
+ String query = " select t1.driverlicense from hbase.`index_test_primary` t1" +
+ " order by t1.driverlicense desc limit 2";
+ test(defaultHavingIndexPlan);
+    // the ORDER BY is descending while the index collation is ascending, so Sort or TopN is preserved and no index is used
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[] {"(Sort|TopN)"},
+ new String[]{"indexName="}
+ );
+
+ testBuilder()
+ .unOrdered()
+ .sqlQuery(query)
+ .baselineColumns("driverlicense").baselineValues(100009999L)
+ .baselineColumns("driverlicense").baselineValues(100009998L)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testNoFilterGroupBy() throws Exception {
+ String query = " select t1.driverlicense from hbase.`index_test_primary` t1" +
+ " group by t1.driverlicense limit 2";
+ try {
+ test(defaultHavingIndexPlan);
+ test(disableHashAgg);
+      // i_lic provides the collation needed by the streaming aggregate, so no Sort or TopN is expected
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"indexName=i_lic", "StreamAgg"},
+ new String[]{"(Sort|TopN)"}
+ );
+
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery(disableHashAgg)
+ .baselineColumns("driverlicense").baselineValues(100000000L)
+ .baselineColumns("driverlicense").baselineValues(100000001L)
+ .build()
+ .run();
+ } finally {
+ test(enableHashAgg);
+ }
+ }
+
+ @Test
+ public void testNoFilterGroupByCoveringPlan() throws Exception {
+ String query = "SELECT t.`id`.`ssn` AS `ssn`, max(t.contact.phone) as phone FROM hbase.`index_test_primary` as t " +
+ "group by t.id.ssn limit 2";
+ try {
+ test(defaultHavingIndexPlan);
+ test(disableHashAgg);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"indexName=i_ssn", "StreamAgg"},
+ new String[]{"Sort", "TopN", "RowKeyJoin"}
+ );
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery(disableHashAgg)
+ .baselineColumns("ssn", "phone").baselineValues("100000000", "6500008069")
+ .baselineColumns("ssn", "phone").baselineValues("100000001", "6500001411")
+ .build()
+ .run();
+ } finally {
+ test(enableHashAgg);
+ }
+ }
+
+ @Test
+ public void testNoFilterGroupByCast() throws Exception {
+ String query = "SELECT CAST(t.id.ssn as INT) AS `ssn`, max(t.contact.phone) as phone FROM hbase.`index_test_primary` as t " +
+ "group by CAST(t.id.ssn as INT) limit 2";
+ try {
+ test(defaultHavingIndexPlan);
+ test(disableHashAgg);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"indexName=i_cast_int_ssn", "StreamAgg"},
+ new String[]{"TopN", "Sort", "RowKeyJoin"}
+ );
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery(disableHashAgg)
+ .baselineColumns("ssn", "phone").baselineValues(100000000, "6500008069")
+ .baselineColumns("ssn", "phone").baselineValues(100000001, "6500001411")
+ .build()
+ .run();
+ } finally {
+ test(enableHashAgg);
+ }
+ }
+
+ @Test
+ public void testNoFilterGroupByHashIndex() throws Exception {
+ String query = "SELECT cast(t.activity.irs.firstlogin as timestamp) AS `firstlogin`, max(t.id.ssn) as ssn FROM hbase.`index_test_primary` as t " +
+ "group by cast(t.activity.irs.firstlogin as timestamp) limit 2";
+ try {
+ test(defaultHavingIndexPlan);
+ test(disableHashAgg);
+      // a hash index provides no collation, so Sort or TopN must be preserved
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"(Sort|TopN)", "StreamAgg"},
+ new String[]{"indexName="}
+ );
+ DateTime date1 = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
+ .parseDateTime("2010-01-21 00:12:24");
+
+ DateTime date2 = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
+ .parseDateTime("2010-01-21 00:24:48");
+ testBuilder()
+ .unOrdered()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery(disableHashAgg)
+ .baselineColumns("firstlogin", "ssn").baselineValues(date1, "100006852")
+ .baselineColumns("firstlogin", "ssn").baselineValues(date2, "100003660")
+ .build()
+ .run();
+ } finally {
+ test(enableHashAgg);
+ }
+ }
+
+ @Test
+ public void testNoFilterGroupBySimpleField() throws Exception {
+ String query = "SELECT max(t.reverseid) as rid, t.driverlicense as lic FROM hbase.`index_test_primary` as t " +
+ "group by t.driverlicense limit 2";
+ try {
+ test(defaultHavingIndexPlan);
+ test(disableHashAgg);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"indexName=i_lic", "StreamAgg"},
+ new String[]{"Sort", "TopN"}
+ );
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery(disableHashAgg)
+ .baselineColumns("rid", "lic").baselineValues("4539", 100000000L)
+ .baselineColumns("rid", "lic").baselineValues("943", 100000001L)
+ .build()
+ .run();
+ } finally {
+ test(enableHashAgg);
+ }
+ }
+
+ @Test //negative case for no filter plan
+ public void testNoFilterGroupByNoIndexMatch() throws Exception {
+ String query = "SELECT max(t.`id`.`ssn`) AS `ssn`, max(t.contact.phone) as phone FROM hbase.`index_test_primary` as t " +
+ "group by t.name.fname limit 2";
+ try {
+ test(defaultHavingIndexPlan);
+ test(disableHashAgg);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"(Sort|TopN)", "StreamAgg"},
+ new String[]{"indexName="}
+ );
+ } finally {
+ test(enableHashAgg);
+ }
+ }
+
+ @Test
+ public void testNoFilterGroupBySimpleFieldParallel() throws Exception {
+ String query = "SELECT max(t.reverseid) as rid, t.driverlicense as lic FROM hbase.`index_test_primary` as t " +
+ "group by t.driverlicense order by t.driverlicense limit 2";
+ try {
+ test(defaultHavingIndexPlan);
+ test(disableHashAgg);
+ test(sliceTargetSmall);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"indexName=i_lic", "StreamAgg", "HashToMergeExchange"},
+ new String[]{"Sort", "TopN"}
+ );
+ testBuilder()
+ .unOrdered()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+ .optionSettingQueriesForTestQuery(disableHashAgg)
+ .optionSettingQueriesForTestQuery(sliceTargetSmall)
+ .baselineColumns("rid", "lic").baselineValues("4539", 100000000L)
+ .baselineColumns("rid", "lic").baselineValues("943", 100000001L)
+ .build()
+ .run();
+ } finally {
+ test(enableHashAgg);
+ test(sliceTargetDefault);
+ }
+ }
+
+ @Test
+ public void testLimitPushdownCoveringPlan() throws Exception {
+ String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " +
+ " where t.personal.age = 53 limit 3";
+ try {
+ test(defaultHavingIndexPlan + ";" + disableFTS + ";");
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query,
+ new String[]{".*JsonTableGroupScan.*indexName=i_age_with_fname.*rowcount = 3.0"},
+ new String[]{}
+ );
+ } finally {
+ test(enableFTS);
+ }
+ }
+
+ @Test
+ public void testLimitPushdownOrderByCoveringPlan() throws Exception {
+ String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " +
+ " where t.personal.age = 53 order by t.personal.age limit 3";
+ try {
+ test(defaultHavingIndexPlan + ";" + disableFTS + ";");
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query,
+ new String[]{".*JsonTableGroupScan.*indexName=i_age_with_fname.*rowcount = 3.0"},
+ new String[]{}
+ );
+ } finally {
+ test(enableFTS);
+ }
+ }
+
+ @Test
+ public void testLimitPushdownNonCoveringPlan() throws Exception {
+ String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+ " where t.personal.age = 53 limit 7";
+ try {
+ test(defaultHavingIndexPlan+";"+disableFTS+";");
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query,
+ new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*tableName=.*index_test_primary.*rowcount = 7.0"},
+ new String[]{}
+ );
+ } finally {
+ test(enableFTS);
+ }
+ }
+
+ @Test
+ public void testLimitPushdownOrderByNonCoveringPlan() throws Exception {
+ // Limit pushdown should NOT happen past rowkey join when ordering is required
+ String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+ " where t.personal.age = 53 order by t.personal.age limit 7";
+ try {
+ test(defaultHavingIndexPlan + ";" + disableFTS + ";" + sliceTargetSmall + ";");
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query,
+ new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*"},
+ new String[]{".*tableName=.*index_test_primary.*rowcount = 7.*"}
+ );
+ } finally {
+ test(enableFTS);
+ }
+ }
+
+ @Test
+ public void testLimit0Pushdown() throws Exception {
+ // Limit pushdown should NOT happen past project with CONVERT_FROMJSON
+ String query = "select convert_from(convert_to(t.`name`.`lname`, 'JSON'), 'JSON') " +
+ "from hbase.`index_test_primary` as t limit 0";
+    test(defaultHavingIndexPlan + ";");
+    PlanTestBase.testPlanWithAttributesMatchingPatterns(query,
+        new String[]{"Limit(.*[\n\r])+.*Project.*CONVERT_FROMJSON(.*[\n\r])+.*Scan"},
+        new String[]{}
+    );
+ }
+
+ @Test
+  public void testRemovalOfRedundantHashToMergeExchange() throws Exception {
+ String query = "SELECT t.driverlicense as lic FROM hbase.`index_test_primary` as t " +
+ "order by t.driverlicense limit 2";
+ try {
+ test(defaultHavingIndexPlan);
+ test(sliceTargetSmall);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"indexName=i_lic"},
+ new String[]{"HashToMergeExchange", "Sort", "TopN"});
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .baselineColumns("lic").baselineValues(100000000L)
+ .baselineColumns("lic").baselineValues(100000001L)
+ .build()
+ .run();
+ } finally {
+ test(sliceTargetDefault);
+ }
+ }
+
+ @Test
+ public void testMultiPhaseAgg() throws Exception {
+ String query = "select count(t.reverseid) from hbase.`index_test_primary` as t " +
+ "group by t.driverlicense order by t.driverlicense";
+ try {
+ test(defaultHavingIndexPlan);
+ test(sliceTargetSmall);
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"indexName=i_lic", "HashToMergeExchange", "StreamAgg", "StreamAgg"},
+ new String[]{"Sort", "TopN"});
+ } finally {
+ test(sliceTargetDefault);
+ }
+ }
+
+ @Test
+ public void testHangForSimpleDistinct() throws Exception {
+ String query = "select distinct t.driverlicense from hbase.`index_test_primary` as t order by t.driverlicense limit 1";
+
+ try {
+ test(sliceTargetSmall);
+ testBuilder()
+ .ordered()
+ .sqlQuery(query)
+ .baselineColumns("driverlicense").baselineValues(100000000L)
+ .build()
+ .run();
+ } finally {
+ test(sliceTargetDefault);
+ }
+ }
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java
new file mode 100644
index 000000000..bc857d158
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+import static com.mapr.drill.maprdb.tests.MaprDBTestsSuite.INDEX_FLUSH_TIMEOUT;
+
+import java.io.InputStream;
+import java.io.StringBufferInputStream;
+
+import org.apache.hadoop.fs.Path;
+import org.ojai.DocumentStream;
+import org.ojai.json.Json;
+
+import com.mapr.db.Admin;
+import com.mapr.db.Table;
+import com.mapr.db.TableDescriptor;
+import com.mapr.db.impl.MapRDBImpl;
+import com.mapr.db.impl.TableDescriptorImpl;
+import com.mapr.db.tests.utils.DBTests;
+import com.mapr.fs.utils.ssh.TestCluster;
+
+/**
+ * Generates a MapR-DB JSON table whose rows have this shape:
+ * {
+ *   "address" : { "city":"wtj", "state":"ho" },
+ *   "contact" : { "email":"VcFahjRfM@gmail.com", "phone":"6500005583" },
+ *   "id" : { "ssn":"100005461" },
+ *   "name" : { "fname":"VcFahj", "lname":"RfM" }
+ * }
+ *
+ */
+public class LargeTableGen extends LargeTableGenBase {
+
+ static final int SPLIT_SIZE = 5000;
+ private Admin admin;
+
+ public LargeTableGen(Admin dbadmin) {
+ admin = dbadmin;
+ }
+
+ Table createOrGetTable(String tableName, int recordNum) {
+ if (admin.tableExists(tableName)) {
+ return MapRDBImpl.getTable(tableName);
+ //admin.deleteTable(tableName);
+ }
+ else {
+ TableDescriptor desc = new TableDescriptorImpl(new Path(tableName));
+
+ int splits = (recordNum / SPLIT_SIZE) - (((recordNum % SPLIT_SIZE) > 1)? 0 : 1);
+
+ String[] splitsStr = new String[splits];
+ StringBuilder strBuilder = new StringBuilder("Splits:");
+ for(int i=0; i<splits; ++i) {
+ splitsStr[i] = String.format("%d", (i+1)*SPLIT_SIZE);
+ strBuilder.append(splitsStr[i] + ", ");
+ }
+ System.out.print(strBuilder.toString());
+
+ return admin.createTable(desc, splitsStr);
+ }
+ }
+
+ private void createIndex(Table table, String[] indexDef) throws Exception {
+ if (indexDef == null) {
+ // Nothing to create here; the indexes may already exist.
+ return;
+ }
+ for(int i=0; i<indexDef.length / 3; ++i) {
+ String indexCmd = String.format("maprcli table index add"
+ + " -path " + table.getPath()
+ + " -index %s"
+ + " -indexedfields '%s'"
+ + ((indexDef[3 * i + 2].length()==0)?"":" -includedfields '%s'")
+ + ((indexDef[3 * i].startsWith("hash"))? " -hashed true" : ""),
+ indexDefInCommand(indexDef[3 * i]), //index name
+ indexDefInCommand(indexDef[3 * i + 1]), //indexedfields
+ indexDefInCommand(indexDef[3 * i + 2])); //includedfields
+ System.out.println(indexCmd);
+
+ TestCluster.runCommand(indexCmd);
+ DBTests.admin().getTableIndexes(table.getPath(), true);
+ }
+ }
+
+ private String indexDefInCommand(String def) {
+ // Re-join the comma-separated field list (normalizes the definition string).
+ return String.join(",", def.split(","));
+ }
+
+ public void generateTableWithIndex(String tablePath, int recordNumber, String[] indexDef) throws Exception {
+ // create index
+
+ initRandVector(recordNumber);
+ initDictionary();
+ DBTests.setTableStatsSendInterval(1);
+
+ if (admin.tableExists(tablePath)) {
+ // Table already exists; reuse it (deletion intentionally skipped).
+ }
+
+ //create Json String
+ int batch, i;
+ int BATCH_SIZE=2000;
+ try (Table table = createOrGetTable(tablePath, recordNumber)) {
+ //create index
+ createIndex(table, indexDef);
+ for (batch = 0; batch < recordNumber; batch += BATCH_SIZE) {
+ int batchStop = Math.min(recordNumber, batch + BATCH_SIZE);
+ StringBuffer strBuf = new StringBuffer();
+ for (i = batch; i < batchStop; ++i) {
+
+ strBuf.append(String.format("{\"rowid\": \"%d\", \"reverseid\": \"%d\", \"id\": {\"ssn\": \"%s\"}, \"contact\": {\"phone\": \"%s\", \"email\": \"%s\"}," +
+ "\"address\": {\"city\": \"%s\", \"state\": \"%s\"}, \"name\": { \"fname\": \"%s\", \"lname\": \"%s\" }," +
+ "\"personal\": {\"age\" : %s, \"income\": %s, \"birthdate\": {\"$dateDay\": \"%s\"} }," +
+ "\"activity\": {\"irs\" : { \"firstlogin\": \"%s\" } }," +
+ "\"driverlicense\":{\"$numberLong\": %s} } \n",
+ i + 1, recordNumber - i , getSSN(i), getPhone(i), getEmail(i),
+ getAddress(i)[2], getAddress(i)[1], getFirstName(i), getLastName(i),
+ getAge(i), getIncome(i), getBirthdate(i),
+ getFirstLogin(i),
+ getSSN(i)));
+ }
+ try (InputStream in = new StringBufferInputStream(strBuf.toString()); // deprecated API; harmless here since the generated data is ASCII
+ DocumentStream stream = Json.newDocumentStream(in)) {
+ //write by individual document
+ //for (Document document : stream) {
+ // table.insert(document, "rowid");
+ //}
+ try {
+ table.insert(stream, "rowid"); // insert the whole batch of documents from the stream
+ } catch (Exception e) {
+ System.out.println(stream.toString());
+ throw e;
+ }
+ }
+ }
+ table.flush();
+ DBTests.waitForIndexFlush(table.getPath(), INDEX_FLUSH_TIMEOUT);
+ Thread.sleep(200000); // generous settle time (200 s) after the index flush
+ }
+ }
+}
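[Reviewer note] StringBufferInputStream, used above, has been deprecated since JDK 1.1 because it silently drops the high byte of each character. The generated data here is pure ASCII, so it works, but a byte-based sketch of the same batching loop (assuming the one-JSON-document-per-line layout used above) would look like:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class BatchingSketch {
  public static void main(String[] args) throws Exception {
    int recordNumber = 5000;
    int batchSize = 2000;
    for (int batch = 0; batch < recordNumber; batch += batchSize) {
      int stop = Math.min(recordNumber, batch + batchSize);
      StringBuilder buf = new StringBuilder();
      for (int i = batch; i < stop; i++) {
        // One JSON document per line, as in generateTableWithIndex.
        buf.append(String.format("{\"rowid\": \"%d\"}%n", i + 1));
      }
      // ByteArrayInputStream is the non-deprecated replacement for StringBufferInputStream.
      try (InputStream in = new ByteArrayInputStream(buf.toString().getBytes(StandardCharsets.UTF_8))) {
        System.out.println("batch starting at " + batch + ": " + in.available() + " bytes");
      }
    }
  }
}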
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGenBase.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGenBase.java
new file mode 100644
index 000000000..917f42a8b
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGenBase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+public class LargeTableGenBase {
+
+ private boolean dict_ready = false;
+
+ protected List<String> firstnames;
+ protected List<String> lastnames;
+ protected List<String[]> cities;
+ protected int[] randomized;
+
+ protected synchronized void initDictionary() {
+ initDictionaryWithRand();
+ }
+
+ protected void initDictionaryWithRand() {
+ {
+ firstnames = new ArrayList<>();
+ lastnames = new ArrayList<>();
+ cities = new ArrayList<>();
+ List<String> states = new ArrayList<>();
+
+ int fnNum = 2000; //2k
+ int lnNum = 200000;//200k
+ int cityNum = 10000;//10k
+ int stateNum = 50;
+ Random rand = new Random(2017);
+ int i;
+ try {
+ Set<String> strSet = new LinkedHashSet<>();
+ while(strSet.size() < stateNum) {
+ strSet.add(RandomStringUtils.random(2, 0, 0, true, false, null, rand));
+ }
+ states.addAll(strSet);
+
+ strSet = new LinkedHashSet<>();
+ while(strSet.size() < cityNum) {
+ int len = 3 + strSet.size() % 6;
+ strSet.add(RandomStringUtils.random(len, 0, 0, true, false, null, rand));
+ }
+
+ Iterator<String> it = strSet.iterator();
+ for(i=0; i<cityNum; ++i) {
+ cities.add(new String[]{"10000", states.get(i%stateNum), it.next()});
+ }
+
+ strSet = new LinkedHashSet<>();
+ while(strSet.size() < fnNum) {
+ int len = 3 + strSet.size() % 6;
+ strSet.add(RandomStringUtils.random(len, 0, 0, true, false, null, rand));
+ }
+ firstnames.addAll(strSet);
+
+ strSet = new LinkedHashSet<>();
+ while(strSet.size() < lnNum) {
+ int len = 3 + strSet.size() % 6;
+ strSet.add(RandomStringUtils.random(len, 0, 0, true, false, null, rand));
+ }
+ lastnames.addAll(strSet);
+ }
+ catch(Exception e) {
+ System.out.println("init data got exception");
+ e.printStackTrace();
+ }
+ dict_ready = true;
+ }
+ }
+
+ protected String getFirstName(int i) {
+ return firstnames.get((randomized[ i%randomized.length ] + i )% firstnames.size());
+ }
+
+ protected String getLastName(int i) {
+ return lastnames.get((randomized[ (2*i + randomized[i%randomized.length])% randomized.length]) % lastnames.size());
+ }
+
+ protected String[] getAddress(int i) {
+ return cities.get((randomized[(i+ randomized[i%randomized.length])%randomized.length]) % cities.size());
+ }
+
+ protected String getSSN(int i){
+ return String.format("%d", 1000*1000*100 + randomized[ i % randomized.length]);
+ }
+
+ protected String getPhone(int i) {
+ // roughly 80% of the phone numbers are unique
+ return String.format("%d", 6500*1000*1000L + randomized[ (randomized.length - i) %((int) (randomized.length * 0.8)) ]);
+ }
+
+ protected String getEmail(int i){
+ return getFirstName(i) + getLastName(i) + "@" + "gmail.com";
+ }
+
+ protected String getAge(int i) {
+ return String.format("%d",randomized[i%randomized.length] % 60 + 10);
+ }
+
+ protected String getIncome(int i) { // income is expressed in units of $10K
+ return String.format("%d",randomized[i%randomized.length] % 47 + 1);
+ }
+
+ //date yyyy-mm-dd
+ protected String getBirthdate(int i) {
+ int thisseed = randomized[i%randomized.length];
+ return String.format("%d-%02d-%02d",
+ 2016 - (thisseed % 60 + 10), thisseed % 12 + 1, (thisseed * 31) % 28 + 1 );
+ }
+
+ //timestamp, yyyy-mm-dd HH:mm:ss
+ protected String getFirstLogin(int i) {
+ int thisseed = randomized[i%randomized.length];
+ int nextseed = randomized[(i+1)%randomized.length];
+ return String.format("%d-%02d-%02d %02d:%02d:%02d.0",
+ 2016 - (thisseed % 7), (thisseed * 31) % 12 + 1, thisseed % 28 + 1, nextseed % 24, nextseed % 60, (nextseed * 47) % 60 );
+ }
+
+
+ protected String getField(String field, int i) {
+ if(field.equals("ssn")) {
+ return getSSN(i);
+ }
+ else if (field.equals("phone")) {
+ return getPhone(i);
+ }
+ else if(field.equals("email")) {
+ return getEmail(i);
+ }
+ else if(field.equals("city")) {
+ return getAddress(i)[1];
+ }
+ else if(field.equals("state")) {
+ return getAddress(i)[0];
+ }
+ else if(field.equals("fname")) {
+ return getFirstName(i);
+ }
+ else if(field.equals("lname")) {
+ return getLastName(i);
+ }
+ return "";
+ }
+
+
+ protected void initRandVector(int recordNumber) {
+ int i;
+ Random rand = new Random(2016);
+ randomized = new int[recordNumber];
+ for(i = 0; i<recordNumber; ++i) {
+ randomized[i] = i;
+ }
+ for (i=0; i<recordNumber; ++i) {
+ int idx1 = rand.nextInt(recordNumber);
+ int idx2 = rand.nextInt(recordNumber);
+ int temp = randomized[idx1];
+ randomized[idx1] = randomized[idx2];
+ randomized[idx2] = temp;
+ }
+ }
+
+}
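[Reviewer note] initRandVector builds a reproducible permutation of 0..n-1 by seeded random transpositions, which is close to, but not exactly, the classic Fisher-Yates shuffle (random-transposition shuffles are not perfectly uniform). For comparison, a seeded Fisher-Yates sketch:

import java.util.Random;

public class ShuffleSketch {
  // Uniform, reproducible permutation of 0..n-1 (Fisher-Yates with a fixed seed).
  static int[] fisherYates(int n, long seed) {
    Random rand = new Random(seed);
    int[] a = new int[n];
    for (int i = 0; i < n; i++) {
      a[i] = i;
    }
    for (int i = n - 1; i > 0; i--) {
      int j = rand.nextInt(i + 1); // pick a slot at or below i
      int t = a[i];
      a[i] = a[j];
      a[j] = t;
    }
    return a;
  }
}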
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/StatisticsTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/StatisticsTest.java
new file mode 100644
index 000000000..36e25abaf
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/StatisticsTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+import com.google.common.collect.Lists;
+import com.mapr.db.Admin;
+import com.mapr.drill.maprdb.tests.MaprDBTestsSuite;
+import com.mapr.drill.maprdb.tests.json.BaseJsonTest;
+import com.mapr.tests.annotations.ClusterTest;
+import org.apache.drill.PlanTestBase;
+import org.apache.hadoop.hbase.TableName;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+
+@Category(ClusterTest.class)
+public class StatisticsTest extends IndexPlanTest {
+ /**
+ * A sample row of this 10K table:
+ * | 1012 | {"city":"pfrrs","state":"pc"} | {"email":"KfFzKUZwNk@gmail.com","phone":"6500005471"} |
+ * | {"ssn":"100007423"} | {"fname":"KfFzK","lname":"UZwNk"} | {"age":53.0,"income":45.0} | 1012 |
+ *
+ * This test suite generates random content for every row. Because the random generator always
+ * starts from the same seed, the table contents are identical across runs for a given row count,
+ * so query results can be predicted and verified.
+ */
+
+ @Test
+ @Ignore("Currently untested; re-enable after stats/costing integration complete")
+ public void testFilters() throws Exception {
+ String query;
+ String explain = "explain plan including all attributes for ";
+
+ // Top-level ANDs - Leading columns (personal.age), (address.state)
+ query = "select * from hbase.`index_test_primary` t "
+ + " where (t.personal.age < 30 or t.personal.age > 100)"
+ + " and (t.address.state = 'mo' or t.address.state = 'ca')";
+ PlanTestBase.testPlanMatchingPatterns(explain+query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+ new String[] {}
+ );
+
+ // Top-level ORs - Cannot split top-level ORs so use defaults
+ query = "select * from hbase.`index_test_primary` t "
+ + " where (t.personal.age > 30 and t.personal.age < 100)"
+ + " or (t.address.state = 'mo')";
+ PlanTestBase.testPlanMatchingPatterns(explain+query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+ new String[] {}
+ );
+
+ // ANDed condition - Leading index column(personal.age) and non-leading column(address.city)
+ query = "select * from hbase.`index_test_primary` t "
+ + " where (t.personal.age < 30 or t.personal.age > 100)"
+ + " and `address.city` = 'sf'";
+ PlanTestBase.testPlanMatchingPatterns(explain+query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+ new String[] {}
+ );
+
+ // ANDed condition - Leading index columns (address.state) and (address.city)
+ query = "select * from hbase.`index_test_primary` t "
+ + " where (`address.state` = 'mo' or `address.state` = 'ca') " // Leading index column
+ + " and `address.city` = 'sf'"; // Non leading index column
+ PlanTestBase.testPlanMatchingPatterns(explain+query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+ new String[] {}
+ );
+
+ // ANDed condition - Leading index columns (address.state) and non-index column (name.fname)
+ query = "select * from hbase.`index_test_primary` t "
+ + " where (`address.state` = 'mo' or `address.state` = 'ca') " // Leading index column
+ + " and `name.fname` = 'VcFahj'"; // Non index column
+ PlanTestBase.testPlanMatchingPatterns(explain+query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+ new String[] {}
+ );
+
+ // Simple condition - LIKE predicate
+ query = "select t._id as rowid from hbase.`index_test_primary` as t "
+ + "where t.driverlicense like '100007423%'";
+ PlanTestBase.testPlanMatchingPatterns(explain+query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+ new String[] {}
+ );
+
+ // Simple condition - LIKE predicate with ESCAPE clause
+ query = "select t._id as rowid from hbase.`index_test_primary` as t "
+ + "where t.driverlicense like '100007423%' ESCAPE '/'";
+ PlanTestBase.testPlanMatchingPatterns(explain+query,
+ new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+ new String[] {}
+ );
+ }
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java
new file mode 100644
index 000000000..a501f8fa8
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+
+import com.mapr.db.Admin;
+import com.mapr.db.MapRDB;
+import org.apache.drill.exec.util.GuavaPatcher;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Copy the test classes to a MapR cluster node, then run a command like this:
+ * java -classpath /tmp/drill-cmd-1.9.0-SNAPSHOT.jar:/opt/mapr/drill/drill-1.9.0/jars/*:/opt/mapr/drill/drill-1.9.0/jars/3rdparty/*:/opt/mapr/drill/drill-1.9.0/jars/ext/*
+ * com.mapr.drill.maprdb.tests.index.TableIndexCmd -host 10.10.88.128 -port 5181 [-table pop3] [-size 1000000]
+ */
+
+class TestBigTable {
+
+ Admin admin;
+ boolean initialized = false;
+
+ LargeTableGen gen;
+
+ /*
+ "hbase.zookeeper.quorum": "10.10.88.128",
+ "hbase.zookeeper.property.clientPort": "5181"
+ */
+ void init(String host, String port) { // host and port are currently unused here
+ try {
+ admin = MapRDB.newAdmin();
+ initialized = true;
+ gen = new LargeTableGen(admin);
+ } catch (Exception e) {
+ System.out.println("Connection to HBase threw" + e.getMessage());
+ }
+ }
+}
+
+
+public class TableIndexCmd {
+
+ public static Map<String,String> parseParameter(String[] params) {
+ HashMap<String,String> retParams = new HashMap<String, String>();
+ for (int i=0; i<params.length; ++i) {
+ if (params[i].startsWith("-") && i<params.length - 1) {
+ String paramName = params[i].replaceFirst("-*", "");
+ retParams.put(paramName, params[i+1]);
+ ++i;
+ }
+ }
+ return retParams;
+ }
+
+ public static void pressEnterKeyToContinue() {
+ System.out.println("Press Enter to continue...");
+ try {
+ System.in.read();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
+
+ public static void main(String[] args) {
+ GuavaPatcher.patch();
+
+ String inHost = "localhost";
+ String inPort = "5181";
+ String inTable = "/tmp/population";
+ String dictPath = "hbase";
+ boolean waitKeyPress = true;
+ long inSize = 10000;
+ Map<String, String> params = parseParameter(args);
+ if(args.length >= 2) {
+ if(params.get("host") != null) {
+ inHost = params.get("host");
+ }
+ if(params.get("port") != null) {
+ inPort = params.get("port");
+ }
+ if(params.get("table") != null) {
+ inTable = params.get("table");
+ }
+ if(params.get("size") != null) {
+ inSize = Long.parseLong(params.get("size"));
+ }
+ if(params.get("dict") != null) {
+ dictPath = params.get("dict");
+ }
+ if(params.get("wait") != null) {
+ String answer = params.get("wait");
+ waitKeyPress = answer.startsWith("y") || answer.startsWith("t");
+ }
+ }
+ if (waitKeyPress) {
+ pressEnterKeyToContinue();
+ }
+ try {
+ TestBigTable tbt = new TestBigTable();
+ tbt.init(inHost, inPort);
+ tbt.gen.generateTableWithIndex(inTable, (int)(inSize & 0xFFFFFFFFL), null);
+ }
+ catch(Exception e) {
+ System.out.println("generate big table got exception:" + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+}
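[Reviewer note] parseParameter above maps `-name value` pairs into a map. A quick usage sketch with illustrative values:

import java.util.Map;

public class ParseDemo {
  public static void main(String[] args) {
    Map<String, String> params = com.mapr.drill.maprdb.tests.index.TableIndexCmd.parseParameter(
        new String[]{"-host", "10.10.88.128", "-port", "5181", "-size", "1000000"});
    System.out.println(params.get("host")); // 10.10.88.128
    System.out.println(params.get("size")); // 1000000
  }
}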
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java
new file mode 100644
index 000000000..5f4da7439
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * A relational operator that guarantees ordered output on certain columns.
+ * TopNPrel, SortPrel and DrillSortRel are among the classes implementing
+ * this interface.
+ */
+public interface OrderedRel extends DrillRelNode {
+
+ /**
+ * A method to return ordering columns of the result.
+ * @return Collation order of the output.
+ */
+ RelCollation getCollation();
+
+ /**
+ * Offset value represented in RexNode.
+ * @return offset.
+ */
+ RexNode getOffset();
+
+ /**
+ * Fetch value represented in RexNode.
+ * @return fetch
+ */
+ RexNode getFetch();
+
+ /**
+ * A method to return if this relational node can be dropped during optimization process.
+ * @return true if this node can be dropped, false otherwise.
+ */
+ boolean canBeDropped();
+}
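[Reviewer note] A typical consumer of this interface inspects getFetch()/getOffset() to decide whether a limit can be generated, mirroring the IndexPlanUtils.generateLimit helper added later in this patch. A minimal sketch (hasPushableLimit is a hypothetical name, not part of this change):

import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.exec.planner.common.OrderedRel;

class OrderedRelHelper {
  // A Sort/TopN carries a pushable limit only when its fetch is a
  // non-negative literal; a null fetch means "no limit".
  static boolean hasPushableLimit(OrderedRel rel) {
    RexNode fetch = rel.getFetch();
    return fetch != null && RexLiteral.intValue(fetch) >= 0;
  }
}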
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
index 65788cb52..45251c641 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.index;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
@@ -28,6 +27,7 @@ import org.apache.drill.exec.physical.base.DbGroupScan;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.planner.common.OrderedRel;
import java.util.List;
import java.util.Set;
@@ -66,7 +66,7 @@ public interface IndexCallContext {
RexNode getOrigCondition();
- Sort getSort();
+ OrderedRel getSort();
void createSortExprs();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java
index 27198bb02..3a6ea83d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.index;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
@@ -31,6 +30,7 @@ import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.DrillSortRel;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.calcite.rel.RelNode;
@@ -164,7 +164,7 @@ public class IndexLogicalPlanCallContext implements IndexCallContext {
return origPushedCondition;
}
- public Sort getSort() {
+ public OrderedRel getSort() {
return sort;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java
index 9c7b65167..91ff02c69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java
@@ -21,7 +21,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
@@ -29,10 +28,10 @@ import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
import org.apache.drill.exec.physical.base.DbGroupScan;
import org.apache.drill.exec.planner.common.DrillProjectRelBase;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
-import org.apache.drill.exec.planner.physical.SortPrel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.FilterPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.drill.exec.planner.physical.ExchangePrel;
import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
@@ -42,8 +41,9 @@ import java.util.List;
import java.util.Set;
public class IndexPhysicalPlanCallContext implements IndexCallContext {
+
final public RelOptRuleCall call;
- final public SortPrel sort;
+ final public OrderedRel sort;
final public ProjectPrel upperProject;
final public FilterPrel filter;
final public ProjectPrel lowerProject;
@@ -67,7 +67,7 @@ public class IndexPhysicalPlanCallContext implements IndexCallContext {
}
public IndexPhysicalPlanCallContext(RelOptRuleCall call,
- SortPrel sort,
+ OrderedRel sort,
ProjectPrel capProject,
FilterPrel filter,
ProjectPrel project,
@@ -83,7 +83,7 @@ public class IndexPhysicalPlanCallContext implements IndexCallContext {
}
public IndexPhysicalPlanCallContext(RelOptRuleCall call,
- SortPrel sort,
+ OrderedRel sort,
ProjectPrel project,
ScanPrel scan, ExchangePrel exch) {
this.call = call;
@@ -171,7 +171,7 @@ public class IndexPhysicalPlanCallContext implements IndexCallContext {
return origPushedCondition;
}
- public Sort getSort() {
+ public OrderedRel getSort() {
return sort;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java
index 666e2828a..cdad63ad0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java
@@ -38,6 +38,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.sql.SqlKind;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
@@ -56,6 +57,7 @@ import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexInputRef;
@@ -346,6 +348,21 @@ public class IndexPlanUtils {
return proj.getProjects();
}
+ /**
+ * @return true if the sort carries a non-negative literal fetch, i.e. a limit that can be generated.
+ */
+ public static boolean generateLimit(OrderedRel sort) {
+ RexNode fetchNode = sort.getFetch();
+ int fetchValue = (fetchNode == null) ? -1 : RexLiteral.intValue(fetchNode);
+ return fetchValue >= 0;
+ }
+
+ public static RexNode getOffset(OrderedRel sort) {
+ return sort.getOffset();
+ }
+
+ public static RexNode getFetch(OrderedRel sort) {
+ return sort.getFetch();
+ }
+
+
/**
* generate logical expressions for sort rexNodes in SortRel, the result is store to IndexPlanCallContext
* @param indexContext
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java
index 36ff61f20..456542b31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java
@@ -30,7 +30,6 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -43,16 +42,19 @@ import org.apache.drill.exec.planner.logical.DrillFilterRel;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillSortRel;
import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.physical.SubsetTransformer;
import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.Prule;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.SortPrel;
import org.apache.drill.exec.planner.physical.HashToMergeExchangePrel;
import org.apache.drill.exec.planner.physical.SingleMergeExchangePrel;
import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.planner.physical.Prule;
-import org.apache.drill.exec.planner.physical.SubsetTransformer;
-import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
+import org.apache.drill.exec.planner.physical.TopNPrel;
+import org.apache.drill.exec.planner.physical.SortPrel;
+import org.apache.drill.exec.planner.physical.LimitPrel;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -67,7 +69,6 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer<RelNo
final protected DrillProjectRelBase origProject;
final protected DrillScanRelBase origScan;
final protected DrillProjectRelBase upperProject;
- final protected RelNode origSort;
final protected RexNode indexCondition;
final protected RexNode remainderCondition;
@@ -84,7 +85,6 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer<RelNo
this.origProject = indexContext.getLowerProject();
this.origScan = indexContext.getScan();
this.upperProject = indexContext.getUpperProject();
- this.origSort = indexContext.getSort();
this.indexCondition = indexCondition;
this.remainderCondition = remainderCondition;
this.indexContext = indexContext;
@@ -168,8 +168,8 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer<RelNo
return set;
}
- protected static boolean toRemoveSort(Sort sort, RelCollation inputCollation) {
- if ( (inputCollation != null) && inputCollation.satisfies(IndexPlanUtils.getCollation(sort))) {
+ protected static boolean toRemoveSort(RelCollation sortCollation, RelCollation inputCollation) {
+ if ( (inputCollation != null) && inputCollation.satisfies(sortCollation)) {
return true;
}
return false;
@@ -194,18 +194,34 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer<RelNo
}
}
+ private static RelNode getSortOrTopN(IndexCallContext indexContext,
+ RelNode sortNode, RelNode newRel, RelNode child) {
+ if (sortNode instanceof TopNPrel) {
+ return new TopNPrel(sortNode.getCluster(),
+ newRel.getTraitSet().replace(Prel.DRILL_PHYSICAL).plus(indexContext.getCollation()),
+ child, ((TopNPrel)sortNode).getLimit(), indexContext.getCollation());
+ }
+ return new SortPrel(sortNode.getCluster(),
+ newRel.getTraitSet().replace(Prel.DRILL_PHYSICAL).plus(indexContext.getCollation()),
+ child, indexContext.getCollation());
+ }
+
public static RelNode getSortNode(IndexCallContext indexContext, RelNode newRel, boolean donotGenerateSort,
boolean isSingleton, boolean isExchangeRequired) {
- Sort rel = indexContext.getSort();
+ OrderedRel rel = indexContext.getSort();
DrillDistributionTrait hashDistribution =
new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
ImmutableList.copyOf(indexContext.getDistributionFields()));
- if ( toRemoveSort(indexContext.getSort(), newRel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE))) {
+ if ( toRemoveSort(indexContext.getCollation(), newRel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE))) {
//we are going to remove sort
logger.debug("Not generating SortPrel since we have the required collation");
-
- RelTraitSet traits = newRel.getTraitSet().plus(IndexPlanUtils.getCollation(rel)).plus(Prel.DRILL_PHYSICAL);
+ if (IndexPlanUtils.generateLimit(rel)) {
+ newRel = new LimitPrel(newRel.getCluster(),
+ newRel.getTraitSet().plus(indexContext.getCollation()).plus(Prel.DRILL_PHYSICAL),
+ newRel, IndexPlanUtils.getOffset(rel), IndexPlanUtils.getFetch(rel));
+ }
+ RelTraitSet traits = newRel.getTraitSet().plus(indexContext.getCollation()).plus(Prel.DRILL_PHYSICAL);
newRel = Prule.convert(newRel, traits);
newRel = getExchange(newRel.getCluster(), isSingleton, isExchangeRequired,
traits, hashDistribution, indexContext, newRel);
@@ -215,10 +231,9 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer<RelNo
logger.debug("Not generating SortPrel and index plan, since just picking index for full index scan is not beneficial.");
return null;
}
- RelTraitSet traits = newRel.getTraitSet().plus(IndexPlanUtils.getCollation(rel)).plus(Prel.DRILL_PHYSICAL);
- newRel = new SortPrel(rel.getCluster(),
- newRel.getTraitSet().replace(Prel.DRILL_PHYSICAL).plus(IndexPlanUtils.getCollation(rel)),
- Prule.convert(newRel, newRel.getTraitSet().replace(Prel.DRILL_PHYSICAL)), IndexPlanUtils.getCollation(rel));
+ RelTraitSet traits = newRel.getTraitSet().plus(indexContext.getCollation()).plus(Prel.DRILL_PHYSICAL);
+ newRel = getSortOrTopN(indexContext, rel, newRel,
+ Prule.convert(newRel, newRel.getTraitSet().replace(Prel.DRILL_PHYSICAL)));
newRel = getExchange(newRel.getCluster(), isSingleton, isExchangeRequired,
traits, hashDistribution, indexContext, newRel);
}
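[Reviewer note] toRemoveSort now takes the required collation directly and relies on Calcite's prefix semantics for RelCollation.satisfies: an input sorted on (0, 1) satisfies a required ordering on (0), so the Sort/TopN can be dropped. A standalone sketch of that check:

import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;

public class SortRemovalCheck {
  public static void main(String[] args) {
    // Input already sorted on fields (0, 1); the sort requires ordering on (0).
    RelCollation input = RelCollations.of(new RelFieldCollation(0), new RelFieldCollation(1));
    RelCollation required = RelCollations.of(new RelFieldCollation(0));
    // Prefix-compatible input collation satisfies the requirement,
    // mirroring the toRemoveSort test above.
    System.out.println(input.satisfies(required)); // true
  }
}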
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/CoveringPlanNoFilterGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/CoveringPlanNoFilterGenerator.java
index 163aef986..e06ac8ff3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/CoveringPlanNoFilterGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/CoveringPlanNoFilterGenerator.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.planner.index.generators;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelCollation;
@@ -30,11 +32,11 @@ import org.apache.drill.exec.planner.index.FunctionalIndexInfo;
import org.apache.drill.exec.planner.index.IndexPlanUtils;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
-import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.Prule;
+import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.physical.base.DbGroupScan;
import java.util.List;
@@ -62,7 +64,7 @@ public class CoveringPlanNoFilterGenerator extends AbstractIndexPlanGenerator {
}
public RelNode convertChild() throws InvalidRelException {
-
+ Preconditions.checkNotNull(indexContext.getSort());
if (indexGroupScan == null) {
logger.error("Null indexgroupScan in CoveringIndexPlanGenerator.convertChild");
return null;
@@ -106,11 +108,9 @@ public class CoveringPlanNoFilterGenerator extends AbstractIndexPlanGenerator {
}
}
- if (indexContext.getSort() != null) {
- finalRel = getSortNode(indexContext, finalRel, true, isSingletonSortedStream, indexContext.getExchange() != null);
- if (finalRel == null) {
- return null;
- }
+ finalRel = getSortNode(indexContext, finalRel, true, isSingletonSortedStream, indexContext.getExchange() != null);
+ if (finalRel == null) {
+ return null;
}
finalRel = Prule.convert(finalRel, finalRel.getTraitSet().plus(Prel.DRILL_PHYSICAL));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
index db220fab4..e1337bc7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
@@ -319,7 +319,7 @@ public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator {
if (indexContext.getSort() != null) {
// When ordering is required, serialize the index scan side. With parallel index scans, the rowkey join may receive
// unsorted input because ordering is not guaranteed across different parallel inputs.
- if (toRemoveSort(indexContext.getSort(), newRel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE))) {
+ if (toRemoveSort(indexContext.getCollation(), newRel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE))) {
((IndexGroupScan)indexScanPrel.getGroupScan()).setParallelizationWidth(1);
}
newRel = getSortNode(indexContext, newRel, false,true, true);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java
index db095047d..86ed4301c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.planner.index.rules;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
@@ -39,7 +41,7 @@ import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.physical.ExchangePrel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
-import org.apache.drill.exec.planner.physical.SortPrel;
+import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
import org.apache.calcite.rel.RelNode;
@@ -52,21 +54,21 @@ public class DbScanSortRemovalRule extends Prule {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DbScanSortRemovalRule.class);
public static final RelOptRule INDEX_SORT_EXCHANGE_SCAN =
- new DbScanSortRemovalRule(RelOptHelper.some(SortPrel.class,
+ new DbScanSortRemovalRule(RelOptHelper.some(OrderedRel.class,
RelOptHelper.some(HashToRandomExchangePrel.class,
RelOptHelper.any(ScanPrel.class))), "DbScanSortRemovalRule:sort_exchange_Scan", new MatchSES());
public static final RelOptRule INDEX_SORT_SCAN =
- new DbScanSortRemovalRule(RelOptHelper.some(SortPrel.class,
+ new DbScanSortRemovalRule(RelOptHelper.some(OrderedRel.class,
RelOptHelper.any(ScanPrel.class)), "DbScanSortRemovalRule:Sort_Scan", new MatchSS());
public static final RelOptRule INDEX_SORT_PROJ_SCAN =
- new DbScanSortRemovalRule(RelOptHelper.some(SortPrel.class,
+ new DbScanSortRemovalRule(RelOptHelper.some(OrderedRel.class,
RelOptHelper.some(ProjectPrel.class,
RelOptHelper.any(ScanPrel.class))), "DbScanSortRemovalRule:Sort_Proj_Scan", new MatchSPS());
public static final RelOptRule INDEX_SORT_EXCHANGE_PROJ_SCAN =
- new DbScanSortRemovalRule(RelOptHelper.some(SortPrel.class,
+ new DbScanSortRemovalRule(RelOptHelper.some(OrderedRel.class,
RelOptHelper.some(HashToRandomExchangePrel.class,
RelOptHelper.some(ProjectPrel.class,
RelOptHelper.any(ScanPrel.class)))), "DbScanSortRemovalRule:sort_exchange_proj_Scan", new MatchSEPS());
@@ -80,16 +82,21 @@ public class DbScanSortRemovalRule extends Prule {
this.match = match;
}
+ private static boolean isRemovableRel(OrderedRel node) {
+ return node.canBeDropped();
+ }
+
private static class MatchSES extends AbstractMatchFunction<IndexPhysicalPlanCallContext> {
public boolean match(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel)call.rel(2);
- return checkScan(scan.getGroupScan());
+ final OrderedRel sort = call.rel(0);
+ final ScanPrel scan = call.rel(2);
+ return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort);
}
public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) {
final ScanPrel scan = call.rel(2);
- final SortPrel sort = call.rel(0);
+ final OrderedRel sort = call.rel(0);
final ExchangePrel exch = call.rel(1);
return new IndexPhysicalPlanCallContext(call, sort, null, scan, exch);
}
@@ -98,13 +105,14 @@ public class DbScanSortRemovalRule extends Prule {
private static class MatchSS extends AbstractMatchFunction<IndexPhysicalPlanCallContext> {
public boolean match(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel)call.rel(1);
- return checkScan(scan.getGroupScan());
+ final OrderedRel sort = call.rel(0);
+ final ScanPrel scan = call.rel(1);
+ return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort);
}
public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) {
final ScanPrel scan = call.rel(1);
- final SortPrel sort = call.rel(0);
+ final OrderedRel sort = call.rel(0);
return new IndexPhysicalPlanCallContext(call, sort, null, scan, null);
}
}
@@ -112,14 +120,15 @@ public class DbScanSortRemovalRule extends Prule {
private static class MatchSPS extends AbstractMatchFunction<IndexPhysicalPlanCallContext> {
public boolean match(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel)call.rel(2);
- return checkScan(scan.getGroupScan());
+ final OrderedRel sort = call.rel(0);
+ final ScanPrel scan = call.rel(2);
+ return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort);
}
public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) {
final ScanPrel scan = call.rel(2);
final ProjectPrel proj = call.rel(1);
- final SortPrel sort = call.rel(0);
+ final OrderedRel sort = call.rel(0);
return new IndexPhysicalPlanCallContext(call, sort, proj, scan, null);
}
}
@@ -127,13 +136,14 @@ public class DbScanSortRemovalRule extends Prule {
private static class MatchSEPS extends AbstractMatchFunction<IndexPhysicalPlanCallContext> {
public boolean match(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel)call.rel(3);
- return checkScan(scan.getGroupScan());
+ final OrderedRel sort = call.rel(0);
+ final ScanPrel scan = call.rel(3);
+ return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort);
}
public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) {
final ScanPrel scan = call.rel(3);
- final SortPrel sort = call.rel(0);
+ final OrderedRel sort = call.rel(0);
final ProjectPrel proj = call.rel(2);
final ExchangePrel exch = call.rel(1);
return new IndexPhysicalPlanCallContext(call, sort, proj, scan, exch);
@@ -187,12 +197,15 @@ public class DbScanSortRemovalRule extends Prule {
false, settings);
if (planGen.convertChild() != null) {
indexContext.getCall().transformTo(planGen.convertChild());
+ } else {
+ logger.debug("Not able to generate index plan in ", this.getClass().toString());
}
} catch (Exception e) {
logger.warn("Exception while trying to generate indexscan to remove sort", e);
}
}
} else {
+ Preconditions.checkNotNull(indexContext.getSort());
//This case tries to use the already generated index to see if a sort can be removed.
if (indexContext.scan.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE).getFieldCollations().size() == 0) {
return;
@@ -204,12 +217,12 @@ public class DbScanSortRemovalRule extends Prule {
inputs.add(finalRel);
finalRel = indexContext.lowerProject.copy(indexContext.lowerProject.getTraitSet(), inputs);
}
- if (indexContext.getSort() != null) {
- finalRel = AbstractIndexPlanGenerator.getSortNode(indexContext, finalRel, true,false,
+
+ finalRel = AbstractIndexPlanGenerator.getSortNode(indexContext, finalRel, true, false,
indexContext.exch != null);
- }
if (finalRel == null) {
+ logger.debug("Not able to generate index plan in ", this.getClass().toString());
return;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
index cfa0e26b4..1e380cff7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
@@ -41,7 +42,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
/**
* Sort implemented in Drill.
*/
-public class DrillSortRel extends Sort implements DrillRel {
+public class DrillSortRel extends Sort implements DrillRel, OrderedRel {
/** Creates a DrillSortRel. */
public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
@@ -98,4 +99,18 @@ public class DrillSortRel extends Sort implements DrillRel {
return new DrillSortRel(context.getCluster(), context.getLogicalTraits(), input, RelCollations.of(collations));
}
+ @Override
+ public RexNode getOffset() {
+ return offset;
+ }
+
+ @Override
+ public RexNode getFetch() {
+ return fetch;
+ }
+
+ @Override
+ public boolean canBeDropped() {
+ return true;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 77fb4c8bb..8064c4287 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelFieldCollation;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
@@ -40,16 +41,25 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexNode;
-public class SortPrel extends org.apache.calcite.rel.core.Sort implements Prel {
+public class SortPrel extends org.apache.calcite.rel.core.Sort implements OrderedRel, Prel {
+ private final boolean isRemovable;
/** Creates a DrillSortRel. */
public SortPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
super(cluster, traits, input, collation);
+ isRemovable = true;
}
/** Creates a DrillSortRel with offset and fetch. */
public SortPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
super(cluster, traits, input, collation, offset, fetch);
+ isRemovable = true;
+ }
+
+ /** Creates a SortPrel whose removability by the sort-removal rules is explicitly specified. */
+ public SortPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, boolean isRemovable) {
+ super(cluster, traits, input, collation);
+ this.isRemovable = isRemovable;
}
@Override
@@ -141,4 +151,20 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Prel {
return this.copy(traits, children.get(0), collationTrait, this.offset, this.fetch);
}
+
+ @Override
+ public RexNode getOffset() {
+ return offset;
+ }
+
+ @Override
+ public RexNode getFetch() {
+ return fetch;
+ }
+
+ @Override
+ public boolean canBeDropped() {
+ return isRemovable;
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java
index 3fc86b3d4..bec1b6a20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java
@@ -47,26 +47,25 @@ public class SortPrule extends Prule{
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillSortRel sort = (DrillSortRel) call.rel(0);
+ final DrillSortRel sort = call.rel(0);
final RelNode input = sort.getInput();
// Keep the collation in logical sort. Convert input into a RelNode with 1) this collation, 2) Physical, 3) hash distributed on
DrillDistributionTrait hashDistribution =
- new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(sort)));
+ new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(sort)));
- final RelTraitSet traits = sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashDistribution);
-
- final RelNode convertedInput = convert(input, traits);
+ final RelTraitSet traits = RelTraitSet.createEmpty().plus(Prel.DRILL_PHYSICAL).plus(hashDistribution);
+ SortPrel child = new SortPrel(sort.getCluster(), traits.plus(sort.getCollation()),
+ convert(sort.getInput(), traits), sort.getCollation(), false);
if(isSingleMode(call)){
- call.transformTo(convertedInput);
+ call.transformTo(child);
}else{
- RelNode exch = new SingleMergeExchangePrel(sort.getCluster(), sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput, sort.getCollation());
+ RelNode exch = new SingleMergeExchangePrel(sort.getCluster(), sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), child, sort.getCollation());
call.transformTo(exch); // transform logical "sort" into "SingleMergeExchange".
}
-
}
private List<DistributionField> getDistributionField(DrillSortRel rel) {
@@ -76,7 +75,6 @@ public class SortPrule extends Prule{
DistributionField field = new DistributionField(relField.getFieldIndex());
distFields.add(field);
}
-
return distFields;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
index e9414f174..f8f4b9d13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
@@ -26,8 +26,11 @@ import org.apache.calcite.rel.RelCollationImpl;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
+import java.math.BigDecimal;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.TopN;
+import org.apache.drill.exec.planner.common.OrderedRel;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -40,7 +43,7 @@ import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
-public class TopNPrel extends SinglePrel {
+public class TopNPrel extends SinglePrel implements OrderedRel, Prel {
protected int limit;
protected final RelCollation collation;
@@ -66,6 +69,28 @@ public class TopNPrel extends SinglePrel {
return creator.addMetadata(this, topN);
}
+ @Override
+ public RelCollation getCollation() {
+ return collation;
+ }
+
+ @Override
+ public RexNode getOffset() {
+ return getCluster().getRexBuilder().makeExactLiteral(BigDecimal.ZERO,
+ getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+ }
+
+ @Override
+ public RexNode getFetch() {
+ return getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit),
+ getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+ }
+
+ @Override
+ public boolean canBeDropped() {
+ return true;
+ }
+
/**
* Cost of doing Top-N is proportional to M log N where M is the total number of
* input rows and N is the limit for Top-N. This makes Top-N preferable to Sort
@@ -93,6 +118,10 @@ public class TopNPrel extends SinglePrel {
.item("limit", limit);
}
+ public int getLimit() {
+ return limit;
+ }
+
@Override
public SelectionVectorMode[] getSupportedEncodings() {
return SelectionVectorMode.NONE_AND_TWO;
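[Reviewer note] The Javadoc above states that Top-N costs on the order of M log N versus M log M for a full sort. A tiny sketch making that difference concrete for illustrative M and N:

public class TopNCostSketch {
  public static void main(String[] args) {
    double m = 1_000_000; // total input rows
    double n = 10;        // Top-N limit
    double topNCost = m * (Math.log(n) / Math.log(2)); // ~ M log2 N
    double sortCost = m * (Math.log(m) / Math.log(2)); // ~ M log2 M
    System.out.printf("TopN ~ %.0f comparisons, full sort ~ %.0f comparisons%n",
        topNCost, sortCost); // TopN is ~6x cheaper here
  }
}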
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index f77a4378e..fa8e69d0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -25,6 +25,11 @@ import org.apache.drill.exec.planner.physical.ExchangePrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.LimitPrel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.SingleMergeExchangePrel;
+import org.apache.drill.exec.planner.physical.HashToMergeExchangePrel;
import org.apache.calcite.rel.RelNode;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -48,7 +53,18 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
parent.add(prel);
MajorFragmentStat newFrag = new MajorFragmentStat();
newFrag.setRightSideOfLateral(parent.isRightSideOfLateral());
+
+ if (prel instanceof SingleMergeExchangePrel) {
+ newFrag.isSimpleRel = true;
+ }
+
Prel newChild = ((Prel) prel.getInput()).accept(this, newFrag);
+
+ if (parent.isSimpleRel &&
+ prel instanceof HashToMergeExchangePrel) {
+ return newChild;
+ }
+
if (canRemoveExchange(parent, newFrag)) {
return newChild;
} else {
@@ -129,6 +145,8 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
s.add(p);
}
+ s.setHashDistribution(prel);
+
for(Prel p : prel) {
children.add(p.accept(this, s));
}
@@ -145,6 +163,9 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
private int maxWidth = Integer.MAX_VALUE;
private boolean isMultiSubScan = false;
private boolean rightSideOfLateral = false;
+ // When true, every rel seen so far in this fragment is a simple rel
+ // (limit/project/filter) with no distribution requirement.
+ private boolean isSimpleRel = false;
public void add(Prel prel) {
maxRows = Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows);
@@ -162,6 +183,13 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
add(prel);
}
+ // Clears isSimpleRel as soon as a rel other than limit/project/filter shows up.
+ public void setHashDistribution(Prel prel) {
+ isSimpleRel = isSimpleRel &&
+ (prel instanceof LimitPrel ||
+ prel instanceof ProjectPrel ||
+ prel instanceof FilterPrel);
+ }
+
public boolean isSingular() {
// do not remove exchanges when a scan has more than one subscans (e.g. SystemTableScan)
if (isMultiSubScan) {