From 9667e92e1e87ce1826f0eac3f2396187dbfa8aaa Mon Sep 17 00:00:00 2001 From: "weijie.tong" Date: Sun, 14 Oct 2018 19:41:51 +0800 Subject: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree closes #1504 --- .../java/org/apache/drill/exec/proto/BitData.java | 128 +++++++++++++++++++-- .../org/apache/drill/exec/proto/SchemaBitData.java | 7 ++ .../drill/exec/proto/beans/RuntimeFilterBDef.java | 22 ++++ protocol/src/main/protobuf/BitData.proto | 1 + 4 files changed, 151 insertions(+), 7 deletions(-) (limited to 'protocol/src') diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java index d7921fc0f..e43380db8 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java @@ -2536,6 +2536,24 @@ public final class BitData { * */ int getHjOpId(); + + // optional int64 rf_identifier = 8; + /** + * optional int64 rf_identifier = 8; + * + *
+     * the runtime filter identifier
+     * 
+ */ + boolean hasRfIdentifier(); + /** + * optional int64 rf_identifier = 8; + * + *
+     * the runtime filter identifier
+     * 
+ */ + long getRfIdentifier(); } /** * Protobuf type {@code exec.bit.data.RuntimeFilterBDef} @@ -2650,6 +2668,11 @@ public final class BitData { hjOpId_ = input.readInt32(); break; } + case 64: { + bitField0_ |= 0x00000020; + rfIdentifier_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -2867,6 +2890,30 @@ public final class BitData { return hjOpId_; } + // optional int64 rf_identifier = 8; + public static final int RF_IDENTIFIER_FIELD_NUMBER = 8; + private long rfIdentifier_; + /** + * optional int64 rf_identifier = 8; + * + *
+     * the runtime filter identifier
+     * 
+ */ + public boolean hasRfIdentifier() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional int64 rf_identifier = 8; + * + *
+     * the runtime filter identifier
+     * 
+ */ + public long getRfIdentifier() { + return rfIdentifier_; + } + private void initFields() { queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance(); majorFragmentId_ = 0; @@ -2875,6 +2922,7 @@ public final class BitData { bloomFilterSizeInBytes_ = java.util.Collections.emptyList(); probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY; hjOpId_ = 0; + rfIdentifier_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -2909,6 +2957,9 @@ public final class BitData { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeInt32(7, hjOpId_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeInt64(8, rfIdentifier_); + } getUnknownFields().writeTo(output); } @@ -2956,6 +3007,10 @@ public final class BitData { size += com.google.protobuf.CodedOutputStream .computeInt32Size(7, hjOpId_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(8, rfIdentifier_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3091,6 +3146,8 @@ public final class BitData { bitField0_ = (bitField0_ & ~0x00000020); hjOpId_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + rfIdentifier_ = 0L; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -3154,6 +3211,10 @@ public final class BitData { to_bitField0_ |= 0x00000010; } result.hjOpId_ = hjOpId_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000020; + } + result.rfIdentifier_ = rfIdentifier_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -3205,6 +3266,9 @@ public final class BitData { if (other.hasHjOpId()) { setHjOpId(other.getHjOpId()); } + if (other.hasRfIdentifier()) { + setRfIdentifier(other.getRfIdentifier()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3708,6 +3772,55 @@ public final class BitData { return this; } + // optional int64 rf_identifier = 8; + private long rfIdentifier_ ; + /** + * optional int64 rf_identifier = 8; + * + *
+       * the runtime filter identifier
+       * 
+ */ + public boolean hasRfIdentifier() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional int64 rf_identifier = 8; + * + *
+       * the runtime filter identifier
+       * 
+ */ + public long getRfIdentifier() { + return rfIdentifier_; + } + /** + * optional int64 rf_identifier = 8; + * + *
+       * the runtime filter identifier
+       * 
+ */ + public Builder setRfIdentifier(long value) { + bitField0_ |= 0x00000080; + rfIdentifier_ = value; + onChanged(); + return this; + } + /** + * optional int64 rf_identifier = 8; + * + *
+       * the runtime filter identifier
+       * 
+ */ + public Builder clearRfIdentifier() { + bitField0_ = (bitField0_ & ~0x00000080); + rfIdentifier_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef) } @@ -3761,16 +3874,17 @@ public final class BitData { " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" + "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" + "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" + - "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" + + "isLastBatch\030\007 \001(\010\"\350\001\n\021RuntimeFilterBDef\022" + "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" + "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" + "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" + "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" + - "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" + - "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n", - "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" + - "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" + - "l.exec.protoB\007BitDataH\001" + "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005\022\025\n\rrf_iden" + + "tifier\030\010 \001(\003*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007", + "\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH" + + "\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILT" + + "ER\020\005B(\n\033org.apache.drill.exec.protoB\007Bit" + + "DataH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3800,7 +3914,7 @@ public final class BitData { internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_exec_bit_data_RuntimeFilterBDef_descriptor, - new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", }); + new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", "RfIdentifier", }); return null; } }; diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java index 3c88ffced..ecf0f187f 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java @@ -443,6 +443,8 @@ public final class SchemaBitData output.writeString(6, probeFields, true); if(message.hasHjOpId()) output.writeInt32(7, message.getHjOpId(), false); + if(message.hasRfIdentifier()) + output.writeInt64(8, message.getRfIdentifier(), false); } public boolean isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message) { @@ -504,6 +506,9 @@ public final class SchemaBitData case 7: builder.setHjOpId(input.readInt32()); break; + case 8: + builder.setRfIdentifier(input.readInt64()); + break; default: input.handleUnknownField(number, this); } @@ -551,6 +556,7 @@ public final class SchemaBitData case 5: return "bloomFilterSizeInBytes"; case 6: return "probeFields"; case 7: return "hjOpId"; + case 8: return "rfIdentifier"; default: return null; } } @@ -569,6 +575,7 @@ public final class SchemaBitData fieldMap.put("bloomFilterSizeInBytes", 5); fieldMap.put("probeFields", 6); fieldMap.put("hjOpId", 7); + fieldMap.put("rfIdentifier", 8); } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java index 2d1c2a702..3b2c1027e 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java @@ -56,6 +56,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message bloomFilterSizeInBytes; private List probeFields; private int hjOpId; + private long rfIdentifier; public RuntimeFilterBDef() { @@ -155,6 +156,19 @@ public final class RuntimeFilterBDef implements Externalizable, Message