aboutsummaryrefslogtreecommitdiff
path: root/exec/vector
diff options
context:
space:
mode:
authorPaul Rogers <progers@maprtech.com>2018-01-10 16:04:53 -0800
committerBoaz Ben-Zvi <boaz@apache.org>2018-01-30 19:54:57 -0800
commitf0d00c62b594e424ea085ebd0a5be26f0f509fda (patch)
treed2a03b8cbef01e83509321ec48a99d61db792878 /exec/vector
parent039530a4195ba8fa4532b9ca92980206fa66c181 (diff)
DRILL-6080: Sort incorrectly limits batch size to 65535 records
closes #1090 * Sort incorrectly limits batch size to 65535 records rather than 65536. * This PR also includes a few code cleanup items. * Fix for overflow in offset vector in row set writer * Performance tool update * Replace "unsafe" methods with "set" methods * Also fixes an indexing issue with nullable writers * Removed debug & timing code * Increase strictness for batch size
Diffstat (limited to 'exec/vector')
-rw-r--r--exec/vector/src/main/codegen/templates/ColumnAccessors.java24
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java2
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java51
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java36
4 files changed, 86 insertions, 27 deletions
diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
index 33b12becc..14ec1e879 100644
--- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java
+++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
@@ -276,14 +276,14 @@ public class ColumnAccessors {
<#assign putAddr = "writeIndex * VALUE_WIDTH">
</#if>
<#if varWidth>
- drillBuf.unsafeCopyMemory(value, 0, offset, len);
+ drillBuf.setBytes(offset, value, 0, len);
offsetsWriter.setNextOffset(offset + len);
<#elseif drillType == "Decimal9">
- drillBuf.unsafePutInt(${putAddr},
+ drillBuf.setInt(${putAddr},
DecimalUtility.getDecimal9FromBigDecimal(value,
type.getScale(), type.getPrecision()));
<#elseif drillType == "Decimal18">
- drillBuf.unsafePutLong(${putAddr},
+ drillBuf.setLong(${putAddr},
DecimalUtility.getDecimal18FromBigDecimal(value,
type.getScale(), type.getPrecision()));
<#elseif drillType == "Decimal38Sparse">
@@ -295,23 +295,23 @@ public class ColumnAccessors {
DecimalUtility.getSparseFromBigDecimal(value, vector.getBuffer(), writeIndex * VALUE_WIDTH,
type.getScale(), type.getPrecision(), 5);
<#elseif drillType == "IntervalYear">
- drillBuf.unsafePutInt(${putAddr},
+ drillBuf.setInt(${putAddr},
value.getYears() * 12 + value.getMonths());
<#elseif drillType == "IntervalDay">
final int offset = ${putAddr};
- drillBuf.unsafePutInt(offset, value.getDays());
- drillBuf.unsafePutInt(offset + 4, periodToMillis(value));
+ drillBuf.setInt(offset, value.getDays());
+ drillBuf.setInt(offset + 4, periodToMillis(value));
<#elseif drillType == "Interval">
final int offset = ${putAddr};
- drillBuf.unsafePutInt(offset, value.getYears() * 12 + value.getMonths());
- drillBuf.unsafePutInt(offset + 4, value.getDays());
- drillBuf.unsafePutInt(offset + 8, periodToMillis(value));
+ drillBuf.setInt(offset, value.getYears() * 12 + value.getMonths());
+ drillBuf.setInt(offset + 4, value.getDays());
+ drillBuf.setInt(offset + 8, periodToMillis(value));
<#elseif drillType == "Float4">
- drillBuf.unsafePutInt(${putAddr}, Float.floatToRawIntBits((float) value));
+ drillBuf.setInt(${putAddr}, Float.floatToRawIntBits((float) value));
<#elseif drillType == "Float8">
- drillBuf.unsafePutLong(${putAddr}, Double.doubleToRawLongBits(value));
+ drillBuf.setLong(${putAddr}, Double.doubleToRawLongBits(value));
<#else>
- drillBuf.unsafePut${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value);
+ drillBuf.set${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value);
</#if>
vectorIndex.nextElement();
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
index e49f92c62..1107216a3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
@@ -98,7 +98,7 @@ public abstract class AbstractFixedWidthWriter extends BaseScalarWriter {
while (dest < writeIndex) {
int length = writeIndex - dest;
length = Math.min(length, stride);
- drillBuf.unsafeCopyMemory(ZERO_BUF, 0, dest * width, length * width);
+ drillBuf.setBytes(dest * width, ZERO_BUF, 0, length * width);
dest += length;
}
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
index 6da2b50fc..2068304f4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
@@ -30,8 +30,45 @@ import org.joda.time.Period;
public class NullableScalarWriter extends AbstractScalarWriter {
+ public static final class ChildIndex implements ColumnWriterIndex {
+
+ private final ColumnWriterIndex parentIndex;
+
+ public ChildIndex(ColumnWriterIndex parentIndex) {
+ this.parentIndex = parentIndex;
+ }
+
+ @Override
+ public int rowStartIndex() {
+ return parentIndex.rowStartIndex();
+ }
+
+ @Override
+ public int vectorIndex() {
+ return parentIndex.vectorIndex();
+ }
+
+ @Override
+ public void nextElement() {
+ // Ignore next element requests from children.
+ // Nullable writers have two children, we don't want
+ // to increment the index twice.
+ }
+
+ @Override
+ public void rollover() {
+ parentIndex.rollover();
+ }
+
+ @Override
+ public ColumnWriterIndex outerIndex() {
+ return parentIndex.outerIndex();
+ }
+ }
+
private final UInt1ColumnWriter isSetWriter;
private final BaseScalarWriter baseWriter;
+ private ColumnWriterIndex writerIndex;
public NullableScalarWriter(NullableVector nullableVector, BaseScalarWriter baseWriter) {
isSetWriter = new UInt1ColumnWriter(nullableVector.getBitsVector());
@@ -54,8 +91,10 @@ public class NullableScalarWriter extends AbstractScalarWriter {
@Override
public void bindIndex(ColumnWriterIndex index) {
- isSetWriter.bindIndex(index);
- baseWriter.bindIndex(index);
+ writerIndex = index;
+ ColumnWriterIndex childIndex = new ChildIndex(index);
+ isSetWriter.bindIndex(childIndex);
+ baseWriter.bindIndex(childIndex);
}
@Override
@@ -76,24 +115,28 @@ public class NullableScalarWriter extends AbstractScalarWriter {
public void setNull() {
isSetWriter.setInt(0);
baseWriter.skipNulls();
+ writerIndex.nextElement();
}
@Override
public void setInt(int value) {
baseWriter.setInt(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
public void setLong(long value) {
baseWriter.setLong(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
public void setDouble(double value) {
baseWriter.setDouble(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
@@ -105,24 +148,28 @@ public class NullableScalarWriter extends AbstractScalarWriter {
baseWriter.setString(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
public void setBytes(byte[] value, int len) {
baseWriter.setBytes(value, len);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
public void setDecimal(BigDecimal value) {
baseWriter.setDecimal(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
public void setPeriod(Period value) {
baseWriter.setPeriod(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
index d5f9b30a1..49d16a3e5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
@@ -171,7 +171,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) {
realloc(MIN_BUFFER_SIZE);
}
- vector.getBuffer().unsafePutInt(0, 0);
+ vector.getBuffer().setInt(0, 0);
}
public int nextOffset() { return nextOffset; }
@@ -199,7 +199,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
final int valueIndex = vectorIndex.vectorIndex();
int writeIndex = valueIndex + 1;
- if (lastWriteIndex < valueIndex - 1 || writeIndex >= capacity) {
+ if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) {
writeIndex = prepareWrite(writeIndex);
}
@@ -207,7 +207,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
// Recall, it is the value index, which is one less than the (end)
// offset index.
- lastWriteIndex = writeIndex - 1;
+ lastWriteIndex = valueIndex;
return writeIndex;
}
@@ -220,24 +220,27 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
// Call to resize may cause rollover, so reset write index
// afterwards.
- writeIndex = vectorIndex.vectorIndex() + 1;
+ final int valueIndex = vectorIndex.vectorIndex();
// Fill empties to the write position.
+ // Fill empties works off the row index, not the write
+ // index. The write index is one past the row index.
+ // (Yes, this is complex...)
- fillEmpties(writeIndex);
- return writeIndex;
+ fillEmpties(valueIndex);
+ return valueIndex + 1;
}
@Override
- protected final void fillEmpties(final int writeIndex) {
- while (lastWriteIndex < writeIndex - 1) {
- drillBuf.unsafePutInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset);
+ protected final void fillEmpties(final int valueIndex) {
+ while (lastWriteIndex < valueIndex - 1) {
+ drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset);
}
}
public final void setNextOffset(final int newOffset) {
final int writeIndex = writeIndex();
- drillBuf.unsafePutInt(writeIndex * VALUE_WIDTH, newOffset);
+ drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
nextOffset = newOffset;
}
@@ -267,8 +270,17 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
}
@Override
- public final void endWrite() {
- setValueCount(vectorIndex.vectorIndex() + 1);
+ public void setValueCount(int valueCount) {
+
+ // Slightly different version of the fixed-width writer
+ // code. Offset counts are one greater than the value count.
+ // But for all other purposes, we track the value (row)
+ // position, not the offset position.
+
+ mandatoryResize(valueCount);
+ fillEmpties(valueCount);
+ vector().getBuffer().writerIndex((valueCount + 1) * width());
+ lastWriteIndex = Math.max(lastWriteIndex, valueCount - 1);
}
@Override