diff options
author | Salim Achouche <sachouche2@gmail.com> | 2018-04-17 20:12:03 -0700 |
---|---|---|
committer | Ben-Zvi <bben-zvi@mapr.com> | 2018-05-21 15:06:50 -0700 |
commit | 399fc99827c5eff413cb1aa8489af09cc1266ff3 (patch) | |
tree | 86710bfd00572b2774f531793351114ee42f72bf /exec/vector | |
parent | 82e1a1229203efc3f8899c620a7efc60dff6d388 (diff) |
DRILL-5846: Improve parquet performance for Flat Data Types
closes #1060
Diffstat (limited to 'exec/vector')
6 files changed, 636 insertions, 38 deletions
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java index eb2c93d5f..eb413b045 100644 --- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java @@ -27,6 +27,10 @@ package org.apache.drill.exec.vector; <#include "/@includes/vv_imports.ftl" /> +<#if minor.class == "Int" || minor.class == "UInt4" || minor.class == "UInt1"> +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +</#if> import org.apache.drill.exec.util.DecimalUtility; @@ -385,12 +389,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F return false; } <#if (type.width > 8)> - + public ${minor.javaType!type.javaType} get(int index) { return data.slice(index * VALUE_WIDTH, VALUE_WIDTH); } <#if (minor.class == "Interval")> - + public void get(int index, ${minor.class}Holder holder) { final int offsetIndex = index * VALUE_WIDTH; holder.months = data.getInt(offsetIndex); @@ -419,11 +423,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F final int offsetIndex = index * VALUE_WIDTH; final int months = data.getInt(offsetIndex); final int days = data.getInt(offsetIndex + ${minor.daysOffset}); - final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); + final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); return DateUtilities.intervalStringBuilder(months, days, millis); } <#elseif (minor.class == "IntervalDay")> - + public void get(int index, ${minor.class}Holder holder) { final int offsetIndex = index * VALUE_WIDTH; holder.days = data.getInt(offsetIndex); @@ -448,7 +452,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public StringBuilder getAsStringBuilder(int index) { final int offsetIndex = index * VALUE_WIDTH; final int days = data.getInt(offsetIndex); - final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); + final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); return DateUtilities.intervalDayStringBuilder(days, millis); } <#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense"> @@ -498,12 +502,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F } </#if> <#else> <#-- type.width <= 8 --> - + public ${minor.javaType!type.javaType} get(int index) { return data.get${(minor.javaType!type.javaType)?cap_first}(index * VALUE_WIDTH); } <#if type.width == 4> - + public long getTwoAsLong(int index) { return data.getLong(index * VALUE_WIDTH); } @@ -599,7 +603,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F * value to set */ <#if (type.width > 8)> - + public void set(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) { data.setBytes(index * VALUE_WIDTH, value, 0, VALUE_WIDTH); } @@ -611,7 +615,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F data.setBytes(index * VALUE_WIDTH, value, 0, VALUE_WIDTH); } <#if minor.class == "Interval"> - + public void set(int index, int months, int days, int milliseconds) { final int offsetIndex = index * VALUE_WIDTH; data.setInt(offsetIndex, months); @@ -642,7 +646,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F setSafe(index, holder.months, holder.days, holder.milliseconds); } <#elseif minor.class == "IntervalDay"> - + public void set(int index, int days, int milliseconds) { final int offsetIndex = index * VALUE_WIDTH; data.setInt(offsetIndex, days); @@ -672,7 +676,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F setSafe(index, holder.days, holder.milliseconds); } <#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense"> - + public void setSafe(int index, int start, DrillBuf buffer) { while(index >= getValueCapacity()) { reAlloc(); @@ -695,8 +699,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public void setSafe(int index, Nullable${minor.class}Holder holder) { setSafe(index, holder.start, holder.buffer); } - <#if minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse"> - + + <#if minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse"> public void set(int index, BigDecimal value) { DecimalUtility.getSparseFromBigDecimal(value, data, index * VALUE_WIDTH, field.getScale(), ${minor.nDecimalDigits}); @@ -708,13 +712,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F } set(index, value); } - </#if> - + + </#if> public void set(int index, int start, DrillBuf buffer) { data.setBytes(index * VALUE_WIDTH, buffer, start, VALUE_WIDTH); } </#if> - + @Override public void generateTestData(int count) { setValueCount(count); @@ -728,18 +732,23 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F } } <#else> <#-- type.width <= 8 --> - + public void set(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) { data.set${(minor.javaType!type.javaType)?cap_first}(index * VALUE_WIDTH, value); } + <#if (type.width == 1)> + public void set(int index, byte[] values, int startOff, int len) { + data.setBytes(index * VALUE_WIDTH, values, startOff, len); + } + </#if> + /** * Set the value of a required or nullable vector. Grows the vector as needed. * Does not enforce size limits; scalar fixed-width types can never overflow * a vector. * @param index item to write */ - public void setSafe(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) { while(index >= getValueCapacity()) { reAlloc(); @@ -796,7 +805,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F } } </#if> <#-- type.width --> - + @Override public void setValueCount(int valueCount) { final int currentValueCapacity = getValueCapacity(); @@ -813,6 +822,150 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F data.writerIndex(valueCount * VALUE_WIDTH); } } + + <#if minor.class == "Int" || minor.class == "UInt4" || minor.class == "UInt1"> + /** + * Helper class to buffer container mutation as a means to optimize native memory copy operations. + * + * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker. + */ + public static final class BufferedMutator { + /** The default buffer size */ + private static final int DEFAULT_BUFF_SZ = 1024 << 2; + /** Byte buffer */ + private final ByteBuffer buffer; + + /** Tracks the index where to copy future values */ + private int currentIdx; + /** Parent conatiner object */ + private final ${minor.class}Vector parent; + + /** @see {@link #BufferedMutator(int startIdx, int buffSz, ${minor.class}Vector parent)} */ + public BufferedMutator(int startIdx, ${minor.class}Vector parent) { + this(startIdx, DEFAULT_BUFF_SZ, parent); + } + + /** + * Buffered mutator to optimize bulk access to the underlying vector container + * @param startIdx start idex of the first value to be copied + * @param buffSz buffer length to us + * @param parent parent container object + */ + public BufferedMutator(int startIdx, int buffSz, ${minor.class}Vector parent) { + this.buffer = ByteBuffer.allocate(buffSz); + + // set the buffer to the native byte order + buffer.order(ByteOrder.nativeOrder()); + + this.currentIdx = startIdx; + this.parent = parent; + } + + <#if minor.class == "Int" || minor.class == "UInt4"> + public void setSafe(int value) { + if (buffer.remaining() < 4) { + flush(); + } + + int tgtPos = buffer.position(); + byte[] bufferArray = buffer.array(); + + writeInt(value, bufferArray, tgtPos); + buffer.position(tgtPos + 4); + } + + public void setSafe(int[] values, int numValues) { + int remaining = numValues; + byte[] bufferArray = buffer.array(); + int srcPos = 0; + + do { + if (buffer.remaining() < 4) { + flush(); + } + + int toCopy = Math.min(remaining, buffer.remaining() / 4); + int tgtPos = buffer.position(); + + for (int idx = 0; idx < toCopy; idx++, tgtPos += 4, srcPos++) { + writeInt(values[srcPos], bufferArray, tgtPos); + } + + // Update counters + buffer.position(tgtPos); + remaining -= toCopy; + + } while (remaining > 0); + } + + public static final void writeInt(int val, byte[] buffer, int pos) { + DrillBuf.putInt(buffer, pos, val); + } + </#if> <#-- minor.class --> + + <#if minor.class == "UInt1"> + public void setSafe(byte value) { + if (buffer.remaining() < 1) { + flush(); + } + buffer.put(value); + } + + public void setSafe(byte[] values, int numValues) { + int remaining = numValues; + byte[] bufferArray = buffer.array(); + int srcPos = 0; + + do { + if (buffer.remaining() < 1) { + flush(); + } + + int toCopy = Math.min(remaining, buffer.remaining()); + int tgtPos = buffer.position(); + + for (int idx = 0; idx < toCopy; idx++) { + bufferArray[tgtPos++] = values[srcPos++]; + } + + // Update counters + buffer.position(tgtPos); + remaining -= toCopy; + + } while (remaining > 0); + } + </#if> <#-- minor.class --> + + /** + * @return the backing byte buffer; this is useful when the caller can infer the values to write but + * wants to avoid having to use yet another intermediary byte array; caller is responsible for + * flushing the buffer + */ + public ByteBuffer getByteBuffer() { + return buffer; + } + + public void flush() { + int numElements = buffer.position() / ${minor.class}Vector.VALUE_WIDTH; + + if (numElements == 0) { + return; // NOOP + } + + while((currentIdx + numElements -1) >= parent.getValueCapacity()) { + parent.reAlloc(); + } + + parent.data.setBytes(currentIdx * ${minor.class}Vector.VALUE_WIDTH, buffer.array(), 0, buffer.position()); + + // Update the start index + currentIdx += numElements; + + // Reset the byte buffer + buffer.clear(); + } + } + </#if> <#-- minor.class --> } </#if> <#-- type.major --> </#list> diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java index 6d386a655..cdff55687 100644 --- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java @@ -52,6 +52,17 @@ package org.apache.drill.exec.vector; public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector { + /** + * Optimization to set contiguous values nullable state in a bulk manner; cannot define this array + * within the Mutator class as Java doesn't allow static initialization within a non static inner class. + */ + private static final int DEFINED_VALUES_ARRAY_LEN = 1 << 10; + private static final byte[] DEFINED_VALUES_ARRAY = new byte[DEFINED_VALUES_ARRAY_LEN]; + + static { + Arrays.fill(DEFINED_VALUES_ARRAY, (byte) 1); + } + private final FieldReader reader = new Nullable${minor.class}ReaderImpl(Nullable${minor.class}Vector.this); /** @@ -537,6 +548,18 @@ public final class ${className} extends BaseDataValueVector implements <#if type bits.getMutator().set(index, 1); } + /** {@inheritDoc} */ + @Override + public void setIndexDefined(int index, int numValues) { + int remaining = numValues; + + while (remaining > 0) { + int batchSz = Math.min(remaining, DEFINED_VALUES_ARRAY_LEN); + bits.getMutator().set(index + (numValues - remaining), DEFINED_VALUES_ARRAY, 0, batchSz); + remaining -= batchSz; + } + } + /** * Set the variable length element at the specified index to the supplied value. * @@ -736,7 +759,85 @@ public final class ${className} extends BaseDataValueVector implements <#if type values.getMutator().setValueCount(valueCount); bits.getMutator().setValueCount(valueCount); } + <#if type.major == "VarLen"> + /** Enables this wrapper container class to participate in bulk mutator logic */ + private final class VarLenBulkInputCallbackImpl implements VarLenBulkInput.BulkInputCallback<VarLenBulkEntry> { + /** The default buffer size */ + private static final int DEFAULT_BUFF_SZ = 1024 << 2; + /** A buffered mutator to the bits vector */ + private final UInt1Vector.BufferedMutator bitsMutator; + + private VarLenBulkInputCallbackImpl(int _start_idx) { + bitsMutator = new UInt1Vector.BufferedMutator(_start_idx, DEFAULT_BUFF_SZ, bits); + } + + /** {@inheritDoc} */ + @Override + public void onNewBulkEntry(final VarLenBulkEntry entry) { + final int[] lengths = entry.getValuesLength(); + final ByteBuffer buffer = bitsMutator.getByteBuffer(); + final byte[] bufferArray = buffer.array(); + int remaining = entry.getNumValues(); + int srcPos = 0; + + // We need to set the bit indicators + + do { + if (buffer.remaining() < 1) { + bitsMutator.flush(); + } + + final int toCopy = Math.min(remaining, buffer.remaining()); + final int startTgtPos = buffer.position(); + final int maxTgtPos = startTgtPos + toCopy; + + if (entry.hasNulls()) { + for (int idx = startTgtPos; idx < maxTgtPos; idx++) { + final int valLen = lengths[srcPos++]; + + if (valLen >= 0) { + bufferArray[idx] = 1; + ++setCount; + } else { + // This is a null entry + bufferArray[idx] = 0; + } + } + } else { // Optimization when there are no nulls within this bulk entry + for (int idx = startTgtPos; idx < maxTgtPos; idx++) { + bufferArray[idx] = 1; + } + setCount += toCopy; + } + + // Update counters + buffer.position(maxTgtPos); + remaining -= toCopy; + + } while (remaining > 0); + <#if type.major == "VarLen"> + // Update global counters + lastSet += entry.getNumValues(); + </#if> + } + + /** {@inheritDoc} */ + @Override + public void onEndBulkInput() { + bitsMutator.flush(); + } + } + /** {@inheritDoc} */ + public void setSafe(VarLenBulkInput<VarLenBulkEntry> input) { + // Register a callback so that we can assign indicators to each value + VarLenBulkInput.BulkInputCallback<VarLenBulkEntry> callback = new VarLenBulkInputCallbackImpl(input.getStartIndex()); + + // Now delegate bulk processing to the value container + values.getMutator().setSafe(input, callback); + } + + </#if> @Override public void generateTestData(int valueCount){ bits.getMutator().generateTestDataAlt(valueCount); @@ -779,4 +880,4 @@ public final class ${className} extends BaseDataValueVector implements <#if type } } </#list> -</#list> +</#list>
\ No newline at end of file diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java index 6a67772ce..daef7ba1c 100644 --- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java @@ -16,6 +16,8 @@ * limitations under the License. */ import java.lang.Override; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Set; import org.apache.drill.exec.exception.OutOfMemoryException; @@ -39,6 +41,9 @@ package org.apache.drill.exec.vector; <#include "/@includes/vv_imports.ftl" /> +import java.util.Iterator; +import java.nio.ByteOrder; + /** * ${minor.class}Vector implements a vector of variable width values. Elements in the vector * are accessed by position from the logical start of the vector. A fixed width offsetVector @@ -56,7 +61,7 @@ package org.apache.drill.exec.vector; public final class ${minor.class}Vector extends BaseDataValueVector implements VariableWidthVector { private static final int INITIAL_BYTE_COUNT = Math.min(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT, MAX_BUFFER_SIZE); - + private final UInt${type.width}Vector offsetVector = new UInt${type.width}Vector(offsetsField, allocator); private final FieldReader reader = new ${minor.class}ReaderImpl(${minor.class}Vector.this); @@ -73,7 +78,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } @Override - public FieldReader getReader() { + public FieldReader getReader(){ return reader; } @@ -101,12 +106,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } @Override - public int getValueCapacity() { + public int getValueCapacity(){ return Math.max(offsetVector.getValueCapacity() - 1, 0); } @Override - public int getByteCapacity() { + public int getByteCapacity(){ return data.capacity(); } @@ -127,7 +132,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V */ public int getVarByteLength(){ final int valueCount = getAccessor().getValueCount(); - if (valueCount == 0) { + if(valueCount == 0) { return 0; } return offsetVector.getAccessor().get(valueCount); @@ -173,22 +178,22 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V return buffers; } - public long getOffsetAddr() { + public long getOffsetAddr(){ return offsetVector.getBuffer().memoryAddress(); } @Override - public UInt${type.width}Vector getOffsetVector() { + public UInt${type.width}Vector getOffsetVector(){ return offsetVector; } @Override - public TransferPair getTransferPair(BufferAllocator allocator) { + public TransferPair getTransferPair(BufferAllocator allocator){ return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(String ref, BufferAllocator allocator) { + public TransferPair getTransferPair(String ref, BufferAllocator allocator){ return new TransferImpl(getField().withPath(ref), allocator); } @@ -197,7 +202,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V return new TransferImpl((${minor.class}Vector) to); } - public void transferTo(${minor.class}Vector target) { + public void transferTo(${minor.class}Vector target){ target.clear(); this.offsetVector.transferTo(target.offsetVector); target.data = data.transferOwnership(target.allocator).buffer; @@ -282,17 +287,17 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V to = new ${minor.class}Vector(field, allocator); } - public TransferImpl(${minor.class}Vector to) { + public TransferImpl(${minor.class}Vector to){ this.to = to; } @Override - public ${minor.class}Vector getTo() { + public ${minor.class}Vector getTo(){ return to; } @Override - public void transfer() { + public void transfer(){ transferTo(to); } @@ -313,7 +318,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V if (size > MAX_ALLOCATION_SIZE) { throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size"); } - allocationSizeInBytes = (int)size; + allocationSizeInBytes = (int) size; offsetVector.setInitialCapacity(valueCount + 1); } @@ -439,7 +444,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V public final class Accessor extends BaseValueVector.BaseAccessor implements VariableWidthAccessor { final UInt${type.width}Vector.Accessor oAccessor = offsetVector.getAccessor(); - + public long getStartEnd(int index){ return oAccessor.getTwoAsLong(index); } @@ -606,6 +611,46 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } /** + * Copies the bulk input into this value vector and extends its capacity if necessary. + * @param input bulk input + */ + public <T extends VarLenBulkEntry> void setSafe(VarLenBulkInput<T> input) { + setSafe(input, null); + } + + /** + * Copies the bulk input into this value vector and extends its capacity if necessary. The callback + * mechanism allows decoration as caller is invoked for each bulk entry. + * + * @param input bulk input + * @param callback a bulk input callback object (optional) + */ + public <T extends VarLenBulkEntry> void setSafe(VarLenBulkInput<T> input, VarLenBulkInput.BulkInputCallback<T> callback) { + // Let's allocate a buffered mutator to optimize memory copy performance + BufferedMutator bufferedMutator = new BufferedMutator(input.getStartIndex(), ${minor.class}Vector.this); + + // Let's process the input + while (input.hasNext()) { + T entry = input.next(); + bufferedMutator.setSafe(entry); + + if (callback != null) { + callback.onNewBulkEntry(entry); + } + } + + // Flush any data not yet copied to this VL container + bufferedMutator.flush(); + + // Inform the input object we're done reading + input.done(); + + if (callback != null) { + callback.onEndBulkInput(); + } + } + + /** * Set the variable length element at the specified index to the supplied byte array. * * @param index position of the bit to set @@ -728,7 +773,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } } - protected void set(int index, int start, int length, DrillBuf buffer) { + protected void set(int index, int start, int length, DrillBuf buffer){ assert index >= 0; final int currentOffset = offsetVector.getAccessor().get(index); offsetVector.getMutator().set(index + 1, currentOffset + length); @@ -736,14 +781,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V data.setBytes(currentOffset, bb); } - protected void set(int index, Nullable${minor.class}Holder holder) { + protected void set(int index, Nullable${minor.class}Holder holder){ final int length = holder.end - holder.start; final int currentOffset = offsetVector.getAccessor().get(index); offsetVector.getMutator().set(index + 1, currentOffset + length); data.setBytes(currentOffset, holder.buffer, holder.start, length); } - protected void set(int index, ${minor.class}Holder holder) { + protected void set(int index, ${minor.class}Holder holder){ final int length = holder.end - holder.start; final int currentOffset = offsetVector.getAccessor().get(index); offsetVector.getMutator().set(index + 1, currentOffset + length); @@ -806,6 +851,165 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V setValueCount(size); } } + + /** + * Helper class to buffer container mutation as a means to optimize native memory copy operations. Ideally, this + * should be done transparently as part of the Mutator and Accessor APIs. + * + * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker. + */ + public static final class BufferedMutator { + /** The default buffer size */ + private static final int DEFAULT_BUFF_SZ = 1024 << 2; + /** Byte buffer */ + private final ByteBuffer buffer; + /** Indicator on whether to enable data buffering */ + private final boolean enableDataBuffering = false; + /** Current offset within the data buffer */ + private int dataBuffOff; + /** Total data length (contained within data and buffer) */ + private int totalDataLen; + /** Parent container */ + private final ${minor.class}Vector parent; + /** A buffered mutator to the offsets vector */ + private final UInt4Vector.BufferedMutator offsetsMutator; + + /** @see {@link #BufferedMutator(int startIdx, int buffSz, ${minor.class}Vector parent)} */ + public BufferedMutator(int startIdx, ${minor.class}Vector parent) { + this(startIdx, DEFAULT_BUFF_SZ, parent); + } + + /** + * Buffered mutator to optimize bulk access to the underlying vector container + * @param startIdx start idex of the first value to be copied + * @param buffSz buffer length to us + * @param parent parent container object + */ + public BufferedMutator(int startIdx, int buffSz, ${minor.class}Vector parent) { + if (enableDataBuffering) { + this.buffer = ByteBuffer.allocate(buffSz); + // set the buffer to the native byte order + this.buffer.order(ByteOrder.nativeOrder()); + } else { + this.buffer = null; + } + + this.parent = parent; + this.dataBuffOff = this.parent.offsetVector.getAccessor().get(startIdx); + this.totalDataLen = this.dataBuffOff; + this.offsetsMutator = new UInt4Vector.BufferedMutator(startIdx, buffSz * 4, parent.offsetVector); + + // Forcing the offsetsMutator to operate at index+1 + this.offsetsMutator.setSafe(this.dataBuffOff); + } + + public void setSafe(VarLenBulkEntry bulkEntry) { + // The new entry doesn't fit in remaining space + if (enableDataBuffering && buffer.remaining() < bulkEntry.getTotalLength()) { + flushInternal(); + } + + // Now update the offsets vector with new information + int[] lengths = bulkEntry.getValuesLength(); + int numValues = bulkEntry.getNumValues(); + + setOffsets(lengths, numValues, bulkEntry.hasNulls()); + + // Now we're able to buffer the new bulk entry + if (enableDataBuffering && buffer.remaining() >= bulkEntry.getTotalLength() && bulkEntry.arrayBacked()) { + buffer.put(bulkEntry.getArrayData(), bulkEntry.getDataStartOffset(), bulkEntry.getTotalLength()); + + } else { + // The new entry is larger than the buffer (note at this point we know the buffer has been flushed) + while (parent.data.capacity() < totalDataLen) { + parent.reAlloc(); + } + + if (bulkEntry.arrayBacked()) { + parent.data.setBytes(dataBuffOff, + bulkEntry.getArrayData(), + bulkEntry.getDataStartOffset(), + bulkEntry.getTotalLength()); + + } else { + parent.data.setBytes(dataBuffOff, + bulkEntry.getData(), + bulkEntry.getDataStartOffset(), + bulkEntry.getTotalLength()); + } + + // Update the underlying DrillBuf offset + dataBuffOff += bulkEntry.getTotalLength(); + } + } + + public void flush() { + flushInternal(); + offsetsMutator.flush(); + } + + private void flushInternal() { + if (!enableDataBuffering) { + return; // NOOP + } + int numElements = buffer.position(); + + if (numElements == 0) { + return; // NOOP + } + + while (parent.data.capacity() < totalDataLen) { + parent.reAlloc(); + } + + try { + parent.data.setBytes(dataBuffOff, buffer.array(), 0, buffer.position()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // Update counters + dataBuffOff += buffer.position(); + + // Reset the byte buffer + buffer.clear(); + } + + private void setOffsets(int[] lengths, int numValues, boolean hasNulls) { + // We need to compute source offsets using the current larget offset and the value length array. + final ByteBuffer offByteBuff = offsetsMutator.getByteBuffer(); + final byte[] bufferArray = offByteBuff.array(); + int remaining = numValues; + int srcPos = 0; + + do { + if (offByteBuff.remaining() < 4) { + offsetsMutator.flush(); + } + + final int toCopy = Math.min(remaining, offByteBuff.remaining() / 4); + int tgtPos = offByteBuff.position(); + + if (!hasNulls) { + for (int idx = 0; idx < toCopy; idx++, tgtPos += 4, srcPos++) { + totalDataLen += lengths[srcPos]; + UInt4Vector.BufferedMutator.writeInt(totalDataLen, bufferArray, tgtPos); + } + } else { + for (int idx = 0; idx < toCopy; idx++, tgtPos += 4, srcPos++) { + final int curr_len = lengths[srcPos]; + totalDataLen += (curr_len >= 0) ? curr_len : 0; + UInt4Vector.BufferedMutator.writeInt(totalDataLen, bufferArray, tgtPos); + } + } + + // Update counters + offByteBuff.position(tgtPos); + remaining -= toCopy; + + } while (remaining > 0); + } + } } </#if> <#-- type.major --> </#list> diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVectorDefinitionSetter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVectorDefinitionSetter.java index e702ac8d3..283555bab 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVectorDefinitionSetter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVectorDefinitionSetter.java @@ -19,5 +19,17 @@ package org.apache.drill.exec.vector; public interface NullableVectorDefinitionSetter { + /** + * Set value at position "index" to be defined. + * @param index value position + */ public void setIndexDefined(int index); + + /** + * Set a contiguous set of values starting at position "index" to be defined. + * @param index value position + * @param number of contiguous values + */ + public void setIndexDefined(int index, int numValues); + } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkEntry.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkEntry.java new file mode 100644 index 000000000..011b96d84 --- /dev/null +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkEntry.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import io.netty.buffer.DrillBuf; + +/** + * A bulk input entry enables us to process potentially multiple VL values in one shot (especially for very + * small values); please refer to {@link org.apache.drill.exec.vector.VarBinaryVector.BulkInput}. + * + * <p><b>Format -</b> + * <ul> + * <li>data: [<val1><val2><val3>..] + * <li>value lengths: [<val1-len><val2-len><val3-len>..] + * </ul> + * + * <p><b>NOTE - </b>Bulk entries are immutable + */ +public interface VarLenBulkEntry { + /** + * @return total length of this bulk entry data + */ + int getTotalLength(); + /** + * @return data start offset + */ + int getDataStartOffset(); + /** + * @return true if this entry's data is backed by an array + */ + boolean arrayBacked(); + /** + * @return byte buffer containing the data; the data is located within buffer[start-offset, length-1] where + * buffer is the byte array returned by this method + */ + byte[] getArrayData(); + /** + * @return {@link DrillBuf} containing the data; the data is located within [start-offset, length-1] + */ + DrillBuf getData(); + /** + * @return length table (one per VL value) + */ + int[] getValuesLength(); + /** + * @return number of values (including null values) + */ + int getNumValues(); + /** + * @return number of non-null values + */ + int getNumNonNullValues(); + /** + * @return true if this bulk entry has nulls; false otherwise + */ + boolean hasNulls(); +}
\ No newline at end of file diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkInput.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkInput.java new file mode 100644 index 000000000..7da040c73 --- /dev/null +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkInput.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import java.util.Iterator; + +/** + * Allows caller to provide input in a bulk manner while abstracting the underlying data structure + * to provide performance optimizations opportunities. + */ +public interface VarLenBulkInput<T extends VarLenBulkEntry> extends Iterator<T> { + /** + * @return start index of this bulk input (relative to this VL container) + */ + int getStartIndex(); + + /** + * Indicates we're done processing (processor might stop processing when memory buffers + * are depleted); this allows caller to re-submit any unprocessed data. + * + * @param numCommitted number of processed entries + */ + void done(); + + /** + * Enables caller (such as wrapper vector objects) to include more processing logic as the data is being + * streamed. + */ + public interface BulkInputCallback<T extends VarLenBulkEntry> { + /** + * Invoked when a new bulk entry is read (entry is immutable) + * @param entry bulk entry + */ + void onNewBulkEntry(final T entry); + + /** + * Indicates the bulk input is done + */ + void onEndBulkInput(); + } +}
\ No newline at end of file |