diff options
author | Paul Rogers <progers@maprtech.com> | 2018-01-10 16:04:53 -0800 |
---|---|---|
committer | Boaz Ben-Zvi <boaz@apache.org> | 2018-01-30 19:54:57 -0800 |
commit | f0d00c62b594e424ea085ebd0a5be26f0f509fda (patch) | |
tree | d2a03b8cbef01e83509321ec48a99d61db792878 /exec/vector | |
parent | 039530a4195ba8fa4532b9ca92980206fa66c181 (diff) |
DRILL-6080: Sort incorrectly limits batch size to 65535 records
closes #1090
* Sort incorrectly limits batch size to 65535 records rather than 65536.
* This PR also includes a few code cleanup items.
* Fix for overflow in offset vector in row set writer
* Performance tool update
* Replace "unsafe" methods with "set" methods
* Also fixes an indexing issue with nullable writers
* Removed debug & timing code
* Increase strictness for batch size
Diffstat (limited to 'exec/vector')
4 files changed, 86 insertions, 27 deletions
diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java index 33b12becc..14ec1e879 100644 --- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java +++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java @@ -276,14 +276,14 @@ public class ColumnAccessors { <#assign putAddr = "writeIndex * VALUE_WIDTH"> </#if> <#if varWidth> - drillBuf.unsafeCopyMemory(value, 0, offset, len); + drillBuf.setBytes(offset, value, 0, len); offsetsWriter.setNextOffset(offset + len); <#elseif drillType == "Decimal9"> - drillBuf.unsafePutInt(${putAddr}, + drillBuf.setInt(${putAddr}, DecimalUtility.getDecimal9FromBigDecimal(value, type.getScale(), type.getPrecision())); <#elseif drillType == "Decimal18"> - drillBuf.unsafePutLong(${putAddr}, + drillBuf.setLong(${putAddr}, DecimalUtility.getDecimal18FromBigDecimal(value, type.getScale(), type.getPrecision())); <#elseif drillType == "Decimal38Sparse"> @@ -295,23 +295,23 @@ public class ColumnAccessors { DecimalUtility.getSparseFromBigDecimal(value, vector.getBuffer(), writeIndex * VALUE_WIDTH, type.getScale(), type.getPrecision(), 5); <#elseif drillType == "IntervalYear"> - drillBuf.unsafePutInt(${putAddr}, + drillBuf.setInt(${putAddr}, value.getYears() * 12 + value.getMonths()); <#elseif drillType == "IntervalDay"> final int offset = ${putAddr}; - drillBuf.unsafePutInt(offset, value.getDays()); - drillBuf.unsafePutInt(offset + 4, periodToMillis(value)); + drillBuf.setInt(offset, value.getDays()); + drillBuf.setInt(offset + 4, periodToMillis(value)); <#elseif drillType == "Interval"> final int offset = ${putAddr}; - drillBuf.unsafePutInt(offset, value.getYears() * 12 + value.getMonths()); - drillBuf.unsafePutInt(offset + 4, value.getDays()); - drillBuf.unsafePutInt(offset + 8, periodToMillis(value)); + drillBuf.setInt(offset, value.getYears() * 12 + value.getMonths()); + drillBuf.setInt(offset + 4, value.getDays()); + drillBuf.setInt(offset + 8, periodToMillis(value)); <#elseif drillType == "Float4"> - drillBuf.unsafePutInt(${putAddr}, Float.floatToRawIntBits((float) value)); + drillBuf.setInt(${putAddr}, Float.floatToRawIntBits((float) value)); <#elseif drillType == "Float8"> - drillBuf.unsafePutLong(${putAddr}, Double.doubleToRawLongBits(value)); + drillBuf.setLong(${putAddr}, Double.doubleToRawLongBits(value)); <#else> - drillBuf.unsafePut${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value); + drillBuf.set${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value); </#if> vectorIndex.nextElement(); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java index e49f92c62..1107216a3 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java @@ -98,7 +98,7 @@ public abstract class AbstractFixedWidthWriter extends BaseScalarWriter { while (dest < writeIndex) { int length = writeIndex - dest; length = Math.min(length, stride); - drillBuf.unsafeCopyMemory(ZERO_BUF, 0, dest * width, length * width); + drillBuf.setBytes(dest * width, ZERO_BUF, 0, length * width); dest += length; } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java index 6da2b50fc..2068304f4 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java @@ -30,8 +30,45 @@ import org.joda.time.Period; public class NullableScalarWriter extends AbstractScalarWriter { + public static final class ChildIndex implements ColumnWriterIndex { + + private final ColumnWriterIndex parentIndex; + + public ChildIndex(ColumnWriterIndex parentIndex) { + this.parentIndex = parentIndex; + } + + @Override + public int rowStartIndex() { + return parentIndex.rowStartIndex(); + } + + @Override + public int vectorIndex() { + return parentIndex.vectorIndex(); + } + + @Override + public void nextElement() { + // Ignore next element requests from children. + // Nullable writers have two children, we don't want + // to increment the index twice. + } + + @Override + public void rollover() { + parentIndex.rollover(); + } + + @Override + public ColumnWriterIndex outerIndex() { + return parentIndex.outerIndex(); + } + } + private final UInt1ColumnWriter isSetWriter; private final BaseScalarWriter baseWriter; + private ColumnWriterIndex writerIndex; public NullableScalarWriter(NullableVector nullableVector, BaseScalarWriter baseWriter) { isSetWriter = new UInt1ColumnWriter(nullableVector.getBitsVector()); @@ -54,8 +91,10 @@ public class NullableScalarWriter extends AbstractScalarWriter { @Override public void bindIndex(ColumnWriterIndex index) { - isSetWriter.bindIndex(index); - baseWriter.bindIndex(index); + writerIndex = index; + ColumnWriterIndex childIndex = new ChildIndex(index); + isSetWriter.bindIndex(childIndex); + baseWriter.bindIndex(childIndex); } @Override @@ -76,24 +115,28 @@ public class NullableScalarWriter extends AbstractScalarWriter { public void setNull() { isSetWriter.setInt(0); baseWriter.skipNulls(); + writerIndex.nextElement(); } @Override public void setInt(int value) { baseWriter.setInt(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override public void setLong(long value) { baseWriter.setLong(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override public void setDouble(double value) { baseWriter.setDouble(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override @@ -105,24 +148,28 @@ public class NullableScalarWriter extends AbstractScalarWriter { baseWriter.setString(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override public void setBytes(byte[] value, int len) { baseWriter.setBytes(value, len); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override public void setDecimal(BigDecimal value) { baseWriter.setDecimal(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override public void setPeriod(Period value) { baseWriter.setPeriod(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java index d5f9b30a1..49d16a3e5 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java @@ -171,7 +171,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter { if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) { realloc(MIN_BUFFER_SIZE); } - vector.getBuffer().unsafePutInt(0, 0); + vector.getBuffer().setInt(0, 0); } public int nextOffset() { return nextOffset; } @@ -199,7 +199,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter { final int valueIndex = vectorIndex.vectorIndex(); int writeIndex = valueIndex + 1; - if (lastWriteIndex < valueIndex - 1 || writeIndex >= capacity) { + if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) { writeIndex = prepareWrite(writeIndex); } @@ -207,7 +207,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter { // Recall, it is the value index, which is one less than the (end) // offset index. - lastWriteIndex = writeIndex - 1; + lastWriteIndex = valueIndex; return writeIndex; } @@ -220,24 +220,27 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter { // Call to resize may cause rollover, so reset write index // afterwards. - writeIndex = vectorIndex.vectorIndex() + 1; + final int valueIndex = vectorIndex.vectorIndex(); // Fill empties to the write position. + // Fill empties works off the row index, not the write + // index. The write index is one past the row index. + // (Yes, this is complex...) - fillEmpties(writeIndex); - return writeIndex; + fillEmpties(valueIndex); + return valueIndex + 1; } @Override - protected final void fillEmpties(final int writeIndex) { - while (lastWriteIndex < writeIndex - 1) { - drillBuf.unsafePutInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset); + protected final void fillEmpties(final int valueIndex) { + while (lastWriteIndex < valueIndex - 1) { + drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset); } } public final void setNextOffset(final int newOffset) { final int writeIndex = writeIndex(); - drillBuf.unsafePutInt(writeIndex * VALUE_WIDTH, newOffset); + drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset); nextOffset = newOffset; } @@ -267,8 +270,17 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter { } @Override - public final void endWrite() { - setValueCount(vectorIndex.vectorIndex() + 1); + public void setValueCount(int valueCount) { + + // Slightly different version of the fixed-width writer + // code. Offset counts are one greater than the value count. + // But for all other purposes, we track the value (row) + // position, not the offset position. + + mandatoryResize(valueCount); + fillEmpties(valueCount); + vector().getBuffer().writerIndex((valueCount + 1) * width()); + lastWriteIndex = Math.max(lastWriteIndex, valueCount - 1); } @Override |