aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Hanumath Maduri <hmaduri@apache.org> 2019-01-11 20:17:47 -0800
committer Aman Sinha <asinha@maprtech.com> 2019-02-01 10:14:51 -0800
commit 982e98061e029a39f1c593f695c0d93ec7079f0d (patch)
tree 93f70d6d0bb1750011e7b9f34e6fd7f6f4b631a7
parent 8fb85cd4370e6143641cda1ad5b998caca0b6bf7 (diff)
DRILL-6997: Semijoin is changing the join ordering for some tpcds queries.
close apache/drill#1620
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java 13
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java 1
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java 56
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRule.java 183
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java 58
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 10
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java 25
7 files changed, 281 insertions, 65 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 2d2b073a6..6cdb18d2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner;
+import org.apache.drill.exec.planner.logical.DrillSemiJoinRule;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Builder;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -167,6 +168,14 @@ public enum PlannerPhase {
}
},
+ SEMIJOIN_CONVERSION("Pushing down semi joins") {
+ public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
+ return PlannerPhase.mergedRuleSets(
+ RuleSets.ofList(DrillSemiJoinRule.JOIN)
+ );
+ }
+ },
+
SUM_CONVERSION("Convert SUM to $SUM0") {
public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
return PlannerPhase.mergedRuleSets(
@@ -379,10 +388,6 @@ public enum PlannerPhase {
DrillMergeProjectRule.getInstance(true, RelFactories.DEFAULT_PROJECT_FACTORY,
optimizerRulesContext.getFunctionRegistry())
);
- if (optimizerRulesContext.getPlannerSettings().isHashJoinEnabled() &&
- optimizerRulesContext.getPlannerSettings().isSemiJoinEnabled()) {
- basicRules.add(RuleInstance.SEMI_JOIN_PROJECT_RULE);
- }
return RuleSets.ofList(basicRules.build());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 86a03b5ac..e4314826d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -49,7 +49,6 @@ import org.apache.calcite.rel.rules.SubQueryRemoveRule;
import org.apache.calcite.rel.rules.UnionToDistinctRule;
import org.apache.drill.exec.planner.logical.DrillConditions;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
/**
* Contains rule instances which use custom RelBuilder.
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
index 527b74464..8552f755f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
@@ -17,13 +17,19 @@
*/
package org.apache.drill.exec.planner.logical;
+import org.apache.calcite.rex.RexChecker;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Litmus;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Pair;
import org.apache.drill.common.expression.FieldReference;
@@ -78,7 +84,6 @@ public class DrillSemiJoinRel extends SemiJoin implements DrillJoin, DrillRel {
List<String> fields = new ArrayList<>();
fields.addAll(getInput(0).getRowType().getFieldNames());
fields.addAll(getInput(1).getRowType().getFieldNames());
- Preconditions.checkArgument(DrillJoinRel.isUnique(fields));
final int leftCount = left.getRowType().getFieldCount();
final List<String> leftFields = fields.subList(0, leftCount);
final List<String> rightFields = fields.subList(leftCount, leftCount + right.getRowType().getFieldCount());
@@ -99,6 +104,55 @@ public class DrillSemiJoinRel extends SemiJoin implements DrillJoin, DrillRel {
return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType);
}
+ @Override public boolean isValid(Litmus litmus, Context context) {
+ if (getRowType().getFieldCount()
+ != getSystemFieldList().size()
+ + left.getRowType().getFieldCount()
+ + right.getRowType().getFieldCount()) {
+ return litmus.fail("field count mismatch");
+ }
+ if (condition != null) {
+ if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) {
+ return litmus.fail("condition must be boolean: {}",
+ condition.getType());
+ }
+ // The input to the condition is a row type consisting of system
+ // fields, left fields, and right fields. Very similar to the
+ // output row type, except that fields have not yet been made
+ // nullable due to outer joins.
+ RexChecker checker =
+ new RexChecker(
+ getCluster().getTypeFactory().builder()
+ .addAll(getSystemFieldList())
+ .addAll(getLeft().getRowType().getFieldList())
+ .addAll(getRight().getRowType().getFieldList())
+ .build(),
+ context, litmus);
+ condition.accept(checker);
+ if (checker.getFailureCount() > 0) {
+ return litmus.fail(checker.getFailureCount()
+ + " failures in condition " + condition);
+ }
+ }
+ return litmus.succeed();
+ }
+
+ /*
+ The row type returned by DrillSemiJoinRel is different from that of Calcite's semi-join.
+ This is done because a semi-join implemented as a hash join doesn't remove the right side columns.
+ Also, DrillSemiJoinRule converts the join--(scan, Agg) to a DrillSemiJoinRel whose row type still has
+ all the columns from both the relations.
+ */
+ @Override public RelDataType deriveRowType() {
+ return SqlValidatorUtil.deriveJoinRowType(
+ left.getRowType(),
+ right.getRowType(),
+ JoinRelType.INNER,
+ getCluster().getTypeFactory(),
+ null,
+ ImmutableList.of());
+ }
+
@Override
public boolean isSemiJoin() {
return true;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRule.java
new file mode 100644
index 000000000..257f5383f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRule.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * Planner rule that creates a {@code DrillSemiJoinRel} from a
+ * {@link org.apache.calcite.rel.core.Join} on top of a
+ * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
+ */
+public abstract class DrillSemiJoinRule extends RelOptRule {
+ private static final Predicate<Join> IS_LEFT_OR_INNER =
+ join -> {
+ switch (join.getJoinType()) {
+ case LEFT:
+ case INNER:
+ return true;
+ default:
+ return false;
+ }
+ };
+
+ public static final DrillSemiJoinRule JOIN =
+ new JoinToSemiJoinRule(Join.class, Aggregate.class,
+ DrillRelFactories.LOGICAL_BUILDER, "DrillSemiJoinRule:join");
+
+ protected DrillSemiJoinRule(Class<Join> joinClass, Class<Aggregate> aggregateClass,
+ RelBuilderFactory relBuilderFactory, String description) {
+ super(operandJ(joinClass, null, IS_LEFT_OR_INNER,
+ some(operand(RelNode.class, any()),
+ operandJ(aggregateClass, null, r -> true, any()))),
+ relBuilderFactory, description);
+ }
+
+ protected void perform(RelOptRuleCall call, Project project,
+ Join join, RelNode left, Aggregate aggregate) {
+ final RelOptCluster cluster = join.getCluster();
+ final RexBuilder rexBuilder = cluster.getRexBuilder();
+ if (project != null) {
+ final ImmutableBitSet bits =
+ RelOptUtil.InputFinder.bits(project.getProjects(), null);
+ final ImmutableBitSet rightBits =
+ ImmutableBitSet.range(left.getRowType().getFieldCount(),
+ join.getRowType().getFieldCount());
+ if (bits.intersects(rightBits)) {
+ return;
+ }
+ }
+ final JoinInfo joinInfo = join.analyzeCondition();
+ if (!joinInfo.rightSet().equals(
+ ImmutableBitSet.range(aggregate.getGroupCount()))) {
+ // Rule requires the aggregate key to be the same as the join key.
+ // By the way, neither a super-set nor a sub-set would work.
+ return;
+ }
+ if (!joinInfo.isEqui()) {
+ return;
+ }
+ final RelBuilder relBuilder = call.builder();
+ relBuilder.push(left);
+ switch (join.getJoinType()) {
+ case INNER:
+ final List<Integer> newRightKeyBuilder = new ArrayList<>();
+ final List<Integer> aggregateKeys = aggregate.getGroupSet().asList();
+ for (int key : joinInfo.rightKeys) {
+ newRightKeyBuilder.add(aggregateKeys.get(key));
+ }
+ final ImmutableIntList newRightKeys = ImmutableIntList.copyOf(newRightKeyBuilder);
+ relBuilder.push(aggregate.getInput());
+ final RexNode newCondition =
+ RelOptUtil.createEquiJoinCondition(relBuilder.peek(2, 0),
+ joinInfo.leftKeys, relBuilder.peek(2, 1), newRightKeys,
+ rexBuilder);
+ relBuilder.semiJoin(newCondition);
+ break;
+
+ case LEFT:
+ // The right-hand side produces no more than 1 row (because of the
+ // Aggregate) and no fewer than 1 row (because of LEFT), and therefore
+ // we can eliminate the semi-join.
+ break;
+
+ default:
+ throw new AssertionError(join.getJoinType());
+ }
+ if (project != null) {
+ relBuilder.project(project.getProjects(), project.getRowType().getFieldNames());
+ }
+ call.transformTo(relBuilder.build());
+ }
+
+ /** DrillSemiJoinRule that matches a Join with an Aggregate (without agg functions)
+ * as its right child.
+ */
+ public static class JoinToSemiJoinRule extends DrillSemiJoinRule {
+
+ /** Creates a JoinToSemiJoinRule. */
+ public JoinToSemiJoinRule(
+ Class<Join> joinClass, Class<Aggregate> aggregateClass,
+ RelBuilderFactory relBuilderFactory, String description) {
+ super(joinClass, aggregateClass, relBuilderFactory, description);
+ }
+ }
+
+ /**
+ * Checks the row schema: if the aggregate's output row type is different
+ * from its input row type, then do not convert the join to a semi-join.
+ */
+ private static boolean isRowTypeSame(RelNode join, RelNode left, RelNode rightInput) {
+ return join.getRowType().getFieldCount() == left.getRowType().getFieldCount() +
+ rightInput.getRowType().getFieldCount();
+ }
+
+ /**
+ * Checks if the join condition is a simple equality condition. This check
+ * is needed because JoinInfo.isEqui treats IS_NOT_DISTINCT_FROM as an equi
+ * condition.
+ */
+ private static boolean isSimpleJoinCondition(RexNode joinCondition) {
+ if (joinCondition.isAlwaysFalse() || joinCondition.isAlwaysTrue()) {
+ return false;
+ }
+
+ List<RexNode> conjuncts = RelOptUtil.conjunctions(joinCondition);
+ for (RexNode condition : conjuncts) {
+ if (condition.getKind() != SqlKind.EQUALS) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Join join = call.rel(0);
+ DrillAggregateRel agg = call.rel(2);
+ if (agg.getAggCallList().size() != 0) { return false; }
+ return isSimpleJoinCondition(join.getCondition()) &&
+ isRowTypeSame(join, call.rel(1), call.rel(2).getInput(0));
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Join join = call.rel(0);
+ final RelNode left = call.rel(1);
+ final Aggregate aggregate = call.rel(2);
+ perform(call, null, join, left, aggregate);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
index 2581fa667..f29daa3e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
@@ -21,10 +21,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.apache.calcite.rex.RexChecker;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.util.Litmus;
+
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
@@ -162,58 +159,7 @@ public abstract class JoinPrel extends DrillJoinRelBase implements Prel {
return isSemiJoin;
}
- /* A Drill physical rel which is semi join will have output row type with fields from only
- left side of the join. Calcite's join rel expects to have the output row type from
- left and right side of the join. This function is overloaded to not throw exceptions for
- a Drill semi join physical rel.
- */
- @Override public boolean isValid(Litmus litmus, Context context) {
- if (!this.isSemiJoin && !super.isValid(litmus, context)) {
- return false;
- }
- if (getRowType().getFieldCount()
- != getSystemFieldList().size()
- + left.getRowType().getFieldCount()
- + (this.isSemiJoin ? 0 : right.getRowType().getFieldCount())) {
- return litmus.fail("field count mismatch");
- }
- if (condition != null) {
- if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) {
- return litmus.fail("condition must be boolean: {}",
- condition.getType());
- }
- // The input to the condition is a row type consisting of system
- // fields, left fields, and right fields. Very similar to the
- // output row type, except that fields have not yet been made due
- // due to outer joins.
- RexChecker checker =
- new RexChecker(
- getCluster().getTypeFactory().builder()
- .addAll(getSystemFieldList())
- .addAll(getLeft().getRowType().getFieldList())
- .addAll(getRight().getRowType().getFieldList())
- .build(),
- context, litmus);
- condition.accept(checker);
- if (checker.getFailureCount() > 0) {
- return litmus.fail(checker.getFailureCount()
- + " failures in condition " + condition);
- }
- }
- return litmus.succeed();
- }
-
@Override public RelDataType deriveRowType() {
- if (isSemiJoin) {
- return SqlValidatorUtil.deriveJoinRowType(
- left.getRowType(),
- null,
- this.joinType,
- getCluster().getTypeFactory(),
- null,
- new ArrayList<>());
- } else {
- return super.deriveRowType();
- }
+ return super.deriveRowType();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 0e374cd12..0881dc1c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -238,7 +238,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.DIRECTORY_PRUNING, setOpTransposeNode);
final RelTraitSet logicalTraits = pruned.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
- final RelNode convertedRelNode;
+ RelNode convertedRelNode;
if (!context.getPlannerSettings().isHepOptEnabled()) {
// hep is disabled, use volcano
convertedRelNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE_AND_JOIN, pruned, logicalTraits);
@@ -277,6 +277,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
}
}
+ /* Ideally this conversion would be handled during the logical planning phase itself,
+ but currently the join ordering algorithm in Calcite does not consider
+ semi-joins, which can lead to sub-optimal plans. Hence the joins are converted
+ to semi-joins after join planning (refer to CALCITE-2813). */
+ if (context.getPlannerSettings().isSemiJoinEnabled() &&
+ context.getPlannerSettings().isHashJoinEnabled()) {
+ convertedRelNode = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.SEMIJOIN_CONVERSION, convertedRelNode);
+ }
// Convert SUM to $SUM0
final RelNode convertedRelNodeWithSum0 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.SUM_CONVERSION, convertedRelNode);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
index a660fffee..923004f97 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
@@ -28,6 +28,9 @@ import org.junit.Test;
import static org.junit.Assert.assertTrue;
import org.junit.experimental.categories.Category;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
@Category({SlowTest.class, OperatorTest.class})
public class TestSemiJoin extends BaseTestQuery {
@Test
@@ -76,10 +79,11 @@ public class TestSemiJoin extends BaseTestQuery {
@Test
public void testLargeInClauseToSemiJoin() throws Exception {
String sql = "select employee_id, full_name from cp.`employee.json` " +
- "where employee_id in (351, 352, 353, 451, 452, 453, 551, 552, 553, 651, 652, 653, 751, 752, 753, 851, 852, 853, 951, 952, 953)";
+ "where employee_id in (351, 352, 353, 451, 452, 453, 551, 552, 553, 651, 652, 653, 751, 752, 753, 851, 851, 852, 853, 951, 952, 953, 954, 956)";
ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
- .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+ .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true)
+ .setOptionDefault(PlannerSettings.IN_SUBQUERY_THRESHOLD.getOptionName(), 10);
try (ClusterFixture cluster = builder.build();
ClientFixture client = cluster.clientFixture()) {
@@ -115,4 +119,21 @@ public class TestSemiJoin extends BaseTestQuery {
assertTrue(queryPlan.contains("semi-join: =[true]"));
}
}
+
+ @Test
+ public void testJoinOrderingSemiJoin() throws Exception {
+ String sql = "select * from cp.`employee.json` e1, cp.`employee.json` e2 " +
+ "where e1.employee_id in (select e.employee_id from cp.`employee.json` e) and e1.employee_id = e2.employee_id";
+
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String queryPlan = client.queryBuilder().sql(sql).explainText();
+ // Brackets are escaped so "[false]" and "[true]" match literally; unescaped they are character classes that match any single listed letter.
+ Matcher matcher = Pattern.compile(".*semi-join.*\\[false\\].*semi-join.*\\[true\\].*", Pattern.MULTILINE | Pattern.DOTALL).matcher(queryPlan);
+ assertTrue(matcher.find());
+ }
+ }
}