aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorweijie.tong <weijie.tong@alipay.com>2018-10-14 19:41:51 +0800
committerVitalii Diravka <vitalii.diravka@gmail.com>2018-11-29 18:33:23 +0200
commit9667e92e1e87ce1826f0eac3f2396187dbfa8aaa (patch)
treecb68cd4bbedf6f84e00168cb0ab300c6dacdb35c
parent325fa26b5df1bc29594677a0f3e1360fbb4f8bca (diff)
DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree
closes #1504
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java82
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java82
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java43
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java80
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java62
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java280
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java340
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java2
-rw-r--r--exec/java-exec/src/main/resources/drill-module.conf2
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java10
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java52
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java19
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java27
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java15
-rw-r--r--protocol/src/main/java/org/apache/drill/exec/proto/BitData.java128
-rw-r--r--protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java7
-rw-r--r--protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java22
-rw-r--r--protocol/src/main/protobuf/BitData.proto1
29 files changed, 876 insertions, 505 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index fb4657259..c4d7652f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -153,6 +153,10 @@ public final class ExecConstants {
public static final IntegerValidator HASHJOIN_BLOOM_FILTER_MAX_SIZE = new IntegerValidator(HASHJOIN_BLOOM_FILTER_MAX_SIZE_KEY, null);
public static final String HASHJOIN_BLOOM_FILTER_FPP_KEY = "exec.hashjoin.bloom_filter.fpp";
public static final DoubleValidator HASHJOIN_BLOOM_FILTER_FPP_VALIDATOR = new RangeDoubleValidator(HASHJOIN_BLOOM_FILTER_FPP_KEY, Double.MIN_VALUE, 1.0, null);
+ public static final String HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY = "exec.hashjoin.runtime_filter.waiting.enable";
+ public static final BooleanValidator HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING = new BooleanValidator(HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY, null);
+ public static final String HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY = "exec.hashjoin.runtime_filter.max.waiting.time";
+ public static final PositiveLongValidator HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME = new PositiveLongValidator(HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY, Character.MAX_VALUE, null);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 88c21d9e9..5125f7201 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -20,8 +20,7 @@ package org.apache.drill.exec.ops;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
-
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.DrillConfig;
@@ -159,18 +158,23 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable {
@Override
void close();
-
- /**
- * @return
- */
- RuntimeFilterSink getRuntimeFilterSink();
-
/**
* add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
* @param runtimeFilter
*/
public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter);
+ public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier);
+
+ /**
+ * get the RuntimeFilter with a blocking wait, if the waiting option is enabled
+ * @param rfIdentifier
+ * @param maxWaitTime
+ * @param timeUnit
+ * @return the RFW or null
+ */
+ public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit);
+
interface ExecutorState {
/**
* Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 6e40466e8..b740c927d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -21,7 +21,12 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.DrillConfig;
@@ -60,8 +65,6 @@ import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.work.batch.IncomingBuffers;
-
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -115,6 +118,10 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
private final BufferManager bufferManager;
private ExecutorState executorState;
private final ExecutionControls executionControls;
+ private boolean enableRuntimeFilter;
+ private boolean enableRFWaiting;
+ private Lock lock4RF;
+ private Condition condition4RF;
private final SendingAccountor sendingAccountor = new SendingAccountor();
private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
@@ -136,8 +143,8 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
private final AccountingUserConnection accountingUserConnection;
/** Stores constants and their holders by type */
private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
-
- private RuntimeFilterSink runtimeFilterSink;
+ private Map<Long, RuntimeFilterWritable> rfIdentifier2RFW = new ConcurrentHashMap<>();
+ private Map<Long, Boolean> rfIdentifier2fetched = new ConcurrentHashMap<>();
/**
* Create a FragmentContext instance for non-root fragment.
@@ -209,10 +216,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
stats = new FragmentStats(allocator, fragment.getAssignment());
bufferManager = new BufferManagerImpl(this.allocator);
constantValueHolderCache = Maps.newHashMap();
- boolean enableRF = context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
- if (enableRF) {
- ExecutorService executorService = context.getExecutor();
- this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, executorService);
+ enableRuntimeFilter = this.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
+ enableRFWaiting = this.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val && enableRuntimeFilter;
+ if (enableRFWaiting) {
+ lock4RF = new ReentrantLock();
+ condition4RF = lock4RF.newCondition();
}
}
@@ -362,12 +370,50 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
@Override
public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
- this.runtimeFilterSink.aggregate(runtimeFilter);
+ long rfIdentifier = runtimeFilter.getRuntimeFilterBDef().getRfIdentifier();
+ //if the RF was sent directly from the HJ nodes, we don't need to retain the buffer again
+ // as the RuntimeFilterReporter has already retained the buffer
+ rfIdentifier2fetched.put(rfIdentifier, false);
+ rfIdentifier2RFW.put(rfIdentifier, runtimeFilter);
+ if (enableRFWaiting) {
+ lock4RF.lock();
+ try {
+ condition4RF.signal();
+ } catch (Exception e) {
+ logger.info("fail to signal the waiting thread.", e);
+ } finally {
+ lock4RF.unlock();
+ }
+ }
+ }
+
+ @Override
+ public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) {
+ RuntimeFilterWritable runtimeFilterWritable = rfIdentifier2RFW.get(rfIdentifier);
+ if (runtimeFilterWritable != null) {
+ rfIdentifier2fetched.put(rfIdentifier, true);
+ }
+ return runtimeFilterWritable;
}
@Override
- public RuntimeFilterSink getRuntimeFilterSink() {
- return runtimeFilterSink;
+ public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit) {
+ if (rfIdentifier2RFW.get(rfIdentifier) != null) {
+ return getRuntimeFilter(rfIdentifier);
+ }
+ if (enableRFWaiting) {
+ lock4RF.lock();
+ try {
+ if (rfIdentifier2RFW.get(rfIdentifier) == null) {
+ condition4RF.await(maxWaitTime, timeUnit);
+ }
+ } catch (InterruptedException e) {
+ logger.info("Condition was interrupted", e);
+ } finally {
+ lock4RF.unlock();
+ }
+ }
+ return getRuntimeFilter(rfIdentifier);
}
/**
@@ -484,12 +530,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
// Close the buffers before closing the operators; this is needed as buffer ownership
// is attached to the receive operators.
suppressingClose(buffers);
-
+ closeNotConsumedRFWs();
// close operator context
for (OperatorContextImpl opContext : contexts) {
suppressingClose(opContext);
}
- suppressingClose(runtimeFilterSink);
suppressingClose(bufferManager);
suppressingClose(allocator);
}
@@ -550,4 +595,15 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
protected BufferManager getBufferManager() {
return bufferManager;
}
+
+ private void closeNotConsumedRFWs() {
+ for (RuntimeFilterWritable runtimeFilterWritable : rfIdentifier2RFW.values()){
+ long rfIdentifier = runtimeFilterWritable.getRuntimeFilterBDef().getRfIdentifier();
+ boolean fetchedByOperator = rfIdentifier2fetched.get(rfIdentifier);
+ if (!fetchedByOperator) {
+ //if the RF hasn't been consumed by the operator, we have to released it one more time.
+ runtimeFilterWritable.close();
+ }
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index ac867020d..da590689d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.physical.impl.ScreenCreator;
import org.apache.drill.exec.physical.impl.SingleSenderCreator;
import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
import org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec;
+import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch;
import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
@@ -61,6 +62,7 @@ public class OperatorMetricRegistry {
register(CoreOperatorType.LATERAL_JOIN_VALUE, AbstractBinaryRecordBatch.Metric.class);
register(CoreOperatorType.UNNEST_VALUE, UnnestRecordBatch.Metric.class);
register(CoreOperatorType.UNION_VALUE, AbstractBinaryRecordBatch.Metric.class);
+ register(CoreOperatorType.RUNTIME_FILTER_VALUE, RuntimeFilterRecordBatch.Metric.class);
}
private static void register(final int operatorType, final Class<? extends MetricDef> metricDef) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java
index 50c00d792..b35bf29af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java
@@ -31,9 +31,12 @@ public class RuntimeFilterPOP extends AbstractSingle {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPOP.class);
+ private long identifier;
+
@JsonCreator
- public RuntimeFilterPOP(@JsonProperty("child") PhysicalOperator child) {
+ public RuntimeFilterPOP(@JsonProperty("child") PhysicalOperator child, @JsonProperty("identifier")long identifier) {
super(child);
+ this.identifier = identifier;
}
@Override
@@ -43,7 +46,7 @@ public class RuntimeFilterPOP extends AbstractSingle {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new RuntimeFilterPOP(child);
+ return new RuntimeFilterPOP(child, identifier);
}
@Override
@@ -55,4 +58,13 @@ public class RuntimeFilterPOP extends AbstractSingle {
public int getOperatorType() {
return CoreOperatorType.RUNTIME_FILTER_VALUE;
}
+
+
+ public long getIdentifier() {
+ return identifier;
+ }
+
+ public void setIdentifier(long identifier) {
+ this.identifier = identifier;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index 9248bbc69..bf7ed7912 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -22,11 +22,13 @@ import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.RuntimeFilterPOP;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -36,14 +38,13 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.work.filter.BloomFilter;
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
-
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch participates
@@ -59,12 +60,21 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
private List<String> toFilterFields;
private List<BloomFilter> bloomFilters;
private RuntimeFilterWritable current;
- private RuntimeFilterWritable previous;
private int originalRecordCount;
+ private long filteredRows = 0l;
+ private long appliedTimes = 0l;
+ private int batchTimes = 0;
+ private boolean waited = false;
+ private boolean enableRFWaiting;
+ private long maxWaitingTime;
+ private long rfIdentifier;
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
+ enableRFWaiting = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val;
+ maxWaitingTime = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY).num_val;
+ this.rfIdentifier = pop.getIdentifier();
}
@Override
@@ -89,7 +99,6 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
@Override
protected IterOutcome doWork() {
- container.transferIn(incoming.getContainer());
originalRecordCount = incoming.getRecordCount();
sv2.setBatchActualRecordCount(originalRecordCount);
try {
@@ -97,6 +106,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
} catch (SchemaChangeException e) {
throw new UnsupportedOperationException(e);
}
+ container.transferIn(incoming.getContainer());
+ updateStats();
return getFinalOutcome(false);
}
@@ -155,21 +166,11 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
* schema change hash64 should be reset and this method needs to be called again.
*/
private void setupHashHelper() {
- final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
- // Check if RuntimeFilterWritable was received by the minor fragment or not
- if (!runtimeFilterSink.containOne()) {
+ current = context.getRuntimeFilter(rfIdentifier);
+ if (current == null) {
return;
}
- if (runtimeFilterSink.hasFreshOne()) {
- RuntimeFilterWritable freshRuntimeFilterWritable = runtimeFilterSink.fetchLatestDuplicatedAggregatedOne();
- if (current == null) {
- current = freshRuntimeFilterWritable;
- previous = freshRuntimeFilterWritable;
- } else {
- previous = current;
- current = freshRuntimeFilterWritable;
- previous.close();
- }
+ if (bloomFilters == null) {
bloomFilters = current.unwrap();
}
// Check if HashHelper is initialized or not
@@ -189,8 +190,7 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
hashFieldExps.add(toHashFieldExp);
}
- hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]),
- typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
+ hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
} catch (Exception e) {
throw UserException.internalError(e).build(logger);
}
@@ -208,9 +208,11 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
sv2.setRecordCount(0);
return;
}
- final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
+ current = context.getRuntimeFilter(rfIdentifier);
+ timedWaiting();
+ batchTimes++;
sv2.allocateNew(originalRecordCount);
- if (!runtimeFilterSink.containOne()) {
+ if (current == null) {
// means none of the rows are filtered out hence set all the indexes
for (int i = 0; i < originalRecordCount; ++i) {
sv2.setIndex(i, i);
@@ -227,21 +229,17 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
String fieldName = toFilterFields.get(i);
computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
}
-
int svIndex = 0;
- int tmpFilterRows = 0;
for (int i = 0; i < originalRecordCount; i++) {
boolean contain = bitSet.get(i);
if (contain) {
sv2.setIndex(svIndex, i);
svIndex++;
} else {
- tmpFilterRows++;
+ filteredRows++;
}
}
-
- logger.debug("RuntimeFiltered has filtered out {} rows from incoming with {} rows",
- tmpFilterRows, originalRecordCount);
+ appliedTimes++;
sv2.setRecordCount(svIndex);
}
@@ -263,4 +261,34 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
+ "originalRecordCount={}, batchSchema={}]",
container, sv2, toFilterFields, originalRecordCount, incoming.getSchema());
}
+
+ public enum Metric implements MetricDef {
+ FILTERED_ROWS, APPLIED_TIMES;
+
+ @Override
+ public int metricId() {
+ return ordinal();
+ }
+ }
+
+ public void updateStats() {
+ stats.setLongStat(Metric.FILTERED_ROWS, filteredRows);
+ stats.setLongStat(Metric.APPLIED_TIMES, appliedTimes);
+ }
+
+ private void timedWaiting() {
+ if (!enableRFWaiting || waited) {
+ return;
+ }
+ //Downstream HashJoinBatch prefetch first batch from both sides in buildSchema phase hence waiting is done post that phase
+ if (current == null && batchTimes > 0) {
+ waited = true;
+ try {
+ stats.startWait();
+ current = context.getRuntimeFilter(rfIdentifier, maxWaitingTime, TimeUnit.MILLISECONDS);
+ } finally {
+ stats.stopWait();
+ }
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 88eadf291..0ac0809d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -19,10 +19,13 @@ package org.apache.drill.exec.physical.impl.join;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
@@ -203,11 +206,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
private int originalPartition = -1; // the partition a secondary reads from
IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
private int maxBatchesInMemory;
- private List<BloomFilter> bloomFilters = new ArrayList<>();
private List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters
private boolean enableRuntimeFilter;
private RuntimeFilterReporter runtimeFilterReporter;
private ValueVectorHashHelper.Hash64 hash64;
+ private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
+ private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
+ private List<BloomFilter> bloomFilters = new ArrayList<>();
/**
* This holds information about the spilled partitions for the build and probe side.
@@ -757,6 +762,24 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
enableRuntimeFilter = false;
return;
}
+ RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
+ List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
+ for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
+ String buildField = bloomFilterDef.getBuildField();
+ SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
+ TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
+ if (typedFieldId == null) {
+ missingField = true;
+ break;
+ }
+ int fieldId = typedFieldId.getFieldIds()[0];
+ bloomFilterDef2buildId.put(bloomFilterDef, fieldId);
+ }
+ if (missingField) {
+ logger.info("As some build side join key fields not found, runtime filter was disabled");
+ enableRuntimeFilter = false;
+ return;
+ }
ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context);
try {
hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
@@ -799,9 +822,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
if (!enableRuntimeFilter) {
return;
}
- if (runtimeFilterReporter != null) {
- return;
- }
runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
//RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the
@@ -809,11 +829,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
if (runtimeFilterDef != null) {
List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
+ int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef);
int numBytes = bloomFilterDef.getNumBytes();
String probeField = bloomFilterDef.getProbeField();
probeFields.add(probeField);
BloomFilter bloomFilter = new BloomFilter(numBytes, context.getAllocator());
bloomFilters.add(bloomFilter);
+ bloomFilter2buildId.put(bloomFilter, buildFieldId);
}
}
}
@@ -992,13 +1014,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
//create runtime filter
if (spilledState.isFirstCycle() && enableRuntimeFilter) {
//create runtime filter and send out async
- int condFieldIndex = 0;
- for (BloomFilter bloomFilter : bloomFilters) {
+ for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) {
+ int fieldId = bloomFilter2buildId.get(bloomFilter);
for (int ind = 0; ind < currentRecordCount; ind++) {
- long hashCode = hash64.hash64Code(ind, 0, condFieldIndex);
+ long hashCode = hash64.hash64Code(ind, 0, fieldId);
bloomFilter.insert(hashCode);
}
- condFieldIndex++;
}
}
@@ -1027,9 +1048,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
}
if (spilledState.isFirstCycle() && enableRuntimeFilter) {
- if (bloomFilters.size() > 0) {
+ if (bloomFilter2buildId.size() > 0) {
int hashJoinOpId = this.popConfig.getOperatorId();
- runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
+ runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
}
}
@@ -1237,7 +1258,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);
- enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
+ enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null;
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
index 59e1622b5..1729027de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
@@ -27,25 +27,29 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import java.io.IOException;
import java.util.List;
-public class RuntimeFilterPrel extends SinglePrel{
+public class RuntimeFilterPrel extends SinglePrel {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPrel.class);
- public RuntimeFilterPrel(Prel child){
+ private long identifier;
+
+ public RuntimeFilterPrel(Prel child, long identifier){
super(child.getCluster(), child.getTraitSet(), child);
+ this.identifier = identifier;
}
- public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+ public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, long identifier) {
super(cluster, traits, child);
+ this.identifier = identifier;
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0));
+ return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0), identifier);
}
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- RuntimeFilterPOP r = new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator));
+ RuntimeFilterPOP r = new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator), identifier);
return creator.addMetadata(this, r);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index fcfa2bca1..4d309aea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.planner.physical.visitor;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinInfo;
@@ -28,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.physical.BroadcastExchangePrel;
import org.apache.drill.exec.planner.physical.ExchangePrel;
import org.apache.drill.exec.planner.physical.HashAggPrel;
@@ -43,11 +41,14 @@ import org.apache.drill.exec.planner.physical.TopNPrel;
import org.apache.drill.exec.work.filter.BloomFilter;
import org.apache.drill.exec.work.filter.BloomFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
-
+import org.apache.drill.shaded.guava.com.google.common.collect.HashMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
/**
* This visitor does two major things:
@@ -58,9 +59,14 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
private Set<ScanPrel> toAddRuntimeFilter = new HashSet<>();
+ private Multimap<ScanPrel, HashJoinPrel> probeSideScan2hj = HashMultimap.create();
+
private double fpp;
+
private int bloomFilterMaxSizeInBytesDef;
+ private static final AtomicLong rfIdCounter = new AtomicLong();
+
private RuntimeFilterVisitor(QueryContext queryContext) {
this.bloomFilterMaxSizeInBytesDef = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE_KEY).num_val.intValue();
this.fpp = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_KEY).float_val;
@@ -76,7 +82,7 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
}
public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
- List<RelNode> children = Lists.newArrayList();
+ List<RelNode> children = new ArrayList<>();
for (Prel child : prel) {
child = child.accept(this, value);
children.add(child);
@@ -100,8 +106,18 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
@Override
public Prel visitScan(ScanPrel prel, Void value) throws RuntimeException {
if (toAddRuntimeFilter.contains(prel)) {
- //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node.
- RuntimeFilterPrel runtimeFilterPrel = new RuntimeFilterPrel(prel);
+ //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node or a runtime filter node.
+ Collection<HashJoinPrel> hashJoinPrels = probeSideScan2hj.get(prel);
+ RuntimeFilterPrel runtimeFilterPrel = null;
+ for (HashJoinPrel hashJoinPrel : hashJoinPrels) {
+ long identifier = rfIdCounter.incrementAndGet();
+ hashJoinPrel.getRuntimeFilterDef().setRuntimeFilterIdentifier(identifier);
+ if (runtimeFilterPrel == null) {
+ runtimeFilterPrel = new RuntimeFilterPrel(prel, identifier);
+ } else {
+ runtimeFilterPrel = new RuntimeFilterPrel(runtimeFilterPrel, identifier);
+ }
+ }
return runtimeFilterPrel;
} else {
return prel;
@@ -134,13 +150,24 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();
//find the possible left scan node of the left join key
- GroupScan groupScan = null;
+ ScanPrel probeSideScanPrel = null;
RelNode left = hashJoinPrel.getLeft();
+ RelNode right = hashJoinPrel.getRight();
+ ExchangePrel exchangePrel = findRightExchangePrel(right);
+ if (exchangePrel == null) {
+ //Does not support the single fragment mode ,that is the right build side
+ //can only be BroadcastExchangePrel or HashToRandomExchangePrel
+ return null;
+ }
List<String> leftFields = left.getRowType().getFieldNames();
+ List<String> rightFields = right.getRowType().getFieldNames();
List<Integer> leftKeys = hashJoinPrel.getLeftKeys();
RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery();
+ int i = 0;
for (Integer leftKey : leftKeys) {
String leftFieldName = leftFields.get(leftKey);
+ String rightFieldName = rightFields.get(i);
+ i++;
//This also avoids the left field of the join condition with a function call.
ScanPrel scanPrel = findLeftScanPrel(leftFieldName, left);
if (scanPrel != null) {
@@ -160,17 +187,17 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
int bloomFilterSizeInBytes = BloomFilter.optimalNumOfBytes(ndv.longValue(), fpp);
bloomFilterSizeInBytes = bloomFilterSizeInBytes > bloomFilterMaxSizeInBytesDef ? bloomFilterMaxSizeInBytesDef : bloomFilterSizeInBytes;
//left the local parameter to be set later.
- BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName);
+ BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName, rightFieldName);
bloomFilterDef.setLeftNDV(ndv);
bloomFilterDefs.add(bloomFilterDef);
toAddRuntimeFilter.add(scanPrel);
- groupScan = scanPrel.getGroupScan();
+ probeSideScanPrel = scanPrel;
}
}
if (bloomFilterDefs.size() > 0) {
//left sendToForeman parameter to be set later.
- RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false);
- runtimeFilterDef.setProbeSideGroupScan(groupScan);
+ RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1);
+ probeSideScan2hj.put(probeSideScanPrel, hashJoinPrel);
return runtimeFilterDef;
}
return null;
@@ -265,6 +292,30 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
}
}
+ private ExchangePrel findRightExchangePrel(RelNode rightRelNode) {
+ if (rightRelNode instanceof ExchangePrel) {
+ return (ExchangePrel) rightRelNode;
+ }
+ if (rightRelNode instanceof ScanPrel) {
+ return null;
+ } else if (rightRelNode instanceof RelSubset) {
+ RelNode bestNode = ((RelSubset) rightRelNode).getBest();
+ if (bestNode != null) {
+ return findRightExchangePrel(bestNode);
+ } else {
+ return null;
+ }
+ } else {
+ List<RelNode> relNodes = rightRelNode.getInputs();
+ if (relNodes.size() == 1) {
+ RelNode leftNode = relNodes.get(0);
+ return findRightExchangePrel(leftNode);
+ } else {
+ return null;
+ }
+ }
+ }
+
private boolean containBlockNode(Prel startNode, Prel endNode) {
BlockNodeVisitor blockNodeVisitor = new BlockNodeVisitor();
startNode.accept(blockNodeVisitor, endNode);
@@ -311,6 +362,11 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
return null;
}
+ if (currentPrel instanceof HashJoinPrel) {
+ encounteredBlockNode = true;
+ return null;
+ }
+
for (Prel subPrel : currentPrel) {
visitPrel(subPrel, endValue);
}
@@ -349,4 +405,4 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
}
}
-}
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 37934c8e9..c97220cef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -129,6 +129,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),
new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_VALIDATOR),
+ new OptionDefinition(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME),
+ new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING),
// ------------------------------------------- Index planning related options BEGIN --------------------------------------------------------------
new OptionDefinition(PlannerSettings.USE_SIMPLE_OPTIMIZER),
new OptionDefinition(PlannerSettings.INDEX_PLANNING),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 0d97e0ac3..7915843eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -379,11 +379,16 @@ public class WorkManager implements AutoCloseable {
return runningFragments.get(handle);
}
+ /**
+ * receive the RuntimeFilter thorough the wire
+ * @param runtimeFilter
+ */
public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) {
BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef();
boolean toForeman = runtimeFilterDef.getToForeman();
QueryId queryId = runtimeFilterDef.getQueryId();
String queryIdStr = QueryIdHelper.getQueryId(queryId);
+ runtimeFilter.retainBuffers(1);
//to foreman
if (toForeman) {
Foreman foreman = queries.get(queryId);
@@ -393,13 +398,14 @@ public class WorkManager implements AutoCloseable {
public void run() {
final Thread currentThread = Thread.currentThread();
final String originalName = currentThread.getName();
- currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter");
+ currentThread.setName(queryIdStr + ":foreman:routeRuntimeFilter");
try {
- foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter);
+ foreman.getRuntimeFilterRouter().register(runtimeFilter);
} catch (Exception e) {
logger.warn("Exception while registering the RuntimeFilter", e);
} finally {
currentThread.setName(originalName);
+ runtimeFilter.close();
}
}
});
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
index dc6cc2fcf..afbc56a5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
@@ -34,6 +34,7 @@ import java.util.Arrays;
public class BloomFilter {
// Bytes in a bucket.
private static final int BYTES_PER_BUCKET = 32;
+
// Minimum bloom filter data size.
private static final int MINIMUM_BLOOM_SIZE_IN_BYTES = 256;
@@ -41,16 +42,14 @@ public class BloomFilter {
private int numBytes;
- private int mask[] = new int[8];
-
- private byte[] tempBucket = new byte[32];
-
+ private int bucketMask[] = new int[8];
public BloomFilter(int numBytes, BufferAllocator bufferAllocator) {
int size = BloomFilter.adjustByteSize(numBytes);
this.byteBuf = bufferAllocator.buffer(size);
this.numBytes = byteBuf.capacity();
- this.byteBuf.writerIndex(numBytes);
+ this.byteBuf.writeZero(this.numBytes);
+ this.byteBuf.writerIndex(this.numBytes);
}
public BloomFilter(int ndv, double fpp, BufferAllocator bufferAllocator) {
@@ -74,26 +73,27 @@ public class BloomFilter {
}
private void setMask(int key) {
- //8 odd numbers act as salt value to participate in the computation of the mask.
+ //8 odd numbers act as salt value to participate in the computation of the bucketMask.
final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
- Arrays.fill(mask, 0);
+ Arrays.fill(bucketMask, 0);
for (int i = 0; i < 8; ++i) {
- mask[i] = key * SALT[i];
+ bucketMask[i] = key * SALT[i];
}
for (int i = 0; i < 8; ++i) {
- mask[i] = mask[i] >> 27;
+ bucketMask[i] = bucketMask[i] >>> 27;
}
for (int i = 0; i < 8; ++i) {
- mask[i] = 0x1 << mask[i];
+ bucketMask[i] = 0x1 << bucketMask[i];
}
}
/**
* Add an element's hash value to this bloom filter.
+ *
* @param hash hash result of element.
*/
public void insert(long hash) {
@@ -101,16 +101,13 @@ public class BloomFilter {
int key = (int) hash;
setMask(key);
int initialStartIndex = bucketIndex * BYTES_PER_BUCKET;
- byteBuf.getBytes(initialStartIndex, tempBucket);
for (int i = 0; i < 8; i++) {
+ int index = initialStartIndex + i * 4;
//every iterate batch,we set 32 bits
- int bitsetIndex = i * 4;
- tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24));
- tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16));
- tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8));
- tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i]));
+ int a = byteBuf.getInt(index);
+ a |= bucketMask[i];
+ byteBuf.setInt(index, a);
}
- byteBuf.setBytes(initialStartIndex, tempBucket);
}
/**
@@ -123,17 +120,12 @@ public class BloomFilter {
int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
int key = (int) hash;
setMask(key);
-
int startIndex = bucketIndex * BYTES_PER_BUCKET;
- byteBuf.getBytes(startIndex, tempBucket);
for (int i = 0; i < 8; i++) {
- byte set = 0;
- int bitsetIndex = i * 4;
- set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24));
- set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16));
- set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8));
- set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]);
- if (0 == set) {
+ int index = startIndex + i * 4;
+ int a = byteBuf.getInt(index);
+ int b = a & bucketMask[i];
+ if (b == 0) {
return false;
}
}
@@ -142,6 +134,7 @@ public class BloomFilter {
/**
* Merge this bloom filter with other one
+ *
* @param other
*/
public void or(BloomFilter other) {
@@ -150,20 +143,19 @@ public class BloomFilter {
Preconditions.checkArgument(otherLength == thisLength);
Preconditions.checkState(otherLength % BYTES_PER_BUCKET == 0);
Preconditions.checkState(thisLength % BYTES_PER_BUCKET == 0);
- byte[] otherTmpBucket = new byte[BYTES_PER_BUCKET];
- for (int i = 0; i < thisLength / BYTES_PER_BUCKET; i++) {
- byteBuf.getBytes(i * BYTES_PER_BUCKET, tempBucket);
- other.byteBuf.getBytes(i * BYTES_PER_BUCKET, otherTmpBucket);
- for (int j = 0; j < BYTES_PER_BUCKET; j++) {
- tempBucket[j] = (byte) (tempBucket[j] | otherTmpBucket[j]);
- }
- this.byteBuf.setBytes(i, tempBucket);
+ for (int i = 0; i < thisLength / 8; i++) {
+ int index = i * 8;
+ long a = byteBuf.getLong(index);
+ long b = other.byteBuf.getLong(index);
+ long c = a | b;
+ byteBuf.setLong(index, c);
}
}
/**
* Calculate optimal size according to the number of distinct values and false positive probability.
* See http://en.wikipedia.org/wiki/Bloom_filter#Probability_of_false_positives for the formula.
+ *
* @param ndv: The number of distinct values.
* @param fpp: The false positive probability.
* @return optimal number of bytes of given ndv and fpp.
@@ -177,7 +169,7 @@ public class BloomFilter {
bits |= bits >> 8;
bits |= bits >> 16;
bits++;
- int bytes = bits/8;
+ int bytes = bits / 8;
return bytes;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
index 9a6df57ae..b2a9bd763 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
@@ -28,6 +28,8 @@ public class BloomFilterDef {
private boolean local;
private String probeField;
+
+ private String buildField;
//TODO
@JsonIgnore
private Double leftNDV;
@@ -37,10 +39,11 @@ public class BloomFilterDef {
@JsonCreator
public BloomFilterDef(@JsonProperty("numBytes") int numBytes, @JsonProperty("local") boolean local, @JsonProperty("probeField")
- String probeField){
+ String probeField, @JsonProperty("buildField") String buildField){
this.numBytes = numBytes;
this.local = local;
this.probeField = probeField;
+ this.buildField = buildField;
}
@@ -61,7 +64,7 @@ public class BloomFilterDef {
}
public String toString() {
- return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + " }";
+ return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + ",buildField= " + buildField + " }";
}
@JsonIgnore
@@ -82,4 +85,9 @@ public class BloomFilterDef {
this.rightNDV = rightNDV;
}
+ public String getBuildField()
+ {
+ return buildField;
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
index 5fb51bf24..efe300f3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
@@ -18,13 +18,8 @@
package org.apache.drill.exec.work.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.drill.exec.physical.base.GroupScan;
-
-
import java.util.List;
@JsonIgnoreProperties(ignoreUnknown = true)
@@ -37,17 +32,18 @@ public class RuntimeFilterDef {
private List<BloomFilterDef> bloomFilterDefs;
private boolean sendToForeman;
- @JsonIgnore
- private GroupScan probeSideGroupScan;
+ private long runtimeFilterIdentifier;
@JsonCreator
public RuntimeFilterDef(@JsonProperty("generateBloomFilter") boolean generateBloomFilter, @JsonProperty("generateMinMaxFilter") boolean generateMinMaxFilter,
- @JsonProperty("bloomFilterDefs") List<BloomFilterDef> bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman) {
+ @JsonProperty("bloomFilterDefs") List<BloomFilterDef> bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman,
+ @JsonProperty("runtimeFilterIdentifier") long runtimeFilterIdentifier) {
this.generateBloomFilter = generateBloomFilter;
this.generateMinMaxFilter = generateMinMaxFilter;
this.bloomFilterDefs = bloomFilterDefs;
this.sendToForeman = sendToForeman;
+ this.runtimeFilterIdentifier = runtimeFilterIdentifier;
}
@@ -84,12 +80,11 @@ public class RuntimeFilterDef {
this.sendToForeman = sendToForeman;
}
- @JsonIgnore
- public GroupScan getProbeSideGroupScan() {
- return probeSideGroupScan;
+ public long getRuntimeFilterIdentifier() {
+ return runtimeFilterIdentifier;
}
- public void setProbeSideGroupScan(GroupScan probeSideGroupScan) {
- this.probeSideGroupScan = probeSideGroupScan;
+ public void setRuntimeFilterIdentifier(long runtimeFilterIdentifier) {
+ this.runtimeFilterIdentifier = runtimeFilterIdentifier;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
index 6e4a9a8e5..93736c5f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
@@ -39,7 +39,9 @@ public class RuntimeFilterReporter {
this.context = context;
}
- public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) {
+ public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, RuntimeFilterDef runtimeFilterDef, int hashJoinOpId) {
+ boolean sendToForeman = runtimeFilterDef.isSendToForeman();
+ long rfIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier();
ExecProtos.FragmentHandle fragmentHandle = context.getHandle();
DrillBuf[] data = new DrillBuf[bloomFilters.size()];
List<Integer> bloomFilterSizeInBytes = new ArrayList<>();
@@ -64,6 +66,7 @@ public class RuntimeFilterReporter {
.setMinorFragmentId(minorFragmentId)
.setToForeman(sendToForeman)
.setHjOpId(hashJoinOpId)
+ .setRfIdentifier(rfIdentifier)
.addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes)
.build();
RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
index 5a8c6fc9e..a4946a96c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
@@ -17,39 +17,24 @@
*/
package org.apache.drill.exec.work.filter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.netty.buffer.DrillBuf;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.Consumer;
import org.apache.drill.exec.ops.SendingAccountor;
-import org.apache.drill.exec.ops.StatusHandler;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
-import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.RuntimeFilterPOP;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.Wrapper;
-import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.proto.GeneralRPCProtos;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* This class manages the RuntimeFilter routing information of the pushed down join predicate
@@ -69,29 +54,24 @@ import java.util.concurrent.ConcurrentHashMap;
public class RuntimeFilterRouter {
private Wrapper rootWrapper;
- //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
- private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probdeScanEps = new HashMap<>();
- //HashJoin node's major fragment id to its corresponding probe side nodes's number
- private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>();
- //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
- private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
-
- private DrillbitContext drillbitContext;
private SendingAccountor sendingAccountor = new SendingAccountor();
+ private RuntimeFilterSink runtimeFilterSink;
+
private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class);
/**
* This class maintains context for the runtime join push down's filter management. It
* does a traversal of the physical operators by leveraging the root wrapper which indirectly
* holds the global PhysicalOperator tree and contains the minor fragment endpoints.
+ *
* @param workUnit
* @param drillbitContext
*/
public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
this.rootWrapper = workUnit.getRootWrapper();
- this.drillbitContext = drillbitContext;
+ runtimeFilterSink = new RuntimeFilterSink(drillbitContext, sendingAccountor);
}
/**
@@ -99,6 +79,12 @@ public class RuntimeFilterRouter {
* record the relationship between the RuntimeFilter producers and consumers.
*/
public void collectRuntimeFilterParallelAndControlInfo() {
+ //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+ Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>();
+ //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+ Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
+ Map<Integer, Integer> joinMjId2rfNumber = new HashMap<>();
+
RuntimeFilterParallelismCollector runtimeFilterParallelismCollector = new RuntimeFilterParallelismCollector();
rootWrapper.getNode().getRoot().accept(runtimeFilterParallelismCollector, null);
List<RFHelperHolder> holders = runtimeFilterParallelismCollector.getHolders();
@@ -107,67 +93,33 @@ public class RuntimeFilterRouter {
List<CoordinationProtos.DrillbitEndpoint> probeSideEndpoints = holder.getProbeSideScanEndpoints();
int probeSideScanMajorId = holder.getProbeSideScanMajorId();
int joinNodeMajorId = holder.getJoinMajorId();
+ int buildSideRfNumber = holder.getBuildSideRfNumber();
RuntimeFilterDef runtimeFilterDef = holder.getRuntimeFilterDef();
boolean sendToForeman = runtimeFilterDef.isSendToForeman();
if (sendToForeman) {
//send RuntimeFilter to Foreman
- joinMjId2probdeScanEps.put(joinNodeMajorId, probeSideEndpoints);
- joinMjId2scanSize.put(joinNodeMajorId, probeSideEndpoints.size());
+ joinMjId2probeScanEps.put(joinNodeMajorId, probeSideEndpoints);
joinMjId2ScanMjId.put(joinNodeMajorId, probeSideScanMajorId);
+ joinMjId2rfNumber.put(joinNodeMajorId, buildSideRfNumber);
}
}
+ runtimeFilterSink.setJoinMjId2probeScanEps(joinMjId2probeScanEps);
+ runtimeFilterSink.setJoinMjId2rfNumber(joinMjId2rfNumber);
+ runtimeFilterSink.setJoinMjId2ScanMjId(joinMjId2ScanMjId);
}
-
public void waitForComplete() {
sendingAccountor.waitForSendComplete();
+ runtimeFilterSink.close();
}
/**
* This method is passively invoked by receiving a runtime filter from the network
- * @param runtimeFilterWritable
+ *
+ * @param srcRuntimeFilterWritable
*/
- public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
- broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
- }
-
-
- private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) {
- BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
- int joinMajorId = runtimeFilterB.getMajorFragmentId();
- UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
- List<String> probeFields = runtimeFilterB.getProbeFieldsList();
- DrillBuf[] data = srcRuntimeFilterWritable.getData();
- List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId);
- int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
- for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
- BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder();
- for (String probeField : probeFields) {
- builder.addProbeFields(probeField);
- }
- BitData.RuntimeFilterBDef runtimeFilterBDef = builder
- .setQueryId(queryId)
- .setMajorFragmentId(scanNodeMjId)
- .setMinorFragmentId(minorId)
- .build();
- RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
- CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
- DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
- Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
- @Override
- public void accept(final RpcException e) {
- logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
- }
-
- @Override
- public void interrupt(final InterruptedException e) {
- logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
- }
- };
- RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
- AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler);
- accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable);
- }
+ public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+ runtimeFilterSink.add(srcRuntimeFilterWritable);
}
/**
@@ -183,18 +135,29 @@ public class RuntimeFilterRouter {
boolean isHashJoinOp = op instanceof HashJoinPOP;
if (isHashJoinOp) {
HashJoinPOP hashJoinPOP = (HashJoinPOP) op;
+ int hashJoinOpId = hashJoinPOP.getOperatorId();
RuntimeFilterDef runtimeFilterDef = hashJoinPOP.getRuntimeFilterDef();
- if (runtimeFilterDef != null) {
- if (holder == null) {
- holder = new RFHelperHolder();
+ if (runtimeFilterDef != null && runtimeFilterDef.isSendToForeman()) {
+ if (holder == null || holder.getJoinOpId() != hashJoinOpId) {
+ holder = new RFHelperHolder(hashJoinOpId);
holders.add(holder);
}
holder.setRuntimeFilterDef(runtimeFilterDef);
- GroupScan probeSideScanOp = runtimeFilterDef.getProbeSideGroupScan();
- Wrapper container = findPhysicalOpContainer(rootWrapper, hashJoinPOP);
+ long runtimeFilterIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier();
+ WrapperOperatorsVisitor operatorsVisitor = new WrapperOperatorsVisitor(hashJoinPOP);
+ Wrapper container = findTargetWrapper(rootWrapper, operatorsVisitor);
+ if (container == null) {
+ throw new IllegalStateException(String.format("No valid Wrapper found for HashJoinPOP with id=%d", hashJoinPOP.getOperatorId()));
+ }
+ int buildSideRFNumber = container.getAssignedEndpoints().size();
+ holder.setBuildSideRfNumber(buildSideRFNumber);
int majorFragmentId = container.getMajorFragmentId();
holder.setJoinMajorId(majorFragmentId);
- Wrapper probeSideScanContainer = findPhysicalOpContainer(rootWrapper, probeSideScanOp);
+ WrapperRuntimeFilterOperatorsVisitor runtimeFilterOperatorsVisitor = new WrapperRuntimeFilterOperatorsVisitor(runtimeFilterIdentifier);
+ Wrapper probeSideScanContainer = findTargetWrapper(container, runtimeFilterOperatorsVisitor);
+ if (probeSideScanContainer == null) {
+ throw new IllegalStateException(String.format("No valid Wrapper found for RuntimeFilterPOP with id=%d", op.getOperatorId()));
+ }
int probeSideScanMjId = probeSideScanContainer.getMajorFragmentId();
List<CoordinationProtos.DrillbitEndpoint> probeSideScanEps = probeSideScanContainer.getAssignedEndpoints();
holder.setProbeSideScanEndpoints(probeSideScanEps);
@@ -209,59 +172,63 @@ public class RuntimeFilterRouter {
}
}
- private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> {
+ private Wrapper findTargetWrapper(Wrapper wrapper, TargetPhysicalOperatorVisitor targetOpVisitor) {
+ targetOpVisitor.setCurrentFragment(wrapper.getNode());
+ wrapper.getNode().getRoot().accept(targetOpVisitor, null);
+ boolean contain = targetOpVisitor.isContain();
+ if (contain) {
+ return wrapper;
+ }
+ List<Wrapper> dependencies = wrapper.getFragmentDependencies();
+ if (CollectionUtils.isEmpty(dependencies)) {
+ return null;
+ }
+ for (Wrapper dependencyWrapper : dependencies) {
+ Wrapper opContainer = findTargetWrapper(dependencyWrapper, targetOpVisitor);
+ if (opContainer != null) {
+ return opContainer;
+ }
+ }
+ return null;
+ }
- private Fragment fragment;
+ private abstract class TargetPhysicalOperatorVisitor<T, X, E extends Throwable> extends AbstractPhysicalVisitor<T, X, E> {
- private boolean contain = false;
+ protected Exchange sendingExchange;
- private boolean targetIsGroupScan;
+ public void setCurrentFragment(Fragment fragment) {
+ sendingExchange = fragment.getSendingExchange();
+ }
- private boolean targetIsHashJoin;
+ public abstract boolean isContain();
+ }
- private String targetGroupScanDigest;
+ private class WrapperOperatorsVisitor extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> {
- private String targetHashJoinJson;
+ private boolean contain = false;
+ private PhysicalOperator targetOp;
- public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) {
- this.fragment = fragment;
- this.targetIsGroupScan = targetOp instanceof GroupScan;
- this.targetIsHashJoin = targetOp instanceof HashJoinPOP;
- this.targetGroupScanDigest = targetIsGroupScan ? ((GroupScan) targetOp).getDigest() : null;
- this.targetHashJoinJson = targetIsHashJoin ? jsonOfPhysicalOp(targetOp) : null;
+ public WrapperOperatorsVisitor(PhysicalOperator targetOp) {
+ this.targetOp = targetOp;
}
@Override
public Void visitExchange(Exchange exchange, Void value) throws RuntimeException {
- List<Fragment.ExchangeFragmentPair> exchangeFragmentPairs = fragment.getReceivingExchangePairs();
- for (Fragment.ExchangeFragmentPair exchangeFragmentPair : exchangeFragmentPairs) {
- boolean same = exchange == exchangeFragmentPair.getExchange();
- if (same) {
- return null;
- }
+ if (exchange != sendingExchange) {
+ return null;
}
return exchange.getChild().accept(this, value);
}
@Override
public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
- boolean same = false;
- if (targetIsGroupScan && op instanceof GroupScan) {
- //Since GroupScan may be rewrite during the planing, here we use the digest to identify it.
- String currentDigest = ((GroupScan) op).getDigest();
- same = targetGroupScanDigest.equals(currentDigest);
- }
- if (targetIsHashJoin && op instanceof HashJoinPOP) {
- String currentOpJson = jsonOfPhysicalOp(op);
- same = targetHashJoinJson.equals(currentOpJson);
- }
- if (!same) {
+ if (op == targetOp) {
+ contain = true;
+ } else {
for (PhysicalOperator child : op) {
child.accept(this, value);
}
- } else {
- contain = true;
}
return null;
}
@@ -269,42 +236,57 @@ public class RuntimeFilterRouter {
public boolean isContain() {
return contain;
}
+ }
+
+ private class WrapperRuntimeFilterOperatorsVisitor extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> {
+
+ private boolean contain = false;
+
+ private long identifier;
+
+
+ public WrapperRuntimeFilterOperatorsVisitor(long identifier) {
+ this.identifier = identifier;
+ }
- public String jsonOfPhysicalOp(PhysicalOperator operator) {
- try {
- ObjectMapper objectMapper = new ObjectMapper();
- StringWriter stringWriter = new StringWriter();
- objectMapper.writeValue(stringWriter, operator);
- return stringWriter.toString();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ @Override
+ public Void visitExchange(Exchange exchange, Void value) throws RuntimeException {
+ if (exchange != sendingExchange) {
+ return null;
}
+ return exchange.getChild().accept(this, value);
}
- }
- private boolean containsPhysicalOperator(Wrapper wrapper, PhysicalOperator op) {
- WrapperOperatorsVisitor wrapperOpsVistitor = new WrapperOperatorsVisitor(op, wrapper.getNode());
- wrapper.getNode().getRoot().accept(wrapperOpsVistitor, null);
- return wrapperOpsVistitor.isContain();
- }
+ @Override
+ public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
+ boolean same;
+ boolean isRuntimeFilterPop = op instanceof RuntimeFilterPOP;
+ boolean isHashJoinPop = op instanceof HashJoinPOP;
- private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalOperator op) {
- boolean contain = containsPhysicalOperator(wrapper, op);
- if (contain) {
- return wrapper;
- }
- List<Wrapper> dependencies = wrapper.getFragmentDependencies();
- if (CollectionUtils.isEmpty(dependencies)) {
+ if (isHashJoinPop) {
+ HashJoinPOP hashJoinPOP = (HashJoinPOP) op;
+ PhysicalOperator leftPop = hashJoinPOP.getLeft();
+ leftPop.accept(this, value);
+ return null;
+ }
+
+ if (isRuntimeFilterPop) {
+ RuntimeFilterPOP runtimeFilterPOP = (RuntimeFilterPOP) op;
+ same = this.identifier == runtimeFilterPOP.getIdentifier();
+ if (same) {
+ contain = true;
+ return null;
+ }
+ }
+ for (PhysicalOperator child : op) {
+ child.accept(this, value);
+ }
return null;
}
- for (Wrapper dependencyWrapper : dependencies) {
- Wrapper opContainer = findPhysicalOpContainer(dependencyWrapper, op);
- if (opContainer != null) {
- return opContainer;
- }
+
+ public boolean isContain() {
+ return contain;
}
- //should not be here
- throw new IllegalStateException(String.format("No valid Wrapper found for physicalOperator with id=%d", op.getOperatorId()));
}
/**
@@ -320,6 +302,22 @@ public class RuntimeFilterRouter {
private RuntimeFilterDef runtimeFilterDef;
+ private int joinOpId;
+
+ private int buildSideRfNumber;
+
+ public RFHelperHolder(int joinOpId) {
+ this.joinOpId = joinOpId;
+ }
+
+ public int getJoinOpId() {
+ return joinOpId;
+ }
+
+ public void setJoinOpId(int joinOpId) {
+ this.joinOpId = joinOpId;
+ }
+
public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() {
return probeSideScanEndpoints;
}
@@ -352,5 +350,13 @@ public class RuntimeFilterRouter {
public void setRuntimeFilterDef(RuntimeFilterDef runtimeFilterDef) {
this.runtimeFilterDef = runtimeFilterDef;
}
+
+ public int getBuildSideRfNumber() {
+ return buildSideRfNumber;
+ }
+
+ public void setBuildSideRfNumber(int buildSideRfNumber) {
+ this.buildSideRfNumber = buildSideRfNumber;
+ }
}
-}
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index 14686254f..f69a44ef7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -17,206 +17,250 @@
*/
package org.apache.drill.exec.work.filter;
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
/**
* This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, broadcast the final aggregated
+ * one to the RuntimeFilterRecordBatch.
*/
-public class RuntimeFilterSink implements AutoCloseable {
-
- private AtomicInteger currentBookId = new AtomicInteger(0);
+public class RuntimeFilterSink implements Closeable
+{
- private int staleBookId = 0;
+ private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
- /**
- * RuntimeFilterWritable holding the aggregated version of all the received filter
- */
- private RuntimeFilterWritable aggregated = null;
+ private Map<Integer, Integer> joinMjId2rfNumber;
- private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
+ //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+ private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>();
- /**
- * Flag used by Minor Fragment thread to indicate it has encountered error
- */
- private AtomicBoolean running = new AtomicBoolean(true);
+ //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+ private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
- /**
- * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this
- * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at
- * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to
- * indicate producer not to put any new elements in it.
- */
- private ReentrantLock queueLock = new ReentrantLock();
+ //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+ private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new HashMap<>();
+ //for debug usage
+ private Map<Integer, Stopwatch> joinMjId2Stopwatch = new HashMap<>();
- private Condition notEmpty = queueLock.newCondition();
+ private DrillbitContext drillbitContext;
- private ReentrantLock aggregatedRFLock = new ReentrantLock();
+ private SendingAccountor sendingAccountor;
- private BufferAllocator bufferAllocator;
+ private AsyncAggregateWorker asyncAggregateWorker;
- private Future future;
+ private AtomicBoolean running = new AtomicBoolean(true);
private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
- public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) {
- this.bufferAllocator = bufferAllocator;
- AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
- future = executorService.submit(asyncAggregateWorker);
+ public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor)
+ {
+ this.drillbitContext = drillbitContext;
+ this.sendingAccountor = sendingAccountor;
+ asyncAggregateWorker = new AsyncAggregateWorker();
+ drillbitContext.getExecutor().submit(asyncAggregateWorker);
}
- public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
- if (running.get()) {
- try {
- aggregatedRFLock.lock();
- if (containOne()) {
- boolean same = aggregated.equals(runtimeFilterWritable);
- if (!same) {
- // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
- // share the same FragmentContext.
- aggregated.close();
- currentBookId.set(0);
- staleBookId = 0;
- clearQueued(false);
- }
- }
- } finally {
- aggregatedRFLock.unlock();
- }
+ public void add(RuntimeFilterWritable runtimeFilterWritable)
+ {
+ if (!running.get()) {
+ runtimeFilterWritable.close();
+ return;
+ }
+ runtimeFilterWritable.retainBuffers(1);
+ int joinMjId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId();
+ if (joinMjId2Stopwatch.get(joinMjId) == null) {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ joinMjId2Stopwatch.put(joinMjId, stopwatch);
+ }
+ synchronized (rfQueue) {
+ rfQueue.add(runtimeFilterWritable);
+ rfQueue.notify();
+ }
+ }
+ public void close() {
+ running.set(false);
+ if (asyncAggregateWorker != null) {
+ synchronized (rfQueue) {
+ rfQueue.notify();
+ }
+ }
+ while (!asyncAggregateWorker.over.get()) {
try {
- queueLock.lock();
- if (rfQueue != null) {
- rfQueue.add(runtimeFilterWritable);
- notEmpty.signal();
- } else {
- runtimeFilterWritable.close();
- }
- } finally {
- queueLock.unlock();
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ logger.error("interrupted while sleeping to wait for the aggregating worker thread to exit", e);
}
- } else {
+ }
+ for (RuntimeFilterWritable runtimeFilterWritable : joinMjId2AggregatedRF.values()) {
runtimeFilterWritable.close();
}
}
- public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
- try {
- aggregatedRFLock.lock();
- return aggregated.duplicate(bufferAllocator);
- } finally {
- aggregatedRFLock.unlock();
+ private void aggregate(RuntimeFilterWritable srcRuntimeFilterWritable)
+ {
+ BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+ int joinMajorId = runtimeFilterB.getMajorFragmentId();
+ int buildSideRfNumber;
+ RuntimeFilterWritable toAggregated = null;
+ buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+ buildSideRfNumber--;
+ joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+ toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+ if (toAggregated == null) {
+ toAggregated = srcRuntimeFilterWritable;
+ toAggregated.retainBuffers(1);
+ } else {
+ toAggregated.aggregate(srcRuntimeFilterWritable);
}
- }
-
- /**
- * whether there's a fresh aggregated RuntimeFilter
- *
- * @return
- */
- public boolean hasFreshOne() {
- if (currentBookId.get() > staleBookId) {
- staleBookId = currentBookId.get();
- return true;
+ joinMjId2AggregatedRF.put(joinMajorId, toAggregated);
+ if (buildSideRfNumber == 0) {
+ joinMjId2AggregatedRF.remove(joinMajorId);
+ route(toAggregated);
+ joinMjId2rfNumber.remove(joinMajorId);
+ Stopwatch stopwatch = joinMjId2Stopwatch.get(joinMajorId);
+ logger.info(
+ "received all the RFWs belonging to the majorId {}'s HashJoin nodes and flushed aggregated RFW out elapsed {} ms",
+ joinMajorId,
+ stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ );
}
- return false;
- }
-
- /**
- * whether there's a usable RuntimeFilter.
- *
- * @return
- */
- public boolean containOne() {
- return aggregated != null;
}
- private void doCleanup() {
- running.compareAndSet(true, false);
- try {
- aggregatedRFLock.lock();
- if (containOne()) {
- aggregated.close();
- aggregated = null;
+ private void route(RuntimeFilterWritable srcRuntimeFilterWritable)
+ {
+ BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+ int joinMajorId = runtimeFilterB.getMajorFragmentId();
+ UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
+ List<String> probeFields = runtimeFilterB.getProbeFieldsList();
+ List<Integer> sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList();
+ long rfIdentifier = runtimeFilterB.getRfIdentifier();
+ DrillBuf[] data = srcRuntimeFilterWritable.getData();
+ List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probeScanEps.get(joinMajorId);
+ int scanNodeSize = scanNodeEps.size();
+ srcRuntimeFilterWritable.retainBuffers(scanNodeSize - 1);
+ int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
+ for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
+ BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder();
+ for (String probeField : probeFields) {
+ builder.addProbeFields(probeField);
}
- } finally {
- aggregatedRFLock.unlock();
+ BitData.RuntimeFilterBDef runtimeFilterBDef = builder.setQueryId(queryId)
+ .setMajorFragmentId(scanNodeMjId)
+ .setMinorFragmentId(minorId)
+ .setToForeman(false)
+ .setRfIdentifier(rfIdentifier)
+ .addAllBloomFilterSizeInBytes(sizeInBytes)
+ .build();
+ RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
+ CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
+
+ DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
+ Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>()
+ {
+ @Override
+ public void accept(final RpcException e)
+ {
+ logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
+ }
+
+ @Override
+ public void interrupt(final InterruptedException e)
+ {
+ logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
+ }
+ };
+ RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
+ AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler);
+ accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable);
}
}
- @Override
- public void close() throws Exception {
- future.cancel(true);
- doCleanup();
+ public void setJoinMjId2rfNumber(Map<Integer, Integer> joinMjId2rfNumber)
+ {
+ this.joinMjId2rfNumber = joinMjId2rfNumber;
}
- private void clearQueued(boolean setToNull) {
- RuntimeFilterWritable toClear;
- try {
- queueLock.lock();
- while (rfQueue != null && (toClear = rfQueue.poll()) != null) {
- toClear.close();
- }
- rfQueue = (setToNull) ? null : rfQueue;
- } finally {
- queueLock.unlock();
- }
+ public void setJoinMjId2probeScanEps(Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps)
+ {
+ this.joinMjId2probeScanEps = joinMjId2probeScanEps;
}
- private class AsyncAggregateWorker implements Runnable {
+ public void setJoinMjId2ScanMjId(Map<Integer, Integer> joinMjId2ScanMjId)
+ {
+ this.joinMjId2ScanMjId = joinMjId2ScanMjId;
+ }
+
+ private class AsyncAggregateWorker implements Runnable
+ {
+ private AtomicBoolean over = new AtomicBoolean(false);
@Override
- public void run() {
- try {
+ public void run()
+ {
+ while ((joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty() ) && running.get()) {
RuntimeFilterWritable toAggregate = null;
- while (running.get()) {
+ synchronized (rfQueue) {
try {
- queueLock.lock();
- toAggregate = (rfQueue != null) ? rfQueue.poll() : null;
- if (toAggregate == null) {
- notEmpty.await();
- continue;
+ toAggregate = rfQueue.poll();
+ while (toAggregate == null && running.get()) {
+ rfQueue.wait();
+ toAggregate = rfQueue.poll();
}
- } finally {
- queueLock.unlock();
+ } catch (InterruptedException ex) {
+ logger.error("RFW_Aggregator thread being interrupted", ex);
+ continue;
}
-
- try {
- aggregatedRFLock.lock();
- if (containOne()) {
- aggregated.aggregate(toAggregate);
-
- // Release the byteBuf referenced by toAggregate since aggregate will not do it
- toAggregate.close();
- } else {
- aggregated = toAggregate;
- }
- } finally {
- aggregatedRFLock.unlock();
+ }
+ if (toAggregate == null) {
+ continue;
+ }
+ // perform aggregate outside the sync block.
+ try {
+ aggregate(toAggregate);
+ } catch (Exception ex) {
+ logger.error("Failed to aggregate or route the RFW", ex);
+ throw new DrillRuntimeException(ex);
+ } finally {
+ if (toAggregate != null) {
+ toAggregate.close();
}
- currentBookId.incrementAndGet();
}
- } catch (InterruptedException e) {
- logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName());
- Thread.currentThread().interrupt();
- } finally {
- doCleanup();
- clearQueued(true);
}
+
+ if (!running.get()) {
+ RuntimeFilterWritable toClose;
+ while ((toClose = rfQueue.poll()) != null) {
+ toClose.close();
+ }
+ }
+ over.set(true);
}
}
}
-
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 9a971e94c..f8c2701b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -103,6 +103,27 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
}
+ public void retainBuffers(final int increment) {
+ if (increment <= 0) {
+ return;
+ }
+ for (final DrillBuf buf : data) {
+ buf.retain(increment);
+ }
+ }
+ //TODO: Not used currently because of DRILL-6826
+ public RuntimeFilterWritable newRuntimeFilterWritable(BufferAllocator bufferAllocator) {
+ int bufNum = data.length;
+ DrillBuf [] newBufs = new DrillBuf[bufNum];
+ int i = 0;
+ for (DrillBuf buf : data) {
+ DrillBuf transferredBuffer = buf.transferOwnership(bufferAllocator).buffer;
+ newBufs[i] = transferredBuffer;
+ i++;
+ }
+ return new RuntimeFilterWritable(this.runtimeFilterBDef, newBufs);
+ }
+
public String toString() {
return identifier;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 42b76f278..a379db175 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -151,7 +151,7 @@ public class Foreman implements Runnable {
this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
this.profileOption = setProfileOption(queryContext.getOptions());
- this.enableRuntimeFilter = drillbitContext.getOptionManager().getBoolean(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY);
+ this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
}
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8aa3233a9..a2d3cdc7a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -469,6 +469,8 @@ drill.exec.options: {
exec.hashjoin.enable.runtime_filter: false,
exec.hashjoin.bloom_filter.fpp: 0.75,
exec.hashjoin.bloom_filter.max.size: 33554432, #32 MB
+ exec.hashjoin.runtime_filter.waiting.enable: true,
+ exec.hashjoin.runtime_filter.max.waiting.time: 300, #400 ms
exec.hashagg.mem_limit: 0,
exec.hashagg.min_batches_per_partition: 2,
exec.hashagg.num_partitions: 32,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
index 5eae12e6e..a5fc5ba49 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
@@ -40,9 +40,9 @@ public class TestHashJoinJPPD extends PhysicalOpUnitTestBase {
public void testBroadcastHashJoin1Cond() {
List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();
int numBytes = BloomFilter.optimalNumOfBytes(2600, 0.01);
- BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft");
+ BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft", "rgt");
bloomFilterDefs.add(bloomFilterDef);
- RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false );
+ RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1);
HashJoinPOP joinConf = new HashJoinPOP(null, null,
Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.INNER, runtimeFilterDef);
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4);
@@ -71,11 +71,11 @@ public class TestHashJoinJPPD extends PhysicalOpUnitTestBase {
public void testBroadcastHashJoin2Cond() {
List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();
int numBytes = BloomFilter.optimalNumOfBytes(2600, 0.01);
- BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft");
- BloomFilterDef bloomFilterDef1 = new BloomFilterDef(numBytes, true, "a");
+ BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft", "rgt");
+ BloomFilterDef bloomFilterDef1 = new BloomFilterDef(numBytes, true, "a", "b");
bloomFilterDefs.add(bloomFilterDef);
bloomFilterDefs.add(bloomFilterDef1);
- RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false );
+ RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1);
HashJoinPOP joinConf = new HashJoinPOP(null, null,
Lists.newArrayList(joinCond("lft", "EQUALS", "rgt"), joinCond("a", "EQUALS", "b")), JoinRelType.INNER, runtimeFilterDef);
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java
index 2370ffa06..ac174d1ce 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java
@@ -34,38 +34,6 @@ public class TestHashJoinJPPDPlan extends JoinTestBase {
}
@Test
- public void testInnnerHashJoin() throws Exception {
- String sql = "SELECT nations.N_NAME, count(*)"
- + "FROM\n"
- + " dfs.`sample-data/nation.parquet` nations\n"
- + "JOIN\n"
- + " dfs.`sample-data/region.parquet` regions\n"
- + " on nations.N_REGIONKEY = regions.R_REGIONKEY "
- + "WHERE nations.N_NAME = 'A' "
- + "group by nations.N_NAME";
- String expectedColNames1 = "\"runtimeFilterDef\"";
- String expectedColNames2 = "\"bloomFilterDefs\"";
- String expectedColNames3 = "\"runtime-filter\"";
- testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3);
- }
-
- @Test
- public void testRightHashJoin() throws Exception {
- String sql = "SELECT nations.N_NAME, count(*)"
- + "FROM\n"
- + " dfs.`sample-data/nation.parquet` nations\n"
- + "RIGHT JOIN\n"
- + " dfs.`sample-data/region.parquet` regions\n"
- + " on nations.N_REGIONKEY = regions.R_REGIONKEY "
- + "WHERE nations.N_NAME = 'A' "
- + "group by nations.N_NAME";
- String expectedColNames1 = "\"runtimeFilterDef\"";
- String expectedColNames2 = "\"bloomFilterDefs\"";
- String expectedColNames3 = "\"runtime-filter\"";
- testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3);
- }
-
- @Test
public void testLeftHashJoin() throws Exception {
String sql = "SELECT nations.N_NAME, count(*)"
+ "FROM\n"
@@ -95,24 +63,4 @@ public class TestHashJoinJPPDPlan extends JoinTestBase {
String excludedColNames3 = "\"runtime-filter\"";
testPlanWithAttributesMatchingPatterns(sql, null, new String[]{excludedColNames1, excludedColNames2, excludedColNames3});
}
-
- @Test
- public void testInnnerHashJoinWithRightDeepTree() throws Exception {
- String sql = "SELECT nations.N_NAME, count(*)"
- + "FROM\n"
- + " cp.`tpch/nation.parquet` nations\n"
- + "JOIN\n"
- + " cp.`tpch/region.parquet` regions\n"
- + " on nations.N_REGIONKEY = regions.R_REGIONKEY "
- + "JOIN cp.`tpch/customer.parquet` customers\n"
- + " on nations.N_NATIONKEY = customers.C_NATIONKEY "
- + "WHERE nations.N_NAME = 'A' "
- + "group by nations.N_NAME";
- String expectedColNames1 = "\"runtimeFilterDef\"";
- String expectedColNames2 = "\"bloomFilterDefs\"";
- String expectedColNames3 = "\"runtime-filter\"";
- testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3);
- }
-
-
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
index c05cdfdf8..c1d157600 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
@@ -135,7 +135,6 @@ public class BloomFilterTest {
@Test
public void testNotExist() throws Exception {
-
Drillbit bit = new Drillbit(c, RemoteServiceSet.getLocalServiceSet(), ClassPathScanner.fromPrescan(c));
bit.run();
DrillbitContext bitContext = bit.getContext();
@@ -192,6 +191,12 @@ public class BloomFilterTest {
long hashCode = probeHash64.hash64Code(0, 0, 0);
boolean contain = bloomFilter.find(hashCode);
Assert.assertFalse(contain);
+ bloomFilter.getContent().close();
+ vectorContainer.clear();
+ probeVectorContainer.clear();
+ context.close();
+ bitContext.close();
+ bit.close();
}
@@ -254,6 +259,12 @@ public class BloomFilterTest {
long hashCode = probeHash64.hash64Code(0, 0, 0);
boolean contain = bloomFilter.find(hashCode);
Assert.assertTrue(contain);
+ bloomFilter.getContent().close();
+ vectorContainer.clear();
+ probeVectorContainer.clear();
+ context.close();
+ bitContext.close();
+ bit.close();
}
@@ -324,5 +335,11 @@ public class BloomFilterTest {
long hashCode = probeHash64.hash64Code(0, 0, 0);
boolean contain = bloomFilter.find(hashCode);
Assert.assertTrue(contain);
+ bloomFilter.getContent().close();
+ vectorContainer.clear();
+ probeVectorContainer.clear();
+ context.close();
+ bitContext.close();
+ bit.close();
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index a1e7d0d2a..b41798ded 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.test;
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -26,10 +25,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Function;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import io.netty.buffer.DrillBuf;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.DrillConfig;
@@ -76,12 +71,7 @@ import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
import org.apache.drill.test.rowSet.RowSetBuilder;
import org.apache.hadoop.security.UserGroupInformation;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
/**
* Test fixture for operator and (especially) "sub-operator" tests.
@@ -182,7 +172,6 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
private ExecutorState executorState = new OperatorFixture.MockExecutorState();
private ExecutionControls controls;
- private RuntimeFilterSink runtimeFilterSink;
public MockFragmentContext(final DrillConfig config,
final OptionManager options,
@@ -198,7 +187,6 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
this.controls = new ExecutionControls(options);
compiler = new CodeCompiler(config, options);
bufferManager = new BufferManagerImpl(allocator);
- this.runtimeFilterSink = new RuntimeFilterSink(allocator, Executors.newCachedThreadPool());
}
private static FunctionImplementationRegistry newFunctionRegistry(
@@ -319,13 +307,18 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
}
@Override
- public RuntimeFilterSink getRuntimeFilterSink() {
- return runtimeFilterSink;
+ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
}
@Override
- public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
- runtimeFilterSink.aggregate(runtimeFilter);
+ public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) {
+ return null;
+ }
+
+ @Override
+ public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit)
+ {
+ return null;
}
@Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 84a7c785b..b0820e926 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -38,7 +38,6 @@ import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.easy.json.JSONRecordReader;
import org.apache.drill.exec.work.batch.IncomingBuffers;
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -74,6 +73,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
public class PhysicalOpUnitTestBase extends ExecTest {
protected MockExecutorFragmentContext fragContext;
@@ -198,12 +198,10 @@ public class PhysicalOpUnitTestBase extends ExecTest {
* </p>
*/
protected static class MockExecutorFragmentContext extends OperatorFixture.MockFragmentContext implements ExecutorFragmentContext {
- private RuntimeFilterSink runtimeFilterSink;
public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(),
fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor());
- this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator(), Executors.newCachedThreadPool());
}
@Override
@@ -301,12 +299,17 @@ public class PhysicalOpUnitTestBase extends ExecTest {
@Override
public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
- this.runtimeFilterSink.aggregate(runtimeFilter);
}
@Override
- public RuntimeFilterSink getRuntimeFilterSink() {
- return runtimeFilterSink;
+ public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) {
+ return null;
+ }
+
+ @Override
+ public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit)
+ {
+ return null;
}
}
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
index d7921fc0f..e43380db8 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
@@ -2536,6 +2536,24 @@ public final class BitData {
* </pre>
*/
int getHjOpId();
+
+ // optional int64 rf_identifier = 8;
+ /**
+ * <code>optional int64 rf_identifier = 8;</code>
+ *
+ * <pre>
+ * the runtime filter identifier
+ * </pre>
+ */
+ boolean hasRfIdentifier();
+ /**
+ * <code>optional int64 rf_identifier = 8;</code>
+ *
+ * <pre>
+ * the runtime filter identifier
+ * </pre>
+ */
+ long getRfIdentifier();
}
/**
* Protobuf type {@code exec.bit.data.RuntimeFilterBDef}
@@ -2650,6 +2668,11 @@ public final class BitData {
hjOpId_ = input.readInt32();
break;
}
+ case 64: {
+ bitField0_ |= 0x00000020;
+ rfIdentifier_ = input.readInt64();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2867,6 +2890,30 @@ public final class BitData {
return hjOpId_;
}
+ // optional int64 rf_identifier = 8;
+ public static final int RF_IDENTIFIER_FIELD_NUMBER = 8;
+ private long rfIdentifier_;
+ /**
+ * <code>optional int64 rf_identifier = 8;</code>
+ *
+ * <pre>
+ * the runtime filter identifier
+ * </pre>
+ */
+ public boolean hasRfIdentifier() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional int64 rf_identifier = 8;</code>
+ *
+ * <pre>
+ * the runtime filter identifier
+ * </pre>
+ */
+ public long getRfIdentifier() {
+ return rfIdentifier_;
+ }
+
private void initFields() {
queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
majorFragmentId_ = 0;
@@ -2875,6 +2922,7 @@ public final class BitData {
bloomFilterSizeInBytes_ = java.util.Collections.emptyList();
probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
hjOpId_ = 0;
+ rfIdentifier_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -2909,6 +2957,9 @@ public final class BitData {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeInt32(7, hjOpId_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeInt64(8, rfIdentifier_);
+ }
getUnknownFields().writeTo(output);
}
@@ -2956,6 +3007,10 @@ public final class BitData {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(7, hjOpId_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(8, rfIdentifier_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -3091,6 +3146,8 @@ public final class BitData {
bitField0_ = (bitField0_ & ~0x00000020);
hjOpId_ = 0;
bitField0_ = (bitField0_ & ~0x00000040);
+ rfIdentifier_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
@@ -3154,6 +3211,10 @@ public final class BitData {
to_bitField0_ |= 0x00000010;
}
result.hjOpId_ = hjOpId_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.rfIdentifier_ = rfIdentifier_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -3205,6 +3266,9 @@ public final class BitData {
if (other.hasHjOpId()) {
setHjOpId(other.getHjOpId());
}
+ if (other.hasRfIdentifier()) {
+ setRfIdentifier(other.getRfIdentifier());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -3708,6 +3772,55 @@ public final class BitData {
return this;
}
+ // optional int64 rf_identifier = 8;
+ private long rfIdentifier_ ;
+ /**
+ * <code>optional int64 rf_identifier = 8;</code>
+ *
+ * <pre>
+ * the runtime filter identifier
+ * </pre>
+ */
+ public boolean hasRfIdentifier() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional int64 rf_identifier = 8;</code>
+ *
+ * <pre>
+ * the runtime filter identifier
+ * </pre>
+ */
+ public long getRfIdentifier() {
+ return rfIdentifier_;
+ }
+ /**
+ * <code>optional int64 rf_identifier = 8;</code>
+ *
+ * <pre>
+ * the runtime filter identifier
+ * </pre>
+ */
+ public Builder setRfIdentifier(long value) {
+ bitField0_ |= 0x00000080;
+ rfIdentifier_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 rf_identifier = 8;</code>
+ *
+ * <pre>
+ * the runtime filter identifier
+ * </pre>
+ */
+ public Builder clearRfIdentifier() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ rfIdentifier_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef)
}
@@ -3761,16 +3874,17 @@ public final class BitData {
" \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" +
"!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" +
"f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" +
- "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" +
+ "isLastBatch\030\007 \001(\010\"\350\001\n\021RuntimeFilterBDef\022" +
"&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" +
"\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" +
"ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" +
"om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" +
- "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" +
- "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n",
- "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" +
- "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" +
- "l.exec.protoB\007BitDataH\001"
+ "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005\022\025\n\rrf_iden" +
+ "tifier\030\010 \001(\003*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007",
+ "\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH" +
+ "\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILT" +
+ "ER\020\005B(\n\033org.apache.drill.exec.protoB\007Bit" +
+ "DataH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3800,7 +3914,7 @@ public final class BitData {
internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_bit_data_RuntimeFilterBDef_descriptor,
- new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", });
+ new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", "RfIdentifier", });
return null;
}
};
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
index 3c88ffced..ecf0f187f 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
@@ -443,6 +443,8 @@ public final class SchemaBitData
output.writeString(6, probeFields, true);
if(message.hasHjOpId())
output.writeInt32(7, message.getHjOpId(), false);
+ if(message.hasRfIdentifier())
+ output.writeInt64(8, message.getRfIdentifier(), false);
}
public boolean isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message)
{
@@ -504,6 +506,9 @@ public final class SchemaBitData
case 7:
builder.setHjOpId(input.readInt32());
break;
+ case 8:
+ builder.setRfIdentifier(input.readInt64());
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -551,6 +556,7 @@ public final class SchemaBitData
case 5: return "bloomFilterSizeInBytes";
case 6: return "probeFields";
case 7: return "hjOpId";
+ case 8: return "rfIdentifier";
default: return null;
}
}
@@ -569,6 +575,7 @@ public final class SchemaBitData
fieldMap.put("bloomFilterSizeInBytes", 5);
fieldMap.put("probeFields", 6);
fieldMap.put("hjOpId", 7);
+ fieldMap.put("rfIdentifier", 8);
}
}
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
index 2d1c2a702..3b2c1027e 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
@@ -56,6 +56,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
private List<Integer> bloomFilterSizeInBytes;
private List<String> probeFields;
private int hjOpId;
+ private long rfIdentifier;
public RuntimeFilterBDef()
{
@@ -155,6 +156,19 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
return this;
}
+ // rfIdentifier
+
+ public long getRfIdentifier()
+ {
+ return rfIdentifier;
+ }
+
+ public RuntimeFilterBDef setRfIdentifier(long rfIdentifier)
+ {
+ this.rfIdentifier = rfIdentifier;
+ return this;
+ }
+
// java serialization
public void readExternal(ObjectInput in) throws IOException
@@ -235,6 +249,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
case 7:
message.hjOpId = input.readInt32();
break;
+ case 8:
+ message.rfIdentifier = input.readInt64();
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -277,6 +294,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
if(message.hjOpId != 0)
output.writeInt32(7, message.hjOpId, false);
+
+ if(message.rfIdentifier != 0)
+ output.writeInt64(8, message.rfIdentifier, false);
}
public String getFieldName(int number)
@@ -290,6 +310,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
case 5: return "bloomFilterSizeInBytes";
case 6: return "probeFields";
case 7: return "hjOpId";
+ case 8: return "rfIdentifier";
default: return null;
}
}
@@ -310,6 +331,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
__fieldMap.put("bloomFilterSizeInBytes", 5);
__fieldMap.put("probeFields", 6);
__fieldMap.put("hjOpId", 7);
+ __fieldMap.put("rfIdentifier", 8);
}
}
diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto
index 15c72308e..ae9c4c709 100644
--- a/protocol/src/main/protobuf/BitData.proto
+++ b/protocol/src/main/protobuf/BitData.proto
@@ -48,4 +48,5 @@ message RuntimeFilterBDef{
repeated int32 bloom_filter_size_in_bytes = 5;
repeated string probe_fields = 6; // probe fields with corresponding BloomFilters
optional int32 hj_op_id = 7; // the operator id of the HashJoin which generates this RuntimeFilter
+ optional int64 rf_identifier = 8; // the runtime filter identifier
}