aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java5
-rw-r--r--contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java14
-rw-r--r--contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java28
-rw-r--r--contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java22
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java7
12 files changed, 70 insertions, 72 deletions
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
index 01561a3ab..ec38636c5 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
@@ -138,6 +138,11 @@ public class MapRDBFunctionalIndexInfo implements FunctionalIndexInfo {
}
/**
+ * Suppose the index key has functions (rather than plain columns): CAST(a as int), CAST(b as varchar(10)),
+ * then we want to maintain a mapping of the logical expression of that function to the schema path of the
+ * base column involved in the function. In this example map has 2 entries:
+ * CAST(a as int) --> 'a'
+ * CAST(b as varchar(10)) --> 'b'
* @return the map of indexed expression --> the involved schema paths in a indexed expression
*/
public Map<LogicalExpression, Set<SchemaPath>> getPathsInFunctionExpr() {
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
index e1b8a61fc..c231e1181 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
@@ -37,6 +37,7 @@ import org.apache.drill.common.expression.parser.ExprLexer;
import org.apache.drill.common.expression.parser.ExprParser;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
+import org.apache.drill.common.util.DrillFileUtils;
import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
@@ -67,6 +68,7 @@ import java.util.Set;
public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDiscover {
static final String DEFAULT_STRING_CAST_LEN_STR = "256";
+ static final String FIELD_DELIMITER = ":";
public MapRDBIndexDiscover(GroupScan inScan, DrillScanRelBase scanRel) {
super((AbstractDbGroupScan) inScan, scanRel);
@@ -78,14 +80,14 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
@Override
public IndexCollection getTableIndex(String tableName) {
- //return getTableIndexFromCommandLine(tableName);
return getTableIndexFromMFS(tableName);
}
/**
- *
+ * For a given table name get the list of indexes defined on the table according to the visibility of
+ * the indexes based on permissions.
* @param tableName
- * @return
+ * @return an IndexCollection representing the list of indexes for that table
*/
private IndexCollection getTableIndexFromMFS(String tableName) {
try {
@@ -120,8 +122,8 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
FileSelection deriveFSSelection(DrillFileSystem fs, IndexDescriptor idxDesc) throws IOException {
String tableName = idxDesc.getTableName();
- String[] tablePath = tableName.split("/");
- String tableParent = tableName.substring(0, tableName.lastIndexOf("/"));
+ String[] tablePath = tableName.split(DrillFileUtils.SEPARATOR);
+ String tableParent = tableName.substring(0, tableName.lastIndexOf(DrillFileUtils.SEPARATOR));
return FileSelection.create(fs, tableParent, tablePath[tablePath.length - 1], false);
}
@@ -318,7 +320,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
private DrillIndexDescriptor buildIndexDescriptor(String tableName, IndexDesc desc)
throws InvalidIndexDefinitionException {
if (desc.isExternal()) {
- //XX: not support external index
+ // External index is not currently supported
return null;
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java
index bd8a32aef..596699f42 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java
@@ -188,32 +188,4 @@ public class RestrictedMapRDBSubScanSpec extends MapRDBSubScanSpec {
currentIndex += numKeys;
}
- /**
- * Returns the next row key in the iteration.
- * @return the next row key in the iteration or null if no more row keys
- */
- @JsonIgnore
- public String nextRowKey() {
- if (hasRowKey()) {
- // get the entry at the current index within this batch
- Object o = rowKeyVector.getAccessor().getObject(currentIndex++);
- if (o == null) {
- throw new DrillRuntimeException("Encountered a null row key during restricted subscan !");
- }
-
- // this is specific to the way the hash join maintains its entries. once we have reached the max
- // occupied index within a batch, move to the next one and reset the current index to 0
- // TODO: we should try to abstract this out
- if (currentIndex > maxOccupiedIndex) {
- Pair<ValueVector, Integer> currentBatch = rjbatch.nextRowKeyBatch();
- if (currentBatch != null) {
- init(currentBatch);
- }
- }
-
- return o.toString();
- }
- return null;
- }
-
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
index acaa6cacb..436347fcc 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
@@ -186,7 +186,7 @@ public class JsonTableRangePartitionFunction extends AbstractRangePartitionFunct
// get the table handle from the table cache
Table table = plugin.getJsonTableCache().getTable(tableName, userName);
- // Set the condition to null such that all scan ranges are retrieved for the primary table.
+ // Get all scan ranges for the primary table.
// The reason is the row keys could typically belong to any one of the tablets of the table, so
// there is no use trying to get only limited set of scan ranges.
// NOTE: here we use the restrictedScanRangeSizeMB because the range partitioning should be parallelized
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
index 21d60923f..4df9c38c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -50,16 +50,6 @@ public class HashJoinPOP extends AbstractJoinPop {
@JsonProperty("subScanForRowKeyJoin")
private SubScan subScanForRowKeyJoin;
- /*
- public HashJoinPOP(
- @JsonProperty("left") PhysicalOperator left,
- @JsonProperty("right") PhysicalOperator right,
- @JsonProperty("conditions") List<JoinCondition> conditions,
- @JsonProperty("joinType") JoinRelType joinType
- ) {
- this(left, right, conditions, joinType, false, JoinControl.DEFAULT);
- }
-*/
@JsonCreator
public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right,
@JsonProperty("conditions") List<JoinCondition> conditions,
@@ -80,8 +70,6 @@ public class HashJoinPOP extends AbstractJoinPop {
List<JoinCondition> conditions,
JoinRelType joinType) {
this(left, right, conditions, joinType, null, false, JoinControl.DEFAULT);
- // super(left, right, joinType, null, conditions);
- // Preconditions.checkArgument(joinType != null, "Join type is missing for HashJoin Pop");
}
@VisibleForTesting
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
index f908ead4c..dd042da34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
@@ -71,4 +71,10 @@ public abstract class AbstractIndexDescriptor extends DrillIndexDefinition imple
int numProjectedFields, GroupScan primaryGroupScan) {
throw new UnsupportedOperationException("getCost() not supported for this index.");
}
+
+ @Override
+ public boolean isAsyncIndex() {
+ return true;
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
index d43ba8121..3b63230f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
@@ -72,12 +72,19 @@ public interface IndexDescriptor extends IndexDefinition {
* @param primaryGroupScan Primary table's GroupScan instance
* @return a RelOptCost instance representing the total cost
*/
- public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
+ RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
int numProjectedFields, GroupScan primaryGroupScan);
/**
* Get the costing factors associated with the storage/format plugin
*/
- public PluginCost getPluginCostModel();
+ PluginCost getPluginCostModel();
+
+ /**
+ * Whether this index is maintained synchronously (i.e. primary table updates are propagated to the index
+ * synchronously) or asynchronously with some delay. The latter is more common for distributed NoSQL databases.
+ * @return True if the index is maintained asynchronously, False otherwise
+ */
+ boolean isAsyncIndex();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
index ea34ea585..3d3aeeb17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
@@ -33,11 +33,7 @@ public class IndexGroup {
}
public boolean isIntersectIndex() {
- if (indexProps.size() > 1) {
- return true;
- } else {
- return false;
- }
+ return indexProps.size() > 1;
}
public int numIndexes() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
index f2091f6a0..b380c2897 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
@@ -237,7 +237,8 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
return finalRel;
}
- private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant) {
+ private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant,
+ boolean isAnyIndexAsync) {
DbGroupScan origDbGroupScan = (DbGroupScan)IndexPlanUtils.getGroupScan(origScan);
List<SchemaPath> cols = new ArrayList<SchemaPath>(origDbGroupScan.getColumns());
@@ -266,9 +267,16 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder =
dbScan.getCluster().getTypeFactory().builder();
- FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
- dbScan, indexContext.getOrigCondition());
- lastRelNode = leftIndexFilterPrel;
+ FilterPrel leftIndexFilterPrel = null;
+
+ // See NonCoveringIndexPlanGenerator for why we are re-applying index filter condition in case of async indexes.
+ // For intersect planning, any one of the intersected indexes may be async but to keep it simple we re-apply the
+ // full original condition. The new filter must be assigned before it becomes the last rel node.
+ if (isAnyIndexAsync) {
+ leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
+ dbScan, indexContext.getOrigCondition());
+ lastRelNode = leftIndexFilterPrel;
+ }
// new Project's rowtype is original Project's rowtype [plus rowkey if rowkey is not in original rowtype]
ProjectPrel leftIndexProjectPrel = null;
@@ -301,8 +309,12 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
@Override
public RelNode convertChild(final RelNode filter, final RelNode input) throws InvalidRelException {
Map<IndexDescriptor, RexNode> idxConditionMap = Maps.newLinkedHashMap();
+ boolean isAnyIndexAsync = false;
for(IndexDescriptor idx : indexInfoMap.keySet()) {
idxConditionMap.put(idx, indexInfoMap.get(idx).indexCondition);
+ if (!isAnyIndexAsync && idx.isAsyncIndex()) {
+ isAnyIndexAsync = true;
+ }
}
RelNode indexPlan = null;
@@ -323,7 +335,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
//now with index plan constructed, build plan of left(probe) side to use restricted db scan
- Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant);
+ Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant, isAnyIndexAsync);
RelNode finalRel = buildRowKeyJoin(leftRelAndScan.left, rangeDistRight, true, JoinControl.DEFAULT);
if ( upperProject != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
index e1337bc7e..c1bcf6861 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
@@ -205,16 +205,17 @@ public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator {
final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder =
dbScan.getCluster().getTypeFactory().builder();
- //we are applying the same index condition to primary table's restricted scan, the reason
- // for this is, the scans on index table and primary table are not a transaction, meaning that _after_ index scan,
- // primary table might already have data get updated, thus some rows picked by index were modified and no more satisfy the
- // index condition. By applying the same index condition again here, we will avoid the possibility to have some
- //not-wanted records get into downstream operators in such scenarios.
- //the remainder condition will be applied on top of RowKeyJoin.
- FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
- dbScan, indexContext.getOrigCondition());
-
- lastLeft = leftIndexFilterPrel;
+ // We are applying the same index condition to primary table's restricted scan. The reason is, the index may be an async
+ // index, i.e. it is not synchronously updated along with the primary table update as part of a single transaction, so it
+ // is possible that after or during index scan, the primary table rows may have been updated and no longer satisfy the index
+ // condition. By re-applying the index condition here, we will ensure non-qualifying records are filtered out.
+ // The remainder condition will be applied on top of RowKeyJoin.
+ FilterPrel leftIndexFilterPrel = null;
+ if (indexDesc.isAsyncIndex()) {
+ leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
+ dbScan, indexContext.getOrigCondition());
+ lastLeft = leftIndexFilterPrel;
+ }
RelDataType origRowType = origProject == null ? origScan.getRowType() : origProject.getRowType();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
index 0626483fe..6d52184dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
@@ -86,7 +86,13 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai
return new HashToRandomExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel,
toDist.getFields());
case RANGE_DISTRIBUTED:
- // return new OrderedPartitionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel);
+ // NOTE: earlier, for Range Distribution we were creating an OrderedPartitionExchange; however that operator is not actually
+ // used in any of the query plans because Drill's Sort does not do range based sorting (it does a HashToRandomExchange followed
+ // by a Sort). Here, we are generating a RangePartitionExchange instead of OrderedPartitionExchange. The run-time implementation
+ // of RPE is a much simpler operator; it just does 'bucketing' based on ranges. Also, it allows a parameter to specify the
+ // partitioning function whereas the OPE does a much more complex inferencing to determine which partition goes where. In future,
+ // if we do want to leverage OPE then we could create a new type of distribution trait or make the DistributionType a
+ // class instead of a simple enum and then we can distinguish whether an OPE or RPE is needed.
return new RangePartitionExchangePrel(rel.getCluster(),
planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel,
toDist.getFields(), toDist.getPartitionFunction());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
index 616eb5642..7e8f77e57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
@@ -82,8 +82,11 @@ public class RowKeyJoinPrel extends JoinPrel implements Prel {
}
double rowCount = mq.getRowCount(this.getRight());
DrillCostFactory costFactory = (DrillCostFactory) planner.getCostFactory();
- return costFactory.makeCost(rowCount, 0, 0, 0,
- 0 /* mem cost is 0 because this operator does not make any extra copy of either the left or right batches */);
+
+ // RowKeyJoin operator by itself incurs negligible CPU and I/O cost since it is not doing a real join.
+ // The actual cost is attributed to the skip-scan (random I/O). The RK join will hold 1 batch in memory but
+ // it is not making any extra copy of either the left or right batches, so the memory cost is 0
+ return costFactory.makeCost(rowCount, 0, 0, 0, 0);
}
@Override