diff options
author | Hanumath Maduri <hmaduri@apache.org> | 2019-01-11 20:17:47 -0800 |
---|---|---|
committer | Aman Sinha <asinha@maprtech.com> | 2019-02-01 10:14:51 -0800 |
commit | 982e98061e029a39f1c593f695c0d93ec7079f0d (patch) | |
tree | 93f70d6d0bb1750011e7b9f34e6fd7f6f4b631a7 | |
parent | 8fb85cd4370e6143641cda1ad5b998caca0b6bf7 (diff) |
DRILL-6997: Semijoin is changing the join ordering for some tpcds queries.
close apache/drill#1620
7 files changed, 281 insertions, 65 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java index 2d2b073a6..6cdb18d2e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.planner; +import org.apache.drill.exec.planner.logical.DrillSemiJoinRule; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Builder; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -167,6 +168,14 @@ public enum PlannerPhase { } }, + SEMIJOIN_CONVERSION("Pushing down semi joins") { + public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) { + return PlannerPhase.mergedRuleSets( + RuleSets.ofList(DrillSemiJoinRule.JOIN) + ); + } + }, + SUM_CONVERSION("Convert SUM to $SUM0") { public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) { return PlannerPhase.mergedRuleSets( @@ -379,10 +388,6 @@ public enum PlannerPhase { DrillMergeProjectRule.getInstance(true, RelFactories.DEFAULT_PROJECT_FACTORY, optimizerRulesContext.getFunctionRegistry()) ); - if (optimizerRulesContext.getPlannerSettings().isHashJoinEnabled() && - optimizerRulesContext.getPlannerSettings().isSemiJoinEnabled()) { - basicRules.add(RuleInstance.SEMI_JOIN_PROJECT_RULE); - } return RuleSets.ofList(basicRules.build()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java index 86a03b5ac..e4314826d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java @@ -49,7 +49,6 @@ import org.apache.calcite.rel.rules.SubQueryRemoveRule; import org.apache.calcite.rel.rules.UnionToDistinctRule; import org.apache.drill.exec.planner.logical.DrillConditions; import org.apache.drill.exec.planner.logical.DrillRelFactories; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; /** * Contains rule instances which use custom RelBuilder. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java index 527b74464..8552f755f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java @@ -17,13 +17,19 @@ */ package org.apache.drill.exec.planner.logical; +import org.apache.calcite.rex.RexChecker; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Litmus; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.SemiJoin; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.util.ImmutableIntList; import org.apache.calcite.util.Pair; import org.apache.drill.common.expression.FieldReference; @@ -78,7 +84,6 @@ public class DrillSemiJoinRel extends SemiJoin implements DrillJoin, DrillRel { List<String> fields = new ArrayList<>(); fields.addAll(getInput(0).getRowType().getFieldNames()); fields.addAll(getInput(1).getRowType().getFieldNames()); - Preconditions.checkArgument(DrillJoinRel.isUnique(fields)); final int leftCount = left.getRowType().getFieldCount(); final List<String> leftFields = fields.subList(0, leftCount); final List<String> rightFields = fields.subList(leftCount, leftCount + right.getRowType().getFieldCount()); @@ -99,6 +104,55 @@ public class DrillSemiJoinRel extends SemiJoin implements DrillJoin, DrillRel { return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType); } + @Override public boolean isValid(Litmus litmus, Context context) { + if (getRowType().getFieldCount() + != getSystemFieldList().size() + + left.getRowType().getFieldCount() + + right.getRowType().getFieldCount()) { + return litmus.fail("field count mismatch"); + } + if (condition != null) { + if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) { + return litmus.fail("condition must be boolean: {}", + condition.getType()); + } + // The input to the condition is a row type consisting of system + // fields, left fields, and right fields. Very similar to the + // output row type, except that fields have not yet been made due + // due to outer joins. + RexChecker checker = + new RexChecker( + getCluster().getTypeFactory().builder() + .addAll(getSystemFieldList()) + .addAll(getLeft().getRowType().getFieldList()) + .addAll(getRight().getRowType().getFieldList()) + .build(), + context, litmus); + condition.accept(checker); + if (checker.getFailureCount() > 0) { + return litmus.fail(checker.getFailureCount() + + " failures in condition " + condition); + } + } + return litmus.succeed(); + } + + /* + The rowtype returned by the DrillSemiJoinRel is different from that of calcite's semi-join. + This is done because the semi-join implemented as the hash join doesn't remove the right side columns. + Also the DrillSemiJoinRule converts the join--(scan, Agg) to DrillSemiJoinRel whose rowtype still has + all the columns from both the relations. + */ + @Override public RelDataType deriveRowType() { + return SqlValidatorUtil.deriveJoinRowType( + left.getRowType(), + right.getRowType(), + JoinRelType.INNER, + getCluster().getTypeFactory(), + null, + ImmutableList.of()); + } + @Override public boolean isSemiJoin() { return true; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRule.java new file mode 100644 index 000000000..257f5383f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRule.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; + +/** + * Planner rule that creates a {@code DrillSemiJoinRel} from a + * {@link org.apache.calcite.rel.core.Join} on top of a + * {@link org.apache.calcite.rel.logical.LogicalAggregate}. + */ +public abstract class DrillSemiJoinRule extends RelOptRule { + private static final Predicate<Join> IS_LEFT_OR_INNER = + join -> { + switch (join.getJoinType()) { + case LEFT: + case INNER: + return true; + default: + return false; + } + }; + + public static final DrillSemiJoinRule JOIN = + new JoinToSemiJoinRule(Join.class, Aggregate.class, + DrillRelFactories.LOGICAL_BUILDER, "DrillSemiJoinRule:join"); + + protected DrillSemiJoinRule(Class<Join> joinClass, Class<Aggregate> aggregateClass, + RelBuilderFactory relBuilderFactory, String description) { + super(operandJ(joinClass, null, IS_LEFT_OR_INNER, + some(operand(RelNode.class, any()), + operandJ(aggregateClass, null, r -> true, any()))), + relBuilderFactory, description); + } + + protected void perform(RelOptRuleCall call, Project project, + Join join, RelNode left, Aggregate aggregate) { + final RelOptCluster cluster = join.getCluster(); + final RexBuilder rexBuilder = cluster.getRexBuilder(); + if (project != null) { + final ImmutableBitSet bits = + RelOptUtil.InputFinder.bits(project.getProjects(), null); + final ImmutableBitSet rightBits = + ImmutableBitSet.range(left.getRowType().getFieldCount(), + join.getRowType().getFieldCount()); + if (bits.intersects(rightBits)) { + return; + } + } + final JoinInfo joinInfo = join.analyzeCondition(); + if (!joinInfo.rightSet().equals( + ImmutableBitSet.range(aggregate.getGroupCount()))) { + // Rule requires that aggregate key to be the same as the join key. + // By the way, neither a super-set nor a sub-set would work. + return; + } + if (!joinInfo.isEqui()) { + return; + } + final RelBuilder relBuilder = call.builder(); + relBuilder.push(left); + switch (join.getJoinType()) { + case INNER: + final List<Integer> newRightKeyBuilder = new ArrayList<>(); + final List<Integer> aggregateKeys = aggregate.getGroupSet().asList(); + for (int key : joinInfo.rightKeys) { + newRightKeyBuilder.add(aggregateKeys.get(key)); + } + final ImmutableIntList newRightKeys = ImmutableIntList.copyOf(newRightKeyBuilder); + relBuilder.push(aggregate.getInput()); + final RexNode newCondition = + RelOptUtil.createEquiJoinCondition(relBuilder.peek(2, 0), + joinInfo.leftKeys, relBuilder.peek(2, 1), newRightKeys, + rexBuilder); + relBuilder.semiJoin(newCondition); + break; + + case LEFT: + // The right-hand side produces no more than 1 row (because of the + // Aggregate) and no fewer than 1 row (because of LEFT), and therefore + // we can eliminate the semi-join. + break; + + default: + throw new AssertionError(join.getJoinType()); + } + if (project != null) { + relBuilder.project(project.getProjects(), project.getRowType().getFieldNames()); + } + call.transformTo(relBuilder.build()); + } + + /** DrillSemiJoinRule that matches a Join with an Aggregate (without agg functions) + * as its right child. + */ + public static class JoinToSemiJoinRule extends DrillSemiJoinRule { + + /** Creates a JoinToSemiJoinRule. */ + public JoinToSemiJoinRule( + Class<Join> joinClass, Class<Aggregate> aggregateClass, + RelBuilderFactory relBuilderFactory, String description) { + super(joinClass, aggregateClass, relBuilderFactory, description); + } + } + + /** + * Check for the row schema if they aggregate's output rowtype is different + * from its input rowtype then do not convert the join to a semi join. + */ + private static boolean isRowTypeSame(RelNode join, RelNode left, RelNode rightInput) { + return join.getRowType().getFieldCount() == left.getRowType().getFieldCount() + + rightInput.getRowType().getFieldCount(); + } + + /** + * Check if the join condition is a simple equality condition. This check + * is needed because joininfo.isEqui treats IS_NOT_DISTINCT_FROM as equi + * condition. + */ + private static boolean isSimpleJoinCondition(RexNode joinCondition) { + if (joinCondition.isAlwaysFalse() || joinCondition.isAlwaysTrue()) { + return false; + } + + List<RexNode> conjuncts = RelOptUtil.conjunctions(joinCondition); + for (RexNode condition : conjuncts) { + if (condition.getKind() != SqlKind.EQUALS) { + return false; + } + } + return true; + } + + @Override + public boolean matches(RelOptRuleCall call) { + Join join = call.rel(0); + DrillAggregateRel agg = call.rel(2); + if (agg.getAggCallList().size() != 0) { return false; } + return isSimpleJoinCondition(join.getCondition()) && + isRowTypeSame(join, call.rel(1), call.rel(2).getInput(0)); + } + + @Override public void onMatch(RelOptRuleCall call) { + final Join join = call.rel(0); + final RelNode left = call.rel(1); + final Aggregate aggregate = call.rel(2); + perform(call, null, join, left, aggregate); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java index 2581fa667..f29daa3e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java @@ -21,10 +21,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import org.apache.calcite.rex.RexChecker; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.validate.SqlValidatorUtil; -import org.apache.calcite.util.Litmus; + import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.JoinCondition; @@ -162,58 +159,7 @@ public abstract class JoinPrel extends DrillJoinRelBase implements Prel { return isSemiJoin; } - /* A Drill physical rel which is semi join will have output row type with fields from only - left side of the join. Calcite's join rel expects to have the output row type from - left and right side of the join. This function is overloaded to not throw exceptions for - a Drill semi join physical rel. - */ - @Override public boolean isValid(Litmus litmus, Context context) { - if (!this.isSemiJoin && !super.isValid(litmus, context)) { - return false; - } - if (getRowType().getFieldCount() - != getSystemFieldList().size() - + left.getRowType().getFieldCount() - + (this.isSemiJoin ? 0 : right.getRowType().getFieldCount())) { - return litmus.fail("field count mismatch"); - } - if (condition != null) { - if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) { - return litmus.fail("condition must be boolean: {}", - condition.getType()); - } - // The input to the condition is a row type consisting of system - // fields, left fields, and right fields. Very similar to the - // output row type, except that fields have not yet been made due - // due to outer joins. - RexChecker checker = - new RexChecker( - getCluster().getTypeFactory().builder() - .addAll(getSystemFieldList()) - .addAll(getLeft().getRowType().getFieldList()) - .addAll(getRight().getRowType().getFieldList()) - .build(), - context, litmus); - condition.accept(checker); - if (checker.getFailureCount() > 0) { - return litmus.fail(checker.getFailureCount() - + " failures in condition " + condition); - } - } - return litmus.succeed(); - } - @Override public RelDataType deriveRowType() { - if (isSemiJoin) { - return SqlValidatorUtil.deriveJoinRowType( - left.getRowType(), - null, - this.joinType, - getCluster().getTypeFactory(), - null, - new ArrayList<>()); - } else { - return super.deriveRowType(); - } + return super.deriveRowType(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 0e374cd12..0881dc1c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -238,7 +238,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler { final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.DIRECTORY_PRUNING, setOpTransposeNode); final RelTraitSet logicalTraits = pruned.getTraitSet().plus(DrillRel.DRILL_LOGICAL); - final RelNode convertedRelNode; + RelNode convertedRelNode; if (!context.getPlannerSettings().isHepOptEnabled()) { // hep is disabled, use volcano convertedRelNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE_AND_JOIN, pruned, logicalTraits); @@ -277,6 +277,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler { } } + /* Ideally this conversion can be handled during logical planning phase itself + but currently the join ordering algorithm in calcite is not considering the + semi-joins. This can lead to sub optimal plans. Hence converting the joins + to semi-joins post join planning (refer CALCITE-2813). */ + if (context.getPlannerSettings().isSemiJoinEnabled() && + context.getPlannerSettings().isHashJoinEnabled()) { + convertedRelNode = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.SEMIJOIN_CONVERSION, convertedRelNode); + } // Convert SUM to $SUM0 final RelNode convertedRelNodeWithSum0 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.SUM_CONVERSION, convertedRelNode); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java index a660fffee..923004f97 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java @@ -28,6 +28,9 @@ import org.junit.Test; import static org.junit.Assert.assertTrue; import org.junit.experimental.categories.Category; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + @Category({SlowTest.class, OperatorTest.class}) public class TestSemiJoin extends BaseTestQuery { @Test @@ -76,10 +79,11 @@ public class TestSemiJoin extends BaseTestQuery { @Test public void testLargeInClauseToSemiJoin() throws Exception { String sql = "select employee_id, full_name from cp.`employee.json` " + - "where employee_id in (351, 352, 353, 451, 452, 453, 551, 552, 553, 651, 652, 653, 751, 752, 753, 851, 852, 853, 951, 952, 953)"; + "where employee_id in (351, 352, 353, 451, 452, 453, 551, 552, 553, 651, 652, 653, 751, 752, 753, 851, 851, 852, 853, 951, 952, 953, 954, 956)"; ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) - .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true); + .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true) + .setOptionDefault(PlannerSettings.IN_SUBQUERY_THRESHOLD.getOptionName(), 10); try (ClusterFixture cluster = builder.build(); ClientFixture client = cluster.clientFixture()) { @@ -115,4 +119,21 @@ public class TestSemiJoin extends BaseTestQuery { assertTrue(queryPlan.contains("semi-join: =[true]")); } } + + @Test + public void testJoinOrderingSemiJoin() throws Exception { + String sql = "select * from cp.`employee.json` e1, cp.`employee.json` e2 " + + "where e1.employee_id in (select e.employee_id from cp.`employee.json` e) and e1.employee_id = e2.employee_id"; + + + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String queryPlan = client.queryBuilder().sql(sql).explainText(); + Matcher matcher = Pattern.compile(".*semi-join.*[false].*semi-join.*[true].*", Pattern.MULTILINE | Pattern.DOTALL).matcher(queryPlan); + assertTrue(matcher.find()); + } + } } |