diff options
author | Aman Sinha <asinha@maprtech.com> | 2018-10-01 12:06:39 -0700 |
---|---|---|
committer | Aman Sinha <asinha@maprtech.com> | 2018-10-25 16:08:51 -0700 |
commit | 5fa9c808daef8ea70a39ac2248daa99055955b72 (patch) | |
tree | 583df7c47d55bc9ecc30d1351f0aa8ccccf162ee /exec | |
parent | 3c2f9ab6c0ed290bed3bc440d72a670bbe512b5a (diff) |
DRILL-6381: Address code review comments.
Diffstat (limited to 'exec')
8 files changed, 56 insertions, 37 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java index 21d60923f..4df9c38c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java @@ -50,16 +50,6 @@ public class HashJoinPOP extends AbstractJoinPop { @JsonProperty("subScanForRowKeyJoin") private SubScan subScanForRowKeyJoin; - /* - public HashJoinPOP( - @JsonProperty("left") PhysicalOperator left, - @JsonProperty("right") PhysicalOperator right, - @JsonProperty("conditions") List<JoinCondition> conditions, - @JsonProperty("joinType") JoinRelType joinType - ) { - this(left, right, conditions, joinType, false, JoinControl.DEFAULT); - } -*/ @JsonCreator public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right, @JsonProperty("conditions") List<JoinCondition> conditions, @@ -80,8 +70,6 @@ public class HashJoinPOP extends AbstractJoinPop { List<JoinCondition> conditions, JoinRelType joinType) { this(left, right, conditions, joinType, null, false, JoinControl.DEFAULT); - // super(left, right, joinType, null, conditions); - // Preconditions.checkArgument(joinType != null, "Join type is missing for HashJoin Pop"); } @VisibleForTesting diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java index f908ead4c..dd042da34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java @@ -71,4 +71,10 @@ public abstract class AbstractIndexDescriptor extends DrillIndexDefinition imple int numProjectedFields, GroupScan 
primaryGroupScan) { throw new UnsupportedOperationException("getCost() not supported for this index."); } + + @Override + public boolean isAsyncIndex() { + return true; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java index d43ba8121..3b63230f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java @@ -72,12 +72,19 @@ public interface IndexDescriptor extends IndexDefinition { * @param primaryGroupScan Primary table's GroupScan instance * @return a RelOptCost instance representing the total cost */ - public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner, + RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner, int numProjectedFields, GroupScan primaryGroupScan); /** * Get the costing factors associated with the storage/format plugin */ - public PluginCost getPluginCostModel(); + PluginCost getPluginCostModel(); + + /** + * Whether this index is maintained synchronously (i.e primary table updates are propagated to the index + * synchronously) or asynchronously with some delay. The latter is more common for distributed NoSQL databases. 
+ * @return True if the index is maintained asynchronously, False otherwise + */ + boolean isAsyncIndex(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java index ea34ea585..3d3aeeb17 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java @@ -33,11 +33,7 @@ public class IndexGroup { } public boolean isIntersectIndex() { - if (indexProps.size() > 1) { - return true; - } else { - return false; - } + return indexProps.size() > 1; } public int numIndexes() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java index f2091f6a0..b380c2897 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java @@ -237,7 +237,8 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { return finalRel; } - private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant) { + private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant, + boolean isAnyIndexAsync) { DbGroupScan origDbGroupScan = (DbGroupScan)IndexPlanUtils.getGroupScan(origScan); List<SchemaPath> cols = new ArrayList<SchemaPath>(origDbGroupScan.getColumns()); @@ -266,9 +267,16 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder = dbScan.getCluster().getTypeFactory().builder(); - FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(), - dbScan, 
indexContext.getOrigCondition()); - lastRelNode = leftIndexFilterPrel; + FilterPrel leftIndexFilterPrel = null; + + // See NonCoveringIndexPlanGenerator for why we are re-applying index filter condition in case of async indexes. + // For intersect planning, any one of the intersected indexes may be async but to keep it simple we re-apply the + // full original condition. + if (isAnyIndexAsync) { + leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(), + dbScan, indexContext.getOrigCondition()); + lastRelNode = leftIndexFilterPrel; + } // new Project's rowtype is original Project's rowtype [plus rowkey if rowkey is not in original rowtype] ProjectPrel leftIndexProjectPrel = null; @@ -301,8 +309,12 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { @Override public RelNode convertChild(final RelNode filter, final RelNode input) throws InvalidRelException { Map<IndexDescriptor, RexNode> idxConditionMap = Maps.newLinkedHashMap(); + boolean isAnyIndexAsync = false; for(IndexDescriptor idx : indexInfoMap.keySet()) { idxConditionMap.put(idx, indexInfoMap.get(idx).indexCondition); + if (!isAnyIndexAsync && idx.isAsyncIndex()) { + isAnyIndexAsync = true; + } } RelNode indexPlan = null; @@ -323,7 +335,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { //now with index plan constructed, build plan of left(probe) side to use restricted db scan - Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant); + Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant, isAnyIndexAsync); RelNode finalRel = buildRowKeyJoin(leftRelAndScan.left, rangeDistRight, true, JoinControl.DEFAULT); if ( upperProject != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java index e1337bc7e..c1bcf6861 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java @@ -205,16 +205,17 @@ public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator { final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder = dbScan.getCluster().getTypeFactory().builder(); - //we are applying the same index condition to primary table's restricted scan, the reason - // for this is, the scans on index table and primary table are not a transaction, meaning that _after_ index scan, - // primary table might already have data get updated, thus some rows picked by index were modified and no more satisfy the - // index condition. By applying the same index condition again here, we will avoid the possibility to have some - //not-wanted records get into downstream operators in such scenarios. - //the remainder condition will be applied on top of RowKeyJoin. - FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(), - dbScan, indexContext.getOrigCondition()); - - lastLeft = leftIndexFilterPrel; + // We are applying the same index condition to primary table's restricted scan. The reason is, the index may be an async + // index .. i.e it is not synchronously updated along with the primary table update as part of a single transaction, so it + // is possible that after or during index scan, the primary table rows may have been updated and no longer satisfy the index + // condition. By re-applying the index condition here, we will ensure non-qualifying records are filtered out. + // The remainder condition will be applied on top of RowKeyJoin. 
+ FilterPrel leftIndexFilterPrel = null; + if (indexDesc.isAsyncIndex()) { + leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(), + dbScan, indexContext.getOrigCondition()); + lastLeft = leftIndexFilterPrel; + } RelDataType origRowType = origProject == null ? origScan.getRowType() : origProject.getRowType(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java index 0626483fe..6d52184dc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java @@ -86,7 +86,13 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai return new HashToRandomExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel, toDist.getFields()); case RANGE_DISTRIBUTED: - // return new OrderedPartitionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel); + // NOTE: earlier, for Range Distribution we were creating an OrderedPartitionExchange; however that operator is not actually + // used in any of the query plans because Drill's Sort does not do range based sorting (it does a HashToRandomExchange followed + // by a Sort). Here, we are generating a RangePartitionExchange instead of OrderedPartitionExchange. The run-time implementation + // of RPE is a much simpler operator..it just does 'bucketing' based on ranges. Also, it allows a parameter to specify the + // partitioning function whereas the OPE does a much more complex inferencing to determine which partition goes where. 
In future, + // if we do want to leverage OPE then we could create a new type of distribution trait or make the DistributionType a + // class instead of a simple enum and then we can distinguish whether an OPE or RPE is needed. return new RangePartitionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel, toDist.getFields(), toDist.getPartitionFunction()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java index 616eb5642..7e8f77e57 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java @@ -82,8 +82,11 @@ public class RowKeyJoinPrel extends JoinPrel implements Prel { } double rowCount = mq.getRowCount(this.getRight()); DrillCostFactory costFactory = (DrillCostFactory) planner.getCostFactory(); - return costFactory.makeCost(rowCount, 0, 0, 0, - 0 /* mem cost is 0 because this operator does not make any extra copy of either the left or right batches */); + + // RowKeyJoin operator by itself incurs negligible CPU and I/O cost since it is not doing a real join. + // The actual cost is attributed to the skip-scan (random I/O). The RK join will hold 1 batch in memory but + // it is not making any extra copy of either the left or right batches, so the memory cost is 0 + return costFactory.makeCost(rowCount, 0, 0, 0, 0); } @Override |