aboutsummaryrefslogtreecommitdiff
path: root/exec/vector
diff options
context:
space:
mode:
authorSalim Achouche <sachouche2@gmail.com>2018-04-17 20:12:03 -0700
committerBen-Zvi <bben-zvi@mapr.com>2018-05-21 15:06:50 -0700
commit399fc99827c5eff413cb1aa8489af09cc1266ff3 (patch)
tree86710bfd00572b2774f531793351114ee42f72bf /exec/vector
parent82e1a1229203efc3f8899c620a7efc60dff6d388 (diff)
DRILL-5846: Improve parquet performance for Flat Data Types
closes #1060
Diffstat (limited to 'exec/vector')
-rw-r--r--exec/vector/src/main/codegen/templates/FixedValueVectors.java191
-rw-r--r--exec/vector/src/main/codegen/templates/NullableValueVectors.java103
-rw-r--r--exec/vector/src/main/codegen/templates/VariableLengthVectors.java240
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVectorDefinitionSetter.java12
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkEntry.java72
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkInput.java56
6 files changed, 636 insertions, 38 deletions
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index eb2c93d5f..eb413b045 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -27,6 +27,10 @@
package org.apache.drill.exec.vector;
<#include "/@includes/vv_imports.ftl" />
+<#if minor.class == "Int" || minor.class == "UInt4" || minor.class == "UInt1">
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+</#if>
import org.apache.drill.exec.util.DecimalUtility;
@@ -385,12 +389,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
return false;
}
<#if (type.width > 8)>
-
+
public ${minor.javaType!type.javaType} get(int index) {
return data.slice(index * VALUE_WIDTH, VALUE_WIDTH);
}
<#if (minor.class == "Interval")>
-
+
public void get(int index, ${minor.class}Holder holder) {
final int offsetIndex = index * VALUE_WIDTH;
holder.months = data.getInt(offsetIndex);
@@ -419,11 +423,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
final int offsetIndex = index * VALUE_WIDTH;
final int months = data.getInt(offsetIndex);
final int days = data.getInt(offsetIndex + ${minor.daysOffset});
- final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
+ final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
return DateUtilities.intervalStringBuilder(months, days, millis);
}
<#elseif (minor.class == "IntervalDay")>
-
+
public void get(int index, ${minor.class}Holder holder) {
final int offsetIndex = index * VALUE_WIDTH;
holder.days = data.getInt(offsetIndex);
@@ -448,7 +452,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
public StringBuilder getAsStringBuilder(int index) {
final int offsetIndex = index * VALUE_WIDTH;
final int days = data.getInt(offsetIndex);
- final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
+ final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
return DateUtilities.intervalDayStringBuilder(days, millis);
}
<#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense">
@@ -498,12 +502,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
}
</#if>
<#else> <#-- type.width <= 8 -->
-
+
public ${minor.javaType!type.javaType} get(int index) {
return data.get${(minor.javaType!type.javaType)?cap_first}(index * VALUE_WIDTH);
}
<#if type.width == 4>
-
+
public long getTwoAsLong(int index) {
return data.getLong(index * VALUE_WIDTH);
}
@@ -599,7 +603,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
* value to set
*/
<#if (type.width > 8)>
-
+
public void set(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
data.setBytes(index * VALUE_WIDTH, value, 0, VALUE_WIDTH);
}
@@ -611,7 +615,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
data.setBytes(index * VALUE_WIDTH, value, 0, VALUE_WIDTH);
}
<#if minor.class == "Interval">
-
+
public void set(int index, int months, int days, int milliseconds) {
final int offsetIndex = index * VALUE_WIDTH;
data.setInt(offsetIndex, months);
@@ -642,7 +646,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
setSafe(index, holder.months, holder.days, holder.milliseconds);
}
<#elseif minor.class == "IntervalDay">
-
+
public void set(int index, int days, int milliseconds) {
final int offsetIndex = index * VALUE_WIDTH;
data.setInt(offsetIndex, days);
@@ -672,7 +676,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
setSafe(index, holder.days, holder.milliseconds);
}
<#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense">
-
+
public void setSafe(int index, int start, DrillBuf buffer) {
while(index >= getValueCapacity()) {
reAlloc();
@@ -695,8 +699,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
public void setSafe(int index, Nullable${minor.class}Holder holder) {
setSafe(index, holder.start, holder.buffer);
}
- <#if minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse">
-
+
+ <#if minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse">
public void set(int index, BigDecimal value) {
DecimalUtility.getSparseFromBigDecimal(value, data, index * VALUE_WIDTH,
field.getScale(), ${minor.nDecimalDigits});
@@ -708,13 +712,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
}
set(index, value);
}
- </#if>
-
+
+ </#if>
public void set(int index, int start, DrillBuf buffer) {
data.setBytes(index * VALUE_WIDTH, buffer, start, VALUE_WIDTH);
}
</#if>
-
+
@Override
public void generateTestData(int count) {
setValueCount(count);
@@ -728,18 +732,23 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
}
}
<#else> <#-- type.width <= 8 -->
-
+
public void set(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
data.set${(minor.javaType!type.javaType)?cap_first}(index * VALUE_WIDTH, value);
}
+ <#if (type.width == 1)>
+ public void set(int index, byte[] values, int startOff, int len) {
+ data.setBytes(index * VALUE_WIDTH, values, startOff, len);
+ }
+ </#if>
+
/**
* Set the value of a required or nullable vector. Grows the vector as needed.
* Does not enforce size limits; scalar fixed-width types can never overflow
* a vector.
* @param index item to write
*/
-
public void setSafe(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
while(index >= getValueCapacity()) {
reAlloc();
@@ -796,7 +805,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
}
}
</#if> <#-- type.width -->
-
+
@Override
public void setValueCount(int valueCount) {
final int currentValueCapacity = getValueCapacity();
@@ -813,6 +822,150 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
data.writerIndex(valueCount * VALUE_WIDTH);
}
}
+
+ <#if minor.class == "Int" || minor.class == "UInt4" || minor.class == "UInt1">
+ /**
+ * Helper class to buffer container mutation as a means to optimize native memory copy operations.
+ *
+ * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
+ */
+ public static final class BufferedMutator {
+ /** The default buffer size */
+ private static final int DEFAULT_BUFF_SZ = 1024 << 2;
+ /** Byte buffer */
+ private final ByteBuffer buffer;
+
+ /** Tracks the index where to copy future values */
+ private int currentIdx;
+ /** Parent conatiner object */
+ private final ${minor.class}Vector parent;
+
+ /** @see {@link #BufferedMutator(int startIdx, int buffSz, ${minor.class}Vector parent)} */
+ public BufferedMutator(int startIdx, ${minor.class}Vector parent) {
+ this(startIdx, DEFAULT_BUFF_SZ, parent);
+ }
+
+ /**
+ * Buffered mutator to optimize bulk access to the underlying vector container
+ * @param startIdx start idex of the first value to be copied
+ * @param buffSz buffer length to us
+ * @param parent parent container object
+ */
+ public BufferedMutator(int startIdx, int buffSz, ${minor.class}Vector parent) {
+ this.buffer = ByteBuffer.allocate(buffSz);
+
+ // set the buffer to the native byte order
+ buffer.order(ByteOrder.nativeOrder());
+
+ this.currentIdx = startIdx;
+ this.parent = parent;
+ }
+
+ <#if minor.class == "Int" || minor.class == "UInt4">
+ public void setSafe(int value) {
+ if (buffer.remaining() < 4) {
+ flush();
+ }
+
+ int tgtPos = buffer.position();
+ byte[] bufferArray = buffer.array();
+
+ writeInt(value, bufferArray, tgtPos);
+ buffer.position(tgtPos + 4);
+ }
+
+ public void setSafe(int[] values, int numValues) {
+ int remaining = numValues;
+ byte[] bufferArray = buffer.array();
+ int srcPos = 0;
+
+ do {
+ if (buffer.remaining() < 4) {
+ flush();
+ }
+
+ int toCopy = Math.min(remaining, buffer.remaining() / 4);
+ int tgtPos = buffer.position();
+
+ for (int idx = 0; idx < toCopy; idx++, tgtPos += 4, srcPos++) {
+ writeInt(values[srcPos], bufferArray, tgtPos);
+ }
+
+ // Update counters
+ buffer.position(tgtPos);
+ remaining -= toCopy;
+
+ } while (remaining > 0);
+ }
+
+ public static final void writeInt(int val, byte[] buffer, int pos) {
+ DrillBuf.putInt(buffer, pos, val);
+ }
+ </#if> <#-- minor.class -->
+
+ <#if minor.class == "UInt1">
+ public void setSafe(byte value) {
+ if (buffer.remaining() < 1) {
+ flush();
+ }
+ buffer.put(value);
+ }
+
+ public void setSafe(byte[] values, int numValues) {
+ int remaining = numValues;
+ byte[] bufferArray = buffer.array();
+ int srcPos = 0;
+
+ do {
+ if (buffer.remaining() < 1) {
+ flush();
+ }
+
+ int toCopy = Math.min(remaining, buffer.remaining());
+ int tgtPos = buffer.position();
+
+ for (int idx = 0; idx < toCopy; idx++) {
+ bufferArray[tgtPos++] = values[srcPos++];
+ }
+
+ // Update counters
+ buffer.position(tgtPos);
+ remaining -= toCopy;
+
+ } while (remaining > 0);
+ }
+ </#if> <#-- minor.class -->
+
+ /**
+ * @return the backing byte buffer; this is useful when the caller can infer the values to write but
+ * wants to avoid having to use yet another intermediary byte array; caller is responsible for
+ * flushing the buffer
+ */
+ public ByteBuffer getByteBuffer() {
+ return buffer;
+ }
+
+ public void flush() {
+ int numElements = buffer.position() / ${minor.class}Vector.VALUE_WIDTH;
+
+ if (numElements == 0) {
+ return; // NOOP
+ }
+
+ while((currentIdx + numElements -1) >= parent.getValueCapacity()) {
+ parent.reAlloc();
+ }
+
+ parent.data.setBytes(currentIdx * ${minor.class}Vector.VALUE_WIDTH, buffer.array(), 0, buffer.position());
+
+ // Update the start index
+ currentIdx += numElements;
+
+ // Reset the byte buffer
+ buffer.clear();
+ }
+ }
+ </#if> <#-- minor.class -->
}
</#if> <#-- type.major -->
</#list>
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 6d386a655..cdff55687 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -52,6 +52,17 @@ package org.apache.drill.exec.vector;
public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector {
+ /**
+ * Optimization to set contiguous values nullable state in a bulk manner; cannot define this array
+ * within the Mutator class as Java doesn't allow static initialization within a non static inner class.
+ */
+ private static final int DEFINED_VALUES_ARRAY_LEN = 1 << 10;
+ private static final byte[] DEFINED_VALUES_ARRAY = new byte[DEFINED_VALUES_ARRAY_LEN];
+
+ static {
+ Arrays.fill(DEFINED_VALUES_ARRAY, (byte) 1);
+ }
+
private final FieldReader reader = new Nullable${minor.class}ReaderImpl(Nullable${minor.class}Vector.this);
/**
@@ -537,6 +548,18 @@ public final class ${className} extends BaseDataValueVector implements <#if type
bits.getMutator().set(index, 1);
}
+ /** {@inheritDoc} */
+ @Override
+ public void setIndexDefined(int index, int numValues) {
+ int remaining = numValues;
+
+ while (remaining > 0) {
+ int batchSz = Math.min(remaining, DEFINED_VALUES_ARRAY_LEN);
+ bits.getMutator().set(index + (numValues - remaining), DEFINED_VALUES_ARRAY, 0, batchSz);
+ remaining -= batchSz;
+ }
+ }
+
/**
* Set the variable length element at the specified index to the supplied value.
*
@@ -736,7 +759,85 @@ public final class ${className} extends BaseDataValueVector implements <#if type
values.getMutator().setValueCount(valueCount);
bits.getMutator().setValueCount(valueCount);
}
+ <#if type.major == "VarLen">
+ /** Enables this wrapper container class to participate in bulk mutator logic */
+ private final class VarLenBulkInputCallbackImpl implements VarLenBulkInput.BulkInputCallback<VarLenBulkEntry> {
+ /** The default buffer size */
+ private static final int DEFAULT_BUFF_SZ = 1024 << 2;
+ /** A buffered mutator to the bits vector */
+ private final UInt1Vector.BufferedMutator bitsMutator;
+
+ private VarLenBulkInputCallbackImpl(int _start_idx) {
+ bitsMutator = new UInt1Vector.BufferedMutator(_start_idx, DEFAULT_BUFF_SZ, bits);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onNewBulkEntry(final VarLenBulkEntry entry) {
+ final int[] lengths = entry.getValuesLength();
+ final ByteBuffer buffer = bitsMutator.getByteBuffer();
+ final byte[] bufferArray = buffer.array();
+ int remaining = entry.getNumValues();
+ int srcPos = 0;
+
+ // We need to set the bit indicators
+
+ do {
+ if (buffer.remaining() < 1) {
+ bitsMutator.flush();
+ }
+
+ final int toCopy = Math.min(remaining, buffer.remaining());
+ final int startTgtPos = buffer.position();
+ final int maxTgtPos = startTgtPos + toCopy;
+
+ if (entry.hasNulls()) {
+ for (int idx = startTgtPos; idx < maxTgtPos; idx++) {
+ final int valLen = lengths[srcPos++];
+
+ if (valLen >= 0) {
+ bufferArray[idx] = 1;
+ ++setCount;
+ } else {
+ // This is a null entry
+ bufferArray[idx] = 0;
+ }
+ }
+ } else { // Optimization when there are no nulls within this bulk entry
+ for (int idx = startTgtPos; idx < maxTgtPos; idx++) {
+ bufferArray[idx] = 1;
+ }
+ setCount += toCopy;
+ }
+
+ // Update counters
+ buffer.position(maxTgtPos);
+ remaining -= toCopy;
+
+ } while (remaining > 0);
+ <#if type.major == "VarLen">
+ // Update global counters
+ lastSet += entry.getNumValues();
+ </#if>
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onEndBulkInput() {
+ bitsMutator.flush();
+ }
+ }
+ /** {@inheritDoc} */
+ public void setSafe(VarLenBulkInput<VarLenBulkEntry> input) {
+ // Register a callback so that we can assign indicators to each value
+ VarLenBulkInput.BulkInputCallback<VarLenBulkEntry> callback = new VarLenBulkInputCallbackImpl(input.getStartIndex());
+
+ // Now delegate bulk processing to the value container
+ values.getMutator().setSafe(input, callback);
+ }
+
+ </#if>
@Override
public void generateTestData(int valueCount){
bits.getMutator().generateTestDataAlt(valueCount);
@@ -779,4 +880,4 @@ public final class ${className} extends BaseDataValueVector implements <#if type
}
}
</#list>
-</#list>
+</#list> \ No newline at end of file
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 6a67772ce..daef7ba1c 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -16,6 +16,8 @@
* limitations under the License.
*/
import java.lang.Override;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Set;
import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -39,6 +41,9 @@ package org.apache.drill.exec.vector;
<#include "/@includes/vv_imports.ftl" />
+import java.util.Iterator;
+import java.nio.ByteOrder;
+
/**
* ${minor.class}Vector implements a vector of variable width values. Elements in the vector
* are accessed by position from the logical start of the vector. A fixed width offsetVector
@@ -56,7 +61,7 @@ package org.apache.drill.exec.vector;
public final class ${minor.class}Vector extends BaseDataValueVector implements VariableWidthVector {
private static final int INITIAL_BYTE_COUNT = Math.min(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT, MAX_BUFFER_SIZE);
-
+
private final UInt${type.width}Vector offsetVector = new UInt${type.width}Vector(offsetsField, allocator);
private final FieldReader reader = new ${minor.class}ReaderImpl(${minor.class}Vector.this);
@@ -73,7 +78,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
}
@Override
- public FieldReader getReader() {
+ public FieldReader getReader(){
return reader;
}
@@ -101,12 +106,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
}
@Override
- public int getValueCapacity() {
+ public int getValueCapacity(){
return Math.max(offsetVector.getValueCapacity() - 1, 0);
}
@Override
- public int getByteCapacity() {
+ public int getByteCapacity(){
return data.capacity();
}
@@ -127,7 +132,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
*/
public int getVarByteLength(){
final int valueCount = getAccessor().getValueCount();
- if (valueCount == 0) {
+ if(valueCount == 0) {
return 0;
}
return offsetVector.getAccessor().get(valueCount);
@@ -173,22 +178,22 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
return buffers;
}
- public long getOffsetAddr() {
+ public long getOffsetAddr(){
return offsetVector.getBuffer().memoryAddress();
}
@Override
- public UInt${type.width}Vector getOffsetVector() {
+ public UInt${type.width}Vector getOffsetVector(){
return offsetVector;
}
@Override
- public TransferPair getTransferPair(BufferAllocator allocator) {
+ public TransferPair getTransferPair(BufferAllocator allocator){
return new TransferImpl(getField(), allocator);
}
@Override
- public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
+ public TransferPair getTransferPair(String ref, BufferAllocator allocator){
return new TransferImpl(getField().withPath(ref), allocator);
}
@@ -197,7 +202,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
return new TransferImpl((${minor.class}Vector) to);
}
- public void transferTo(${minor.class}Vector target) {
+ public void transferTo(${minor.class}Vector target){
target.clear();
this.offsetVector.transferTo(target.offsetVector);
target.data = data.transferOwnership(target.allocator).buffer;
@@ -282,17 +287,17 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
to = new ${minor.class}Vector(field, allocator);
}
- public TransferImpl(${minor.class}Vector to) {
+ public TransferImpl(${minor.class}Vector to){
this.to = to;
}
@Override
- public ${minor.class}Vector getTo() {
+ public ${minor.class}Vector getTo(){
return to;
}
@Override
- public void transfer() {
+ public void transfer(){
transferTo(to);
}
@@ -313,7 +318,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
if (size > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
}
- allocationSizeInBytes = (int)size;
+ allocationSizeInBytes = (int) size;
offsetVector.setInitialCapacity(valueCount + 1);
}
@@ -439,7 +444,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public final class Accessor extends BaseValueVector.BaseAccessor implements VariableWidthAccessor {
final UInt${type.width}Vector.Accessor oAccessor = offsetVector.getAccessor();
-
+
public long getStartEnd(int index){
return oAccessor.getTwoAsLong(index);
}
@@ -606,6 +611,46 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
}
/**
+ * Copies the bulk input into this value vector and extends its capacity if necessary.
+ * @param input bulk input
+ */
+ public <T extends VarLenBulkEntry> void setSafe(VarLenBulkInput<T> input) {
+ setSafe(input, null);
+ }
+
+ /**
+ * Copies the bulk input into this value vector and extends its capacity if necessary. The callback
+ * mechanism allows decoration as caller is invoked for each bulk entry.
+ *
+ * @param input bulk input
+ * @param callback a bulk input callback object (optional)
+ */
+ public <T extends VarLenBulkEntry> void setSafe(VarLenBulkInput<T> input, VarLenBulkInput.BulkInputCallback<T> callback) {
+ // Let's allocate a buffered mutator to optimize memory copy performance
+ BufferedMutator bufferedMutator = new BufferedMutator(input.getStartIndex(), ${minor.class}Vector.this);
+
+ // Let's process the input
+ while (input.hasNext()) {
+ T entry = input.next();
+ bufferedMutator.setSafe(entry);
+
+ if (callback != null) {
+ callback.onNewBulkEntry(entry);
+ }
+ }
+
+ // Flush any data not yet copied to this VL container
+ bufferedMutator.flush();
+
+ // Inform the input object we're done reading
+ input.done();
+
+ if (callback != null) {
+ callback.onEndBulkInput();
+ }
+ }
+
+ /**
* Set the variable length element at the specified index to the supplied byte array.
*
* @param index position of the bit to set
@@ -728,7 +773,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
}
}
- protected void set(int index, int start, int length, DrillBuf buffer) {
+ protected void set(int index, int start, int length, DrillBuf buffer){
assert index >= 0;
final int currentOffset = offsetVector.getAccessor().get(index);
offsetVector.getMutator().set(index + 1, currentOffset + length);
@@ -736,14 +781,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
data.setBytes(currentOffset, bb);
}
- protected void set(int index, Nullable${minor.class}Holder holder) {
+ protected void set(int index, Nullable${minor.class}Holder holder){
final int length = holder.end - holder.start;
final int currentOffset = offsetVector.getAccessor().get(index);
offsetVector.getMutator().set(index + 1, currentOffset + length);
data.setBytes(currentOffset, holder.buffer, holder.start, length);
}
- protected void set(int index, ${minor.class}Holder holder) {
+ protected void set(int index, ${minor.class}Holder holder){
final int length = holder.end - holder.start;
final int currentOffset = offsetVector.getAccessor().get(index);
offsetVector.getMutator().set(index + 1, currentOffset + length);
@@ -806,6 +851,165 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
setValueCount(size);
}
}
+
+ /**
+ * Helper class to buffer container mutation as a means to optimize native memory copy operations. Ideally, this
+ * should be done transparently as part of the Mutator and Accessor APIs.
+ *
+ * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
+ */
+ public static final class BufferedMutator {
+ /** The default buffer size */
+ private static final int DEFAULT_BUFF_SZ = 1024 << 2;
+ /** Byte buffer */
+ private final ByteBuffer buffer;
+ /** Indicator on whether to enable data buffering */
+ private final boolean enableDataBuffering = false;
+ /** Current offset within the data buffer */
+ private int dataBuffOff;
+ /** Total data length (contained within data and buffer) */
+ private int totalDataLen;
+ /** Parent container */
+ private final ${minor.class}Vector parent;
+ /** A buffered mutator to the offsets vector */
+ private final UInt4Vector.BufferedMutator offsetsMutator;
+
+ /** @see {@link #BufferedMutator(int startIdx, int buffSz, ${minor.class}Vector parent)} */
+ public BufferedMutator(int startIdx, ${minor.class}Vector parent) {
+ this(startIdx, DEFAULT_BUFF_SZ, parent);
+ }
+
+ /**
+ * Buffered mutator to optimize bulk access to the underlying vector container
+ * @param startIdx start idex of the first value to be copied
+ * @param buffSz buffer length to us
+ * @param parent parent container object
+ */
+ public BufferedMutator(int startIdx, int buffSz, ${minor.class}Vector parent) {
+ if (enableDataBuffering) {
+ this.buffer = ByteBuffer.allocate(buffSz);
+ // set the buffer to the native byte order
+ this.buffer.order(ByteOrder.nativeOrder());
+ } else {
+ this.buffer = null;
+ }
+
+ this.parent = parent;
+ this.dataBuffOff = this.parent.offsetVector.getAccessor().get(startIdx);
+ this.totalDataLen = this.dataBuffOff;
+ this.offsetsMutator = new UInt4Vector.BufferedMutator(startIdx, buffSz * 4, parent.offsetVector);
+
+ // Forcing the offsetsMutator to operate at index+1
+ this.offsetsMutator.setSafe(this.dataBuffOff);
+ }
+
+ public void setSafe(VarLenBulkEntry bulkEntry) {
+ // The new entry doesn't fit in remaining space
+ if (enableDataBuffering && buffer.remaining() < bulkEntry.getTotalLength()) {
+ flushInternal();
+ }
+
+ // Now update the offsets vector with new information
+ int[] lengths = bulkEntry.getValuesLength();
+ int numValues = bulkEntry.getNumValues();
+
+ setOffsets(lengths, numValues, bulkEntry.hasNulls());
+
+ // Now we're able to buffer the new bulk entry
+ if (enableDataBuffering && buffer.remaining() >= bulkEntry.getTotalLength() && bulkEntry.arrayBacked()) {
+ buffer.put(bulkEntry.getArrayData(), bulkEntry.getDataStartOffset(), bulkEntry.getTotalLength());
+
+ } else {
+ // The new entry is larger than the buffer (note at this point we know the buffer has been flushed)
+ while (parent.data.capacity() < totalDataLen) {
+ parent.reAlloc();
+ }
+
+ if (bulkEntry.arrayBacked()) {
+ parent.data.setBytes(dataBuffOff,
+ bulkEntry.getArrayData(),
+ bulkEntry.getDataStartOffset(),
+ bulkEntry.getTotalLength());
+
+ } else {
+ parent.data.setBytes(dataBuffOff,
+ bulkEntry.getData(),
+ bulkEntry.getDataStartOffset(),
+ bulkEntry.getTotalLength());
+ }
+
+ // Update the underlying DrillBuf offset
+ dataBuffOff += bulkEntry.getTotalLength();
+ }
+ }
+
+ public void flush() {
+ flushInternal();
+ offsetsMutator.flush();
+ }
+
+ private void flushInternal() {
+ if (!enableDataBuffering) {
+ return; // NOOP
+ }
+ int numElements = buffer.position();
+
+ if (numElements == 0) {
+ return; // NOOP
+ }
+
+ while (parent.data.capacity() < totalDataLen) {
+ parent.reAlloc();
+ }
+
+ try {
+ parent.data.setBytes(dataBuffOff, buffer.array(), 0, buffer.position());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ // Update counters
+ dataBuffOff += buffer.position();
+
+ // Reset the byte buffer
+ buffer.clear();
+ }
+
+ private void setOffsets(int[] lengths, int numValues, boolean hasNulls) {
+ // We need to compute source offsets using the current larget offset and the value length array.
+ final ByteBuffer offByteBuff = offsetsMutator.getByteBuffer();
+ final byte[] bufferArray = offByteBuff.array();
+ int remaining = numValues;
+ int srcPos = 0;
+
+ do {
+ if (offByteBuff.remaining() < 4) {
+ offsetsMutator.flush();
+ }
+
+ final int toCopy = Math.min(remaining, offByteBuff.remaining() / 4);
+ int tgtPos = offByteBuff.position();
+
+ if (!hasNulls) {
+ for (int idx = 0; idx < toCopy; idx++, tgtPos += 4, srcPos++) {
+ totalDataLen += lengths[srcPos];
+ UInt4Vector.BufferedMutator.writeInt(totalDataLen, bufferArray, tgtPos);
+ }
+ } else {
+ for (int idx = 0; idx < toCopy; idx++, tgtPos += 4, srcPos++) {
+ final int curr_len = lengths[srcPos];
+ totalDataLen += (curr_len >= 0) ? curr_len : 0;
+ UInt4Vector.BufferedMutator.writeInt(totalDataLen, bufferArray, tgtPos);
+ }
+ }
+
+ // Update counters
+ offByteBuff.position(tgtPos);
+ remaining -= toCopy;
+
+ } while (remaining > 0);
+ }
+ }
}
</#if> <#-- type.major -->
</#list>
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVectorDefinitionSetter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVectorDefinitionSetter.java
index e702ac8d3..283555bab 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVectorDefinitionSetter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVectorDefinitionSetter.java
@@ -19,5 +19,17 @@ package org.apache.drill.exec.vector;
public interface NullableVectorDefinitionSetter {
+ /**
+ * Set value at position "index" to be defined.
+ * @param index value position
+ */
public void setIndexDefined(int index);
+
+ /**
+ * Set a contiguous set of values starting at position "index" to be defined.
+ * @param index value position
+ * @param number of contiguous values
+ */
+ public void setIndexDefined(int index, int numValues);
+
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkEntry.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkEntry.java
new file mode 100644
index 000000000..011b96d84
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkEntry.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * A bulk input entry enables us to process potentially multiple VL values in one shot (especially for very
+ * small values); please refer to {@link org.apache.drill.exec.vector.VarBinaryVector.BulkInput}.
+ *
+ * <p><b>Format -</b>
+ * <ul>
+ * <li>data: [&lt;val1&gt;&lt;val2&gt;&lt;val3&gt;..]
+ * <li>value lengths: [&lt;val1-len&gt;&lt;val2-len&gt;&lt;val3-len&gt;..]
+ * </ul>
+ *
+ * <p><b>NOTE - </b>Bulk entries are immutable
+ */
+public interface VarLenBulkEntry {
+ /**
+ * @return total length of this bulk entry data
+ */
+ int getTotalLength();
+ /**
+ * @return data start offset
+ */
+ int getDataStartOffset();
+ /**
+ * @return true if this entry's data is backed by an array
+ */
+ boolean arrayBacked();
+ /**
+ * @return byte buffer containing the data; the data is located within buffer[start-offset, length-1] where
+ * buffer is the byte array returned by this method
+ */
+ byte[] getArrayData();
+ /**
+ * @return {@link DrillBuf} containing the data; the data is located within [start-offset, length-1]
+ */
+ DrillBuf getData();
+ /**
+ * @return length table (one per VL value)
+ */
+ int[] getValuesLength();
+ /**
+ * @return number of values (including null values)
+ */
+ int getNumValues();
+ /**
+ * @return number of non-null values
+ */
+ int getNumNonNullValues();
+ /**
+ * @return true if this bulk entry has nulls; false otherwise
+ */
+ boolean hasNulls();
+} \ No newline at end of file
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkInput.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkInput.java
new file mode 100644
index 000000000..7da040c73
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/VarLenBulkInput.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import java.util.Iterator;
+
+/**
+ * Allows caller to provide input in a bulk manner while abstracting the underlying data structure
+ * to provide performance optimizations opportunities.
+ */
+public interface VarLenBulkInput<T extends VarLenBulkEntry> extends Iterator<T> {
+ /**
+ * @return start index of this bulk input (relative to this VL container)
+ */
+ int getStartIndex();
+
+ /**
+ * Indicates we're done processing (processor might stop processing when memory buffers
+ * are depleted); this allows caller to re-submit any unprocessed data.
+ *
+ * @param numCommitted number of processed entries
+ */
+ void done();
+
+ /**
+ * Enables caller (such as wrapper vector objects) to include more processing logic as the data is being
+ * streamed.
+ */
+ public interface BulkInputCallback<T extends VarLenBulkEntry> {
+ /**
+ * Invoked when a new bulk entry is read (entry is immutable)
+ * @param entry bulk entry
+ */
+ void onNewBulkEntry(final T entry);
+
+ /**
+ * Indicates the bulk input is done
+ */
+ void onEndBulkInput();
+ }
+} \ No newline at end of file