diff options
author | Paul Rogers <progers@cloudera.com> | 2018-04-16 21:44:10 -0700 |
---|---|---|
committer | Vitalii Diravka <vitalii.diravka@gmail.com> | 2018-04-29 23:20:55 +0300 |
commit | dbff1646601db234a6606d400d5630db4deee192 (patch) | |
tree | 29ac669179399cecbaea688d2ec83790572b9f06 /exec/vector | |
parent | 883c8d94b0021a83059fa79563dd516c4299b70a (diff) |
DRILL-6335: Column accessor refactoring
closes #1218
Diffstat (limited to 'exec/vector')
26 files changed, 1227 insertions, 844 deletions
diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java index 4836099b1..d0a2ace6f 100644 --- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java +++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java @@ -268,69 +268,58 @@ public class ColumnAccessors { </#if> <@getType drillType label /> - <#if accessorType == "byte[]"> - <#assign args = ", int len"> - <#else> - <#assign args = ""> - </#if> - <#if javaType == "char"> - <#assign putType = "short" /> - <#assign doCast = true /> - <#else> - <#assign putType = javaType /> - <#assign doCast = (cast == "set") /> - </#if> <#if ! varWidth> </#if> @Override - public final void set${label}(final ${accessorType} value${args}) { + public final void set${label}(final ${accessorType} value${putArgs}) { <#-- Must compute the write offset first; can't be inline because the writeOffset() function has a side effect of possibly changing the buffer address (bufAddr). --> - <#if varWidth> - final int offset = writeIndex(len); - <#else> - final int writeIndex = writeIndex(); - <#assign putAddr = "writeIndex * VALUE_WIDTH"> + <#if ! varWidth> + final int writeOffset = prepareWrite(); + <#assign putOffset = "writeOffset * VALUE_WIDTH"> </#if> <#if varWidth> + final int offset = prepareWrite(len); drillBuf.setBytes(offset, value, 0, len); offsetsWriter.setNextOffset(offset + len); <#elseif drillType == "Decimal9"> - drillBuf.setInt(${putAddr}, + drillBuf.setInt(${putOffset}, DecimalUtility.getDecimal9FromBigDecimal(value, - type.getScale(), type.getPrecision())); + type.getScale(), type.getPrecision())); <#elseif drillType == "Decimal18"> - drillBuf.setLong(${putAddr}, + drillBuf.setLong(${putOffset}, DecimalUtility.getDecimal18FromBigDecimal(value, - type.getScale(), type.getPrecision())); + type.getScale(), type.getPrecision())); <#elseif drillType == "Decimal38Sparse"> <#-- Hard to optimize this case. Just use the available tools. 
--> - DecimalUtility.getSparseFromBigDecimal(value, vector.getBuffer(), writeIndex * VALUE_WIDTH, - type.getScale(), type.getPrecision(), 6); + DecimalUtility.getSparseFromBigDecimal(value, drillBuf, + ${putOffset}, + type.getScale(), type.getPrecision(), 6); <#elseif drillType == "Decimal28Sparse"> <#-- Hard to optimize this case. Just use the available tools. --> - DecimalUtility.getSparseFromBigDecimal(value, vector.getBuffer(), writeIndex * VALUE_WIDTH, - type.getScale(), type.getPrecision(), 5); + DecimalUtility.getSparseFromBigDecimal(value, drillBuf, + ${putOffset}, + type.getScale(), type.getPrecision(), 5); <#elseif drillType == "IntervalYear"> - drillBuf.setInt(${putAddr}, - value.getYears() * 12 + value.getMonths()); + drillBuf.setInt(${putOffset}, + value.getYears() * 12 + value.getMonths()); <#elseif drillType == "IntervalDay"> - final int offset = ${putAddr}; - drillBuf.setInt(offset, value.getDays()); - drillBuf.setInt(offset + 4, DateUtilities.periodToMillis(value)); + final int offset = ${putOffset}; + drillBuf.setInt(offset, value.getDays()); + drillBuf.setInt(offset + ${minor.millisecondsOffset}, DateUtilities.periodToMillis(value)); <#elseif drillType == "Interval"> - final int offset = ${putAddr}; - drillBuf.setInt(offset, value.getYears() * 12 + value.getMonths()); - drillBuf.setInt(offset + 4, value.getDays()); - drillBuf.setInt(offset + 8, DateUtilities.periodToMillis(value)); + final int offset = ${putOffset}; + drillBuf.setInt(offset, DateUtilities.periodToMonths(value)); + drillBuf.setInt(offset + ${minor.daysOffset}, value.getDays()); + drillBuf.setInt(offset + ${minor.millisecondsOffset}, DateUtilities.periodToMillis(value)); <#elseif drillType == "Float4"> - drillBuf.setInt(${putAddr}, Float.floatToRawIntBits((float) value)); + drillBuf.setInt(${putOffset}, Float.floatToRawIntBits((float) value)); <#elseif drillType == "Float8"> - drillBuf.setLong(${putAddr}, Double.doubleToRawLongBits(value)); + drillBuf.setLong(${putOffset}, 
Double.doubleToRawLongBits(value)); <#else> - drillBuf.set${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value); + drillBuf.set${putType?cap_first}(${putOffset}, <#if doCast>(${putType}) </#if>value); </#if> vectorIndex.nextElement(); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java index 49726056c..12f0f110e 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java @@ -24,4 +24,4 @@ public enum ProjectionType { TUPLE, // x.y ARRAY, // x[0] TUPLE_ARRAY // x[0].y -}
\ No newline at end of file +} diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java index 49a1e7770..c15791ab9 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java @@ -22,10 +22,42 @@ package org.apache.drill.exec.vector.accessor; * each call to a <tt>setFoo()</tt> method writes a value and advances the array * index. * <p> - * {@see ArrayReader} + * The array writer represents a Drill repeated type, including repeated maps. + * The array writer also represents the Drill list and repeated list types as + * follows: + * <ul> + * <li>A repeated scalar type is presented as an array writer with scalar + * entries. As a convenience, writing to the scalar automatically advances + * the current array write position, since exactly one item can be written + * per array entry.</li> + * <li>A repeated map type is presented as an array writer with tuple + * entries. The client must advance the array write position explicitly since + * a tuple can have any number of entries and the array writer cannot determine + * when a value is complete.</li> + * <li>A list type is presented as an array of variant entries. The client + * must explicitly advance the array position.</li> + * <li>A repeated list type is presented as an array of arrays of variants. 
+ * The client advances the array position of both lists.</li> + * <li>Lists of repeated lists have three levels of arrays, repeated lists + * of repeated lists have four levels of arrays, and so on.</li> + * </ul> + * <p> + * Although the list vector supports a union of any Drill type, the only sane + * combinations are: + * <ul> + * <li>One of a (single or repeated) (map or list), or</li> + * <li>One or more scalar type.</li> + * </ul> + * + * If a particular array has only one type (single/repeated map/list), then, + * for convenience, the caller can directly request a writer of that type + * without having to first retrieve the variant (although the indirect + * route is, of course, available.) + * + * @see {@link ArrayReader} */ -public interface ArrayWriter { +public interface ArrayWriter extends ColumnWriter { /** * Number of elements written thus far to the array. @@ -35,20 +67,22 @@ public interface ArrayWriter { int size(); /** - * The object type of the list entry. All entries have the same - * type. - * @return the object type of each entry - */ - - ObjectWriter entry(); - - /** * Return a generic object writer for the array entry. * * @return generic object reader */ ObjectType entryType(); + + void setNull(boolean isNull); + + /** + * The object type of the list entry. All entries have the same + * type. + * @return the object type of each entry + */ + + ObjectWriter entry(); ScalarWriter scalar(); TupleWriter tuple(); ArrayWriter array(); @@ -60,23 +94,4 @@ public interface ArrayWriter { */ void save(); - - /** - * Write the values of an array from a list of arguments. - * @param values values for each array element - * @throws VectorOverflowException - */ - void set(Object ...values); - - /** - * Write the array given an array of values. The type of array must match - * the type of element in the array. That is, if the value is an <tt>int</tt>, - * provide an <tt>int[]</tt> array. 
- * - * @param array array of values to write - * @throws VectorOverflowException - */ - - void setObject(Object array); -// void setList(List<? extends Object> list); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java new file mode 100644 index 000000000..5d1e79fbc --- /dev/null +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.accessor; + +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener; +import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener; + +/** + * Generic information about a column writer including: + * <ul> + * <li>Metadata</li> + * <li>Write position information about a writer needed by a vector overflow + * implementation. 
Hides the details of implementation and the writer class + * hierarchy, exposing just the required write position information.</li> + * <li>Generic methods for writing to the object, primarily used for + * testing.</li> + */ + +public interface ColumnWriter extends WriterPosition { + + interface TupleListenable { + + /** + * Bind a listener to the underlying map or map array column. Not valid if the + * underlying writer is a scalar or scalar array. + * + * @param listener + * the tuple listener to bind + */ + + void bindListener(TupleWriterListener listener); + } + + interface ScalarListenable { + /** + * Bind a listener to the underlying scalar column, or array of scalar + * columns. Not valid if the underlying writer is a map or array of maps. + * + * @param listener + * the column listener to bind + */ + + void bindListener(ColumnWriterListener listener); + } + + /** + * Return the object (structure) type of this writer. + * + * @return type indicating if this is a scalar, tuple or array + */ + + ObjectType type(); + + /** + * Whether this writer allows nulls. This is not as simple as checking + * for the {@link DataMode#OPTIONAL} type in the schema. List entries + * are nullable, if they are primitive, but not if they are maps or lists. + * Unions are nullable, regardless of cardinality. + * + * @return true if a call to {@link #setNull()} is supported, false + * if not + */ + + boolean nullable(); + + /** + * Returns the schema of the column associated with this writer. + * + * @return schema for this writer's column + */ + + ColumnMetadata schema(); + + /** + * Set the current value to null. Support depends on the underlying + * implementation: only nullable types support this operation. + * + * throws IllegalStateException if called on a non-nullable value. + */ + + void setNull(); + + /** + * Generic technique to write data as a generic Java object. The + * type of the object must match the target writer. + * Primarily for testing. 
+ * <ul> + * <li>Scalar: The type of the Java object must match the type of + * the target vector. <tt>String</tt> or <tt>byte[]</tt> can be + * used for Varchar vectors.</li> + * <li>Array: Write the array given an array of values. The object + * must be a Java array. The type of the array must match the type of + * element in the repeated vector. That is, if the vector is + * a <tt>Repeated Int</tt>, provide an <tt>int[]</tt> array.</tt></li> + * <li>Tuple (map or row): The Java object must be an array of objects + * in which the members of the array have a 1:1 correspondence with the + * members of the tuple in the order defined by the writer metadata. + * That is, if the map is (Int, Varchar), provide a <tt>Object[]</tt> + * array like this: <tt>{10, "fred"}</tt>.</li> + * <li>Union: Uses the Java object type to determine the type of the + * backing vector. Creates a vector + * of the required type if needed.</li> + * + * @param value value to write to the vector. The Java type of the + * object indicates the Drill storage type + * @throws IllegalArgumentException if the type of the Java object + * cannot be mapped to the type of the underlying vector or + * vector structure + */ + + void setObject(Object value); +} diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java index 113778f5a..81d4bac40 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java @@ -17,10 +17,6 @@ */ package org.apache.drill.exec.vector.accessor; -import org.apache.drill.exec.record.metadata.ColumnMetadata; -import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener; -import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener; - /** * Represents a column within a tuple. 
A column can be an array, a scalar or a * tuple. Each has an associated column metadata (schema) and a writer. The @@ -44,58 +40,11 @@ import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener; * {@see ObjectReader} */ -public interface ObjectWriter { - - /** - * Returns the schema of the column associated with this writer. - * - * @return schema for this writer's column - */ - - ColumnMetadata schema(); - - /** - * Bind a listener to the underlying scalar column, or array of scalar - * columns. Not valid if the underlying writer is a map or array of maps. - * - * @param listener - * the column listener to bind - */ - - void bindListener(ColumnWriterListener listener); - - /** - * Bind a listener to the underlying map or map array column. Not valid if the - * underlying writer is a scalar or scalar array. - * - * @param listener - * the tuple listener to bind - */ - - void bindListener(TupleWriterListener listener); - - /** - * Return the object (structure) type of this writer. - * - * @return type indicating if this is a scalar, tuple or array - */ - - ObjectType type(); +public interface ObjectWriter extends ColumnWriter { ScalarWriter scalar(); TupleWriter tuple(); ArrayWriter array(); - - /** - * For debugging, set the object to the proper form of Java object as defined - * by the underlying writer type. 
- * - * @param value - * Java object value to write - * @throws VectorOverflowException - */ - - void set(Object value); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java index 776dc9cf1..87c798890 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java @@ -21,6 +21,8 @@ import java.math.BigDecimal; import org.joda.time.Period; +import org.apache.drill.exec.vector.accessor.ColumnWriter.ScalarListenable; + /** * Represents a scalar value: a required column, a nullable column, * or one element within an array of scalars. @@ -42,7 +44,7 @@ import org.joda.time.Period; * {@see ScalarElementReader} */ -public interface ScalarWriter { +public interface ScalarWriter extends ColumnWriter, ScalarListenable { /** * Listener (callback) for vector overflow events. To be optionally @@ -78,8 +80,6 @@ public interface ScalarWriter { boolean canExpand(ScalarWriter writer, int delta); } - void bindListener(ColumnWriterListener listener); - /** * Describe the type of the value. 
This is a compression of the * value vector type: it describes which method will return the @@ -89,7 +89,6 @@ public interface ScalarWriter { */ ValueType valueType(); - void setNull(); void setInt(int value); void setLong(long value); void setDouble(double value); @@ -97,6 +96,4 @@ public interface ScalarWriter { void setBytes(byte[] value, int len); void setDecimal(BigDecimal value); void setPeriod(Period value); - - void setObject(Object value); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java index 0a522833b..331df2a01 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java @@ -19,7 +19,9 @@ package org.apache.drill.exec.vector.accessor; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.ProjectionType; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.accessor.ColumnWriter.TupleListenable; /** * Writer for a tuple. A tuple is composed of columns with a fixed order and @@ -50,7 +52,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata; * @see {@link SingleMapWriter}, the class which this class replaces */ -public interface TupleWriter { +public interface TupleWriter extends ColumnWriter, TupleListenable { /** * Listener (callback) to handle requests to add a new column to a tuple (row @@ -59,10 +61,13 @@ public interface TupleWriter { * throws an exception. 
*/ - public interface TupleWriterListener { + interface TupleWriterListener { + ObjectWriter addColumn(TupleWriter tuple, ColumnMetadata column); ObjectWriter addColumn(TupleWriter tuple, MaterializedField field); + + ProjectionType projectionType(String columnName); } /** @@ -75,13 +80,26 @@ public interface TupleWriter { */ @SuppressWarnings("serial") - public static class UndefinedColumnException extends RuntimeException { + class UndefinedColumnException extends RuntimeException { public UndefinedColumnException(String colName) { super("Undefined column: " + colName); } } - void bindListener(TupleWriterListener listener); + + /** + * Allows a client to "sniff" the projection set to determine if a + * field is projected. Some clients can omit steps if they know that + * a field is not needed. Others will simply create the column, allowing + * the implementation to create a dummy writer if the column is not + * projected. + * + * @param columnName name of an existing or new column + * @return whether the column is projected, and, if so, the implied + * type of the projected column + */ + + ProjectionType projectionType(String columnName); /** * Add a column to the tuple (row or map) that backs this writer. Support for @@ -100,7 +118,7 @@ public interface TupleWriter { int addColumn(MaterializedField schema); - TupleMetadata schema(); + TupleMetadata tupleSchema(); int size(); @@ -142,28 +160,4 @@ public interface TupleWriter { */ void set(int colIndex, Object value); - - /** - * Write a row or map of values, given by Java objects. Object type must match - * expected column type. - * <p> - * Note that a single-column tuple is ambiguous if that column is an array. To - * avoid ambiguity, use <tt>set(0, value)</tt> in this case. - * - * @param values - * variable-length argument list of column values - * @return true if the row was written, false if any column caused vector - * overflow. - */ - - void setTuple(Object... 
values); - - /** - * Set the tuple from an array of objects. Primarily for use in test tools. - * - * @param value - * the object to set, which must be a generic <tt>Object</tt> array - */ - - void setObject(Object value); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/WriterPosition.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/WriterPosition.java new file mode 100644 index 000000000..609545a8e --- /dev/null +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/WriterPosition.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.accessor; + +/** + * Position information about a writer used during vector overflow. + */ + +public interface WriterPosition { + + /** + * Position within the vector of the first value for the current row. + * Note that this is always the first value for the row, even for a + * writer deeply nested within a hierarchy of arrays. (The first + * position for the current array is not exposed in this API.) 
+ * + * @return the vector offset of the first value for the current + * row + */ + + int rowStartIndex(); + + /** + * Return the last write position in the vector. This may be the + * same as the writer index position (if the vector was written at + * that point), or an earlier point. In either case, this value + * points to the last valid value in the vector. + * + * @return index of the last valid value in the vector + */ + + int lastWriteIndex(); + + /** + * Current write index for the writer. This is the global + * array location for arrays, same as the row index for top-level + * columns. + * + * @return current write index + */ + + int writeIndex(); +} diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java index 58cda5755..2a2e3e10d 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java @@ -24,9 +24,7 @@ import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ObjectType; import org.apache.drill.exec.vector.accessor.ObjectWriter; import org.apache.drill.exec.vector.accessor.ScalarWriter; -import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener; import org.apache.drill.exec.vector.accessor.TupleWriter; -import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; /** @@ -98,36 +96,17 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { private AbstractArrayWriter arrayWriter; - public ArrayObjectWriter(ColumnMetadata schema, AbstractArrayWriter arrayWriter) { - super(schema); + public ArrayObjectWriter(AbstractArrayWriter arrayWriter) { this.arrayWriter = 
arrayWriter; } @Override - public ObjectType type() { return ObjectType.ARRAY; } - - @Override - public void set(Object value) { - arrayWriter.setObject(value); - } - - @Override public ArrayWriter array() { return arrayWriter; } @Override public WriterEvents events() { return arrayWriter; } @Override - public void bindListener(ColumnWriterListener listener) { - arrayWriter.bindListener(listener); - } - - @Override - public void bindListener(TupleWriterListener listener) { - arrayWriter.bindListener(listener); - } - - @Override public void dump(HierarchicalFormatter format) { format .startObject(this) @@ -137,62 +116,57 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { } } - public static abstract class BaseArrayWriter extends AbstractArrayWriter { - - /** - * Index into the vector of elements for a repeated vector. - * Keeps track of the current offset in terms of value positions. - * Forwards overflow events to the base index. - */ + /** + * Index into the vector of elements for a repeated vector. + * Keeps track of the current offset in terms of value positions. + * Forwards overflow events to the base index. 
+ */ - public class ArrayElementWriterIndex implements ColumnWriterIndex { + public class ArrayElementWriterIndex implements ColumnWriterIndex { - private int elementIndex; + private int elementIndex; - public void reset() { elementIndex = 0; } + public void reset() { elementIndex = 0; } - @Override - public int vectorIndex() { return elementIndex + offsetsWriter.nextOffset(); } + @Override + public int vectorIndex() { return elementIndex + offsetsWriter.nextOffset(); } - @Override - public int rowStartIndex() { return offsetsWriter.rowStartOffset(); } + @Override + public int rowStartIndex() { return offsetsWriter.rowStartOffset(); } - public int arraySize() { return elementIndex; } + public int arraySize() { return elementIndex; } - @Override - public void nextElement() { } + @Override + public void nextElement() { } - public final void next() { elementIndex++; } + public void next() { elementIndex++; } - public int valueStartOffset() { return offsetsWriter.nextOffset(); } + public int valueStartOffset() { return offsetsWriter.nextOffset(); } - @Override - public void rollover() { } + @Override + public void rollover() { } - @Override - public ColumnWriterIndex outerIndex() { - return outerIndex; - } + @Override + public ColumnWriterIndex outerIndex() { + return outerIndex; + } - @Override - public String toString() { - return new StringBuilder() - .append("[") - .append(getClass().getSimpleName()) - .append(" elementIndex = ") - .append(elementIndex) - .append("]") - .toString(); - } + @Override + public String toString() { + return new StringBuilder() + .append("[") + .append(getClass().getSimpleName()) + .append(" elementIndex = ") + .append(elementIndex) + .append("]") + .toString(); } + } - private final OffsetVectorWriter offsetsWriter; - private ColumnWriterIndex outerIndex; - protected ArrayElementWriterIndex elementIndex; + public static abstract class BaseArrayWriter extends AbstractArrayWriter { - public BaseArrayWriter(UInt4Vector offsetVector, 
AbstractObjectWriter elementObjWriter) { - super(elementObjWriter); - offsetsWriter = new OffsetVectorWriter(offsetVector); + public BaseArrayWriter(ColumnMetadata schema, UInt4Vector offsetVector, AbstractObjectWriter elementObjWriter) { + super(schema, elementObjWriter, new OffsetVectorWriterImpl(offsetVector)); } @Override @@ -204,12 +178,6 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { } @Override - public ColumnWriterIndex writerIndex() { return outerIndex; } - - @Override - public int size() { return elementIndex.arraySize(); } - - @Override public void startWrite() { elementIndex.reset(); offsetsWriter.startWrite(); @@ -248,12 +216,6 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { } @Override - public void endWrite() { - offsetsWriter.endWrite(); - elementObjWriter.events().endWrite(); - } - - @Override public void preRollover() { elementObjWriter.events().preRollover(); offsetsWriter.preRollover(); @@ -271,28 +233,13 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { } @Override - public int lastWriteIndex() { return outerIndex.vectorIndex(); } - - /** - * Return the writer for the offset vector for this array. Primarily used - * to handle overflow; other clients should not attempt to muck about with - * the offset vector directly. 
- * - * @return the writer for the offset vector associated with this array - */ - - @Override - public OffsetVectorWriter offsetWriter() { return offsetsWriter; } - - @Override - public void bindListener(ColumnWriterListener listener) { - elementObjWriter.bindListener(listener); + public void endWrite() { + offsetsWriter.endWrite(); + elementObjWriter.events().endWrite(); } @Override - public void bindListener(TupleWriterListener listener) { - elementObjWriter.bindListener(listener); - } + public int lastWriteIndex() { return outerIndex.vectorIndex(); } @Override public void dump(HierarchicalFormatter format) { @@ -305,18 +252,30 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { } } - protected final AbstractObjectWriter elementObjWriter; + protected final ColumnMetadata schema; + protected AbstractObjectWriter elementObjWriter; + protected final OffsetVectorWriter offsetsWriter; + protected ColumnWriterIndex outerIndex; + protected ArrayElementWriterIndex elementIndex; - public AbstractArrayWriter(AbstractObjectWriter elementObjWriter) { + public AbstractArrayWriter(ColumnMetadata schema, AbstractObjectWriter elementObjWriter, OffsetVectorWriter offsetVectorWriter) { + this.schema = schema; this.elementObjWriter = elementObjWriter; + this.offsetsWriter = offsetVectorWriter; } @Override + public ObjectType type() { return ObjectType.ARRAY; } + + @Override public ObjectType entryType() { return elementObjWriter.type(); } @Override + public ColumnMetadata schema() { return schema; } + + @Override public ObjectWriter entry() { return elementObjWriter; } @Override @@ -334,9 +293,48 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { return elementObjWriter.array(); } - public abstract void bindListener(ColumnWriterListener listener); - public abstract void bindListener(TupleWriterListener listener); - public abstract OffsetVectorWriter offsetWriter(); + @Override + public int size() { return 
elementIndex.arraySize(); } + + @Override + public boolean nullable() { return false; } + + @Override + public void setNull() { + throw new IllegalStateException("Not nullable"); + } + + @Override + public int rowStartIndex() { + return outerIndex.rowStartIndex(); + } + + @Override + public int lastWriteIndex() { + return offsetsWriter.lastWriteIndex(); + } + + @Override + public int writeIndex() { + return outerIndex.vectorIndex(); + } + + @Override + public void setNull(boolean isNull) { + if (isNull == true) { + throw new UnsupportedOperationException(); + } + } + + /** + * Return the writer for the offset vector for this array. Primarily used + * to handle overflow; other clients should not attempt to muck about with + * the offset vector directly. + * + * @return the writer for the offset vector associated with this array + */ + + public OffsetVectorWriter offsetWriter() { return offsetsWriter; } public void dump(HierarchicalFormatter format) { format diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java index 1107216a3..921cb002e 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java @@ -46,14 +46,14 @@ public abstract class AbstractFixedWidthWriter extends BaseScalarWriter { * The current vector buffer, and buffer address, will change in * this method when a vector grows or overflows. So, don't use this * method in inline calls of the form<br><code> - * vector.getBuffer().doSomething(writeIndex());</code></br> + * vector.getBuffer().doSomething(prepareWrite());</code></br> * The buffer obtained by <tt>getBuffer()</tt> can be different than - * the current buffer after <tt>writeIndex()</tt>. 
+ * the current buffer after <tt>prepareWrite()</tt>. * * @return the index at which to write the current value */ - protected final int writeIndex() { + protected final int prepareWrite() { // "Fast path" for the normal case of no fills, no overflow. // This is the only bounds check we want to do for the entire @@ -191,6 +191,18 @@ public abstract class AbstractFixedWidthWriter extends BaseScalarWriter { @Override public int lastWriteIndex() { return lastWriteIndex; } + /** + * For internal use only to update the write position on those + * very rare occasions in which the vector is written to outside + * of this writer framework. Not to be called by application code! + * + * @param index new last write index + */ + + public void setLastWriteIndex(int index) { + lastWriteIndex = index; + } + @Override public void skipNulls() { diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java index 15807cb01..61c201bdb 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java @@ -19,11 +19,11 @@ package org.apache.drill.exec.vector.accessor.writer; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.accessor.ArrayWriter; +import org.apache.drill.exec.vector.accessor.ColumnWriter; +import org.apache.drill.exec.vector.accessor.ObjectType; import org.apache.drill.exec.vector.accessor.ObjectWriter; import org.apache.drill.exec.vector.accessor.ScalarWriter; -import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener; import org.apache.drill.exec.vector.accessor.TupleWriter; -import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener; import 
org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; /** @@ -36,14 +36,8 @@ import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; public abstract class AbstractObjectWriter implements ObjectWriter { - private ColumnMetadata schema; - - public AbstractObjectWriter(ColumnMetadata schema) { - this.schema = schema; - } - @Override - public ColumnMetadata schema() { return schema; } + public ColumnMetadata schema() { return baseWriter().schema(); } @Override public ScalarWriter scalar() { @@ -62,11 +56,40 @@ public abstract class AbstractObjectWriter implements ObjectWriter { public abstract WriterEvents events(); + public ColumnWriter baseWriter() { + return (ColumnWriter) events(); + } + @Override - public void bindListener(ColumnWriterListener listener) { } + public ObjectType type() { return baseWriter().type(); } + + @Override + public boolean nullable() { return baseWriter().nullable(); } + + @Override + public void setNull() { + baseWriter().setNull(); + } @Override - public void bindListener(TupleWriterListener listener) { } + public void setObject(Object value) { + baseWriter().setObject(value); + } public abstract void dump(HierarchicalFormatter format); + + @Override + public int rowStartIndex() { + return baseWriter().rowStartIndex(); + } + + @Override + public int lastWriteIndex() { + return baseWriter().lastWriteIndex(); + } + + @Override + public int writeIndex() { + return baseWriter().writeIndex(); + } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java index b6a85d7ff..08e4ac390 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java @@ -21,8 +21,10 @@ import java.math.BigDecimal; import 
org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.BaseDataValueVector; +import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ObjectType; import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.UnsupportedConversionError; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.joda.time.Period; @@ -39,29 +41,17 @@ public abstract class AbstractScalarWriter implements ScalarWriter, WriterEvents private AbstractScalarWriter scalarWriter; - public ScalarObjectWriter(ColumnMetadata schema, AbstractScalarWriter scalarWriter) { - super(schema); + public ScalarObjectWriter(AbstractScalarWriter scalarWriter) { this.scalarWriter = scalarWriter; } @Override - public ObjectType type() { return ObjectType.SCALAR; } - - @Override - public void set(Object value) { scalarWriter.setObject(value); } - - @Override public ScalarWriter scalar() { return scalarWriter; } @Override public WriterEvents events() { return scalarWriter; } @Override - public void bindListener(ColumnWriterListener listener) { - scalarWriter.bindListener(listener); - } - - @Override public void dump(HierarchicalFormatter format) { format .startObject(this) @@ -71,6 +61,42 @@ public abstract class AbstractScalarWriter implements ScalarWriter, WriterEvents } } + protected ColumnMetadata schema; + + /** + * Indicates the position in the vector to write. Set via an object so that + * all writers (within the same subtree) can agree on the write position. + * For example, all top-level, simple columns see the same row index. + * All columns within a repeated map see the same (inner) index, etc. 
+ */ + + protected ColumnWriterIndex vectorIndex; + + @Override + public ObjectType type() { return ObjectType.SCALAR; } + + public void bindSchema(ColumnMetadata schema) { + this.schema = schema; + } + + @Override + public void bindIndex(ColumnWriterIndex vectorIndex) { + this.vectorIndex = vectorIndex; + } + + @Override + public int rowStartIndex() { + return vectorIndex.rowStartIndex(); + } + + @Override + public int writeIndex() { + return vectorIndex.vectorIndex(); + } + + @Override + public ColumnMetadata schema() { return schema; } + public abstract BaseDataValueVector vector(); @Override @@ -85,6 +111,10 @@ public abstract class AbstractScalarWriter implements ScalarWriter, WriterEvents @Override public void saveRow() { } + protected UnsupportedConversionError conversionError(String javaType) { + return UnsupportedConversionError.writeError(schema(), javaType); + } + @Override public void setObject(Object value) { if (value == null) { @@ -111,8 +141,7 @@ public abstract class AbstractScalarWriter implements ScalarWriter, WriterEvents } else if (value instanceof Float) { setDouble((Float) value); } else { - throw new IllegalArgumentException("Unsupported type " + - value.getClass().getSimpleName()); + throw conversionError(value.getClass().getSimpleName()); } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java index 938f867a2..640f5d36b 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.ProjectionType; import 
org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.accessor.ArrayWriter; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; @@ -104,29 +105,17 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { private AbstractTupleWriter tupleWriter; - public TupleObjectWriter(ColumnMetadata schema, AbstractTupleWriter tupleWriter) { - super(schema); + public TupleObjectWriter(AbstractTupleWriter tupleWriter) { this.tupleWriter = tupleWriter; } @Override - public ObjectType type() { return ObjectType.TUPLE; } - - @Override - public void set(Object value) { tupleWriter.setObject(value); } - - @Override public TupleWriter tuple() { return tupleWriter; } @Override public WriterEvents events() { return tupleWriter; } @Override - public void bindListener(TupleWriterListener listener) { - tupleWriter.bindListener(listener); - } - - @Override public void dump(HierarchicalFormatter format) { format .startObject(this) @@ -136,38 +125,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { } } - /** - * Tracks the write state of the tuple to allow applying the correct - * operations to newly-added columns to synchronize them with the rest - * of the tuple. - */ - - public enum State { - /** - * No write is in progress. Nothing need be done to newly-added - * writers. - */ - IDLE, - - /** - * <tt>startWrite()</tt> has been called to start a write operation - * (start a batch), but <tt>startValue()</tt> has not yet been called - * to start a row (or value within an array). <tt>startWrite()</tt> must - * be called on newly added columns. - */ - - IN_WRITE, - - /** - * Both <tt>startWrite()</tt> and <tt>startValue()</tt> has been called on - * the tuple to prepare for writing values, and both must be called on - * newly-added vectors. 
- */ - - IN_ROW - } - - protected final TupleMetadata schema; + protected final TupleMetadata tupleSchema; protected final List<AbstractObjectWriter> writers; protected ColumnWriterIndex vectorIndex; protected ColumnWriterIndex childIndex; @@ -175,7 +133,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { protected State state = State.IDLE; protected AbstractTupleWriter(TupleMetadata schema, List<AbstractObjectWriter> writers) { - this.schema = schema; + this.tupleSchema = schema; this.writers = writers; } @@ -183,6 +141,9 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { this(schema, new ArrayList<AbstractObjectWriter>()); } + @Override + public ObjectType type() { return ObjectType.TUPLE; } + protected void bindIndex(ColumnWriterIndex index, ColumnWriterIndex childIndex) { vectorIndex = index; this.childIndex = childIndex; @@ -198,7 +159,9 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { } @Override - public ColumnWriterIndex writerIndex() { return vectorIndex; } + public int rowStartIndex() { + return vectorIndex.rowStartIndex(); + } /** * Add a column writer to an existing tuple writer. Used for implementations @@ -209,8 +172,8 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { */ public int addColumnWriter(AbstractObjectWriter colWriter) { - assert writers.size() == schema.size(); - int colIndex = schema.addColumn(colWriter.schema()); + assert writers.size() == tupleSchema.size(); + int colIndex = tupleSchema.addColumn(colWriter.schema()); writers.add(colWriter); colWriter.events().bindIndex(childIndex); if (state != State.IDLE) { @@ -223,6 +186,12 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { } @Override + public ProjectionType projectionType(String columnName) { + return listener == null ? 
ProjectionType.UNSPECIFIED + : listener.projectionType(columnName); + } + + @Override public int addColumn(ColumnMetadata column) { if (listener == null) { throw new UnsupportedOperationException("addColumn"); @@ -241,10 +210,18 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { } @Override - public TupleMetadata schema() { return schema; } + public TupleMetadata tupleSchema() { return tupleSchema; } @Override - public int size() { return schema().size(); } + public int size() { return tupleSchema().size(); } + + @Override + public boolean nullable() { return false; } + + @Override + public void setNull() { + throw new IllegalStateException("Not nullable"); + } @Override public void startWrite() { @@ -339,7 +316,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { @Override public ObjectWriter column(String colName) { - int index = schema.index(colName); + int index = tupleSchema.index(colName); if (index == -1) { throw new UndefinedColumnException(colName); } @@ -348,35 +325,18 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { @Override public void set(int colIndex, Object value) { - ObjectWriter colWriter = column(colIndex); - switch (colWriter.type()) { - case ARRAY: - colWriter.array().setObject(value); - break; - case SCALAR: - colWriter.scalar().setObject(value); - break; - case TUPLE: - colWriter.tuple().setObject(value); - break; - default: - throw new IllegalStateException("Unexpected object type: " + colWriter.type()); - } - } - - @Override - public void setTuple(Object ...values) { - setObject(values); + column(colIndex).setObject(value); } @Override public void setObject(Object value) { Object values[] = (Object[]) value; - if (values.length != schema.size()) { + if (values.length != tupleSchema.size()) { throw new IllegalArgumentException( - "Map has " + schema.size() + - " columns, but value array has " + - values.length + " values."); + 
String.format("Map %s has %d columns, but value array has " + + " %d values.", + schema().name(), + tupleSchema.size(), values.length)); } for (int i = 0; i < values.length; i++) { set(i, values[i]); @@ -429,6 +389,11 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { } @Override + public int writeIndex() { + return vectorIndex.vectorIndex(); + } + + @Override public void bindListener(TupleWriterListener listener) { this.listener = listener; } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java index 4793277b7..10a2cf3b5 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.vector.accessor.writer; import java.math.BigDecimal; -import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; +import org.apache.drill.exec.vector.accessor.UnsupportedConversionError; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.joda.time.Period; @@ -134,15 +134,6 @@ public abstract class BaseScalarWriter extends AbstractScalarWriter { public static final int MIN_BUFFER_SIZE = 256; /** - * Indicates the position in the vector to write. Set via an object so that - * all writers (within the same subtree) can agree on the write position. - * For example, all top-level, simple columns see the same row index. - * All columns within a repeated map see the same (inner) index, etc. - */ - - protected ColumnWriterIndex vectorIndex; - - /** * Listener invoked if the vector overflows. If not provided, then the writer * does not support vector overflow. 
*/ @@ -160,14 +151,6 @@ public abstract class BaseScalarWriter extends AbstractScalarWriter { protected int capacity; @Override - public void bindIndex(ColumnWriterIndex vectorIndex) { - this.vectorIndex = vectorIndex; - } - - @Override - public ColumnWriterIndex writerIndex() { return vectorIndex; } - - @Override public void bindListener(ColumnWriterListener listener) { this.listener = listener; } @@ -176,7 +159,7 @@ public abstract class BaseScalarWriter extends AbstractScalarWriter { * All change of buffer comes through this function to allow capturing * the buffer address and capacity. Only two ways to set the buffer: * by binding to a vector in bindVector(), or by resizing the vector - * in writeIndex(). + * in prepareWrite(). */ protected abstract void setBuffer(); @@ -220,43 +203,46 @@ public abstract class BaseScalarWriter extends AbstractScalarWriter { public abstract void skipNulls(); @Override + public boolean nullable() { return false; } + + @Override public void setNull() { - throw new UnsupportedOperationException("Vector is not nullable"); + throw UnsupportedConversionError.nullError(schema()); } @Override public void setInt(int value) { - throw new UnsupportedOperationException(); + throw conversionError("int"); } @Override public void setLong(long value) { - throw new UnsupportedOperationException(); + throw conversionError("long"); } @Override public void setDouble(double value) { - throw new UnsupportedOperationException(); + throw conversionError("double"); } @Override public void setString(String value) { - throw new UnsupportedOperationException(); + throw conversionError("String"); } @Override public void setBytes(byte[] value, int len) { - throw new UnsupportedOperationException(); + throw conversionError("bytes"); } @Override public void setDecimal(BigDecimal value) { - throw new UnsupportedOperationException(); + throw conversionError("Decimal"); } @Override public void setPeriod(Period value) { - throw new UnsupportedOperationException(); 
+ throw conversionError("Period"); } @Override diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java index e54625ef4..9db767d1f 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java @@ -36,10 +36,10 @@ import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; */ public abstract class BaseVarWidthWriter extends BaseScalarWriter { - protected final OffsetVectorWriter offsetsWriter; + protected final OffsetVectorWriterImpl offsetsWriter; public BaseVarWidthWriter(UInt4Vector offsetVector) { - offsetsWriter = new OffsetVectorWriter(offsetVector); + offsetsWriter = new OffsetVectorWriterImpl(offsetVector); } @Override @@ -57,7 +57,7 @@ public abstract class BaseVarWidthWriter extends BaseScalarWriter { @Override public void startRow() { offsetsWriter.startRow(); } - protected final int writeIndex(final int width) { + protected final int prepareWrite(final int width) { // This is performance critical code; every operation counts. // Please be thoughtful when changing the code. 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java index 4a668c4ed..3f0cae3c4 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java @@ -19,34 +19,28 @@ package org.apache.drill.exec.vector.accessor.writer; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.List; - import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.NullableVector; -import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ColumnAccessorUtils; import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter; import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter.ScalarObjectWriter; -import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter.TupleObjectWriter; -import org.apache.drill.exec.vector.accessor.writer.MapWriter.ArrayMapWriter; -import org.apache.drill.exec.vector.accessor.writer.MapWriter.DummyArrayMapWriter; -import org.apache.drill.exec.vector.accessor.writer.MapWriter.DummyMapWriter; -import org.apache.drill.exec.vector.accessor.writer.MapWriter.SingleMapWriter; import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter; import org.apache.drill.exec.vector.accessor.writer.dummy.DummyScalarWriter; -import org.apache.drill.exec.vector.complex.AbstractMapVector; -import org.apache.drill.exec.vector.complex.MapVector; -import org.apache.drill.exec.vector.complex.RepeatedMapVector; 
import org.apache.drill.exec.vector.complex.RepeatedValueVector; /** * Gather generated writer classes into a set of class tables to allow rapid * run-time creation of writers. Builds the writer and its object writer * wrapper which binds the vector to the writer. + * <p> + * Compared to the reader factory, the writer factory is a bit more complex + * as it must handle both the projected ("real" writer) and unprojected + * ("dummy" writer) cases. Because of the way the various classes interact, + * it is cleaner to put the factory methods here rather than in the various + * writers, as is done in the case of the readers. */ @SuppressWarnings("unchecked") @@ -79,21 +73,38 @@ public class ColumnWriterFactory { default: switch (schema.mode()) { case OPTIONAL: - NullableVector nullableVector = (NullableVector) vector; - return NullableScalarWriter.build(schema, nullableVector, - newWriter(nullableVector.getValuesVector())); + return nullableScalarWriter(schema, (NullableVector) vector); case REQUIRED: - return new ScalarObjectWriter(schema, newWriter(vector)); + return requiredScalarWriter(schema, vector); case REPEATED: - RepeatedValueVector repeatedVector = (RepeatedValueVector) vector; - return ScalarArrayWriter.build(schema, repeatedVector, - newWriter(repeatedVector.getDataVector())); + return repeatedScalarWriter(schema, (RepeatedValueVector) vector); default: throw new UnsupportedOperationException(schema.mode().toString()); } } } + private static ScalarObjectWriter requiredScalarWriter( + ColumnMetadata schema, ValueVector vector) { + BaseScalarWriter baseWriter = newWriter(vector); + baseWriter.bindSchema(schema); + return new ScalarObjectWriter(baseWriter); + } + + private static ScalarObjectWriter nullableScalarWriter( + ColumnMetadata schema, NullableVector vector) { + BaseScalarWriter baseWriter = newWriter(vector.getValuesVector()); + baseWriter.bindSchema(schema); + return NullableScalarWriter.build(schema, vector, baseWriter); + } + + private static 
AbstractObjectWriter repeatedScalarWriter( + ColumnMetadata schema, RepeatedValueVector vector) { + BaseScalarWriter baseWriter = newWriter(vector.getDataVector()); + baseWriter.bindSchema(schema); + return ScalarArrayWriter.build(schema, vector, baseWriter); + } + /** * Build a writer for a non-projected column. * @param schema schema of the column @@ -104,20 +115,19 @@ public class ColumnWriterFactory { switch (schema.type()) { case GENERIC_OBJECT: case LATE: - case NULL: case LIST: case MAP: throw new UnsupportedOperationException(schema.type().toString()); default: - ScalarObjectWriter scalarWriter = new ScalarObjectWriter(schema, - new DummyScalarWriter()); + ScalarObjectWriter scalarWriter = new ScalarObjectWriter( + new DummyScalarWriter(schema)); switch (schema.mode()) { case OPTIONAL: case REQUIRED: return scalarWriter; case REPEATED: - return new ArrayObjectWriter(schema, - new DummyArrayWriter( + return new ArrayObjectWriter( + new DummyArrayWriter(schema, scalarWriter)); default: throw new UnsupportedOperationException(schema.mode().toString()); @@ -125,59 +135,6 @@ public class ColumnWriterFactory { } } - public static TupleObjectWriter buildMap(ColumnMetadata schema, MapVector vector, - List<AbstractObjectWriter> writers) { - MapWriter mapWriter; - if (schema.isProjected()) { - mapWriter = new SingleMapWriter(schema, vector, writers); - } else { - mapWriter = new DummyMapWriter(schema, writers); - } - return new TupleObjectWriter(schema, mapWriter); - } - - public static ArrayObjectWriter buildMapArray(ColumnMetadata schema, - UInt4Vector offsetVector, - List<AbstractObjectWriter> writers) { - MapWriter mapWriter; - if (schema.isProjected()) { - mapWriter = new ArrayMapWriter(schema, writers); - } else { - mapWriter = new DummyArrayMapWriter(schema, writers); - } - TupleObjectWriter mapArray = new TupleObjectWriter(schema, mapWriter); - AbstractArrayWriter arrayWriter; - if (schema.isProjected()) { - arrayWriter = new ObjectArrayWriter( - 
offsetVector, - mapArray); - } else { - arrayWriter = new DummyArrayWriter(mapArray); - } - return new ArrayObjectWriter(schema, arrayWriter); - } - - public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema, - AbstractMapVector vector, - List<AbstractObjectWriter> writers) { - assert (vector != null) == schema.isProjected(); - if (! schema.isArray()) { - return buildMap(schema, (MapVector) vector, writers); - } else if (vector == null) { - return buildMapArray(schema, - null, writers); - } else { - return buildMapArray(schema, - ((RepeatedMapVector) vector).getOffsetVector(), - writers); - } - } - - public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema, AbstractMapVector vector) { - assert schema.mapSchema().size() == 0; - return buildMapWriter(schema, vector, new ArrayList<AbstractObjectWriter>()); - } - public static BaseScalarWriter newWriter(ValueVector vector) { MajorType major = vector.getField().getType(); MinorType type = major.getMinorType(); diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java index af8daba58..915c9941d 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java @@ -17,11 +17,18 @@ */ package org.apache.drill.exec.vector.accessor.writer; +import java.util.ArrayList; import java.util.List; import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.ProjectionType; +import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; +import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter; +import 
org.apache.drill.exec.vector.complex.AbstractMapVector; import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.exec.vector.complex.RepeatedMapVector; /** * Writer for a Drill Map type. Maps are actually tuples, just like rows. @@ -46,6 +53,7 @@ public abstract class MapWriter extends AbstractTupleWriter { @Override public int vectorIndex() { return baseIndex.vectorIndex(); } @Override public void nextElement() { } @Override public void rollover() { } + @Override public ColumnWriterIndex outerIndex() { return baseIndex.outerIndex(); } @@ -80,6 +88,8 @@ public abstract class MapWriter extends AbstractTupleWriter { public void endWrite() { super.endWrite(); + // A non repeated map has a field that holds the value count. + // Update it. (A repeated map uses the offset vector's value count.) // Special form of set value count: used only for // this class to avoid setting the value count of children. // Setting these counts was already done. Doing it again @@ -87,13 +97,14 @@ public abstract class MapWriter extends AbstractTupleWriter { // set the "lastSet" field of nullable vector accessors, // and the initial value of -1 will cause all values to // be overwritten. - // - // Note that the map vector can be null if there is no actual - // map vector represented by this writer. - if (mapVector != null) { - mapVector.setMapValueCount(vectorIndex.vectorIndex()); - } + mapVector.setMapValueCount(vectorIndex.vectorIndex()); + } + + @Override + public void preRollover() { + super.preRollover(); + mapVector.setMapValueCount(vectorIndex.rowStartIndex()); } } @@ -122,12 +133,6 @@ public abstract class MapWriter extends AbstractTupleWriter { bindIndex(index, new MemberWriterIndex(index)); } - - // In endWrite(), do not call setValueCount on the map vector. - // Doing so will zero-fill the composite vectors because - // the internal map state does not track the writer state. 
- // Instead, the code in this structure has set the value - // count for each composite vector individually. } protected static class DummyMapWriter extends MapWriter { @@ -136,6 +141,9 @@ public abstract class MapWriter extends AbstractTupleWriter { List<AbstractObjectWriter> writers) { super(schema, writers); } + + @Override + public ProjectionType projectionType(String columnName) { return ProjectionType.UNPROJECTED; } } protected static class DummyArrayMapWriter extends MapWriter { @@ -144,6 +152,9 @@ public abstract class MapWriter extends AbstractTupleWriter { List<AbstractObjectWriter> writers) { super(schema, writers); } + + @Override + public ProjectionType projectionType(String columnName) { return ProjectionType.UNPROJECTED; } } protected final ColumnMetadata mapColumnSchema; @@ -152,4 +163,61 @@ public abstract class MapWriter extends AbstractTupleWriter { super(schema.mapSchema(), writers); mapColumnSchema = schema; } + + public static TupleObjectWriter buildMap(ColumnMetadata schema, MapVector vector, + List<AbstractObjectWriter> writers) { + MapWriter mapWriter; + if (schema.isProjected()) { + + // Vector is not required for a map writer; the map's columns + // are written, but not the (non-array) map. 
+ + mapWriter = new SingleMapWriter(schema, vector, writers); + } else { + assert vector == null; + mapWriter = new DummyMapWriter(schema, writers); + } + return new TupleObjectWriter(mapWriter); + } + + public static ArrayObjectWriter buildMapArray(ColumnMetadata schema, + UInt4Vector offsetVector, + List<AbstractObjectWriter> writers) { + MapWriter mapWriter; + if (schema.isProjected()) { + mapWriter = new ArrayMapWriter(schema, writers); + } else { + mapWriter = new DummyArrayMapWriter(schema, writers); + } + TupleObjectWriter mapArray = new TupleObjectWriter(mapWriter); + AbstractArrayWriter arrayWriter; + if (schema.isProjected()) { + arrayWriter = new ObjectArrayWriter(schema, + offsetVector, + mapArray); + } else { + arrayWriter = new DummyArrayWriter(schema, mapArray); + } + return new ArrayObjectWriter(arrayWriter); + } + + public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema, + AbstractMapVector vector, + List<AbstractObjectWriter> writers) { + if (schema.isArray()) { + return MapWriter.buildMapArray(schema, + vector == null ? 
null : + ((RepeatedMapVector) vector).getOffsetVector(), writers); + } else { + return MapWriter.buildMap(schema, (MapVector) vector, writers); + } + } + + public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema, AbstractMapVector vector) { + assert schema.mapSchema().size() == 0; + return buildMapWriter(schema, vector, new ArrayList<AbstractObjectWriter>()); + } + + @Override + public ColumnMetadata schema() { return mapColumnSchema; } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java index c5f798278..75ef57fc4 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java @@ -70,15 +70,16 @@ public class NullableScalarWriter extends AbstractScalarWriter { private final BaseScalarWriter baseWriter; private ColumnWriterIndex writerIndex; - public NullableScalarWriter(NullableVector nullableVector, BaseScalarWriter baseWriter) { + public NullableScalarWriter(ColumnMetadata schema, NullableVector nullableVector, BaseScalarWriter baseWriter) { + this.schema = schema; isSetWriter = new UInt1ColumnWriter(nullableVector.getBitsVector()); this.baseWriter = baseWriter; } public static ScalarObjectWriter build(ColumnMetadata schema, NullableVector nullableVector, BaseScalarWriter baseWriter) { - return new ScalarObjectWriter(schema, - new NullableScalarWriter(nullableVector, baseWriter)); + return new ScalarObjectWriter( + new NullableScalarWriter(schema, nullableVector, baseWriter)); } public BaseScalarWriter bitsWriter() { return isSetWriter; } @@ -98,7 +99,9 @@ public class NullableScalarWriter extends AbstractScalarWriter { } @Override - public ColumnWriterIndex writerIndex() { return baseWriter.writerIndex(); } + public int rowStartIndex() { + 
return baseWriter.rowStartIndex(); + } @Override public ValueType valueType() { @@ -112,6 +115,9 @@ public class NullableScalarWriter extends AbstractScalarWriter { } @Override + public boolean nullable() { return true; } + + @Override public void setNull() { isSetWriter.setInt(0); baseWriter.skipNulls(); @@ -212,7 +218,7 @@ public class NullableScalarWriter extends AbstractScalarWriter { public void endArrayValue() { // Skip calls for performance: they do nothing for // scalar writers -- the only kind supported here. -// isSetWriter.saveValue(); +// isSetWriter.endArrayValue(); baseWriter.endArrayValue(); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java index 3554a3be6..e2a1fc394 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java @@ -17,8 +17,10 @@ */ package org.apache.drill.exec.vector.accessor.writer; +import java.lang.reflect.Array; + +import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.UInt4Vector; -import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArrayWriter; /** @@ -39,7 +41,7 @@ import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArra * with a map, then we have a single offset vector pointing into a group of * arrays. Consider the simple case of a map of three scalars. Here, we have * a hybrid of the states discussed for the {@link BaseScalarWriter} and those - * discussed for {@link OffsetVectorWriter}. That is, the offset vector + * discussed for {@link OffsetVectorWriterImpl}. That is, the offset vector * points into one map element. 
The individual elements can we Behind, * Written or Unwritten, depending on the specific actions taken by the * client. @@ -100,19 +102,15 @@ import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArra * The key reason to understand this flow is to understand what happens * in vector overflow: unlike an array of scalars, in which the data * vector can never be in the Behind state, when we have an array of - * maps then each vector can be in an of the scalar writer state. + * maps then each vector can be in any of the scalar writer states. */ public class ObjectArrayWriter extends BaseArrayWriter { - protected ObjectArrayWriter(UInt4Vector offsetVector, AbstractObjectWriter elementWriter) { - super(offsetVector, elementWriter); - } - - @Override - public void bindIndex(ColumnWriterIndex index) { + protected ObjectArrayWriter(ColumnMetadata schema, + UInt4Vector offsetVector, AbstractObjectWriter elementWriter) { + super(schema, offsetVector, elementWriter); elementIndex = new ArrayElementWriterIndex(); - super.bindIndex(index); } @Override @@ -122,22 +120,20 @@ public class ObjectArrayWriter extends BaseArrayWriter { } @Override - public void set(Object... 
values) { - setObject(values); - } - - @Override public void setObject(Object array) { - Object values[] = (Object[]) array; - for (int i = 0; i < values.length; i++) { - elementObjWriter.set(values[i]); + + // Null array = 0-length array + + if (array == null) { + return; + } + int size = Array.getLength(array); + for (int i = 0; i < size; i++) { + Object value = Array.get(array, i); + if (value != null) { + elementObjWriter.setObject(value); + } save(); } } - - @Override - public int lastWriteIndex() { - // Undefined for arrays - return 0; - } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java index 49d16a3e5..d733b4f49 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java @@ -17,279 +17,16 @@ */ package org.apache.drill.exec.vector.accessor.writer; -import org.apache.drill.exec.vector.BaseDataValueVector; -import org.apache.drill.exec.vector.UInt4Vector; -import org.apache.drill.exec.vector.accessor.ValueType; +import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; /** - * Specialized column writer for the (hidden) offset vector used - * with variable-length or repeated vectors. See comments in the - * <tt>ColumnAccessors.java</tt> template file for more details. - * <p> - * Note that the <tt>lastWriteIndex</tt> tracked here corresponds - * to the data values; it is one less than the actual offset vector - * last write index due to the nature of offset vector layouts. The selection - * of last write index basis makes roll-over processing easier as only this - * writer need know about the +1 translation required for writing. 
- * <p> - * The states illustrated in the base class apply here as well, - * remembering that the end offset for a row (or array position) - * is written one ahead of the vector index. - * <p> - * The vector index does create an interesting dynamic for the child - * writers. From the child writer's perspective, the states described in - * the super class are the only states of interest. Here we want to - * take the perspective of the parent. - * <p> - * The offset vector is an implementation of a repeat level. A repeat - * level can occur for a single array, or for a collection of columns - * within a repeated map. (A repeat level also occurs for variable-width - * fields, but this is a bit harder to see, so let's ignore that for - * now.) - * <p> - * The key point to realize is that each repeat level introduces an - * isolation level in terms of indexing. That is, empty values in the - * outer level have no affect on indexing in the inner level. In fact, - * the nature of a repeated outer level means that there are no empties - * in the inner level. - * <p> - * To illustrate:<pre><code> - * Offset Vector Data Vector Indexes - * lw, v > | 10 | - - - - - > | X | 10 - * | 12 | - - + | X | < lw' 11 - * | | + - - > | | < v' 12 - * </code></pre> - * In the above, the client has just written an array of two elements - * at the current write position. The data starts at offset 10 in - * the data vector, and the next write will be at 12. The end offset - * is written one ahead of the vector index. - * <p> - * From the data vector's perspective, its last-write (lw') reflects - * the last element written. If this is an array of scalars, then the - * write index is automatically incremented, as illustrated by v'. - * (For map arrays, the index must be incremented by calling - * <tt>save()</tt> on the map array writer.) 
- * <p> - * Suppose the client now skips some arrays:<pre><code> - * Offset Vector Data Vector - * lw > | 10 | - - - - - > | X | 10 - * | 12 | - - + | X | < lw' 11 - * | | + - - > | | < v' 12 - * | | | | 13 - * v > | | | | 14 - * </code></pre> - * The last write position does not move and there are gaps in the - * offset vector. The vector index points to the current row. Note - * that the data vector last write and vector indexes do not change, - * this reflects the fact that the the data vector's vector index - * (v') matches the tail offset - * <p> - * The - * client now writes a three-element vector:<pre><code> - * Offset Vector Data Vector - * | 10 | - - - - - > | X | 10 - * | 12 | - - + | X | 11 - * | 12 | - - + - - > | Y | 12 - * | 12 | - - + | Y | 13 - * lw, v > | 12 | - - + | Y | < lw' 14 - * | 15 | - - - - - > | | < v' 15 - * </code></pre> - * Quite a bit just happened. The empty offset slots were back-filled - * with the last write offset in the data vector. The client wrote - * three values, which advanced the last write and vector indexes - * in the data vector. And, the last write index in the offset - * vector also moved to reflect the update of the offset vector. - * Note that as a result, multiple positions in the offset vector - * point to the same location in the data vector. This is fine; we - * compute the number of entries as the difference between two successive - * offset vector positions, so the empty positions have become 0-length - * arrays. - * <p> - * Note that, for an array of scalars, when overflow occurs, - * we need only worry about two - * states in the data vector. Either data has been written for the - * row (as in the third example above), and so must be moved to the - * roll-over vector, or no data has been written and no move is - * needed. We never have to worry about missing values because the - * cannot occur in the data vector. 
- * <p> - * See {@link ObjectArrayWriter} for information about arrays of - * maps (arrays of multiple columns.) + * Interface for specialized operations on an offset vector. */ -public class OffsetVectorWriter extends AbstractFixedWidthWriter { - - private static final int VALUE_WIDTH = UInt4Vector.VALUE_WIDTH; - - private UInt4Vector vector; - - /** - * Offset of the first value for the current row. Used during - * overflow or if the row is restarted. - */ - - private int rowStartOffset; - - /** - * Cached value of the end offset for the current value. Used - * primarily for variable-width columns to allow the column to be - * rewritten multiple times within the same row. The start offset - * value is updated with the end offset only when the value is - * committed in {@link @endValue()}. - */ - - private int nextOffset; - - public OffsetVectorWriter(UInt4Vector vector) { - this.vector = vector; - } - - @Override public BaseDataValueVector vector() { return vector; } - @Override public int width() { return VALUE_WIDTH; } - - @Override - protected void realloc(int size) { - vector.reallocRaw(size); - setBuffer(); - } - - @Override - public ValueType valueType() { return ValueType.INTEGER; } - - @Override - public void startWrite() { - super.startWrite(); - nextOffset = 0; - rowStartOffset = 0; - - // Special handling for first value. Alloc vector if needed. - // Offset vectors require a 0 at position 0. The (end) offset - // for row 0 starts at position 1, which is handled in - // writeOffset() below. - - if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) { - realloc(MIN_BUFFER_SIZE); - } - vector.getBuffer().setInt(0, 0); - } - - public int nextOffset() { return nextOffset; } - public int rowStartOffset() { return rowStartOffset; } - - @Override - public void startRow() { rowStartOffset = nextOffset; } - - /** - * Return the write offset, which is one greater than the index reported - * by the vector index. 
- * - * @return the offset in which to write the current offset of the end - * of the current data value - */ - - protected final int writeIndex() { - - // "Fast path" for the normal case of no fills, no overflow. - // This is the only bounds check we want to do for the entire - // set operation. - - // This is performance critical code; every operation counts. - // Please be thoughtful when changing the code. - - final int valueIndex = vectorIndex.vectorIndex(); - int writeIndex = valueIndex + 1; - if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) { - writeIndex = prepareWrite(writeIndex); - } - - // Track the last write location for zero-fill use next time around. - // Recall, it is the value index, which is one less than the (end) - // offset index. - - lastWriteIndex = valueIndex; - return writeIndex; - } - - protected int prepareWrite(int writeIndex) { - - // Either empties must be filled or the vector is full. - - resize(writeIndex); - - // Call to resize may cause rollover, so reset write index - // afterwards. - - final int valueIndex = vectorIndex.vectorIndex(); - - // Fill empties to the write position. - // Fill empties works off the row index, not the write - // index. The write index is one past the row index. - // (Yes, this is complex...) - - fillEmpties(valueIndex); - return valueIndex + 1; - } - - @Override - protected final void fillEmpties(final int valueIndex) { - while (lastWriteIndex < valueIndex - 1) { - drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset); - } - } - - public final void setNextOffset(final int newOffset) { - final int writeIndex = writeIndex(); - drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset); - nextOffset = newOffset; - } - - @Override - public void skipNulls() { - - // Nothing to do. Fill empties logic will fill in missing - // offsets. 
- } - - @Override - public void restartRow() { - nextOffset = rowStartOffset; - super.restartRow(); - } - - @Override - public void preRollover() { - setValueCount(vectorIndex.rowStartIndex() + 1); - } - - @Override - public void postRollover() { - final int newNext = nextOffset - rowStartOffset; - super.postRollover(); - nextOffset = newNext; - } - - @Override - public void setValueCount(int valueCount) { - - // Slightly different version of the fixed-width writer - // code. Offset counts are one greater than the value count. - // But for all other purposes, we track the value (row) - // position, not the offset position. - - mandatoryResize(valueCount); - fillEmpties(valueCount); - vector().getBuffer().writerIndex((valueCount + 1) * width()); - lastWriteIndex = Math.max(lastWriteIndex, valueCount - 1); - } - - @Override - public void dump(HierarchicalFormatter format) { - format.extend(); - super.dump(format); - format - .attribute("lastWriteIndex", lastWriteIndex) - .attribute("nextOffset", nextOffset) - .endObject(); - } +public interface OffsetVectorWriter extends ScalarWriter, WriterEvents { + int rowStartOffset(); + int nextOffset(); + void setNextOffset(int vectorIndex); + void dump(HierarchicalFormatter format); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java new file mode 100644 index 000000000..edcb9f584 --- /dev/null +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.accessor.writer; + +import org.apache.drill.exec.vector.BaseDataValueVector; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.accessor.ValueType; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; + +/** + * Specialized column writer for the (hidden) offset vector used + * with variable-length or repeated vectors. See comments in the + * <tt>ColumnAccessors.java</tt> template file for more details. + * <p> + * Note that the <tt>lastWriteIndex</tt> tracked here corresponds + * to the data values; it is one less than the actual offset vector + * last write index due to the nature of offset vector layouts. The selection + * of last write index basis makes roll-over processing easier as only this + * writer need know about the +1 translation required for writing. + * <p> + * The states illustrated in the base class apply here as well, + * remembering that the end offset for a row (or array position) + * is written one ahead of the vector index. + * <p> + * The vector index does create an interesting dynamic for the child + * writers. From the child writer's perspective, the states described in + * the super class are the only states of interest. Here we want to + * take the perspective of the parent. + * <p> + * The offset vector is an implementation of a repeat level. 
A repeat + * level can occur for a single array, or for a collection of columns + * within a repeated map. (A repeat level also occurs for variable-width + * fields, but this is a bit harder to see, so let's ignore that for + * now.) + * <p> + * The key point to realize is that each repeat level introduces an + * isolation level in terms of indexing. That is, empty values in the + * outer level have no affect on indexing in the inner level. In fact, + * the nature of a repeated outer level means that there are no empties + * in the inner level. + * <p> + * To illustrate:<pre><code> + * Offset Vector Data Vector Indexes + * lw, v > | 10 | - - - - - > | X | 10 + * | 12 | - - + | X | < lw' 11 + * | | + - - > | | < v' 12 + * </code></pre> + * In the above, the client has just written an array of two elements + * at the current write position. The data starts at offset 10 in + * the data vector, and the next write will be at 12. The end offset + * is written one ahead of the vector index. + * <p> + * From the data vector's perspective, its last-write (lw') reflects + * the last element written. If this is an array of scalars, then the + * write index is automatically incremented, as illustrated by v'. + * (For map arrays, the index must be incremented by calling + * <tt>save()</tt> on the map array writer.) + * <p> + * Suppose the client now skips some arrays:<pre><code> + * Offset Vector Data Vector + * lw > | 10 | - - - - - > | X | 10 + * | 12 | - - + | X | < lw' 11 + * | | + - - > | | < v' 12 + * | | | | 13 + * v > | | | | 14 + * </code></pre> + * The last write position does not move and there are gaps in the + * offset vector. The vector index points to the current row. 
Note + * that the data vector last write and vector indexes do not change, + * this reflects the fact that the the data vector's vector index + * (v') matches the tail offset + * <p> + * The + * client now writes a three-element vector:<pre><code> + * Offset Vector Data Vector + * | 10 | - - - - - > | X | 10 + * | 12 | - - + | X | 11 + * | 12 | - - + - - > | Y | 12 + * | 12 | - - + | Y | 13 + * lw, v > | 12 | - - + | Y | < lw' 14 + * | 15 | - - - - - > | | < v' 15 + * </code></pre> + * Quite a bit just happened. The empty offset slots were back-filled + * with the last write offset in the data vector. The client wrote + * three values, which advanced the last write and vector indexes + * in the data vector. And, the last write index in the offset + * vector also moved to reflect the update of the offset vector. + * Note that as a result, multiple positions in the offset vector + * point to the same location in the data vector. This is fine; we + * compute the number of entries as the difference between two successive + * offset vector positions, so the empty positions have become 0-length + * arrays. + * <p> + * Note that, for an array of scalars, when overflow occurs, + * we need only worry about two + * states in the data vector. Either data has been written for the + * row (as in the third example above), and so must be moved to the + * roll-over vector, or no data has been written and no move is + * needed. We never have to worry about missing values because the + * cannot occur in the data vector. + * <p> + * See {@link ObjectArrayWriter} for information about arrays of + * maps (arrays of multiple columns.) + */ + +public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements OffsetVectorWriter { + + private static final int VALUE_WIDTH = UInt4Vector.VALUE_WIDTH; + + private final UInt4Vector vector; + + /** + * Offset of the first value for the current row. Used during + * overflow or if the row is restarted. 
+ */ + + private int rowStartOffset; + + /** + * Cached value of the end offset for the current value. Used + * primarily for variable-width columns to allow the column to be + * rewritten multiple times within the same row. The start offset + * value is updated with the end offset only when the value is + * committed in {@link @endValue()}. + */ + + private int nextOffset; + + public OffsetVectorWriterImpl(UInt4Vector vector) { + this.vector = vector; + } + + @Override public BaseDataValueVector vector() { return vector; } + @Override public int width() { return VALUE_WIDTH; } + + @Override + protected void realloc(int size) { + vector.reallocRaw(size); + setBuffer(); + } + + @Override + public ValueType valueType() { return ValueType.INTEGER; } + + @Override + public void startWrite() { + super.startWrite(); + nextOffset = 0; + rowStartOffset = 0; + + // Special handling for first value. Alloc vector if needed. + // Offset vectors require a 0 at position 0. The (end) offset + // for row 0 starts at position 1, which is handled in + // writeOffset() below. + + if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) { + realloc(MIN_BUFFER_SIZE); + } + drillBuf.setInt(0, 0); + } + + @Override + public int nextOffset() { return nextOffset; } + + @Override + public int rowStartOffset() { return rowStartOffset; } + + @Override + public void startRow() { rowStartOffset = nextOffset; } + + /** + * Return the write offset, which is one greater than the index reported + * by the vector index. + * + * @return the offset in which to write the current offset of the end + * of the current data value + */ + + protected final int prepareWrite() { + + // "Fast path" for the normal case of no fills, no overflow. + // This is the only bounds check we want to do for the entire + // set operation. + + // This is performance critical code; every operation counts. + // Please be thoughtful when changing the code. 
+ + final int valueIndex = vectorIndex.vectorIndex(); + int writeIndex = valueIndex + 1; + if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) { + writeIndex = prepareWrite(writeIndex); + } + + // Track the last write location for zero-fill use next time around. + + lastWriteIndex = valueIndex; + return writeIndex; + } + + protected int prepareWrite(int writeIndex) { + + // Either empties must be filed or the vector is full. + + resize(writeIndex); + + // Call to resize may cause rollover, so reset write index + // afterwards. + + final int valueIndex = vectorIndex.vectorIndex(); + + // Fill empties to the write position. + // Fill empties works of the row index, not the write + // index. (Yes, this is complex...) + + fillEmpties(valueIndex); + return valueIndex + 1; + } + + @Override + protected final void fillEmpties(final int valueIndex) { + while (lastWriteIndex < valueIndex - 1) { + drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset); + } + } + + @Override + public final void setNextOffset(final int newOffset) { + final int writeIndex = prepareWrite(); + drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset); + nextOffset = newOffset; + } + + @Override + public void skipNulls() { + + // Nothing to do. Fill empties logic will fill in missing + // offsets. + } + + @Override + public void restartRow() { + nextOffset = rowStartOffset; + super.restartRow(); + } + + @Override + public void preRollover() { + + // Rollover is occurring. This means the current row is not complete. + // We want to keep 0..(row index - 1) which gives us (row index) + // rows. But, this being an offset vector, we add one to account + // for the extra 0 value at the start. 
+ + setValueCount(vectorIndex.rowStartIndex() + 1); + } + + @Override + public void postRollover() { + final int newNext = nextOffset - rowStartOffset; + super.postRollover(); + nextOffset = newNext; + } + + @Override + public void setValueCount(int valueCount) { + mandatoryResize(valueCount); + + // Value count are in offset vector positions. Fill empties + // works in row positions. + + fillEmpties(valueCount); + vector().getBuffer().writerIndex((valueCount + 1) * VALUE_WIDTH); + } + + @Override + public void dump(HierarchicalFormatter format) { + format.extend(); + super.dump(format); + format + .attribute("lastWriteIndex", lastWriteIndex) + .attribute("nextOffset", nextOffset) + .endObject(); + } +} diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java index 8382ad17a..2ac7d45b5 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java @@ -17,11 +17,11 @@ */ package org.apache.drill.exec.vector.accessor.writer; +import java.lang.reflect.Array; import java.math.BigDecimal; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; -import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener; import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArrayWriter; import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter.ScalarObjectWriter; import org.apache.drill.exec.vector.complex.RepeatedValueVector; @@ -64,14 +64,14 @@ public class ScalarArrayWriter extends BaseArrayWriter { public ScalarArrayWriter(ColumnMetadata schema, RepeatedValueVector vector, BaseScalarWriter elementWriter) { - super(vector.getOffsetVector(), - new ScalarObjectWriter(schema, 
elementWriter)); + super(schema, vector.getOffsetVector(), + new ScalarObjectWriter(elementWriter)); this.elementWriter = elementWriter; } public static ArrayObjectWriter build(ColumnMetadata schema, RepeatedValueVector repeatedVector, BaseScalarWriter elementWriter) { - return new ArrayObjectWriter(schema, + return new ArrayObjectWriter( new ScalarArrayWriter(schema, repeatedVector, elementWriter)); } @@ -83,25 +83,28 @@ public class ScalarArrayWriter extends BaseArrayWriter { } @Override - public void bindListener(ColumnWriterListener listener) { - elementWriter.bindListener(listener); - } - - @Override public void save() { // No-op: done when writing each scalar value + // May be called, however, by code that also supports + // lists as a list does require an explicit save. } - @Override - public void set(Object... values) { - for (Object value : values) { - entry().set(value); - } - } + /** + * Set a repeated vector based on a Java array of the proper + * type. This function involves parsing the array type and so is + * suitable only for test code. The array can be either a primitive + * (<tt>int [], say</tt>) or a typed array of boxed values + * (<tt>Integer[], say</tt>). + */ @Override public void setObject(Object array) { - if (array == null) { + + // Accept an empty array (of any type) to mean + // an empty array of the type of this writer. + + if (array == null || Array.getLength(array) == 0) { + // Assume null means a 0-element array since Drill does // not support null for the whole array. @@ -109,7 +112,9 @@ public class ScalarArrayWriter extends BaseArrayWriter { } String objClass = array.getClass().getName(); if (! objClass.startsWith("[")) { - throw new IllegalArgumentException("Argument must be an array"); + throw new IllegalArgumentException( + String.format("Argument must be an array. 
Column `%s`, value = %s", + schema.name(), array.toString())); } // Figure out type @@ -125,39 +130,50 @@ public class ScalarArrayWriter extends BaseArrayWriter { setBytesArray((byte[][]) array); break; default: - throw new IllegalArgumentException( "Unknown Java array type: " + objClass ); + throw new IllegalArgumentException( + String.format("Unknown Java array type: %s, for column `%s`", + objClass, schema.name())); } break; + case 'B': + setByteArray((byte[]) array); + break; case 'S': - setShortArray((short[]) array ); + setShortArray((short[]) array); break; case 'I': - setIntArray((int[]) array ); + setIntArray((int[]) array); break; case 'J': - setLongArray((long[]) array ); + setLongArray((long[]) array); break; case 'F': - setFloatArray((float[]) array ); + setFloatArray((float[]) array); break; case 'D': - setDoubleArray((double[]) array ); + setDoubleArray((double[]) array); break; case 'Z': - setBooleanArray((boolean[]) array ); + setBooleanArray((boolean[]) array); break; case 'L': int posn = objClass.indexOf(';'); // If the array is of type Object, then we have no type info. 
- String memberClassName = objClass.substring( 2, posn ); + String memberClassName = objClass.substring(2, posn); if (memberClassName.equals(String.class.getName())) { - setStringArray((String[]) array ); + setStringArray((String[]) array); } else if (memberClassName.equals(Period.class.getName())) { - setPeriodArray((Period[]) array ); + setPeriodArray((Period[]) array); } else if (memberClassName.equals(BigDecimal.class.getName())) { - setBigDecimalArray((BigDecimal[]) array ); + setBigDecimalArray((BigDecimal[]) array); + } else if (memberClassName.equals(Integer.class.getName())) { + setIntObjectArray((Integer[]) array); + } else if (memberClassName.equals(Long.class.getName())) { + setLongObjectArray((Long[]) array); + } else if (memberClassName.equals(Double.class.getName())) { + setDoubleObjectArray((Double[]) array); } else { throw new IllegalArgumentException( "Unknown Java array type: " + memberClassName ); } @@ -179,6 +195,12 @@ public class ScalarArrayWriter extends BaseArrayWriter { } } + public void setByteArray(byte[] value) { + for (int i = 0; i < value.length; i++) { + elementWriter.setInt(value[i]); + } + } + public void setShortArray(short[] value) { for (int i = 0; i < value.length; i++) { elementWriter.setInt(value[i]); @@ -191,12 +213,34 @@ public class ScalarArrayWriter extends BaseArrayWriter { } } + public void setIntObjectArray(Integer[] value) { + for (int i = 0; i < value.length; i++) { + Integer element = value[i]; + if (element == null) { + elementWriter.setNull(); + } else { + elementWriter.setInt(element); + } + } + } + public void setLongArray(long[] value) { for (int i = 0; i < value.length; i++) { elementWriter.setLong(value[i]); } } + public void setLongObjectArray(Long[] value) { + for (int i = 0; i < value.length; i++) { + Long element = value[i]; + if (element == null) { + elementWriter.setNull(); + } else { + elementWriter.setLong(element); + } + } + } + public void setFloatArray(float[] value) { for (int i = 0; i < 
value.length; i++) { elementWriter.setDouble(value[i]); @@ -209,6 +253,17 @@ public class ScalarArrayWriter extends BaseArrayWriter { } } + public void setDoubleObjectArray(Double[] value) { + for (int i = 0; i < value.length; i++) { + Double element = value[i]; + if (element == null) { + elementWriter.setNull(); + } else { + elementWriter.setDouble(element); + } + } + } + public void setStringArray(String[] value) { for (int i = 0; i < value.length; i++) { elementWriter.setString(value[i]); diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java index 7566f2861..c8f9f4896 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java @@ -40,6 +40,38 @@ import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; public interface WriterEvents { /** + * Tracks the write state of a tuple or variant to allow applying the correct + * operations to newly-added columns to synchronize them with the rest + * of the writers. + */ + + enum State { + + /** + * No write is in progress. Nothing need be done to newly-added + * writers. + */ + IDLE, + + /** + * <tt>startWrite()</tt> has been called to start a write operation + * (start a batch), but <tt>startValue()</tt> has not yet been called + * to start a row (or value within an array). <tt>startWrite()</tt> must + * be called on newly added columns. + */ + + IN_WRITE, + + /** + * Both <tt>startWrite()</tt> and <tt>startValue()</tt> has been called on + * the tuple to prepare for writing values, and both must be called on + * newly-added vectors. + */ + + IN_ROW + } + + /** * Bind the writer to a writer index. 
* * @param index the writer index (top level or nested for @@ -48,8 +80,6 @@ public interface WriterEvents { void bindIndex(ColumnWriterIndex index); - ColumnWriterIndex writerIndex(); - /** * Start a write (batch) operation. Performs any vector initialization * required at the start of a batch (especially for offset vectors.) @@ -113,15 +143,4 @@ public interface WriterEvents { */ void postRollover(); - - /** - * Return the last write position in the vector. This may be the - * same as the writer index position (if the vector was written at - * that point), or an earlier point. In either case, this value - * points to the last valid value in the vector. - * - * @return index of the last valid value in the vector - */ - - int lastWriteIndex(); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java index 7c9f8ba80..3d64bb6b7 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java @@ -17,9 +17,8 @@ */ package org.apache.drill.exec.vector.accessor.writer.dummy; +import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; -import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener; -import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener; import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter; import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter; @@ -35,30 +34,37 @@ import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter; */ public class DummyArrayWriter extends AbstractArrayWriter { + public static class DummyOffsetVectorWriter 
extends DummyScalarWriter implements OffsetVectorWriter { + + public DummyOffsetVectorWriter() { + super(null); + } + + @Override + public int rowStartOffset() { return 0; } + + @Override + public int nextOffset() { return 0; } + + @Override + public void setNextOffset(int vectorIndex) { } + } + + public static final DummyOffsetVectorWriter offsetVectorWriter = new DummyOffsetVectorWriter(); + public DummyArrayWriter( + ColumnMetadata schema, AbstractObjectWriter elementWriter) { - super(elementWriter); + super(schema, elementWriter, offsetVectorWriter); } @Override - public int size() { return 0; } - - @Override public void save() { } @Override - public void set(Object... values) { } - - @Override public void setObject(Object array) { } @Override - public void bindIndex(ColumnWriterIndex index) { } - - @Override - public ColumnWriterIndex writerIndex() { return null; } - - @Override public void startWrite() { } @Override @@ -86,11 +92,5 @@ public class DummyArrayWriter extends AbstractArrayWriter { public int lastWriteIndex() { return 0; } @Override - public void bindListener(ColumnWriterListener listener) { } - - @Override - public void bindListener(TupleWriterListener listener) { } - - @Override - public OffsetVectorWriter offsetWriter() { return null; } + public void bindIndex(ColumnWriterIndex index) { } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java index e8272d62c..9cfe56e22 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.vector.accessor.writer.dummy; import java.math.BigDecimal; +import org.apache.drill.exec.record.metadata.ColumnMetadata; import 
org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ValueType; @@ -33,11 +34,18 @@ import org.joda.time.Period; public class DummyScalarWriter extends AbstractScalarWriter { + public DummyScalarWriter(ColumnMetadata schema) { + this.schema = schema; + } + @Override public void bindListener(ColumnWriterListener listener) { } @Override - public ValueType valueType() { return null; } + public ValueType valueType() { return ValueType.NULL; } + + @Override + public boolean nullable() { return true; } @Override public void setNull() { } @@ -67,9 +75,6 @@ public class DummyScalarWriter extends AbstractScalarWriter { public void bindIndex(ColumnWriterIndex index) { } @Override - public ColumnWriterIndex writerIndex() { return null; } - - @Override public void restartRow() { } @Override @@ -86,4 +91,7 @@ public class DummyScalarWriter extends AbstractScalarWriter { @Override public BaseDataValueVector vector() { return null; } + + @Override + public int rowStartIndex() { return 0; } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java index 5e827faa6..9efc59cb5 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java @@ -104,11 +104,18 @@ * </ul> * <p> * This data model is similar to, but has important differences from, the prior, - * generated, readers and writers. + * generated, readers and writers. This version is based on the concept of + * minimizing the number of writer classes, and leveraging Java primitives to + * keep the number of get/set methods to a reasonable size. This version also + * automates vector allocation, vector overflow and so on.
* <p> * The object layer is new: it is the simplest way to model the three "object * types." An app using this code would use just the leaf scalar readers and * writers. + * <p> + * Similarly, the {@link ColumnWriter} interface provides a uniform way to + * access services common to all writer types, while allowing each JSON-like + * writer to provide type-specific ways to access data. * * <h4>Writer Performance</h4> * @@ -145,6 +152,94 @@ * vector and works the same for repeated scalars, repeated maps and * (eventually) lists and repeated lists.</li> * </ul> + * + * <h4>Possible Improvements</h4> + * + * The code here works and has extensive unit tests. But, many improvements + * are possible: + * <ul> + * <li>Drill has four "container" vector types (Map, Union, List, Repeated + * List), each with its own quirky semantics. The code could be far simpler + * if the container semantics were unified.</li> + * <li>Similarly, the corresponding writers implement some very awkward + * logic to handle union and list containers. But, these vectors are not + * fully supported in Drill. This means that the code implements (and tests) + * many odd cases which no one may ever use. Better to limit the data types + * that Drill supports, implement those well, and deprecate the obscure + * cases.</li> + * <li>The same schema-parsing logic appears over and over in different + * guises. Some parse vectors, some parse batch schemas, others parse the + * column metadata (which provides information not available in the + * materialized field) and so on. Would be great to find some common way + * to do this work, perhaps in the form of a visitor. An earlier version of + * this code tried using visitors. But, since each representation has its + * own quirks, that approach didn't work out. A first step would be to come + * up with a standard schema description which can be used in all cases, + * then build a visitor on that.</li> + * <li>Many tests exist. 
But, the careful reader will note that Drill's + * vectors define a potentially very complex recursive structure (most + * variations of which are never used.) Additional testing should cover + * all cases, such as repeated lists that contain unions, or unions that + * contain repeated lists of tuples.</li> + * <li>Experience with the code may show additional redundancies that can + * be removed. Each fresh set of eyes may see things that prior folks + * missed.</li> + * </ul> + * + * <h4>Caveats</h4> + * + * The column accessors are divided into two packages: <tt>vector</tt> and + * <tt>java-exec</tt>. It is easy to add functionality in the wrong place, + * breaking abstraction and encapsulation. Here are some general guidelines: + * <ul> + * <li>The core reader and writer logic is implemented in this <tt>vector</tt> + * package. This package provides low-level tools to build accessors, but + * not the construction logic itself. (That is found in the <tt>java-exec</tt> + * layer.)</li> + * <li>The vector layer is designed to support both the simple "row set" and + * the more complex "result set loader" implementations.</li> + * <li>The "row set" layer wraps the accessors in tools that work on one batch + * (row set) at a time, without regard for schema versions, schema changes + * and the like. The row set layer is primarily for testing: building an input + * batch for some operator, and verifying an output batch. It also serves as a + * simple test framework for the accessors, without the complexity of other + * layers.</li> + * <li>The "result set loader" layer handles the complexity of dynamic schema + * evolution, schema versioning, vector overflow and projection. It provides + * an "industrial strength" version of the accessor mechanism intended for + * use in the scan operator (but which can be generalized for other operators.) + * </li> + * <li>The "listener" pattern is used to allow higher levels to "plug in" + * functionality to the accessor layer. 
This is especially true for schema + * evolution: listeners notify the higher layer to add a column, delegating + * the actual work to that higher layer.</li> + * </ul> + * + * Given all this, plan carefully where to make any improvement. If your change + * violates the dependencies below, look for another way to make the + * change. + * <code><pre> + * +------------+ + * +-------------------------- | Result Set | + * v | Loader | + * +----------------+ +---------+ +------------+ + * | Metadata | <-- | Row Set | | + * | Implementation | | Tools | | + * +----------------+ +---------+ | + * java-exec | | | + * ------------------- | ------------------- | ------ | ------------ + * vector v v v + * +------------+ +-----------+ + * | Metadata | <--------- | Column | + * | Interfaces | | Accessors | + * +------------+ +-----------+ + * | + * v + * +---------+ + * | Value | + * | Vectors | + * +---------+ + * </pre></code> */ package org.apache.drill.exec.vector.accessor.writer; |