From 2a9e51f8a68872a77e38ee91be107868f60fd334 Mon Sep 17 00:00:00 2001
From: rebase
Date: Fri, 16 Mar 2018 14:24:20 -0700
Subject: DRILL-6381: (Part 4) Enhance MapR-DB plugin to support querying secondary indexes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

1. Implementation of the index descriptor for MapR-DB.
2. MapR-DB specific costing for covering and non-covering indexes.
3. Discovery component to discover the indexes available for a MapR-DB table, including CAST functional indexes.
4. Utility functions to build a canonical index descriptor.
5. Statistics: fetch and initialize statistics from MapR-DB for a query condition. Maintain a query-scoped cache for the statistics. Utility functions to compute selectivity.
6. Range partitioning: a partitioning function that consults the tablet map to find out where a particular rowkey belongs.
7. Restricted scan: support restricted (i.e., skip) scans through lookups on the rowkey. Added a group scan and record reader for this.
8. MD-3726: Simple ORDER BY queries (without LIMIT) show a regression when an index is used.
9. MD-3995: Do not push down LIMIT 0 past a project with CONVERT_FROMJSON.
10. MD-4259: Account for the limit during hashcode computation.

Co-authored-by: Aman Sinha
Co-authored-by: chunhui-shi
Co-authored-by: Gautam Parai
Co-authored-by: Padma Penumarthy
Co-authored-by: Hanumath Rao Maduri

Conflicts:
    contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
    contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
    contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
    exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java
    exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
    exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java

Fix additional compilation issues.
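The range partitioning in item 6 boils down to mapping a rowkey to the tablet whose
[startKey, stopKey) range contains it, given tablets sorted by start key. A minimal sketch of
that idea, assuming unsigned lexicographic rowkey ordering; the class and method names below
are hypothetical and are not the actual JsonTableRangePartitionFunction API:

    import java.util.List;

    class RangePartitionSketch {
      // Tablets are sorted by startKey; tablet i owns rowkeys in [startKey[i], startKey[i+1]).
      // Binary-search for the last startKey <= rowKey; that tablet is the owner.
      static int findPartition(List<byte[]> sortedStartKeys, byte[] rowKey) {
        int lo = 0, hi = sortedStartKeys.size() - 1, owner = 0;
        while (lo <= hi) {
          int mid = (lo + hi) >>> 1;
          if (compareUnsigned(sortedStartKeys.get(mid), rowKey) <= 0) {
            owner = mid;      // candidate owner; keep looking for a later tablet
            lo = mid + 1;
          } else {
            hi = mid - 1;
          }
        }
        return owner;
      }

      // Unsigned byte-wise comparison, the ordering used for HBase/MapR-DB rowkeys.
      static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
          int cmp = (a[i] & 0xff) - (b[i] & 0xff);
          if (cmp != 0) {
            return cmp;
          }
        }
        return a.length - b.length;
      }
    }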
--- .../exec/planner/index/MapRDBIndexDescriptor.java | 222 +++ .../exec/planner/index/MapRDBIndexDiscover.java | 374 +++++ .../drill/exec/planner/index/MapRDBStatistics.java | 689 +++++++- .../exec/store/mapr/db/MapRDBFormatMatcher.java | 42 + .../exec/store/mapr/db/MapRDBFormatPlugin.java | 2 +- .../drill/exec/store/mapr/db/MapRDBGroupScan.java | 12 +- .../store/mapr/db/MapRDBPushLimitIntoScan.java | 50 +- .../store/mapr/db/MapRDBPushProjectIntoScan.java | 3 +- .../store/mapr/db/json/JsonTableGroupScan.java | 198 ++- .../db/json/JsonTableRangePartitionFunction.java | 237 +++ .../mapr/db/json/RestrictedJsonTableGroupScan.java | 184 +++ .../maprdb/tests/index/IndexHintPlanTest.java | 171 ++ .../drill/maprdb/tests/index/IndexPlanTest.java | 1715 ++++++++++++++++++++ .../drill/maprdb/tests/index/LargeTableGen.java | 176 ++ .../maprdb/tests/index/LargeTableGenBase.java | 186 +++ .../drill/maprdb/tests/index/StatisticsTest.java | 115 ++ .../drill/maprdb/tests/index/TableIndexCmd.java | 127 ++ .../drill/exec/planner/common/OrderedRel.java | 53 + .../drill/exec/planner/index/IndexCallContext.java | 4 +- .../planner/index/IndexLogicalPlanCallContext.java | 4 +- .../index/IndexPhysicalPlanCallContext.java | 12 +- .../drill/exec/planner/index/IndexPlanUtils.java | 17 + .../generators/AbstractIndexPlanGenerator.java | 49 +- .../generators/CoveringPlanNoFilterGenerator.java | 16 +- .../generators/NonCoveringIndexPlanGenerator.java | 2 +- .../planner/index/rules/DbScanSortRemovalRule.java | 53 +- .../drill/exec/planner/logical/DrillSortRel.java | 17 +- .../drill/exec/planner/physical/SortPrel.java | 28 +- .../drill/exec/planner/physical/SortPrule.java | 16 +- .../drill/exec/planner/physical/TopNPrel.java | 31 +- .../visitor/ExcessiveExchangeIdentifier.java | 28 + 31 files changed, 4746 insertions(+), 87 deletions(-) create mode 100644 contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java create mode 100644 contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java create mode 100644 contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java create mode 100644 contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java create mode 100644 contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java create mode 100644 contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java create mode 100644 contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java create mode 100644 contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGenBase.java create mode 100644 contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/StatisticsTest.java create mode 100644 contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java new file mode 100644 index 000000000..a57f5b5ec --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.index; + + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.rel.RelFieldCollation.NullDirection; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.expr.CloneVisitor; +import org.apache.drill.exec.physical.base.DbGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.cost.DrillCostBase; +import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory; +import org.apache.drill.exec.planner.cost.PluginCost; +import org.apache.drill.exec.planner.index.IndexProperties; +import org.apache.drill.exec.store.mapr.PluginConstants; +import org.apache.drill.exec.util.EncodedSchemaPathSet; +import org.apache.drill.common.expression.LogicalExpression; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + +public class MapRDBIndexDescriptor extends DrillIndexDescriptor { + + protected final Object desc; + protected final Set allFields; + protected final Set indexedFields; + protected MapRDBFunctionalIndexInfo functionalInfo; + protected PluginCost pluginCost; + + public MapRDBIndexDescriptor(List indexCols, + CollationContext indexCollationContext, + List nonIndexCols, + List rowKeyColumns, + String indexName, + String tableName, + IndexType type, + Object desc, + DbGroupScan scan, + NullDirection nullsDirection) { + super(indexCols, indexCollationContext, nonIndexCols, rowKeyColumns, indexName, tableName, type, nullsDirection); + this.desc = desc; + this.indexedFields = ImmutableSet.copyOf(indexColumns); + this.allFields = new ImmutableSet.Builder() + .add(PluginConstants.DOCUMENT_SCHEMA_PATH) + .addAll(indexColumns) + .addAll(nonIndexColumns) + .build(); + this.pluginCost = scan.getPluginCostModel(); + } + + public Object getOriginalDesc(){ + return desc; + } + + @Override + public boolean isCoveringIndex(List expressions) { + List decodedCols = new DecodePathinExpr().parseExpressions(expressions); + return columnsInIndexFields(decodedCols, allFields); + } + + @Override + public boolean allColumnsIndexed(Collection expressions) { + List decodedCols = new DecodePathinExpr().parseExpressions(expressions); + return columnsInIndexFields(decodedCols, indexedFields); + } + + @Override + public boolean someColumnsIndexed(Collection columns) { + return columnsIndexed(columns, false); + } + + private boolean columnsIndexed(Collection expressions, boolean allColsIndexed) { + List decodedCols = new DecodePathinExpr().parseExpressions(expressions); + if (allColsIndexed) { + 
return columnsInIndexFields(decodedCols, indexedFields); + } else { + return someColumnsInIndexFields(decodedCols, indexedFields); + } + } + + public FunctionalIndexInfo getFunctionalInfo() { + if (this.functionalInfo == null) { + this.functionalInfo = new MapRDBFunctionalIndexInfo(this); + } + return this.functionalInfo; + } + + /** + * Search through a LogicalExpression, finding all referenced schema paths + * and replace them with decoded paths. + * If one encoded path could be decoded to multiple paths, add these decoded paths to + * the end of returned list of expressions from parseExpressions. + */ + private class DecodePathinExpr extends CloneVisitor { + Set schemaPathSet = Sets.newHashSet(); + + public List parseExpressions(Collection expressions) { + List allCols = Lists.newArrayList(); + Collection decoded; + + for(LogicalExpression expr : expressions) { + LogicalExpression nonDecoded = expr.accept(this, null); + if(nonDecoded != null) { + allCols.add(nonDecoded); + } + } + + if (schemaPathSet.size() > 0) { + decoded = EncodedSchemaPathSet.decode(schemaPathSet); + allCols.addAll(decoded); + } + return allCols; + } + + @Override + public LogicalExpression visitSchemaPath(SchemaPath path, Void value) { + if (EncodedSchemaPathSet.isEncodedSchemaPath(path)) { + // if decoded size is not one, incoming path is encoded path thus there is no cast or other function applied on it, + // since users won't pass in encoded fields, so it is safe to return null, + schemaPathSet.add(path); + return null; + } else { + return path; + } + } + + } + + @Override + public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner, + int numProjectedFields, GroupScan primaryTableGroupScan) { + Preconditions.checkArgument(primaryTableGroupScan instanceof DbGroupScan); + DbGroupScan dbGroupScan = (DbGroupScan) primaryTableGroupScan; + DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory(); + double totalRows = indexProps.getTotalRows(); + double leadRowCount = indexProps.getLeadingSelectivity() * totalRows; + double avgRowSize = indexProps.getAvgRowSize(); + if (indexProps.isCovering()) { // covering index + // int numIndexCols = allFields.size(); + // for disk i/o, all index columns are going to be read into memory + double numBlocks = Math.ceil((leadRowCount * avgRowSize)/pluginCost.getBlockSize(primaryTableGroupScan)); + double diskCost = numBlocks * pluginCost.getSequentialBlockReadCost(primaryTableGroupScan); + // cpu cost is cost of filter evaluation for the remainder condition + double cpuCost = 0.0; + if (indexProps.getTotalRemainderFilter() != null) { + cpuCost = leadRowCount * DrillCostBase.COMPARE_CPU_COST; + } + double networkCost = 0.0; // TODO: add network cost once full table scan also considers network cost + return costFactory.makeCost(leadRowCount, cpuCost, diskCost, networkCost); + + } else { // non-covering index + // int numIndexCols = allFields.size(); + double numBlocksIndex = Math.ceil((leadRowCount * avgRowSize)/pluginCost.getBlockSize(primaryTableGroupScan)); + double diskCostIndex = numBlocksIndex * pluginCost.getSequentialBlockReadCost(primaryTableGroupScan); + // for the primary table join-back each row may belong to a different block, so in general num_blocks = num_rows; + // however, num_blocks cannot exceed the total number of blocks of the table + double totalBlocksPrimary = Math.ceil((dbGroupScan.getColumns().size() * + pluginCost.getAverageColumnSize(primaryTableGroupScan) * totalRows)/ + pluginCost.getBlockSize(primaryTableGroupScan)); + 
double diskBlocksPrimary = Math.min(totalBlocksPrimary, leadRowCount); + double diskCostPrimary = diskBlocksPrimary * pluginCost.getRandomBlockReadCost(primaryTableGroupScan); + double diskCostTotal = diskCostIndex + diskCostPrimary; + + // cpu cost of remainder condition evaluation over the selected rows + double cpuCost = 0.0; + if (indexProps.getTotalRemainderFilter() != null) { + cpuCost = leadRowCount * DrillCostBase.COMPARE_CPU_COST; + } + double networkCost = 0.0; // TODO: add network cost once full table scan also considers network cost + return costFactory.makeCost(leadRowCount, cpuCost, diskCostTotal, networkCost); + } + } + + // Future use once full table scan also includes network cost + private double getNetworkCost(double leadRowCount, int numProjectedFields, boolean isCovering, + GroupScan primaryTableGroupScan) { + if (isCovering) { + // db server will send only the projected columns to the db client for the selected + // number of rows, so network cost is based on the number of actual projected columns + double networkCost = leadRowCount * numProjectedFields * pluginCost.getAverageColumnSize(primaryTableGroupScan); + return networkCost; + } else { + // only the rowkey column is projected from the index and sent over the network + double networkCostIndex = leadRowCount * 1 * pluginCost.getAverageColumnSize(primaryTableGroupScan); + + // after join-back to primary table, all projected columns are sent over the network + double networkCostPrimary = leadRowCount * numProjectedFields * pluginCost.getAverageColumnSize(primaryTableGroupScan); + + return networkCostIndex + networkCostPrimary; + } + + } + + @Override + public PluginCost getPluginCostModel() { + return pluginCost; + } +} diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java new file mode 100644 index 000000000..e1b8a61fc --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.planner.index; + +import com.google.common.collect.Maps; +import com.mapr.db.Admin; +import com.mapr.db.MapRDB; +import com.mapr.db.exceptions.DBException; +import com.mapr.db.index.IndexDesc; +import com.mapr.db.index.IndexDesc.MissingAndNullOrdering; +import com.mapr.db.index.IndexFieldDesc; +import org.antlr.runtime.ANTLRStringStream; +import org.antlr.runtime.CommonTokenStream; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelFieldCollation.NullDirection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.parser.ExprLexer; +import org.apache.drill.common.expression.parser.ExprParser; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.base.AbstractDbGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.common.DrillScanRelBase; +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.planner.physical.ScanPrel; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.mapr.db.MapRDBFormatMatcher; +import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin; +import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan; +import org.apache.drill.exec.store.mapr.db.json.FieldPathHelper; +import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.ojai.FieldPath; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDiscover { + + static final String DEFAULT_STRING_CAST_LEN_STR = "256"; + + public MapRDBIndexDiscover(GroupScan inScan, DrillScanRelBase scanRel) { + super((AbstractDbGroupScan) inScan, scanRel); + } + + public MapRDBIndexDiscover(GroupScan inScan, ScanPrel scanRel) { + super((AbstractDbGroupScan) inScan, scanRel); + } + + @Override + public IndexCollection getTableIndex(String tableName) { + //return getTableIndexFromCommandLine(tableName); + return getTableIndexFromMFS(tableName); + } + + /** + * + * @param tableName + * @return + */ + private IndexCollection getTableIndexFromMFS(String tableName) { + try { + Set idxSet = new HashSet<>(); + Collection indexes = admin().getTableIndexes(new Path(tableName)); + if (indexes.size() == 0 ) { + logger.error("No index returned from Admin.getTableIndexes for table {}", tableName); + return null; + } + for (IndexDesc idx : indexes) { + DrillIndexDescriptor hbaseIdx = buildIndexDescriptor(tableName, idx); + if (hbaseIdx == null) { + //not able to build a valid index based on the index info from MFS + logger.error("Not able to build index for {}", idx.toString()); + continue; + } + idxSet.add(hbaseIdx); + } + if (idxSet.size() == 0) { + logger.error("No index found for table {}.", tableName); + return null; + } + return new 
DrillIndexCollection(getOriginalScanRel(), idxSet); + } catch (DBException ex) { + logger.error("Could not get table index from File system.", ex); + } + catch(InvalidIndexDefinitionException ex) { + logger.error("Invalid index definition detected.", ex); + } + return null; + } + + FileSelection deriveFSSelection(DrillFileSystem fs, IndexDescriptor idxDesc) throws IOException { + String tableName = idxDesc.getTableName(); + String[] tablePath = tableName.split("/"); + String tableParent = tableName.substring(0, tableName.lastIndexOf("/")); + + return FileSelection.create(fs, tableParent, tablePath[tablePath.length - 1], false); + } + + @Override + public DrillTable getNativeDrillTable(IndexDescriptor idxDescriptor) { + + try { + final AbstractDbGroupScan origScan = getOriginalScan(); + if (!(origScan instanceof MapRDBGroupScan)) { + return null; + } + MapRDBFormatPlugin maprFormatPlugin = ((MapRDBGroupScan) origScan).getFormatPlugin(); + FileSystemPlugin fsPlugin = (FileSystemPlugin) (((MapRDBGroupScan) origScan).getStoragePlugin()); + + DrillFileSystem fs = ImpersonationUtil.createFileSystem(origScan.getUserName(), fsPlugin.getFsConf()); + MapRDBFormatMatcher matcher = (MapRDBFormatMatcher) (maprFormatPlugin.getMatcher()); + FileSelection fsSelection = deriveFSSelection(fs, idxDescriptor); + return matcher.isReadableIndex(fs, fsSelection, fsPlugin, fsPlugin.getName(), + origScan.getUserName(), idxDescriptor); + + } catch (Exception e) { + logger.error("Failed to get native DrillTable.", e); + } + return null; + } + + private SchemaPath fieldName2SchemaPath(String fieldName) { + if (fieldName.contains(":")) { + fieldName = fieldName.split(":")[1]; + } + if (fieldName.contains(".")) { + return FieldPathHelper.fieldPath2SchemaPath(FieldPath.parseFrom(fieldName)); + } + return SchemaPath.getSimplePath(fieldName); + } + + String getDrillTypeStr(String maprdbTypeStr) { + String typeStr = maprdbTypeStr.toUpperCase(); + String[] typeTokens = typeStr.split("[)(]"); + String typeData = DEFAULT_STRING_CAST_LEN_STR; + if(typeTokens.length > 1) { + typeStr = typeTokens[0]; + typeData = typeTokens[1]; + } + switch(typeStr){ + case "STRING": + // set default width since it is not specified + return "VARCHAR("+typeData+")"; + case "LONG": + return "BIGINT"; + case "INT": + case "INTEGER": + return "INT"; + case "FLOAT": + return "FLOAT4"; + case "DOUBLE": + return "FLOAT8"; + case "INTERVAL_YEAR_MONTH": + return "INTERVALYEAR"; + case "INTERVAL_DAY_TIME": + return "INTERVALDAY"; + case "BOOLEAN": + return "BIT"; + case "BINARY": + return "VARBINARY"; + case "ANY": + case "DECIMAL": + return null; + default: return typeStr; + } + + } + + TypeProtos.MajorType getDrillType(String typeStr) { + switch(typeStr){ + case "VARCHAR": + case "CHAR": + case "STRING": + // set default width since it is not specified + return + Types.required(TypeProtos.MinorType.VARCHAR).toBuilder().setWidth( + getOriginalScanRel().getCluster().getTypeFactory().createSqlType(SqlTypeName.VARCHAR).getPrecision()).build(); + case "LONG": + case "BIGINT": + return Types.required(TypeProtos.MinorType.BIGINT); + case "INT": + case "INTEGER": + return Types.required(TypeProtos.MinorType.INT); + case "FLOAT": + return Types.required(TypeProtos.MinorType.FLOAT4); + case "DOUBLE": + return Types.required(TypeProtos.MinorType.FLOAT8); + case "INTERVAL_YEAR_MONTH": + return Types.required(TypeProtos.MinorType.INTERVALYEAR); + case "INTERVAL_DAY_TIME": + return Types.required(TypeProtos.MinorType.INTERVALDAY); + case "BOOLEAN": + return 
Types.required(TypeProtos.MinorType.BIT); + case "BINARY": + return Types.required(TypeProtos.MinorType.VARBINARY).toBuilder().build(); + case "ANY": + case "DECIMAL": + return null; + default: return Types.required(TypeProtos.MinorType.valueOf(typeStr)); + } + } + + private LogicalExpression castFunctionSQLSyntax(String field, String type) throws InvalidIndexDefinitionException { + // get castTypeStr so we can construct the SQL syntax string before MapR-DB provides such syntax + String castTypeStr = getDrillTypeStr(type); + if (castTypeStr == null) { // no cast + throw new InvalidIndexDefinitionException("cast function type not recognized: " + type + " for field " + field); + } + try { + String castFunc = String.format("cast( %s as %s)", field, castTypeStr); + final ExprLexer lexer = new ExprLexer(new ANTLRStringStream(castFunc)); + final CommonTokenStream tokens = new CommonTokenStream(lexer); + final ExprParser parser = new ExprParser(tokens); + final ExprParser.parse_return ret = parser.parse(); + logger.trace("{}, {}", tokens, ret); + return ret.e; + } catch (Exception ex) { + logger.error("parse failed", ex); + } + return null; + } + + private LogicalExpression getIndexExpression(IndexFieldDesc desc) throws InvalidIndexDefinitionException { + final String fieldName = desc.getFieldPath().asPathString(); + final String functionDef = desc.getFunctionName(); + if (functionDef != null) { // this is a function + String[] tokens = functionDef.split("\\s+"); + if (tokens[0].equalsIgnoreCase("cast")) { + if (tokens.length != 3) { + throw new InvalidIndexDefinitionException("cast function definition not recognized: " + functionDef); + } + LogicalExpression idxExpr = castFunctionSQLSyntax(fieldName, tokens[2]); + if (idxExpr == null) { + throw new InvalidIndexDefinitionException("got null expression for function definition: " + functionDef); + } + return idxExpr; + } else { + throw new InvalidIndexDefinitionException("function definition is not supported for indexing: " + functionDef); + } + } + // else it is a schemaPath + return fieldName2SchemaPath(fieldName); + } + + private List field2SchemaPath(Collection descCollection) + throws InvalidIndexDefinitionException { + List listSchema = new ArrayList<>(); + for (IndexFieldDesc field : descCollection) { + listSchema.add(getIndexExpression(field)); + } + return listSchema; + } + + private List getFieldCollations(IndexDesc desc, Collection descCollection) { + List fieldCollations = new ArrayList<>(); + int i = 0; + for (IndexFieldDesc field : descCollection) { + RelFieldCollation.Direction direction = (field.getSortOrder() == IndexFieldDesc.Order.Asc) ? + RelFieldCollation.Direction.ASCENDING : (field.getSortOrder() == IndexFieldDesc.Order.Desc ? + RelFieldCollation.Direction.DESCENDING : null); + if (direction != null) { + // assume null direction of NULLS UNSPECIFIED for now until MapR-DB adds that to the APIs + RelFieldCollation.NullDirection nulldir = + desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullFirst ? NullDirection.FIRST : + (desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullLast ?
+ NullDirection.LAST : NullDirection.UNSPECIFIED); + RelFieldCollation c = new RelFieldCollation(i++, direction, nulldir); + fieldCollations.add(c); + } else { + // if the direction is not present for a field, no need to examine remaining fields + break; + } + } + return fieldCollations; + } + + private CollationContext buildCollationContext(List indexFields, + List indexFieldCollations) { + assert indexFieldCollations.size() <= indexFields.size(); + Map collationMap = Maps.newHashMap(); + for (int i = 0; i < indexFieldCollations.size(); i++) { + collationMap.put(indexFields.get(i), indexFieldCollations.get(i)); + } + CollationContext collationContext = new CollationContext(collationMap, indexFieldCollations); + return collationContext; + } + + private DrillIndexDescriptor buildIndexDescriptor(String tableName, IndexDesc desc) + throws InvalidIndexDefinitionException { + if (desc.isExternal()) { + //XX: not support external index + return null; + } + + IndexDescriptor.IndexType idxType = IndexDescriptor.IndexType.NATIVE_SECONDARY_INDEX; + List indexFields = field2SchemaPath(desc.getIndexedFields()); + List coveringFields = field2SchemaPath(desc.getIncludedFields()); + coveringFields.add(SchemaPath.getSimplePath("_id")); + CollationContext collationContext = null; + if (!desc.isHashed()) { // hash index has no collation property + List indexFieldCollations = getFieldCollations(desc, desc.getIndexedFields()); + collationContext = buildCollationContext(indexFields, indexFieldCollations); + } + + DrillIndexDescriptor idx = new MapRDBIndexDescriptor ( + indexFields, + collationContext, + coveringFields, + null, + desc.getIndexName(), + tableName, + idxType, + desc, + this.getOriginalScan(), + desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullFirst ? NullDirection.FIRST : + (desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullLast ? 
+ NullDirection.LAST : NullDirection.UNSPECIFIED)); + + String storageName = this.getOriginalScan().getStoragePlugin().getName(); + materializeIndex(storageName, idx); + return idx; + } + + @SuppressWarnings("deprecation") + private Admin admin() { + assert getOriginalScan() instanceof MapRDBGroupScan; + + final MapRDBGroupScan dbGroupScan = (MapRDBGroupScan) getOriginalScan(); + final UserGroupInformation currentUser = ImpersonationUtil.createProxyUgi(dbGroupScan.getUserName()); + final Configuration conf = dbGroupScan.getFormatPlugin().getFsConf(); + + final Admin admin; + try { + admin = currentUser.doAs(new PrivilegedExceptionAction() { + public Admin run() throws Exception { + return MapRDB.getAdmin(conf); + } + }); + } catch (Exception e) { + throw new DrillRuntimeException("Failed to get Admin instance for user: " + currentUser.getUserName(), e); + } + return admin; + } +} diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java index 3b8de349a..e129b968b 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java @@ -17,20 +17,49 @@ */ package org.apache.drill.exec.planner.index; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import com.google.common.collect.Maps; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMdUtil; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexUtil; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Pair; +import org.apache.drill.common.expression.ExpressionStringBuilder; +import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.physical.base.DbGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.common.DrillScanRelBase; +import org.apache.drill.exec.planner.logical.DrillOptiq; +import org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.physical.ScanPrel; +import org.apache.drill.exec.store.hbase.HBaseRegexParser; +import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan; +import org.apache.hadoop.hbase.HConstants; import org.ojai.store.QueryCondition; +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; public class MapRDBStatistics implements Statistics { @@ -260,16 +289,332 @@ public class MapRDBStatistics implements Statistics { } public boolean initialize(RexNode condition, DrillScanRelBase scanRel, 
IndexCallContext context) { - //XXX to implement for complete secondary index framework + GroupScan scan = IndexPlanUtils.getGroupScan(scanRel); + + PlannerSettings settings = PrelUtil.getPlannerSettings(scanRel.getCluster().getPlanner()); + rowKeyJoinBackIOFactor = settings.getIndexRowKeyJoinCostFactor(); + if (scan instanceof DbGroupScan) { + String conditionAsStr = convertRexToString(condition, scanRel.getRowType()); + if (statsCache.get(conditionAsStr) == null) { + IndexCollection indexes = ((DbGroupScan)scan).getSecondaryIndexCollection(scanRel); + populateStats(condition, indexes, scanRel, context); + logger.info("index_plan_info: initialize: scanRel #{} and groupScan {} got fulltable {}, statsCache: {}, fiStatsCache: {}", + scanRel.getId(), System.identityHashCode(scan), fullTableScanPayload, statsCache, fIStatsCache); + return true; + } + } return false; } + /** + * This function computes statistics when there is no query condition + * @param jTabGrpScan - The current group scan + * @param indexes - The collection of indexes to use for getting statistics + * @param scanRel - The current scanRel + * @param context - The index plan call context + */ + private void populateStatsForNoFilter(JsonTableGroupScan jTabGrpScan, IndexCollection indexes, RelNode scanRel, + IndexCallContext context) { + // Get the stats payload for full table (has total rows in the table) + StatisticsPayload ftsPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, null, scanRel); + addToCache(null, null, context, ftsPayload, jTabGrpScan, scanRel, scanRel.getRowType()); + addToCache(null, jTabGrpScan.getAverageRowSizeStats(null), ftsPayload); + // Get the stats for all indexes + for (IndexDescriptor idx: indexes) { + StatisticsPayload idxPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, idx, scanRel); + StatisticsPayload idxRowSizePayload = jTabGrpScan.getAverageRowSizeStats(idx); + RelDataType newRowType; + FunctionalIndexInfo functionInfo = idx.getFunctionalInfo(); + if (functionInfo.hasFunctional()) { + newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo); + } else { + newRowType = scanRel.getRowType(); + } + addToCache(null, idx, context, idxPayload, jTabGrpScan, scanRel, newRowType); + addToCache(idx, idxRowSizePayload, ftsPayload); + } + } + + /** + * This is the core statistics function for populating the statistics. The statistics populated correspond to the query + * condition. Based on different types of plans, we would need statistics for different combinations of predicates. Currently, + * we do not have a tree-walker for {@link QueryCondition}. Hence, instead of using the individual predicates stats, to construct + * the stats for the overall predicates, we rely on using the final predicates. Hence, this has a limitation(susceptible) to + * predicate modification post stats generation. Statistics computed/stored are rowcounts, leading rowcounts, average rowsize. + * Rowcounts and leading rowcounts (i.e. corresponding to predicates on the leading index columns) are stored in the statsCache. + * Average rowsizes are stored in the fiStatsCache (FI stands for Filter Independent). 
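+ * For example (illustrative only; the column names are hypothetical): for a condition such as
+ * (age < 30 AND state = 'CA'), the statsCache maps the String form of the RexNode condition to
+ * per-index payloads of rowcount and leading rowcount, keyed by buildUniqueIndexIdentifier(),
+ * while the fIStatsCache maps each table/index identifier to its average rowsize payload.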
+ * + * @param condition - The condition for which to obtain statistics + * @param indexes - The collection of indexes to use for getting statistics + * @param scanRel - The current scanRel + * @param context - The index plan call context + */ + private void populateStats(RexNode condition, IndexCollection indexes, DrillScanRelBase scanRel, + IndexCallContext context) { + JsonTableGroupScan jTabGrpScan; + Map firstKeyIdxConditionMap; + Map idxConditionMap; + /* Map containing the individual base conditions of an ANDed/ORed condition and their selectivities. + * This is used to compute the overall selectivity of a complex ANDed/ORed condition using its base + * conditions. Helps prevent over/under estimates and guessed selectivity for ORed predicates. + */ + Map baseConditionMap; + GroupScan grpScan = IndexPlanUtils.getGroupScan(scanRel); + + if ((scanRel instanceof DrillScanRel || scanRel instanceof ScanPrel) && + grpScan instanceof JsonTableGroupScan) { + jTabGrpScan = (JsonTableGroupScan) grpScan; + } else { + logger.debug("Statistics: populateStats exit early - not an instance of JsonTableGroupScan!"); + return; + } + if (condition == null) { + populateStatsForNoFilter(jTabGrpScan, indexes, scanRel, context); + statsAvailable = true; + return; + } + + RexBuilder builder = scanRel.getCluster().getRexBuilder(); + PlannerSettings settings = PrelUtil.getSettings(scanRel.getCluster()); + // Get the stats payload for full table (has total rows in the table) + StatisticsPayload ftsPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, null, scanRel); + + // Get the average row size for table and all indexes + addToCache(null, jTabGrpScan.getAverageRowSizeStats(null), ftsPayload); + if (ftsPayload == null || ftsPayload.getRowCount() == 0) { + return; + } + for (IndexDescriptor idx : indexes) { + StatisticsPayload idxRowSizePayload = jTabGrpScan.getAverageRowSizeStats(idx); + addToCache(idx, idxRowSizePayload, ftsPayload); + } + + /* Only use indexes with distinct first key */ + IndexCollection distFKeyIndexes = distinctFKeyIndexes(indexes, scanRel); + IndexConditionInfo.Builder infoBuilder = IndexConditionInfo.newBuilder(condition, + distFKeyIndexes, builder, scanRel); + idxConditionMap = infoBuilder.getIndexConditionMap(); + firstKeyIdxConditionMap = infoBuilder.getFirstKeyIndexConditionMap(); + baseConditionMap = new HashMap<>(); + for (IndexDescriptor idx : firstKeyIdxConditionMap.keySet()) { + if(IndexPlanUtils.conditionIndexed(context.getOrigMarker(), idx) == IndexPlanUtils.ConditionIndexed.NONE) { + continue; + } + RexNode idxCondition = firstKeyIdxConditionMap.get(idx).indexCondition; + /* Use the pre-processed condition only for getting actual statistic from MapR-DB APIs. Use the + * original condition everywhere else (cache store/lookups) since the RexNode condition and its + * corresponding QueryCondition will be used to get statistics. e.g. we convert LIKE into RANGE + * condition to get statistics. 
However, statistics are always asked for LIKE and NOT the RANGE + */ + RexNode preProcIdxCondition = convertToStatsCondition(idxCondition, idx, context, scanRel, + Arrays.asList(SqlKind.CAST, SqlKind.LIKE)); + RelDataType newRowType; + FunctionalIndexInfo functionInfo = idx.getFunctionalInfo(); + if (functionInfo.hasFunctional()) { + newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo); + } else { + newRowType = scanRel.getRowType(); + } + + QueryCondition queryCondition = jTabGrpScan.convertToQueryCondition( + convertToLogicalExpression(preProcIdxCondition, newRowType, settings, builder)); + // Cap rows/size at total rows in case of issues with DB APIs + StatisticsPayload idxPayload = jTabGrpScan.getFirstKeyEstimatedStats(queryCondition, idx, scanRel); + double rowCount = Math.min(idxPayload.getRowCount(), ftsPayload.getRowCount()); + double leadingRowCount = Math.min(idxPayload.getLeadingRowCount(), rowCount); + double avgRowSize = Math.min(idxPayload.getAvgRowSize(), ftsPayload.getAvgRowSize()); + StatisticsPayload payload = new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize); + addToCache(idxCondition, idx, context, payload, jTabGrpScan, scanRel, newRowType); + addBaseConditions(idxCondition, payload, false, baseConditionMap, scanRel.getRowType()); + } + /* Add the row count for index conditions on all indexes. Stats are only computed for leading + * keys but index conditions can be pushed and would be required for access path costing + */ + for (IndexDescriptor idx : idxConditionMap.keySet()) { + if(IndexPlanUtils.conditionIndexed(context.getOrigMarker(), idx) == IndexPlanUtils.ConditionIndexed.NONE) { + continue; + } + Map leadingPrefixMap = Maps.newHashMap(); + double rowCount, leadingRowCount, avgRowSize; + RexNode idxCondition = idxConditionMap.get(idx).indexCondition; + // Ignore conditions which always evaluate to true + if (idxCondition.isAlwaysTrue()) { + continue; + } + RexNode idxIncColCondition = idxConditionMap.get(idx).remainderCondition; + RexNode idxRemColCondition = IndexPlanUtils.getLeadingPrefixMap(leadingPrefixMap, idx.getIndexColumns(), infoBuilder, idxCondition); + RexNode idxLeadColCondition = IndexPlanUtils.getLeadingColumnsFilter( + IndexPlanUtils.getLeadingFilters(leadingPrefixMap, idx.getIndexColumns()), builder); + RexNode idxTotRemColCondition = IndexPlanUtils.getTotalRemainderFilter(idxRemColCondition, idxIncColCondition, builder); + RexNode idxTotColCondition = IndexPlanUtils.getTotalFilter(idxLeadColCondition, idxTotRemColCondition, builder); + FunctionalIndexInfo functionInfo = idx.getFunctionalInfo(); + RelDataType newRowType = scanRel.getRowType(); + if (functionInfo.hasFunctional()) { + newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo); + } + /* For non-covering plans we would need the index leading condition */ + rowCount = ftsPayload.getRowCount() * computeSelectivity(idxLeadColCondition, idx, + ftsPayload.getRowCount(), scanRel, baseConditionMap).left; + leadingRowCount = rowCount; + avgRowSize = fIStatsCache.get(buildUniqueIndexIdentifier(idx)).getAvgRowSize(); + addToCache(idxLeadColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize), + jTabGrpScan, scanRel, newRowType); + /* For covering plans we would need the full condition */ + rowCount = ftsPayload.getRowCount() * computeSelectivity(idxTotColCondition, idx, + ftsPayload.getRowCount(), scanRel, baseConditionMap).left; + addToCache(idxTotColCondition, idx, 
context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize), + jTabGrpScan, scanRel, newRowType); + /* For intersect plans we would need the index condition */ + rowCount = ftsPayload.getRowCount() * computeSelectivity(idxCondition, idx, + ftsPayload.getRowCount(), scanRel, baseConditionMap).left; + addToCache(idxCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize), + jTabGrpScan, scanRel, newRowType); + /* Add the rowCount for condition on only included columns - no leading columns here! */ + if (idxIncColCondition != null) { + rowCount = ftsPayload.getRowCount() * computeSelectivity(idxIncColCondition, null, + ftsPayload.getRowCount(), scanRel, baseConditionMap).left; + addToCache(idxIncColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, rowCount, avgRowSize), + jTabGrpScan, scanRel, newRowType); + } + } + + // Add the rowCount for the complete condition - based on table + double rowCount = ftsPayload.getRowCount() * computeSelectivity(condition, null, + ftsPayload.getRowCount(), scanRel, baseConditionMap).left; + // Here, ftsLeadingKey rowcount is based on _id predicates + StatisticsPayload ftsLeadingKeyPayload = jTabGrpScan.getFirstKeyEstimatedStats(jTabGrpScan.convertToQueryCondition( + convertToLogicalExpression(condition, scanRel.getRowType(), settings, builder)), null, scanRel); + addToCache(condition, null, null, new MapRDBStatisticsPayload(rowCount, ftsLeadingKeyPayload.getRowCount(), + ftsPayload.getAvgRowSize()), jTabGrpScan, scanRel, scanRel.getRowType()); + // Add the full table rows while we are at it - represented by RexNode, QueryCondition. + // No ftsLeadingKey so leadingKeyRowcount = totalRowCount + addToCache(null, null, null, new MapRDBStatisticsPayload(ftsPayload.getRowCount(), ftsPayload.getRowCount(), + ftsPayload.getAvgRowSize()), jTabGrpScan, scanRel, scanRel.getRowType()); + // mark stats has been statsAvailable + statsAvailable = true; + } + + private boolean addBaseConditions(RexNode condition, StatisticsPayload payload, boolean redundant, + Map baseConditionMap, RelDataType rowType) { + boolean res = redundant; + if (condition.getKind() == SqlKind.AND) { + for(RexNode pred : RelOptUtil.conjunctions(condition)) { + res = addBaseConditions(pred, payload, res, baseConditionMap, rowType); + } + } else if (condition.getKind() == SqlKind.OR) { + for(RexNode pred : RelOptUtil.disjunctions(condition)) { + res = addBaseConditions(pred, payload, res, baseConditionMap, rowType); + } + } else { + // base condition + String conditionAsStr = convertRexToString(condition, rowType); + if (!redundant) { + baseConditionMap.put(conditionAsStr, payload.getRowCount()); + return true; + } else { + baseConditionMap.put(conditionAsStr, -1.0); + return false; + } + } + return res; + } /* - * Convert the given RexNode to a String representation while also replacing the RexInputRef references - * to actual column names. Since, we compare String representations of RexNodes, two equivalent RexNode - * expressions may differ in the RexInputRef positions but otherwise the same. - * e.g. $1 = 'CA' projection (State, Country) , $2 = 'CA' projection (Country, State) - */ + * Adds the statistic(row count) to the cache. Also adds the corresponding QueryCondition->RexNode + * condition mapping. 
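+ * For example (with a hypothetical condition): after a successful call, statsCache holds
+ * <"<($age, 30)", {index identifier -> payload}> and conditionRexNodeMap holds
+ * <queryCondition.toString(), "<($age, 30)">, so statistics keyed by the RexNode string can
+ * later be looked up via the generated QueryCondition.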
+ */ + private void addToCache(RexNode condition, IndexDescriptor idx, IndexCallContext context, + StatisticsPayload payload, JsonTableGroupScan jTabGrpScan, RelNode scanRel, RelDataType rowType) { + if (condition != null + && !condition.isAlwaysTrue()) { + RexBuilder builder = scanRel.getCluster().getRexBuilder(); + PlannerSettings settings = PrelUtil.getSettings(scanRel.getCluster()); + String conditionAsStr = convertRexToString(condition, scanRel.getRowType()); + if (statsCache.get(conditionAsStr) == null + && payload.getRowCount() != Statistics.ROWCOUNT_UNKNOWN) { + Map payloadMap = new HashMap<>(); + payloadMap.put(buildUniqueIndexIdentifier(idx), payload); + statsCache.put(conditionAsStr, payloadMap); + logger.debug("Statistics: StatsCache:<{}, {}>", conditionAsStr, payload); + // Always pre-process CAST conditions - otherwise queryCondition will not be generated correctly + RexNode preProcIdxCondition = convertToStatsCondition(condition, idx, context, scanRel, + Arrays.asList(SqlKind.CAST)); + QueryCondition queryCondition = + jTabGrpScan.convertToQueryCondition(convertToLogicalExpression(preProcIdxCondition, + rowType, settings, builder)); + if (queryCondition != null) { + String queryConditionAsStr = queryCondition.toString(); + if (conditionRexNodeMap.get(queryConditionAsStr) == null) { + conditionRexNodeMap.put(queryConditionAsStr, conditionAsStr); + logger.debug("Statistics: QCRNCache:<{}, {}>", queryConditionAsStr, conditionAsStr); + } + } else { + logger.debug("Statistics: QCRNCache: Unable to generate QueryCondition for {}", conditionAsStr); + } + } else { + Map payloadMap = statsCache.get(conditionAsStr); + if (payloadMap != null) { + if (payloadMap.get(buildUniqueIndexIdentifier(idx)) == null) { + payloadMap.put(buildUniqueIndexIdentifier(idx), payload); + + // rowCount for the same condition should be the same on the primary table or index, + // so sync them to the smallest since currently both are over-estimated. + // DO NOT sync the leading rowCount since it is based on the leading condition and not the + // condition (the key for this cache). Hence, for the same condition the leading condition and + // consequently the leading rowCount will vary with the index. Syncing them may lead to + // unintended side-effects e.g. given a covering index, a full table scan, and a condition + // on a non-id field which happens to be the leading key in the index, the leading rowcount + // for the full table scan should be the full table rowcount. Syncing them would incorrectly + // make the full table scan cheaper! If required, syncing should only be done based on the + // leading condition and NOT the condition + double minimalRowCount = payload.getRowCount(); + for (StatisticsPayload existing : payloadMap.values()) { + if (existing.getRowCount() < minimalRowCount) { + minimalRowCount = existing.getRowCount(); + } + } + for (StatisticsPayload existing : payloadMap.values()) { + if (existing instanceof MapRDBStatisticsPayload) { + ((MapRDBStatisticsPayload) existing).rowCount = minimalRowCount; + } + } + } else { + logger.debug("Statistics: Filter row count already exists for filter: {}.
Skip!", conditionAsStr); + } + } else { + logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr); + } + } + } else if (condition == null && idx == null) { + fullTableScanPayload = new MapRDBStatisticsPayload(payload.getRowCount(), + payload.getLeadingRowCount(), payload.getAvgRowSize()); + logger.debug("Statistics: StatsCache:<{}, {}>","NULL", fullTableScanPayload); + } + } + + private void addToCache(IndexDescriptor idx, StatisticsPayload payload, StatisticsPayload ftsPayload) { + String tabIdxIdentifier = buildUniqueIndexIdentifier(idx); + if (fIStatsCache.get(tabIdxIdentifier) == null) { + if (ftsPayload.getAvgRowSize() >= payload.getAvgRowSize()) { + fIStatsCache.put(tabIdxIdentifier, payload); + logger.debug("Statistics: fIStatsCache:<{}, {}>",tabIdxIdentifier, payload); + } else { + StatisticsPayload cappedPayload = + new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, ftsPayload.getAvgRowSize()); + fIStatsCache.put(tabIdxIdentifier,cappedPayload); + logger.debug("Statistics: fIStatsCache:<{}, {}> (Capped)",tabIdxIdentifier, cappedPayload); + } + } else { + logger.debug("Statistics: Average row size already exists for :<{}, {}>. Skip!",tabIdxIdentifier, payload); + } + } + + /* + * Convert the given RexNode to a String representation while also replacing the RexInputRef references + * to actual column names. Since, we compare String representations of RexNodes, two equivalent RexNode + * expressions may differ in the RexInputRef positions but otherwise the same. + * e.g. $1 = 'CA' projection (State, Country) , $2 = 'CA' projection (Country, State) + */ private String convertRexToString(RexNode condition, RelDataType rowType) { StringBuilder sb = new StringBuilder(); if (condition == null) { @@ -320,11 +665,11 @@ public class MapRDBStatistics implements Statistics { } /* - * Generate the input reference to column mapping for reference replacement. Please - * look at the usage in convertRexToString() to understand why this mapping is required. - */ + * Generate the input reference to column mapping for reference replacement. Please + * look at the usage in convertRexToString() to understand why this mapping is required. + */ private void getInputRefMapping(RexNode condition, RelDataType rowType, - HashMap mapping) { + HashMap mapping) { if (condition instanceof RexCall) { for (RexNode op : ((RexCall) condition).getOperands()) { getInputRefMapping(op, rowType, mapping); @@ -334,4 +679,328 @@ public class MapRDBStatistics implements Statistics { rowType.getFieldNames().get(condition.hashCode())); } } + + /* + * Additional pre-processing may be required for LIKE/CAST predicates in order to compute statistics. + * e.g. A LIKE predicate should be converted to a RANGE predicate for statistics computation. MapR-DB + * does not yet support computing statistics for LIKE predicates. 
+ */ + private RexNode convertToStatsCondition(RexNode condition, IndexDescriptor index, + IndexCallContext context, RelNode scanRel, ListtypesToProcess) { + RexBuilder builder = scanRel.getCluster().getRexBuilder(); + if (condition.getKind() == SqlKind.AND) { + final List conditions = Lists.newArrayList(); + for(RexNode pred : RelOptUtil.conjunctions(condition)) { + conditions.add(convertToStatsCondition(pred, index, context, scanRel, typesToProcess)); + } + return RexUtil.composeConjunction(builder, conditions, false); + } else if (condition.getKind() == SqlKind.OR) { + final List conditions = Lists.newArrayList(); + for(RexNode pred : RelOptUtil.disjunctions(condition)) { + conditions.add(convertToStatsCondition(pred, index, context, scanRel, typesToProcess)); + } + return RexUtil.composeDisjunction(builder, conditions, false); + } else if (condition instanceof RexCall) { + // LIKE operator - convert to a RANGE predicate, if possible + if (typesToProcess.contains(SqlKind.LIKE) + && ((RexCall) condition).getOperator().getKind() == SqlKind.LIKE) { + return convertLikeToRange((RexCall)condition, builder); + } else if (typesToProcess.contains(SqlKind.CAST) + && hasCastExpression(condition)) { + return convertCastForFIdx(((RexCall) condition), index, context, scanRel); + } + else { + return condition; + } + } + return condition; + } + + /* + * Determines whether the given expression contains a CAST expression. Assumes that the + * given expression is a valid expression. + * Returns TRUE, if it finds at least one instance of CAST operator. + */ + private boolean hasCastExpression(RexNode condition) { + if (condition instanceof RexCall) { + if (((RexCall) condition).getOperator().getKind() == SqlKind.CAST) { + return true; + } + for (RexNode op : ((RexCall) condition).getOperands()) { + if (hasCastExpression(op)) { + return true; + } + } + } + return false; + } + /* + * CAST expressions are not understood by MAPR-DB as-is. Hence, we must convert them before passing them + * onto MAPR-DB for statistics. Given a functional index, the given expression is converted into an + * expression on the `expression` column of the functional index. + */ + private RexNode convertCastForFIdx(RexCall condition, IndexDescriptor index, + IndexCallContext context, RelNode origScan) { + if (index == null) { + return condition; + } + FunctionalIndexInfo functionInfo = index.getFunctionalInfo(); + if (!functionInfo.hasFunctional()) { + return condition; + } + // The functional index has a different row-type than the original scan. 
Use the index row-type when + // converting the condition + RelDataType newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(origScan, context, functionInfo); + RexBuilder builder = origScan.getCluster().getRexBuilder(); + return FunctionalIndexHelper.convertConditionForIndexScan(condition, + origScan, newRowType, builder, functionInfo); + } + + /* + * Helper function to perform additional pre-processing for LIKE predicates + */ + private RexNode convertLikeToRange(RexCall condition, RexBuilder builder) { + Preconditions.checkArgument(condition.getOperator().getKind() == SqlKind.LIKE, + "Unable to convertLikeToRange: argument is not a LIKE condition!"); + HBaseRegexParser parser = null; + RexNode arg = null; + RexLiteral pattern = null, escape = null; + String patternStr = null, escapeStr = null; + if (condition.getOperands().size() == 2) { + // No escape character specified + for (RexNode op : condition.getOperands()) { + if (op.getKind() == SqlKind.LITERAL) { + pattern = (RexLiteral) op; + } else { + arg = op; + } + } + // Get the PATTERN strings from the corresponding RexLiteral + if (pattern.getTypeName() == SqlTypeName.DECIMAL || + pattern.getTypeName() == SqlTypeName.INTEGER) { + patternStr = pattern.getValue().toString(); + } else if (pattern.getTypeName() == SqlTypeName.CHAR) { + patternStr = pattern.getValue2().toString(); + } + if (patternStr != null) { + parser = new HBaseRegexParser(patternStr); + } + } else if (condition.getOperands().size() == 3) { + // Escape character specified + for (RexNode op : condition.getOperands()) { + if (op.getKind() == SqlKind.LITERAL) { + // Assume first literal specifies PATTERN and the second literal specifies the ESCAPE char + if (pattern == null) { + pattern = (RexLiteral) op; + } else { + escape = (RexLiteral) op; + } + } else { + arg = op; + } + } + // Get the PATTERN and ESCAPE strings from the corresponding RexLiteral + if (pattern.getTypeName() == SqlTypeName.DECIMAL || + pattern.getTypeName() == SqlTypeName.INTEGER) { + patternStr = pattern.getValue().toString(); + } else if (pattern.getTypeName() == SqlTypeName.CHAR) { + patternStr = pattern.getValue2().toString(); + } + if (escape.getTypeName() == SqlTypeName.DECIMAL || + escape.getTypeName() == SqlTypeName.INTEGER) { + escapeStr = escape.getValue().toString(); + } else if (escape.getTypeName() == SqlTypeName.CHAR) { + escapeStr = escape.getValue2().toString(); + } + if (patternStr != null && escapeStr != null) { + parser = new HBaseRegexParser(patternStr, escapeStr.toCharArray()[0]); + } + } + if (parser != null) { + parser.parse(); + String prefix = parser.getPrefixString(); + /* + * If there is a literal prefix, convert it into an EQUALITY or RANGE predicate + */ + if (prefix != null) { + if (prefix.equals(parser.getLikeString())) { + // No WILDCARD present. This turns the LIKE predicate to EQUALITY predicate + if (arg != null) { + return builder.makeCall(SqlStdOperatorTable.EQUALS, arg, pattern); + } + } else { + // WILDCARD present. 
This turns the LIKE predicate to RANGE predicate + byte[] startKey = HConstants.EMPTY_START_ROW; + byte[] stopKey = HConstants.EMPTY_END_ROW; + startKey = prefix.getBytes(Charsets.UTF_8); + stopKey = startKey.clone(); + boolean isMaxVal = true; + for (int i = stopKey.length - 1; i >= 0 ; --i) { + int nextByteValue = (0xff & stopKey[i]) + 1; + if (nextByteValue < 0xff) { + stopKey[i] = (byte) nextByteValue; + isMaxVal = false; + break; + } else { + stopKey[i] = 0; + } + } + if (isMaxVal) { + stopKey = HConstants.EMPTY_END_ROW; + } + try { + // TODO: This maybe a potential bug since we assume UTF-8 encoding. However, we follow the + // current DB implementation. See HBaseFilterBuilder.createHBaseScanSpec "like" CASE statement + RexLiteral startKeyLiteral = builder.makeLiteral(new String(startKey, + Charsets.UTF_8.toString())); + RexLiteral stopKeyLiteral = builder.makeLiteral(new String(stopKey, + Charsets.UTF_8.toString())); + if (arg != null) { + RexNode startPred = builder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, + arg, startKeyLiteral); + RexNode stopPred = builder.makeCall(SqlStdOperatorTable.LESS_THAN, arg, stopKeyLiteral); + return builder.makeCall(SqlStdOperatorTable.AND, startPred, stopPred); + } + } catch (UnsupportedEncodingException ex) { + // Encoding not supported - Do nothing! + logger.debug("Statistics: convertLikeToRange: Unsupported Encoding Exception -> {}", ex.getMessage()); + } + } + } + } + // Could not convert - return condition as-is. + return condition; + } + + /* + * Compute the selectivity of the given rowCondition. Retrieve the selectivity + * for index conditions from the cache + */ + private Pair computeSelectivity(RexNode condition, IndexDescriptor idx, double totalRows, + RelNode scanRel, Map baseConditionMap) { + double selectivity; + boolean guess = false; + if (totalRows <= 0) { + return new Pair<>(1.0, true); + } + String conditionAsStr = convertRexToString(condition, scanRel.getRowType()); + if (condition.getKind() == SqlKind.AND) { + selectivity = 1.0; + for (RexNode pred : RelOptUtil.conjunctions(condition)) { + Pair selPayload = computeSelectivity(pred, idx, totalRows, scanRel, baseConditionMap); + if (selPayload.left > 0) { + // At least one AND branch is a guess + if (selPayload.right == true) { + guess = true; + } + selectivity *= selPayload.left; + } + } + } else if (condition.getKind() == SqlKind.OR) { + selectivity = 0.0; + for (RexNode pred : RelOptUtil.disjunctions(condition)) { + Pair selPayload = computeSelectivity(pred, idx, totalRows, scanRel, baseConditionMap); + if (selPayload.left > 0.0) { + // At least one OR branch is a guess + if (selPayload.right == true) { + guess = true; + } + selectivity += selPayload.left; + } + } + // Cap selectivity of OR'ed predicates at 0.25 if at least one predicate is a guess (Calcite does the same) + if (guess && selectivity > 0.25) { + selectivity = 0.25; + } + } else { + guess = false; + if (baseConditionMap.get(conditionAsStr) != null) { + double rowCount = baseConditionMap.get(conditionAsStr); + if (rowCount != -1.0) { + selectivity = rowCount / totalRows; + } else { + // Ignore + selectivity = -1.0; + guess = true; + } + } else { + selectivity = RelMdUtil.guessSelectivity(condition); + guess = true; + } + return new Pair<>(selectivity, guess); + } + // Cap selectivity to be between 0.0 and 1.0 + selectivity = Math.min(1.0, selectivity); + selectivity = Math.max(0.0, selectivity); + logger.debug("Statistics: computeSelectivity: Cache MISS: Computed {} -> {}", conditionAsStr, selectivity); + 
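+ // Worked example with hypothetical numbers: for (a AND b) with sel(a) = 0.1 from the cache and
+ // sel(b) = 0.5 guessed, the combined selectivity is 0.05 with guess = true; for ORed predicates
+ // the branch selectivities are added, capped at 0.25 whenever at least one branch is a guess.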
return new Pair<>(selectivity, guess);
+  }
+
+  /*
+   * Filters out indexes from the given collection based on the leading (first) key of each index,
+   * i.e. after filtering, the collection contains only one index for each distinct leading key
+   * present in the collection.
+   */
+  private IndexCollection distinctFKeyIndexes(IndexCollection indexes, RelNode scanRel) {
+    IndexCollection distinctIdxCollection = new DrillIndexCollection(scanRel, new HashSet<>());
+    Iterator<IndexDescriptor> iterator = indexes.iterator();
+    Map<String, List<IndexDescriptor>> firstColIndexMap = new HashMap<>();
+    while (iterator.hasNext()) {
+      IndexDescriptor index = iterator.next();
+      // If the index has columns, the first column is the leading column for the index
+      if (index.getIndexColumns() != null) {
+        List<IndexDescriptor> idxList;
+        String firstCol = convertLExToStr(index.getIndexColumns().get(0));
+        if (firstColIndexMap.get(firstCol) != null) {
+          idxList = firstColIndexMap.get(firstCol);
+        } else {
+          idxList = new ArrayList<>();
+        }
+        idxList.add(index);
+        firstColIndexMap.put(firstCol, idxList);
+      }
+    }
+    for (String firstCol : firstColIndexMap.keySet()) {
+      List<IndexDescriptor> indexesSameFirstCol = firstColIndexMap.get(firstCol);
+      double maxAvgRowSize = -1.0;
+      IndexDescriptor selectedIdx = null;
+      for (IndexDescriptor idx : indexesSameFirstCol) {
+        String tabIdxIdentifier = buildUniqueIndexIdentifier(idx);
+        double idxRowSize = fIStatsCache.get(tabIdxIdentifier).getAvgRowSize();
+        // Prefer the index with the largest average row-size, breaking ties lexicographically
+        if (idxRowSize > maxAvgRowSize
+            || (idxRowSize == maxAvgRowSize
+                && (selectedIdx == null || idx.getIndexName().compareTo(selectedIdx.getIndexName()) < 0))) {
+          maxAvgRowSize = idxRowSize;
+          selectedIdx = idx;
+        }
+      }
+      assert (selectedIdx != null);
+      distinctIdxCollection.addIndex(selectedIdx);
+    }
+    return distinctIdxCollection;
+  }
+
+  /*
+   * Returns the String representation of the given logical expression.
+   */
+  private String convertLExToStr(LogicalExpression lex) {
+    StringBuilder sb = new StringBuilder();
+    ExpressionStringBuilder esb = new ExpressionStringBuilder();
+    lex.accept(esb, sb);
+    return sb.toString();
+  }
+
+  /*
+   * Converts the given RexNode condition into a Drill logical expression.
+ */ + private LogicalExpression convertToLogicalExpression(RexNode condition, + RelDataType type, PlannerSettings settings, RexBuilder builder) { + LogicalExpression conditionExp; + try { + conditionExp = DrillOptiq.toDrill(new DrillParseContext(settings), type, builder, condition); + } catch (ClassCastException e) { + return null; + } + return conditionExp; + } } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java index ee35a68e4..f982278e7 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java @@ -20,7 +20,10 @@ package org.apache.drill.exec.store.mapr.db; import java.io.IOException; import com.mapr.fs.MapRFileStatus; +import com.mapr.db.index.IndexDesc; import com.mapr.fs.tables.TableProperties; +import org.apache.drill.exec.planner.index.IndexDescriptor; +import org.apache.drill.exec.planner.index.MapRDBIndexDescriptor; import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.store.SchemaConfig; @@ -33,6 +36,7 @@ import org.apache.drill.exec.store.mapr.TableFormatMatcher; import org.apache.drill.exec.store.mapr.TableFormatPlugin; import org.apache.drill.exec.store.mapr.db.binary.MapRDBBinaryTable; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; public class MapRDBFormatMatcher extends TableFormatMatcher { @@ -49,6 +53,44 @@ public class MapRDBFormatMatcher extends TableFormatMatcher { .getIsMarlinTable(); } + + /** + * Get an instance of DrillTable for a particular native secondary index + * @param fs + * @param selection + * @param fsPlugin + * @param storageEngineName + * @param userName + * @param secondaryIndexDesc + * @return + * @throws IOException + */ + public DrillTable isReadableIndex(DrillFileSystem fs, + FileSelection selection, FileSystemPlugin fsPlugin, + String storageEngineName, String userName, + IndexDescriptor secondaryIndexDesc) throws IOException { + FileStatus status = selection.getFirstPath(fs); + + if (!isFileReadable(fs, status)) { + return null; + } + + MapRDBFormatPlugin fp = (MapRDBFormatPlugin) getFormatPlugin(); + DrillTable dt = new DynamicDrillTable(fsPlugin, + storageEngineName, + userName, + new FormatSelection(fp.getConfig(), + selection)); + + // TODO: Create groupScan using index descriptor + dt.setGroupScan(fp.getGroupScan(userName, + selection, + null /* columns */, + (IndexDesc) ((MapRDBIndexDescriptor) secondaryIndexDesc).getOriginalDesc())); + + return dt; + } + @Override public DrillTable isReadable(DrillFileSystem fs, FileSelection selection, FileSystemPlugin fsPlugin, diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java index da4829f03..0d1bf04c6 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java @@ -108,7 +108,7 @@ public class MapRDBFormatPlugin extends TableFormatPlugin { public Set getOptimizerRules() { return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, 
MapRDBPushFilterIntoScan.FILTER_ON_PROJECT, MapRDBPushProjectIntoScan.PROJECT_ON_SCAN, MapRDBPushLimitIntoScan.LIMIT_ON_PROJECT,
-        MapRDBPushLimitIntoScan.LIMIT_ON_SCAN);
+        MapRDBPushLimitIntoScan.LIMIT_ON_SCAN, MapRDBPushLimitIntoScan.LIMIT_ON_RKJOIN);
   }
 
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
index 1e6bcec4a..422a269b0 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
@@ -39,6 +39,9 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.index.IndexCollection;
 import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.index.IndexDiscover;
+import org.apache.drill.exec.planner.index.IndexDiscoverFactory;
+import org.apache.drill.exec.planner.index.MapRDBIndexDiscover;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -307,8 +310,14 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan {
   @Override
   public IndexCollection getSecondaryIndexCollection(RelNode scanRel) {
-    //XXX to implement for complete secondary index framework
-    return null;
+    IndexDiscover discover = IndexDiscoverFactory.getIndexDiscover(
+        getStorageConfig(), this, scanRel, MapRDBIndexDiscover.class);
+
+    if (discover == null) {
+      logger.error("Null IndexDiscover was found for {}!", scanRel);
+      return null;
+    }
+    return discover.getTableIndex(getTableName());
   }
 
   @JsonIgnore
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
index 1f4b8c9a0..a26bc808c 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
@@ -26,11 +26,13 @@ import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.LimitPrel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.RowKeyJoinPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hbase.HBaseScanSpec;
 import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
 import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+import org.apache.drill.exec.store.mapr.db.json.RestrictedJsonTableGroupScan;
 
 public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushLimitIntoScan.class);
@@ -59,8 +61,8 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
     if (scan.getGroupScan().supportsLimitPushdown()
         && !limit.isPushDown() && limit.getFetch() != null) {
       if ((scan.getGroupScan() instanceof JsonTableGroupScan
-          && ((JsonTableGroupScan) scan.getGroupScan()).isIndexScan()) ) {
-          //|| (scan.getGroupScan() instanceof RestrictedJsonTableGroupScan)) {
+          && ((JsonTableGroupScan) scan.getGroupScan()).isIndexScan())
+          || (scan.getGroupScan() instanceof RestrictedJsonTableGroupScan)) {
        return true;
       }
     }
@@ -111,6 +113,26 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
     }
   };
 
+  public static final StoragePluginOptimizerRule LIMIT_ON_RKJOIN =
+      new MapRDBPushLimitIntoScan(RelOptHelper.some(LimitPrel.class, RelOptHelper.any(RowKeyJoinPrel.class)),
+          "MapRDBPushLimitIntoScan:Limit_On_RKJoin") {
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final RowKeyJoinPrel join = call.rel(1);
+      final LimitPrel limit = call.rel(0);
+      doPushLimitIntoRowKeyJoin(call, limit, null, join);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final LimitPrel limit = call.rel(0);
+      // We do not fire this rule if fetch() is null (indicating we have to fetch all the
+      // remaining rows starting from the offset).
+      return !limit.isPushDown() && limit.getFetch() != null;
+    }
+  };
+
   protected void doPushLimitIntoGroupScan(RelOptRuleCall call,
       LimitPrel limit, final ProjectPrel project, ScanPrel scan, GroupScan groupScan) {
     try {
@@ -153,5 +175,29 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
     }
     return null;
   }
+
+  protected void doPushLimitIntoRowKeyJoin(RelOptRuleCall call,
+      LimitPrel limit, final ProjectPrel project, RowKeyJoinPrel join) {
+    final RelNode newChild;
+    try {
+      RelNode left = join.getLeft();
+      RelNode right = join.getRight();
+      final RelNode limitOnLeft = new LimitPrel(left.getCluster(), left.getTraitSet(), left,
+          limit.getOffset(), limit.getFetch());
+      RowKeyJoinPrel newJoin = new RowKeyJoinPrel(join.getCluster(), join.getTraitSet(), limitOnLeft, right,
+          join.getCondition(), join.getJoinType());
+      if (project != null) {
+        final ProjectPrel newProject = new ProjectPrel(project.getCluster(), project.getTraitSet(), newJoin,
+            project.getProjects(), project.getRowType());
+        newChild = newProject;
+      } else {
+        newChild = newJoin;
+      }
+      call.transformTo(newChild);
+      logger.debug("pushLimitIntoRowKeyJoin: Pushed limit to the left side of join {}", join);
+    } catch (Exception e) {
+      logger.warn("pushLimitIntoRowKeyJoin: Exception while trying limit pushdown!", e);
+    }
+  }
 }
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
index 2eb84e7ef..521586808 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
@@ -30,8 +30,8 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil.ProjectPushInfo;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
index a269256cf..b54526232 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -44,10 +44,14 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.planner.index.IndexDescriptor;
+import org.apache.drill.exec.planner.index.MapRDBIndexDescriptor;
+import org.apache.drill.exec.planner.index.MapRDBStatisticsPayload;
 import org.apache.drill.exec.planner.index.Statistics;
 import org.apache.drill.exec.planner.index.MapRDBStatistics;
 import org.apache.drill.exec.planner.cost.PluginCost;
 import org.apache.drill.exec.planner.physical.PartitionFunction;
+import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
@@ -296,6 +300,9 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupScan {
   @Override
   public ScanStats getScanStats() {
+    if (isIndexScan()) {
+      return indexScanStats();
+    }
     return fullTableScanStats();
   }
 
@@ -359,6 +366,57 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupScan {
     }
   }
 
+  private ScanStats indexScanStats() {
+    if (!this.getIndexHint().equals("") &&
+        this.getIndexHint().equals(getIndexDesc().getIndexName())) {
+      logger.debug("JsonIndexGroupScan:{} forcing index {} by making tiny cost", this, this.getIndexHint());
+      return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 1, 1, 0);
+    }
+
+    int totalColNum = STAR_COLS;
+    PluginCost pluginCostModel = formatPlugin.getPluginCostModel();
+    final int avgColumnSize = pluginCostModel.getAverageColumnSize(this);
+    boolean filterPushed = (scanSpec.getSerializedFilter() != null);
+    if (scanSpec.getIndexDesc() != null) {
+      totalColNum = scanSpec.getIndexDesc().getIncludedFields().size()
+          + scanSpec.getIndexDesc().getIndexedFields().size() + 1;
+    }
+    int numColumns = (columns == null || columns.isEmpty()) ? totalColNum : columns.size();
+    String idxIdentifier = stats.buildUniqueIndexIdentifier(scanSpec.getIndexDesc().getPrimaryTablePath(),
+        scanSpec.getIndexDesc().getIndexName());
+    double rowCount = stats.getRowCount(scanSpec.getCondition(), idxIdentifier);
+    // Rowcount based on the predicate on the index's leading columns.
+    double leadingRowCount = stats.getLeadingRowCount(scanSpec.getCondition(), idxIdentifier);
+    double avgRowSize = stats.getAvgRowSize(idxIdentifier, false);
+    // If UNKNOWN, use defaults
+    if (rowCount == ROWCOUNT_UNKNOWN || rowCount == 0) {
+      rowCount = (filterPushed ? 0.0005f : 0.001f) * fullTableRowCount / scanSpec.getIndexDesc().getIndexedFields().size();
    }
+    // If limit pushdown has occurred - factor it in the rowcount
+    if (this.maxRecordsToRead > 0) {
+      rowCount = Math.min(rowCount, this.maxRecordsToRead);
+    }
+    if (leadingRowCount == ROWCOUNT_UNKNOWN || leadingRowCount == 0) {
+      leadingRowCount = rowCount;
+    }
+    if (avgRowSize == AVG_ROWSIZE_UNKNOWN || avgRowSize == 0) {
+      avgRowSize = avgColumnSize * numColumns;
+    }
+    double rowsFromDisk = leadingRowCount;
+    if (!filterPushed) {
+      // Both start and stop rows are empty, indicating this is a full scan, so
+      // use the total rows for calculating disk i/o
+      rowsFromDisk = fullTableRowCount;
+    }
+    double totalBlocks = Math.ceil((avgRowSize * fullTableRowCount) / pluginCostModel.getBlockSize(this));
+    double numBlocks = Math.ceil((avgRowSize * rowsFromDisk) / pluginCostModel.getBlockSize(this));
+    numBlocks = Math.min(totalBlocks, numBlocks);
+    double diskCost = numBlocks * pluginCostModel.getSequentialBlockReadCost(this);
+    logger.debug("index_plan_info: JsonIndexGroupScan:{} - indexName:{}: rowCount:{}, avgRowSize:{}, blocks:{}, totalBlocks:{}, rowsFromDisk {}, diskCost:{}",
+        System.identityHashCode(this), scanSpec.getIndexDesc().getIndexName(), rowCount, avgRowSize, numBlocks, totalBlocks, rowsFromDisk, diskCost);
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
+  }
+
   @Override
   @JsonIgnore
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
@@ -412,6 +470,142 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupScan {
     return true;
   }
 
+  @Override
+  public RestrictedJsonTableGroupScan getRestrictedScan(List<SchemaPath> columns) {
+    RestrictedJsonTableGroupScan newScan =
+        new RestrictedJsonTableGroupScan(this.getUserName(),
+            (FileSystemPlugin) this.getStoragePlugin(),
+            this.getFormatPlugin(),
+            this.getScanSpec(),
+            this.getColumns(),
+            this.getStatistics());
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  /**
+   * Get the estimated average row size. DO NOT call this API directly.
+   * Call the stats API instead, which modifies the counts based on preference options.
+   * @param index the index to use for generating the estimate
+   * @return {@link MapRDBStatisticsPayload} carrying the estimated average row size
+   */
+  public MapRDBStatisticsPayload getAverageRowSizeStats(IndexDescriptor index) {
+    IndexDesc indexDesc = null;
+    double avgRowSize = AVG_ROWSIZE_UNKNOWN;
+
+    if (index != null) {
+      indexDesc = (IndexDesc) ((MapRDBIndexDescriptor) index).getOriginalDesc();
+    }
+    // If no index is specified, get it from the primary table
+    if (indexDesc == null && scanSpec.isSecondaryIndex()) {
+      throw new UnsupportedOperationException("getAverageRowSizeStats should be invoked on primary table");
+    }
+
+    // Get the index table or primary table and use the DB API to get the estimated number of rows. For size estimates,
+    // we assume that all the columns would be read from the disk.
+    final Table table = this.formatPlugin.getJsonTableCache().getTable(scanSpec.getTableName(), indexDesc, getUserName());
+
+    if (table != null) {
+      final MetaTable metaTable = table.getMetaTable();
+      if (metaTable != null) {
+        avgRowSize = metaTable.getAverageRowSize();
+      }
+    }
+    logger.debug("index_plan_info: getEstimatedRowCount obtained from DB Client for {}: indexName: {}, indexInfo: {}, "
+        + "avgRowSize: {}", this, (indexDesc == null ? "null" : indexDesc.getIndexName()),
+        (indexDesc == null ? "null" : indexDesc.getIndexInfo()), avgRowSize);
"null" : indexDesc.getIndexInfo()), avgRowSize); + return new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, avgRowSize); + } + + /** + * Get the estimated statistics after applying the {@link RexNode} condition. DO NOT call this API directly. + * Call the stats API instead which modifies the counts based on preference options. + * @param condition, filter to apply + * @param index, to use for generating the estimate + * @return row count post filtering + */ + public MapRDBStatisticsPayload getFirstKeyEstimatedStats(QueryCondition condition, IndexDescriptor index, RelNode scanRel) { + IndexDesc indexDesc = null; + if (index != null) { + indexDesc = (IndexDesc)((MapRDBIndexDescriptor)index).getOriginalDesc(); + } + return getFirstKeyEstimatedStatsInternal(condition, indexDesc, scanRel); + } + + /** + * Get the estimated statistics after applying the {@link QueryCondition} condition + * @param condition, filter to apply + * @param index, to use for generating the estimate + * @return {@link MapRDBStatisticsPayload} statistics + */ + private MapRDBStatisticsPayload getFirstKeyEstimatedStatsInternal(QueryCondition condition, IndexDesc index, RelNode scanRel) { + // double totalRows = getRowCount(null, scanPrel); + + // If no index is specified, get it from the primary table + if(index == null && scanSpec.isSecondaryIndex()) { + // If stats not cached get it from the table. + //table = MapRDB.getTable(scanSpec.getPrimaryTablePath()); + throw new UnsupportedOperationException("getFirstKeyEstimatedStats should be invoked on primary table"); + } + + // Get the index table or primary table and use the DB API to get the estimated number of rows. For size estimates, + // we assume that all the columns would be read from the disk. + final Table table = this.formatPlugin.getJsonTableCache().getTable(scanSpec.getTableName(), index, getUserName()); + + if (table != null) { + // Factor reflecting confidence in the DB estimates. If a table has few tablets, the tablet-level stats + // might be off. The decay scalingFactor will reduce estimates when one tablet represents a significant percentage + // of the entire table. + double scalingFactor = 1.0; + boolean isFullScan = false; + final MetaTable metaTable = table.getMetaTable(); + com.mapr.db.scan.ScanStats stats = (condition == null) + ? metaTable.getScanStats() : metaTable.getScanStats(condition); + if (index == null && condition != null) { + // Given table condition might not be on leading column. Check if the rowcount matches full table rows. + // In that case no leading key present or does not prune enough. Treat it like so. + com.mapr.db.scan.ScanStats noConditionPTabStats = metaTable.getScanStats(); + if (stats.getEstimatedNumRows() == noConditionPTabStats.getEstimatedNumRows()) { + isFullScan = true; + } + } + // Use the scalingFactor only when a condition filters out rows from the table. If no condition is present, all rows + // should be selected. So the scalingFactor should not reduce the returned rows + if (condition != null && !isFullScan) { + double forcedScalingFactor = PrelUtil.getSettings(scanRel.getCluster()).getIndexStatsRowCountScalingFactor(); + // Accuracy is defined as 1 - Error where Error = # Boundary Tablets (2) / # Total Matching Tablets. + // For 2 or less matching tablets, the error is assumed to be 50%. 
The Sqrt gives the decaying scalingFactor + if (stats.getTabletCount() > 2) { + double accuracy = 1.0 - (2.0/stats.getTabletCount()); + scalingFactor = Math.min(1.0, 1.0 / Math.sqrt(1.0 / accuracy)); + } else { + scalingFactor = 0.5; + } + if (forcedScalingFactor < 1.0 + && metaTable.getScanStats().getTabletCount() < PluginConstants.JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT) { + // User forced confidence scalingFactor for small tables (assumed as less than 32 tablets (~512 MB)) + scalingFactor = forcedScalingFactor; + } + } + logger.info("index_plan_info: getEstimatedRowCount obtained from DB Client for {}: indexName: {}, indexInfo: {}, " + + "condition: {} rowCount: {}, avgRowSize: {}, estimatedSize {}, tabletCount {}, totalTabletCount {}, " + + "scalingFactor {}", + this, (index == null ? "null" : index.getIndexName()), (index == null ? "null" : index.getIndexInfo()), + (condition == null ? "null" : condition.toString()), stats.getEstimatedNumRows(), + (stats.getEstimatedNumRows() == 0 ? 0 : stats.getEstimatedSize()/stats.getEstimatedNumRows()), + stats.getEstimatedSize(), stats.getTabletCount(), metaTable.getScanStats().getTabletCount(), scalingFactor); + return new MapRDBStatisticsPayload(scalingFactor * stats.getEstimatedNumRows(), scalingFactor * stats.getEstimatedNumRows(), + ((stats.getEstimatedNumRows() == 0 ? 0 : (double)stats.getEstimatedSize()/stats.getEstimatedNumRows()))); + } else { + logger.info("index_plan_info: getEstimatedRowCount: {} indexName: {}, indexInfo: {}, " + + "condition: {} rowCount: UNKNOWN, avgRowSize: UNKNOWN", this, (index == null ? "null" : index.getIndexName()), + (index == null ? "null" : index.getIndexInfo()), (condition == null ? "null" : condition.toString())); + return new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, AVG_ROWSIZE_UNKNOWN); + } + } + + /** * Set the row count resulting from applying the {@link RexNode} condition. Forced row counts will take * precedence over stats row counts @@ -518,9 +712,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca @Override @JsonIgnore public PartitionFunction getRangePartitionFunction(List refList) { - - return null; - //new JsonTableRangePartitionFunction(refList, scanSpec.getTableName(), this.getUserName(), this.getFormatPlugin()); + return new JsonTableRangePartitionFunction(refList, scanSpec.getTableName(), this.getUserName(), this.getFormatPlugin()); } /** diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java new file mode 100644 index 000000000..acaa6cacb --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.db.json; + +import java.util.List; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.exec.planner.physical.AbstractRangePartitionFunction; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin; +import org.apache.drill.exec.vector.ValueVector; +import org.ojai.store.QueryCondition; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.mapr.db.Table; +import com.mapr.db.impl.ConditionImpl; +import com.mapr.db.impl.IdCodec; +import com.mapr.db.impl.ConditionNode.RowkeyRange; +import com.mapr.db.scan.ScanRange; +import com.mapr.fs.jni.MapRConstants; +import com.mapr.org.apache.hadoop.hbase.util.Bytes; + +@JsonTypeName("jsontable-range-partition-function") +public class JsonTableRangePartitionFunction extends AbstractRangePartitionFunction { + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonTableRangePartitionFunction.class); + + @JsonProperty("refList") + protected List refList; + + @JsonProperty("tableName") + protected String tableName; + + @JsonIgnore + protected String userName; + + @JsonIgnore + protected ValueVector partitionKeyVector = null; + + // List of start keys of the scan ranges for the table. + @JsonProperty + protected List startKeys = null; + + // List of stop keys of the scan ranges for the table. 
+  @JsonProperty
+  protected List<byte[]> stopKeys = null;
+
+  @JsonCreator
+  public JsonTableRangePartitionFunction(
+      @JsonProperty("refList") List<FieldReference> refList,
+      @JsonProperty("tableName") String tableName,
+      @JsonProperty("startKeys") List<byte[]> startKeys,
+      @JsonProperty("stopKeys") List<byte[]> stopKeys) {
+    this.refList = refList;
+    this.tableName = tableName;
+    this.startKeys = startKeys;
+    this.stopKeys = stopKeys;
+  }
+
+  public JsonTableRangePartitionFunction(List<FieldReference> refList,
+      String tableName, String userName, MapRDBFormatPlugin formatPlugin) {
+    this.refList = refList;
+    this.tableName = tableName;
+    this.userName = userName;
+    initialize(formatPlugin);
+  }
+
+  @JsonProperty("refList")
+  @Override
+  public List<FieldReference> getPartitionRefList() {
+    return refList;
+  }
+
+  @Override
+  public void setup(List<VectorWrapper<?>> partitionKeys) {
+    if (partitionKeys.size() != 1) {
+      throw new UnsupportedOperationException(
+          "Range partitioning function supports exactly one partition column; encountered " + partitionKeys.size());
+    }
+
+    VectorWrapper<?> v = partitionKeys.get(0);
+
+    partitionKeyVector = v.getValueVector();
+
+    Preconditions.checkArgument(partitionKeyVector != null, "Found null partitionKeyVector.");
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof JsonTableRangePartitionFunction) {
+      JsonTableRangePartitionFunction rpf = (JsonTableRangePartitionFunction) obj;
+      List<FieldReference> thisPartRefList = this.getPartitionRefList();
+      List<FieldReference> otherPartRefList = rpf.getPartitionRefList();
+      if (thisPartRefList.size() != otherPartRefList.size()) {
+        return false;
+      }
+      for (int refIdx = 0; refIdx < thisPartRefList.size(); refIdx++) {
+        if (!thisPartRefList.get(refIdx).equals(otherPartRefList.get(refIdx))) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public int eval(int index, int numPartitions) {
+
+    String key = partitionKeyVector.getAccessor().getObject(index).toString();
+    byte[] encodedKey = IdCodec.encodeAsBytes(key);
+
+    int tabletId = -1;
+
+    // Binary search over the scan ranges for the range containing the encoded key
+    int low = 0;
+    int high = startKeys.size() - 1;
+    while (low <= high) {
+      int mid = low + (high - low) / 2;
+
+      byte[] start = startKeys.get(mid);
+      byte[] stop = stopKeys.get(mid);
+
+      if ((Bytes.compareTo(encodedKey, start) >= 0 ||
+           Bytes.equals(start, MapRConstants.EMPTY_BYTE_ARRAY)
+          ) &&
+          (Bytes.compareTo(encodedKey, stop) < 0 ||
+           Bytes.equals(stop, MapRConstants.EMPTY_BYTE_ARRAY)
+          )
+         ) {
+        tabletId = mid;
+        break;
+      }
+
+      if (Bytes.compareTo(encodedKey, start) >= 0) {
+        // key is greater, ignore left side
+        low = mid + 1;
+      } else {
+        // key is smaller, ignore right side
+        high = mid - 1;
+      }
+    }
+
+    if (tabletId < 0) {
+      tabletId = 0;
+      logger.warn("Key {} was not found in any of the start-stop ranges. Using default tabletId {}", key, tabletId);
+    }
+
+    int partitionId = tabletId % numPartitions;
+
+    logger.trace("Key = {}, tablet id = {}, partition id = {}", key, tabletId, partitionId);
+
+    return partitionId;
+  }
+
+  public void initialize(MapRDBFormatPlugin plugin) {
+
+    // Get the table handle from the table cache
+    Table table = plugin.getJsonTableCache().getTable(tableName, userName);
+
+    // Set the condition to null such that all scan ranges are retrieved for the primary table.
+    // The reason is the row keys could typically belong to any one of the tablets of the table, so
+    // there is no use trying to get only a limited set of scan ranges.
+    // NOTE: here we use the restrictedScanRangeSizeMB because the range partitioning should be parallelized
+    // based on the number of scan ranges on the RestrictedJsonTableGroupScan.
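+    // Illustration (hypothetical sizes): a table whose metadata reports, say, 20 scan
+    // ranges yields 20 [startKey, stopKey) pairs below; eval() then binary-searches
+    // those pairs for a row key's encoded bytes and maps the matching tablet to a
+    // partition, e.g. tabletId 13 with numPartitions = 8 lands in partition 13 % 8 = 5.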
+ List ranges = table.getMetaTable().getScanRanges(plugin.getRestrictedScanRangeSizeMB()); + + this.startKeys = Lists.newArrayList(); + this.stopKeys = Lists.newArrayList(); + + logger.debug("Num scan ranges for table {} = {}", table.getName(), ranges.size()); + + int count = 0; + for (ScanRange r : ranges) { + QueryCondition condition = r.getCondition(); + List rowkeyRanges = ((ConditionImpl)condition).getRowkeyRanges(); + byte[] start = rowkeyRanges.get(0).getStartRow(); + byte[] stop = rowkeyRanges.get(rowkeyRanges.size() - 1).getStopRow(); + + Preconditions.checkNotNull(start, String.format("Encountered a null start key at position %d for scan range condition %s.", count, condition.toString())); + Preconditions.checkNotNull(stop, String.format("Encountered a null stop key at position %d for scan range condition %s.", count, condition.toString())); + + if (count > 0) { + // after the first start key, rest should be non-empty + Preconditions.checkState( !(Bytes.equals(start, MapRConstants.EMPTY_BYTE_ARRAY)), String.format("Encountered an empty start key at position %d", count)); + } + + if (count < ranges.size() - 1) { + // except for the last stop key, rest should be non-empty + Preconditions.checkState( !(Bytes.equals(stop, MapRConstants.EMPTY_BYTE_ARRAY)), String.format("Encountered an empty stop key at position %d", count)); + } + + startKeys.add(start); + stopKeys.add(stop); + count++; + } + + // check validity; only need to check one of the lists since they are populated together + Preconditions.checkArgument(startKeys.size() > 0, "Found empty list of start/stopKeys."); + + Preconditions.checkState(startKeys.size() == ranges.size(), + String.format("Mismatch between the lengths: num start keys = %d, num scan ranges = %d", startKeys.size(), ranges.size())); + + Preconditions.checkState(stopKeys.size() == ranges.size(), + String.format("Mismatch between the lengths: num stop keys = %d, num scan ranges = %d", stopKeys.size(), ranges.size())); + + } + +} diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java new file mode 100644 index 000000000..48ad96d81 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import java.util.List;
+import java.util.NavigableMap;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.index.MapRDBStatistics;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.index.Statistics;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScan;
+import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScanSpec;
+import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
+
+/**
+ * A RestrictedJsonTableGroupScan encapsulates (along with a subscan) the functionality
+ * for doing a restricted (i.e., skip) scan rather than a sequential scan. The skipping is based
+ * on a supplied set of row keys (primary keys) from a join operator.
+ */
+@JsonTypeName("restricted-json-scan")
+public class RestrictedJsonTableGroupScan extends JsonTableGroupScan {
+
+  @JsonCreator
+  public RestrictedJsonTableGroupScan(@JsonProperty("userName") String userName,
+      @JsonProperty("storage") FileSystemPlugin storagePlugin,
+      @JsonProperty("format") MapRDBFormatPlugin formatPlugin,
+      @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("statistics") MapRDBStatistics statistics) {
+    super(userName, storagePlugin, formatPlugin, scanSpec, columns, statistics);
+  }
+
+  // TODO: this method needs to be fully implemented
+  protected RestrictedMapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
+    JsonScanSpec spec = scanSpec;
+    RestrictedMapRDBSubScanSpec subScanSpec =
+        new RestrictedMapRDBSubScanSpec(
+            spec.getTableName(),
+            getRegionsToScan().get(tfi), spec.getSerializedFilter(), getUserName());
+    return subScanSpec;
+  }
+
+  protected NavigableMap<TabletFragmentInfo, String> getRegionsToScan() {
+    return getRegionsToScan(formatPlugin.getRestrictedScanRangeSizeMB());
+  }
+
+  @Override
+  public MapRDBSubScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+        minorFragmentId);
+    RestrictedMapRDBSubScan subscan =
+        new RestrictedMapRDBSubScan(getUserName(), formatPlugin,
+            getEndPointFragmentMapping(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON);
+
+    return subscan;
+  }
+
+  private List<RestrictedMapRDBSubScanSpec> getEndPointFragmentMapping(int minorFragmentId) {
+    List<RestrictedMapRDBSubScanSpec> restrictedSubScanSpecList = Lists.newArrayList();
+    List<MapRDBSubScanSpec> subScanSpecList = endpointFragmentMapping.get(minorFragmentId);
+    for (MapRDBSubScanSpec s : subScanSpecList) {
+      restrictedSubScanSpecList.add((RestrictedMapRDBSubScanSpec) s);
+    }
+    return restrictedSubScanSpecList;
+  }
+
+  /**
+   * 
Private constructor, used for cloning. + * @param that The RestrictedJsonTableGroupScan to clone + */ + private RestrictedJsonTableGroupScan(RestrictedJsonTableGroupScan that) { + super(that); + } + + @Override + public GroupScan clone(JsonScanSpec scanSpec) { + RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this); + newScan.scanSpec = scanSpec; + newScan.resetRegionsToScan(); // resetting will force recalculation + return newScan; + } + + @Override + public GroupScan clone(List columns) { + RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this); + newScan.columns = columns; + return newScan; + } + + @Override + @JsonIgnore + public PhysicalOperator getNewWithChildren(List children) { + Preconditions.checkArgument(children.isEmpty()); + return new RestrictedJsonTableGroupScan(this); + } + + @Override + public ScanStats getScanStats() { + //TODO: ideally here we should use the rowcount from index scan, and multiply a factor of restricted scan + double rowCount; + PluginCost pluginCostModel = formatPlugin.getPluginCostModel(); + final int avgColumnSize = pluginCostModel.getAverageColumnSize(this); + int numColumns = (columns == null || columns.isEmpty()) ? STAR_COLS: columns.size(); + // Get the restricted group scan row count - same as the right side index rows + rowCount = computeRestrictedScanRowcount(); + // Get the average row size of the primary table + double avgRowSize = stats.getAvgRowSize(null, true); + if (avgRowSize == Statistics.AVG_ROWSIZE_UNKNOWN || avgRowSize == 0) { + avgRowSize = avgColumnSize * numColumns; + } + // restricted scan does random lookups and each row may belong to a different block, with the number + // of blocks upper bounded by the total num blocks in the primary table + double totalBlocksPrimary = Math.ceil((avgRowSize * fullTableRowCount)/pluginCostModel.getBlockSize(this)); + double numBlocks = Math.min(totalBlocksPrimary, rowCount); + double diskCost = numBlocks * pluginCostModel.getRandomBlockReadCost(this); + // For non-covering plans, the dominating cost would be of the join back. Reduce it using the factor + // for biasing towards non-covering plans. + diskCost *= stats.getRowKeyJoinBackIOFactor(); + logger.debug("RestrictedJsonGroupScan:{} rowCount:{}, avgRowSize:{}, blocks:{}, totalBlocks:{}, diskCost:{}", + System.identityHashCode(this), rowCount, avgRowSize, numBlocks, totalBlocksPrimary, diskCost); + return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost); + } + + private double computeRestrictedScanRowcount() { + double rowCount = Statistics.ROWCOUNT_UNKNOWN; + // The rowcount should be the same as the build side which was FORCED by putting it in forcedRowCountMap + if (forcedRowCountMap.get(null) != null) { + rowCount = forcedRowCountMap.get(null); + } + // If limit pushdown has occurred - factor it in the rowcount + if (rowCount == Statistics.ROWCOUNT_UNKNOWN || rowCount == 0) { + rowCount = (0.001f * fullTableRowCount); + } + if (this.maxRecordsToRead > 0) { + rowCount = Math.min(rowCount, this.maxRecordsToRead); + } + return rowCount; + } + + @Override + public boolean isRestrictedScan() { + return true; + } + + @Override + public String toString() { + return "RestrictedJsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns + + ", rowcount=" + computeRestrictedScanRowcount() + + (maxRecordsToRead>0? ", limit=" + maxRecordsToRead : "") + + (getMaxParallelizationWidth()>0? 
", maxwidth=" + getMaxParallelizationWidth() : "") + "]"; + } +} diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java new file mode 100644 index 000000000..9ac27b47c --- /dev/null +++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java @@ -0,0 +1,171 @@ +package com.mapr.drill.maprdb.tests.index; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.mapr.drill.maprdb.tests.json.BaseJsonTest; +import com.mapr.tests.annotations.ClusterTest; +import org.apache.drill.PlanTestBase; +import org.junit.experimental.categories.Category; +import org.junit.runners.MethodSorters; +import org.junit.FixMethodOrder; +import org.junit.Test; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +@Category(ClusterTest.class) +public class IndexHintPlanTest extends IndexPlanTest { + + private static final String defaultHavingIndexPlan = "alter session reset `planner.enable_index_planning`"; + + @Test + // A simple testcase with index hint on a table which has only one index for a column t.id.ssn; + // This should pick i_ssn index for the query + public void testSimpleIndexHint() throws Exception { + String hintquery = "SELECT t.id.ssn as ssn FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_ssn')) as t " + + " where t.id.ssn = '100007423'"; + + String query = "SELECT t.id.ssn as ssn FROM hbase.`index_test_primary` as t where t.id.ssn = '100007423'"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(hintquery, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"}, + new String[]{"RowKeyJoin"} + ); + + //default plan picked by optimizer. + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"}, + new String[]{"RowKeyJoin"} + ); + testBuilder() + .sqlQuery(hintquery) + .ordered() + .baselineColumns("ssn").baselineValues("100007423") + .go(); + + } + + + @Test + // A testcase where there are multiple index to pick from but only picks the index provided as hint. + // A valid index is provided as hint and it is useful during the index selection process, hence it will be selected. 
+  public void testHintCaseWithMultipleIndexes_1() throws Exception {
+
+    String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_state_city')) as t " +
+        " where t.address.state = 'pc'";
+
+    String query = "SELECT t.`address`.`state` AS `state` FROM hbase.`index_test_primary` as t where t.address.state = 'pc'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(hintquery,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_city"},
+        new String[]{"RowKeyJoin"}
+    );
+
+    // Default plan picked by the optimizer.
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+        new String[]{"RowKeyJoin"}
+    );
+  }
+
+  @Test
+  // A test case where there are multiple indexes to pick from, but the planner picks only the index provided as a hint.
+  // A valid index is provided as the hint and it is useful during index selection, hence it will be selected.
+  // The difference from the previous test case is that the index name is switched; this shows that the index hint
+  // restricts selection to exactly the one valid index specified as the hint.
+  public void testHintCaseWithMultipleIndexes_2() throws Exception {
+
+    String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_state_age_phone')) as t " +
+        " where t.address.state = 'pc'";
+
+    String query = "SELECT t.`address`.`state` AS `state` FROM hbase.`index_test_primary` as t where t.address.state = 'pc'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(hintquery,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+        new String[]{"RowKeyJoin"}
+    );
+
+    // Default plan picked by the query optimizer.
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+        new String[]{"RowKeyJoin"}
+    );
+  }
+
+  // Negative cases
+
+  @Test
+  // A test case where there are multiple indexes to pick from but none of them matches the index provided as the hint
+  // (the index hint is wrong). The hinted index is not present in the table at all, so planning falls back to the case
+  // where no index is given: one of i_state_city or i_state_age_phone will be selected depending on cost.
+  public void testWithMultipleIndexesButNoIndexWithHint() throws Exception {
+
+    String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_state_and_city')) as t " +
+        " where t.address.state = 'pc'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(hintquery,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+        new String[]{"RowKeyJoin"}
+    );
+  }
+
+  @Test
+  // A test case where there are multiple indexes to pick from and the hinted index is valid (i.e., it is present
+  // on the table), but neither it nor any other index is useful for the filter.
+  // This case falls back to a full table scan.
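+  // In plan terms (a sketch of the expected shape, not an exact plan dump): the plan
+  // should contain a bare
+  //   JsonTableGroupScan ... tableName=index_test_primary
+  // with no indexName= attribute and no RowKeyJoin, which is what the included and
+  // excluded pattern lists below assert.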
+ public void testWithMultipleIndexesButNoIndexWithValidHint() throws Exception { + + String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_ssn')) as t " + + " where t.address.state = 'pc'"; + + String query = "SELECT t.`address`.`state` AS `state` FROM hbase.`index_test_primary` as t where t.address.state = 'pc'"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(hintquery, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary"}, + new String[]{"RowKeyJoin", "indexName="} + ); + + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"}, + new String[]{"RowKeyJoin"} + ); + + return; + } + + + @Test + //Covering index should be generated for a simple query instead of a RowKeyJoin. + public void testSimpleNoRowKeyJoin() throws Exception { + String query = "SELECT `reverseid` from table(hbase.`index_test_primary`(type => 'maprdb', index => 'hash_i_reverseid')) " + + "where `reverseid` = 1234"; + + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=hash_i_reverseid"}, + new String[]{"RowKeyJoin"} + ); + + return; + } +} diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java new file mode 100644 index 000000000..c0ea2a006 --- /dev/null +++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java @@ -0,0 +1,1715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+import com.mapr.db.Admin;
+import com.mapr.drill.maprdb.tests.MaprDBTestsSuite;
+import com.mapr.drill.maprdb.tests.json.BaseJsonTest;
+import com.mapr.tests.annotations.ClusterTest;
+import org.apache.drill.PlanTestBase;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.apache.drill.common.config.DrillConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+import java.util.Properties;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Category(ClusterTest.class)
+public class IndexPlanTest extends BaseJsonTest {
+
+  final static String PRIMARY_TABLE_NAME = "/tmp/index_test_primary";
+
+  final static int PRIMARY_TABLE_SIZE = 10000;
+  private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1";
+  private static final String sliceTargetDefault = "alter session reset `planner.slice_target`";
+  private static final String noIndexPlan = "alter session set `planner.enable_index_planning` = false";
+  private static final String defaultHavingIndexPlan = "alter session reset `planner.enable_index_planning`";
+  private static final String disableHashAgg = "alter session set `planner.enable_hashagg` = false";
+  private static final String enableHashAgg = "alter session set `planner.enable_hashagg` = true";
+  private static final String defaultnonCoveringSelectivityThreshold = "alter session set `planner.index.noncovering_selectivity_threshold` = 0.025";
+  private static final String incrnonCoveringSelectivityThreshold = "alter session set `planner.index.noncovering_selectivity_threshold` = 0.25";
+  private static final String disableFTS = "alter session set `planner.disable_full_table_scan` = true";
+  private static final String enableFTS = "alter session reset `planner.disable_full_table_scan`";
+  private static final String preferIntersectPlans = "alter session set `planner.index.prefer_intersect_plans` = true";
+  private static final String defaultIntersectPlans = "alter session reset `planner.index.prefer_intersect_plans`";
+  private static final String lowRowKeyJoinBackIOFactor
+      = "alter session set `planner.index.rowkeyjoin_cost_factor` = 0.01";
+  private static final String defaultRowKeyJoinBackIOFactor
+      = "alter session reset `planner.index.rowkeyjoin_cost_factor`";
+
+  /**
+   * A sample row of this 10K table:
+   * | 1012 | {"city":"pfrrs","state":"pc"} | {"email":"KfFzKUZwNk@gmail.com","phone":"6500005471"} |
+   * | {"ssn":"100007423"} | {"fname":"KfFzK","lname":"UZwNk"} | {"age":53.0,"income":45.0} | 1012 |
+   *
+   * This test suite generates random content to fill all the rows. Since the random function always starts
+   * from the same seed across runs, the data in the table stays the same as long as the row count is unchanged,
+   * so the query results can be predicted and verified.
+ */ + + @BeforeClass + public static void setupTableIndexes() throws Exception { + + Properties overrideProps = new Properties(); + overrideProps.setProperty("format-maprdb.json.useNumRegionsForDistribution", "true"); + updateTestCluster(1, DrillConfig.create(overrideProps)); + + MaprDBTestsSuite.setupTests(); + MaprDBTestsSuite.createPluginAndGetConf(getDrillbitContext()); + + test(incrnonCoveringSelectivityThreshold); + + System.out.print("setupTableIndexes begins"); + Admin admin = MaprDBTestsSuite.getAdmin(); + if (admin != null) { + if (admin.tableExists(PRIMARY_TABLE_NAME)) { + admin.deleteTable(PRIMARY_TABLE_NAME); + } + } + + LargeTableGen gen = new LargeTableGen(MaprDBTestsSuite.getAdmin()); + /** + * indexDef is an array of string, LargeTableGen.generateTableWithIndex will take it as parameter to generate indexes + * for primary table. + * indexDef[3*i] defines i-th index's indexName, NOTE: IF the name begins with "hash", it is a hash index + * indexDef[3*i+1] indexed field, + * and indexDef[3*i+2] defines i-th index's non-indexed fields + */ + final String[] indexDef = //null; + {"i_ssn", "id.ssn", "contact.phone", + "i_state_city", "address.state,address.city", "name.fname,name.lname",//mainly for composite key test + "i_age", "personal.age", "", + "i_income", "personal.income", "", + "i_lic", "driverlicense", "reverseid", + "i_state_city_dl", "address.state,address.city", "driverlicense", + "i_cast_int_ssn", "$CAST(id.ssn@INT)", "contact.phone", + "i_cast_vchar_lic", "$CAST(driverlicense@STRING)","contact.email", + "i_state_age_phone", "address.state,personal.age,contact.phone", "name.fname", + "i_cast_age_income_phone", "$CAST(personal.age@INT),$CAST(personal.income@INT),contact.phone", "name.fname", + "i_age_with_fname", "personal.age", "name.fname", + "hash_i_reverseid", "reverseid", "", + "hash_i_cast_timestamp_firstlogin", "$CAST(activity.irs.firstlogin@TIMESTAMP)", "id.ssn" + }; + gen.generateTableWithIndex(PRIMARY_TABLE_NAME, PRIMARY_TABLE_SIZE, indexDef); + } + + @AfterClass + public static void cleanupTableIndexes() throws Exception { + Admin admin = MaprDBTestsSuite.getAdmin(); + if (admin != null) { + if (admin.tableExists(PRIMARY_TABLE_NAME)) { + // admin.deleteTable(PRIMARY_TABLE_NAME); + } + } + test(defaultnonCoveringSelectivityThreshold); + } + + @Test + public void CTASTestTable() throws Exception { + String ctasQuery = "CREATE TABLE hbase.tmp.`backup_index_test_primary` " + + "AS SELECT * FROM hbase.`index_test_primary` as t "; + test(ctasQuery); + test("DROP TABLE IF EXISTS hbase.tmp.`backup_index_test_primary`"); + } + + @Test + public void CoveringPlanWithNonIndexedField() throws Exception { + + String query = "SELECT t.`contact`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " + + " where t.id.ssn = '100007423'"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"}, + new String[]{"RowKeyJoin"} + ); + + System.out.println("Covering Plan Verified!"); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("phone").baselineValues("6500005471") + .go(); + return; + + } + + @Test + public void CoveringPlanWithOnlyIndexedField() throws Exception { + String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " + + " where t.id.ssn = '100007423'"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] 
{".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"}, + new String[]{"RowKeyJoin"} + ); + + System.out.println("Covering Plan Verified!"); + + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .sqlQuery(query) + .ordered() + .baselineColumns("ssn").baselineValues("100007423") + .go(); + + return; + } + + @Test + public void NoIndexPlanForNonIndexField() throws Exception { + + String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " + + " where t.contact.phone = '6500005471'"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary"}, + new String[]{"RowKeyJoin", "indexName="} + ); + + System.out.println("No Index Plan Verified!"); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100007632") + .go(); + + return; + } + + @Test + public void NonCoveringPlan() throws Exception { + + String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " + + " where t.id.ssn = '100007423'"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*tableName=.*index_test_primary,", + ".*JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=i_ssn"}, + new String[]{} + ); + + System.out.println("Non-Covering Plan Verified!"); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("fname").baselineValues("KfFzK") + .go(); + + return; + } + + @Test + public void RangeConditionIndexPlan() throws Exception { + String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " + + " where t.personal.age > 52 AND t.name.fname='KfFzK'"; + try { + test(defaultHavingIndexPlan + ";" + lowRowKeyJoinBackIOFactor + ";"); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*tableName=.*index_test_primary,", + ".*JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=(i_age|i_age_with_fname)"}, + new String[]{} + ); + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .optionSettingQueriesForTestQuery(lowRowKeyJoinBackIOFactor) + .optionSettingQueriesForBaseline(noIndexPlan) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + + testBuilder() + .optionSettingQueriesForTestQuery(sliceTargetSmall) + .optionSettingQueriesForBaseline(sliceTargetDefault) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } finally { + test(defaultRowKeyJoinBackIOFactor); + } + } + + @Test + public void CoveringWithSimpleFieldsOnly() throws Exception { + + String query = "SELECT t._id AS `tid` FROM hbase.`index_test_primary` as t " + + " where t.driverlicense = 100007423"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=i_lic"}, + new String[]{"RowKeyJoin"} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("tid").baselineValues("1012") + .go(); + + return; + } + + @Test + public void NonCoveringWithSimpleFieldsOnly() throws Exception { + + String query = "SELECT t.rowid AS `rowid` FROM hbase.`index_test_primary` as t " + + " where t.driverlicense = 100007423"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new 
String[] {"RowKeyJoin(.*[\n\r])+.*" + + "RestrictedJsonTableGroupScan.*tableName=.*index_test_primary(.*[\n\r])+.*" + + "JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=i_lic"}, + new String[]{} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("rowid").baselineValues("1012") + .go(); + + return; + } + + @Test + public void NonCoveringWithExtraConditonOnPrimary() throws Exception { + + String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " + + " where t.personal.age = 53 AND t.name.lname='UZwNk'"; + try { + test(defaultHavingIndexPlan + ";" + lowRowKeyJoinBackIOFactor + ";"); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan", + ".*JsonTableGroupScan.*indexName=i_age",}, + new String[]{} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("fname").baselineValues("KfFzK") + .go(); + } finally { + test(defaultRowKeyJoinBackIOFactor); + } + return; + } + + @Test + public void Intersect2indexesPlan() throws Exception { + + String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " + + " where t.personal.age = 53 AND t.personal.income=45"; + try { + test(defaultHavingIndexPlan); + test(preferIntersectPlans + ";" + disableFTS); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*HashJoin(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)"}, + new String[]{} + ); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("lname").baselineValues("UZwNk") + .baselineColumns("lname").baselineValues("foNwtze") + .baselineColumns("lname").baselineValues("qGZVfY") + .go(); + testBuilder() + .optionSettingQueriesForTestQuery(sliceTargetSmall) + .optionSettingQueriesForBaseline(sliceTargetDefault) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } finally { + test(defaultIntersectPlans + ";" + enableFTS); + } + return; + } + + @Test + public void CompositeIndexNonCoveringPlan() throws Exception { + + String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'pc' AND t.address.city='pfrrs'"; + try { + test(defaultHavingIndexPlan + ";" + lowRowKeyJoinBackIOFactor + ";"); + + //either i_state_city or i_state_age_phone will be picked depends on cost model, both is fine for testing composite index nonCovering plan + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_state_"}, + new String[]{} + ); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") + .go(); + + testBuilder() + .optionSettingQueriesForTestQuery(sliceTargetSmall) + .optionSettingQueriesForBaseline(sliceTargetDefault) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } finally { + test(defaultRowKeyJoinBackIOFactor); + } + return; + } + + @Test//filter cover indexed, included and not in index at all filter + public void CompositeIndexNonCoveringFilterWithAllFieldsPlan() throws Exception { + + String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'pc' AND t.address.city='pfrrs' AND t.driverlicense IN (100007423, 
100007424)"; + test(defaultHavingIndexPlan+";"+lowRowKeyJoinBackIOFactor+";"); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan.*condition=.*state.*city.*driverlicense.*or.*driverlicense.*(.*[\n\r])+.*JsonTableGroupScan.*indexName="}, + new String[]{} + ); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("ssn").baselineValues("100007423") + .go(); + + testBuilder() + .optionSettingQueriesForTestQuery(sliceTargetSmall) + .optionSettingQueriesForBaseline(sliceTargetDefault) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + + return; + } + @Test + public void CompositeIndexCoveringPlan() throws Exception { + + String query = "SELECT t.`address`.`city` AS `city` FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'pc' AND t.address.city='pfrrs'"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*indexName=i_state_city"}, + new String[]{"RowKeyJoin", "Filter"} + ); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("city").baselineValues("pfrrs") + .baselineColumns("city").baselineValues("pfrrs") + .go(); + + testBuilder() + .optionSettingQueriesForTestQuery(sliceTargetSmall) + .optionSettingQueriesForBaseline(sliceTargetDefault) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + return; + } + + @Test + public void TestNonCoveringRangePartition_1() throws Exception { + + String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " + + " where t.personal.age = 53"; + String[] expectedPlan = new String[] {"RowKeyJoin(.*[\n\r])+.*" + + "RestrictedJsonTableGroupScan.*tableName=.*index_test_primary(.*[\n\r])+.*" + + "RangePartitionExchange(.*[\n\r])+.*" + + "JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=(i_age|i_age_with_fname)"}; + test(defaultHavingIndexPlan+";"+sliceTargetSmall+";"); + PlanTestBase.testPlanMatchingPatterns(query, + expectedPlan, new String[]{}); + + try { + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .optionSettingQueriesForBaseline(noIndexPlan) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } finally { + test(defaultHavingIndexPlan); + test(sliceTargetDefault); + } + return; + } + + @Test + public void TestCastVarCharCoveringPlan() throws Exception { + //length 255 is to exact match the casted indexed field's length + String query = "SELECT t._id as tid, cast(t.driverlicense as varchar(255)) as driverlicense FROM hbase.`index_test_primary` as t " + + " where cast(t.driverlicense as varchar(255))='100007423'"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_cast_vchar_lic"}, + new String[]{"RowKeyJoin"} + ); + + System.out.println("TestCastCoveringPlan Plan Verified!"); + + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .sqlQuery(query) + .ordered() + .baselineColumns("tid", "driverlicense").baselineValues("1012", "100007423") + .go(); + + return; + } + + @Test + public void TestCastINTCoveringPlan() throws Exception { + String query = "SELECT t._id as tid, CAST(t.id.ssn as INT) as ssn, t.contact.phone AS `phone` FROM hbase.`index_test_primary` as t " + + " where CAST(t.id.ssn as INT) = 100007423"; + test(defaultHavingIndexPlan); + 
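// Reminder (see the indexDef triplets in setupTableIndexes): i_cast_int_ssn indexes the functional + // expression $CAST(id.ssn@INT), so the planner can match the query's CAST(t.id.ssn as INT) against the + // index's indexed expression and serve the equality filter from the index. +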
PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_cast_int_ssn"}, + new String[]{"RowKeyJoin"} + ); + + System.out.println("TestCastINTCoveringPlan Plan Verified!"); + + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .sqlQuery(query) + .ordered() + .baselineColumns("tid", "ssn", "phone").baselineValues("1012", 100007423, "6500005471") + .go(); + + return; + } + + @Test + public void TestCastNonCoveringPlan() throws Exception { + String query = "SELECT t.id.ssn AS `ssn` FROM hbase.`index_test_primary` as t " + + " where CAST(t.id.ssn as INT) = 100007423"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_cast_int_ssn"}, + new String[]{} + ); + + System.out.println("TestCastNonCoveringPlan Plan Verified!"); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("ssn").baselineValues("100007423") + .go(); + return; + } + + @Test + public void TestCastVarchar_ConvertToRangePlan() throws Exception { + String query = "SELECT t.id.ssn AS `ssn` FROM hbase.`index_test_primary` as t " + + " where CAST(driverlicense as VARCHAR(10)) = '100007423'"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*MATCHES \"\\^.*100007423.*E.*\\$\".*indexName=i_cast_vchar_lic"}, + new String[]{} + ); + + System.out.println("TestCastVarchar_ConvertToRangePlan Verified!"); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("ssn").baselineValues("100007423") + .go(); + + return; + } + + @Test // the cast expression in the filter is not indexed, though the same field cast to a different type is (CAST(id.ssn as INT)) + public void TestCastNoIndexPlan() throws Exception { + String query = "select t.id.ssn from hbase.`index_test_primary` t where cast(t.id.ssn as varchar(10)) = '100007423'"; + + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{}, + new String[]{"indexName"} + ); + + } + + @Test + public void TestLongerCastVarCharNoIndex() throws Exception { + //length 500 does not match the indexed cast's length (255), so no index should be used + String query = "SELECT t._id as tid, cast(t.driverlicense as varchar(500)) as driverlicense FROM hbase.`index_test_primary` as t " + + " where cast(t.driverlicense as varchar(500))='100007423'"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {}, + new String[]{"RowKeyJoin", "indexName="} + ); + + System.out.println("TestLongerCastVarCharNoIndex Plan Verified!"); + + return; + } + + @Test + public void TestCoveringPlanSortRemoved() throws Exception { + String query = "SELECT t.`contact`.`phone` as phone FROM hbase.`index_test_primary` as t " + + " where t.id.ssn <'100000003' order by t.id.ssn"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"}, + new String[]{"Sort"} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("phone").baselineValues("6500008069") + .baselineColumns("phone").baselineValues("6500001411") + .baselineColumns("phone").baselineValues("6500001595") + .go(); + } + + @Test + public void TestCoveringPlanSortNotRemoved() throws Exception { + String query = "SELECT t.`contact`.`phone` as phone FROM
hbase.`index_test_primary` as t " + + " where t.id.ssn <'100000003' order by t.contact.phone"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"}, + new String[]{"RowKeyJoin"} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("phone").baselineValues("6500001411") + .baselineColumns("phone").baselineValues("6500001595") + .baselineColumns("phone").baselineValues("6500008069") + .go(); + } + + @Test + public void TestCoveringPlanSortRemovedWithSimpleFields() throws Exception { + String query = "SELECT t.driverlicense as l FROM hbase.`index_test_primary` as t " + + " where t.driverlicense < 100000003 order by t.driverlicense"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_lic"}, + new String[]{"Sort"} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("l").baselineValues(100000000L) + .baselineColumns("l").baselineValues(100000001L) + .baselineColumns("l").baselineValues(100000002L) + .go(); + } + + @Test + public void TestNonCoveringPlanSortRemoved() throws Exception { + String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " + + " where t.driverlicense < 100000003 order by t.driverlicense"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_lic"}, + new String[]{"Sort"} + ); + String query2 = "SELECT t.name.fname as fname FROM hbase.`index_test_primary` as t " + + " where t.id.ssn < '100000003' order by t.id.ssn"; + PlanTestBase.testPlanMatchingPatterns(query2, + new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="}, + new String[]{"Sort"} + ); + + //simple field, driverlicense + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("phone").baselineValues("6500008069") + .baselineColumns("phone").baselineValues("6500001411") + .baselineColumns("phone").baselineValues("6500001595") + .go(); + + //query on a nested field accessed via an ITEM expression (needs a Project), non-simple field t.id.ssn + testBuilder() + .sqlQuery(query2) + .ordered() + .baselineColumns("fname").baselineValues("VcFahj") + .baselineColumns("fname").baselineValues("WbKVK") + .baselineColumns("fname").baselineValues("vSAEsyFN") + .go(); + + test(sliceTargetSmall); + try { + PlanTestBase.testPlanMatchingPatterns(query2, + new String[]{"SingleMergeExchange(.*[\n\r])+.*" + + "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_ssn"}, + new String[]{"Sort"} + ); + } finally { + test(sliceTargetDefault); + } + } + + //test cases are from TestNonCoveringPlanSortRemoved; there the
Sort was removed when force_sort_noncovering was default(false) + @Test + public void TestNonCoveringPlanWithNoRemoveSortOption() throws Exception { + try { + test("alter session set `planner.index.force_sort_noncovering`=true"); + test(defaultHavingIndexPlan); + + String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " + + " where t.driverlicense < 100000003 order by t.driverlicense"; + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"Sort", "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_lic"}, + new String[]{} + ); + + String query2 = "SELECT t.name.fname as fname FROM hbase.`index_test_primary` as t " + + " where t.id.ssn < '100000003' order by t.id.ssn"; + PlanTestBase.testPlanMatchingPatterns(query2, + new String[]{"Sort", "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="}, + new String[]{} + ); + + //simple field, driverlicense + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("phone").baselineValues("6500008069") + .baselineColumns("phone").baselineValues("6500001411") + .baselineColumns("phone").baselineValues("6500001595") + .go(); + + //query on field of item expression(having capProject), non-simple field t.id.ssn + testBuilder() + .sqlQuery(query2) + .ordered() + .baselineColumns("fname").baselineValues("VcFahj") + .baselineColumns("fname").baselineValues("WbKVK") + .baselineColumns("fname").baselineValues("vSAEsyFN") + .go(); + + test(sliceTargetSmall); + try { + PlanTestBase.testPlanMatchingPatterns(query2, + new String[]{"Sort", "SingleMergeExchange(.*[\n\r])+.*" + + "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_ssn"}, + new String[]{} + ); + } finally { + test(sliceTargetDefault); + } + } + finally { + test("alter session reset `planner.index.force_sort_noncovering`"); + } + } + + @Test // 2 table join, each table has local predicate on top-level column + public void TestCoveringPlanJoin_1() throws Exception { + String query = "SELECT count(*) as cnt FROM hbase.`index_test_primary` as t1 " + + " inner join hbase.`index_test_primary` as t2 on t1.driverlicense = t2.driverlicense " + + " where t1.driverlicense < 100000003 and t2.driverlicense < 100000003" ; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=", + ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="}, + new String[]{} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("cnt").baselineValues(3L) + .go(); + } + + @Test // 2 table join, each table has local predicate on nested column + public void TestCoveringPlanJoin_2() throws Exception { + String query = "SELECT count(*) as cnt FROM hbase.`index_test_primary` as t1 " + + " inner join hbase.`index_test_primary` as t2 on t1.contact.phone = t2.contact.phone " + + " where t1.id.ssn < '100000003' and t2.id.ssn < '100000003' "; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=", + ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="}, + new String[]{} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("cnt").baselineValues(3L) + .go(); + } + + @Test // leading prefix of index has Equality conditions and ORDER BY last column; Sort SHOULD be dropped + public void 
TestCoveringPlanSortPrefix_1() throws Exception { + String query = "SELECT t.contact.phone FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'wo' and t.personal.age = 35 and t.contact.phone < '6500003000' order by t.contact.phone"; + test(defaultHavingIndexPlan); + + //we should glue to index i_state_age_phone to make sure we are testing the targeted prefix construction code path + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"}, + new String[]{"Sort"} + ); + + // compare the results of index plan with the no-index plan + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .optionSettingQueriesForBaseline(noIndexPlan) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } + + @Test // leading prefix of index has Non-Equality conditions and ORDER BY last column; Sort SHOULD NOT be dropped + public void TestCoveringPlanSortPrefix_2() throws Exception { + String query = "SELECT t.contact.phone FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.contact.phone"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"}, + new String[]{} + ); + + // compare the results of index plan with the no-index plan + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .optionSettingQueriesForBaseline(noIndexPlan) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } + + @Test //ORDER BY last two columns not in the indexed order; Sort SHOULD NOT be dropped + public void TestCoveringPlanSortPrefix_3() throws Exception { + String query = "SELECT CAST(t.personal.age as VARCHAR) as age, t.contact.phone FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.contact.phone, t.personal.age"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"}, + new String[]{} + ); + + // compare the results of index plan with the no-index plan + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .optionSettingQueriesForBaseline(noIndexPlan) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } + + @Test // last two index fields in non-Equality conditions, ORDER BY last two fields; Sort SHOULD be dropped + public void TestCoveringPlanSortPrefix_4() throws Exception { + String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.personal.age, t.contact.phone"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"}, + new String[]{"Sort"} + ); + + // compare the results of index plan with the no-index plan + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .optionSettingQueriesForBaseline(noIndexPlan) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } + + 
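// The SortPrefix tests above and below all probe one planner rule: with the composite key of + // i_state_age_phone (state, age, phone), the Sort for an ORDER BY can be dropped only when every + // key column ahead of the ORDER BY columns is bound by an equality predicate or is itself part of + // the ORDER BY. Illustrative shapes, mirroring _1, _2 and _4: + //   state='wo' AND age=35  ORDER BY phone      -> prefix fully bound, Sort dropped + //   state='wo' AND age<35  ORDER BY phone      -> age unbound, Sort kept + //   state='wo' AND age<35  ORDER BY age, phone -> ORDER BY covers age, Sort dropped + +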
@Test // index field in two or more equality conditions, it is not leading prefix, Sort SHOULD NOT be dropped + public void TestCoveringPlanSortPrefix_5() throws Exception { + String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'wo' and t.personal.age IN (31, 32, 33, 34) and t.contact.phone < '6500003000' order by t.contact.phone"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"}, + new String[]{} + ); + + // compare the results of index plan with the no-index plan + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .optionSettingQueriesForBaseline(noIndexPlan) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } + + @Test // last two index fields in non-Equality conditions, ORDER BY last two fields NULLS FIRST; Sort SHOULD NOT be dropped + public void TestCoveringPlanSortPrefix_6() throws Exception { + String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.personal.age, t.contact.phone NULLS FIRST"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"}, + new String[]{} + ); + + // compare the results of index plan with the no-index plan + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .optionSettingQueriesForBaseline(noIndexPlan) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } + + @Test // last two index fields in non-Equality conditions, ORDER BY last two fields NULLS LAST; Sort SHOULD be dropped + public void TestCoveringPlanSortPrefix_7() throws Exception { + String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.personal.age, t.contact.phone NULLS LAST"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"}, + new String[]{"Sort"} + ); + + // compare the results of index plan with the no-index plan + testBuilder() + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .optionSettingQueriesForBaseline(noIndexPlan) + .unOrdered() + .sqlQuery(query) + .sqlBaselineQuery(query) + .build() + .run(); + } + + @Test + public void orderByCastCoveringPlan() throws Exception { + String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " + + " where CAST(t.id.ssn as INT) < 100000003 order by CAST(t.id.ssn as INT)"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="}, + new String[]{"Sort"} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("phone").baselineValues("6500008069") + .baselineColumns("phone").baselineValues("6500001411") + .baselineColumns("phone").baselineValues("6500001595") + .go(); + } + + @Test // non-covering plan. 
sort on the only indexed field; Sort SHOULD be removed + public void orderByNonCoveringPlan() throws Exception { + String query = "SELECT t.name.lname as lname FROM hbase.`index_test_primary` as t " + + " where t.id.ssn < '100000003' order by t.id.ssn"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="}, + new String[]{"Sort"} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("lname").baselineValues("iuMG") + .baselineColumns("lname").baselineValues("KpFq") + .baselineColumns("lname").baselineValues("bkkAvz") + .go(); + } + + @Test //non-covering plan; order by a cast of the indexed field, Sort SHOULD be removed + public void orderByCastNonCoveringPlan() throws Exception { + String query = "SELECT t.name.lname as lname FROM hbase.`index_test_primary` as t " + + " where CAST(t.id.ssn as INT) < 100000003 order by CAST(t.id.ssn as INT)"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="}, + new String[]{"Sort"} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("lname").baselineValues("iuMG") + .baselineColumns("lname").baselineValues("KpFq") + .baselineColumns("lname").baselineValues("bkkAvz") + .go(); + } + + + @Ignore //in statsCache the condition state+city has a row count of 1250 while state alone has 1000, so the planner picks i_state_age_phone + @Test //non-covering, order by a non-leading field while the leading fields are not in an equality condition; Sort SHOULD NOT be removed + public void NonCoveringPlan_SortPrefix_1() throws Exception { + + String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " + + " where t.address.state > 'pc' AND t.address.city>'pfrrr' AND t.address.city<'pfrrt' order by t.address.city"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"Sort", + "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_state_city"}, + new String[]{} + ); + return; + } + + @Test //non-covering, order by a non-leading field while the leading fields are in an equality condition; Sort SHOULD be removed + public void NonCoveringPlan_SortPrefix_2() throws Exception { + + String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " + + " where t.address.state = 'pc' AND t.address.city>'pfrrr' AND t.address.city<'pfrrt' order by t.address.city"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] { + "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_state_city"}, + new String[]{"Sort"} + ); + return; + } + + @Ignore ("Should be modified to get an index plan; not very useful since most covering plan filters get pushed") + @Test //Correct projection and results when filtering on a non-indexed column in a covering plan.
+ public void nonIndexedColumnFilterCoveringPlan() throws Exception { + String query = "SELECT t.name.fname as fname FROM hbase.`index_test_primary` as t " + + " where t.personal.age > 68 and t.name.fname IN ('CnGobfR', 'THOHP')"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {".*Filter.*CnGobfR.*THOHP.*", + ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="}, + new String[] {".*Filter.*ITEM*CnGobfR.*THOHP.*"}); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("fname").baselineValues("CnGobfR") + .baselineColumns("fname").baselineValues("THOHP") + .baselineColumns("fname").baselineValues("CnGobfR") + .go(); + } + + @Test + @Ignore ("Fix after MEP 5.0") + public void orderByLimitNonCoveringPlan() throws Exception { + String query = "SELECT t.name.lname as lname FROM hbase.`index_test_primary` as t " + + " where t.id.ssn < '100000003' order by t.id.ssn limit 2"; + try { + test(defaultHavingIndexPlan); + test(sliceTargetSmall); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"Limit(.*[\n\r])+.*SingleMergeExchange(.*[\n\r])+.*Limit(.*[\n\r])+.*indexName="}, + new String[]{"Sort"} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("lname").baselineValues("iuMG") + .baselineColumns("lname").baselineValues("KpFq") + .go(); + } finally { + test(sliceTargetDefault); + } + } + + @Test + public void orderByLimitCoveringPlan() throws Exception { + String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " + + " where t.id.ssn < '100000003' order by t.id.ssn limit 2"; + test(defaultHavingIndexPlan); + + //when index table has only one tablet, the SingleMergeExchange in the middle of two Limits will be removed. + //The lower limit gets pushed into the scan + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"Limit(.*[\n\r])+.*indexName=.*limit=2"}, + new String[]{"Sort"} + ); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("phone").baselineValues("6500008069") + .baselineColumns("phone").baselineValues("6500001411") + .go(); + } + + @Test + public void pickAnyIndexWithFTSDisabledPlan() throws Exception { + String lowCoveringSel = "alter session set `planner.index.covering_selectivity_threshold` = 0.025"; + String defaultCoveringSel = "alter session reset `planner.index.covering_selectivity_threshold`"; + String query = "SELECT t.`contact`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " + + " where t.id.ssn = '100007423'"; + try { + test(defaultHavingIndexPlan + ";" + lowCoveringSel + ";"); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{".*JsonTableGroupScan.*tableName=.*index_test_primary"}, + new String[]{".*indexName=i_ssn"} + ); + // Must not throw CANNOTPLANEXCEPTION + test(defaultHavingIndexPlan + ";" + lowCoveringSel + ";" + disableFTS + ";"); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"}, + new String[]{"RowKeyJoin"} + ); + } finally { + test(defaultCoveringSel+";"+enableFTS+";"); + } + } + + @Test + public void testCaseSensitive() throws Exception { + String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " + + " where t.id.SSN = '100000003' "; + test(defaultHavingIndexPlan); + + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {""}, + new String[]{"indexName"} + ); + + } + + @Test + public void testCaseSensitiveIncludedField() throws Exception { + + String query = 
"SELECT t.`CONTACT`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " + + " where t.id.ssn = '100007423'"; + test(defaultHavingIndexPlan); + + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"RowKeyJoin", + ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"}, + new String[]{} + ); + } + + + @Test + public void testHashIndexNoRemovingSort() throws Exception { + String query = "SELECT t.`contact`.`phone` as phone FROM hbase.`index_test_primary` as t " + + " where t.reverseid <'10' order by t.reverseid"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"Sort", "indexName=hash_i_reverseid", "RowKeyJoin"}, + new String[]{} + ); + } + + @Test + public void testCastTimestampPlan() throws Exception { + String query = "SELECT t.id.ssn as ssn FROM hbase.`index_test_primary` as t " + + " where cast(t.activity.irs.firstlogin as timestamp)=to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"indexName=hash_i_cast_timestamp_firstlogin"}, + new String[]{"RowKeyJoin"} + ); + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("ssn").baselineValues("100007423") + .go(); + + } + + @Test + public void testNotConditionNoIndexPlan() throws Exception { + String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " + + " where NOT t.id.ssn = '100007423'"; + + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {}, + new String[]{"indexName="} + ); + + + String notInQuery = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " + + " where t.id.ssn NOT IN ('100007423', '100007424')"; + PlanTestBase.testPlanMatchingPatterns(notInQuery, + new String[] {}, + new String[]{"indexName="} + ); + + String notLikeQuery = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " + + " where t.id.ssn NOT LIKE '100007423'"; + PlanTestBase.testPlanMatchingPatterns(notLikeQuery, + new String[] {}, + new String[]{"indexName="} + ); + + } + + @Test + public void testNoFilterOrderByCoveringPlan() throws Exception { + String query = "SELECT t.`id`.`ssn` AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " + + "order by t.id.ssn limit 2"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"indexName=i_ssn"}, + new String[]{"Sort", "TopN", "RowKeyJoin"} + ); + testBuilder() + .ordered() + .sqlQuery(query) + .baselineColumns("ssn", "phone").baselineValues("100000000", "6500008069") + .baselineColumns("ssn", "phone").baselineValues("100000001", "6500001411") + .build() + .run(); + } + + @Test + public void testNoFilterAndLimitOrderByCoveringPlan() throws Exception { + String query = "SELECT t.`id`.`ssn` AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " + + "order by t.id.ssn"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"Sort"}, + new String[]{"indexName=*", "RowKeyJoin", "TopN"} + ); + } + + @Test + public void testNoFilterOrderByCast() throws Exception { + String query = "SELECT CAST(t.id.ssn as INT) AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " + + "order by CAST(t.id.ssn as INT) limit 2"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"indexName=i_cast_int_ssn"}, + new String[]{"TopN", "Sort", "RowKeyJoin"} + ); + testBuilder() 
+ .ordered() + .sqlQuery(query) + .baselineColumns("ssn", "phone").baselineValues(100000000, "6500008069") + .baselineColumns("ssn", "phone").baselineValues(100000001, "6500001411") + .build() + .run(); + } + + @Test + public void testNoFilterAndLimitOrderByCast() throws Exception { + String query = "SELECT CAST(t.id.ssn as INT) AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " + + "order by CAST(t.id.ssn as INT)"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] { "Sort"}, + new String[]{"indexName=*","TopN", "RowKeyJoin"} + ); + } + + @Test + public void testNoFilterOrderByHashIndex() throws Exception { + String query = "SELECT cast(t.activity.irs.firstlogin as timestamp) AS `firstlogin`, t.id.ssn as ssn FROM hbase.`index_test_primary` as t " + + "order by cast(t.activity.irs.firstlogin as timestamp), t.id.ssn limit 2"; + test(defaultHavingIndexPlan); + //no collation for hash index so Sort or TopN must have been preserved + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"(Sort|TopN)"}, + new String[]{"indexName="} + ); + DateTime date = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") + .parseDateTime("2010-01-21 00:12:24"); + + testBuilder() + .ordered() + .sqlQuery(query) + .baselineColumns("firstlogin", "ssn").baselineValues(date, "100005592") + .baselineColumns("firstlogin", "ssn").baselineValues(date, "100005844") + .build() + .run(); + } + + @Test + public void testNoFilterOrderBySimpleField() throws Exception { + String query = "SELECT t.reverseid as rid, t.driverlicense as lic FROM hbase.`index_test_primary` as t " + + "order by t.driverlicense limit 2"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"indexName=i_lic"}, + new String[]{"Sort", "TopN"} + ); + testBuilder() + .ordered() + .sqlQuery(query) + .baselineColumns("rid", "lic").baselineValues("4539", 100000000L) + .baselineColumns("rid", "lic").baselineValues("943", 100000001L) + .build() + .run(); + } + + @Test //negative case for no filter plan + public void testNoFilterOrderByNoIndexMatch() throws Exception { + String query = "SELECT t.`id`.`ssn` AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " + + "order by t.name.fname limit 2"; + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"(Sort|TopN)"}, + new String[]{"indexName="} + ); + } + +// Enable this testcase once MD-2848 is fixed. 
+// @Test +// public void IntersectPlanWithOneSideNoRows() throws Exception { +// try { +// String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " + +// " where t.personal.age = 53 AND t.personal.income=111145"; +// test(defaultHavingIndexPlan); +// test(preferIntersectPlans + ";" + disableFTS); +// PlanTestBase.testPlanMatchingPatterns(query, +// new String[]{"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*HashJoin(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)"}, +// new String[]{} +// ); +// +// testNoResult(query); +// +// } finally { +// test(defaultIntersectPlans + ";" + enableFTS); +// } +// } + + //"i_cast_age_state_phone", "$CAST(personal.age@STRING),address.state,contact.phone", "name.fname", + @Test + public void testTrailingFieldIndexCovering() throws Exception { + String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " + + " where cast(t.personal.age as INT)=53 AND t.contact.phone='6500005471' "; + + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"indexName=i_cast_age_income_phone"}, + new String[]{"RowKeyJoin"} + ); + + testBuilder() + .ordered() + .sqlQuery(query) + .baselineColumns("fname").baselineValues("KfFzK") + .build() + .run(); + } + + @Test + public void testIncludedFieldCovering() throws Exception { + String query = "SELECT t.`contact`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " + + " where cast(t.personal.age as INT)=53 AND t.name.fname='KfFzK' "; + + test(defaultHavingIndexPlan); + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"indexName=i_cast_age_income_phone"}, + new String[]{"RowKeyJoin"} + ); + + testBuilder() + .ordered() + .sqlQuery(query) + .baselineColumns("phone").baselineValues("6500005471") + .build() + .run(); + } + + @Test + public void testWithFilterGroupBy() throws Exception { + String query = " select t1.driverlicense from hbase.`index_test_primary` t1" + + " where t1.driverlicense > 100000001 group by t1.driverlicense limit 2"; + try { + test(defaultHavingIndexPlan); + test(disableHashAgg); + //i_lic provides the collation on driverlicense, so the Sort or TopN should be removed + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"indexName=i_lic", "StreamAgg"}, + new String[]{"(Sort|TopN)"} + ); + + testBuilder() + .ordered() + .sqlQuery(query) + .optionSettingQueriesForTestQuery(disableHashAgg) + .baselineColumns("driverlicense").baselineValues(100000002L) + .baselineColumns("driverlicense").baselineValues(100000003L) + .build() + .run(); + } finally { + test(enableHashAgg); + } + } + + @Test + public void testNoFilterOrderByDesc() throws Exception { + String query = " select t1.driverlicense from hbase.`index_test_primary` t1" + + " order by t1.driverlicense desc limit 2"; + test(defaultHavingIndexPlan); + //the index collation is ascending, so a descending ORDER BY cannot use it and Sort or TopN must be preserved + PlanTestBase.testPlanMatchingPatterns(query, + new String[] {"(Sort|TopN)"}, + new String[]{"indexName="} + ); + + testBuilder() + .unOrdered() + .sqlQuery(query) + .baselineColumns("driverlicense").baselineValues(100009999L) + .baselineColumns("driverlicense").baselineValues(100009998L) + .build() + .run(); + } + + @Test + public void testNoFilterGroupBy() throws Exception { + String query = " select t1.driverlicense from hbase.`index_test_primary` t1" + + " group by t1.driverlicense limit 2"; + try { + test(defaultHavingIndexPlan); +
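// The next statement disables hash aggregation (it is re-enabled in the finally block) to force a + // StreamAgg; streaming aggregation consumes its input in order, so when the scan uses i_lic the + // rows already arrive sorted on driverlicense and no extra Sort or TopN is needed. +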
test(disableHashAgg); + //i_lic already delivers the rows ordered on driverlicense, so the StreamAgg needs no Sort or TopN + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"indexName=i_lic", "StreamAgg"}, + new String[]{"(Sort|TopN)"} + ); + + testBuilder() + .ordered() + .sqlQuery(query) + .optionSettingQueriesForTestQuery(disableHashAgg) + .baselineColumns("driverlicense").baselineValues(100000000L) + .baselineColumns("driverlicense").baselineValues(100000001L) + .build() + .run(); + } finally { + test(enableHashAgg); + } + } + + @Test + public void testNoFilterGroupByCoveringPlan() throws Exception { + String query = "SELECT t.`id`.`ssn` AS `ssn`, max(t.contact.phone) as phone FROM hbase.`index_test_primary` as t " + + "group by t.id.ssn limit 2"; + try { + test(defaultHavingIndexPlan); + test(disableHashAgg); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"indexName=i_ssn", "StreamAgg"}, + new String[]{"Sort", "TopN", "RowKeyJoin"} + ); + testBuilder() + .ordered() + .sqlQuery(query) + .optionSettingQueriesForTestQuery(disableHashAgg) + .baselineColumns("ssn", "phone").baselineValues("100000000", "6500008069") + .baselineColumns("ssn", "phone").baselineValues("100000001", "6500001411") + .build() + .run(); + } finally { + test(enableHashAgg); + } + } + + @Test + public void testNoFilterGroupByCast() throws Exception { + String query = "SELECT CAST(t.id.ssn as INT) AS `ssn`, max(t.contact.phone) as phone FROM hbase.`index_test_primary` as t " + + "group by CAST(t.id.ssn as INT) limit 2"; + try { + test(defaultHavingIndexPlan); + test(disableHashAgg); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"indexName=i_cast_int_ssn", "StreamAgg"}, + new String[]{"TopN", "Sort", "RowKeyJoin"} + ); + testBuilder() + .ordered() + .sqlQuery(query) + .optionSettingQueriesForTestQuery(disableHashAgg) + .baselineColumns("ssn", "phone").baselineValues(100000000, "6500008069") + .baselineColumns("ssn", "phone").baselineValues(100000001, "6500001411") + .build() + .run(); + } finally { + test(enableHashAgg); + } + } + + @Test + public void testNoFilterGroupByHashIndex() throws Exception { + String query = "SELECT cast(t.activity.irs.firstlogin as timestamp) AS `firstlogin`, max(t.id.ssn) as ssn FROM hbase.`index_test_primary` as t " + + "group by cast(t.activity.irs.firstlogin as timestamp) limit 2"; + try { + test(defaultHavingIndexPlan); + test(disableHashAgg); + //no collation for hash index so Sort or TopN must have been preserved + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"(Sort|TopN)", "StreamAgg"}, + new String[]{"indexName="} + ); + DateTime date1 = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") + .parseDateTime("2010-01-21 00:12:24"); + + DateTime date2 = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") + .parseDateTime("2010-01-21 00:24:48"); + testBuilder() + .unOrdered() + .sqlQuery(query) + .optionSettingQueriesForTestQuery(disableHashAgg) + .baselineColumns("firstlogin", "ssn").baselineValues(date1, "100006852") + .baselineColumns("firstlogin", "ssn").baselineValues(date2, "100003660") + .build() + .run(); + } finally { + test(enableHashAgg); + } + } + + @Test + public void testNoFilterGroupBySimpleField() throws Exception { + String query = "SELECT max(t.reverseid) as rid, t.driverlicense as lic FROM hbase.`index_test_primary` as t " + + "group by t.driverlicense limit 2"; + try { + test(defaultHavingIndexPlan); + test(disableHashAgg); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"indexName=i_lic", "StreamAgg"}, + new String[]{"Sort",
"TopN"} + ); + testBuilder() + .ordered() + .sqlQuery(query) + .optionSettingQueriesForTestQuery(disableHashAgg) + .baselineColumns("rid", "lic").baselineValues("4539", 100000000L) + .baselineColumns("rid", "lic").baselineValues("943", 100000001L) + .build() + .run(); + } finally { + test(enableHashAgg); + } + } + + @Test //negative case for no filter plan + public void testNoFilterGroupByNoIndexMatch() throws Exception { + String query = "SELECT max(t.`id`.`ssn`) AS `ssn`, max(t.contact.phone) as phone FROM hbase.`index_test_primary` as t " + + "group by t.name.fname limit 2"; + try { + test(defaultHavingIndexPlan); + test(disableHashAgg); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"(Sort|TopN)", "StreamAgg"}, + new String[]{"indexName="} + ); + } finally { + test(enableHashAgg); + } + } + + @Test + public void testNoFilterGroupBySimpleFieldParallel() throws Exception { + String query = "SELECT max(t.reverseid) as rid, t.driverlicense as lic FROM hbase.`index_test_primary` as t " + + "group by t.driverlicense order by t.driverlicense limit 2"; + try { + test(defaultHavingIndexPlan); + test(disableHashAgg); + test(sliceTargetSmall); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"indexName=i_lic", "StreamAgg", "HashToMergeExchange"}, + new String[]{"Sort", "TopN"} + ); + testBuilder() + .unOrdered() + .sqlQuery(query) + .optionSettingQueriesForTestQuery(defaultHavingIndexPlan) + .optionSettingQueriesForTestQuery(disableHashAgg) + .optionSettingQueriesForTestQuery(sliceTargetSmall) + .baselineColumns("rid", "lic").baselineValues("4539", 100000000L) + .baselineColumns("rid", "lic").baselineValues("943", 100000001L) + .build() + .run(); + } finally { + test(enableHashAgg); + test(sliceTargetDefault); + } + } + + @Test + public void testLimitPushdownCoveringPlan() throws Exception { + String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " + + " where t.personal.age = 53 limit 3"; + try { + test(defaultHavingIndexPlan + ";" + disableFTS + ";"); + PlanTestBase.testPlanWithAttributesMatchingPatterns(query, + new String[]{".*JsonTableGroupScan.*indexName=i_age_with_fname.*rowcount = 3.0"}, + new String[]{} + ); + } finally { + test(enableFTS); + } + } + + @Test + public void testLimitPushdownOrderByCoveringPlan() throws Exception { + String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " + + " where t.personal.age = 53 order by t.personal.age limit 3"; + try { + test(defaultHavingIndexPlan + ";" + disableFTS + ";"); + PlanTestBase.testPlanWithAttributesMatchingPatterns(query, + new String[]{".*JsonTableGroupScan.*indexName=i_age_with_fname.*rowcount = 3.0"}, + new String[]{} + ); + } finally { + test(enableFTS); + } + } + + @Test + public void testLimitPushdownNonCoveringPlan() throws Exception { + String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " + + " where t.personal.age = 53 limit 7"; + try { + test(defaultHavingIndexPlan+";"+disableFTS+";"); + PlanTestBase.testPlanWithAttributesMatchingPatterns(query, + new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*tableName=.*index_test_primary.*rowcount = 7.0"}, + new String[]{} + ); + } finally { + test(enableFTS); + } + } + + @Test + public void testLimitPushdownOrderByNonCoveringPlan() throws Exception { + // Limit pushdown should NOT happen past rowkey join when ordering is required + String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " + + " where 
t.personal.age = 53 order by t.personal.age limit 7"; + try { + test(defaultHavingIndexPlan + ";" + disableFTS + ";" + sliceTargetSmall + ";"); + PlanTestBase.testPlanWithAttributesMatchingPatterns(query, + new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*"}, + new String[]{".*tableName=.*index_test_primary.*rowcount = 7.*"} + ); + } finally { + test(enableFTS); + } + } + + @Test + public void testLimit0Pushdown() throws Exception { + // Limit pushdown should NOT happen past project with CONVERT_FROMJSON + String query = "select convert_from(convert_to(t.`name`.`lname`, 'JSON'), 'JSON') " + + "from hbase.`index_test_primary` as t limit 0"; + test(defaultHavingIndexPlan + ";"); + PlanTestBase.testPlanWithAttributesMatchingPatterns(query, + new String[]{"Limit(.*[\n\r])+.*Project.*CONVERT_FROMJSON(.*[\n\r])+.*Scan"}, + new String[]{} + ); + } + + @Test + public void testRemovalOfRedundantHashToMergeExchange() throws Exception { + String query = "SELECT t.driverlicense as lic FROM hbase.`index_test_primary` as t " + + "order by t.driverlicense limit 2"; + try { + test(defaultHavingIndexPlan); + test(sliceTargetSmall); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"indexName=i_lic"}, + new String[]{"HashToMergeExchange", "Sort", "TopN"}); + testBuilder() + .ordered() + .sqlQuery(query) + .baselineColumns("lic").baselineValues(100000000L) + .baselineColumns("lic").baselineValues(100000001L) + .build() + .run(); + } finally { + test(sliceTargetDefault); + } + } + + @Test + public void testMultiPhaseAgg() throws Exception { + String query = "select count(t.reverseid) from hbase.`index_test_primary` as t " + + "group by t.driverlicense order by t.driverlicense"; + try { + test(defaultHavingIndexPlan); + test(sliceTargetSmall); + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"indexName=i_lic", "HashToMergeExchange", "StreamAgg", "StreamAgg"}, + new String[]{"Sort", "TopN"}); + } finally { + test(sliceTargetDefault); + } + } + + @Test + public void testHangForSimpleDistinct() throws Exception { + String query = "select distinct t.driverlicense from hbase.`index_test_primary` as t order by t.driverlicense limit 1"; + + try { + test(sliceTargetSmall); + testBuilder() + .ordered() + .sqlQuery(query) + .baselineColumns("driverlicense").baselineValues(100000000L) + .build() + .run(); + } finally { + test(sliceTargetDefault); + } + } +} diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java new file mode 100644 index 000000000..bc857d158 --- /dev/null +++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mapr.drill.maprdb.tests.index; + +import static com.mapr.drill.maprdb.tests.MaprDBTestsSuite.INDEX_FLUSH_TIMEOUT; + +import java.io.InputStream; +import java.io.StringBufferInputStream; + +import org.apache.hadoop.fs.Path; +import org.ojai.DocumentStream; +import org.ojai.json.Json; + +import com.mapr.db.Admin; +import com.mapr.db.Table; +import com.mapr.db.TableDescriptor; +import com.mapr.db.impl.MapRDBImpl; +import com.mapr.db.impl.TableDescriptorImpl; +import com.mapr.db.tests.utils.DBTests; +import com.mapr.fs.utils.ssh.TestCluster; + +/** + * This class generates a MapR-DB JSON table with the following schema: + * { + * "address" : { + * "city":"wtj", + * "state":"ho" + * }, + * "contact" : { + * "email":"VcFahjRfM@gmail.com", + * "phone":"6500005583" + * }, + * "id" : { + * "ssn":"100005461" + * }, + * "name" : { + * "fname":"VcFahj", + * "lname":"RfM" + * } + * } + * + */ +public class LargeTableGen extends LargeTableGenBase { + + static final int SPLIT_SIZE = 5000; + private Admin admin; + + public LargeTableGen(Admin dbadmin) { + admin = dbadmin; + } + + Table createOrGetTable(String tableName, int recordNum) { + if (admin.tableExists(tableName)) { + return MapRDBImpl.getTable(tableName); + //admin.deleteTable(tableName); + } + else { + TableDescriptor desc = new TableDescriptorImpl(new Path(tableName)); + + int splits = (recordNum / SPLIT_SIZE) - (((recordNum % SPLIT_SIZE) > 1)? 0 : 1); + + String[] splitsStr = new String[splits]; + StringBuilder strBuilder = new StringBuilder("Splits:"); + for(int i=0; i firstnames; + protected List<String> lastnames; + protected List<String[]> cities; + protected int[] randomized; + + protected synchronized void initDictionary() { + initDictionaryWithRand(); + } + + protected void initDictionaryWithRand() { + { + firstnames = new ArrayList<>(); + lastnames = new ArrayList<>(); + cities = new ArrayList<>(); + List<String> states = new ArrayList<>(); + + int fnNum = 2000; //2k + int lnNum = 200000; //200k + int cityNum = 10000; //10k + int stateNum = 50; + Random rand = new Random(2017); + int i; + try { + Set<String> strSet = new LinkedHashSet<>(); + while(strSet.size() < stateNum) { + strSet.add(RandomStringUtils.random(2, 0, 0, true, false, null, rand)); + } + states.addAll(strSet); + + strSet = new LinkedHashSet<>(); + while(strSet.size() < cityNum) { + int len = 3 + strSet.size() % 6; + strSet.add(RandomStringUtils.random(len, 0, 0, true, false, null, rand)); + } + + Iterator<String> it = strSet.iterator(); + for(i=0; i(); + while(strSet.size() < fnNum) { + int len = 3 + strSet.size() % 6; + strSet.add(RandomStringUtils.random(len, 0, 0, true, false, null, rand)); + } + firstnames.addAll(strSet); + + strSet = new LinkedHashSet<>(); + while(strSet.size() < lnNum) { + int len = 3 + strSet.size() % 6; + strSet.add(RandomStringUtils.random(len, 0, 0, true, false, null, rand)); + } + lastnames.addAll(strSet); + } + catch(Exception e) { + System.out.println("init data got exception"); + e.printStackTrace(); + } + dict_ready = true; + } + } + + protected String getFirstName(int i) { + return firstnames.get((randomized[ i%randomized.length ] + i )% firstnames.size()); + } + + protected String getLastName(int i) { + return lastnames.get((randomized[ (2*i + randomized[i%randomized.length])% randomized.length]) % lastnames.size()); + } + + protected String[] getAddress(int i) { + return cities.get((randomized[(i+ randomized[i%randomized.length])%randomized.length]) % cities.size());
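+ // Note: all get* accessors in this class derive their values from fixed-seed Random instances + // (2017 for the dictionaries above, 2016 for the randomized[] vector below), so the generated + // table is deterministic across runs; that is what lets IndexPlanTest hard-code baselines such + // as fname "KfFzK" or phone "6500005471".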
+ } + + protected String getSSN(int i){ + return String.format("%d", 1000*1000*100 + randomized[ i % randomized.length]); + } + + protected String getPhone(int i) { + //80% phones are unique, + return String.format("%d", 6500*1000*1000L + randomized[ (randomized.length - i) %((int) (randomized.length * 0.8)) ]); + } + + protected String getEmail(int i){ + return getFirstName(i) + getLastName(i) + "@" + "gmail.com"; + } + + protected String getAge(int i) { + return String.format("%d",randomized[i%randomized.length] % 60 + 10); + } + + protected String getIncome(int i) {//unit should be $10k + return String.format("%d",randomized[i%randomized.length] % 47 + 1); + } + + //date yyyy-mm-dd + protected String getBirthdate(int i) { + int thisseed = randomized[i%randomized.length]; + return String.format("%d-%02d-%02d", + 2016 - (thisseed % 60 + 10), thisseed % 12 + 1, (thisseed * 31) % 28 + 1 ); + } + + //timestamp, yyyy-mm-dd HH:mm:ss + protected String getFirstLogin(int i) { + int thisseed = randomized[i%randomized.length]; + int nextseed = randomized[(i+1)%randomized.length]; + return String.format("%d-%02d-%02d %02d:%02d:%02d.0", + 2016 - (thisseed % 7), (thisseed * 31) % 12 + 1, thisseed % 28 + 1, nextseed % 24, nextseed % 60, (nextseed * 47) % 60 ); + } + + + protected String getField(String field, int i) { + if(field.equals("ssn")) { + return getSSN(i); + } + else if (field.equals("phone")) { + return getPhone(i); + } + else if(field.equals("email")) { + return getEmail(i); + } + else if(field.equals("city")) { + return getAddress(i)[1]; + } + else if(field.equals("state")) { + return getAddress(i)[0]; + } + else if(field.equals("fname")) { + return getFirstName(i); + } + else if(field.equals("lname")) { + return getLastName(i); + } + return ""; + } + + + protected void initRandVector(int recordNumber) { + int i; + Random rand = new Random(2016); + randomized = new int[recordNumber]; + for(i = 0; i 100)" + + " and (t.address.state = 'mo' or t.address.state = 'ca')"; + PlanTestBase.testPlanMatchingPatterns(explain+query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"}, + new String[] {} + ); + + // Top-level ORs - Cannot split top-level ORs so use defaults + query = "select * from hbase.`index_test_primary` t " + + " where (t.personal.age > 30 and t.personal.age < 100)" + + " or (t.address.state = 'mo')"; + PlanTestBase.testPlanMatchingPatterns(explain+query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"}, + new String[] {} + ); + + // ANDed condition - Leading index column(personal.age) and non-leading column(address.city) + query = "select * from hbase.`index_test_primary` t " + + " where (t.personal.age < 30 or t.personal.age > 100)" + + " and `address.city` = 'sf'"; + PlanTestBase.testPlanMatchingPatterns(explain+query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"}, + new String[] {} + ); + + // ANDed condition - Leading index columns (address.state) and (address.city) + query = "select * from hbase.`index_test_primary` t " + + " where (`address.state` = 'mo' or `address.state` = 'ca') " // Leading index column + + " and `address.city` = 'sf'"; // Non leading index column + PlanTestBase.testPlanMatchingPatterns(explain+query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"}, + new String[] {} + ); + + // ANDed condition - Leading index columns (address.state) and non-index column (name.fname) + query = "select * from hbase.`index_test_primary` 
t " + + " where (`address.state` = 'mo' or `address.state` = 'ca') " // Leading index column + + " and `name.fname` = 'VcFahj'"; // Non index column + PlanTestBase.testPlanMatchingPatterns(explain+query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"}, + new String[] {} + ); + + // Simple condition - LIKE predicate + query = "select t._id as rowid from hbase.`index_test_primary` as t " + + "where t.driverlicense like '100007423%'"; + PlanTestBase.testPlanMatchingPatterns(explain+query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"}, + new String[] {} + ); + + // Simple condition - LIKE predicate with ESCAPE clause + query = "select t._id as rowid from hbase.`index_test_primary` as t " + + "where t.driverlicense like '100007423%' ESCAPE '/'"; + PlanTestBase.testPlanMatchingPatterns(explain+query, + new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"}, + new String[] {} + ); + } +} diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java new file mode 100644 index 000000000..a501f8fa8 --- /dev/null +++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.mapr.drill.maprdb.tests.index; + + +import com.mapr.db.Admin; +import com.mapr.db.MapRDB; +import org.apache.drill.exec.util.GuavaPatcher; + +import java.util.HashMap; +import java.util.Map; + +/** +* Copy classes to a MapR cluster node, then run a command like this: +* java -classpath /tmp/drill-cmd-1.9.0-SNAPSHOT.jar:/opt/mapr/drill/drill-1.9.0/jars/*:/opt/mapr/drill/drill-1.9.0/jars/3rdparty/*:/opt/mapr/drill/drill-1.9.0/jars/ext/* +* org.apache.drill.hbase.index.TableIndexGen -host 10.10.88.128 -port 5181 [-table pop3] [-size 1000000] +*/ + +class TestBigTable { + + Admin admin; + boolean initialized = false; + + LargeTableGen gen; + + /* + "hbase.zookeeper.quorum": "10.10.88.128", + "hbase.zookeeper.property.clientPort": "5181" + */ + void init(String host, String port) { + try { + admin = MapRDB.newAdmin(); + initialized = true; + gen = new LargeTableGen(admin); + } catch (Exception e) { + System.out.println("Connection to HBase threw" + e.getMessage()); + } + } +} + + +public class TableIndexCmd { + + public static Map parseParameter(String[] params) { + HashMap retParams = new HashMap(); + for (int i=0; i params = parseParameter(args); + if(args.length >= 2) { + if(params.get("host") != null) { + inHost = params.get("host"); + } + if(params.get("port") != null) { + inPort = params.get("port"); + } + if(params.get("table") != null) { + inTable = params.get("table"); + } + if(params.get("size") != null) { + inSize = Long.parseLong(params.get("size")); + } + if(params.get("dict") != null) { + dictPath = params.get("dict"); + } + if(params.get("wait") != null) { + String answer = params.get("wait"); + waitKeyPress = answer.startsWith("y") || answer.startsWith("t")? true : false; + } + } + if(waitKeyPress == true) { + pressEnterKeyToContinue(); + } + try { + TestBigTable tbt = new TestBigTable(); + tbt.init(inHost, inPort); + tbt.gen.generateTableWithIndex(inTable, (int)(inSize & 0xFFFFFFFFL), null); + } + catch(Exception e) { + System.out.println("generate big table got exception:" + e.getMessage()); + e.printStackTrace(); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java new file mode 100644 index 000000000..5f4da7439 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.common; + +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rex.RexNode; + +/** + * Class implementing OrderedPrel interface guarantees to provide ordered + * output on certain columns. 
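Note: the body of parseParameter's loop and the head of main() were lost in this copy of the patch (the text jumps from the for-statement straight to "params = parseParameter(args)"). From the surviving call sites, params.get("host"), params.get("port"), and so on, the method evidently maps "-name value" argument pairs into a dictionary. A minimal sketch consistent with that usage (an assumption, not the committed code):

import java.util.HashMap;
import java.util.Map;

public class ArgParse {
  // Assumed contract: arguments arrive as "-host 10.10.88.128 -port 5181 ...".
  public static Map<String, String> parseParameter(String[] params) {
    Map<String, String> ret = new HashMap<>();
    for (int i = 0; i + 1 < params.length; i += 2) {
      if (params[i].startsWith("-")) {
        ret.put(params[i].substring(1), params[i + 1]);   // strip the leading '-'
      }
    }
    return ret;
  }

  public static void main(String[] args) {
    Map<String, String> p =
        parseParameter(new String[] {"-host", "10.10.88.128", "-port", "5181"});
    System.out.println(p.get("host") + ":" + p.get("port"));  // 10.10.88.128:5181
  }
}

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java new file mode 100644 index 000000000..5f4da7439 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.common; + +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rex.RexNode; + +/** + * An OrderedRel guarantees ordered + * output on certain columns. 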
TopNPrel and SortPrel are among the classes that implement + * this interface. + */ +public interface OrderedRel extends DrillRelNode { + + /** + * Returns the ordering columns of the result. + * @return Collation order of the output. + */ + RelCollation getCollation(); + + /** + * The offset value, represented as a RexNode. + * @return offset. + */ + RexNode getOffset(); + + /** + * The fetch (limit) value, represented as a RexNode. + * @return fetch + */ + RexNode getFetch(); + + /** + * Whether this relational node can be dropped during the optimization process. + * @return true if this node can be dropped, false otherwise. + */ + boolean canBeDropped(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java index 65788cb52..45251c641 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.index; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rex.RexNode; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; @@ -28,6 +27,7 @@ import org.apache.drill.exec.physical.base.DbGroupScan; import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; import org.apache.drill.exec.planner.common.DrillScanRelBase; import org.apache.drill.exec.planner.common.DrillProjectRelBase; +import org.apache.drill.exec.planner.common.OrderedRel; import java.util.List; import java.util.Set; @@ -66,7 +66,7 @@ RexNode getOrigCondition(); - Sort getSort(); + OrderedRel getSort(); void createSortExprs(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java index 27198bb02..3a6ea83d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.index; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelCollation; -import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rex.RexNode; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; @@ -31,6 +30,7 @@ import org.apache.drill.exec.planner.logical.DrillProjectRel; import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.DrillSortRel; import org.apache.drill.exec.planner.common.DrillScanRelBase; +import org.apache.drill.exec.planner.common.OrderedRel; import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; import org.apache.calcite.rel.RelNode; @@ -164,7 +164,7 @@ public class IndexLogicalPlanCallContext implements IndexCallContext { return origPushedCondition; } - public Sort getSort() { + public OrderedRel getSort() { return sort; }
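The interface is deliberately small: a rule only needs the collation, the offset/fetch literals, and a veto flag. A sketch of a consumer under those assumptions (hypothetical helper, assuming the Drill planner classes are on the classpath; not part of the patch):

import org.apache.drill.exec.planner.common.OrderedRel;

public final class OrderedRelHelper {
  private OrderedRelHelper() {}

  // An ordering operator is redundant once the scan below it already delivers
  // rows in getCollation() order -- but only if the node opted in via
  // canBeDropped() and carries no fetch that still has to be enforced.
  public static boolean sortIsRedundant(OrderedRel sort, boolean scanDeliversCollation) {
    return scanDeliversCollation
        && sort.canBeDropped()
        && sort.getFetch() == null;
  }
}

diff --git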
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java index 9c7b65167..91ff02c69 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java @@ -21,7 +21,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rex.RexNode; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; @@ -29,10 +28,10 @@ import org.apache.drill.exec.physical.base.AbstractDbGroupScan; import org.apache.drill.exec.physical.base.DbGroupScan; import org.apache.drill.exec.planner.common.DrillProjectRelBase; import org.apache.drill.exec.planner.common.DrillScanRelBase; -import org.apache.drill.exec.planner.physical.SortPrel; import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.FilterPrel; import org.apache.drill.exec.planner.physical.ScanPrel; +import org.apache.drill.exec.planner.common.OrderedRel; import org.apache.drill.exec.planner.physical.ExchangePrel; import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel; import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; @@ -42,8 +41,9 @@ import java.util.List; import java.util.Set; public class IndexPhysicalPlanCallContext implements IndexCallContext { + final public RelOptRuleCall call; - final public SortPrel sort; + final public OrderedRel sort; final public ProjectPrel upperProject; final public FilterPrel filter; final public ProjectPrel lowerProject; @@ -67,7 +67,7 @@ public class IndexPhysicalPlanCallContext implements IndexCallContext { } public IndexPhysicalPlanCallContext(RelOptRuleCall call, - SortPrel sort, + OrderedRel sort, ProjectPrel capProject, FilterPrel filter, ProjectPrel project, @@ -83,7 +83,7 @@ public class IndexPhysicalPlanCallContext implements IndexCallContext { } public IndexPhysicalPlanCallContext(RelOptRuleCall call, - SortPrel sort, + OrderedRel sort, ProjectPrel project, ScanPrel scan, ExchangePrel exch) { this.call = call; @@ -171,7 +171,7 @@ public class IndexPhysicalPlanCallContext implements IndexCallContext { return origPushedCondition; } - public Sort getSort() { + public OrderedRel getSort() { return sort; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java index 666e2828a..cdad63ad0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java @@ -38,6 +38,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.sql.SqlKind; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; @@ -56,6 +57,7 @@ import org.apache.drill.exec.planner.physical.Prel; import 
org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.planner.physical.ProjectPrel; +import org.apache.drill.exec.planner.common.OrderedRel; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexInputRef; @@ -346,6 +348,21 @@ return proj.getProjects(); } + public static boolean generateLimit(OrderedRel sort) { + RexNode fetchNode = sort.getFetch(); + int fetchValue = (fetchNode == null) ? -1 : RexLiteral.intValue(fetchNode); + return fetchValue >= 0; + } + + public static RexNode getOffset(OrderedRel sort) { + return sort.getOffset(); + } + + public static RexNode getFetch(OrderedRel sort) { + return sort.getFetch(); + } + + /** * generate logical expressions for sort rexNodes in SortRel, the result is store to IndexPlanCallContext * @param indexContext
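generateLimit() answers one question: does this ordered node carry a usable LIMIT? A missing fetch maps to -1; anything >= 0 means a limit is present. The RexLiteral round-trip it relies on can be checked standalone with plain Calcite (demo only; FetchLiteralDemo is not part of the patch):

import java.math.BigDecimal;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

public class FetchLiteralDemo {
  public static void main(String[] args) {
    RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    RexBuilder rexBuilder = new RexBuilder(typeFactory);

    // "LIMIT 10" reaches the planner as an exact numeric literal.
    RexNode fetch = rexBuilder.makeExactLiteral(
        BigDecimal.TEN, typeFactory.createSqlType(SqlTypeName.INTEGER));

    int fetchValue = (fetch == null) ? -1 : RexLiteral.intValue(fetch);
    System.out.println(fetchValue >= 0);   // true: a limit is present
  }
}

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java index 36ff61f20..456542b31 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java @@ -30,7 +30,6 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollationTraitDef; -import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.type.RelDataTypeFieldImpl; import org.apache.calcite.rel.type.RelRecordType; import org.apache.calcite.sql.type.SqlTypeName; @@ -43,16 +42,19 @@ import org.apache.drill.exec.planner.logical.DrillFilterRel; import org.apache.drill.exec.planner.logical.DrillProjectRel; import org.apache.drill.exec.planner.logical.DrillSortRel; import org.apache.drill.exec.planner.common.DrillProjectRelBase; +import org.apache.drill.exec.planner.common.OrderedRel; import org.apache.drill.exec.planner.common.DrillScanRelBase; +import org.apache.drill.exec.planner.physical.SubsetTransformer; import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.planner.physical.Prule; +import org.apache.drill.exec.planner.physical.DrillDistributionTrait; import org.apache.drill.exec.planner.physical.Prel; -import org.apache.drill.exec.planner.physical.SortPrel; import org.apache.drill.exec.planner.physical.HashToMergeExchangePrel; import org.apache.drill.exec.planner.physical.SingleMergeExchangePrel; import org.apache.drill.exec.planner.physical.PrelUtil; -import org.apache.drill.exec.planner.physical.Prule; -import org.apache.drill.exec.planner.physical.SubsetTransformer; -import org.apache.drill.exec.planner.physical.DrillDistributionTrait; +import org.apache.drill.exec.planner.physical.TopNPrel; +import org.apache.drill.exec.planner.physical.SortPrel; +import org.apache.drill.exec.planner.physical.LimitPrel; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -67,7 +69,6 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer { public boolean match(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel)call.rel(2); - return checkScan(scan.getGroupScan()); + final OrderedRel 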
sort = call.rel(0); + final ScanPrel scan = call.rel(2); + return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort); } public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) { final ScanPrel scan = call.rel(2); - final SortPrel sort = call.rel(0); + final OrderedRel sort = call.rel(0); final ExchangePrel exch = call.rel(1); return new IndexPhysicalPlanCallContext(call, sort, null, scan, exch); } @@ -98,13 +105,14 @@ public class DbScanSortRemovalRule extends Prule { private static class MatchSS extends AbstractMatchFunction { public boolean match(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel)call.rel(1); - return checkScan(scan.getGroupScan()); + final OrderedRel sort = call.rel(0); + final ScanPrel scan = call.rel(1); + return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort); } public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) { final ScanPrel scan = call.rel(1); - final SortPrel sort = call.rel(0); + final OrderedRel sort = call.rel(0); return new IndexPhysicalPlanCallContext(call, sort, null, scan, null); } } @@ -112,14 +120,15 @@ public class DbScanSortRemovalRule extends Prule { private static class MatchSPS extends AbstractMatchFunction { public boolean match(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel)call.rel(2); - return checkScan(scan.getGroupScan()); + final OrderedRel sort = call.rel(0); + final ScanPrel scan = call.rel(2); + return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort); } public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) { final ScanPrel scan = call.rel(2); final ProjectPrel proj = call.rel(1); - final SortPrel sort = call.rel(0); + final OrderedRel sort = call.rel(0); return new IndexPhysicalPlanCallContext(call, sort, proj, scan, null); } } @@ -127,13 +136,14 @@ public class DbScanSortRemovalRule extends Prule { private static class MatchSEPS extends AbstractMatchFunction { public boolean match(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel)call.rel(3); - return checkScan(scan.getGroupScan()); + final OrderedRel sort = call.rel(0); + final ScanPrel scan = call.rel(3); + return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort); } public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) { final ScanPrel scan = call.rel(3); - final SortPrel sort = call.rel(0); + final OrderedRel sort = call.rel(0); final ProjectPrel proj = call.rel(2); final ExchangePrel exch = call.rel(1); return new IndexPhysicalPlanCallContext(call, sort, proj, scan, exch); @@ -187,12 +197,15 @@ false, settings); if (planGen.convertChild() != null) { indexContext.getCall().transformTo(planGen.convertChild()); + } else { + logger.debug("Not able to generate index plan in {}", this.getClass().toString()); } } catch (Exception e) { logger.warn("Exception while trying to generate indexscan to remove sort", e); } } } else { + Preconditions.checkNotNull(indexContext.getSort()); // This case tries to use the already generated index to see if a sort can be removed. 
if (indexContext.scan.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE).getFieldCollations().size() == 0) { return; @@ -204,12 +217,12 @@ inputs.add(finalRel); finalRel = indexContext.lowerProject.copy(indexContext.lowerProject.getTraitSet(), inputs); } - if (indexContext.getSort() != null) { - finalRel = AbstractIndexPlanGenerator.getSortNode(indexContext, finalRel, true,false, + + finalRel = AbstractIndexPlanGenerator.getSortNode(indexContext, finalRel, true, false, indexContext.exch != null); - } if (finalRel == null) { + logger.debug("Not able to generate index plan in {}", this.getClass().toString()); return; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java index cfa0e26b4..1e380cff7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java @@ -26,6 +26,7 @@ import org.apache.drill.common.logical.data.LogicalOperator; import org.apache.drill.common.logical.data.Order; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.planner.torel.ConversionContext; +import org.apache.drill.exec.planner.common.OrderedRel; import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelFieldCollation; @@ -41,7 +42,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps; /** * Sort implemented in Drill. */ -public class DrillSortRel extends Sort implements DrillRel { +public class DrillSortRel extends Sort implements DrillRel, OrderedRel { /** Creates a DrillSortRel. 
*/ public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) { @@ -98,4 +99,18 @@ public class DrillSortRel extends Sort implements DrillRel { return new DrillSortRel(context.getCluster(), context.getLogicalTraits(), input, RelCollations.of(collations)); } + @Override + public RexNode getOffset() { + return offset; + } + + @Override + public RexNode getFetch() { + return fetch; + } + + @Override + public boolean canBeDropped() { + return true; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java index 77fb4c8bb..8064c4287 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java @@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelFieldCollation; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.ExternalSort; +import org.apache.drill.exec.planner.common.OrderedRel; import org.apache.drill.exec.planner.cost.DrillCostBase; import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory; import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; @@ -40,16 +41,25 @@ import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rex.RexNode; -public class SortPrel extends org.apache.calcite.rel.core.Sort implements Prel { +public class SortPrel extends org.apache.calcite.rel.core.Sort implements OrderedRel, Prel { + private final boolean isRemovable; /** Creates a DrillSortRel. */ public SortPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) { super(cluster, traits, input, collation); + isRemovable = true; } /** Creates a DrillSortRel with offset and fetch. */ public SortPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) { super(cluster, traits, input, collation, offset, fetch); + isRemovable = true; + } + + /** Creates a SortPrel, marking whether later optimization passes may drop it. */ + public SortPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, boolean isRemovable) { + super(cluster, traits, input, collation); + this.isRemovable = isRemovable; } @Override @@ -141,4 +151,20 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Prel { return this.copy(traits, children.get(0), collationTrait, this.offset, this.fetch); } + + @Override + public RexNode getOffset() { + return offset; + } + + @Override + public RexNode getFetch() { + return fetch; + } + + @Override + public boolean canBeDropped() { + return isRemovable; + } + }
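Why the extra flag? SortPrule (next hunk) now plants the SortPrel that feeds its merge exchange and constructs it with isRemovable = false, so DbScanSortRemovalRule cannot later delete the one sort the exchange depends on. The mechanism in miniature (a toy model, not Drill code):

import java.util.ArrayList;
import java.util.List;

public class RemovableFlagDemo {
  // A node advertises whether an optimization pass may discard it.
  static final class Node {
    final String name;
    final boolean removable;
    Node(String name, boolean removable) { this.name = name; this.removable = removable; }
    boolean canBeDropped() { return removable; }
  }

  public static void main(String[] args) {
    List<Node> plan = new ArrayList<>();
    plan.add(new Node("Sort (user query)", true));            // redundant if scan is pre-sorted
    plan.add(new Node("Sort (feeds merge exchange)", false)); // pinned by the rule that made it

    // The "sort removal" pass only touches nodes that opted in.
    plan.removeIf(Node::canBeDropped);
    plan.forEach(n -> System.out.println(n.name));            // only the pinned sort survives
  }
}

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java index 3fc86b3d4..bec1b6a20 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java @@ -47,26 +47,25 @@ public class SortPrule extends Prule{ @Override public void onMatch(RelOptRuleCall call) { - final DrillSortRel sort = (DrillSortRel) call.rel(0); + final DrillSortRel sort = call.rel(0); final RelNode input = sort.getInput(); // Keep the collation in logical sort. 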
Convert input into a RelNode with 1) this collation, 2) Physical, 3) hash distributed on DrillDistributionTrait hashDistribution = - new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(sort))); + new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(sort))); - final RelTraitSet traits = sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashDistribution); - - final RelNode convertedInput = convert(input, traits); + final RelTraitSet traits = RelTraitSet.createEmpty().plus(Prel.DRILL_PHYSICAL).plus(hashDistribution); + SortPrel child = new SortPrel(sort.getCluster(), traits.plus(sort.getCollation()), + convert(sort.getInput(), traits), sort.getCollation(), false); if(isSingleMode(call)){ - call.transformTo(convertedInput); + call.transformTo(child); }else{ - RelNode exch = new SingleMergeExchangePrel(sort.getCluster(), sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput, sort.getCollation()); + RelNode exch = new SingleMergeExchangePrel(sort.getCluster(), sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), child, sort.getCollation()); call.transformTo(exch); // transform logical "sort" into "SingleMergeExchange". } - } private List getDistributionField(DrillSortRel rel) { @@ -76,7 +75,6 @@ DistributionField field = new DistributionField(relField.getFieldIndex()); distFields.add(field); } - return distFields; }
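The TopNPrel changes below expose the operator's limit through the OrderedRel contract: offset is always a literal 0 and fetch is the literal limit. The computeSelfCost comment further down argues that Top-N at M log N beats a full sort at M log M; with concrete numbers (a standalone arithmetic check, not Drill code):

public class TopNCostCheck {
  public static void main(String[] args) {
    double m = 1_000_000d;  // input rows (M)
    double n = 10d;         // limit (N)
    double topN = m * (Math.log(n) / Math.log(2));  // M log N, about 3.3 million
    double sort = m * (Math.log(m) / Math.log(2));  // M log M, about 19.9 million
    System.out.printf("topN=%.3g, fullSort=%.3g, ratio=%.1fx%n", topN, sort, sort / topN);
  }
}

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java index e9414f174..f8f4b9d13 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java @@ -26,8 +26,11 @@ import org.apache.calcite.rel.RelCollationImpl; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.type.SqlTypeName; +import java.math.BigDecimal; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.TopN; +import org.apache.drill.exec.planner.common.OrderedRel; import org.apache.drill.exec.planner.cost.DrillCostBase; import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -40,7 +43,7 @@ import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; -public class TopNPrel extends SinglePrel { +public class TopNPrel extends SinglePrel implements OrderedRel, Prel { protected int limit; protected final RelCollation collation; @@ -66,6 +69,28 @@ return creator.addMetadata(this, topN); } + @Override + public RelCollation getCollation() { + return collation; + } + + @Override + public RexNode getOffset() { + return getCluster().getRexBuilder().makeExactLiteral(BigDecimal.ZERO, + getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + } + + @Override + public RexNode getFetch() { + return getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit), + getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + } + + 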
@Override + public boolean canBeDropped() { + return true; + } + /** * Cost of doing Top-N is proportional to M log N where M is the total number of * input rows and N is the limit for Top-N. This makes Top-N preferable to Sort @@ -93,6 +118,10 @@ public class TopNPrel extends SinglePrel { .item("limit", limit); } + public int getLimit() { + return limit; + } + @Override public SelectionVectorMode[] getSupportedEncodings() { return SelectionVectorMode.NONE_AND_TWO; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java index f77a4378e..fa8e69d0b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java @@ -25,6 +25,11 @@ import org.apache.drill.exec.planner.physical.ExchangePrel; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.planner.physical.ScreenPrel; +import org.apache.drill.exec.planner.physical.LimitPrel; +import org.apache.drill.exec.planner.physical.ProjectPrel; +import org.apache.drill.exec.planner.physical.FilterPrel; +import org.apache.drill.exec.planner.physical.SingleMergeExchangePrel; +import org.apache.drill.exec.planner.physical.HashToMergeExchangePrel; import org.apache.calcite.rel.RelNode; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -48,7 +53,18 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor