diff options
author | weijie.tong <weijie.tong@alipay.com> | 2018-10-14 19:41:51 +0800 |
---|---|---|
committer | Vitalii Diravka <vitalii.diravka@gmail.com> | 2018-11-29 18:33:23 +0200 |
commit | 9667e92e1e87ce1826f0eac3f2396187dbfa8aaa (patch) | |
tree | cb68cd4bbedf6f84e00168cb0ab300c6dacdb35c /protocol/src | |
parent | 325fa26b5df1bc29594677a0f3e1360fbb4f8bca (diff) |
DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree
closes #1504
Diffstat (limited to 'protocol/src')
4 files changed, 151 insertions, 7 deletions
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java index d7921fc0f..e43380db8 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java @@ -2536,6 +2536,24 @@ public final class BitData { * </pre> */ int getHjOpId(); + + // optional int64 rf_identifier = 8; + /** + * <code>optional int64 rf_identifier = 8;</code> + * + * <pre> + * the runtime filter identifier + * </pre> + */ + boolean hasRfIdentifier(); + /** + * <code>optional int64 rf_identifier = 8;</code> + * + * <pre> + * the runtime filter identifier + * </pre> + */ + long getRfIdentifier(); } /** * Protobuf type {@code exec.bit.data.RuntimeFilterBDef} @@ -2650,6 +2668,11 @@ public final class BitData { hjOpId_ = input.readInt32(); break; } + case 64: { + bitField0_ |= 0x00000020; + rfIdentifier_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -2867,6 +2890,30 @@ public final class BitData { return hjOpId_; } + // optional int64 rf_identifier = 8; + public static final int RF_IDENTIFIER_FIELD_NUMBER = 8; + private long rfIdentifier_; + /** + * <code>optional int64 rf_identifier = 8;</code> + * + * <pre> + * the runtime filter identifier + * </pre> + */ + public boolean hasRfIdentifier() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * <code>optional int64 rf_identifier = 8;</code> + * + * <pre> + * the runtime filter identifier + * </pre> + */ + public long getRfIdentifier() { + return rfIdentifier_; + } + private void initFields() { queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance(); majorFragmentId_ = 0; @@ -2875,6 +2922,7 @@ public final class BitData { bloomFilterSizeInBytes_ = java.util.Collections.emptyList(); probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY; hjOpId_ = 0; + rfIdentifier_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -2909,6 +2957,9 @@ public final class BitData { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeInt32(7, hjOpId_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeInt64(8, rfIdentifier_); + } getUnknownFields().writeTo(output); } @@ -2956,6 +3007,10 @@ public final class BitData { size += com.google.protobuf.CodedOutputStream .computeInt32Size(7, hjOpId_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(8, rfIdentifier_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3091,6 +3146,8 @@ public final class BitData { bitField0_ = (bitField0_ & ~0x00000020); hjOpId_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + rfIdentifier_ = 0L; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -3154,6 +3211,10 @@ public final class BitData { to_bitField0_ |= 0x00000010; } result.hjOpId_ = hjOpId_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000020; + } + result.rfIdentifier_ = rfIdentifier_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -3205,6 +3266,9 @@ public final class BitData { if (other.hasHjOpId()) { setHjOpId(other.getHjOpId()); } + if (other.hasRfIdentifier()) { + setRfIdentifier(other.getRfIdentifier()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3708,6 +3772,55 @@ public final class BitData { return this; } + // optional int64 rf_identifier = 8; + private long rfIdentifier_ ; + /** + * <code>optional int64 rf_identifier = 8;</code> + * + * <pre> + * the runtime filter identifier + * </pre> + */ + public boolean hasRfIdentifier() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * <code>optional int64 rf_identifier = 8;</code> + * + * <pre> + * the runtime filter identifier + * </pre> + */ + public long getRfIdentifier() { + return rfIdentifier_; + } + /** + * <code>optional int64 rf_identifier = 8;</code> + * + * <pre> + * the runtime filter identifier + * </pre> + */ + public Builder setRfIdentifier(long value) { + bitField0_ |= 0x00000080; + rfIdentifier_ = value; + onChanged(); + return this; + } + /** + * <code>optional int64 rf_identifier = 8;</code> + * + * <pre> + * the runtime filter identifier + * </pre> + */ + public Builder clearRfIdentifier() { + bitField0_ = (bitField0_ & ~0x00000080); + rfIdentifier_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef) } @@ -3761,16 +3874,17 @@ public final class BitData { " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" + "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" + "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" + - "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" + + "isLastBatch\030\007 \001(\010\"\350\001\n\021RuntimeFilterBDef\022" + "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" + "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" + "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" + "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" + - "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" + - "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n", - "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" + - "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" + - "l.exec.protoB\007BitDataH\001" + "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005\022\025\n\rrf_iden" + + "tifier\030\010 \001(\003*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007", + "\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH" + + "\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILT" + + "ER\020\005B(\n\033org.apache.drill.exec.protoB\007Bit" + + "DataH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3800,7 +3914,7 @@ public final class BitData { internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_exec_bit_data_RuntimeFilterBDef_descriptor, - new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", }); + new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", "RfIdentifier", }); return null; } }; diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java index 3c88ffced..ecf0f187f 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java @@ -443,6 +443,8 @@ public final class SchemaBitData output.writeString(6, probeFields, true); if(message.hasHjOpId()) output.writeInt32(7, message.getHjOpId(), false); + if(message.hasRfIdentifier()) + output.writeInt64(8, message.getRfIdentifier(), false); } public boolean isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message) { @@ -504,6 +506,9 @@ public final class SchemaBitData case 7: builder.setHjOpId(input.readInt32()); break; + case 8: + builder.setRfIdentifier(input.readInt64()); + break; default: input.handleUnknownField(number, this); } @@ -551,6 +556,7 @@ public final class SchemaBitData case 5: return "bloomFilterSizeInBytes"; case 6: return "probeFields"; case 7: return "hjOpId"; + case 8: return "rfIdentifier"; default: return null; } } @@ -569,6 +575,7 @@ public final class SchemaBitData fieldMap.put("bloomFilterSizeInBytes", 5); fieldMap.put("probeFields", 6); fieldMap.put("hjOpId", 7); + fieldMap.put("rfIdentifier", 8); } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java index 2d1c2a702..3b2c1027e 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java @@ -56,6 +56,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF private List<Integer> bloomFilterSizeInBytes; private List<String> probeFields; private int hjOpId; + private long rfIdentifier; public RuntimeFilterBDef() { @@ -155,6 +156,19 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF return this; } + // rfIdentifier + + public long getRfIdentifier() + { + return rfIdentifier; + } + + public RuntimeFilterBDef setRfIdentifier(long rfIdentifier) + { + this.rfIdentifier = rfIdentifier; + return this; + } + // java serialization public void readExternal(ObjectInput in) throws IOException @@ -235,6 +249,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF case 7: message.hjOpId = input.readInt32(); break; + case 8: + message.rfIdentifier = input.readInt64(); + break; default: input.handleUnknownField(number, this); } @@ -277,6 +294,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF if(message.hjOpId != 0) output.writeInt32(7, message.hjOpId, false); + + if(message.rfIdentifier != 0) + output.writeInt64(8, message.rfIdentifier, false); } public String getFieldName(int number) @@ -290,6 +310,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF case 5: return "bloomFilterSizeInBytes"; case 6: return "probeFields"; case 7: return "hjOpId"; + case 8: return "rfIdentifier"; default: return null; } } @@ -310,6 +331,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF __fieldMap.put("bloomFilterSizeInBytes", 5); __fieldMap.put("probeFields", 6); __fieldMap.put("hjOpId", 7); + __fieldMap.put("rfIdentifier", 8); } } diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto index 15c72308e..ae9c4c709 100644 --- a/protocol/src/main/protobuf/BitData.proto +++ b/protocol/src/main/protobuf/BitData.proto @@ -48,4 +48,5 @@ message RuntimeFilterBDef{ repeated int32 bloom_filter_size_in_bytes = 5; repeated string probe_fields = 6; // probe fields with corresponding BloomFilters optional int32 hj_op_id = 7; // the operator id of the HashJoin which generates this RuntimeFilter + optional int64 rf_identifier = 8; // the runtime filter identifier } |