diff options
author | rebase <builds@mapr.com> | 2018-02-12 14:10:56 -0800 |
---|---|---|
committer | Aman Sinha <asinha@maprtech.com> | 2018-10-25 16:08:51 -0700 |
commit | 0abcbe3f36bf6c0a2b5fe07a778d201ead8dd2ce (patch) | |
tree | 45fe91b40a729ed49de8cdc53dc932a50d4633b0 /exec | |
parent | 61e8b464063299dc1f67445157a46c4939b0cace (diff) |
DRILL-6381: (Part 1) Secondary Index framework
1. Secondary Index planning interfaces and abstract classes like DbGroupScan, DbSubScan, IndexDescriptor, etc.
2. Statistics and Cost model interfaces/classes: PluginCost, Statistics, StatisticsPayload, AbstractIndexStatistics
3. ScanBatch and RecordReader to support repeatable scan
4. Secondary Index execution related interfaces: RangePartitionSender, RowKeyJoin, PartitionFunction
5. MD-3979: Query using cast index plan fails with NPE
Co-authored-by: Aman Sinha <asinha@maprtech.com>
Co-authored-by: chunhui-shi <cshi@maprtech.com>
Co-authored-by: Gautam Parai <gparai@maprtech.com>
Co-authored-by: Padma Penumarthy <ppenumar97@yahoo.com>
Co-authored-by: Hanumath Rao Maduri <hmaduri@maprtech.com>
Conflicts:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
protocol/src/main/protobuf/UserBitShared.proto
Diffstat (limited to 'exec')
41 files changed, 2844 insertions, 28 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 21e16eb5b..cb0fc5cf2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -807,6 +807,8 @@ public final class ExecConstants { */ public static final String ENABLE_ITERATOR_VALIDATION = "drill.exec.debug.validate_iterators"; + public static final String QUERY_ROWKEYJOIN_BATCHSIZE_KEY = "exec.query.rowkeyjoin_batchsize"; + public static final PositiveLongValidator QUERY_ROWKEYJOIN_BATCHSIZE = new PositiveLongValidator(QUERY_ROWKEYJOIN_BATCHSIZE_KEY, Long.MAX_VALUE, null); /** * When iterator validation is enabled, additionally validates the vectors in * each batch passed to each iterator. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbGroupScan.java new file mode 100644 index 000000000..42e4bb9ff --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbGroupScan.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.base; + +import java.util.List; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.calcite.rel.RelNode; +import org.apache.drill.exec.planner.index.IndexCollection; +import org.apache.drill.exec.planner.cost.PluginCost; +import org.apache.drill.exec.planner.physical.PartitionFunction; +import org.apache.drill.exec.store.AbstractStoragePlugin; + +public abstract class AbstractDbGroupScan extends AbstractGroupScan implements DbGroupScan { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractDbGroupScan.class); + + private static final String ROW_KEY = "_id"; + private static final SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY); + + public AbstractDbGroupScan(String userName) { + super(userName); + } + + public AbstractDbGroupScan(AbstractDbGroupScan that) { + super(that); + } + + public abstract AbstractStoragePlugin getStoragePlugin(); + + public abstract StoragePluginConfig getStorageConfig(); + + public abstract List<SchemaPath> getColumns(); + + @Override + public boolean supportsSecondaryIndex() { + return false; + } + + @Override + public IndexCollection getSecondaryIndexCollection(RelNode scanrel) { + return null; + } + + @Override + public boolean supportsRestrictedScan() { + return false; + } + + @Override + public boolean isRestrictedScan() { + return false; + } + + @Override + public DbGroupScan getRestrictedScan(List<SchemaPath> columns) { + return null; + } + + @Override + public String getRowKeyName() { + return ROW_KEY; + } + + @Override + public SchemaPath getRowKeyPath() { + return ROW_KEY_PATH; + } + + @Override + public PartitionFunction getRangePartitionFunction(List<FieldReference> refList) { + throw new 
UnsupportedOperationException(); + } + + @Override + public PluginCost getPluginCostModel() { + return null; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbSubScan.java new file mode 100644 index 000000000..caa583162 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbSubScan.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.base; + +import org.apache.drill.exec.physical.impl.join.RowKeyJoin; + +public abstract class AbstractDbSubScan extends AbstractSubScan implements DbSubScan { + + public AbstractDbSubScan(String userName) { + super(userName); + } + + public boolean isRestrictedSubScan() { + return false; + } + + @Override + public void addJoinForRestrictedSubScan(RowKeyJoin batch) { + throw new UnsupportedOperationException(); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 340c303f7..ca82ca621 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -22,6 +22,7 @@ import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.config.HashPartitionSender; +import org.apache.drill.exec.physical.config.HashToRandomExchange; import org.apache.drill.exec.physical.config.IteratorValidator; import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.physical.config.Limit; @@ -29,6 +30,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.config.Project; +import org.apache.drill.exec.physical.config.RangePartitionSender; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.physical.config.Sort; @@ -157,6 +159,16 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> 
impleme } @Override + public T visitHashPartitionSender(HashToRandomExchange op, X value) throws E { + return visitExchange(op, value); + } + + @Override + public T visitRangePartitionSender(RangePartitionSender op, X value) throws E { + return visitSender(op, value); + } + + @Override public T visitBroadcastSender(BroadcastSender op, X value) throws E { return visitSender(op, value); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbGroupScan.java new file mode 100644 index 000000000..e16fba1ff --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbGroupScan.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.base; + +import org.apache.calcite.rel.RelNode; +import org.apache.drill.common.expression.FieldReference; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.planner.index.IndexCollection; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.drill.exec.planner.cost.PluginCost; +import org.apache.drill.exec.planner.physical.PartitionFunction; +import org.apache.drill.exec.planner.index.Statistics; + +import java.util.List; + +/** + * A DbGroupScan operator represents the scan associated with a database. The underlying + * database may support secondary indexes, so there are interface methods for indexes. + */ +public interface DbGroupScan extends GroupScan { + + + @JsonIgnore + public boolean supportsSecondaryIndex(); + + /** + * Get the index collection associated with this table if any + */ + @JsonIgnore + public IndexCollection getSecondaryIndexCollection(RelNode scan); + + /** + * Set the artificial row count after applying the {@link RexNode} condition + * @param condition + * @param count + * @param capRowCount + */ + @JsonIgnore + public void setRowCount(RexNode condition, double count, double capRowCount); + + /** + * Get the row count after applying the {@link RexNode} condition + * @param condition, filter to apply + * @param scanRel, the current scan rel + * @return row count post filtering + */ + @JsonIgnore + public double getRowCount(RexNode condition, RelNode scanRel); + + /** + * Get the statistics for this {@link DbGroupScan} + * @return the {@link Statistics} for this Scan + */ + @JsonIgnore + public Statistics getStatistics(); + + public List<SchemaPath> getColumns(); + + public void setCostFactor(double sel); + + @JsonIgnore + boolean isIndexScan(); + + /** + * Whether this DbGroupScan supports creating a restricted (skip) scan + * @return true if restricted scan is supported, false otherwise + */ + @JsonIgnore 
+ boolean supportsRestrictedScan(); + + /** + * Whether this DbGroupScan is itself a restricted scan + * @return true if this DbGroupScan is itself a restricted scan, false otherwise + */ + @JsonIgnore + boolean isRestrictedScan(); + + /** + * If this DbGroupScan supports restricted scan, create a restricted scan from this DbGroupScan. + * @param columns + * @return a non-null DbGroupScan if restricted scan is supported, null otherwise + */ + @JsonIgnore + DbGroupScan getRestrictedScan(List<SchemaPath> columns); + + @JsonIgnore + String getRowKeyName(); + + @JsonIgnore + String getIndexHint(); + + @JsonIgnore + SchemaPath getRowKeyPath(); + + /** + * Get a partition function instance for range based partitioning + * @param refList a list of FieldReference exprs that are participating in the range partitioning + * @return instance of a partitioning function + */ + @JsonIgnore + PartitionFunction getRangePartitionFunction(List<FieldReference> refList); + + /** + * Get the format plugin cost model. The cost model will provide cost factors such as seq. scan cost, + * random scan cost, block size. + * @return a PluginCost cost model + */ + @JsonIgnore + PluginCost getPluginCostModel(); + + @JsonIgnore + boolean isFilterPushedDown(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbSubScan.java new file mode 100644 index 000000000..874468d4e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbSubScan.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.base; + +import org.apache.drill.exec.physical.impl.join.RowKeyJoin; + +import com.fasterxml.jackson.annotation.JsonIgnore; + + +public interface DbSubScan extends SubScan { + + /** + * Whether this subscan is a restricted (skip) subscan + * @return true if this subscan is a restricted subscan, false otherwise + */ + @JsonIgnore + boolean isRestrictedSubScan(); + + /** + * For a restricted sub-scan, this method allows associating a (hash)join instance. A subscan within a minor + * fragment must have a corresponding (hash)join batch instance from which it will retrieve its set of + * rowkeys to perform the restricted scan. + * @param batch + */ + @JsonIgnore + void addJoinForRestrictedSubScan(RowKeyJoin batch); + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/IndexGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/IndexGroupScan.java new file mode 100644 index 000000000..1047e829d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/IndexGroupScan.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.base; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.planner.index.Statistics; + + +import java.util.List; + +/** + * An IndexGroupScan operator represents the scan associated with an Index. + */ +public interface IndexGroupScan extends GroupScan { + + /** + * Get the column ordinal of the rowkey column from the output schema of the IndexGroupScan + * @return + */ + @JsonIgnore + public int getRowKeyOrdinal(); + + /** + * Set the artificial row count after applying the {@link RexNode} condition + * Mainly used for debugging + * @param condition + * @param count + * @param capRowCount + */ + @JsonIgnore + public void setRowCount(RexNode condition, double count, double capRowCount); + + /** + * Get the row count after applying the {@link RexNode} condition + * @param condition, filter to apply + * @return row count post filtering + */ + @JsonIgnore + public double getRowCount(RexNode condition, RelNode scanRel); + + /** + * Set the statistics for {@link IndexGroupScan} + * @param statistics + */ + @JsonIgnore + public void setStatistics(Statistics statistics); + + @JsonIgnore + public void setColumns(List<SchemaPath> columns); + + @JsonIgnore + public List<SchemaPath> getColumns(); + + @JsonIgnore + public void setParallelizationWidth(int width); + +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index f2e53eb03..1bb1545c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -22,6 +22,7 @@ import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.config.HashPartitionSender; +import org.apache.drill.exec.physical.config.HashToRandomExchange; import org.apache.drill.exec.physical.config.IteratorValidator; import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.physical.config.Limit; @@ -29,6 +30,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.config.Project; +import org.apache.drill.exec.physical.config.RangePartitionSender; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.physical.config.Sort; @@ -73,6 +75,8 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitOrderedPartitionSender(OrderedPartitionSender op, EXTRA value) throws EXCEP; public RETURN visitUnorderedReceiver(UnorderedReceiver op, EXTRA value) throws EXCEP; public RETURN visitMergingReceiver(MergingReceiverPOP op, EXTRA value) throws EXCEP; + public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) throws EXCEP; + public RETURN visitRangePartitionSender(RangePartitionSender op, EXTRA value) throws EXCEP; public RETURN 
visitBroadcastSender(BroadcastSender op, EXTRA value) throws EXCEP; public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP; public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangePartitionSender.java index 88c3be0e3..0c0852a07 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangePartitionSender.java @@ -17,57 +17,57 @@ */ package org.apache.drill.exec.physical.config; -import java.util.Collections; import java.util.List; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.base.AbstractSender; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.planner.physical.PartitionFunction; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -@JsonTypeName("range-sender") -public class RangeSender extends AbstractSender{ +@JsonTypeName("range-partition-sender") +public class RangePartitionSender extends AbstractSender{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RangeSender.class); + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RangePartitionSender.class); - List<EndpointPartition> partitions; + // The number of records in the outgoing batch. 
This is overriding the default value in Partitioner + public static final int RANGE_PARTITION_OUTGOING_BATCH_SIZE = (1 << 12) - 1; + + @JsonProperty("partitionFunction") + private PartitionFunction partitionFunction; @JsonCreator - public RangeSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("partitions") List<EndpointPartition> partitions) { - super(oppositeMajorFragmentId, child, Collections.<MinorFragmentEndpoint>emptyList()); - this.partitions = partitions; + public RangePartitionSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, + @JsonProperty("child") PhysicalOperator child, + @JsonProperty("destinations") List<MinorFragmentEndpoint> endpoints, + @JsonProperty("partitionFunction") PartitionFunction partitionFunction) { + super(oppositeMajorFragmentId, child, endpoints); + this.partitionFunction = partitionFunction; } @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new RangeSender(oppositeMajorFragmentId, child, partitions); + return new RangePartitionSender(oppositeMajorFragmentId, child, destinations, partitionFunction); } - public static class EndpointPartition{ - private final PartitionRange range; - private final DrillbitEndpoint endpoint; + @JsonProperty("partitionFunction") + public PartitionFunction getPartitionFunction() { + return partitionFunction; + } - @JsonCreator - public EndpointPartition(@JsonProperty("range") PartitionRange range, @JsonProperty("endpoint") DrillbitEndpoint endpoint) { - super(); - this.range = range; - this.endpoint = endpoint; - } - public PartitionRange getRange() { - return range; - } - public DrillbitEndpoint getEndpoint() { - return endpoint; - } + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitRangePartitionSender(this, value); } @Override public int getOperatorType() 
{ - return CoreOperatorType.RANGE_SENDER_VALUE; + return CoreOperatorType.RANGE_PARTITION_SENDER_VALUE; } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index dc8dd0fd8..5ccf1c093 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -83,10 +83,14 @@ public class ScanBatch implements CloseableRecordBatch { private final List<Map<String, String>> implicitColumnList; private String currentReaderClassName; private final RecordBatchStatsContext batchStatsContext; + // Represents last outcome of next(). If an Exception is thrown // during the method's execution a value IterOutcome.STOP will be assigned. private IterOutcome lastOutcome; + private List<RecordReader> readerList = null; // needed for repeatable scanners + private boolean isRepeatableScan = false; // needed for repeatable scanners + /** * * @param context @@ -137,6 +141,15 @@ public class ScanBatch implements CloseableRecordBatch { readers, Collections.<Map<String, String>> emptyList()); } + public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, + List<RecordReader> readerList, boolean isRepeatableScan) + throws ExecutionSetupException { + this(context, context.newOperatorContext(subScanConfig), + readerList, Collections.<Map<String, String>> emptyList()); + this.readerList = readerList; + this.isRepeatableScan = isRepeatableScan; + } + @Override public FragmentContext getContext() { return context; @@ -255,7 +268,7 @@ public class ScanBatch implements CloseableRecordBatch { return false; } currentReader = readers.next(); - if (readers.hasNext()) { + if (!isRepeatableScan && readers.hasNext()) { readers.remove(); } implicitValues = implicitColumns.hasNext() ? 
implicitColumns.next() : null; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoin.java new file mode 100644 index 000000000..7b4dfcaaa --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoin.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.drill.exec.record.AbstractRecordBatch.BatchState; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Interface for a row key join + */ +public interface RowKeyJoin { + + /** + * Enum for RowKeyJoin internal state. + * Possible states are {INITIAL, PROCESSING, DONE} + * + * Initially RowKeyJoin will be at INITIAL state. Then the state will be transitioned + * by the RestrictedJsonRecordReader to PROCESSING as soon as it processes the rows + * related to RowKeys. Then RowKeyJoin algorithm sets to INITIAL state when leftStream has no data. 
+ * Basically RowKeyJoin calls leftStream multiple times depending upon the rightStream, hence + * this transition from PROCESSING to INITIAL. If there is no data from rightStream or OutOfMemory + * condition then the state is transitioned to DONE. + */ + public enum RowKeyJoinState { + INITIAL, PROCESSING, DONE; + } + + /** + * Is the next batch of row keys ready to be returned + * @return True if ready, false if not + */ + public boolean hasRowKeyBatch(); + + /** + * Get the next batch of row keys + * @return a Pair whose left element is the ValueVector containing the row keys, right + * element is the number of row keys in this batch + */ + public Pair<ValueVector, Integer> nextRowKeyBatch(); + + + /** + * Get the current BatchState (this is useful when performing row key join) + */ + public BatchState getBatchState(); + + /** + * Set the BatchState (this is useful when performing row key join) + * @param newState + */ + public void setBatchState(BatchState newState); + + /** + * Set the RowKeyJoinState (this is useful for maintaining state for row key join algorithm) + * @param newState + */ + public void setRowKeyJoinState(RowKeyJoinState newState); + + /** + * Get the current RowKeyJoinState. 
+ */ + public RowKeyJoinState getRowKeyJoinState(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java index 96c311216..b39328ea0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java @@ -396,4 +396,21 @@ public abstract class DrillRelOptUtil { } } } + + public static boolean isProjectFlatten(RelNode project) { + + assert project instanceof Project : "Rel is NOT an instance of project!"; + + for (RexNode rex : project.getChildExps()) { + RexNode newExpr = rex; + if (rex instanceof RexCall) { + RexCall function = (RexCall) rex; + String functionName = function.getOperator().getName(); + if (functionName.equalsIgnoreCase("flatten") ) { + return true; + } + } + } + return false; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PluginCost.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PluginCost.java new file mode 100644 index 000000000..d765162fb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PluginCost.java @@ -0,0 +1,79 @@ +package org.apache.drill.exec.planner.cost; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import org.apache.drill.exec.physical.base.GroupScan; + +/** + * PluginCost describes the cost factors to be used when costing for the specific storage/format plugin + */ +public interface PluginCost { + org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PluginCost.class); + + /** + * An interface to check if a parameter provided by user is valid or not. + * @param <T> Type of the parameter. + */ + interface CheckValid<T> { + boolean isValid(T paramValue); + } + + /** + * Class which checks whether the provided parameter value is greater than + * or equals to a minimum limit. 
+ */ + class greaterThanEquals implements CheckValid<Integer> { + private final Integer atleastEqualsTo; + public greaterThanEquals(Integer atleast) { + atleastEqualsTo = atleast; + } + + @Override + public boolean isValid(Integer paramValue) { + if (paramValue >= atleastEqualsTo && + paramValue <= Integer.MAX_VALUE) { + return true; + } else { + logger.warn("Setting default value as the supplied parameter value is less than {}", paramValue); + return false; + } + } + } + + /** + * @return the average column size in bytes + */ + int getAverageColumnSize(GroupScan scan); + + /** + * @return the block size in bytes + */ + int getBlockSize(GroupScan scan); + + /** + * @return the sequential block read cost + */ + int getSequentialBlockReadCost(GroupScan scan); + + /** + * @return the random block read cost + */ + int getRandomBlockReadCost(GroupScan scan); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexCollection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexCollection.java new file mode 100644 index 000000000..9894b3263 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexCollection.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.index; + +import java.util.Iterator; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; + +import org.apache.calcite.rex.RexNode; +import org.apache.drill.common.expression.SchemaPath; + +/** + * Abstract base class for Index collection (collection of Index descriptors) + * + */ +public abstract class AbstractIndexCollection implements IndexCollection, Iterable<IndexDescriptor> { + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractIndexCollection.class); + /** + * A set of indexes for a particular table + */ + @JsonProperty + protected List<IndexDescriptor> indexes; + + public AbstractIndexCollection() { + indexes = Lists.newArrayList(); + } + + @Override + public boolean addIndex(IndexDescriptor index) { + return indexes.add(index); + } + + @Override + public boolean removeIndex(IndexDescriptor index) { + return indexes.remove(index); + } + + @Override + public void clearAll() { + indexes.clear(); + } + + @Override + public boolean supportsIndexSelection() { + return false; + } + + @Override + public double getRows(RexNode indexCondition) { + throw new UnsupportedOperationException("getRows() not supported for this index collection."); + } + + @Override + public boolean supportsRowCountStats() { + return false; + } + + @Override + public boolean supportsFullTextSearch() { + return false; + } + + @Override + public boolean isColumnIndexed(SchemaPath path) { + for (IndexDescriptor index : indexes) { + if (index.getIndexColumnOrdinal(path) >= 0) { + return true; + } + } + return false; + } + + @Override + public Iterator<IndexDescriptor> iterator() { + return indexes.iterator(); + } + +} diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java new file mode 100644 index 000000000..f908ead4c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java @@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.planner.index;

import java.util.List;

import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.IndexGroupScan;

/**
 * Skeleton {@link IndexDescriptor}: carries the index definition inherited from
 * {@link DrillIndexDefinition} and answers every optional operation with either
 * {@code false} or an {@link UnsupportedOperationException}. Concrete descriptors
 * override what they support.
 */
public abstract class AbstractIndexDescriptor extends DrillIndexDefinition implements IndexDescriptor {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractIndexDescriptor.class);

  /**
   * Passes all arguments, unchanged and in the same order, to the
   * {@link DrillIndexDefinition} constructor.
   */
  public AbstractIndexDescriptor(List<LogicalExpression> indexCols,
                                 CollationContext indexCollationContext,
                                 List<LogicalExpression> nonIndexCols,
                                 List<LogicalExpression> rowKeyColumns,
                                 String indexName,
                                 String tableName,
                                 IndexType type,
                                 NullDirection nullsDirection) {
    super(indexCols, indexCollationContext, nonIndexCols, rowKeyColumns, indexName, tableName, type, nullsDirection);
  }

  /**
   * Row-count estimation is not available at this level.
   * @throws UnsupportedOperationException always
   */
  @Override
  public double getRows(RelNode scan, RexNode indexCondition) {
    throw new UnsupportedOperationException("getRows() not supported for this index.");
  }

  /** @return false - no row-count statistics at this level */
  @Override
  public boolean supportsRowCountStats() {
    return false;
  }

  /**
   * No group scan is available at this level.
   * @throws UnsupportedOperationException always
   */
  @Override
  public IndexGroupScan getIndexGroupScan() {
    throw new UnsupportedOperationException("Group scan not supported for this index.");
  }

  /** @return false - no full-text search at this level */
  @Override
  public boolean supportsFullTextSearch() {
    return false;
  }

  /**
   * Costing is not available at this level.
   * @throws UnsupportedOperationException always
   */
  @Override
  public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
                            int numProjectedFields, GroupScan primaryGroupScan) {
    throw new UnsupportedOperationException("getCost() not supported for this index.");
  }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexStatistics.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexStatistics.java new file mode 100644 index 000000000..dfc0897ee --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexStatistics.java @@ -0,0 +1,51 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.planner.index;

import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Base class for {@link IndexStatistics} implementations. It stores the input rel, the
 * condition and the table handed to the constructor, leaves the row count to subclasses,
 * and rejects collation/distribution queries unless overridden.
 */
public abstract class AbstractIndexStatistics implements IndexStatistics {

  protected static final Logger logger = LoggerFactory.getLogger(AbstractIndexStatistics.class);

  /** Input relational node supplied at construction. */
  protected final RelNode input;

  /** Condition supplied at construction. */
  protected final RexNode condition;

  /** Table supplied at construction. */
  protected final DrillTable table;

  /**
   * Stores the three arguments as-is; no validation is performed.
   */
  public AbstractIndexStatistics(RelNode input, RexNode condition, DrillTable table) {
    this.table = table;
    this.condition = condition;
    this.input = input;
  }

  /** @return the row count; the computation is left to subclasses */
  public abstract double getRowCount();

  /**
   * Not supported unless overridden.
   * @throws UnsupportedOperationException always
   */
  public List<RelCollation> getCollations() {
    throw new UnsupportedOperationException();
  }

  /**
   * Not supported unless overridden.
   * @throws UnsupportedOperationException always
   */
  public RelDistribution getDistribution() {
    throw new UnsupportedOperationException();
  }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/CollationContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/CollationContext.java new file mode 100644 index 000000000..8260beea4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/CollationContext.java @@ -0,0 +1,37 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.index; + +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.drill.common.expression.LogicalExpression; + +import java.util.List; +import java.util.Map; + +public class CollationContext { + + public final Map<LogicalExpression, RelFieldCollation> collationMap; + public final List<RelFieldCollation> relFieldCollations; + + public CollationContext(Map<LogicalExpression, RelFieldCollation> collationMap, + List<RelFieldCollation> relFieldCollations) { + this.collationMap = collationMap; + this.relFieldCollations = relFieldCollations; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexCollection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexCollection.java new file mode 100644 index 000000000..0ea3d83e7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexCollection.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.index; + + +import org.apache.calcite.rex.RexNode; +import org.apache.drill.exec.physical.base.IndexGroupScan; +import org.apache.calcite.rel.RelNode; + +import java.util.Set; + +public class DrillIndexCollection extends AbstractIndexCollection { + private final RelNode scan; // physical scan rel corresponding to the primary table + + public DrillIndexCollection(RelNode scanRel, + Set<DrillIndexDescriptor> indexes) { + this.scan = scanRel; + for (IndexDescriptor index : indexes) { + super.addIndex(index); + } + } + + private IndexDescriptor getIndexDescriptor() { + + //XXX need a policy to pick the indexDesc to use instead of picking the first one. 
+ return this.indexes.iterator().next(); + } + + @Override + public boolean supportsIndexSelection() { + return true; + } + + @Override + public boolean supportsRowCountStats() { + return true; + } + + @Override + public boolean supportsFullTextSearch() { + return true; + } + + @Override + public double getRows(RexNode indexCondition) { + + return getIndexDescriptor().getRows(scan, indexCondition); + } + + @Override + public IndexGroupScan getGroupScan() { + return getIndexDescriptor().getIndexGroupScan(); + } + + @Override + public IndexCollectionType getIndexCollectionType() { + return IndexCollection.IndexCollectionType.EXTERNAL_SECONDARY_INDEX_COLLECTION; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDefinition.java new file mode 100644 index 000000000..03c2a44c6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDefinition.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.index; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.drill.shaded.guava.com.google.common.collect.Sets; + +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelFieldCollation.NullDirection; +import org.apache.drill.common.expression.CastExpression; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DrillIndexDefinition implements IndexDefinition { + /** + * The indexColumns is the list of column(s) on which this index is created. If there is more than 1 column, + * the order of the columns is important: index on {a, b} is not the same as index on {b, a} + * NOTE: the indexed column could be of type columnfamily.column + */ + @JsonProperty + protected final List<LogicalExpression> indexColumns; + + /** + * nonIndexColumns: the list of columns that are included in the index as 'covering' + * columns but are not themselves indexed. These are useful for covering indexes where the + * query request can be satisfied directly by the index and avoid accessing the table altogether. 
+ */ + @JsonProperty + protected final List<LogicalExpression> nonIndexColumns; + + @JsonIgnore + protected final Set<LogicalExpression> allIndexColumns; + + @JsonProperty + protected final List<LogicalExpression> rowKeyColumns; + + @JsonProperty + protected final CollationContext indexCollationContext; + + /** + * indexName: name of the index that should be unique within the scope of a table + */ + @JsonProperty + protected final String indexName; + + protected final String tableName; + + @JsonProperty + protected final IndexDescriptor.IndexType indexType; + + @JsonProperty + protected final NullDirection nullsDirection; + + public DrillIndexDefinition(List<LogicalExpression> indexCols, + CollationContext indexCollationContext, + List<LogicalExpression> nonIndexCols, + List<LogicalExpression> rowKeyColumns, + String indexName, + String tableName, + IndexType type, + NullDirection nullsDirection) { + this.indexColumns = indexCols; + this.nonIndexColumns = nonIndexCols; + this.rowKeyColumns = rowKeyColumns; + this.indexName = indexName; + this.tableName = tableName; + this.indexType = type; + this.allIndexColumns = Sets.newHashSet(indexColumns); + this.allIndexColumns.addAll(nonIndexColumns); + this.indexCollationContext = indexCollationContext; + this.nullsDirection = nullsDirection; + + } + + @Override + public int getIndexColumnOrdinal(LogicalExpression path) { + int id = indexColumns.indexOf(path); + return id; + } + + @Override + public boolean isCoveringIndex(List<LogicalExpression> columns) { + return allIndexColumns.containsAll(columns); + } + + @Override + public boolean allColumnsIndexed(Collection<LogicalExpression> columns) { + return columnsInIndexFields(columns, indexColumns); + } + + @Override + public boolean someColumnsIndexed(Collection<LogicalExpression> columns) { + return someColumnsInIndexFields(columns, indexColumns); + } + + public boolean pathExactIn(SchemaPath path, Collection<LogicalExpression> exprs) { + for (LogicalExpression expr : 
exprs) { + if (expr instanceof SchemaPath) { + if (((SchemaPath) expr).toExpr().equals(path.toExpr())) { + return true; + } + } + } + + return false; + } + + boolean castIsCompatible(CastExpression castExpr, Collection<LogicalExpression> indexFields) { + for(LogicalExpression indexExpr : indexFields) { + if(indexExpr.getClass() != castExpr.getClass()) { + continue; + } + CastExpression indexCastExpr = (CastExpression)indexExpr; + //we compare input using equals because we know we are comparing SchemaPath, + //if we extend to support other expression, make sure the equals of that expression + //is implemented properly, otherwise it will fall to identity comparison + if ( !castExpr.getInput().equals(indexCastExpr.getInput()) ) { + continue; + } + + if( castExpr.getMajorType().getMinorType() != indexCastExpr.getMajorType().getMinorType()) { + continue; + } + return true; + } + return false; + } + + protected boolean columnsInIndexFields(Collection<LogicalExpression> columns, Collection<LogicalExpression> indexFields) { + //we need to do extra check, so we could allow the case when query condition expression is not identical with indexed fields + //and they still could use the index either by implicit cast or the difference is allowed, e.g. width of varchar + for (LogicalExpression col : columns) { + if (col instanceof CastExpression) { + if (!castIsCompatible((CastExpression) col, indexFields)) { + return false; + } + } + else { + if (!pathExactIn((SchemaPath)col, indexFields)) { + return false; + } + } + } + return true;//indexFields.containsAll(columns); + } + + protected boolean someColumnsInIndexFields(Collection<LogicalExpression> columns, + Collection<LogicalExpression> indexFields) { + + //we need to do extra check, so we could allow the case when query condition expression is not identical with indexed fields + //and they still could use the index either by implicit cast or the difference is allowed, e.g. 
width of varchar + for (LogicalExpression col : columns) { + if (col instanceof CastExpression) { + if (castIsCompatible((CastExpression) col, indexFields)) { + return true; + } + } + else { + if (pathExactIn((SchemaPath)col, indexFields)) { + return true; + } + } + } + return false; + } + + @Override + public String toString() { + String columnsDesc = " Index columns: " + indexColumns.toString() + " Non-Index columns: " + nonIndexColumns.toString(); + String desc = "Table: " + tableName + " Index: " + indexName + columnsDesc; + return desc; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null) { + return false; + } + DrillIndexDefinition index1 = (DrillIndexDefinition) o; + return tableName.equals(index1.tableName) + && indexName.equals(index1.indexName) + && indexType.equals(index1.indexType) + && indexColumns.equals(index1.indexColumns); + } + + @Override + public int hashCode() { + final int prime = 31; + final String fullName = tableName + indexName; + int result = 1; + result = prime * result + fullName.hashCode(); + result = prime * result + indexType.hashCode(); + + return result; + } + + @Override + @JsonProperty + public String getIndexName() { + return indexName; + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + @JsonProperty + public IndexDescriptor.IndexType getIndexType() { + return indexType; + } + + @Override + @JsonProperty + public List<LogicalExpression> getRowKeyColumns() { + return this.rowKeyColumns; + } + + @Override + @JsonProperty + public List<LogicalExpression> getIndexColumns() { + return this.indexColumns; + } + + @Override + @JsonProperty + public List<LogicalExpression> getNonIndexColumns() { + return this.nonIndexColumns; + } + + @Override + @JsonIgnore + public RelCollation getCollation() { + if (indexCollationContext != null) { + return RelCollations.of(indexCollationContext.relFieldCollations); + } + return null; + } + + @Override + 
@JsonIgnore + public Map<LogicalExpression, RelFieldCollation> getCollationMap() { + return indexCollationContext.collationMap; + } + + @Override + @JsonIgnore + public NullDirection getNullsOrderingDirection() { + return nullsDirection; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDescriptor.java new file mode 100644 index 000000000..4da62c204 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDescriptor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.index; + +import org.apache.calcite.rel.RelFieldCollation.NullDirection; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.IndexGroupScan; +import org.apache.drill.exec.planner.cost.PluginCost; +import org.apache.drill.exec.planner.logical.DrillTable; + +import java.io.IOException; +import java.util.List; + +public class DrillIndexDescriptor extends AbstractIndexDescriptor { + + /** + * The name of Drill's Storage Plugin on which the Index was stored + */ + private String storage; + + private DrillTable table; + + public DrillIndexDescriptor(List<LogicalExpression> indexCols, + CollationContext indexCollationContext, + List<LogicalExpression> nonIndexCols, + List<LogicalExpression> rowKeyColumns, + String indexName, + String tableName, + IndexType type, + NullDirection nullsDirection) { + super(indexCols, indexCollationContext, nonIndexCols, rowKeyColumns, indexName, tableName, type, nullsDirection); + } + + public DrillIndexDescriptor(DrillIndexDefinition def) { + this(def.indexColumns, def.indexCollationContext, def.nonIndexColumns, def.rowKeyColumns, def.indexName, + def.getTableName(), def.getIndexType(), def.nullsDirection); + } + + @Override + public double getRows(RelNode scan, RexNode indexCondition) { + //TODO: real implementation is to use Drill's stats implementation. 
for now return fake value 1.0 + return 1.0; + } + + @Override + public IndexGroupScan getIndexGroupScan() { + try { + final DrillTable idxTable = getDrillTable(); + GroupScan scan = idxTable.getGroupScan(); + + if (!(scan instanceof IndexGroupScan)){ + logger.error("The Groupscan from table {} is not an IndexGroupScan", idxTable.toString()); + return null; + } + return (IndexGroupScan)scan; + } + catch(IOException e) { + logger.error("Error in getIndexGroupScan ", e); + } + return null; + } + + public void attach(String storageName, DrillTable inTable) { + storage = storageName; + setDrillTable(inTable); + } + + public void setStorageName(String storageName) { + storage = storageName; + } + + public String getStorageName() { + return storage; + } + + public void setDrillTable(DrillTable table) { + this.table = table; + } + + public DrillTable getDrillTable() { + return this.table; + } + + public FunctionalIndexInfo getFunctionalInfo() { + return null; + } + + @Override + public PluginCost getPluginCostModel() { + return null; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/FunctionalIndexInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/FunctionalIndexInfo.java new file mode 100644 index 000000000..a12dcc6c1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/FunctionalIndexInfo.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.index; + +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; + +import java.util.Map; +import java.util.Set; + +/** + * FunctionalIndexInfo is to collect Functional fields in IndexDescriptor, derive information needed for index plan, + * e.g. convert and rewrite filter, columns, and rowtype on index scan that involve functional index. + * In case different store might have different way to rename expression in index table, we allow storage plugin + */ +public interface FunctionalIndexInfo { + + /** + * @return if this index has functional indexed field, return true + */ + boolean hasFunctional(); + + /** + * @return the IndexDescriptor this IndexInfo built from + */ + IndexDescriptor getIndexDesc(); + + /** + * getNewPath: for an original path, return new rename '$N' path, notice there could be multiple renamed paths + * if the there are multiple functional indexes refer original path. 
+ * @param path + * @return + */ + SchemaPath getNewPath(SchemaPath path); + + /** + * return a plain field path if the incoming index expression 'expr' is replaced to be a plain field + * @param expr suppose to be an indexed expression + * @return the renamed schemapath in index table for the indexed expression + */ + SchemaPath getNewPathFromExpr(LogicalExpression expr); + + /** + * @return the map of indexed expression --> the involved schema paths in a indexed expression + */ + Map<LogicalExpression, Set<SchemaPath>> getPathsInFunctionExpr(); + + /** + * @return the map between indexed expression and to-be-converted target expression for scan in index + * e.g. cast(a.b as int) -> '$0' + */ + Map<LogicalExpression, LogicalExpression> getExprMap(); + + /** + * @return the set of all new field names for indexed functions in index + */ + Set<SchemaPath> allNewSchemaPaths(); + + /** + * @return the set of all schemaPath exist in functional index fields + */ + Set<SchemaPath> allPathsInFunction(); + + /** + * Whether this implementation( may be different per storage) support rewrite rewriting varchar equality expression, + * e.g. cast(a.b as varchar(2)) = 'ca' to LIKE expression: cast(a.b as varchar(2) LIKE 'ca%' + */ + boolean supportEqualCharConvertToLike(); + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java new file mode 100644 index 000000000..65788cb52 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
// ---- exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java ----
package org.apache.drill.exec.planner.index;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.DbGroupScan;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
import org.apache.drill.exec.planner.common.DrillProjectRelBase;
import java.util.List;
import java.util.Set;

/**
 * Accessors for the relational nodes (scan, projects, filter, sort, exchange) and the
 * expressions derived from them that are exposed to index planning code.
 *
 * NOTE(review): the original change ships this interface without documentation. The
 * comments below only restate the declared names and types; the precise contract of each
 * accessor must be confirmed against the implementing classes.
 */
public interface IndexCallContext {

  // ----- rule call -----

  /** @return the {@link RelOptRuleCall} held by this context */
  RelOptRuleCall getCall();

  // ----- scan -----

  /** @return the scan rel as a {@link DrillScanRelBase} */
  DrillScanRelBase getScan();

  /** @return the {@link DbGroupScan} of this context */
  DbGroupScan getGroupScan();

  /** @return the scan columns as a list of {@link SchemaPath} */
  List<SchemaPath> getScanColumns();

  // ----- projects -----

  boolean hasLowerProject();

  boolean hasUpperProject();

  DrillProjectRelBase getLowerProject();

  DrillProjectRelBase getUpperProject();

  // ----- filter -----

  /** @return the filter as a {@link RelNode} */
  RelNode getFilter();

  RexNode getFilterCondition();

  RexNode getOrigCondition();

  /** @return the {@link IndexableExprMarker} of this context */
  IndexableExprMarker getOrigMarker();

  // ----- expressions referenced inside functions -----

  Set<LogicalExpression> getLeftOutPathsInFunctions();

  void setLeftOutPathsInFunctions(Set<LogicalExpression> exprs);

  // ----- sort / collation -----

  Sort getSort();

  /** NOTE(review): presumably populates what {@link #getSortExprs()} returns; confirm in implementations. */
  void createSortExprs();

  List<LogicalExpression> getSortExprs();

  RelCollation getCollation();

  List<RelCollation> getCollationList();

  // ----- exchange / distribution -----

  /** @return the exchange as a {@link RelNode} */
  RelNode getExchange();

  List<DistributionField> getDistributionFields();
}

// ---- exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCollection.java ----
package org.apache.drill.exec.planner.index;

import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.IndexGroupScan;

/**
 * Describes a collection of indexes; iterating over it yields {@link IndexDescriptor}s.
 */
public interface IndexCollection extends Iterable<IndexDescriptor> {

  /**
   * The kinds of index collection: native secondary indexes or external secondary indexes.
   */
  enum IndexCollectionType {
    NATIVE_SECONDARY_INDEX_COLLECTION,
    EXTERNAL_SECONDARY_INDEX_COLLECTION
  }

  /**
   * Adds a new index to the collection.
   * @param index the index to add
   * @return true if the index was successfully added, false otherwise
   */
  boolean addIndex(IndexDescriptor index);

  /**
   * Removes an index (identified by table name and index name) from the collection.
   * @param index the index to remove
   * @return true if the index was successfully removed, false otherwise
   */
  boolean removeIndex(IndexDescriptor index);

  /**
   * Clears all entries from this index collection.
   */
  void clearAll();

  /**
   * Gets the type of this index collection.
   * @return one of the values in {@link IndexCollectionType}
   */
  IndexCollectionType getIndexCollectionType();

  /**
   * Whether this index collection supports index selection, i.e. picking an appropriate
   * index out of multiple candidates. Typically, external index collections such as
   * Elasticsearch already have this capability, while a native secondary index collection
   * may not; in that case Drill needs to do the index selection.
   * @return true if the collection performs index selection itself
   */
  boolean supportsIndexSelection();

  /**
   * Gets the estimated row count for a single index condition.
   * @param indexCondition the index condition (e.g. index_col1 < 10 AND index_col2 = 'abc')
   * @return the estimated row count
   */
  double getRows(RexNode indexCondition);

  /**
   * Whether the index supports getting row count statistics.
   * @return true if the index supports getting row count, false otherwise
   */
  boolean supportsRowCountStats();

  /**
   * Whether the index supports full-text search (to allow pushing down such filters).
   * @return true if the index supports full-text search, false otherwise
   */
  boolean supportsFullTextSearch();

  /**
   * If this collection exposes a single GroupScan, returns that instance. For external
   * indexes such as Elasticsearch, a single GroupScan may represent all the indexes
   * contained within the collection. For native indexes, on the other hand, each separate
   * index would have its own GroupScan.
   * @return the GroupScan for this collection if available, otherwise null
   */
  IndexGroupScan getGroupScan();

  /**
   * Checks whether the field name is the leading key of any of the indexes in this collection.
   * @param path the field to look up
   * @return true if an appropriate index is found, false otherwise
   */
  boolean isColumnIndexed(SchemaPath path);
}
package org.apache.drill.exec.planner.index;

import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.drill.common.expression.LogicalExpression;

import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Defines an index: its name, type, owning table, columns and collation.
 */
public interface IndexDefinition {

  /**
   * The kinds of index: primary key, native secondary or external secondary.
   */
  enum IndexType {
    PRIMARY_KEY_INDEX,
    NATIVE_SECONDARY_INDEX,
    EXTERNAL_SECONDARY_INDEX
  }

  /**
   * Gets the name of the index.
   */
  String getIndexName();

  /**
   * Gets the name of the table this index is associated with.
   */
  String getTableName();

  /**
   * Gets the type of this index.
   * @return one of the values in {@link IndexType}
   */
  IndexType getIndexType();

  /**
   * Checks whether the field is an index column and, if so, reports its ordinal position
   * in the index.
   * @param path the field path to compare against the index column names
   * @return the ordinal of the indexed column if valid, otherwise -1
   */
  int getIndexColumnOrdinal(LogicalExpression path);

  /**
   * Checks whether this index 'covers' all the columns in the supplied list.
   * @param columns the columns to check
   * @return true for a covering index, false for a non-covering one
   */
  boolean isCoveringIndex(List<LogicalExpression> columns);

  /**
   * Checks whether this index has all of the supplied columns indexed.
   * @param columns the columns to check
   * @return true if all fields are indexed, false if some or all fields are not indexed
   */
  boolean allColumnsIndexed(Collection<LogicalExpression> columns);

  /**
   * Checks whether this index has at least some of the supplied columns indexed.
   * @param columns the columns to check
   * @return true if some fields are indexed, false if none of the fields are indexed
   */
  boolean someColumnsIndexed(Collection<LogicalExpression> columns);

  /**
   * Gets the list of columns (typically 1 column) that constitute the row key (primary key).
   * @return the row key columns
   */
  List<LogicalExpression> getRowKeyColumns();

  /** @return the index columns as a list of {@link LogicalExpression} */
  List<LogicalExpression> getIndexColumns();

  /** @return the non-index columns as a list of {@link LogicalExpression} */
  List<LogicalExpression> getNonIndexColumns();

  /** @return the {@link RelCollation} of this index */
  RelCollation getCollation();

  /** @return a map from {@link LogicalExpression} to its {@link RelFieldCollation} */
  Map<LogicalExpression, RelFieldCollation> getCollationMap();

  /**
   * Gets the nulls ordering of this index.
   * @return the {@link NullDirection} used by this index
   */
  NullDirection getNullsOrderingDirection();
}
+ */ +package org.apache.drill.exec.planner.index; + +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.IndexGroupScan; +import org.apache.drill.exec.planner.cost.PluginCost; + + +/** + * IndexDefinition + functions to access materialized index(index table/scan, etc) + */ + +public interface IndexDescriptor extends IndexDefinition { + + /** + * Get the estimated row count for a single index condition + * @param input The rel node corresponding to the primary table + * @param indexCondition The index condition (e.g index_col1 < 10 AND index_col2 = 'abc') + * @return The estimated row count + */ + double getRows(RelNode input, RexNode indexCondition); + + /** + * Whether or not the index supports getting row count statistics + * @return True if index supports getting row count, False otherwise + */ + boolean supportsRowCountStats(); + + /** + * Get an instance of the group scan associated with this index descriptor + * @return An instance of group scan for this index + */ + IndexGroupScan getIndexGroupScan(); + + /** + * Whether or not the index supports full-text search (to allow pushing down such filters) + * @return True if index supports full-text search, False otherwise + */ + boolean supportsFullTextSearch(); + + FunctionalIndexInfo getFunctionalInfo(); + + public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner, + int numProjectedFields, GroupScan primaryGroupScan); + + public PluginCost getPluginCostModel(); + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscover.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscover.java new file mode 100644 index 000000000..309083b1b --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscover.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.index; + + +public interface IndexDiscover { + IndexCollection getTableIndex(String tableName); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java new file mode 100644 index 000000000..fde2a32d2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
// ---- exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java ----
package org.apache.drill.exec.planner.index;

import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.calcite.rel.RelNode;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/**
 * IndexDiscoverBase is the layer that reads the index configurations of tables on storage
 * plugins, then, based on the properties it collected, gets the StoragePlugin from the
 * StoragePluginRegistry and, together with the index information, builds an IndexCollection.
 */
public abstract class IndexDiscoverBase implements IndexDiscover {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IndexDiscoverBase.class);

  private final AbstractDbGroupScan scan; // group scan corresponding to the primary table
  private final RelNode scanRel; // physical scan rel corresponding to the primary table

  public IndexDiscoverBase(AbstractDbGroupScan inScan, DrillScanRelBase inScanPrel) {
    scan = inScan;
    scanRel = inScanPrel;
  }

  public IndexDiscoverBase(AbstractDbGroupScan inScan, ScanPrel inScanPrel) {
    scan = inScan;
    scanRel = inScanPrel;
  }

  /** @return the group scan passed to the constructor */
  public AbstractDbGroupScan getOriginalScan() {
    return scan;
  }

  /** @return the scan rel passed to the constructor */
  public RelNode getOriginalScanRel() {
    return scanRel;
  }

  /**
   * Builds an index collection from a set of index definitions. Each definition is wrapped
   * in a {@link DrillIndexDescriptor}, materialized via
   * {@link #materializeIndex(String, DrillIndexDescriptor)} and added to the collection.
   *
   * @param tableName name of the primary table (not used by this implementation)
   * @param storageName storage name recorded on every descriptor
   * @param indexDefs the index definitions to wrap
   * @return a {@link DrillIndexCollection} over the original scan rel and the descriptors
   */
  public IndexCollection getTableIndex(String tableName, String storageName, Collection<DrillIndexDefinition> indexDefs ) {
    Set<DrillIndexDescriptor> idxSet = new HashSet<>();
    for (DrillIndexDefinition def : indexDefs) {
      DrillIndexDescriptor indexDescriptor = new DrillIndexDescriptor(def);
      materializeIndex(storageName, indexDescriptor);
      // Without this the descriptors were built and then discarded, so the returned
      // collection was always created from an empty set.
      idxSet.add(indexDescriptor);
    }
    return new DrillIndexCollection(getOriginalScanRel(), idxSet);
  }

  /**
   * Records the storage name on the descriptor and attaches the table produced by
   * {@link #buildDrillTable(IndexDescriptor)}.
   *
   * @param storageName storage name to set on the descriptor
   * @param index the descriptor to complete
   */
  public void materializeIndex(String storageName, DrillIndexDescriptor index) {
    index.setStorageName(storageName);
    index.setDrillTable(buildDrillTable(index));
  }

  /**
   * When there is a storageName in the IndexDescriptor, get a DrillTable instance based on
   * the storageName and other information in idxDesc that helps identify the table.
   * This base implementation is a placeholder and always returns null.
   *
   * @param idxDesc descriptor of the external index
   * @return always null in this implementation
   */
  public DrillTable getExternalDrillTable(IndexDescriptor idxDesc) {
    //XX: get table object for this index, index storage plugin should provide interface to get the DrillTable object
    return null;
  }

  /**
   * Obtains the DrillTable for an index: external secondary indexes are resolved through
   * {@link #getExternalDrillTable(IndexDescriptor)}, every other index type through
   * {@link #getNativeDrillTable(IndexDescriptor)}.
   *
   * @param idxDesc descriptor of the index
   * @return the table returned by the selected method (may be null for external indexes)
   */
  public DrillTable buildDrillTable(IndexDescriptor idxDesc) {
    if (idxDesc.getIndexType() == IndexDescriptor.IndexType.EXTERNAL_SECONDARY_INDEX) {
      return getExternalDrillTable(idxDesc);
    }
    return getNativeDrillTable(idxDesc);
  }

  /**
   * When it is a native index (an index provided by the native storage plugin), the actual
   * IndexDiscover (e.g. for HBase or MapR-DB) should provide the implementation to get the
   * DrillTable object of the index. Otherwise, the IndexDiscoverable interface exposed by
   * the external storage plugin's SchemaFactory is used to get the desired DrillTable.
   *
   * @param idxDesc descriptor of the native index
   * @return the DrillTable of the index
   */
  public abstract DrillTable getNativeDrillTable(IndexDescriptor idxDesc);

}

// ---- exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverable.java ----
package org.apache.drill.exec.planner.index;

import org.apache.drill.exec.planner.logical.DrillTable;

/**
 * The SchemaFactory of a storage plugin that can be used to store index tables should
 * expose this interface, so that IndexDiscovers can discover the index table without
 * adding a dependency on the storage plugin.
 */
public interface IndexDiscoverable {

  /**
   * Returns the found DrillTable with path (e.g. names={"elasticsearch", "staffidx", "stjson"}).
   *
   * @param discover the discover performing the lookup
   * @param desc descriptor of the index whose table is wanted
   * @return the DrillTable that was found
   */
  DrillTable findTable(IndexDiscover discover, DrillIndexDescriptor desc);

}
+ */ +public class IndexGroup { + private List<IndexProperties> indexProps; + + public IndexGroup() { + indexProps = Lists.newArrayList(); + } + + public boolean isIntersectIndex() { + if (indexProps.size() > 1) { + return true; + } else { + return false; + } + } + + public int numIndexes() { + return indexProps.size(); + } + + public void addIndexProp(IndexProperties prop) { + indexProps.add(prop); + } + + public void addIndexProp(List<IndexProperties> prop) { + indexProps.addAll(prop); + } + + public boolean removeIndexProp(IndexProperties prop) { + return indexProps.remove(prop); + } + + public List<IndexProperties> getIndexProps() { + return indexProps; + } +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexProperties.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexProperties.java new file mode 100644 index 000000000..cfdd6d030 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexProperties.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.drill.exec.planner.index;

import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.planner.common.DrillScanRelBase;

import java.util.Map;

/**
 * IndexProperties encapsulates the various metrics of a single index that are related to
 * the current query. These metrics are subsequently used to rank the index in comparison
 * with other indexes.
 *
 * NOTE(review): the individual methods are undocumented in the original change; the
 * grouping comments below are organisational only.
 */
public interface IndexProperties {

  // ----- mutators -----

  void setProperties(Map<LogicalExpression, RexNode> prefixMap,
      boolean satisfiesCollation,
      RexNode indexColumnsRemainderFilter,
      Statistics stats);

  void setSatisfiesCollation(boolean satisfiesCollation);

  // ----- index and scan -----

  IndexDescriptor getIndexDesc();

  DrillScanRelBase getPrimaryTableScan();

  boolean isCovering();

  boolean satisfiesCollation();

  // ----- filters and selectivity -----

  int numLeadingFilters();

  double getLeadingSelectivity();

  double getRemainderSelectivity();

  RexNode getTotalRemainderFilter();

  // ----- size and cost -----

  double getTotalRows();

  double getAvgRowSize();

  RelOptCost getSelfCost(RelOptPlanner planner);
}
package org.apache.drill.exec.planner.index;

import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;

import java.util.List;

/**
 * Row count, collation and distribution statistics.
 */
public interface IndexStatistics {

  /**
   * @return the approximate number of rows in the table
   */
  double getRowCount();

  /**
   * @return the collections of columns on which this table is sorted
   */
  List<RelCollation> getCollations();

  /**
   * @return the distribution of the data in the query result table
   */
  RelDistribution getDistribution();
}
package org.apache.drill.exec.planner.index;


import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexDynamicParam;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
import org.apache.calcite.rex.RexRangeRef;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.physical.PrelUtil;

import java.util.Map;
import java.util.Set;

/**
 * Marks the filter expressions that could be indexed.
 * Other than SchemaPaths, which represent columns of a table and could be indexed,
 * we consider only function expressions, and specifically the CAST function.
 * To judge whether an expression is indexable, we check these:
 * 1, the expression should be one operand of a comparison operator, one of SqlKind.COMPARISON:
 *      IN, EQUALS, NOT_EQUALS, LESS_THAN, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN_OR_EQUAL
 *    (the code additionally accepts LIKE and SIMILAR)
 * 2, the expression tree should contain at least one inputRef (which means this expression is a
 *     computation on top of at least one column), and if more than one indexable expression is
 *     found among the operands of the comparison operator, no expression is taken as indexable.
 *
 * 3, (LIMIT to one level function) the expression is a function call, with no nested function call underneath, except ITEM
 * 4, (LIMIT to CAST), the function call is a CAST
 */
public class IndexableExprMarker extends RexVisitorImpl<Boolean> {

  //map of rexNode->converted LogicalExpression
  final Map<RexNode, LogicalExpression> desiredExpressions = Maps.newHashMap();

  //the expressions in equality comparison
  final Map<RexNode, LogicalExpression> equalityExpressions = Maps.newHashMap();

  //the expression found in non-equality comparison
  final Map<RexNode, LogicalExpression> notInEquality = Maps.newHashMap();

  //for =(cast(a.b as VARCHAR(len)), 'abcd'), if the 'len' is less than the max length of casted field on index table,
  // we want to rewrite it to LIKE(cast(a.b as VARCHAR(len)), 'abcd%')
  //map equalOnCastChar: key is the equal operator, value is the operand (cast(a.b as VARCHAR(10)),
  final Map<RexNode, LogicalExpression> equalOnCastChar = Maps.newHashMap();

  final private RelNode inputRel;

  //flag current recursive call state: whether we are on a direct operand of comparison operator
  boolean directCompareOp = false;

  //the comparison call whose operand is currently being visited, or null
  RexCall contextCall = null;

  DrillParseContext parserContext;

  /**
   * @param inputRel the rel node the visited expressions refer to; also used to obtain the
   *                 planner settings for expression conversion
   */
  public IndexableExprMarker(RelNode inputRel) {
    super(true);
    this.inputRel = inputRel;
    parserContext = new DrillParseContext(PrelUtil.getPlannerSettings(inputRel.getCluster()));
  }

  /** @return an immutable copy of the operand -> converted expression map */
  public Map<RexNode, LogicalExpression> getIndexableExpression() {
    return ImmutableMap.copyOf(desiredExpressions);
  }

  /** @return an immutable copy of the EQUALS call -> CAST-to-CHAR/VARCHAR operand map */
  public Map<RexNode, LogicalExpression> getEqualOnCastChar() {
    return ImmutableMap.copyOf(equalOnCastChar);
  }

  /**
   * Returns the expressions that appeared only in an equality condition _and_ only once,
   * e.g. ( a.b = 'value' ).
   * @return the set of such expressions
   */
  public Set<LogicalExpression> getExpressionsOnlyInEquality() {

    Set<LogicalExpression> onlyInEquality = Sets.newHashSet();

    Set<LogicalExpression> notInEqSet = Sets.newHashSet();

    Set<LogicalExpression> inEqMoreThanOnce = Sets.newHashSet();

    notInEqSet.addAll(notInEquality.values());

    for (LogicalExpression expr : equalityExpressions.values()) {
      //only process expr that is not in any non-equality condition
      if (notInEqSet.contains(expr)) {
        continue;
      }

      //expr appearing in two or more equality conditions is ignored too
      if (inEqMoreThanOnce.contains(expr)) {
        continue;
      }

      //we already recorded this expr in an equality condition, move it to inEqMoreThanOnce
      if (onlyInEquality.contains(expr)) {
        inEqMoreThanOnce.add(expr);
        onlyInEquality.remove(expr);
        continue;
      }

      //finally we could take this expr
      onlyInEquality.add(expr);
    }
    return onlyInEquality;
  }

  /** An input reference is indexable exactly when it is a direct operand of a comparison. */
  @Override
  public Boolean visitInputRef(RexInputRef rexInputRef) {
    return directCompareOp;
  }

  /** @return true if the node is a call whose operator is named "ITEM" */
  private static boolean isItemCall(RexNode rex) {
    return (rex instanceof RexCall) && "ITEM".equals(((RexCall) rex).getOperator().getName());
  }

  /**
   * @param rex the node to inspect (top level only, no recursion)
   * @return true if the node is a {@link RexInputRef} or an ITEM call
   */
  public boolean containInputRef(RexNode rex) {
    if (rex instanceof RexInputRef) {
      return true;
    }
    if (isItemCall(rex)) {
      return true;
    }
    //TODO: use a visitor search recursively for inputRef, if found one return true
    return false;
  }

  /**
   * @param call the call to inspect
   * @return true if the call is a comparison, LIKE or SIMILAR and fewer than two of its
   *         operands are input references / ITEM calls
   */
  public boolean operandsAreIndexable(RexCall call) {
    SqlKind kind = call.getKind();
    boolean kindIsRight = (SqlKind.COMPARISON.contains(kind) || kind==SqlKind.LIKE || kind == SqlKind.SIMILAR);

    if (!kindIsRight) {
      return false;
    }

    int inputReference = 0;
    for (RexNode operand : call.operands) {
      //if two or more operands of this operator have an inputRef, it is something like
      // a.b = a.c instead of a.b ='hello', so an index cannot be applied
      if (containInputRef(operand)) {
        inputReference++;
        if(inputReference>=2) {
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Visits a call. For an indexable comparison, each operand is visited with
   * {@code directCompareOp} set and recorded when it is marked. For a call that is itself a
   * direct comparison operand, reports whether it is an ITEM call or a CAST over an input
   * reference / ITEM call. Any other call is only traversed.
   */
  @Override
  public Boolean visitCall(RexCall call) {
    if (call.getKind() == SqlKind.NOT) {
      // Conditions under NOT are not indexable
      return false;
    }
    if (operandsAreIndexable(call)) {
      for (RexNode operand : call.operands) {
        directCompareOp = true;
        contextCall = call;
        // accept() returns a boxed Boolean which the base RexVisitorImpl may leave null for
        // node kinds this class does not override (NOTE(review): confirm against the Calcite
        // version in use); compare null-safely instead of auto-unboxing.
        final boolean markIt = Boolean.TRUE.equals(operand.accept(this));
        directCompareOp = false;
        contextCall = null;
        if (markIt) {
          LogicalExpression expr = DrillOptiq.toDrill(parserContext, inputRel, operand);
          desiredExpressions.put(operand, expr);
          if (call.getKind() == SqlKind.EQUALS) {
            equalityExpressions.put(operand, expr);
          }
          else {
            notInEquality.put(operand, expr);
          }
        }
      }
      return false;
    }

    //now we are handling a call directly under comparison e.g. <([call], literal)
    if (directCompareOp) {
      // if it is an item, or CAST function
      if (isItemCall(call)) {
        return directCompareOp;
      }
      else if (call.getKind() == SqlKind.CAST) {
        //For now, we care only about direct CAST: CAST's operand is a field(schemaPath),
        // either ITEM call(nested name) or inputRef

        //cast as char/varchar in equals function
        if(contextCall != null && contextCall.getKind() == SqlKind.EQUALS
            && (call.getType().getSqlTypeName()== SqlTypeName.CHAR
                || call.getType().getSqlTypeName()==SqlTypeName.VARCHAR)) {
          equalOnCastChar.put(contextCall, DrillOptiq.toDrill(parserContext, inputRel, call));
        }

        RexNode castOp = call.operands.get(0);
        if (castOp instanceof RexInputRef) {
          return true;
        }
        if (isItemCall(castOp)) {
          return true;
        }
      }
    }

    // Traverse the operands for their side effects only; the results are not used.
    for (RexNode operand : call.operands) {
      operand.accept(this);
    }
    return false;
  }

  @Override
  public Boolean visitLocalRef(RexLocalRef localRef) {
    return false;
  }

  @Override
  public Boolean visitLiteral(RexLiteral literal) {
    return false;
  }

  @Override
  public Boolean visitOver(RexOver over) {
    return false;
  }

  @Override
  public Boolean visitCorrelVariable(RexCorrelVariable correlVariable) {
    return false;
  }

  @Override
  public Boolean visitDynamicParam(RexDynamicParam dynamicParam) {
    return false;
  }

  @Override
  public Boolean visitRangeRef(RexRangeRef rangeRef) {
    return false;
  }

  /** Delegates to the expression the field access refers to. */
  @Override
  public Boolean visitFieldAccess(RexFieldAccess fieldAccess) {
    final RexNode expr = fieldAccess.getReferenceExpr();
    return expr.accept(this);
  }
}
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.index; + +import org.apache.drill.common.exceptions.DrillRuntimeException; + +public class InvalidIndexDefinitionException extends DrillRuntimeException { + public InvalidIndexDefinitionException(String message) { + super(message); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/Statistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/Statistics.java new file mode 100644 index 000000000..2859102e2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/Statistics.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.index; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.exec.planner.common.DrillScanRelBase; + +public interface Statistics { + + double ROWCOUNT_UNKNOWN = -1; + //HUGE is same as DrillCostBase.HUGE + double ROWCOUNT_HUGE = Double.MAX_VALUE; + double AVG_ROWSIZE_UNKNOWN = -1; + long AVG_COLUMN_SIZE = 10; + + /** Returns whether statistics are available. Should be called prior to using the statistics + */ + boolean isStatsAvailable(); + + /** Returns a unique index identifier + * @param idx - Index specified as a {@link IndexDescriptor} + * @return The unique index identifier + */ + String buildUniqueIndexIdentifier(IndexDescriptor idx); + + /** Returns the rowcount for the specified filter condition + * @param condition - Filter specified as a {@link RexNode} + * @param tabIdxName - The index name generated using {@code buildUniqueIndexIdentifier} + * @param scanRel - The current scan rel + * @return the rowcount for the filter + */ + double getRowCount(RexNode condition, String tabIdxName, RelNode scanRel); + + /** Returns the leading rowcount for the specified filter condition + * Leading rowcount means rowcount for filter condition only on leading index columns. 
+ * @param condition - Filter specified as a {@link RexNode} + * @param tabIdxName - The index name generated using {@code buildUniqueIndexIdentifier} + * @param scanRel - The current scan rel + * @return the leading rowcount + */ + double getLeadingRowCount(RexNode condition, String tabIdxName, RelNode scanRel); + + /** Returns the average row size for the specified filter condition + * @param tabIdxName - The index name generated using {@code buildUniqueIndexIdentifier} + * @param isIndexScan - Whether the current rel is an index scan (false for primary table) + */ + double getAvgRowSize(String tabIdxName, boolean isIndexScan); + + boolean initialize(RexNode condition, DrillScanRelBase scanRel, IndexCallContext context); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/StatisticsPayload.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/StatisticsPayload.java new file mode 100644 index 000000000..6894e4fdb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/StatisticsPayload.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.index; + +public interface StatisticsPayload { + double getRowCount(); + double getLeadingRowCount(); + double getAvgRowSize(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java index 53036f1f4..ed9b32fe2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java @@ -91,6 +91,10 @@ public abstract class DrillTable implements Table { this.options = options; } + public void setGroupScan(GroupScan scan) { + this.scan = scan; + } + public GroupScan getGroupScan() throws IOException{ if (scan == null) { if (selection instanceof FileSelection && ((FileSelection) selection).isEmptyDirectory()) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PartitionFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PartitionFunction.java new file mode 100644 index 000000000..754c5d753 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PartitionFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.physical; + +import java.util.List; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.exec.record.VectorWrapper; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class") +public interface PartitionFunction { + + /** + * Return the list of FieldReferences that participate in the partitioning function + * @return list of FieldReferences + */ + List<FieldReference> getPartitionRefList(); + + /** + * Setup method for the partitioning function + * @param partitionKeys a list of partition columns on which range partitioning is needed + */ + void setup(List<VectorWrapper<?>> partitionKeys); + + /** + * Evaluate a partitioning function for a particular row index and return the partition id + * @param index the integer index into the partition keys vector for a specific 'row' of values + * @param numPartitions the max number of partitions that are allowed + * @return partition id, an integer value + */ + int eval(int index, int numPartitions); + + /** + * Returns a FieldReference (LogicalExpression) for the partition function + * @return FieldReference for the partition function + */ + FieldReference getPartitionFieldRef(); + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index eb6112dfc..cb790918a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -80,7 +80,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements } } - protected static enum BatchState { + public static enum BatchState { /** Need 
to build schema and return. */ BUILD_SCHEMA, /** This is still the first data batch. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java index 9314da678..1bbbe76b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java @@ -101,6 +101,11 @@ public abstract class AbstractRecordReader implements RecordReader { } } + @Override + public boolean hasNext() { + return false; + } + protected List<SchemaPath> getDefaultColumnsToRead() { return GroupScan.ALL_COLUMNS; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java index edd91d157..33b361c7b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java @@ -43,6 +43,14 @@ public interface RecordReader extends AutoCloseable { void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException; /** + * Check if the reader may have potentially more data to be read in subsequent iterations. Certain types of readers + * such as repeatable readers can be invoked multiple times, so this method will allow ScanBatch to check with + * the reader before closing it. + * @return return true if there could potentially be more reads, false otherwise + */ + boolean hasNext(); + + /** * Increments this record reader forward, writing via the provided output * mutator into the output batch. 
* diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/EncodedSchemaPathSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/EncodedSchemaPathSet.java new file mode 100644 index 000000000..5f9eef801 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/EncodedSchemaPathSet.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.util; + + +import org.apache.drill.shaded.guava.com.google.common.io.BaseEncoding; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.planner.physical.PlannerSettings; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +/** + * This class provides utility methods to encode and decode a set of user specified + * SchemaPaths to a set of encoded SchemaPaths with the following properties. + * <ol> + * <li>Valid Drill identifier as per its grammar with only one, root name segment. + * <li>A single identifier cannot exceed 1024 characters in length. + * </ol> + * <p> + * Format of the encoded SchemaPath: + * <blockquote><pre>$$ENC\d\d&lt;base32 encoded input paths&gt;</pre></blockquote> + * <p> + * We use Base-32 over Base-64 because the latter's charset includes '/' and '+'. 
+ */ +public class EncodedSchemaPathSet { + + private static final int ESTIMATED_ENCODED_SIZE = 1024; + + private static final String ENC_PREFIX = "$$ENC"; + + private static final String ENC_FORMAT_STRING = ENC_PREFIX + "%02d%s"; + private static final int ENC_PREFIX_SIZE = ENC_PREFIX.length() + "00".length(); + private static final int MAX_ENC_IDENTIFIER_SIZE = (PlannerSettings.DEFAULT_IDENTIFIER_MAX_LENGTH - ENC_PREFIX_SIZE); + private static final int MAX_ENC_IDENTIFIER_COUNT = 100; // "$$ENC00*...$$ENC99*" + + private static final BaseEncoding CODEC = BaseEncoding.base32().omitPadding(); // no-padding version + + public static final String ENCODED_STAR_COLUMN = encode("*")[0]; + + /* + * Performance of various methods of encoding a Java String to UTF-8 keeps changing + * between releases, hence we'll encapsulate the actual methods within these functions + * and use them everywhere in Drill + */ + private static final String UTF_8 = "utf-8"; + + + private static byte[] encodeUTF(String input) { + try { + return input.getBytes(UTF_8); + } catch (UnsupportedEncodingException e) { + throw new DrillRuntimeException(e); // should never come to this + } + } + + private static String decodeUTF(byte[] input) { + try { + return new String(input, UTF_8); + } catch (UnsupportedEncodingException e) { + throw new DrillRuntimeException(e); // should never come to this + } + } + + private static String decodeUTF(byte[] input, int offset, int length) { + try { + return new String(input, offset, length, UTF_8); + } catch (UnsupportedEncodingException e) { + throw new DrillRuntimeException(e); // should never come to this + } + } + + /** + * Returns the encoded array of SchemaPath identifiers from the input array of SchemaPath. 
+ * <p> + * The returned identifiers have the following properties: + * <ul> + * <li>Each SchemaPath identifier in the array has only one single root NameSegment.</li> + * <li>Maximum length of each such identifier is equal to the maximum length of Drill identifier (currently 1024).</li> + * </ul> + * <p> + * We take advantage of the fact that Java's modified utf-8 encoding can never contain + * embedded null byte. + * @see <a>http://docs.oracle.com/javase/8/docs/api/java/io/DataInput.html#modified-utf-8</a> + */ + public static String[] encode(final String... schemaPaths) { + Preconditions.checkArgument(schemaPaths != null && schemaPaths.length > 0, + "At least one schema path should be provided"); + + NoCopyByteArrayOutputStream out = new NoCopyByteArrayOutputStream(ESTIMATED_ENCODED_SIZE); + int bufOffset = 1; // 1st byte is NULL + for (String schemaPath : schemaPaths) { + out.write(0); + out.write(encodeUTF(schemaPath)); + } + out.close(); + + final int bufLen = out.size() - 1; // not counting the first NULL byte + String encodedStr = CODEC.encode(out.getBuffer(), bufOffset, bufLen); + assert !encodedStr.endsWith("=") : String.format("Encoded string '%s' ends with '='", encodedStr); + return splitIdentifiers(encodedStr); + } + + public static boolean isEncodedSchemaPath(SchemaPath schemaPath) { + return schemaPath != null && isEncodedSchemaPath(schemaPath.getRootSegment().getNameSegment().getPath()); + } + + public static boolean isEncodedSchemaPath(String schemaPath) { + return schemaPath != null && schemaPath.startsWith(ENC_PREFIX); + } + + /** + * Returns the decoded Collection of SchemaPath from the input which + * may contain a mix of encoded and non-encoded SchemaPaths. + * <p> + * The size of returned Collection is always equal to or greater than the + * input array. + * <p> + * The non-encoded SchemaPaths are collated in the beginning to the returned + * array, in the same order as that of the input array. 
+ */ + public static Collection<SchemaPath> decode(final Collection<SchemaPath> encodedPaths) { + String[] schemaPathStrings = new String[encodedPaths.size()]; + Iterator<SchemaPath> encodedPathsItr = encodedPaths.iterator(); + for (int i = 0; i < schemaPathStrings.length; i++) { + SchemaPath schemaPath = encodedPathsItr.next(); + if (schemaPath.getRootSegmentPath().startsWith(ENC_PREFIX)) { + // encoded schema path contains only root segment + schemaPathStrings[i] = schemaPath.getRootSegmentPath(); + } else { + schemaPathStrings[i] = schemaPath.toExpr(); + } + } + String[] decodedStrings = decode(schemaPathStrings); + if (decodedStrings == schemaPathStrings) { + return encodedPaths; // return the original collection as no encoded SchemaPath was found + } else { + ImmutableList.Builder<SchemaPath> builder = new ImmutableList.Builder<>(); + for (String decodedString : decodedStrings) { + if ("*".equals(decodedString) || "`*`".equals(decodedString)) { + builder.add(SchemaPath.STAR_COLUMN); + } else { + builder.add(SchemaPath.parseFromString(decodedString)); + } + } + return builder.build(); + } + } + + /** + * Returns the decoded array of SchemaPath strings from the input which + * may contain a mix of encoded and non-encoded SchemaPaths. + * <p> + * The size of returned array is always equal to or greater than the + * input array. + * <p> + * The non-encoded SchemaPaths are collated in the beginning to the returned + * array, in the same order as that of the input array. + */ + public static String[] decode(final String... encodedPaths) { + Preconditions.checkArgument(encodedPaths != null && encodedPaths.length > 0, + "At least one encoded path should be provided"); + + StringBuilder sb = new StringBuilder(ESTIMATED_ENCODED_SIZE); + + // As the encoded schema path move across components, they could get reordered. + // Sorting ensures that the original order is restored before concatenating the + // components back to the full encoded String. 
+ Arrays.sort(encodedPaths); + + List<String> decodedPathList = Lists.newArrayList(); + for (String encodedPath : encodedPaths) { + if (encodedPath.startsWith(ENC_PREFIX)) { + sb.append(encodedPath, ENC_PREFIX_SIZE, encodedPath.length()); + } else { + decodedPathList.add(encodedPath); + } + } + + if (sb.length() > 0) { + byte[] decodedBytes; + try { + decodedBytes = CODEC.decode(sb); + } catch (IllegalArgumentException e) { + throw new DrillRuntimeException(String.format( + "Unable to decode the input strings as encoded schema paths:\n%s", Arrays.asList(encodedPaths)), e); + } + + int start = 0, index = 0; + for (; index < decodedBytes.length; index++) { + if (decodedBytes[index] == 0 && index - start > 0) { + decodedPathList.add(decodeUTF(decodedBytes, start, index-start)); + start = index + 1; + } + } + if (index - start > 0) { + String lastSchemaPath = decodeUTF(decodedBytes, start, index-start).trim(); + if (!lastSchemaPath.isEmpty()) { + decodedPathList.add(lastSchemaPath); + } + } + return decodedPathList.toArray(new String[decodedPathList.size()]); + } else { + // original list did not have any encoded path, return as is + return encodedPaths; + } + } + + /** + * Splits the input string so that the length of each encoded string, + * including the signature prefix is less than or equal to MAX_DRILL_IDENTIFIER_SIZE. 
+ */ + private static String[] splitIdentifiers(String input) { + if (input.length() < MAX_ENC_IDENTIFIER_SIZE) { + return new String[] { String.format(ENC_FORMAT_STRING, 0, input) }; + } + int splitsCount = (int) Math.ceil(input.length() / (double)MAX_ENC_IDENTIFIER_SIZE); + if (splitsCount > MAX_ENC_IDENTIFIER_COUNT) { + throw new DrillRuntimeException(String.format( + "Encoded size of the SchemaPath identifier '%s' exceeded maximum value.", input)); + } + String[] result = new String[splitsCount]; + for (int i = 0, startIdx = 0; i < result.length; i++, startIdx += MAX_ENC_IDENTIFIER_SIZE) { + // TODO: see if we can avoid memcpy due to input.substring() call + result[i] = String.format(ENC_FORMAT_STRING, i, input.substring(startIdx, Math.min(input.length(), startIdx + MAX_ENC_IDENTIFIER_SIZE))); + } + return result; + } + + /** + * Optimized version of Java's ByteArrayOutputStream which returns the underlying + * byte array instead of making a copy + */ + private static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream { + public NoCopyByteArrayOutputStream(int size) { + super(size); + } + + public byte[] getBuffer() { + return buf; + } + + public int size() { + return count; + } + + @Override + public void write(int b) { + super.write(b); + } + + @Override + public void write(byte[] b) { + super.write(b, 0, b.length); + } + + @Override + public void close() { + try { + super.close(); + } catch (IOException e) { + throw new DrillRuntimeException(e); // should never come to this + } + } + } + +} |