diff options
18 files changed, 638 insertions, 256 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java new file mode 100644 index 000000000..440f69f4b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.expr; + +import com.google.common.base.Preconditions; + +/** + * Holder class that contains batch naming, batch and record index. Batch index is used when batch is hyper container. + * Used to distinguish batches in non-equi conditions during expression materialization. + * Mostly used for nested loop join which allows non equi-join. + * + * BatchReference instance can be created during batch initialization + * (ex: instance of {@link org.apache.drill.exec.record.AbstractRecordBatch}) + * since naming of batches used won't change during data processing. + * Though information from batch reference will be used during schema build (i.e. once per OK_NEW_SCHEMA). 
+ * + * Example: + * BatchReference{batchName='leftBatch', batchIndex='leftIndex', recordIndex='leftIndex'} + * BatchReference{batchName='rightContainer', batchIndex='rightBatchIndex', recordIndex='rightRecordIndexWithinBatch'} + * + */ +public final class BatchReference { + + private final String batchName; + + private final String batchIndex; + + private final String recordIndex; + + public BatchReference(String batchName, String recordIndex) { + // when batch index is not indicated, record index value will be set instead + this(batchName, recordIndex, recordIndex); + } + + public BatchReference(String batchName, String batchIndex, String recordIndex) { + Preconditions.checkNotNull(batchName, "Batch name should not be null."); + Preconditions.checkNotNull(batchIndex, "Batch index should not be null."); + Preconditions.checkNotNull(recordIndex, "Record index should not be null."); + this.batchName = batchName; + this.batchIndex = batchIndex; + this.recordIndex = recordIndex; + } + + public String getBatchName() { + return batchName; + } + + public String getBatchIndex() { + return batchIndex; + } + + public String getRecordIndex() { + return recordIndex; + } + + @Override + public String toString() { + return "BatchReference{" + + "batchName='" + batchName + '\'' + + ", batchIndex='" + batchIndex + '\'' + + ", recordIndex='" + recordIndex + '\'' + + '}'; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java index 75b83c971..73a03632b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java @@ -399,15 +399,29 @@ public class EvaluationVisitor { private HoldingContainer visitValueVectorReadExpression(ValueVectorReadExpression e, ClassGenerator<?> generator) throws RuntimeException { // declare value vector + DirectExpression 
batchName; + JExpression batchIndex; + JExpression recordIndex; + + // if value vector read expression has batch reference, use its values in generated code, + // otherwise use values provided by mapping set (which point to only one batch) + // primarily used for non-equi joins where expression conditions may refer to more than one batch + BatchReference batchRef = e.getBatchRef(); + if (batchRef != null) { + batchName = DirectExpression.direct(batchRef.getBatchName()); + batchIndex = DirectExpression.direct(batchRef.getBatchIndex()); + recordIndex = DirectExpression.direct(batchRef.getRecordIndex()); + } else { + batchName = generator.getMappingSet().getIncoming(); + batchIndex = generator.getMappingSet().getValueReadIndex(); + recordIndex = batchIndex; + } - JExpression vv1 = generator.declareVectorValueSetupAndMember(generator.getMappingSet().getIncoming(), - e.getFieldId()); - JExpression indexVariable = generator.getMappingSet().getValueReadIndex(); - - JExpression componentVariable = indexVariable.shrz(JExpr.lit(16)); + JExpression vv1 = generator.declareVectorValueSetupAndMember(batchName, e.getFieldId()); + JExpression componentVariable = batchIndex.shrz(JExpr.lit(16)); if (e.isSuperReader()) { vv1 = (vv1.component(componentVariable)); - indexVariable = indexVariable.band(JExpr.lit((int) Character.MAX_VALUE)); + recordIndex = recordIndex.band(JExpr.lit((int) Character.MAX_VALUE)); } // evaluation work. 
@@ -418,14 +432,9 @@ public class EvaluationVisitor { final boolean repeated = Types.isRepeated(e.getMajorType()); final boolean listVector = e.getTypedFieldId().isListVector(); - int[] fieldIds = e.getFieldId().getFieldIds(); - for (int i = 1; i < fieldIds.length; i++) { - - } - if (!hasReadPath && !complex) { JBlock eval = new JBlock(); - GetSetVectorHelper.read(e.getMajorType(), vv1, eval, out, generator.getModel(), indexVariable); + GetSetVectorHelper.read(e.getMajorType(), vv1, eval, out, generator.getModel(), recordIndex); generator.getEvalBlock().add(eval); } else { @@ -444,7 +453,7 @@ public class EvaluationVisitor { // position to the correct value. eval.add(expr.invoke("reset")); - eval.add(expr.invoke("setPosition").arg(indexVariable)); + eval.add(expr.invoke("setPosition").arg(recordIndex)); int listNum = 0; while (seg != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java index b70ad269c..b461b5cfa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,13 +22,13 @@ import java.util.Arrays; import java.util.Collections; import java.util.Deque; import java.util.HashSet; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.BooleanOperator; @@ -64,7 +64,6 @@ import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression; import org.apache.drill.common.expression.fn.CastFunctions; import org.apache.drill.common.expression.visitors.AbstractExprVisitor; import org.apache.drill.common.expression.visitors.ConditionalExprOptimizer; -import org.apache.drill.common.expression.visitors.ExprVisitor; import org.apache.drill.common.expression.visitors.ExpressionValidator; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; @@ -80,7 +79,6 @@ import org.apache.drill.exec.expr.fn.DrillFuncHolder; import org.apache.drill.exec.expr.fn.ExceptionFunction; import org.apache.drill.exec.expr.fn.FunctionLookupContext; import org.apache.drill.exec.expr.stat.TypedFieldExpr; -import org.apache.drill.exec.record.MaterializeVisitor; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.resolver.FunctionResolver; @@ -100,7 +98,7 @@ public class ExpressionTreeMaterializer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializer.class); private ExpressionTreeMaterializer() { - }; + } public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext) { return 
ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, functionLookupContext, false, false); @@ -126,9 +124,51 @@ public class ExpressionTreeMaterializer { return out; } - public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext, - boolean allowComplexWriterExpr, boolean unionTypeEnabled) { - LogicalExpression out = expr.accept(new MaterializeVisitor(batch, errorCollector, allowComplexWriterExpr, unionTypeEnabled), functionLookupContext); + /** + * Materializes logical expression taking into account passed parameters. + * Is used to materialize logical expression that contains reference to one batch. + * + * @param expr logical expression to be materialized + * @param batch batch instance + * @param errorCollector error collector + * @param functionLookupContext context to find drill function holder + * @param allowComplexWriterExpr true if complex expressions are allowed + * @param unionTypeEnabled true if union type is enabled + * @return materialized logical expression + */ + public static LogicalExpression materialize(LogicalExpression expr, + VectorAccessible batch, + ErrorCollector errorCollector, + FunctionLookupContext functionLookupContext, + boolean allowComplexWriterExpr, + boolean unionTypeEnabled) { + Map<VectorAccessible, BatchReference> batches = Maps.newHashMap(); + batches.put(batch, null); + return materialize(expr, batches, errorCollector, functionLookupContext, allowComplexWriterExpr, unionTypeEnabled); + } + + /** + * Materializes logical expression taking into account passed parameters. + * Is used to materialize logical expression that can contain several batches with or without custom batch reference. 
+ * + * @param expr logical expression to be materialized + * @param batches one or more batch instances used in expression + * @param errorCollector error collector + * @param functionLookupContext context to find drill function holder + * @param allowComplexWriterExpr true if complex expressions are allowed + * @param unionTypeEnabled true if union type is enabled + * @return materialized logical expression + */ + public static LogicalExpression materialize(LogicalExpression expr, + Map<VectorAccessible, BatchReference> batches, + ErrorCollector errorCollector, + FunctionLookupContext functionLookupContext, + boolean allowComplexWriterExpr, + boolean unionTypeEnabled) { + + LogicalExpression out = expr.accept( + new MaterializeVisitor(batches, errorCollector, allowComplexWriterExpr, unionTypeEnabled), + functionLookupContext); if (!errorCollector.hasErrors()) { out = out.accept(ConditionalExprOptimizer.INSTANCE, null); @@ -224,24 +264,40 @@ public class ExpressionTreeMaterializer { errorCollector.addGeneralError(call.getPosition(), sb.toString()); } + /** + * Visitor that wraps schema path into value vector read expression + * if schema path is present in one of the batches, + * otherwise instance of null expression. 
+ */ private static class MaterializeVisitor extends AbstractMaterializeVisitor { - private final VectorAccessible batch; - public MaterializeVisitor(VectorAccessible batch, ErrorCollector errorCollector, boolean allowComplexWriter, boolean unionTypeEnabled) { + private final Map<VectorAccessible, BatchReference> batches; + + public MaterializeVisitor(Map<VectorAccessible, BatchReference> batches, + ErrorCollector errorCollector, + boolean allowComplexWriter, + boolean unionTypeEnabled) { super(errorCollector, allowComplexWriter, unionTypeEnabled); - this.batch = batch; + this.batches = batches; } @Override - public LogicalExpression visitSchemaPath(SchemaPath path, FunctionLookupContext functionLookupContext) { - // logger.debug("Visiting schema path {}", path); - TypedFieldId tfId = batch.getValueVectorId(path); + public LogicalExpression visitSchemaPath(final SchemaPath path, FunctionLookupContext functionLookupContext) { + TypedFieldId tfId = null; + BatchReference batchRef = null; + for (Map.Entry<VectorAccessible, BatchReference> entry : batches.entrySet()) { + tfId = entry.getKey().getValueVectorId(path); + if (tfId != null) { + batchRef = entry.getValue(); + break; + } + } + if (tfId == null) { logger.warn("Unable to find value vector of path {}, returning null instance.", path); return NullExpression.INSTANCE; } else { - ValueVectorReadExpression e = new ValueVectorReadExpression(tfId); - return e; + return new ValueVectorReadExpression(tfId, batchRef); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java index a556dc239..410c48aef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more 
contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,6 +19,7 @@ package org.apache.drill.exec.expr; import java.util.Iterator; +import com.google.common.collect.ImmutableSet; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.PathSegment; @@ -26,16 +27,28 @@ import org.apache.drill.common.expression.visitors.ExprVisitor; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.record.TypedFieldId; -import com.google.common.collect.Iterators; - -public class ValueVectorReadExpression implements LogicalExpression{ +/** + * Wraps a value vector field to be read, providing metadata about the field. + * Also may contain batch naming information to which this field belongs. + * If such information is absent default namings will be used from mapping set during materialization. + */ +public class ValueVectorReadExpression implements LogicalExpression { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVectorReadExpression.class); private final TypedFieldId fieldId; - + private final BatchReference batchRef; public ValueVectorReadExpression(TypedFieldId tfId){ + this(tfId, null); + } + + public ValueVectorReadExpression(TypedFieldId tfId, BatchReference batchRef){ this.fieldId = tfId; + this.batchRef = batchRef; + } + + public BatchReference getBatchRef() { + return batchRef; } public boolean hasReadPath(){ @@ -74,7 +87,7 @@ public class ValueVectorReadExpression implements LogicalExpression{ @Override public Iterator<LogicalExpression> iterator() { - return Iterators.emptyIterator(); + return ImmutableSet.<LogicalExpression>of().iterator(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java index fd584ea49..1d747f74f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,7 +21,7 @@ package org.apache.drill.exec.physical.config; import java.util.Iterator; import java.util.List; -import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -33,7 +33,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; @JsonTypeName("nested-loop-join") public class NestedLoopJoinPOP extends AbstractBase { @@ -42,27 +41,20 @@ public class NestedLoopJoinPOP extends AbstractBase { private final PhysicalOperator left; private final PhysicalOperator right; - - /* - * Conditions and jointype are currently not used, since the condition is always true - * and we don't perform any special execution operation based on join type either. However - * when we enhance NLJ this would be used. 
- */ - private final List<JoinCondition> conditions; private final JoinRelType joinType; + private final LogicalExpression condition; @JsonCreator public NestedLoopJoinPOP( @JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right, - @JsonProperty("conditions") List<JoinCondition> conditions, - @JsonProperty("joinType") JoinRelType joinType + @JsonProperty("joinType") JoinRelType joinType, + @JsonProperty("condition") LogicalExpression condition ) { this.left = left; this.right = right; - this.conditions = conditions; - Preconditions.checkArgument(joinType != null, "Join type is missing!"); this.joinType = joinType; + this.condition = condition; } @Override @@ -72,8 +64,8 @@ public class NestedLoopJoinPOP extends AbstractBase { @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { - Preconditions.checkArgument(children.size() == 2); - return new NestedLoopJoinPOP(children.get(0), children.get(1), conditions, joinType); + Preconditions.checkArgument(children.size() == 2, "Nested loop join should have two physical operators"); + return new NestedLoopJoinPOP(children.get(0), children.get(1), joinType, condition); } @Override @@ -93,9 +85,7 @@ public class NestedLoopJoinPOP extends AbstractBase { return joinType; } - public List<JoinCondition> getConditions() { - return conditions; - } + public LogicalExpression getCondition() { return condition; } @Override public int getOperatorType() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java index 6cf07a2e8..f7d96ad8c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more 
contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.physical.impl.join; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.ExpandableHyperContainer; @@ -36,8 +37,8 @@ public interface NestedLoopJoin { ExpandableHyperContainer rightContainer, LinkedList<Integer> rightCounts, NestedLoopJoinBatch outgoing); - // Produce output records - public int outputRecords(); + // Produce output records taking into account join type + public int outputRecords(JoinRelType joinType); // Project the record at offset 'leftIndex' in the left input batch into the output container at offset 'outIndex' public void emitLeft(int leftIndex, int outIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index bdd9f0eb4..8336e8642 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -19,9 +19,16 @@ package org.apache.drill.exec.physical.impl.join; import java.io.IOException; import java.util.LinkedList; +import java.util.Map; +import com.google.common.collect.ImmutableMap; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.expression.ErrorCollector; +import org.apache.drill.common.expression.ErrorCollectorImpl; +import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.compile.sig.GeneratorMapping; import 
org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; @@ -29,8 +36,11 @@ import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.BatchReference; +import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; +import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; @@ -38,6 +48,7 @@ import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.AllocationHelper; @@ -45,6 +56,8 @@ import com.google.common.base.Preconditions; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; import com.sun.codemodel.JVar; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractContainerVector; /* * RecordBatch implementation for the nested loop join operator @@ -86,7 +99,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> // We accumulate all the batches on the right side in a hyper container. 
private ExpandableHyperContainer rightContainer = new ExpandableHyperContainer(); - // Record count of the individual batches in the right hypoer container + // Record count of the individual batches in the right hyper container private LinkedList<Integer> rightCounts = new LinkedList<>(); @@ -132,7 +145,6 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> * Method drains the right side input of the NLJ and accumulates the data * in a hyper container. Once we have all the data from the right side we * process the left side one batch at a time and produce the output batch - * which is a cross product of the two sides. * @return IterOutcome state of the nested loop join batch */ @Override @@ -179,7 +191,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> allocateVectors(); // invoke the runtime generated method to emit records in the output batch - outputRecords = nljWorker.outputRecords(); + outputRecords = nljWorker.outputRecords(popConfig.getJoinType()); // Set the record count for (final VectorWrapper<?> vw : container) { @@ -214,26 +226,59 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> /** * Method generates the runtime code needed for NLJ. Other than the setup method to set the input and output value - * vector references we implement two more methods - * 1. emitLeft() -> Project record from the left side - * 2. emitRight() -> Project record from the right side (which is a hyper container) + * vector references we implement three more methods + * 1. doEval() -> Evaluates if record from left side matches record from the right side + * 2. emitLeft() -> Project record from the left side + * 3. 
emitRight() -> Project record from the right side (which is a hyper container) * @return the runtime generated class that implements the NestedLoopJoin interface - * @throws IOException - * @throws ClassTransformationException */ - private NestedLoopJoin setupWorker() throws IOException, ClassTransformationException { - final CodeGenerator<NestedLoopJoin> nLJCodeGenerator = CodeGenerator.get(NestedLoopJoin.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + private NestedLoopJoin setupWorker() throws IOException, ClassTransformationException, SchemaChangeException { + final CodeGenerator<NestedLoopJoin> nLJCodeGenerator = CodeGenerator.get( + NestedLoopJoin.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); nLJCodeGenerator.plainJavaCapable(true); // Uncomment out this line to debug the generated code. // nLJCodeGenerator.saveCodeForDebugging(true); final ClassGenerator<NestedLoopJoin> nLJClassGenerator = nLJCodeGenerator.getRoot(); + // generate doEval + final ErrorCollector collector = new ErrorCollectorImpl(); + + /* + Logical expression may contain fields from left and right batches. During code generation (materialization) + we need to indicate from which input field should be taken. + + Non-equality joins can belong to one of below categories. For example: + 1. Join on non-equality join predicates: + select * from t1 inner join t2 on (t1.c1 between t2.c1 AND t2.c2) AND (...) + 2. 
+ Join with an OR predicate: + select * from t1 inner join t2 on t1.c1 = t2.c1 OR t1.c2 = t2.c2 + */ + Map<VectorAccessible, BatchReference> batches = ImmutableMap + .<VectorAccessible, BatchReference>builder() + .put(left, new BatchReference("leftBatch", "leftIndex")) + .put(rightContainer, new BatchReference("rightContainer", "rightBatchIndex", "rightRecordIndexWithinBatch")) + .build(); + + LogicalExpression materialize = ExpressionTreeMaterializer.materialize( + popConfig.getCondition(), + batches, + collector, + context.getFunctionRegistry(), + false, + false); + + if (collector.hasErrors()) { + throw new SchemaChangeException(String.format("Failure while trying to materialize join condition. Errors:\n %s.", + collector.toErrorString())); + } + + nLJClassGenerator.addExpr(new ReturnValueExpression(materialize), ClassGenerator.BlkCreateMode.FALSE); + // generate emitLeft nLJClassGenerator.setMappingSet(emitLeftMapping); JExpression outIndex = JExpr.direct("outIndex"); JExpression leftIndex = JExpr.direct("leftIndex"); - int fieldId = 0; int outputFieldId = 0; // Set the input and output value vector references corresponding to the left batch @@ -243,8 +288,10 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> // Add the vector to the output container container.addOrGet(field); - JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch", new TypedFieldId(fieldType, false, fieldId)); - JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(fieldType, false, outputFieldId)); + JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch", + new TypedFieldId(fieldType, false, fieldId)); + JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", + new TypedFieldId(fieldType, false, outputFieldId)); nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV)); nLJClassGenerator.rotateBlock(); @@ -252,6 
+299,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> outputFieldId++; } + // generate emitRight fieldId = 0; nLJClassGenerator.setMappingSet(emitRightMapping); JExpression batchIndex = JExpr.direct("batchIndex"); @@ -260,12 +308,22 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> // Set the input and output value vector references corresponding to the right batch for (MaterializedField field : rightSchema) { - final TypeProtos.MajorType fieldType = field.getType(); - // Add the vector to our output container - container.addOrGet(field); + final TypeProtos.MajorType inputType = field.getType(); + TypeProtos.MajorType outputType; + // if join type is LEFT, make sure right batch output fields data mode is optional + if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) { + outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL); + } else { + outputType = inputType; + } + + MaterializedField newField = MaterializedField.create(field.getPath(), outputType); + container.addOrGet(newField); - JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("rightContainer", new TypedFieldId(field.getType(), true, fieldId)); - JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(fieldType, false, outputFieldId)); + JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("rightContainer", + new TypedFieldId(inputType, true, fieldId)); + JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", + new TypedFieldId(outputType, false, outputFieldId)); nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe") .arg(recordIndexWithinBatch) .arg(outIndex) @@ -290,7 +348,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> /** * Builds the output container's schema. 
Goes over the left and the right * batch and adds the corresponding vectors to the output container. - * @throws SchemaChangeException + * @throws SchemaChangeException if batch schema was changed during execution */ @Override protected void buildSchema() throws SchemaChangeException { @@ -314,28 +372,39 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> for (final VectorWrapper<?> vw : left) { container.addOrGet(vw.getField()); } - - // if we have a schema batch, skip it - if (left.getRecordCount() == 0) { - leftUpstream = next(LEFT_INPUT, left); - } } if (rightUpstream != IterOutcome.NONE) { - rightSchema = right.getSchema(); - for (final VectorWrapper<?> vw : right) { - container.addOrGet(vw.getField()); + // make right input schema optional if we have LEFT join + for (final VectorWrapper<?> vectorWrapper : right) { + TypeProtos.MajorType inputType = vectorWrapper.getField().getType(); + TypeProtos.MajorType outputType; + if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) { + outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL); + } else { + outputType = inputType; + } + MaterializedField newField = MaterializedField.create(vectorWrapper.getField().getPath(), outputType); + ValueVector valueVector = container.addOrGet(newField); + if (valueVector instanceof AbstractContainerVector) { + vectorWrapper.getValueVector().makeTransferPair(valueVector); + valueVector.clear(); + } } + rightSchema = right.getSchema(); addBatchToHyperContainer(right); } + allocateVectors(); nljWorker = setupWorker(); - container.buildSchema(BatchSchema.SelectionVectorMode.NONE); - - allocateVectors(); + // if left batch is empty, fetch next + if (leftUpstream != IterOutcome.NONE && left.getRecordCount() == 0) { + leftUpstream = next(LEFT_INPUT, left); + } container.setRecordCount(0); + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); } catch (ClassTransformationException | 
IOException e) { throw new SchemaChangeException(e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java index 842c891d9..bdd6f9d8f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.physical.impl.join; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.ExpandableHyperContainer; @@ -40,35 +41,32 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { // Record count of the left batch currently being processed private int leftRecordCount = 0; - // List of record counts per batch in the hyper container + // List of record counts per batch in the hyper container private List<Integer> rightCounts = null; // Output batch private NestedLoopJoinBatch outgoing = null; - // Next right batch to process - private int nextRightBatchToProcess = 0; - - // Next record in the current right batch to process - private int nextRightRecordToProcess = 0; - - // Next record in the left batch to process - private int nextLeftRecordToProcess = 0; + // Iteration status tracker + private IterationStatusTracker tracker = new IterationStatusTracker(); /** * Method initializes necessary state and invokes the doSetup() to set the - * input and output value vector references + * input and output value vector references. 
+ * * @param context Fragment context * @param left Current left input batch being processed * @param rightContainer Hyper container + * @param rightCounts Counts for each right container * @param outgoing Output batch */ - public void setupNestedLoopJoin(FragmentContext context, RecordBatch left, + public void setupNestedLoopJoin(FragmentContext context, + RecordBatch left, ExpandableHyperContainer rightContainer, LinkedList<Integer> rightCounts, NestedLoopJoinBatch outgoing) { this.left = left; - leftRecordCount = left.getRecordCount(); + this.leftRecordCount = left.getRecordCount(); this.rightCounts = rightCounts; this.outgoing = outgoing; @@ -76,96 +74,100 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { } /** - * This method is the core of the nested loop join. For every record on the right we go over - * the left batch and produce the cross product output + * Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method + * controls which left batch we are processing and fetches the next left input batch once we exhaust the current one. + * + * @param joinType join type (INNER or LEFT) + * @return the number of records produced in the output batch + */ + public int outputRecords(JoinRelType joinType) { + int outputIndex = 0; + while (leftRecordCount != 0) { + outputIndex = populateOutgoingBatch(joinType, outputIndex); + if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) { + break; + } + // reset state and get next left batch + resetAndGetNextLeft(); + } + return outputIndex; + } + + /** + * This method is the core of the nested loop join. For each left batch record looks for matching record + * from the list of right batches. Match is checked by calling {@link #doEval(int, int, int)} method.
+ * If matching record is found both left and right records are written into output batch, + * otherwise if join type is LEFT, then only left record is written, right batch record values will be null. + * + * @param joinType join type (INNER or LEFT) * @param outputIndex index to start emitting records at * @return final outputIndex after producing records in the output batch */ - private int populateOutgoingBatch(int outputIndex) { - - // Total number of batches on the right side - int totalRightBatches = rightCounts.size(); - - // Total number of records on the left - int localLeftRecordCount = leftRecordCount; - - /* - * The below logic is the core of the NLJ. To have better performance we copy the instance members into local - * method variables, once we are done with the loop we need to update the instance variables to reflect the new - * state. To avoid code duplication of resetting the instance members at every exit point in the loop we are using - * 'goto' - */ - int localNextRightBatchToProcess = nextRightBatchToProcess; - int localNextRightRecordToProcess = nextRightRecordToProcess; - int localNextLeftRecordToProcess = nextLeftRecordToProcess; - - outer: { - - for (; localNextRightBatchToProcess< totalRightBatches; localNextRightBatchToProcess++) { // for every batch on the right - int compositeIndexPart = localNextRightBatchToProcess << 16; - int rightRecordCount = rightCounts.get(localNextRightBatchToProcess); - - for (; localNextRightRecordToProcess < rightRecordCount; localNextRightRecordToProcess++) { // for every record in this right batch - for (; localNextLeftRecordToProcess < localLeftRecordCount; localNextLeftRecordToProcess++) { // for every record in the left batch - + private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) { + // copy index and match counters as local variables to speed up processing + int nextRightBatchToProcess = tracker.getNextRightBatchToProcess(); + int nextRightRecordToProcess =
tracker.getNextRightRecordToProcess(); + int nextLeftRecordToProcess = tracker.getNextLeftRecordToProcess(); + boolean rightRecordMatched = tracker.isRightRecordMatched(); + + outer: + // for every record in the left batch + for (; nextLeftRecordToProcess < leftRecordCount; nextLeftRecordToProcess++) { + // for every batch on the right + for (; nextRightBatchToProcess < rightCounts.size(); nextRightBatchToProcess++) { + int rightRecordCount = rightCounts.get(nextRightBatchToProcess); + // for every record in right batch + for (; nextRightRecordToProcess < rightRecordCount; nextRightRecordToProcess++) { + + if (doEval(nextLeftRecordToProcess, nextRightBatchToProcess, nextRightRecordToProcess)) { // project records from the left and right batches - emitLeft(localNextLeftRecordToProcess, outputIndex); - emitRight(localNextRightBatchToProcess, localNextRightRecordToProcess, outputIndex); + emitLeft(nextLeftRecordToProcess, outputIndex); + emitRight(nextRightBatchToProcess, nextRightRecordToProcess, outputIndex); outputIndex++; + rightRecordMatched = true; - // TODO: Optimization; We can eliminate this check and compute the limits before the loop if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) { - localNextLeftRecordToProcess++; + nextRightRecordToProcess++; // no more space left in the batch, stop processing break outer; } } - localNextLeftRecordToProcess = 0; } - localNextRightRecordToProcess = 0; + nextRightRecordToProcess = 0; } - } - - // update the instance members - nextRightBatchToProcess = localNextRightBatchToProcess; - nextRightRecordToProcess = localNextRightRecordToProcess; - nextLeftRecordToProcess = localNextLeftRecordToProcess; - - // done with the current left batch and there is space in the output batch continue processing - return outputIndex; - } - - /** - * Main entry point for producing the output records. 
Thin wrapper around populateOutgoingBatch(), this method - * controls which left batch we are processing and fetches the next left input batch one we exhaust - * the current one. - * @return the number of records produced in the output batch - */ - public int outputRecords() { - int outputIndex = 0; - while (leftRecordCount != 0) { - outputIndex = populateOutgoingBatch(outputIndex); - if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) { - break; + nextRightBatchToProcess = 0; + if (joinType == JoinRelType.LEFT && !rightRecordMatched) { + // project records from the left side only, records from right will be null + emitLeft(nextLeftRecordToProcess, outputIndex); + outputIndex++; + if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) { + nextLeftRecordToProcess++; + + // no more space left in the batch, stop processing + break; + } + } else { + // reset match indicator if matching record was found + rightRecordMatched = false; } - // reset state and get next left batch - resetAndGetNextLeft(); } + + // update iteration status tracker with actual index and match counters + tracker.update(nextRightBatchToProcess, nextRightRecordToProcess, nextLeftRecordToProcess, rightRecordMatched); return outputIndex; } /** - * Utility method to clear the memory in the left input batch once we have completed processing it. Resets some - * internal state which indicate the next records to process in the left and right batches. Also fetches the next - * left input batch. + * Utility method to clear the memory in the left input batch once we have completed processing it. + * Resets some internal state which indicates the next records to process in the left and right batches, + * also fetches the next left input batch. 
*/ private void resetAndGetNextLeft() { - for (VectorWrapper<?> vw : left) { vw.getValueVector().clear(); } - nextRightBatchToProcess = nextRightRecordToProcess = nextLeftRecordToProcess = 0; + tracker.reset(); RecordBatch.IterOutcome leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left); switch (leftOutcome) { case OK_NEW_SCHEMA: @@ -191,5 +193,57 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { @Named("recordIndexWithinBatch") int recordIndexWithinBatch, @Named("outIndex") int outIndex); - public abstract void emitLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex); + public abstract void emitLeft(@Named("leftIndex") int leftIndex, + @Named("outIndex") int outIndex); + + protected abstract boolean doEval(@Named("leftIndex") int leftIndex, + @Named("rightBatchIndex") int batchIndex, + @Named("rightRecordIndexWithinBatch") int recordIndexWithinBatch); + + /** + * Helper class to track position of left and record batches during iteration + * and match status of record from the right batch. 
+ */ + private static class IterationStatusTracker { + // Next right batch to process + private int nextRightBatchToProcess; + // Next record in the current right batch to process + private int nextRightRecordToProcess; + // Next record in the left batch to process + private int nextLeftRecordToProcess; + // Flag to indicate if record from the left found matching record from the right, applicable during left join + private boolean rightRecordMatched; + + int getNextRightBatchToProcess() { + return nextRightBatchToProcess; + } + + boolean isRightRecordMatched() { + return rightRecordMatched; + } + + int getNextLeftRecordToProcess() { + return nextLeftRecordToProcess; + } + + int getNextRightRecordToProcess() { + return nextRightRecordToProcess; + } + + void update(int nextRightBatchToProcess, + int nextRightRecordToProcess, + int nextLeftRecordToProcess, + boolean rightRecordMatchFound) { + this.nextRightBatchToProcess = nextRightBatchToProcess; + this.nextRightRecordToProcess = nextRightRecordToProcess; + this.nextLeftRecordToProcess = nextLeftRecordToProcess; + this.rightRecordMatched = rightRecordMatchFound; + } + + void reset() { + nextRightBatchToProcess = nextRightRecordToProcess = nextLeftRecordToProcess = 0; + rightRecordMatched = false; + } + + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java index 155104038..513db9ba3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,6 +19,7 @@ package org.apache.drill.exec.planner; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; +import com.google.common.collect.Lists; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.volcano.AbstractConverter.ExpandConversionRule; import org.apache.calcite.rel.core.RelFactories; @@ -126,11 +127,14 @@ public enum PlannerPhase { JOIN_PLANNING("LOPT Join Planning") { public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) { + List<RelOptRule> rules = Lists.newArrayList(); + if (context.getPlannerSettings().isJoinOptimizationEnabled()) { + rules.add(DRILL_JOIN_TO_MULTIJOIN_RULE); + rules.add(DRILL_LOPT_OPTIMIZE_JOIN_RULE); + } + rules.add(ProjectRemoveRule.INSTANCE); return PlannerPhase.mergedRuleSets( - RuleSets.ofList( - DRILL_JOIN_TO_MULTIJOIN_RULE, - DRILL_LOPT_OPTIMIZE_JOIN_RULE, - ProjectRemoveRule.INSTANCE), + RuleSets.ofList(rules), getStorageRules(context, plugins, this) ); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java index 4a94c719a..19c75240e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -14,12 +14,13 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
- ******************************************************************************/ +*/ package org.apache.drill.exec.planner.logical; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import io.netty.buffer.DrillBuf; +import org.apache.calcite.rel.RelNode; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ExpressionStringBuilder; @@ -120,7 +121,7 @@ public class DrillConstExecutor implements RelOptPlanner.Executor { @Override public void reduce(final RexBuilder rexBuilder, List<RexNode> constExps, final List<RexNode> reducedValues) { for (final RexNode newCall : constExps) { - LogicalExpression logEx = DrillOptiq.toDrill(new DrillParseContext(plannerSettings), null /* input rel */, newCall); + LogicalExpression logEx = DrillOptiq.toDrill(new DrillParseContext(plannerSettings), (RelNode) null /* input rel */, newCall); ErrorCollectorImpl errors = new ErrorCollectorImpl(); final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(logEx, null, errors, funcImplReg); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java index 87b76aec9..5a907875c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,7 @@ import java.util.GregorianCalendar; import java.util.LinkedList; import java.util.List; -import org.apache.calcite.rel.logical.LogicalAggregate; +import com.google.common.base.Preconditions; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; @@ -70,27 +70,65 @@ public class DrillOptiq { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOptiq.class); /** - * Converts a tree of {@link RexNode} operators into a scalar expression in Drill syntax. + * Converts a tree of {@link RexNode} operators into a scalar expression in Drill syntax using one input. + * + * @param context parse context which contains planner settings + * @param input data input + * @param expr expression to be converted + * @return converted expression */ public static LogicalExpression toDrill(DrillParseContext context, RelNode input, RexNode expr) { - final RexToDrill visitor = new RexToDrill(context, input); + return toDrill(context, Lists.newArrayList(input), expr); + } + + /** + * Converts a tree of {@link RexNode} operators into a scalar expression in Drill syntax using multiple inputs. 
+ * * @param context parse context which contains planner settings + * @param inputs multiple data inputs + * @param expr expression to be converted + * @return converted expression + */ + public static LogicalExpression toDrill(DrillParseContext context, List<RelNode> inputs, RexNode expr) { + final RexToDrill visitor = new RexToDrill(context, inputs); return expr.accept(visitor); } private static class RexToDrill extends RexVisitorImpl<LogicalExpression> { - private final RelNode input; + private final List<RelNode> inputs; private final DrillParseContext context; + private final List<RelDataTypeField> fieldList; - RexToDrill(DrillParseContext context, RelNode input) { + RexToDrill(DrillParseContext context, List<RelNode> inputs) { super(true); this.context = context; - this.input = input; + this.inputs = inputs; + this.fieldList = Lists.newArrayList(); + /* + Fields are enumerated by their presence order in input. Details {@link org.apache.calcite.rex.RexInputRef}. + Thus we can merge field list from several inputs by adding them into the list in order of appearance. + Each field index in the list will match field index in the RexInputRef instance which will allow us + to retrieve field from field list by index in {@link #visitInputRef(RexInputRef)} method. Example: + + Query: select t1.c1, t2.c1,
t2.c2 from t1 inner join t2 on t1.c1 between t2.c1 and t2.c2 + + Input 1: $0 + Input 2: $1, $2 + + Result: $0, $1, $2 + */ + for (RelNode input : inputs) { + if (input != null) { + fieldList.addAll(input.getRowType().getFieldList()); + } + } } @Override public LogicalExpression visitInputRef(RexInputRef inputRef) { final int index = inputRef.getIndex(); - final RelDataTypeField field = input.getRowType().getFieldList().get(index); + final RelDataTypeField field = fieldList.get(index); + Preconditions.checkNotNull(field, "Unable to find field using input reference"); return FieldReference.getWithQuotedRef(field.getName()); } @@ -129,7 +167,7 @@ public class DrillOptiq { return FunctionCallFactory.createExpression(call.getOperator().getName().toLowerCase(), ExpressionPosition.UNKNOWN, arg); case MINUS_PREFIX: - final RexBuilder builder = input.getCluster().getRexBuilder(); + final RexBuilder builder = inputs.get(0).getCluster().getRexBuilder(); final List<RexNode> operands = Lists.newArrayList(); operands.add(builder.makeExactLiteral(new BigDecimal(-1))); operands.add(call.getOperands().get(0)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java index dd74c92ec..80e8dda1c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java @@ -34,8 +34,6 @@ import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import com.google.common.collect.ImmutableList; @@ -44,7 +42,7 @@ import com.google.common.collect.Lists; // abstract base class for the join 
physical rules public abstract class JoinPruleBase extends Prule { - protected static enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN, NESTEDLOOP_JOIN}; + protected enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN, NESTEDLOOP_JOIN} protected JoinPruleBase(RelOptRuleOperand operand, String description) { super(operand, description); @@ -56,10 +54,7 @@ public abstract class JoinPruleBase extends Prule { List<Integer> rightKeys = Lists.newArrayList(); List<Boolean> filterNulls = Lists.newArrayList(); JoinCategory category = JoinUtils.getJoinCategory(left, right, join.getCondition(), leftKeys, rightKeys, filterNulls); - if (category == JoinCategory.CARTESIAN || category == JoinCategory.INEQUALITY) { - return false; - } - return true; + return !(category == JoinCategory.CARTESIAN || category == JoinCategory.INEQUALITY); } protected List<DistributionField> getDistributionField(List<Integer> keys) { @@ -238,26 +233,14 @@ public abstract class JoinPruleBase extends Prule { } else { if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { - call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition, - join.getJoinType())); - + call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, + convertedRight, joinCondition, join.getJoinType())); } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) { - call.transformTo(new HashJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition, - join.getJoinType())); + call.transformTo(new HashJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, + convertedRight, joinCondition, join.getJoinType())); } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) { - if (joinCondition.isAlwaysTrue()) { - call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition, - join.getJoinType())); - } else { - 
RexBuilder builder = join.getCluster().getRexBuilder(); - RexLiteral condition = builder.makeLiteral(true); // TRUE condition for the NLJ - - FilterPrel newFilterRel = new FilterPrel(join.getCluster(), convertedLeft.getTraitSet(), - new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, - condition, join.getJoinType()), - joinCondition); - call.transformTo(newFilterRel); - } + call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, + convertedRight, joinCondition, join.getJoinType())); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java index 7c4798ff4..b184eab88 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java @@ -18,13 +18,15 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; -import java.util.List; -import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; import org.apache.drill.exec.planner.cost.DrillCostBase; import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory; +import org.apache.drill.exec.planner.logical.DrillOptiq; +import org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.core.Join; @@ -35,17 +37,13 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import 
org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelTraitSet; -import com.google.common.collect.Lists; - public class NestedLoopJoinPrel extends JoinPrel { public NestedLoopJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, JoinRelType joinType) throws InvalidRelException { super(cluster, traits, left, right, condition, joinType); - RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys, filterNulls); } @Override @@ -71,8 +69,9 @@ public class NestedLoopJoinPrel extends JoinPrel { double rightRowCount = mq.getRowCount(this.getRight()); double nljFactor = PrelUtil.getSettings(getCluster()).getNestedLoopJoinFactor(); - // cpu cost of evaluating each leftkey=rightkey join condition - double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * this.getLeftKeys().size(); + // cpu cost of evaluating each expression in join condition + int exprNum = RelOptUtil.conjunctions(getCondition()).size() + RelOptUtil.disjunctions(getCondition()).size(); + double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * exprNum; double cpuCost = joinConditionCost * (leftRowCount * rightRowCount) * nljFactor; @@ -82,23 +81,29 @@ public class NestedLoopJoinPrel extends JoinPrel { @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - final List<String> fields = getRowType().getFieldNames(); - assert isUnique(fields); - - final List<String> leftFields = left.getRowType().getFieldNames(); - final List<String> rightFields = right.getRowType().getFieldNames(); - PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator); PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator); - JoinRelType jtype = this.getJoinType(); - - List<JoinCondition> conditions = Lists.newArrayList(); - - buildJoinConditions(conditions, leftFields, rightFields, leftKeys, rightKeys); - - NestedLoopJoinPOP nljoin = 
new NestedLoopJoinPOP(leftPop, rightPop, conditions, jtype); - return creator.addMetadata(this, nljoin); + /* + Raw expression will be transformed into its logical representation. For example: + Query: + select t1.c1, t2.c1, t2.c2 from t1 inner join t2 on t1.c1 between t2.c1 and t2.c2 + Raw expression: + AND(>=($0, $1), <=($0, $2)) + Logical expression: + FunctionCall [func=booleanAnd, + args=[FunctionCall [func=greater_than_or_equal_to, args=[`i1`, `i10`]], + FunctionCall [func=less_than_or_equal_to, args=[`i1`, `i2`]]] + + Both tables have the same column name thus duplicated column name in second table are renamed: i1 -> i10. + */ + LogicalExpression condition = DrillOptiq.toDrill( + new DrillParseContext(PrelUtil.getSettings(getCluster())), + getInputs(), + getCondition()); + + NestedLoopJoinPOP nlj = new NestedLoopJoinPOP(leftPop, rightPop, getJoinType(), condition); + return creator.addMetadata(this, nlj); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java index bfb47d684..b98976b91 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java @@ -49,7 +49,7 @@ public class NestedLoopJoinPrule extends JoinPruleBase { PlannerSettings settings) { JoinRelType type = join.getJoinType(); - if (!(type == JoinRelType.INNER || (type == JoinRelType.LEFT && JoinUtils.hasScalarSubqueryInput(left, right)))) { + if (!(type == JoinRelType.INNER || type == JoinRelType.LEFT)) { return false; } @@ -63,11 +63,7 @@ public class NestedLoopJoinPrule extends JoinPruleBase { } if (settings.isNlJoinForScalarOnly()) { - if (JoinUtils.hasScalarSubqueryInput(left, right)) { - return true; - } else { - return false; - } + return JoinUtils.hasScalarSubqueryInput(left, right); } return 
true; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index da5bc41f0..53d67c06e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -110,6 +110,30 @@ public class PlannerSettings implements Context{ public static final EnumeratedStringValidator QUOTING_IDENTIFIERS = new EnumeratedStringValidator( QUOTING_IDENTIFIERS_KEY, Quoting.BACK_TICK.string, Quoting.DOUBLE_QUOTE.string, Quoting.BRACKET.string); + /* + Enables rules that re-write query joins in the most optimal way. + Though it is turned on by default and its value in query optimization is undeniable, user may want to turn off such + optimization to leave join order indicated in sql query unchanged. + + For example: + Currently only nested loop join allows non-equi join conditions usage. + During planning stage nested loop join will be chosen when non-equi join is detected + and {@link #NLJOIN_FOR_SCALAR} set to false. Though query performance may not be the most optimal in such case, + user may use such workaround to execute queries with non-equi joins. + + Nested loop join allows only INNER and LEFT join usage and implies that right input is smaller than left input. + During LEFT join when join optimization is enabled and detected that right input is larger than left, + join will be optimized: left and right inputs will be flipped and LEFT join type will be changed to RIGHT one. + If query contains non-equi joins, after such optimization it will fail, since nested loop does not allow + RIGHT join. In this case if user accepts probability of non optimal performance, he may turn off join optimization.
+ Turning off join optimization, makes sense only if user is not sure that right output is less or equal to left, + otherwise join optimization can be left turned on. + + Note: once hash and merge joins will allow non-equi join conditions, + the need to turn off join optimization may go away. + */ + public static final BooleanValidator JOIN_OPTIMIZATION = new BooleanValidator("planner.enable_join_optimization", true); + public OptionManager options = null; public FunctionImplementationRegistry functionImplementationRegistry = null; @@ -282,6 +306,10 @@ public class PlannerSettings implements Context{ .build(logger); } + public boolean isJoinOptimizationEnabled() { + return options.getOption(JOIN_OPTIMIZATION); + } + @Override public <T> T unwrap(Class<T> clazz) { if(clazz == PlannerSettings.class){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 09c525929..6b8b49a46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -92,6 +92,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING, PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD, PlannerSettings.QUOTING_IDENTIFIERS, + PlannerSettings.JOIN_OPTIMIZATION, ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java index 6210022df..10a937230 100644 ---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java @@ -21,11 +21,10 @@ package org.apache.drill.exec.physical.impl.join; import org.apache.drill.PlanTestBase; import org.apache.drill.common.exceptions.UserRemoteException; import org.apache.drill.common.util.TestTools; -import org.junit.Ignore; import org.junit.Test; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.StringStartsWith.startsWith; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertThat; public class TestNestedLoopJoin extends PlanTestBase { @@ -33,16 +32,15 @@ public class TestNestedLoopJoin extends PlanTestBase { private static final String WORKING_PATH = TestTools.getWorkingPath(); private static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources"; - private static final String NLJ = "Alter session set `planner.enable_hashjoin` = false; " + - "alter session set `planner.enable_mergejoin` = false; " + - "alter session set `planner.enable_nljoin_for_scalar_only` = false; "; - private static final String SINGLE_NLJ = "alter session set `planner.disable_exchanges` = true; " + NLJ; private static final String DISABLE_HJ = "alter session set `planner.enable_hashjoin` = false"; private static final String ENABLE_HJ = "alter session set `planner.enable_hashjoin` = true"; + private static final String RESET_HJ = "alter session reset `planner.enable_hashjoin`"; private static final String DISABLE_MJ = "alter session set `planner.enable_mergejoin` = false"; private static final String ENABLE_MJ = "alter session set `planner.enable_mergejoin` = true"; private static final String DISABLE_NLJ_SCALAR = "alter session set `planner.enable_nljoin_for_scalar_only` = false"; private static final String ENABLE_NLJ_SCALAR = "alter session set `planner.enable_nljoin_for_scalar_only` = 
true"; + private static final String DISABLE_JOIN_OPTIMIZATION = "alter session set `planner.enable_join_optimization` = false"; + private static final String RESET_JOIN_OPTIMIZATION = "alter session reset `planner.enable_join_optimization`"; // Test queries used by planning and execution tests private static final String testNlJoinExists_1 = "select r_regionkey from cp.`tpch/region.parquet` " @@ -68,6 +66,15 @@ public class TestNestedLoopJoin extends PlanTestBase { private static final String testNlJoinInequality_3 = "select r_regionkey from cp.`tpch/region.parquet` " + " where r_regionkey > (select min(n_regionkey) * 2 from cp.`tpch/nation.parquet` )"; + private static final String testNlJoinBetween = "select " + + "n.n_nationkey, length(r.r_name) r_name_len, length(r.r_comment) r_comment_len " + + "from (select * from cp.`tpch/nation.parquet` where n_regionkey = 1) n " + + "%s join (select * from cp.`tpch/region.parquet` where r_regionkey = 1) r " + + "on n.n_nationkey between length(r.r_name) and length(r.r_comment) " + + "order by n.n_nationkey"; + + private static final String testNlJoinWithLargeRightInput = "select * from cp.`tpch/region.parquet`r " + + "left join cp.`tpch/nation.parquet` n on r.r_regionkey <> n.n_regionkey"; @Test public void testNlJoinExists_1_planning() throws Exception { @@ -75,7 +82,6 @@ public class TestNestedLoopJoin extends PlanTestBase { } @Test - // @Ignore public void testNlJoinNotIn_1_planning() throws Exception { testPlanMatchingPatterns(testNlJoinNotIn_1, new String[]{nlpattern}, new String[]{}); } @@ -93,7 +99,6 @@ public class TestNestedLoopJoin extends PlanTestBase { } @Test - @Ignore // Re-test after CALCITE-695 is resolved public void testNlJoinInequality_3() throws Exception { test(DISABLE_NLJ_SCALAR); testPlanMatchingPatterns(testNlJoinInequality_3, new String[]{nlpattern}, new String[]{}); @@ -103,8 +108,8 @@ public class TestNestedLoopJoin extends PlanTestBase { @Test public void testNlJoinAggrs_1_planning() throws 
Exception { String query = "select total1, total2 from " - + "(select sum(l_quantity) as total1 from cp.`tpch/lineitem.parquet` where l_suppkey between 100 and 200), " - + "(select sum(l_quantity) as total2 from cp.`tpch/lineitem.parquet` where l_suppkey between 200 and 300) "; + + "(select sum(l_quantity) as total1 from cp.`tpch/lineitem.parquet` where l_suppkey between 100 and 200), " + + "(select sum(l_quantity) as total2 from cp.`tpch/lineitem.parquet` where l_suppkey between 200 and 300) "; testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{}); } @@ -207,7 +212,7 @@ public class TestNestedLoopJoin extends PlanTestBase { @Test public void testNLJWithEmptyBatch() throws Exception { - Long result = 0l; + long result = 0L; test(DISABLE_NLJ_SCALAR); test(DISABLE_HJ); @@ -256,18 +261,68 @@ public class TestNestedLoopJoin extends PlanTestBase { test(ENABLE_MJ); } + @Test + public void testNlJoinInnerBetween() throws Exception { + try { + test(DISABLE_NLJ_SCALAR); + String query = String.format(testNlJoinBetween, "INNER"); + testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{}); + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("n_nationkey", "r_name_length", "r_comment_length") + .baselineValues(17, 7, 31) + .baselineValues(24, 7, 31) + .build(); + } finally { + test(RESET_HJ); + } + } + + @Test + public void testNlJoinLeftBetween() throws Exception { + try { + test(DISABLE_NLJ_SCALAR); + String query = String.format(testNlJoinBetween, "LEFT"); + testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{}); + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("n_nationkey", "r_name_length", "r_comment_length") + .baselineValues(1, null, null) + .baselineValues(2, null, null) + .baselineValues(3, null, null) + .baselineValues(17, 7, 31) + .baselineValues(24, 7, 31) + .build(); + } finally { + test(RESET_HJ); + } + } + @Test(expected = UserRemoteException.class) - public void 
testExceptionLeftNlJoin() throws Exception { + public void testNlJoinWithLargeRightInputFailure() throws Exception { try { test(DISABLE_NLJ_SCALAR); - test("select r.r_regionkey, n.n_nationkey from cp.`tpch/nation.parquet` n " + - " left join cp.`tpch/region.parquet` r on n.n_regionkey < r.r_regionkey where n.n_nationkey < 3"); + test(testNlJoinWithLargeRightInput); } catch (UserRemoteException e) { - assertThat("No expected current \"UNSUPPORTED_OPERATION ERROR\"", - e.getMessage(), startsWith("UNSUPPORTED_OPERATION ERROR")); + assertThat(e.getMessage(), containsString("UNSUPPORTED_OPERATION ERROR: This query cannot be planned " + + "possibly due to either a cartesian join or an inequality join")); throw e; } finally { - test("alter session reset `planner.enable_nljoin_for_scalar_only`"); + test(RESET_HJ); + } + } + + @Test + public void testNlJoinWithLargeRightInputSuccess() throws Exception { + try { + test(DISABLE_NLJ_SCALAR); + test(DISABLE_JOIN_OPTIMIZATION); + testPlanMatchingPatterns(testNlJoinWithLargeRightInput, new String[]{nlpattern}, new String[]{}); + } finally { + test(RESET_HJ); + test(RESET_JOIN_OPTIMIZATION); } } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java index c3a9c2018..6620585a9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,6 +18,7 @@ package org.apache.drill.exec.planner.logical; import com.google.common.collect.ImmutableList; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexBuilder; @@ -52,7 +53,7 @@ public class DrillOptiqTest { // create a dummy RexOver object. RexNode window = rex.makeOver(anyType, SqlStdOperatorTable.AVG, emptyList, emptyList, e, null, null, true, false, false); - DrillOptiq.toDrill(null, null, window); + DrillOptiq.toDrill(null, (RelNode) null, window); } catch (UserException e) { if (e.getMessage().contains(DrillOptiq.UNSUPPORTED_REX_NODE_ERROR)) { // got expected error return |