From 9667e92e1e87ce1826f0eac3f2396187dbfa8aaa Mon Sep 17 00:00:00 2001 From: "weijie.tong" Date: Sun, 14 Oct 2018 19:41:51 +0800 Subject: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree closes #1504 --- .../java/org/apache/drill/exec/ExecConstants.java | 4 + .../org/apache/drill/exec/ops/FragmentContext.java | 20 +- .../apache/drill/exec/ops/FragmentContextImpl.java | 82 ++++- .../drill/exec/ops/OperatorMetricRegistry.java | 2 + .../exec/physical/config/RuntimeFilterPOP.java | 16 +- .../impl/filter/RuntimeFilterRecordBatch.java | 82 +++-- .../exec/physical/impl/join/HashJoinBatch.java | 43 ++- .../exec/planner/physical/RuntimeFilterPrel.java | 14 +- .../physical/visitor/RuntimeFilterVisitor.java | 80 ++++- .../exec/server/options/SystemOptionManager.java | 2 + .../org/apache/drill/exec/work/WorkManager.java | 10 +- .../apache/drill/exec/work/filter/BloomFilter.java | 62 ++-- .../drill/exec/work/filter/BloomFilterDef.java | 12 +- .../drill/exec/work/filter/RuntimeFilterDef.java | 21 +- .../exec/work/filter/RuntimeFilterReporter.java | 5 +- .../exec/work/filter/RuntimeFilterRouter.java | 280 ++++++++--------- .../drill/exec/work/filter/RuntimeFilterSink.java | 340 ++++++++++++--------- .../exec/work/filter/RuntimeFilterWritable.java | 21 ++ .../apache/drill/exec/work/foreman/Foreman.java | 2 +- .../java-exec/src/main/resources/drill-module.conf | 2 + .../exec/physical/impl/join/TestHashJoinJPPD.java | 10 +- .../physical/impl/join/TestHashJoinJPPDPlan.java | 52 ---- .../drill/exec/work/filter/BloomFilterTest.java | 19 +- .../org/apache/drill/test/OperatorFixture.java | 27 +- .../apache/drill/test/PhysicalOpUnitTestBase.java | 15 +- .../java/org/apache/drill/exec/proto/BitData.java | 128 +++++++- .../org/apache/drill/exec/proto/SchemaBitData.java | 7 + .../drill/exec/proto/beans/RuntimeFilterBDef.java | 22 ++ protocol/src/main/protobuf/BitData.proto | 1 + 29 files changed, 876 insertions(+), 505 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index fb4657259..c4d7652f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -153,6 +153,10 @@ public final class ExecConstants { public static final IntegerValidator HASHJOIN_BLOOM_FILTER_MAX_SIZE = new IntegerValidator(HASHJOIN_BLOOM_FILTER_MAX_SIZE_KEY, null); public static final String HASHJOIN_BLOOM_FILTER_FPP_KEY = "exec.hashjoin.bloom_filter.fpp"; public static final DoubleValidator HASHJOIN_BLOOM_FILTER_FPP_VALIDATOR = new RangeDoubleValidator(HASHJOIN_BLOOM_FILTER_FPP_KEY, Double.MIN_VALUE, 1.0, null); + public static final String HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY = "exec.hashjoin.runtime_filter.waiting.enable"; + public static final BooleanValidator HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING = new BooleanValidator(HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY, null); + public static final String HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY = "exec.hashjoin.runtime_filter.max.waiting.time"; + public static final PositiveLongValidator HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME = new PositiveLongValidator(HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY, Character.MAX_VALUE, null); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 88c21d9e9..5125f7201 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -20,8 +20,7 @@ package org.apache.drill.exec.ops; import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutorService; - -import org.apache.drill.exec.work.filter.RuntimeFilterSink; +import java.util.concurrent.TimeUnit; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; @@ -159,18 +158,23 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable { @Override void close(); - - /** - * @return - */ - RuntimeFilterSink getRuntimeFilterSink(); - /** * add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment * @param runtimeFilter */ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter); + public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier); + + /** + * get the RuntimeFilter with a blocking wait, if the waiting option is enabled + * @param rfIdentifier + * @param maxWaitTime + * @param timeUnit + * @return the RFW or null + */ + public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit); + interface ExecutorState { /** * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java index 6e40466e8..b740c927d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java @@ -21,7 +21,12 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; @@ -60,8 +65,6 @@ import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.testing.ExecutionControls; import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.drill.exec.work.batch.IncomingBuffers; - -import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.shaded.guava.com.google.common.base.Function; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -115,6 +118,10 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor private final BufferManager bufferManager; private ExecutorState executorState; private final ExecutionControls executionControls; + private boolean enableRuntimeFilter; + private boolean enableRFWaiting; + private Lock lock4RF; + private Condition condition4RF; private final SendingAccountor sendingAccountor = new SendingAccountor(); private final Consumer exceptionConsumer = new Consumer() { @@ -136,8 +143,8 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor private final AccountingUserConnection accountingUserConnection; /** Stores constants and their holders by type */ private final Map> constantValueHolderCache; - - private RuntimeFilterSink runtimeFilterSink; + private Map rfIdentifier2RFW = new ConcurrentHashMap<>(); + private Map rfIdentifier2fetched = new ConcurrentHashMap<>(); /** * Create a FragmentContext instance for non-root fragment. @@ -209,10 +216,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor stats = new FragmentStats(allocator, fragment.getAssignment()); bufferManager = new BufferManagerImpl(this.allocator); constantValueHolderCache = Maps.newHashMap(); - boolean enableRF = context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER); - if (enableRF) { - ExecutorService executorService = context.getExecutor(); - this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, executorService); + enableRuntimeFilter = this.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val; + enableRFWaiting = this.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val && enableRuntimeFilter; + if (enableRFWaiting) { + lock4RF = new ReentrantLock(); + condition4RF = lock4RF.newCondition(); } } @@ -362,12 +370,50 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor @Override public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { - this.runtimeFilterSink.aggregate(runtimeFilter); + long rfIdentifier = runtimeFilter.getRuntimeFilterBDef().getRfIdentifier(); + //if the RF was sent directly from the HJ nodes, we don't need to retain the buffer again + // as the RuntimeFilterReporter has already retained the buffer + rfIdentifier2fetched.put(rfIdentifier, false); + rfIdentifier2RFW.put(rfIdentifier, runtimeFilter); + if (enableRFWaiting) { + lock4RF.lock(); + try { + condition4RF.signal(); + } catch (Exception e) { + logger.info("fail to signal the waiting thread.", e); + } finally { + lock4RF.unlock(); + } + } + } + + @Override + public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) { + RuntimeFilterWritable runtimeFilterWritable = rfIdentifier2RFW.get(rfIdentifier); + if (runtimeFilterWritable != null) { + rfIdentifier2fetched.put(rfIdentifier, true); + } + return runtimeFilterWritable; } @Override - public RuntimeFilterSink getRuntimeFilterSink() { - return runtimeFilterSink; + public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit) { + if (rfIdentifier2RFW.get(rfIdentifier) != null) { + return getRuntimeFilter(rfIdentifier); + } + if (enableRFWaiting) { + lock4RF.lock(); + try { + if (rfIdentifier2RFW.get(rfIdentifier) == null) { + condition4RF.await(maxWaitTime, timeUnit); + } + } catch (InterruptedException e) { + logger.info("Condition was interrupted", e); + } finally { + lock4RF.unlock(); + } + } + return getRuntimeFilter(rfIdentifier); } /** @@ -484,12 +530,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor // Close the buffers before closing the operators; this is needed as buffer ownership // is attached to the receive operators. suppressingClose(buffers); - + closeNotConsumedRFWs(); // close operator context for (OperatorContextImpl opContext : contexts) { suppressingClose(opContext); } - suppressingClose(runtimeFilterSink); suppressingClose(bufferManager); suppressingClose(allocator); } @@ -550,4 +595,15 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor protected BufferManager getBufferManager() { return bufferManager; } + + private void closeNotConsumedRFWs() { + for (RuntimeFilterWritable runtimeFilterWritable : rfIdentifier2RFW.values()){ + long rfIdentifier = runtimeFilterWritable.getRuntimeFilterBDef().getRfIdentifier(); + boolean fetchedByOperator = rfIdentifier2fetched.get(rfIdentifier); + if (!fetchedByOperator) { + //if the RF hasn't been consumed by the operator, we have to released it one more time. + runtimeFilterWritable.close(); + } + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java index ac867020d..da590689d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java @@ -21,6 +21,7 @@ import org.apache.drill.exec.physical.impl.ScreenCreator; import org.apache.drill.exec.physical.impl.SingleSenderCreator; import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate; import org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec; +import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch; import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch; import org.apache.drill.exec.physical.impl.join.HashJoinBatch; import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; @@ -61,6 +62,7 @@ public class OperatorMetricRegistry { register(CoreOperatorType.LATERAL_JOIN_VALUE, AbstractBinaryRecordBatch.Metric.class); register(CoreOperatorType.UNNEST_VALUE, UnnestRecordBatch.Metric.class); register(CoreOperatorType.UNION_VALUE, AbstractBinaryRecordBatch.Metric.class); + register(CoreOperatorType.RUNTIME_FILTER_VALUE, RuntimeFilterRecordBatch.Metric.class); } private static void register(final int operatorType, final Class metricDef) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java index 50c00d792..b35bf29af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java @@ -31,9 +31,12 @@ public class RuntimeFilterPOP extends AbstractSingle { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPOP.class); + private long identifier; + @JsonCreator - public RuntimeFilterPOP(@JsonProperty("child") PhysicalOperator child) { + public RuntimeFilterPOP(@JsonProperty("child") PhysicalOperator child, @JsonProperty("identifier")long identifier) { super(child); + this.identifier = identifier; } @Override @@ -43,7 +46,7 @@ public class RuntimeFilterPOP extends AbstractSingle { @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new RuntimeFilterPOP(child); + return new RuntimeFilterPOP(child, identifier); } @Override @@ -55,4 +58,13 @@ public class RuntimeFilterPOP extends AbstractSingle { public int getOperatorType() { return CoreOperatorType.RUNTIME_FILTER_VALUE; } + + + public long getIdentifier() { + return identifier; + } + + public void setIdentifier(long identifier) { + this.identifier = identifier; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java index 9248bbc69..bf7ed7912 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java @@ -22,11 +22,13 @@ import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.config.RuntimeFilterPOP; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -36,14 +38,13 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.work.filter.BloomFilter; -import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.exec.work.filter.RuntimeFilterWritable; - import java.util.ArrayList; import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch participates @@ -59,12 +60,21 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch toFilterFields; private List bloomFilters; private RuntimeFilterWritable current; - private RuntimeFilterWritable previous; private int originalRecordCount; + private long filteredRows = 0l; + private long appliedTimes = 0l; + private int batchTimes = 0; + private boolean waited = false; + private boolean enableRFWaiting; + private long maxWaitingTime; + private long rfIdentifier; private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class); public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); + enableRFWaiting = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val; + maxWaitingTime = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY).num_val; + this.rfIdentifier = pop.getIdentifier(); } @Override @@ -89,7 +99,6 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch 0) { + waited = true; + try { + stats.startWait(); + current = context.getRuntimeFilter(rfIdentifier, maxWaitingTime, TimeUnit.MILLISECONDS); + } finally { + stats.stopWait(); + } + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 88eadf291..0ac0809d8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -19,10 +19,13 @@ package org.apache.drill.exec.physical.impl.join; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Sets; @@ -203,11 +206,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch implem private int originalPartition = -1; // the partition a secondary reads from IntVector read_right_HV_vector; // HV vector that was read from the spilled batch private int maxBatchesInMemory; - private List bloomFilters = new ArrayList<>(); private List probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters private boolean enableRuntimeFilter; private RuntimeFilterReporter runtimeFilterReporter; private ValueVectorHashHelper.Hash64 hash64; + private Map bloomFilter2buildId = new HashMap<>(); + private Map bloomFilterDef2buildId = new HashMap<>(); + private List bloomFilters = new ArrayList<>(); /** * This holds information about the spilled partitions for the build and probe side. @@ -757,6 +762,24 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch implem enableRuntimeFilter = false; return; } + RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); + List bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); + for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { + String buildField = bloomFilterDef.getBuildField(); + SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN); + TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath); + if (typedFieldId == null) { + missingField = true; + break; + } + int fieldId = typedFieldId.getFieldIds()[0]; + bloomFilterDef2buildId.put(bloomFilterDef, fieldId); + } + if (missingField) { + logger.info("As some build side join key fields not found, runtime filter was disabled"); + enableRuntimeFilter = false; + return; + } ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context); try { hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds); @@ -799,9 +822,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch implem if (!enableRuntimeFilter) { return; } - if (runtimeFilterReporter != null) { - return; - } runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context); RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); //RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the @@ -809,11 +829,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch implem if (runtimeFilterDef != null) { List bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { + int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef); int numBytes = bloomFilterDef.getNumBytes(); String probeField = bloomFilterDef.getProbeField(); probeFields.add(probeField); BloomFilter bloomFilter = new BloomFilter(numBytes, context.getAllocator()); bloomFilters.add(bloomFilter); + bloomFilter2buildId.put(bloomFilter, buildFieldId); } } } @@ -992,13 +1014,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch implem //create runtime filter if (spilledState.isFirstCycle() && enableRuntimeFilter) { //create runtime filter and send out async - int condFieldIndex = 0; - for (BloomFilter bloomFilter : bloomFilters) { + for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) { + int fieldId = bloomFilter2buildId.get(bloomFilter); for (int ind = 0; ind < currentRecordCount; ind++) { - long hashCode = hash64.hash64Code(ind, 0, condFieldIndex); + long hashCode = hash64.hash64Code(ind, 0, fieldId); bloomFilter.insert(hashCode); } - condFieldIndex++; } } @@ -1027,9 +1048,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch implem } if (spilledState.isFirstCycle() && enableRuntimeFilter) { - if (bloomFilters.size() > 0) { + if (bloomFilter2buildId.size() > 0) { int hashJoinOpId = this.popConfig.getOperatorId(); - runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId); + runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId); } } @@ -1237,7 +1258,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch implem RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), "configured output batch size: %d", configuredBatchSize); - enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER); + enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null; } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java index 59e1622b5..1729027de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java @@ -27,25 +27,29 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import java.io.IOException; import java.util.List; -public class RuntimeFilterPrel extends SinglePrel{ +public class RuntimeFilterPrel extends SinglePrel { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPrel.class); - public RuntimeFilterPrel(Prel child){ + private long identifier; + + public RuntimeFilterPrel(Prel child, long identifier){ super(child.getCluster(), child.getTraitSet(), child); + this.identifier = identifier; } - public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) { + public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, long identifier) { super(cluster, traits, child); + this.identifier = identifier; } @Override public RelNode copy(RelTraitSet traitSet, List inputs) { - return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0)); + return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0), identifier); } @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - RuntimeFilterPOP r = new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator)); + RuntimeFilterPOP r = new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator), identifier); return creator.addMetadata(this, r); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java index fcfa2bca1..4d309aea0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.planner.physical.visitor; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinInfo; @@ -28,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.util.ImmutableBitSet; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.QueryContext; -import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.physical.BroadcastExchangePrel; import org.apache.drill.exec.planner.physical.ExchangePrel; import org.apache.drill.exec.planner.physical.HashAggPrel; @@ -43,11 +41,14 @@ import org.apache.drill.exec.planner.physical.TopNPrel; import org.apache.drill.exec.work.filter.BloomFilter; import org.apache.drill.exec.work.filter.BloomFilterDef; import org.apache.drill.exec.work.filter.RuntimeFilterDef; - +import org.apache.drill.shaded.guava.com.google.common.collect.HashMultimap; +import org.apache.drill.shaded.guava.com.google.common.collect.Multimap; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; /** * This visitor does two major things: @@ -58,9 +59,14 @@ public class RuntimeFilterVisitor extends BasePrelVisitor toAddRuntimeFilter = new HashSet<>(); + private Multimap probeSideScan2hj = HashMultimap.create(); + private double fpp; + private int bloomFilterMaxSizeInBytesDef; + private static final AtomicLong rfIdCounter = new AtomicLong(); + private RuntimeFilterVisitor(QueryContext queryContext) { this.bloomFilterMaxSizeInBytesDef = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE_KEY).num_val.intValue(); this.fpp = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_KEY).float_val; @@ -76,7 +82,7 @@ public class RuntimeFilterVisitor extends BasePrelVisitor children = Lists.newArrayList(); + List children = new ArrayList<>(); for (Prel child : prel) { child = child.accept(this, value); children.add(child); @@ -100,8 +106,18 @@ public class RuntimeFilterVisitor extends BasePrelVisitor hashJoinPrels = probeSideScan2hj.get(prel); + RuntimeFilterPrel runtimeFilterPrel = null; + for (HashJoinPrel hashJoinPrel : hashJoinPrels) { + long identifier = rfIdCounter.incrementAndGet(); + hashJoinPrel.getRuntimeFilterDef().setRuntimeFilterIdentifier(identifier); + if (runtimeFilterPrel == null) { + runtimeFilterPrel = new RuntimeFilterPrel(prel, identifier); + } else { + runtimeFilterPrel = new RuntimeFilterPrel(runtimeFilterPrel, identifier); + } + } return runtimeFilterPrel; } else { return prel; @@ -134,13 +150,24 @@ public class RuntimeFilterVisitor extends BasePrelVisitor bloomFilterDefs = new ArrayList<>(); //find the possible left scan node of the left join key - GroupScan groupScan = null; + ScanPrel probeSideScanPrel = null; RelNode left = hashJoinPrel.getLeft(); + RelNode right = hashJoinPrel.getRight(); + ExchangePrel exchangePrel = findRightExchangePrel(right); + if (exchangePrel == null) { + //Does not support the single fragment mode ,that is the right build side + //can only be BroadcastExchangePrel or HashToRandomExchangePrel + return null; + } List leftFields = left.getRowType().getFieldNames(); + List rightFields = right.getRowType().getFieldNames(); List leftKeys = hashJoinPrel.getLeftKeys(); RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery(); + int i = 0; for (Integer leftKey : leftKeys) { String leftFieldName = leftFields.get(leftKey); + String rightFieldName = rightFields.get(i); + i++; //This also avoids the left field of the join condition with a function call. ScanPrel scanPrel = findLeftScanPrel(leftFieldName, left); if (scanPrel != null) { @@ -160,17 +187,17 @@ public class RuntimeFilterVisitor extends BasePrelVisitor bloomFilterMaxSizeInBytesDef ? bloomFilterMaxSizeInBytesDef : bloomFilterSizeInBytes; //left the local parameter to be set later. - BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName); + BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName, rightFieldName); bloomFilterDef.setLeftNDV(ndv); bloomFilterDefs.add(bloomFilterDef); toAddRuntimeFilter.add(scanPrel); - groupScan = scanPrel.getGroupScan(); + probeSideScanPrel = scanPrel; } } if (bloomFilterDefs.size() > 0) { //left sendToForeman parameter to be set later. - RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false); - runtimeFilterDef.setProbeSideGroupScan(groupScan); + RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1); + probeSideScan2hj.put(probeSideScanPrel, hashJoinPrel); return runtimeFilterDef; } return null; @@ -265,6 +292,30 @@ public class RuntimeFilterVisitor extends BasePrelVisitor relNodes = rightRelNode.getInputs(); + if (relNodes.size() == 1) { + RelNode leftNode = relNodes.get(0); + return findRightExchangePrel(leftNode); + } else { + return null; + } + } + } + private boolean containBlockNode(Prel startNode, Prel endNode) { BlockNodeVisitor blockNodeVisitor = new BlockNodeVisitor(); startNode.accept(blockNodeVisitor, endNode); @@ -311,6 +362,11 @@ public class RuntimeFilterVisitor extends BasePrelVisitor> 27; + bucketMask[i] = bucketMask[i] >>> 27; } for (int i = 0; i < 8; ++i) { - mask[i] = 0x1 << mask[i]; + bucketMask[i] = 0x1 << bucketMask[i]; } } /** * Add an element's hash value to this bloom filter. + * * @param hash hash result of element. */ public void insert(long hash) { @@ -101,16 +101,13 @@ public class BloomFilter { int key = (int) hash; setMask(key); int initialStartIndex = bucketIndex * BYTES_PER_BUCKET; - byteBuf.getBytes(initialStartIndex, tempBucket); for (int i = 0; i < 8; i++) { + int index = initialStartIndex + i * 4; //every iterate batch,we set 32 bits - int bitsetIndex = i * 4; - tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24)); - tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16)); - tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8)); - tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i])); + int a = byteBuf.getInt(index); + a |= bucketMask[i]; + byteBuf.setInt(index, a); } - byteBuf.setBytes(initialStartIndex, tempBucket); } /** @@ -123,17 +120,12 @@ public class BloomFilter { int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); int key = (int) hash; setMask(key); - int startIndex = bucketIndex * BYTES_PER_BUCKET; - byteBuf.getBytes(startIndex, tempBucket); for (int i = 0; i < 8; i++) { - byte set = 0; - int bitsetIndex = i * 4; - set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24)); - set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16)); - set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8)); - set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]); - if (0 == set) { + int index = startIndex + i * 4; + int a = byteBuf.getInt(index); + int b = a & bucketMask[i]; + if (b == 0) { return false; } } @@ -142,6 +134,7 @@ public class BloomFilter { /** * Merge this bloom filter with other one + * * @param other */ public void or(BloomFilter other) { @@ -150,20 +143,19 @@ public class BloomFilter { Preconditions.checkArgument(otherLength == thisLength); Preconditions.checkState(otherLength % BYTES_PER_BUCKET == 0); Preconditions.checkState(thisLength % BYTES_PER_BUCKET == 0); - byte[] otherTmpBucket = new byte[BYTES_PER_BUCKET]; - for (int i = 0; i < thisLength / BYTES_PER_BUCKET; i++) { - byteBuf.getBytes(i * BYTES_PER_BUCKET, tempBucket); - other.byteBuf.getBytes(i * BYTES_PER_BUCKET, otherTmpBucket); - for (int j = 0; j < BYTES_PER_BUCKET; j++) { - tempBucket[j] = (byte) (tempBucket[j] | otherTmpBucket[j]); - } - this.byteBuf.setBytes(i, tempBucket); + for (int i = 0; i < thisLength / 8; i++) { + int index = i * 8; + long a = byteBuf.getLong(index); + long b = other.byteBuf.getLong(index); + long c = a | b; + byteBuf.setLong(index, c); } } /** * Calculate optimal size according to the number of distinct values and false positive probability. * See http://en.wikipedia.org/wiki/Bloom_filter#Probability_of_false_positives for the formula. + * * @param ndv: The number of distinct values. * @param fpp: The false positive probability. * @return optimal number of bytes of given ndv and fpp. @@ -177,7 +169,7 @@ public class BloomFilter { bits |= bits >> 8; bits |= bits >> 16; bits++; - int bytes = bits/8; + int bytes = bits / 8; return bytes; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java index 9a6df57ae..b2a9bd763 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java @@ -28,6 +28,8 @@ public class BloomFilterDef { private boolean local; private String probeField; + + private String buildField; //TODO @JsonIgnore private Double leftNDV; @@ -37,10 +39,11 @@ public class BloomFilterDef { @JsonCreator public BloomFilterDef(@JsonProperty("numBytes") int numBytes, @JsonProperty("local") boolean local, @JsonProperty("probeField") - String probeField){ + String probeField, @JsonProperty("buildField") String buildField){ this.numBytes = numBytes; this.local = local; this.probeField = probeField; + this.buildField = buildField; } @@ -61,7 +64,7 @@ public class BloomFilterDef { } public String toString() { - return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + " }"; + return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + ",buildField= " + buildField + " }"; } @JsonIgnore @@ -82,4 +85,9 @@ public class BloomFilterDef { this.rightNDV = rightNDV; } + public String getBuildField() + { + return buildField; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java index 5fb51bf24..efe300f3a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java @@ -18,13 +18,8 @@ package org.apache.drill.exec.work.filter; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; - import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.drill.exec.physical.base.GroupScan; - - import java.util.List; @JsonIgnoreProperties(ignoreUnknown = true) @@ -37,17 +32,18 @@ public class RuntimeFilterDef { private List bloomFilterDefs; private boolean sendToForeman; - @JsonIgnore - private GroupScan probeSideGroupScan; + private long runtimeFilterIdentifier; @JsonCreator public RuntimeFilterDef(@JsonProperty("generateBloomFilter") boolean generateBloomFilter, @JsonProperty("generateMinMaxFilter") boolean generateMinMaxFilter, - @JsonProperty("bloomFilterDefs") List bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman) { + @JsonProperty("bloomFilterDefs") List bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman, + @JsonProperty("runtimeFilterIdentifier") long runtimeFilterIdentifier) { this.generateBloomFilter = generateBloomFilter; this.generateMinMaxFilter = generateMinMaxFilter; this.bloomFilterDefs = bloomFilterDefs; this.sendToForeman = sendToForeman; + this.runtimeFilterIdentifier = runtimeFilterIdentifier; } @@ -84,12 +80,11 @@ public class RuntimeFilterDef { this.sendToForeman = sendToForeman; } - @JsonIgnore - public GroupScan getProbeSideGroupScan() { - return probeSideGroupScan; + public long getRuntimeFilterIdentifier() { + return runtimeFilterIdentifier; } - public void setProbeSideGroupScan(GroupScan probeSideGroupScan) { - this.probeSideGroupScan = probeSideGroupScan; + public void setRuntimeFilterIdentifier(long runtimeFilterIdentifier) { + this.runtimeFilterIdentifier = runtimeFilterIdentifier; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java index 6e4a9a8e5..93736c5f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java @@ -39,7 +39,9 @@ public class RuntimeFilterReporter { this.context = context; } - public void sendOut(List bloomFilters, List probeFields, boolean sendToForeman, int hashJoinOpId) { + public void sendOut(List bloomFilters, List probeFields, RuntimeFilterDef runtimeFilterDef, int hashJoinOpId) { + boolean sendToForeman = runtimeFilterDef.isSendToForeman(); + long rfIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier(); ExecProtos.FragmentHandle fragmentHandle = context.getHandle(); DrillBuf[] data = new DrillBuf[bloomFilters.size()]; List bloomFilterSizeInBytes = new ArrayList<>(); @@ -64,6 +66,7 @@ public class RuntimeFilterReporter { .setMinorFragmentId(minorFragmentId) .setToForeman(sendToForeman) .setHjOpId(hashJoinOpId) + .setRfIdentifier(rfIdentifier) .addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes) .build(); RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java index 5a8c6fc9e..a4946a96c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java @@ -17,39 +17,24 @@ */ package org.apache.drill.exec.work.filter; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.netty.buffer.DrillBuf; import org.apache.commons.collections.CollectionUtils; -import org.apache.drill.exec.ops.AccountingDataTunnel; -import org.apache.drill.exec.ops.Consumer; import org.apache.drill.exec.ops.SendingAccountor; -import org.apache.drill.exec.ops.StatusHandler; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.Exchange; -import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.config.RuntimeFilterPOP; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.Wrapper; -import org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.proto.CoordinationProtos; -import org.apache.drill.exec.proto.GeneralRPCProtos; -import org.apache.drill.exec.proto.UserBitShared; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.work.QueryWorkUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * This class manages the RuntimeFilter routing information of the pushed down join predicate @@ -69,29 +54,24 @@ import java.util.concurrent.ConcurrentHashMap; public class RuntimeFilterRouter { private Wrapper rootWrapper; - //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints - private Map> joinMjId2probdeScanEps = new HashMap<>(); - //HashJoin node's major fragment id to its corresponding probe side nodes's number - private Map joinMjId2scanSize = new ConcurrentHashMap<>(); - //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id - private Map joinMjId2ScanMjId = new HashMap<>(); - - private DrillbitContext drillbitContext; private SendingAccountor sendingAccountor = new SendingAccountor(); + private RuntimeFilterSink runtimeFilterSink; + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class); /** * This class maintains context for the runtime join push down's filter management. It * does a traversal of the physical operators by leveraging the root wrapper which indirectly * holds the global PhysicalOperator tree and contains the minor fragment endpoints. + * * @param workUnit * @param drillbitContext */ public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) { this.rootWrapper = workUnit.getRootWrapper(); - this.drillbitContext = drillbitContext; + runtimeFilterSink = new RuntimeFilterSink(drillbitContext, sendingAccountor); } /** @@ -99,6 +79,12 @@ public class RuntimeFilterRouter { * record the relationship between the RuntimeFilter producers and consumers. */ public void collectRuntimeFilterParallelAndControlInfo() { + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + Map> joinMjId2probeScanEps = new HashMap<>(); + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + Map joinMjId2ScanMjId = new HashMap<>(); + Map joinMjId2rfNumber = new HashMap<>(); + RuntimeFilterParallelismCollector runtimeFilterParallelismCollector = new RuntimeFilterParallelismCollector(); rootWrapper.getNode().getRoot().accept(runtimeFilterParallelismCollector, null); List holders = runtimeFilterParallelismCollector.getHolders(); @@ -107,67 +93,33 @@ public class RuntimeFilterRouter { List probeSideEndpoints = holder.getProbeSideScanEndpoints(); int probeSideScanMajorId = holder.getProbeSideScanMajorId(); int joinNodeMajorId = holder.getJoinMajorId(); + int buildSideRfNumber = holder.getBuildSideRfNumber(); RuntimeFilterDef runtimeFilterDef = holder.getRuntimeFilterDef(); boolean sendToForeman = runtimeFilterDef.isSendToForeman(); if (sendToForeman) { //send RuntimeFilter to Foreman - joinMjId2probdeScanEps.put(joinNodeMajorId, probeSideEndpoints); - joinMjId2scanSize.put(joinNodeMajorId, probeSideEndpoints.size()); + joinMjId2probeScanEps.put(joinNodeMajorId, probeSideEndpoints); joinMjId2ScanMjId.put(joinNodeMajorId, probeSideScanMajorId); + joinMjId2rfNumber.put(joinNodeMajorId, buildSideRfNumber); } } + runtimeFilterSink.setJoinMjId2probeScanEps(joinMjId2probeScanEps); + runtimeFilterSink.setJoinMjId2rfNumber(joinMjId2rfNumber); + runtimeFilterSink.setJoinMjId2ScanMjId(joinMjId2ScanMjId); } - public void waitForComplete() { sendingAccountor.waitForSendComplete(); + runtimeFilterSink.close(); } /** * This method is passively invoked by receiving a runtime filter from the network - * @param runtimeFilterWritable + * + * @param srcRuntimeFilterWritable */ - public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { - broadcastAggregatedRuntimeFilter(runtimeFilterWritable); - } - - - private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) { - BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); - int joinMajorId = runtimeFilterB.getMajorFragmentId(); - UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); - List probeFields = runtimeFilterB.getProbeFieldsList(); - DrillBuf[] data = srcRuntimeFilterWritable.getData(); - List scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId); - int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId); - for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) { - BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder(); - for (String probeField : probeFields) { - builder.addProbeFields(probeField); - } - BitData.RuntimeFilterBDef runtimeFilterBDef = builder - .setQueryId(queryId) - .setMajorFragmentId(scanNodeMjId) - .setMinorFragmentId(minorId) - .build(); - RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data); - CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId); - DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint); - Consumer exceptionConsumer = new Consumer() { - @Override - public void accept(final RpcException e) { - logger.warn("fail to broadcast a runtime filter to the probe side scan node", e); - } - - @Override - public void interrupt(final InterruptedException e) { - logger.warn("fail to broadcast a runtime filter to the probe side scan node", e); - } - }; - RpcOutcomeListener statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); - AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler); - accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable); - } + public void register(RuntimeFilterWritable srcRuntimeFilterWritable) { + runtimeFilterSink.add(srcRuntimeFilterWritable); } /** @@ -183,18 +135,29 @@ public class RuntimeFilterRouter { boolean isHashJoinOp = op instanceof HashJoinPOP; if (isHashJoinOp) { HashJoinPOP hashJoinPOP = (HashJoinPOP) op; + int hashJoinOpId = hashJoinPOP.getOperatorId(); RuntimeFilterDef runtimeFilterDef = hashJoinPOP.getRuntimeFilterDef(); - if (runtimeFilterDef != null) { - if (holder == null) { - holder = new RFHelperHolder(); + if (runtimeFilterDef != null && runtimeFilterDef.isSendToForeman()) { + if (holder == null || holder.getJoinOpId() != hashJoinOpId) { + holder = new RFHelperHolder(hashJoinOpId); holders.add(holder); } holder.setRuntimeFilterDef(runtimeFilterDef); - GroupScan probeSideScanOp = runtimeFilterDef.getProbeSideGroupScan(); - Wrapper container = findPhysicalOpContainer(rootWrapper, hashJoinPOP); + long runtimeFilterIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier(); + WrapperOperatorsVisitor operatorsVisitor = new WrapperOperatorsVisitor(hashJoinPOP); + Wrapper container = findTargetWrapper(rootWrapper, operatorsVisitor); + if (container == null) { + throw new IllegalStateException(String.format("No valid Wrapper found for HashJoinPOP with id=%d", hashJoinPOP.getOperatorId())); + } + int buildSideRFNumber = container.getAssignedEndpoints().size(); + holder.setBuildSideRfNumber(buildSideRFNumber); int majorFragmentId = container.getMajorFragmentId(); holder.setJoinMajorId(majorFragmentId); - Wrapper probeSideScanContainer = findPhysicalOpContainer(rootWrapper, probeSideScanOp); + WrapperRuntimeFilterOperatorsVisitor runtimeFilterOperatorsVisitor = new WrapperRuntimeFilterOperatorsVisitor(runtimeFilterIdentifier); + Wrapper probeSideScanContainer = findTargetWrapper(container, runtimeFilterOperatorsVisitor); + if (probeSideScanContainer == null) { + throw new IllegalStateException(String.format("No valid Wrapper found for RuntimeFilterPOP with id=%d", op.getOperatorId())); + } int probeSideScanMjId = probeSideScanContainer.getMajorFragmentId(); List probeSideScanEps = probeSideScanContainer.getAssignedEndpoints(); holder.setProbeSideScanEndpoints(probeSideScanEps); @@ -209,59 +172,63 @@ public class RuntimeFilterRouter { } } - private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor { + private Wrapper findTargetWrapper(Wrapper wrapper, TargetPhysicalOperatorVisitor targetOpVisitor) { + targetOpVisitor.setCurrentFragment(wrapper.getNode()); + wrapper.getNode().getRoot().accept(targetOpVisitor, null); + boolean contain = targetOpVisitor.isContain(); + if (contain) { + return wrapper; + } + List dependencies = wrapper.getFragmentDependencies(); + if (CollectionUtils.isEmpty(dependencies)) { + return null; + } + for (Wrapper dependencyWrapper : dependencies) { + Wrapper opContainer = findTargetWrapper(dependencyWrapper, targetOpVisitor); + if (opContainer != null) { + return opContainer; + } + } + return null; + } - private Fragment fragment; + private abstract class TargetPhysicalOperatorVisitor extends AbstractPhysicalVisitor { - private boolean contain = false; + protected Exchange sendingExchange; - private boolean targetIsGroupScan; + public void setCurrentFragment(Fragment fragment) { + sendingExchange = fragment.getSendingExchange(); + } - private boolean targetIsHashJoin; + public abstract boolean isContain(); + } - private String targetGroupScanDigest; + private class WrapperOperatorsVisitor extends TargetPhysicalOperatorVisitor { - private String targetHashJoinJson; + private boolean contain = false; + private PhysicalOperator targetOp; - public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) { - this.fragment = fragment; - this.targetIsGroupScan = targetOp instanceof GroupScan; - this.targetIsHashJoin = targetOp instanceof HashJoinPOP; - this.targetGroupScanDigest = targetIsGroupScan ? ((GroupScan) targetOp).getDigest() : null; - this.targetHashJoinJson = targetIsHashJoin ? jsonOfPhysicalOp(targetOp) : null; + public WrapperOperatorsVisitor(PhysicalOperator targetOp) { + this.targetOp = targetOp; } @Override public Void visitExchange(Exchange exchange, Void value) throws RuntimeException { - List exchangeFragmentPairs = fragment.getReceivingExchangePairs(); - for (Fragment.ExchangeFragmentPair exchangeFragmentPair : exchangeFragmentPairs) { - boolean same = exchange == exchangeFragmentPair.getExchange(); - if (same) { - return null; - } + if (exchange != sendingExchange) { + return null; } return exchange.getChild().accept(this, value); } @Override public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException { - boolean same = false; - if (targetIsGroupScan && op instanceof GroupScan) { - //Since GroupScan may be rewrite during the planing, here we use the digest to identify it. - String currentDigest = ((GroupScan) op).getDigest(); - same = targetGroupScanDigest.equals(currentDigest); - } - if (targetIsHashJoin && op instanceof HashJoinPOP) { - String currentOpJson = jsonOfPhysicalOp(op); - same = targetHashJoinJson.equals(currentOpJson); - } - if (!same) { + if (op == targetOp) { + contain = true; + } else { for (PhysicalOperator child : op) { child.accept(this, value); } - } else { - contain = true; } return null; } @@ -269,42 +236,57 @@ public class RuntimeFilterRouter { public boolean isContain() { return contain; } + } + + private class WrapperRuntimeFilterOperatorsVisitor extends TargetPhysicalOperatorVisitor { + + private boolean contain = false; + + private long identifier; + + + public WrapperRuntimeFilterOperatorsVisitor(long identifier) { + this.identifier = identifier; + } - public String jsonOfPhysicalOp(PhysicalOperator operator) { - try { - ObjectMapper objectMapper = new ObjectMapper(); - StringWriter stringWriter = new StringWriter(); - objectMapper.writeValue(stringWriter, operator); - return stringWriter.toString(); - } catch (IOException e) { - throw new RuntimeException(e); + @Override + public Void visitExchange(Exchange exchange, Void value) throws RuntimeException { + if (exchange != sendingExchange) { + return null; } + return exchange.getChild().accept(this, value); } - } - private boolean containsPhysicalOperator(Wrapper wrapper, PhysicalOperator op) { - WrapperOperatorsVisitor wrapperOpsVistitor = new WrapperOperatorsVisitor(op, wrapper.getNode()); - wrapper.getNode().getRoot().accept(wrapperOpsVistitor, null); - return wrapperOpsVistitor.isContain(); - } + @Override + public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException { + boolean same; + boolean isRuntimeFilterPop = op instanceof RuntimeFilterPOP; + boolean isHashJoinPop = op instanceof HashJoinPOP; - private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalOperator op) { - boolean contain = containsPhysicalOperator(wrapper, op); - if (contain) { - return wrapper; - } - List dependencies = wrapper.getFragmentDependencies(); - if (CollectionUtils.isEmpty(dependencies)) { + if (isHashJoinPop) { + HashJoinPOP hashJoinPOP = (HashJoinPOP) op; + PhysicalOperator leftPop = hashJoinPOP.getLeft(); + leftPop.accept(this, value); + return null; + } + + if (isRuntimeFilterPop) { + RuntimeFilterPOP runtimeFilterPOP = (RuntimeFilterPOP) op; + same = this.identifier == runtimeFilterPOP.getIdentifier(); + if (same) { + contain = true; + return null; + } + } + for (PhysicalOperator child : op) { + child.accept(this, value); + } return null; } - for (Wrapper dependencyWrapper : dependencies) { - Wrapper opContainer = findPhysicalOpContainer(dependencyWrapper, op); - if (opContainer != null) { - return opContainer; - } + + public boolean isContain() { + return contain; } - //should not be here - throw new IllegalStateException(String.format("No valid Wrapper found for physicalOperator with id=%d", op.getOperatorId())); } /** @@ -320,6 +302,22 @@ public class RuntimeFilterRouter { private RuntimeFilterDef runtimeFilterDef; + private int joinOpId; + + private int buildSideRfNumber; + + public RFHelperHolder(int joinOpId) { + this.joinOpId = joinOpId; + } + + public int getJoinOpId() { + return joinOpId; + } + + public void setJoinOpId(int joinOpId) { + this.joinOpId = joinOpId; + } + public List getProbeSideScanEndpoints() { return probeSideScanEndpoints; } @@ -352,5 +350,13 @@ public class RuntimeFilterRouter { public void setRuntimeFilterDef(RuntimeFilterDef runtimeFilterDef) { this.runtimeFilterDef = runtimeFilterDef; } + + public int getBuildSideRfNumber() { + return buildSideRfNumber; + } + + public void setBuildSideRfNumber(int buildSideRfNumber) { + this.buildSideRfNumber = buildSideRfNumber; + } } -} +} \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java index 14686254f..f69a44ef7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java @@ -17,206 +17,250 @@ */ package org.apache.drill.exec.work.filter; -import org.apache.drill.exec.memory.BufferAllocator; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** * This sink receives the RuntimeFilters from the netty thread, - * aggregates them in an async thread, supplies the aggregated - * one to the fragment running thread. + * aggregates them in an async thread, broadcast the final aggregated + * one to the RuntimeFilterRecordBatch. */ -public class RuntimeFilterSink implements AutoCloseable { - - private AtomicInteger currentBookId = new AtomicInteger(0); +public class RuntimeFilterSink implements Closeable +{ - private int staleBookId = 0; + private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); - /** - * RuntimeFilterWritable holding the aggregated version of all the received filter - */ - private RuntimeFilterWritable aggregated = null; + private Map joinMjId2rfNumber; - private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + private Map> joinMjId2probeScanEps = new HashMap<>(); - /** - * Flag used by Minor Fragment thread to indicate it has encountered error - */ - private AtomicBoolean running = new AtomicBoolean(true); + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + private Map joinMjId2ScanMjId = new HashMap<>(); - /** - * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this - * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at - * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to - * indicate producer not to put any new elements in it. - */ - private ReentrantLock queueLock = new ReentrantLock(); + //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable + private Map joinMjId2AggregatedRF = new HashMap<>(); + //for debug usage + private Map joinMjId2Stopwatch = new HashMap<>(); - private Condition notEmpty = queueLock.newCondition(); + private DrillbitContext drillbitContext; - private ReentrantLock aggregatedRFLock = new ReentrantLock(); + private SendingAccountor sendingAccountor; - private BufferAllocator bufferAllocator; + private AsyncAggregateWorker asyncAggregateWorker; - private Future future; + private AtomicBoolean running = new AtomicBoolean(true); private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); - public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) { - this.bufferAllocator = bufferAllocator; - AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); - future = executorService.submit(asyncAggregateWorker); + public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) + { + this.drillbitContext = drillbitContext; + this.sendingAccountor = sendingAccountor; + asyncAggregateWorker = new AsyncAggregateWorker(); + drillbitContext.getExecutor().submit(asyncAggregateWorker); } - public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { - if (running.get()) { - try { - aggregatedRFLock.lock(); - if (containOne()) { - boolean same = aggregated.equals(runtimeFilterWritable); - if (!same) { - // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs - // share the same FragmentContext. - aggregated.close(); - currentBookId.set(0); - staleBookId = 0; - clearQueued(false); - } - } - } finally { - aggregatedRFLock.unlock(); - } + public void add(RuntimeFilterWritable runtimeFilterWritable) + { + if (!running.get()) { + runtimeFilterWritable.close(); + return; + } + runtimeFilterWritable.retainBuffers(1); + int joinMjId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId(); + if (joinMjId2Stopwatch.get(joinMjId) == null) { + Stopwatch stopwatch = Stopwatch.createStarted(); + joinMjId2Stopwatch.put(joinMjId, stopwatch); + } + synchronized (rfQueue) { + rfQueue.add(runtimeFilterWritable); + rfQueue.notify(); + } + } + public void close() { + running.set(false); + if (asyncAggregateWorker != null) { + synchronized (rfQueue) { + rfQueue.notify(); + } + } + while (!asyncAggregateWorker.over.get()) { try { - queueLock.lock(); - if (rfQueue != null) { - rfQueue.add(runtimeFilterWritable); - notEmpty.signal(); - } else { - runtimeFilterWritable.close(); - } - } finally { - queueLock.unlock(); + Thread.sleep(100); + } catch (InterruptedException e) { + logger.error("interrupted while sleeping to wait for the aggregating worker thread to exit", e); } - } else { + } + for (RuntimeFilterWritable runtimeFilterWritable : joinMjId2AggregatedRF.values()) { runtimeFilterWritable.close(); } } - public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() { - try { - aggregatedRFLock.lock(); - return aggregated.duplicate(bufferAllocator); - } finally { - aggregatedRFLock.unlock(); + private void aggregate(RuntimeFilterWritable srcRuntimeFilterWritable) + { + BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); + int joinMajorId = runtimeFilterB.getMajorFragmentId(); + int buildSideRfNumber; + RuntimeFilterWritable toAggregated = null; + buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId); + buildSideRfNumber--; + joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber); + toAggregated = joinMjId2AggregatedRF.get(joinMajorId); + if (toAggregated == null) { + toAggregated = srcRuntimeFilterWritable; + toAggregated.retainBuffers(1); + } else { + toAggregated.aggregate(srcRuntimeFilterWritable); } - } - - /** - * whether there's a fresh aggregated RuntimeFilter - * - * @return - */ - public boolean hasFreshOne() { - if (currentBookId.get() > staleBookId) { - staleBookId = currentBookId.get(); - return true; + joinMjId2AggregatedRF.put(joinMajorId, toAggregated); + if (buildSideRfNumber == 0) { + joinMjId2AggregatedRF.remove(joinMajorId); + route(toAggregated); + joinMjId2rfNumber.remove(joinMajorId); + Stopwatch stopwatch = joinMjId2Stopwatch.get(joinMajorId); + logger.info( + "received all the RFWs belonging to the majorId {}'s HashJoin nodes and flushed aggregated RFW out elapsed {} ms", + joinMajorId, + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ); } - return false; - } - - /** - * whether there's a usable RuntimeFilter. - * - * @return - */ - public boolean containOne() { - return aggregated != null; } - private void doCleanup() { - running.compareAndSet(true, false); - try { - aggregatedRFLock.lock(); - if (containOne()) { - aggregated.close(); - aggregated = null; + private void route(RuntimeFilterWritable srcRuntimeFilterWritable) + { + BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); + int joinMajorId = runtimeFilterB.getMajorFragmentId(); + UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); + List probeFields = runtimeFilterB.getProbeFieldsList(); + List sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList(); + long rfIdentifier = runtimeFilterB.getRfIdentifier(); + DrillBuf[] data = srcRuntimeFilterWritable.getData(); + List scanNodeEps = joinMjId2probeScanEps.get(joinMajorId); + int scanNodeSize = scanNodeEps.size(); + srcRuntimeFilterWritable.retainBuffers(scanNodeSize - 1); + int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId); + for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) { + BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder(); + for (String probeField : probeFields) { + builder.addProbeFields(probeField); } - } finally { - aggregatedRFLock.unlock(); + BitData.RuntimeFilterBDef runtimeFilterBDef = builder.setQueryId(queryId) + .setMajorFragmentId(scanNodeMjId) + .setMinorFragmentId(minorId) + .setToForeman(false) + .setRfIdentifier(rfIdentifier) + .addAllBloomFilterSizeInBytes(sizeInBytes) + .build(); + RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data); + CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId); + + DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint); + Consumer exceptionConsumer = new Consumer() + { + @Override + public void accept(final RpcException e) + { + logger.warn("fail to broadcast a runtime filter to the probe side scan node", e); + } + + @Override + public void interrupt(final InterruptedException e) + { + logger.warn("fail to broadcast a runtime filter to the probe side scan node", e); + } + }; + RpcOutcomeListener statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); + AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler); + accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable); } } - @Override - public void close() throws Exception { - future.cancel(true); - doCleanup(); + public void setJoinMjId2rfNumber(Map joinMjId2rfNumber) + { + this.joinMjId2rfNumber = joinMjId2rfNumber; } - private void clearQueued(boolean setToNull) { - RuntimeFilterWritable toClear; - try { - queueLock.lock(); - while (rfQueue != null && (toClear = rfQueue.poll()) != null) { - toClear.close(); - } - rfQueue = (setToNull) ? null : rfQueue; - } finally { - queueLock.unlock(); - } + public void setJoinMjId2probeScanEps(Map> joinMjId2probeScanEps) + { + this.joinMjId2probeScanEps = joinMjId2probeScanEps; } - private class AsyncAggregateWorker implements Runnable { + public void setJoinMjId2ScanMjId(Map joinMjId2ScanMjId) + { + this.joinMjId2ScanMjId = joinMjId2ScanMjId; + } + + private class AsyncAggregateWorker implements Runnable + { + private AtomicBoolean over = new AtomicBoolean(false); @Override - public void run() { - try { + public void run() + { + while ((joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty() ) && running.get()) { RuntimeFilterWritable toAggregate = null; - while (running.get()) { + synchronized (rfQueue) { try { - queueLock.lock(); - toAggregate = (rfQueue != null) ? rfQueue.poll() : null; - if (toAggregate == null) { - notEmpty.await(); - continue; + toAggregate = rfQueue.poll(); + while (toAggregate == null && running.get()) { + rfQueue.wait(); + toAggregate = rfQueue.poll(); } - } finally { - queueLock.unlock(); + } catch (InterruptedException ex) { + logger.error("RFW_Aggregator thread being interrupted", ex); + continue; } - - try { - aggregatedRFLock.lock(); - if (containOne()) { - aggregated.aggregate(toAggregate); - - // Release the byteBuf referenced by toAggregate since aggregate will not do it - toAggregate.close(); - } else { - aggregated = toAggregate; - } - } finally { - aggregatedRFLock.unlock(); + } + if (toAggregate == null) { + continue; + } + // perform aggregate outside the sync block. + try { + aggregate(toAggregate); + } catch (Exception ex) { + logger.error("Failed to aggregate or route the RFW", ex); + throw new DrillRuntimeException(ex); + } finally { + if (toAggregate != null) { + toAggregate.close(); } - currentBookId.incrementAndGet(); } - } catch (InterruptedException e) { - logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName()); - Thread.currentThread().interrupt(); - } finally { - doCleanup(); - clearQueued(true); } + + if (!running.get()) { + RuntimeFilterWritable toClose; + while ((toClose = rfQueue.poll()) != null) { + toClose.close(); + } + } + over.set(true); } } } - - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java index 9a971e94c..f8c2701b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java @@ -103,6 +103,27 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{ return new RuntimeFilterWritable(runtimeFilterBDef, cloned); } + public void retainBuffers(final int increment) { + if (increment <= 0) { + return; + } + for (final DrillBuf buf : data) { + buf.retain(increment); + } + } + //TODO: Not used currently because of DRILL-6826 + public RuntimeFilterWritable newRuntimeFilterWritable(BufferAllocator bufferAllocator) { + int bufNum = data.length; + DrillBuf [] newBufs = new DrillBuf[bufNum]; + int i = 0; + for (DrillBuf buf : data) { + DrillBuf transferredBuffer = buf.transferOwnership(bufferAllocator).buffer; + newBufs[i] = transferredBuffer; + i++; + } + return new RuntimeFilterWritable(this.runtimeFilterBDef, newBufs); + } + public String toString() { return identifier; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 42b76f278..a379db175 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -151,7 +151,7 @@ public class Foreman implements Runnable { this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this); this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult()); this.profileOption = setProfileOption(queryContext.getOptions()); - this.enableRuntimeFilter = drillbitContext.getOptionManager().getBoolean(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY); + this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val; } diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 8aa3233a9..a2d3cdc7a 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -469,6 +469,8 @@ drill.exec.options: { exec.hashjoin.enable.runtime_filter: false, exec.hashjoin.bloom_filter.fpp: 0.75, exec.hashjoin.bloom_filter.max.size: 33554432, #32 MB + exec.hashjoin.runtime_filter.waiting.enable: true, + exec.hashjoin.runtime_filter.max.waiting.time: 300, #400 ms exec.hashagg.mem_limit: 0, exec.hashagg.min_batches_per_partition: 2, exec.hashagg.num_partitions: 32, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java index 5eae12e6e..a5fc5ba49 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java @@ -40,9 +40,9 @@ public class TestHashJoinJPPD extends PhysicalOpUnitTestBase { public void testBroadcastHashJoin1Cond() { List bloomFilterDefs = new ArrayList<>(); int numBytes = BloomFilter.optimalNumOfBytes(2600, 0.01); - BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft"); + BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft", "rgt"); bloomFilterDefs.add(bloomFilterDef); - RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false ); + RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1); HashJoinPOP joinConf = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.INNER, runtimeFilterDef); operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4); @@ -71,11 +71,11 @@ public class TestHashJoinJPPD extends PhysicalOpUnitTestBase { public void testBroadcastHashJoin2Cond() { List bloomFilterDefs = new ArrayList<>(); int numBytes = BloomFilter.optimalNumOfBytes(2600, 0.01); - BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft"); - BloomFilterDef bloomFilterDef1 = new BloomFilterDef(numBytes, true, "a"); + BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft", "rgt"); + BloomFilterDef bloomFilterDef1 = new BloomFilterDef(numBytes, true, "a", "b"); bloomFilterDefs.add(bloomFilterDef); bloomFilterDefs.add(bloomFilterDef1); - RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false ); + RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1); HashJoinPOP joinConf = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("lft", "EQUALS", "rgt"), joinCond("a", "EQUALS", "b")), JoinRelType.INNER, runtimeFilterDef); operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java index 2370ffa06..ac174d1ce 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java @@ -33,38 +33,6 @@ public class TestHashJoinJPPDPlan extends JoinTestBase { test("alter session set `exec.hashjoin.enable.runtime_filter` = true"); } - @Test - public void testInnnerHashJoin() throws Exception { - String sql = "SELECT nations.N_NAME, count(*)" - + "FROM\n" - + " dfs.`sample-data/nation.parquet` nations\n" - + "JOIN\n" - + " dfs.`sample-data/region.parquet` regions\n" - + " on nations.N_REGIONKEY = regions.R_REGIONKEY " - + "WHERE nations.N_NAME = 'A' " - + "group by nations.N_NAME"; - String expectedColNames1 = "\"runtimeFilterDef\""; - String expectedColNames2 = "\"bloomFilterDefs\""; - String expectedColNames3 = "\"runtime-filter\""; - testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3); - } - - @Test - public void testRightHashJoin() throws Exception { - String sql = "SELECT nations.N_NAME, count(*)" - + "FROM\n" - + " dfs.`sample-data/nation.parquet` nations\n" - + "RIGHT JOIN\n" - + " dfs.`sample-data/region.parquet` regions\n" - + " on nations.N_REGIONKEY = regions.R_REGIONKEY " - + "WHERE nations.N_NAME = 'A' " - + "group by nations.N_NAME"; - String expectedColNames1 = "\"runtimeFilterDef\""; - String expectedColNames2 = "\"bloomFilterDefs\""; - String expectedColNames3 = "\"runtime-filter\""; - testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3); - } - @Test public void testLeftHashJoin() throws Exception { String sql = "SELECT nations.N_NAME, count(*)" @@ -95,24 +63,4 @@ public class TestHashJoinJPPDPlan extends JoinTestBase { String excludedColNames3 = "\"runtime-filter\""; testPlanWithAttributesMatchingPatterns(sql, null, new String[]{excludedColNames1, excludedColNames2, excludedColNames3}); } - - @Test - public void testInnnerHashJoinWithRightDeepTree() throws Exception { - String sql = "SELECT nations.N_NAME, count(*)" - + "FROM\n" - + " cp.`tpch/nation.parquet` nations\n" - + "JOIN\n" - + " cp.`tpch/region.parquet` regions\n" - + " on nations.N_REGIONKEY = regions.R_REGIONKEY " - + "JOIN cp.`tpch/customer.parquet` customers\n" - + " on nations.N_NATIONKEY = customers.C_NATIONKEY " - + "WHERE nations.N_NAME = 'A' " - + "group by nations.N_NAME"; - String expectedColNames1 = "\"runtimeFilterDef\""; - String expectedColNames2 = "\"bloomFilterDefs\""; - String expectedColNames3 = "\"runtime-filter\""; - testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3); - } - - } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java index c05cdfdf8..c1d157600 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java @@ -135,7 +135,6 @@ public class BloomFilterTest { @Test public void testNotExist() throws Exception { - Drillbit bit = new Drillbit(c, RemoteServiceSet.getLocalServiceSet(), ClassPathScanner.fromPrescan(c)); bit.run(); DrillbitContext bitContext = bit.getContext(); @@ -192,6 +191,12 @@ public class BloomFilterTest { long hashCode = probeHash64.hash64Code(0, 0, 0); boolean contain = bloomFilter.find(hashCode); Assert.assertFalse(contain); + bloomFilter.getContent().close(); + vectorContainer.clear(); + probeVectorContainer.clear(); + context.close(); + bitContext.close(); + bit.close(); } @@ -254,6 +259,12 @@ public class BloomFilterTest { long hashCode = probeHash64.hash64Code(0, 0, 0); boolean contain = bloomFilter.find(hashCode); Assert.assertTrue(contain); + bloomFilter.getContent().close(); + vectorContainer.clear(); + probeVectorContainer.clear(); + context.close(); + bitContext.close(); + bit.close(); } @@ -324,5 +335,11 @@ public class BloomFilterTest { long hashCode = probeHash64.hash64Code(0, 0, 0); boolean contain = bloomFilter.find(hashCode); Assert.assertTrue(contain); + bloomFilter.getContent().close(); + vectorContainer.clear(); + probeVectorContainer.clear(); + context.close(); + bitContext.close(); + bit.close(); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java index a1e7d0d2a..b41798ded 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java @@ -17,7 +17,6 @@ */ package org.apache.drill.test; -import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.shaded.guava.com.google.common.base.Function; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -26,10 +25,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; - -import org.apache.drill.shaded.guava.com.google.common.base.Function; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import io.netty.buffer.DrillBuf; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; @@ -76,12 +71,7 @@ import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet; import org.apache.drill.test.rowSet.RowSetBuilder; import org.apache.hadoop.security.UserGroupInformation; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Test fixture for operator and (especially) "sub-operator" tests. @@ -182,7 +172,6 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { private ExecutorState executorState = new OperatorFixture.MockExecutorState(); private ExecutionControls controls; - private RuntimeFilterSink runtimeFilterSink; public MockFragmentContext(final DrillConfig config, final OptionManager options, @@ -198,7 +187,6 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { this.controls = new ExecutionControls(options); compiler = new CodeCompiler(config, options); bufferManager = new BufferManagerImpl(allocator); - this.runtimeFilterSink = new RuntimeFilterSink(allocator, Executors.newCachedThreadPool()); } private static FunctionImplementationRegistry newFunctionRegistry( @@ -319,13 +307,18 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { } @Override - public RuntimeFilterSink getRuntimeFilterSink() { - return runtimeFilterSink; + public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { } @Override - public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { - runtimeFilterSink.aggregate(runtimeFilter); + public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) { + return null; + } + + @Override + public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit) + { + return null; } @Override diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java index 84a7c785b..b0820e926 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java @@ -38,7 +38,6 @@ import org.apache.drill.exec.server.QueryProfileStoreContext; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.easy.json.JSONRecordReader; import org.apache.drill.exec.work.batch.IncomingBuffers; -import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.exec.work.filter.RuntimeFilterWritable; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -74,6 +73,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class PhysicalOpUnitTestBase extends ExecTest { protected MockExecutorFragmentContext fragContext; @@ -198,12 +198,10 @@ public class PhysicalOpUnitTestBase extends ExecTest { *

*/ protected static class MockExecutorFragmentContext extends OperatorFixture.MockFragmentContext implements ExecutorFragmentContext { - private RuntimeFilterSink runtimeFilterSink; public MockExecutorFragmentContext(final FragmentContext fragmentContext) { super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(), fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor()); - this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator(), Executors.newCachedThreadPool()); } @Override @@ -301,12 +299,17 @@ public class PhysicalOpUnitTestBase extends ExecTest { @Override public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { - this.runtimeFilterSink.aggregate(runtimeFilter); } @Override - public RuntimeFilterSink getRuntimeFilterSink() { - return runtimeFilterSink; + public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) { + return null; + } + + @Override + public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit) + { + return null; } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java index d7921fc0f..e43380db8 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java @@ -2536,6 +2536,24 @@ public final class BitData { * */ int getHjOpId(); + + // optional int64 rf_identifier = 8; + /** + * optional int64 rf_identifier = 8; + * + *
+     * the runtime filter identifier
+     * 
+ */ + boolean hasRfIdentifier(); + /** + * optional int64 rf_identifier = 8; + * + *
+     * the runtime filter identifier
+     * 
+ */ + long getRfIdentifier(); } /** * Protobuf type {@code exec.bit.data.RuntimeFilterBDef} @@ -2650,6 +2668,11 @@ public final class BitData { hjOpId_ = input.readInt32(); break; } + case 64: { + bitField0_ |= 0x00000020; + rfIdentifier_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -2867,6 +2890,30 @@ public final class BitData { return hjOpId_; } + // optional int64 rf_identifier = 8; + public static final int RF_IDENTIFIER_FIELD_NUMBER = 8; + private long rfIdentifier_; + /** + * optional int64 rf_identifier = 8; + * + *
+     * the runtime filter identifier
+     * 
+ */ + public boolean hasRfIdentifier() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional int64 rf_identifier = 8; + * + *
+     * the runtime filter identifier
+     * 
+ */ + public long getRfIdentifier() { + return rfIdentifier_; + } + private void initFields() { queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance(); majorFragmentId_ = 0; @@ -2875,6 +2922,7 @@ public final class BitData { bloomFilterSizeInBytes_ = java.util.Collections.emptyList(); probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY; hjOpId_ = 0; + rfIdentifier_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -2909,6 +2957,9 @@ public final class BitData { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeInt32(7, hjOpId_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeInt64(8, rfIdentifier_); + } getUnknownFields().writeTo(output); } @@ -2956,6 +3007,10 @@ public final class BitData { size += com.google.protobuf.CodedOutputStream .computeInt32Size(7, hjOpId_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(8, rfIdentifier_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3091,6 +3146,8 @@ public final class BitData { bitField0_ = (bitField0_ & ~0x00000020); hjOpId_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + rfIdentifier_ = 0L; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -3154,6 +3211,10 @@ public final class BitData { to_bitField0_ |= 0x00000010; } result.hjOpId_ = hjOpId_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000020; + } + result.rfIdentifier_ = rfIdentifier_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -3205,6 +3266,9 @@ public final class BitData { if (other.hasHjOpId()) { setHjOpId(other.getHjOpId()); } + if (other.hasRfIdentifier()) { + setRfIdentifier(other.getRfIdentifier()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3708,6 +3772,55 @@ public final class BitData { return this; } + // optional int64 rf_identifier = 8; + private long rfIdentifier_ ; + /** + * optional int64 rf_identifier = 8; + * + *
+       * the runtime filter identifier
+       * 
+ */ + public boolean hasRfIdentifier() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional int64 rf_identifier = 8; + * + *
+       * the runtime filter identifier
+       * 
+ */ + public long getRfIdentifier() { + return rfIdentifier_; + } + /** + * optional int64 rf_identifier = 8; + * + *
+       * the runtime filter identifier
+       * 
+ */ + public Builder setRfIdentifier(long value) { + bitField0_ |= 0x00000080; + rfIdentifier_ = value; + onChanged(); + return this; + } + /** + * optional int64 rf_identifier = 8; + * + *
+       * the runtime filter identifier
+       * 
+ */ + public Builder clearRfIdentifier() { + bitField0_ = (bitField0_ & ~0x00000080); + rfIdentifier_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef) } @@ -3761,16 +3874,17 @@ public final class BitData { " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" + "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" + "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" + - "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" + + "isLastBatch\030\007 \001(\010\"\350\001\n\021RuntimeFilterBDef\022" + "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" + "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" + "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" + "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" + - "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" + - "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n", - "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" + - "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" + - "l.exec.protoB\007BitDataH\001" + "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005\022\025\n\rrf_iden" + + "tifier\030\010 \001(\003*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007", + "\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH" + + "\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILT" + + "ER\020\005B(\n\033org.apache.drill.exec.protoB\007Bit" + + "DataH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3800,7 +3914,7 @@ public final class BitData { internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_exec_bit_data_RuntimeFilterBDef_descriptor, - new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", }); + new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", "RfIdentifier", }); return null; } }; diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java index 3c88ffced..ecf0f187f 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java @@ -443,6 +443,8 @@ public final class SchemaBitData output.writeString(6, probeFields, true); if(message.hasHjOpId()) output.writeInt32(7, message.getHjOpId(), false); + if(message.hasRfIdentifier()) + output.writeInt64(8, message.getRfIdentifier(), false); } public boolean isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message) { @@ -504,6 +506,9 @@ public final class SchemaBitData case 7: builder.setHjOpId(input.readInt32()); break; + case 8: + builder.setRfIdentifier(input.readInt64()); + break; default: input.handleUnknownField(number, this); } @@ -551,6 +556,7 @@ public final class SchemaBitData case 5: return "bloomFilterSizeInBytes"; case 6: return "probeFields"; case 7: return "hjOpId"; + case 8: return "rfIdentifier"; default: return null; } } @@ -569,6 +575,7 @@ public final class SchemaBitData fieldMap.put("bloomFilterSizeInBytes", 5); fieldMap.put("probeFields", 6); fieldMap.put("hjOpId", 7); + fieldMap.put("rfIdentifier", 8); } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java index 2d1c2a702..3b2c1027e 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java @@ -56,6 +56,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message bloomFilterSizeInBytes; private List probeFields; private int hjOpId; + private long rfIdentifier; public RuntimeFilterBDef() { @@ -155,6 +156,19 @@ public final class RuntimeFilterBDef implements Externalizable, Message