diff options
12 files changed, 70 insertions, 72 deletions
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java index 01561a3ab..ec38636c5 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java @@ -138,6 +138,11 @@ public class MapRDBFunctionalIndexInfo implements FunctionalIndexInfo { } /** + * Suppose the index key has functions (rather than plain columns): CAST(a as int), CAST(b as varchar(10)), + * then we want to maintain a mapping of the logical expression of that function to the schema path of the + * base column involved in the function. In this example map has 2 entries: + * CAST(a as int) --> 'a' + * CAST(b as varchar(10)) --> 'b' * @return the map of indexed expression --> the involved schema paths in a indexed expression */ public Map<LogicalExpression, Set<SchemaPath>> getPathsInFunctionExpr() { diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java index e1b8a61fc..c231e1181 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java @@ -37,6 +37,7 @@ import org.apache.drill.common.expression.parser.ExprLexer; import org.apache.drill.common.expression.parser.ExprParser; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; +import org.apache.drill.common.util.DrillFileUtils; import org.apache.drill.exec.physical.base.AbstractDbGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.common.DrillScanRelBase; @@ -67,6 +68,7 @@ import java.util.Set; public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDiscover { static final String DEFAULT_STRING_CAST_LEN_STR = "256"; + static final String FIELD_DELIMITER = ":"; public MapRDBIndexDiscover(GroupScan inScan, DrillScanRelBase scanRel) { super((AbstractDbGroupScan) inScan, scanRel); @@ -78,14 +80,14 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco @Override public IndexCollection getTableIndex(String tableName) { - //return getTableIndexFromCommandLine(tableName); return getTableIndexFromMFS(tableName); } /** - * + * For a given table name get the list of indexes defined on the table according to the visibility of + * the indexes based on permissions. * @param tableName - * @return + * @return an IndexCollection representing the list of indexes for that table */ private IndexCollection getTableIndexFromMFS(String tableName) { try { @@ -120,8 +122,8 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco FileSelection deriveFSSelection(DrillFileSystem fs, IndexDescriptor idxDesc) throws IOException { String tableName = idxDesc.getTableName(); - String[] tablePath = tableName.split("/"); - String tableParent = tableName.substring(0, tableName.lastIndexOf("/")); + String[] tablePath = tableName.split(DrillFileUtils.SEPARATOR); + String tableParent = tableName.substring(0, tableName.lastIndexOf(DrillFileUtils.SEPARATOR)); return FileSelection.create(fs, tableParent, tablePath[tablePath.length - 1], false); } @@ -318,7 +320,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco private DrillIndexDescriptor buildIndexDescriptor(String tableName, IndexDesc desc) throws InvalidIndexDefinitionException { if (desc.isExternal()) { - //XX: not support external index + // External index is not currently supported return null; } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java index bd8a32aef..596699f42 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java @@ -188,32 +188,4 @@ public class RestrictedMapRDBSubScanSpec extends MapRDBSubScanSpec { currentIndex += numKeys; } - /** - * Returns the next row key in the iteration. - * @return the next row key in the iteration or null if no more row keys - */ - @JsonIgnore - public String nextRowKey() { - if (hasRowKey()) { - // get the entry at the current index within this batch - Object o = rowKeyVector.getAccessor().getObject(currentIndex++); - if (o == null) { - throw new DrillRuntimeException("Encountered a null row key during restricted subscan !"); - } - - // this is specific to the way the hash join maintains its entries. once we have reached the max - // occupied index within a batch, move to the next one and reset the current index to 0 - // TODO: we should try to abstract this out - if (currentIndex > maxOccupiedIndex) { - Pair<ValueVector, Integer> currentBatch = rjbatch.nextRowKeyBatch(); - if (currentBatch != null) { - init(currentBatch); - } - } - - return o.toString(); - } - return null; - } - } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java index acaa6cacb..436347fcc 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java @@ -186,7 +186,7 @@ public class JsonTableRangePartitionFunction extends AbstractRangePartitionFunct // get the table handle from the table cache Table table = plugin.getJsonTableCache().getTable(tableName, userName); - // Set the condition to null such that all scan ranges are retrieved for the primary table. + // Get all scan ranges for the primary table. // The reason is the row keys could typically belong to any one of the tablets of the table, so // there is no use trying to get only limited set of scan ranges. // NOTE: here we use the restrictedScanRangeSizeMB because the range partitioning should be parallelized diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java index 21d60923f..4df9c38c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java @@ -50,16 +50,6 @@ public class HashJoinPOP extends AbstractJoinPop { @JsonProperty("subScanForRowKeyJoin") private SubScan subScanForRowKeyJoin; - /* - public HashJoinPOP( - @JsonProperty("left") PhysicalOperator left, - @JsonProperty("right") PhysicalOperator right, - @JsonProperty("conditions") List<JoinCondition> conditions, - @JsonProperty("joinType") JoinRelType joinType - ) { - this(left, right, conditions, joinType, false, JoinControl.DEFAULT); - } -*/ @JsonCreator public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right, @JsonProperty("conditions") List<JoinCondition> conditions, @@ -80,8 +70,6 @@ public class HashJoinPOP extends AbstractJoinPop { List<JoinCondition> conditions, JoinRelType joinType) { this(left, right, conditions, joinType, null, false, JoinControl.DEFAULT); - // super(left, right, joinType, null, conditions); - // Preconditions.checkArgument(joinType != null, "Join type is missing for HashJoin Pop"); } @VisibleForTesting diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java index f908ead4c..dd042da34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java @@ -71,4 +71,10 @@ public abstract class AbstractIndexDescriptor extends DrillIndexDefinition imple int numProjectedFields, GroupScan primaryGroupScan) { throw new UnsupportedOperationException("getCost() not supported for this index."); } + + @Override + public boolean isAsyncIndex() { + return true; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java index d43ba8121..3b63230f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java @@ -72,12 +72,19 @@ public interface IndexDescriptor extends IndexDefinition { * @param primaryGroupScan Primary table's GroupScan instance * @return a RelOptCost instance representing the total cost */ - public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner, + RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner, int numProjectedFields, GroupScan primaryGroupScan); /** * Get the costing factors associated with the storage/format plugin */ - public PluginCost getPluginCostModel(); + PluginCost getPluginCostModel(); + + /** + * Whether this index is maintained synchronously (i.e primary table updates are propagated to the index + * synchronously) or asynchronously with some delay. The latter is more common for distributed NoSQL databases. + * @return True if the index is maintained asynchronously, False otherwise + */ + boolean isAsyncIndex(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java index ea34ea585..3d3aeeb17 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java @@ -33,11 +33,7 @@ public class IndexGroup { } public boolean isIntersectIndex() { - if (indexProps.size() > 1) { - return true; - } else { - return false; - } + return indexProps.size() > 1; } public int numIndexes() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java index f2091f6a0..b380c2897 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java @@ -237,7 +237,8 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { return finalRel; } - private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant) { + private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant, + boolean isAnyIndexAsync) { DbGroupScan origDbGroupScan = (DbGroupScan)IndexPlanUtils.getGroupScan(origScan); List<SchemaPath> cols = new ArrayList<SchemaPath>(origDbGroupScan.getColumns()); @@ -266,9 +267,16 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder = dbScan.getCluster().getTypeFactory().builder(); - FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(), - dbScan, indexContext.getOrigCondition()); - lastRelNode = leftIndexFilterPrel; + FilterPrel leftIndexFilterPrel = null; + + // See NonCoveringIndexPlanGenerator for why we are re-applying index filter condition in case of async indexes. + // For intersect planning, any one of the intersected indexes may be async but to keep it simple we re-apply the + // full original condition. + if (isAnyIndexAsync) { + new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(), + dbScan, indexContext.getOrigCondition()); + lastRelNode = leftIndexFilterPrel; + } // new Project's rowtype is original Project's rowtype [plus rowkey if rowkey is not in original rowtype] ProjectPrel leftIndexProjectPrel = null; @@ -301,8 +309,12 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { @Override public RelNode convertChild(final RelNode filter, final RelNode input) throws InvalidRelException { Map<IndexDescriptor, RexNode> idxConditionMap = Maps.newLinkedHashMap(); + boolean isAnyIndexAsync = false; for(IndexDescriptor idx : indexInfoMap.keySet()) { idxConditionMap.put(idx, indexInfoMap.get(idx).indexCondition); + if (!isAnyIndexAsync && idx.isAsyncIndex()) { + isAnyIndexAsync = true; + } } RelNode indexPlan = null; @@ -323,7 +335,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { //now with index plan constructed, build plan of left(probe) side to use restricted db scan - Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant); + Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant, isAnyIndexAsync); RelNode finalRel = buildRowKeyJoin(leftRelAndScan.left, rangeDistRight, true, JoinControl.DEFAULT); if ( upperProject != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java index e1337bc7e..c1bcf6861 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java @@ -205,16 +205,17 @@ public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator { final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder = dbScan.getCluster().getTypeFactory().builder(); - //we are applying the same index condition to primary table's restricted scan, the reason - // for this is, the scans on index table and primary table are not a transaction, meaning that _after_ index scan, - // primary table might already have data get updated, thus some rows picked by index were modified and no more satisfy the - // index condition. By applying the same index condition again here, we will avoid the possibility to have some - //not-wanted records get into downstream operators in such scenarios. - //the remainder condition will be applied on top of RowKeyJoin. - FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(), - dbScan, indexContext.getOrigCondition()); - - lastLeft = leftIndexFilterPrel; + // We are applying the same index condition to primary table's restricted scan. The reason is, the index may be an async + // index .. i.e it is not synchronously updated along with the primary table update as part of a single transaction, so it + // is possible that after or during index scan, the primary table rows may have been updated and no longer satisfy the index + // condition. By re-applying the index condition here, we will ensure non-qualifying records are filtered out. + // The remainder condition will be applied on top of RowKeyJoin. + FilterPrel leftIndexFilterPrel = null; + if (indexDesc.isAsyncIndex()) { + leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(), + dbScan, indexContext.getOrigCondition()); + lastLeft = leftIndexFilterPrel; + } RelDataType origRowType = origProject == null ? origScan.getRowType() : origProject.getRowType(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java index 0626483fe..6d52184dc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java @@ -86,7 +86,13 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai return new HashToRandomExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel, toDist.getFields()); case RANGE_DISTRIBUTED: - // return new OrderedPartitionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel); + // NOTE: earlier, for Range Distribution we were creating an OrderedPartitionExchange; however that operator is not actually + // used in any of the query plans because Drill's Sort does not do range based sorting (it does a HashToRandomExchange followed + // by a Sort). Here, we are generating a RangePartitionExchange instead of OrderedPartitionExchange. The run-time implementation + // of RPE is a much simpler operator..it just does 'bucketing' based on ranges. Also, it allows a parameter to specify the + // partitioning function whereas the OPE does a much more complex inferencing to determine which partition goes where. In future, + // if we do want to leverage OPE then we could create a new type of distribution trait or make the DistributionType a + // class instead of a simple enum and then we can distinguish whether an OPE or RPE is needed. return new RangePartitionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel, toDist.getFields(), toDist.getPartitionFunction()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java index 616eb5642..7e8f77e57 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java @@ -82,8 +82,11 @@ public class RowKeyJoinPrel extends JoinPrel implements Prel { } double rowCount = mq.getRowCount(this.getRight()); DrillCostFactory costFactory = (DrillCostFactory) planner.getCostFactory(); - return costFactory.makeCost(rowCount, 0, 0, 0, - 0 /* mem cost is 0 because this operator does not make any extra copy of either the left or right batches */); + + // RowKeyJoin operator by itself incurs negligible CPU and I/O cost since it is not doing a real join. + // The actual cost is attributed to the skip-scan (random I/O). The RK join will hold 1 batch in memory but + // it is not making any extra copy of either the left or right batches, so the memory cost is 0 + return costFactory.makeCost(rowCount, 0, 0, 0, 0); } @Override |