aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java78
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java35
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java88
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java25
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java28
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java129
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java212
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java56
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java33
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java49
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java28
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java1
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java91
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java5
18 files changed, 638 insertions, 256 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java
new file mode 100644
index 000000000..440f69f4b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Holder class that contains batch naming, batch and record index. Batch index is used when batch is hyper container.
+ * Used to distinguish batches in non-equi conditions during expression materialization.
+ * Mostly used for nested loop join which allows non equi-join.
+ *
+ * BatchReference instance can be created during batch initialization
+ * (ex: instance of {@link org.apache.drill.exec.record.AbstractRecordBatch})
+ * since naming of batches used won't change during data processing.
+ * Though information from batch reference will be used during schema build (i.e. once per OK_NEW_SCHEMA).
+ *
+ * Example:
+ * BatchReference{batchName='leftBatch', batchIndex='leftIndex', recordIndex='leftIndex'}
+ * BatchReference{batchName='rightContainer', batchIndex='rightBatchIndex', recordIndex='rightRecordIndexWithinBatch'}
+ *
+ */
+public final class BatchReference {
+
+ private final String batchName;
+
+ private final String batchIndex;
+
+ private final String recordIndex;
+
+ public BatchReference(String batchName, String recordIndex) {
+ // when batch index is not indicated, record index value will be set instead
+ this(batchName, recordIndex, recordIndex);
+ }
+
+ public BatchReference(String batchName, String batchIndex, String recordIndex) {
+ Preconditions.checkNotNull(batchName, "Batch name should not be null.");
+ Preconditions.checkNotNull(batchIndex, "Batch index should not be null.");
+ Preconditions.checkNotNull(recordIndex, "Record index should not be null.");
+ this.batchName = batchName;
+ this.batchIndex = batchIndex;
+ this.recordIndex = recordIndex;
+ }
+
+ public String getBatchName() {
+ return batchName;
+ }
+
+ public String getBatchIndex() {
+ return batchIndex;
+ }
+
+ public String getRecordIndex() {
+ return recordIndex;
+ }
+
+ @Override
+ public String toString() {
+ return "BatchReference{" +
+ "batchName='" + batchName + '\'' +
+ ", batchIndex='" + batchIndex + '\'' +
+ ", recordIndex='" + recordIndex + '\'' +
+ '}';
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 75b83c971..73a03632b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -399,15 +399,29 @@ public class EvaluationVisitor {
private HoldingContainer visitValueVectorReadExpression(ValueVectorReadExpression e, ClassGenerator<?> generator)
throws RuntimeException {
// declare value vector
+ DirectExpression batchName;
+ JExpression batchIndex;
+ JExpression recordIndex;
+
+ // if value vector read expression has batch reference, use its values in generated code,
+ // otherwise use values provided by mapping set (which point to only one batch)
+ // primary used for non-equi joins where expression conditions may refer to more than one batch
+ BatchReference batchRef = e.getBatchRef();
+ if (batchRef != null) {
+ batchName = DirectExpression.direct(batchRef.getBatchName());
+ batchIndex = DirectExpression.direct(batchRef.getBatchIndex());
+ recordIndex = DirectExpression.direct(batchRef.getRecordIndex());
+ } else {
+ batchName = generator.getMappingSet().getIncoming();
+ batchIndex = generator.getMappingSet().getValueReadIndex();
+ recordIndex = batchIndex;
+ }
- JExpression vv1 = generator.declareVectorValueSetupAndMember(generator.getMappingSet().getIncoming(),
- e.getFieldId());
- JExpression indexVariable = generator.getMappingSet().getValueReadIndex();
-
- JExpression componentVariable = indexVariable.shrz(JExpr.lit(16));
+ JExpression vv1 = generator.declareVectorValueSetupAndMember(batchName, e.getFieldId());
+ JExpression componentVariable = batchIndex.shrz(JExpr.lit(16));
if (e.isSuperReader()) {
vv1 = (vv1.component(componentVariable));
- indexVariable = indexVariable.band(JExpr.lit((int) Character.MAX_VALUE));
+ recordIndex = recordIndex.band(JExpr.lit((int) Character.MAX_VALUE));
}
// evaluation work.
@@ -418,14 +432,9 @@ public class EvaluationVisitor {
final boolean repeated = Types.isRepeated(e.getMajorType());
final boolean listVector = e.getTypedFieldId().isListVector();
- int[] fieldIds = e.getFieldId().getFieldIds();
- for (int i = 1; i < fieldIds.length; i++) {
-
- }
-
if (!hasReadPath && !complex) {
JBlock eval = new JBlock();
- GetSetVectorHelper.read(e.getMajorType(), vv1, eval, out, generator.getModel(), indexVariable);
+ GetSetVectorHelper.read(e.getMajorType(), vv1, eval, out, generator.getModel(), recordIndex);
generator.getEvalBlock().add(eval);
} else {
@@ -444,7 +453,7 @@ public class EvaluationVisitor {
// position to the correct value.
eval.add(expr.invoke("reset"));
- eval.add(expr.invoke("setPosition").arg(indexVariable));
+ eval.add(expr.invoke("setPosition").arg(recordIndex));
int listNum = 0;
while (seg != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index b70ad269c..b461b5cfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,13 +22,13 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.BooleanOperator;
@@ -64,7 +64,6 @@ import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
import org.apache.drill.common.expression.fn.CastFunctions;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.apache.drill.common.expression.visitors.ConditionalExprOptimizer;
-import org.apache.drill.common.expression.visitors.ExprVisitor;
import org.apache.drill.common.expression.visitors.ExpressionValidator;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -80,7 +79,6 @@ import org.apache.drill.exec.expr.fn.DrillFuncHolder;
import org.apache.drill.exec.expr.fn.ExceptionFunction;
import org.apache.drill.exec.expr.fn.FunctionLookupContext;
import org.apache.drill.exec.expr.stat.TypedFieldExpr;
-import org.apache.drill.exec.record.MaterializeVisitor;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.resolver.FunctionResolver;
@@ -100,7 +98,7 @@ public class ExpressionTreeMaterializer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializer.class);
private ExpressionTreeMaterializer() {
- };
+ }
public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext) {
return ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, functionLookupContext, false, false);
@@ -126,9 +124,51 @@ public class ExpressionTreeMaterializer {
return out;
}
- public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext,
- boolean allowComplexWriterExpr, boolean unionTypeEnabled) {
- LogicalExpression out = expr.accept(new MaterializeVisitor(batch, errorCollector, allowComplexWriterExpr, unionTypeEnabled), functionLookupContext);
+ /**
+ * Materializes logical expression taking into account passed parameters.
+ * Is used to materialize logical expression that contains reference to one batch.
+ *
+ * @param expr logical expression to be materialized
+ * @param batch batch instance
+ * @param errorCollector error collector
+ * @param functionLookupContext context to find drill function holder
+ * @param allowComplexWriterExpr true if complex expressions are allowed
+ * @param unionTypeEnabled true if union type is enabled
+ * @return materialized logical expression
+ */
+ public static LogicalExpression materialize(LogicalExpression expr,
+ VectorAccessible batch,
+ ErrorCollector errorCollector,
+ FunctionLookupContext functionLookupContext,
+ boolean allowComplexWriterExpr,
+ boolean unionTypeEnabled) {
+ Map<VectorAccessible, BatchReference> batches = Maps.newHashMap();
+ batches.put(batch, null);
+ return materialize(expr, batches, errorCollector, functionLookupContext, allowComplexWriterExpr, unionTypeEnabled);
+ }
+
+ /**
+ * Materializes logical expression taking into account passed parameters.
+ * Is used to materialize logical expression that can contain several batches with or without custom batch reference.
+ *
+ * @param expr logical expression to be materialized
+ * @param batches one or more batch instances used in expression
+ * @param errorCollector error collector
+ * @param functionLookupContext context to find drill function holder
+ * @param allowComplexWriterExpr true if complex expressions are allowed
+ * @param unionTypeEnabled true if union type is enabled
+ * @return materialized logical expression
+ */
+ public static LogicalExpression materialize(LogicalExpression expr,
+ Map<VectorAccessible, BatchReference> batches,
+ ErrorCollector errorCollector,
+ FunctionLookupContext functionLookupContext,
+ boolean allowComplexWriterExpr,
+ boolean unionTypeEnabled) {
+
+ LogicalExpression out = expr.accept(
+ new MaterializeVisitor(batches, errorCollector, allowComplexWriterExpr, unionTypeEnabled),
+ functionLookupContext);
if (!errorCollector.hasErrors()) {
out = out.accept(ConditionalExprOptimizer.INSTANCE, null);
@@ -224,24 +264,40 @@ public class ExpressionTreeMaterializer {
errorCollector.addGeneralError(call.getPosition(), sb.toString());
}
+ /**
+ * Visitor that wraps schema path into value vector read expression
+ * if schema path is present in one of the batches,
+ * otherwise instance of null expression.
+ */
private static class MaterializeVisitor extends AbstractMaterializeVisitor {
- private final VectorAccessible batch;
- public MaterializeVisitor(VectorAccessible batch, ErrorCollector errorCollector, boolean allowComplexWriter, boolean unionTypeEnabled) {
+ private final Map<VectorAccessible, BatchReference> batches;
+
+ public MaterializeVisitor(Map<VectorAccessible, BatchReference> batches,
+ ErrorCollector errorCollector,
+ boolean allowComplexWriter,
+ boolean unionTypeEnabled) {
super(errorCollector, allowComplexWriter, unionTypeEnabled);
- this.batch = batch;
+ this.batches = batches;
}
@Override
- public LogicalExpression visitSchemaPath(SchemaPath path, FunctionLookupContext functionLookupContext) {
- // logger.debug("Visiting schema path {}", path);
- TypedFieldId tfId = batch.getValueVectorId(path);
+ public LogicalExpression visitSchemaPath(final SchemaPath path, FunctionLookupContext functionLookupContext) {
+ TypedFieldId tfId = null;
+ BatchReference batchRef = null;
+ for (Map.Entry<VectorAccessible, BatchReference> entry : batches.entrySet()) {
+ tfId = entry.getKey().getValueVectorId(path);
+ if (tfId != null) {
+ batchRef = entry.getValue();
+ break;
+ }
+ }
+
if (tfId == null) {
logger.warn("Unable to find value vector of path {}, returning null instance.", path);
return NullExpression.INSTANCE;
} else {
- ValueVectorReadExpression e = new ValueVectorReadExpression(tfId);
- return e;
+ return new ValueVectorReadExpression(tfId, batchRef);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
index a556dc239..410c48aef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,6 +19,7 @@ package org.apache.drill.exec.expr;
import java.util.Iterator;
+import com.google.common.collect.ImmutableSet;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
@@ -26,16 +27,28 @@ import org.apache.drill.common.expression.visitors.ExprVisitor;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.record.TypedFieldId;
-import com.google.common.collect.Iterators;
-
-public class ValueVectorReadExpression implements LogicalExpression{
+/**
+ * Wraps a value vector field to be read, providing metadata about the field.
+ * Also may contain batch naming information to which this field belongs.
+ * If such information is absent default namings will be used from mapping set during materialization.
+ */
+public class ValueVectorReadExpression implements LogicalExpression {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVectorReadExpression.class);
private final TypedFieldId fieldId;
-
+ private final BatchReference batchRef;
public ValueVectorReadExpression(TypedFieldId tfId){
+ this(tfId, null);
+ }
+
+ public ValueVectorReadExpression(TypedFieldId tfId, BatchReference batchRef){
this.fieldId = tfId;
+ this.batchRef = batchRef;
+ }
+
+ public BatchReference getBatchRef() {
+ return batchRef;
}
public boolean hasReadPath(){
@@ -74,7 +87,7 @@ public class ValueVectorReadExpression implements LogicalExpression{
@Override
public Iterator<LogicalExpression> iterator() {
- return Iterators.emptyIterator();
+ return ImmutableSet.<LogicalExpression>of().iterator();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
index fd584ea49..1d747f74f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,7 +21,7 @@ package org.apache.drill.exec.physical.config;
import java.util.Iterator;
import java.util.List;
-import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -33,7 +33,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
@JsonTypeName("nested-loop-join")
public class NestedLoopJoinPOP extends AbstractBase {
@@ -42,27 +41,20 @@ public class NestedLoopJoinPOP extends AbstractBase {
private final PhysicalOperator left;
private final PhysicalOperator right;
-
- /*
- * Conditions and jointype are currently not used, since the condition is always true
- * and we don't perform any special execution operation based on join type either. However
- * when we enhance NLJ this would be used.
- */
- private final List<JoinCondition> conditions;
private final JoinRelType joinType;
+ private final LogicalExpression condition;
@JsonCreator
public NestedLoopJoinPOP(
@JsonProperty("left") PhysicalOperator left,
@JsonProperty("right") PhysicalOperator right,
- @JsonProperty("conditions") List<JoinCondition> conditions,
- @JsonProperty("joinType") JoinRelType joinType
+ @JsonProperty("joinType") JoinRelType joinType,
+ @JsonProperty("condition") LogicalExpression condition
) {
this.left = left;
this.right = right;
- this.conditions = conditions;
- Preconditions.checkArgument(joinType != null, "Join type is missing!");
this.joinType = joinType;
+ this.condition = condition;
}
@Override
@@ -72,8 +64,8 @@ public class NestedLoopJoinPOP extends AbstractBase {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- Preconditions.checkArgument(children.size() == 2);
- return new NestedLoopJoinPOP(children.get(0), children.get(1), conditions, joinType);
+ Preconditions.checkArgument(children.size() == 2, "Nested loop join should have two physical operators");
+ return new NestedLoopJoinPOP(children.get(0), children.get(1), joinType, condition);
}
@Override
@@ -93,9 +85,7 @@ public class NestedLoopJoinPOP extends AbstractBase {
return joinType;
}
- public List<JoinCondition> getConditions() {
- return conditions;
- }
+ public LogicalExpression getCondition() { return condition; }
@Override
public int getOperatorType() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
index 6cf07a2e8..f7d96ad8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.join;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.ExpandableHyperContainer;
@@ -36,8 +37,8 @@ public interface NestedLoopJoin {
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing);
- // Produce output records
- public int outputRecords();
+ // Produce output records taking into account join type
+ public int outputRecords(JoinRelType joinType);
// Project the record at offset 'leftIndex' in the left input batch into the output container at offset 'outIndex'
public void emitLeft(int leftIndex, int outIndex);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index bdd9f0eb4..8336e8642 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -19,9 +19,16 @@ package org.apache.drill.exec.physical.impl.join;
import java.io.IOException;
import java.util.LinkedList;
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -29,8 +36,11 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.BatchReference;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
+import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
@@ -38,6 +48,7 @@ import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.AllocationHelper;
@@ -45,6 +56,8 @@ import com.google.common.base.Preconditions;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JVar;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
/*
* RecordBatch implementation for the nested loop join operator
@@ -86,7 +99,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
// We accumulate all the batches on the right side in a hyper container.
private ExpandableHyperContainer rightContainer = new ExpandableHyperContainer();
- // Record count of the individual batches in the right hypoer container
+ // Record count of the individual batches in the right hyper container
private LinkedList<Integer> rightCounts = new LinkedList<>();
@@ -132,7 +145,6 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
* Method drains the right side input of the NLJ and accumulates the data
* in a hyper container. Once we have all the data from the right side we
* process the left side one batch at a time and produce the output batch
- * which is a cross product of the two sides.
* @return IterOutcome state of the nested loop join batch
*/
@Override
@@ -179,7 +191,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
allocateVectors();
// invoke the runtime generated method to emit records in the output batch
- outputRecords = nljWorker.outputRecords();
+ outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
// Set the record count
for (final VectorWrapper<?> vw : container) {
@@ -214,26 +226,59 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
/**
* Method generates the runtime code needed for NLJ. Other than the setup method to set the input and output value
- * vector references we implement two more methods
- * 1. emitLeft() -> Project record from the left side
- * 2. emitRight() -> Project record from the right side (which is a hyper container)
+ * vector references we implement three more methods
+ * 1. doEval() -> Evaluates if record from left side matches record from the right side
+ * 2. emitLeft() -> Project record from the left side
+ * 3. emitRight() -> Project record from the right side (which is a hyper container)
* @return the runtime generated class that implements the NestedLoopJoin interface
- * @throws IOException
- * @throws ClassTransformationException
*/
- private NestedLoopJoin setupWorker() throws IOException, ClassTransformationException {
- final CodeGenerator<NestedLoopJoin> nLJCodeGenerator = CodeGenerator.get(NestedLoopJoin.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
+ private NestedLoopJoin setupWorker() throws IOException, ClassTransformationException, SchemaChangeException {
+ final CodeGenerator<NestedLoopJoin> nLJCodeGenerator = CodeGenerator.get(
+ NestedLoopJoin.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
nLJCodeGenerator.plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
// nLJCodeGenerator.saveCodeForDebugging(true);
final ClassGenerator<NestedLoopJoin> nLJClassGenerator = nLJCodeGenerator.getRoot();
+ // generate doEval
+ final ErrorCollector collector = new ErrorCollectorImpl();
+
+ /*
+ Logical expression may contain fields from left and right batches. During code generation (materialization)
+ we need to indicate from which input field should be taken.
+
+ Non-equality joins can belong to one of below categories. For example:
+ 1. Join on non-equality join predicates:
+ select * from t1 inner join t2 on (t1.c1 between t2.c1 AND t2.c2) AND (...)
+ 2. Join with an OR predicate:
+ select * from t1 inner join t2 on on t1.c1 = t2.c1 OR t1.c2 = t2.c2
+ */
+ Map<VectorAccessible, BatchReference> batches = ImmutableMap
+ .<VectorAccessible, BatchReference>builder()
+ .put(left, new BatchReference("leftBatch", "leftIndex"))
+ .put(rightContainer, new BatchReference("rightContainer", "rightBatchIndex", "rightRecordIndexWithinBatch"))
+ .build();
+
+ LogicalExpression materialize = ExpressionTreeMaterializer.materialize(
+ popConfig.getCondition(),
+ batches,
+ collector,
+ context.getFunctionRegistry(),
+ false,
+ false);
+
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException(String.format("Failure while trying to materialize join condition. Errors:\n %s.",
+ collector.toErrorString()));
+ }
+
+ nLJClassGenerator.addExpr(new ReturnValueExpression(materialize), ClassGenerator.BlkCreateMode.FALSE);
+ // generate emitLeft
nLJClassGenerator.setMappingSet(emitLeftMapping);
JExpression outIndex = JExpr.direct("outIndex");
JExpression leftIndex = JExpr.direct("leftIndex");
-
int fieldId = 0;
int outputFieldId = 0;
// Set the input and output value vector references corresponding to the left batch
@@ -243,8 +288,10 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
// Add the vector to the output container
container.addOrGet(field);
- JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch", new TypedFieldId(fieldType, false, fieldId));
- JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(fieldType, false, outputFieldId));
+ JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch",
+ new TypedFieldId(fieldType, false, fieldId));
+ JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
+ new TypedFieldId(fieldType, false, outputFieldId));
nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV));
nLJClassGenerator.rotateBlock();
@@ -252,6 +299,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
outputFieldId++;
}
+ // generate emitRight
fieldId = 0;
nLJClassGenerator.setMappingSet(emitRightMapping);
JExpression batchIndex = JExpr.direct("batchIndex");
@@ -260,12 +308,22 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
// Set the input and output value vector references corresponding to the right batch
for (MaterializedField field : rightSchema) {
- final TypeProtos.MajorType fieldType = field.getType();
- // Add the vector to our output container
- container.addOrGet(field);
+ final TypeProtos.MajorType inputType = field.getType();
+ TypeProtos.MajorType outputType;
+ // if join type is LEFT, make sure right batch output fields data mode is optional
+ if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) {
+ outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL);
+ } else {
+ outputType = inputType;
+ }
+
+ MaterializedField newField = MaterializedField.create(field.getPath(), outputType);
+ container.addOrGet(newField);
- JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("rightContainer", new TypedFieldId(field.getType(), true, fieldId));
- JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(fieldType, false, outputFieldId));
+ JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("rightContainer",
+ new TypedFieldId(inputType, true, fieldId));
+ JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
+ new TypedFieldId(outputType, false, outputFieldId));
nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe")
.arg(recordIndexWithinBatch)
.arg(outIndex)
@@ -290,7 +348,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
/**
* Builds the output container's schema. Goes over the left and the right
* batch and adds the corresponding vectors to the output container.
- * @throws SchemaChangeException
+ * @throws SchemaChangeException if batch schema was changed during execution
*/
@Override
protected void buildSchema() throws SchemaChangeException {
@@ -314,28 +372,39 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
for (final VectorWrapper<?> vw : left) {
container.addOrGet(vw.getField());
}
-
- // if we have a schema batch, skip it
- if (left.getRecordCount() == 0) {
- leftUpstream = next(LEFT_INPUT, left);
- }
}
if (rightUpstream != IterOutcome.NONE) {
- rightSchema = right.getSchema();
- for (final VectorWrapper<?> vw : right) {
- container.addOrGet(vw.getField());
+ // make right input schema optional if we have LEFT join
+ for (final VectorWrapper<?> vectorWrapper : right) {
+ TypeProtos.MajorType inputType = vectorWrapper.getField().getType();
+ TypeProtos.MajorType outputType;
+ if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) {
+ outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL);
+ } else {
+ outputType = inputType;
+ }
+ MaterializedField newField = MaterializedField.create(vectorWrapper.getField().getPath(), outputType);
+ ValueVector valueVector = container.addOrGet(newField);
+ if (valueVector instanceof AbstractContainerVector) {
+ vectorWrapper.getValueVector().makeTransferPair(valueVector);
+ valueVector.clear();
+ }
}
+ rightSchema = right.getSchema();
addBatchToHyperContainer(right);
}
+ allocateVectors();
nljWorker = setupWorker();
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-
- allocateVectors();
+ // if left batch is empty, fetch next
+ if (leftUpstream != IterOutcome.NONE && left.getRecordCount() == 0) {
+ leftUpstream = next(LEFT_INPUT, left);
+ }
container.setRecordCount(0);
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException(e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index 842c891d9..bdd6f9d8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.join;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.ExpandableHyperContainer;
@@ -40,35 +41,32 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
// Record count of the left batch currently being processed
private int leftRecordCount = 0;
- // List of record counts per batch in the hyper container
+ // List of record counts per batch in the hyper container
private List<Integer> rightCounts = null;
// Output batch
private NestedLoopJoinBatch outgoing = null;
- // Next right batch to process
- private int nextRightBatchToProcess = 0;
-
- // Next record in the current right batch to process
- private int nextRightRecordToProcess = 0;
-
- // Next record in the left batch to process
- private int nextLeftRecordToProcess = 0;
+ // Iteration status tracker
+ private IterationStatusTracker tracker = new IterationStatusTracker();
/**
* Method initializes necessary state and invokes the doSetup() to set the
- * input and output value vector references
+ * input and output value vector references.
+ *
* @param context Fragment context
* @param left Current left input batch being processed
* @param rightContainer Hyper container
+ * @param rightCounts Counts for each right container
* @param outgoing Output batch
*/
- public void setupNestedLoopJoin(FragmentContext context, RecordBatch left,
+ public void setupNestedLoopJoin(FragmentContext context,
+ RecordBatch left,
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing) {
this.left = left;
- leftRecordCount = left.getRecordCount();
+ this.leftRecordCount = left.getRecordCount();
this.rightCounts = rightCounts;
this.outgoing = outgoing;
@@ -76,96 +74,100 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
}
/**
- * This method is the core of the nested loop join. For every record on the right we go over
- * the left batch and produce the cross product output
+ * Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method
+ * controls which left batch we are processing and fetches the next left input batch once we exhaust the current one.
+ *
+ * @param joinType join type (INNER ot LEFT)
+ * @return the number of records produced in the output batch
+ */
+ public int outputRecords(JoinRelType joinType) {
+ int outputIndex = 0;
+ while (leftRecordCount != 0) {
+ outputIndex = populateOutgoingBatch(joinType, outputIndex);
+ if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+ break;
+ }
+ // reset state and get next left batch
+ resetAndGetNextLeft();
+ }
+ return outputIndex;
+ }
+
+ /**
+ * This method is the core of the nested loop join.For each left batch record looks for matching record
+ * from the list of right batches. Match is checked by calling {@link #doEval(int, int, int)} method.
+ * If matching record is found both left and right records are written into output batch,
+ * otherwise if join type is LEFT, than only left record is written, right batch record values will be null.
+ *
+ * @param joinType join type (INNER or LEFT)
* @param outputIndex index to start emitting records at
* @return final outputIndex after producing records in the output batch
*/
- private int populateOutgoingBatch(int outputIndex) {
-
- // Total number of batches on the right side
- int totalRightBatches = rightCounts.size();
-
- // Total number of records on the left
- int localLeftRecordCount = leftRecordCount;
-
- /*
- * The below logic is the core of the NLJ. To have better performance we copy the instance members into local
- * method variables, once we are done with the loop we need to update the instance variables to reflect the new
- * state. To avoid code duplication of resetting the instance members at every exit point in the loop we are using
- * 'goto'
- */
- int localNextRightBatchToProcess = nextRightBatchToProcess;
- int localNextRightRecordToProcess = nextRightRecordToProcess;
- int localNextLeftRecordToProcess = nextLeftRecordToProcess;
-
- outer: {
-
- for (; localNextRightBatchToProcess< totalRightBatches; localNextRightBatchToProcess++) { // for every batch on the right
- int compositeIndexPart = localNextRightBatchToProcess << 16;
- int rightRecordCount = rightCounts.get(localNextRightBatchToProcess);
-
- for (; localNextRightRecordToProcess < rightRecordCount; localNextRightRecordToProcess++) { // for every record in this right batch
- for (; localNextLeftRecordToProcess < localLeftRecordCount; localNextLeftRecordToProcess++) { // for every record in the left batch
-
+ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
+ // copy index and match counters as local variables to speed up processing
+ int nextRightBatchToProcess = tracker.getNextRightBatchToProcess();
+ int nextRightRecordToProcess = tracker.getNextRightRecordToProcess();
+ int nextLeftRecordToProcess = tracker.getNextLeftRecordToProcess();
+ boolean rightRecordMatched = tracker.isRightRecordMatched();
+
+ outer:
+ // for every record in the left batch
+ for (; nextLeftRecordToProcess < leftRecordCount; nextLeftRecordToProcess++) {
+ // for every batch on the right
+ for (; nextRightBatchToProcess < rightCounts.size(); nextRightBatchToProcess++) {
+ int rightRecordCount = rightCounts.get(nextRightBatchToProcess);
+ // for every record in right batch
+ for (; nextRightRecordToProcess < rightRecordCount; nextRightRecordToProcess++) {
+
+ if (doEval(nextLeftRecordToProcess, nextRightBatchToProcess, nextRightRecordToProcess)) {
// project records from the left and right batches
- emitLeft(localNextLeftRecordToProcess, outputIndex);
- emitRight(localNextRightBatchToProcess, localNextRightRecordToProcess, outputIndex);
+ emitLeft(nextLeftRecordToProcess, outputIndex);
+ emitRight(nextRightBatchToProcess, nextRightRecordToProcess, outputIndex);
outputIndex++;
+ rightRecordMatched = true;
- // TODO: Optimization; We can eliminate this check and compute the limits before the loop
if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
- localNextLeftRecordToProcess++;
+ nextRightRecordToProcess++;
// no more space left in the batch, stop processing
break outer;
}
}
- localNextLeftRecordToProcess = 0;
}
- localNextRightRecordToProcess = 0;
+ nextRightRecordToProcess = 0;
}
- }
-
- // update the instance members
- nextRightBatchToProcess = localNextRightBatchToProcess;
- nextRightRecordToProcess = localNextRightRecordToProcess;
- nextLeftRecordToProcess = localNextLeftRecordToProcess;
-
- // done with the current left batch and there is space in the output batch continue processing
- return outputIndex;
- }
-
- /**
- * Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method
- * controls which left batch we are processing and fetches the next left input batch one we exhaust
- * the current one.
- * @return the number of records produced in the output batch
- */
- public int outputRecords() {
- int outputIndex = 0;
- while (leftRecordCount != 0) {
- outputIndex = populateOutgoingBatch(outputIndex);
- if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
- break;
+ nextRightBatchToProcess = 0;
+ if (joinType == JoinRelType.LEFT && !rightRecordMatched) {
+ // project records from the left side only, records from right will be null
+ emitLeft(nextLeftRecordToProcess, outputIndex);
+ outputIndex++;
+ if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+ nextLeftRecordToProcess++;
+
+ // no more space left in the batch, stop processing
+ break;
+ }
+ } else {
+ // reset match indicator if matching record was found
+ rightRecordMatched = false;
}
- // reset state and get next left batch
- resetAndGetNextLeft();
}
+
+ // update iteration status tracker with actual index and match counters
+ tracker.update(nextRightBatchToProcess, nextRightRecordToProcess, nextLeftRecordToProcess, rightRecordMatched);
return outputIndex;
}
/**
- * Utility method to clear the memory in the left input batch once we have completed processing it. Resets some
- * internal state which indicate the next records to process in the left and right batches. Also fetches the next
- * left input batch.
+ * Utility method to clear the memory in the left input batch once we have completed processing it.
+ * Resets some internal state which indicates the next records to process in the left and right batches,
+ * also fetches the next left input batch.
*/
private void resetAndGetNextLeft() {
-
for (VectorWrapper<?> vw : left) {
vw.getValueVector().clear();
}
- nextRightBatchToProcess = nextRightRecordToProcess = nextLeftRecordToProcess = 0;
+ tracker.reset();
RecordBatch.IterOutcome leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);
switch (leftOutcome) {
case OK_NEW_SCHEMA:
@@ -191,5 +193,57 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
@Named("recordIndexWithinBatch") int recordIndexWithinBatch,
@Named("outIndex") int outIndex);
- public abstract void emitLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
+ public abstract void emitLeft(@Named("leftIndex") int leftIndex,
+ @Named("outIndex") int outIndex);
+
+ protected abstract boolean doEval(@Named("leftIndex") int leftIndex,
+ @Named("rightBatchIndex") int batchIndex,
+ @Named("rightRecordIndexWithinBatch") int recordIndexWithinBatch);
+
+ /**
+ * Helper class to track position of left and record batches during iteration
+ * and match status of record from the right batch.
+ */
+ private static class IterationStatusTracker {
+ // Next right batch to process
+ private int nextRightBatchToProcess;
+ // Next record in the current right batch to process
+ private int nextRightRecordToProcess;
+ // Next record in the left batch to process
+ private int nextLeftRecordToProcess;
+ // Flag to indicate if record from the left found matching record from the right, applicable during left join
+ private boolean rightRecordMatched;
+
+ int getNextRightBatchToProcess() {
+ return nextRightBatchToProcess;
+ }
+
+ boolean isRightRecordMatched() {
+ return rightRecordMatched;
+ }
+
+ int getNextLeftRecordToProcess() {
+ return nextLeftRecordToProcess;
+ }
+
+ int getNextRightRecordToProcess() {
+ return nextRightRecordToProcess;
+ }
+
+ void update(int nextRightBatchToProcess,
+ int nextRightRecordToProcess,
+ int nextLeftRecordToProcess,
+ boolean rightRecordMatchFound) {
+ this.nextRightBatchToProcess = nextRightBatchToProcess;
+ this.nextRightRecordToProcess = nextRightRecordToProcess;
+ this.nextLeftRecordToProcess = nextLeftRecordToProcess;
+ this.rightRecordMatched = rightRecordMatchFound;
+ }
+
+ void reset() {
+ nextRightBatchToProcess = nextRightRecordToProcess = nextLeftRecordToProcess = 0;
+ rightRecordMatched = false;
+ }
+
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 155104038..513db9ba3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.volcano.AbstractConverter.ExpandConversionRule;
import org.apache.calcite.rel.core.RelFactories;
@@ -126,11 +127,14 @@ public enum PlannerPhase {
JOIN_PLANNING("LOPT Join Planning") {
public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
+ List<RelOptRule> rules = Lists.newArrayList();
+ if (context.getPlannerSettings().isJoinOptimizationEnabled()) {
+ rules.add(DRILL_JOIN_TO_MULTIJOIN_RULE);
+ rules.add(DRILL_LOPT_OPTIMIZE_JOIN_RULE);
+ }
+ rules.add(ProjectRemoveRule.INSTANCE);
return PlannerPhase.mergedRuleSets(
- RuleSets.ofList(
- DRILL_JOIN_TO_MULTIJOIN_RULE,
- DRILL_LOPT_OPTIMIZE_JOIN_RULE,
- ProjectRemoveRule.INSTANCE),
+ RuleSets.ofList(rules),
getStorageRules(context, plugins, this)
);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
index 4a94c719a..19c75240e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -14,12 +14,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- ******************************************************************************/
+*/
package org.apache.drill.exec.planner.logical;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import io.netty.buffer.DrillBuf;
+import org.apache.calcite.rel.RelNode;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionStringBuilder;
@@ -120,7 +121,7 @@ public class DrillConstExecutor implements RelOptPlanner.Executor {
@Override
public void reduce(final RexBuilder rexBuilder, List<RexNode> constExps, final List<RexNode> reducedValues) {
for (final RexNode newCall : constExps) {
- LogicalExpression logEx = DrillOptiq.toDrill(new DrillParseContext(plannerSettings), null /* input rel */, newCall);
+ LogicalExpression logEx = DrillOptiq.toDrill(new DrillParseContext(plannerSettings), (RelNode) null /* input rel */, newCall);
ErrorCollectorImpl errors = new ErrorCollectorImpl();
final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(logEx, null, errors, funcImplReg);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
index 87b76aec9..5a907875c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,7 +22,7 @@ import java.util.GregorianCalendar;
import java.util.LinkedList;
import java.util.List;
-import org.apache.calcite.rel.logical.LogicalAggregate;
+import com.google.common.base.Preconditions;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
@@ -70,27 +70,65 @@ public class DrillOptiq {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOptiq.class);
/**
- * Converts a tree of {@link RexNode} operators into a scalar expression in Drill syntax.
+ * Converts a tree of {@link RexNode} operators into a scalar expression in Drill syntax using one input.
+ *
+ * @param context parse context which contains planner settings
+ * @param input data input
+ * @param expr expression to be converted
+ * @return converted expression
*/
public static LogicalExpression toDrill(DrillParseContext context, RelNode input, RexNode expr) {
- final RexToDrill visitor = new RexToDrill(context, input);
+ return toDrill(context, Lists.newArrayList(input), expr);
+ }
+
+ /**
+ * Converts a tree of {@link RexNode} operators into a scalar expression in Drill syntax using multiple inputs.
+ *
+ * @param context parse context which contains planner settings
+ * @param inputs multiple data inputs
+ * @param expr expression to be converted
+ * @return converted expression
+ */
+ public static LogicalExpression toDrill(DrillParseContext context, List<RelNode> inputs, RexNode expr) {
+ final RexToDrill visitor = new RexToDrill(context, inputs);
return expr.accept(visitor);
}
private static class RexToDrill extends RexVisitorImpl<LogicalExpression> {
- private final RelNode input;
+ private final List<RelNode> inputs;
private final DrillParseContext context;
+ private final List<RelDataTypeField> fieldList;
- RexToDrill(DrillParseContext context, RelNode input) {
+ RexToDrill(DrillParseContext context, List<RelNode> inputs) {
super(true);
this.context = context;
- this.input = input;
+ this.inputs = inputs;
+ this.fieldList = Lists.newArrayList();
+ /*
+ Fields are enumerated by their presence order in input. Details {@link org.apache.calcite.rex.RexInputRef}.
+ Thus we can merge field list from several inputs by adding them into the list in order of appearance.
+ Each field index in the list will match field index in the RexInputRef instance which will allow us
+ to retrieve field from filed list by index in {@link #visitInputRef(RexInputRef)} method. Example:
+
+ Query: select t1.c1, t2.c1. t2.c2 from t1 inner join t2 on t1.c1 between t2.c1 and t2.c2
+
+ Input 1: $0
+ Input 2: $1, $2
+
+ Result: $0, $1, $2
+ */
+ for (RelNode input : inputs) {
+ if (input != null) {
+ fieldList.addAll(input.getRowType().getFieldList());
+ }
+ }
}
@Override
public LogicalExpression visitInputRef(RexInputRef inputRef) {
final int index = inputRef.getIndex();
- final RelDataTypeField field = input.getRowType().getFieldList().get(index);
+ final RelDataTypeField field = fieldList.get(index);
+ Preconditions.checkNotNull(field, "Unable to find field using input reference");
return FieldReference.getWithQuotedRef(field.getName());
}
@@ -129,7 +167,7 @@ public class DrillOptiq {
return FunctionCallFactory.createExpression(call.getOperator().getName().toLowerCase(),
ExpressionPosition.UNKNOWN, arg);
case MINUS_PREFIX:
- final RexBuilder builder = input.getCluster().getRexBuilder();
+ final RexBuilder builder = inputs.get(0).getCluster().getRexBuilder();
final List<RexNode> operands = Lists.newArrayList();
operands.add(builder.makeExactLiteral(new BigDecimal(-1)));
operands.add(call.getOperands().get(0));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index dd74c92ec..80e8dda1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -34,8 +34,6 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import com.google.common.collect.ImmutableList;
@@ -44,7 +42,7 @@ import com.google.common.collect.Lists;
// abstract base class for the join physical rules
public abstract class JoinPruleBase extends Prule {
- protected static enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN, NESTEDLOOP_JOIN};
+ protected enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN, NESTEDLOOP_JOIN}
protected JoinPruleBase(RelOptRuleOperand operand, String description) {
super(operand, description);
@@ -56,10 +54,7 @@ public abstract class JoinPruleBase extends Prule {
List<Integer> rightKeys = Lists.newArrayList();
List<Boolean> filterNulls = Lists.newArrayList();
JoinCategory category = JoinUtils.getJoinCategory(left, right, join.getCondition(), leftKeys, rightKeys, filterNulls);
- if (category == JoinCategory.CARTESIAN || category == JoinCategory.INEQUALITY) {
- return false;
- }
- return true;
+ return !(category == JoinCategory.CARTESIAN || category == JoinCategory.INEQUALITY);
}
protected List<DistributionField> getDistributionField(List<Integer> keys) {
@@ -238,26 +233,14 @@ public abstract class JoinPruleBase extends Prule {
} else {
if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
- call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition,
- join.getJoinType()));
-
+ call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
+ convertedRight, joinCondition, join.getJoinType()));
} else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
- call.transformTo(new HashJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition,
- join.getJoinType()));
+ call.transformTo(new HashJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
+ convertedRight, joinCondition, join.getJoinType()));
} else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
- if (joinCondition.isAlwaysTrue()) {
- call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition,
- join.getJoinType()));
- } else {
- RexBuilder builder = join.getCluster().getRexBuilder();
- RexLiteral condition = builder.makeLiteral(true); // TRUE condition for the NLJ
-
- FilterPrel newFilterRel = new FilterPrel(join.getCluster(), convertedLeft.getTraitSet(),
- new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight,
- condition, join.getJoinType()),
- joinCondition);
- call.transformTo(newFilterRel);
- }
+ call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
+ convertedRight, joinCondition, join.getJoinType()));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
index 7c4798ff4..b184eab88 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
@@ -18,13 +18,15 @@
package org.apache.drill.exec.planner.physical;
import java.io.IOException;
-import java.util.List;
-import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.core.Join;
@@ -35,17 +37,13 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
-import com.google.common.collect.Lists;
-
public class NestedLoopJoinPrel extends JoinPrel {
public NestedLoopJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
JoinRelType joinType) throws InvalidRelException {
super(cluster, traits, left, right, condition, joinType);
- RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys, filterNulls);
}
@Override
@@ -71,8 +69,9 @@ public class NestedLoopJoinPrel extends JoinPrel {
double rightRowCount = mq.getRowCount(this.getRight());
double nljFactor = PrelUtil.getSettings(getCluster()).getNestedLoopJoinFactor();
- // cpu cost of evaluating each leftkey=rightkey join condition
- double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * this.getLeftKeys().size();
+ // cpu cost of evaluating each expression in join condition
+ int exprNum = RelOptUtil.conjunctions(getCondition()).size() + RelOptUtil.disjunctions(getCondition()).size();
+ double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * exprNum;
double cpuCost = joinConditionCost * (leftRowCount * rightRowCount) * nljFactor;
@@ -82,23 +81,29 @@ public class NestedLoopJoinPrel extends JoinPrel {
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- final List<String> fields = getRowType().getFieldNames();
- assert isUnique(fields);
-
- final List<String> leftFields = left.getRowType().getFieldNames();
- final List<String> rightFields = right.getRowType().getFieldNames();
-
PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator);
PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator);
- JoinRelType jtype = this.getJoinType();
-
- List<JoinCondition> conditions = Lists.newArrayList();
-
- buildJoinConditions(conditions, leftFields, rightFields, leftKeys, rightKeys);
-
- NestedLoopJoinPOP nljoin = new NestedLoopJoinPOP(leftPop, rightPop, conditions, jtype);
- return creator.addMetadata(this, nljoin);
+ /*
+ Raw expression will be transformed into its logical representation. For example:
+ Query:
+ select t1.c1, t2.c1, t2.c2 from t1 inner join t2 on t1.c1 between t2.c1 and t2.c2
+ Raw expression:
+ AND(>=($0, $1), <=($0, $2))
+ Logical expression:
+ FunctionCall [func=booleanAnd,
+ args=[FunctionCall [func=greater_than_or_equal_to, args=[`i1`, `i10`]],
+ FunctionCall [func=less_than_or_equal_to, args=[`i1`, `i2`]]]
+
+ Both tables have the same column name thus duplicated column name in second table are renamed: i1 -> i10.
+ */
+ LogicalExpression condition = DrillOptiq.toDrill(
+ new DrillParseContext(PrelUtil.getSettings(getCluster())),
+ getInputs(),
+ getCondition());
+
+ NestedLoopJoinPOP nlj = new NestedLoopJoinPOP(leftPop, rightPop, getJoinType(), condition);
+ return creator.addMetadata(this, nlj);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
index bfb47d684..b98976b91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
@@ -49,7 +49,7 @@ public class NestedLoopJoinPrule extends JoinPruleBase {
PlannerSettings settings) {
JoinRelType type = join.getJoinType();
- if (!(type == JoinRelType.INNER || (type == JoinRelType.LEFT && JoinUtils.hasScalarSubqueryInput(left, right)))) {
+ if (!(type == JoinRelType.INNER || type == JoinRelType.LEFT)) {
return false;
}
@@ -63,11 +63,7 @@ public class NestedLoopJoinPrule extends JoinPruleBase {
}
if (settings.isNlJoinForScalarOnly()) {
- if (JoinUtils.hasScalarSubqueryInput(left, right)) {
- return true;
- } else {
- return false;
- }
+ return JoinUtils.hasScalarSubqueryInput(left, right);
}
return true;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index da5bc41f0..53d67c06e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -110,6 +110,30 @@ public class PlannerSettings implements Context{
public static final EnumeratedStringValidator QUOTING_IDENTIFIERS = new EnumeratedStringValidator(
QUOTING_IDENTIFIERS_KEY, Quoting.BACK_TICK.string, Quoting.DOUBLE_QUOTE.string, Quoting.BRACKET.string);
+ /*
+ Enables rules that re-write query joins in the most optimal way.
+ Though its turned on be default and its value in query optimization is undeniable, user may want turn off such
+ optimization to leave join order indicated in sql query unchanged.
+
+ For example:
+ Currently only nested loop join allows non-equi join conditions usage.
+ During planning stage nested loop join will be chosen when non-equi join is detected
+ and {@link #NLJOIN_FOR_SCALAR} set to false. Though query performance may not be the most optimal in such case,
+ user may use such workaround to execute queries with non-equi joins.
+
+ Nested loop join allows only INNER and LEFT join usage and implies that right input is smaller that left input.
+ During LEFT join when join optimization is enabled and detected that right input is larger that left,
+ join will be optimized: left and right inputs will be flipped and LEFT join type will be changed to RIGHT one.
+ If query contains non-equi joins, after such optimization it will fail, since nested loop does not allow
+ RIGHT join. In this case if user accepts probability of non optimal performance, he may turn off join optimization.
+ Turning off join optimization, makes sense only if user are not sure that right output is less or equal to left,
+ otherwise join optimization can be left turned on.
+
+ Note: once hash and merge joins will allow non-equi join conditions,
+ the need to turn off join optimization may go away.
+ */
+ public static final BooleanValidator JOIN_OPTIMIZATION = new BooleanValidator("planner.enable_join_optimization", true);
+
public OptionManager options = null;
public FunctionImplementationRegistry functionImplementationRegistry = null;
@@ -282,6 +306,10 @@ public class PlannerSettings implements Context{
.build(logger);
}
+ public boolean isJoinOptimizationEnabled() {
+ return options.getOption(JOIN_OPTIMIZATION);
+ }
+
@Override
public <T> T unwrap(Class<T> clazz) {
if(clazz == PlannerSettings.class){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 09c525929..6b8b49a46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -92,6 +92,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING,
PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD,
PlannerSettings.QUOTING_IDENTIFIERS,
+ PlannerSettings.JOIN_OPTIMIZATION,
ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
ExecConstants.OUTPUT_FORMAT_VALIDATOR,
ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
index 6210022df..10a937230 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
@@ -21,11 +21,10 @@ package org.apache.drill.exec.physical.impl.join;
import org.apache.drill.PlanTestBase;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.util.TestTools;
-import org.junit.Ignore;
import org.junit.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
public class TestNestedLoopJoin extends PlanTestBase {
@@ -33,16 +32,15 @@ public class TestNestedLoopJoin extends PlanTestBase {
private static final String WORKING_PATH = TestTools.getWorkingPath();
private static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
- private static final String NLJ = "Alter session set `planner.enable_hashjoin` = false; " +
- "alter session set `planner.enable_mergejoin` = false; " +
- "alter session set `planner.enable_nljoin_for_scalar_only` = false; ";
- private static final String SINGLE_NLJ = "alter session set `planner.disable_exchanges` = true; " + NLJ;
private static final String DISABLE_HJ = "alter session set `planner.enable_hashjoin` = false";
private static final String ENABLE_HJ = "alter session set `planner.enable_hashjoin` = true";
+ private static final String RESET_HJ = "alter session reset `planner.enable_hashjoin`";
private static final String DISABLE_MJ = "alter session set `planner.enable_mergejoin` = false";
private static final String ENABLE_MJ = "alter session set `planner.enable_mergejoin` = true";
private static final String DISABLE_NLJ_SCALAR = "alter session set `planner.enable_nljoin_for_scalar_only` = false";
private static final String ENABLE_NLJ_SCALAR = "alter session set `planner.enable_nljoin_for_scalar_only` = true";
+ private static final String DISABLE_JOIN_OPTIMIZATION = "alter session set `planner.enable_join_optimization` = false";
+ private static final String RESET_JOIN_OPTIMIZATION = "alter session reset `planner.enable_join_optimization`";
// Test queries used by planning and execution tests
private static final String testNlJoinExists_1 = "select r_regionkey from cp.`tpch/region.parquet` "
@@ -68,6 +66,15 @@ public class TestNestedLoopJoin extends PlanTestBase {
private static final String testNlJoinInequality_3 = "select r_regionkey from cp.`tpch/region.parquet` "
+ " where r_regionkey > (select min(n_regionkey) * 2 from cp.`tpch/nation.parquet` )";
+ private static final String testNlJoinBetween = "select " +
+ "n.n_nationkey, length(r.r_name) r_name_len, length(r.r_comment) r_comment_len " +
+ "from (select * from cp.`tpch/nation.parquet` where n_regionkey = 1) n " +
+ "%s join (select * from cp.`tpch/region.parquet` where r_regionkey = 1) r " +
+ "on n.n_nationkey between length(r.r_name) and length(r.r_comment) " +
+ "order by n.n_nationkey";
+
+ private static final String testNlJoinWithLargeRightInput = "select * from cp.`tpch/region.parquet`r " +
+ "left join cp.`tpch/nation.parquet` n on r.r_regionkey <> n.n_regionkey";
@Test
public void testNlJoinExists_1_planning() throws Exception {
@@ -75,7 +82,6 @@ public class TestNestedLoopJoin extends PlanTestBase {
}
@Test
- // @Ignore
public void testNlJoinNotIn_1_planning() throws Exception {
testPlanMatchingPatterns(testNlJoinNotIn_1, new String[]{nlpattern}, new String[]{});
}
@@ -93,7 +99,6 @@ public class TestNestedLoopJoin extends PlanTestBase {
}
@Test
- @Ignore // Re-test after CALCITE-695 is resolved
public void testNlJoinInequality_3() throws Exception {
test(DISABLE_NLJ_SCALAR);
testPlanMatchingPatterns(testNlJoinInequality_3, new String[]{nlpattern}, new String[]{});
@@ -103,8 +108,8 @@ public class TestNestedLoopJoin extends PlanTestBase {
@Test
public void testNlJoinAggrs_1_planning() throws Exception {
String query = "select total1, total2 from "
- + "(select sum(l_quantity) as total1 from cp.`tpch/lineitem.parquet` where l_suppkey between 100 and 200), "
- + "(select sum(l_quantity) as total2 from cp.`tpch/lineitem.parquet` where l_suppkey between 200 and 300) ";
+ + "(select sum(l_quantity) as total1 from cp.`tpch/lineitem.parquet` where l_suppkey between 100 and 200), "
+ + "(select sum(l_quantity) as total2 from cp.`tpch/lineitem.parquet` where l_suppkey between 200 and 300) ";
testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
}
@@ -207,7 +212,7 @@ public class TestNestedLoopJoin extends PlanTestBase {
@Test
public void testNLJWithEmptyBatch() throws Exception {
- Long result = 0l;
+ long result = 0L;
test(DISABLE_NLJ_SCALAR);
test(DISABLE_HJ);
@@ -256,18 +261,68 @@ public class TestNestedLoopJoin extends PlanTestBase {
test(ENABLE_MJ);
}
+ @Test
+ public void testNlJoinInnerBetween() throws Exception {
+ try {
+ test(DISABLE_NLJ_SCALAR);
+ String query = String.format(testNlJoinBetween, "INNER");
+ testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("n_nationkey", "r_name_length", "r_comment_length")
+ .baselineValues(17, 7, 31)
+ .baselineValues(24, 7, 31)
+ .build();
+ } finally {
+ test(RESET_HJ);
+ }
+ }
+
+ @Test
+ public void testNlJoinLeftBetween() throws Exception {
+ try {
+ test(DISABLE_NLJ_SCALAR);
+ String query = String.format(testNlJoinBetween, "LEFT");
+ testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("n_nationkey", "r_name_length", "r_comment_length")
+ .baselineValues(1, null, null)
+ .baselineValues(2, null, null)
+ .baselineValues(3, null, null)
+ .baselineValues(17, 7, 31)
+ .baselineValues(24, 7, 31)
+ .build();
+ } finally {
+ test(RESET_HJ);
+ }
+ }
+
@Test(expected = UserRemoteException.class)
- public void testExceptionLeftNlJoin() throws Exception {
+ public void testNlJoinWithLargeRightInputFailure() throws Exception {
try {
test(DISABLE_NLJ_SCALAR);
- test("select r.r_regionkey, n.n_nationkey from cp.`tpch/nation.parquet` n " +
- " left join cp.`tpch/region.parquet` r on n.n_regionkey < r.r_regionkey where n.n_nationkey < 3");
+ test(testNlJoinWithLargeRightInput);
} catch (UserRemoteException e) {
- assertThat("No expected current \"UNSUPPORTED_OPERATION ERROR\"",
- e.getMessage(), startsWith("UNSUPPORTED_OPERATION ERROR"));
+ assertThat(e.getMessage(), containsString("UNSUPPORTED_OPERATION ERROR: This query cannot be planned " +
+ "possibly due to either a cartesian join or an inequality join"));
throw e;
} finally {
- test("alter session reset `planner.enable_nljoin_for_scalar_only`");
+ test(RESET_HJ);
+ }
+ }
+
+ @Test
+ public void testNlJoinWithLargeRightInputSuccess() throws Exception {
+ try {
+ test(DISABLE_NLJ_SCALAR);
+ test(DISABLE_JOIN_OPTIMIZATION);
+ testPlanMatchingPatterns(testNlJoinWithLargeRightInput, new String[]{nlpattern}, new String[]{});
+ } finally {
+ test(RESET_HJ);
+ test(RESET_JOIN_OPTIMIZATION);
}
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java
index c3a9c2018..6620585a9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,7 @@
package org.apache.drill.exec.planner.logical;
import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
@@ -52,7 +53,7 @@ public class DrillOptiqTest {
// create a dummy RexOver object.
RexNode window = rex.makeOver(anyType, SqlStdOperatorTable.AVG, emptyList, emptyList, e, null, null, true,
false, false);
- DrillOptiq.toDrill(null, null, window);
+ DrillOptiq.toDrill(null, (RelNode) null, window);
} catch (UserException e) {
if (e.getMessage().contains(DrillOptiq.UNSUPPORTED_REX_NODE_ERROR)) {
// got expected error return