aboutsummaryrefslogtreecommitdiff
path: root/exec
diff options
context:
space:
mode:
authorAman Sinha <asinha@maprtech.com>2018-10-01 12:06:39 -0700
committerAman Sinha <asinha@maprtech.com>2018-10-25 16:08:51 -0700
commit5fa9c808daef8ea70a39ac2248daa99055955b72 (patch)
tree583df7c47d55bc9ecc30d1351f0aa8ccccf162ee /exec
parent3c2f9ab6c0ed290bed3bc440d72a670bbe512b5a (diff)
DRILL-6381: Address code review comments.
Diffstat (limited to 'exec')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java22
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java7
8 files changed, 56 insertions, 37 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
index 21d60923f..4df9c38c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -50,16 +50,6 @@ public class HashJoinPOP extends AbstractJoinPop {
@JsonProperty("subScanForRowKeyJoin")
private SubScan subScanForRowKeyJoin;
- /*
- public HashJoinPOP(
- @JsonProperty("left") PhysicalOperator left,
- @JsonProperty("right") PhysicalOperator right,
- @JsonProperty("conditions") List<JoinCondition> conditions,
- @JsonProperty("joinType") JoinRelType joinType
- ) {
- this(left, right, conditions, joinType, false, JoinControl.DEFAULT);
- }
-*/
@JsonCreator
public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right,
@JsonProperty("conditions") List<JoinCondition> conditions,
@@ -80,8 +70,6 @@ public class HashJoinPOP extends AbstractJoinPop {
List<JoinCondition> conditions,
JoinRelType joinType) {
this(left, right, conditions, joinType, null, false, JoinControl.DEFAULT);
- // super(left, right, joinType, null, conditions);
- // Preconditions.checkArgument(joinType != null, "Join type is missing for HashJoin Pop");
}
@VisibleForTesting
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
index f908ead4c..dd042da34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
@@ -71,4 +71,10 @@ public abstract class AbstractIndexDescriptor extends DrillIndexDefinition imple
int numProjectedFields, GroupScan primaryGroupScan) {
throw new UnsupportedOperationException("getCost() not supported for this index.");
}
+
+ @Override
+ public boolean isAsyncIndex() {
+ return true;
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
index d43ba8121..3b63230f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
@@ -72,12 +72,19 @@ public interface IndexDescriptor extends IndexDefinition {
* @param primaryGroupScan Primary table's GroupScan instance
* @return a RelOptCost instance representing the total cost
*/
- public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
+ RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
int numProjectedFields, GroupScan primaryGroupScan);
/**
* Get the costing factors associated with the storage/format plugin
*/
- public PluginCost getPluginCostModel();
+ PluginCost getPluginCostModel();
+
+ /**
+ * Whether this index is maintained synchronously (i.e primary table updates are propagated to the index
+ * synchronously) or asynchronously with some delay. The latter is more common for distributed NoSQL databases.
+ * @return True if the index is maintained asynchronously, False otherwise
+ */
+ boolean isAsyncIndex();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
index ea34ea585..3d3aeeb17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
@@ -33,11 +33,7 @@ public class IndexGroup {
}
public boolean isIntersectIndex() {
- if (indexProps.size() > 1) {
- return true;
- } else {
- return false;
- }
+ return indexProps.size() > 1;
}
public int numIndexes() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
index f2091f6a0..b380c2897 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
@@ -237,7 +237,8 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
return finalRel;
}
- private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant) {
+ private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant,
+ boolean isAnyIndexAsync) {
DbGroupScan origDbGroupScan = (DbGroupScan)IndexPlanUtils.getGroupScan(origScan);
List<SchemaPath> cols = new ArrayList<SchemaPath>(origDbGroupScan.getColumns());
@@ -266,9 +267,16 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder =
dbScan.getCluster().getTypeFactory().builder();
- FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
- dbScan, indexContext.getOrigCondition());
- lastRelNode = leftIndexFilterPrel;
+ FilterPrel leftIndexFilterPrel = null;
+
+ // See NonCoveringIndexPlanGenerator for why we are re-applying index filter condition in case of async indexes.
+ // For intersect planning, any one of the intersected indexes may be async but to keep it simple we re-apply the
+ // full original condition.
+ if (isAnyIndexAsync) {
+ new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
+ dbScan, indexContext.getOrigCondition());
+ lastRelNode = leftIndexFilterPrel;
+ }
// new Project's rowtype is original Project's rowtype [plus rowkey if rowkey is not in original rowtype]
ProjectPrel leftIndexProjectPrel = null;
@@ -301,8 +309,12 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
@Override
public RelNode convertChild(final RelNode filter, final RelNode input) throws InvalidRelException {
Map<IndexDescriptor, RexNode> idxConditionMap = Maps.newLinkedHashMap();
+ boolean isAnyIndexAsync = false;
for(IndexDescriptor idx : indexInfoMap.keySet()) {
idxConditionMap.put(idx, indexInfoMap.get(idx).indexCondition);
+ if (!isAnyIndexAsync && idx.isAsyncIndex()) {
+ isAnyIndexAsync = true;
+ }
}
RelNode indexPlan = null;
@@ -323,7 +335,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
//now with index plan constructed, build plan of left(probe) side to use restricted db scan
- Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant);
+ Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant, isAnyIndexAsync);
RelNode finalRel = buildRowKeyJoin(leftRelAndScan.left, rangeDistRight, true, JoinControl.DEFAULT);
if ( upperProject != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
index e1337bc7e..c1bcf6861 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
@@ -205,16 +205,17 @@ public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator {
final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder =
dbScan.getCluster().getTypeFactory().builder();
- //we are applying the same index condition to primary table's restricted scan, the reason
- // for this is, the scans on index table and primary table are not a transaction, meaning that _after_ index scan,
- // primary table might already have data get updated, thus some rows picked by index were modified and no more satisfy the
- // index condition. By applying the same index condition again here, we will avoid the possibility to have some
- //not-wanted records get into downstream operators in such scenarios.
- //the remainder condition will be applied on top of RowKeyJoin.
- FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
- dbScan, indexContext.getOrigCondition());
-
- lastLeft = leftIndexFilterPrel;
+ // We are applying the same index condition to primary table's restricted scan. The reason is, the index may be an async
+ // index .. i.e it is not synchronously updated along with the primary table update as part of a single transaction, so it
+ // is possible that after or during index scan, the primary table rows may have been updated and no longer satisfy the index
+ // condition. By re-applying the index condition here, we will ensure non-qualifying records are filtered out.
+ // The remainder condition will be applied on top of RowKeyJoin.
+ FilterPrel leftIndexFilterPrel = null;
+ if (indexDesc.isAsyncIndex()) {
+ leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
+ dbScan, indexContext.getOrigCondition());
+ lastLeft = leftIndexFilterPrel;
+ }
RelDataType origRowType = origProject == null ? origScan.getRowType() : origProject.getRowType();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
index 0626483fe..6d52184dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
@@ -86,7 +86,13 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai
return new HashToRandomExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel,
toDist.getFields());
case RANGE_DISTRIBUTED:
- // return new OrderedPartitionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel);
+ // NOTE: earlier, for Range Distribution we were creating an OrderedPartitionExchange; however that operator is not actually
+ // used in any of the query plans because Drill's Sort does not do range based sorting (it does a HashToRandomExchange followed
+ // by a Sort). Here, we are generating a RangePartitionExchange instead of OrderedPartitionExchange. The run-time implementation
+ // of RPE is a much simpler operator..it just does 'bucketing' based on ranges. Also, it allows a parameter to specify the
+ // partitioning function whereas the OPE does a much more complex inferencing to determine which partition goes where. In future,
+ // if we do want to leverage OPE then we could create a new type of distribution trait or make the DistributionType a
+ // class instead of a simple enum and then we can distinguish whether an OPE or RPE is needed.
return new RangePartitionExchangePrel(rel.getCluster(),
planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel,
toDist.getFields(), toDist.getPartitionFunction());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
index 616eb5642..7e8f77e57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
@@ -82,8 +82,11 @@ public class RowKeyJoinPrel extends JoinPrel implements Prel {
}
double rowCount = mq.getRowCount(this.getRight());
DrillCostFactory costFactory = (DrillCostFactory) planner.getCostFactory();
- return costFactory.makeCost(rowCount, 0, 0, 0,
- 0 /* mem cost is 0 because this operator does not make any extra copy of either the left or right batches */);
+
+ // RowKeyJoin operator by itself incurs negligible CPU and I/O cost since it is not doing a real join.
+ // The actual cost is attributed to the skip-scan (random I/O). The RK join will hold 1 batch in memory but
+ // it is not making any extra copy of either the left or right batches, so the memory cost is 0
+ return costFactory.makeCost(rowCount, 0, 0, 0, 0);
}
@Override