aboutsummaryrefslogtreecommitdiff
path: root/exec/vector
diff options
context:
space:
mode:
authorPaul Rogers <progers@cloudera.com>2018-04-16 21:44:10 -0700
committerVitalii Diravka <vitalii.diravka@gmail.com>2018-04-29 23:20:55 +0300
commitdbff1646601db234a6606d400d5630db4deee192 (patch)
tree29ac669179399cecbaea688d2ec83790572b9f06 /exec/vector
parent883c8d94b0021a83059fa79563dd516c4299b70a (diff)
DRILL-6335: Column accessor refactoring
closes #1218
Diffstat (limited to 'exec/vector')
-rw-r--r--exec/vector/src/main/codegen/templates/ColumnAccessors.java65
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java2
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java73
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java128
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java53
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java9
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java52
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/WriterPosition.java58
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java196
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java18
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java45
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java59
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java115
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java40
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java6
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java111
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java92
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java16
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java44
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java277
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java299
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java111
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java45
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java44
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java16
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java97
26 files changed, 1227 insertions, 844 deletions
diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
index 4836099b1..d0a2ace6f 100644
--- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java
+++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
@@ -268,69 +268,58 @@ public class ColumnAccessors {
</#if>
<@getType drillType label />
- <#if accessorType == "byte[]">
- <#assign args = ", int len">
- <#else>
- <#assign args = "">
- </#if>
- <#if javaType == "char">
- <#assign putType = "short" />
- <#assign doCast = true />
- <#else>
- <#assign putType = javaType />
- <#assign doCast = (cast == "set") />
- </#if>
<#if ! varWidth>
</#if>
@Override
- public final void set${label}(final ${accessorType} value${args}) {
+ public final void set${label}(final ${accessorType} value${putArgs}) {
<#-- Must compute the write offset first; can't be inline because the
writeOffset() function has a side effect of possibly changing the buffer
address (bufAddr). -->
- <#if varWidth>
- final int offset = writeIndex(len);
- <#else>
- final int writeIndex = writeIndex();
- <#assign putAddr = "writeIndex * VALUE_WIDTH">
+ <#if ! varWidth>
+ final int writeOffset = prepareWrite();
+ <#assign putOffset = "writeOffset * VALUE_WIDTH">
</#if>
<#if varWidth>
+ final int offset = prepareWrite(len);
drillBuf.setBytes(offset, value, 0, len);
offsetsWriter.setNextOffset(offset + len);
<#elseif drillType == "Decimal9">
- drillBuf.setInt(${putAddr},
+ drillBuf.setInt(${putOffset},
DecimalUtility.getDecimal9FromBigDecimal(value,
- type.getScale(), type.getPrecision()));
+ type.getScale(), type.getPrecision()));
<#elseif drillType == "Decimal18">
- drillBuf.setLong(${putAddr},
+ drillBuf.setLong(${putOffset},
DecimalUtility.getDecimal18FromBigDecimal(value,
- type.getScale(), type.getPrecision()));
+ type.getScale(), type.getPrecision()));
<#elseif drillType == "Decimal38Sparse">
<#-- Hard to optimize this case. Just use the available tools. -->
- DecimalUtility.getSparseFromBigDecimal(value, vector.getBuffer(), writeIndex * VALUE_WIDTH,
- type.getScale(), type.getPrecision(), 6);
+ DecimalUtility.getSparseFromBigDecimal(value, drillBuf,
+ ${putOffset},
+ type.getScale(), type.getPrecision(), 6);
<#elseif drillType == "Decimal28Sparse">
<#-- Hard to optimize this case. Just use the available tools. -->
- DecimalUtility.getSparseFromBigDecimal(value, vector.getBuffer(), writeIndex * VALUE_WIDTH,
- type.getScale(), type.getPrecision(), 5);
+ DecimalUtility.getSparseFromBigDecimal(value, drillBuf,
+ ${putOffset},
+ type.getScale(), type.getPrecision(), 5);
<#elseif drillType == "IntervalYear">
- drillBuf.setInt(${putAddr},
- value.getYears() * 12 + value.getMonths());
+ drillBuf.setInt(${putOffset},
+ value.getYears() * 12 + value.getMonths());
<#elseif drillType == "IntervalDay">
- final int offset = ${putAddr};
- drillBuf.setInt(offset, value.getDays());
- drillBuf.setInt(offset + 4, DateUtilities.periodToMillis(value));
+ final int offset = ${putOffset};
+ drillBuf.setInt(offset, value.getDays());
+ drillBuf.setInt(offset + ${minor.millisecondsOffset}, DateUtilities.periodToMillis(value));
<#elseif drillType == "Interval">
- final int offset = ${putAddr};
- drillBuf.setInt(offset, value.getYears() * 12 + value.getMonths());
- drillBuf.setInt(offset + 4, value.getDays());
- drillBuf.setInt(offset + 8, DateUtilities.periodToMillis(value));
+ final int offset = ${putOffset};
+ drillBuf.setInt(offset, DateUtilities.periodToMonths(value));
+ drillBuf.setInt(offset + ${minor.daysOffset}, value.getDays());
+ drillBuf.setInt(offset + ${minor.millisecondsOffset}, DateUtilities.periodToMillis(value));
<#elseif drillType == "Float4">
- drillBuf.setInt(${putAddr}, Float.floatToRawIntBits((float) value));
+ drillBuf.setInt(${putOffset}, Float.floatToRawIntBits((float) value));
<#elseif drillType == "Float8">
- drillBuf.setLong(${putAddr}, Double.doubleToRawLongBits(value));
+ drillBuf.setLong(${putOffset}, Double.doubleToRawLongBits(value));
<#else>
- drillBuf.set${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value);
+ drillBuf.set${putType?cap_first}(${putOffset}, <#if doCast>(${putType}) </#if>value);
</#if>
vectorIndex.nextElement();
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java
index 49726056c..12f0f110e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java
@@ -24,4 +24,4 @@ public enum ProjectionType {
TUPLE, // x.y
ARRAY, // x[0]
TUPLE_ARRAY // x[0].y
-} \ No newline at end of file
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java
index 49a1e7770..c15791ab9 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java
@@ -22,10 +22,42 @@ package org.apache.drill.exec.vector.accessor;
* each call to a <tt>setFoo()</tt> method writes a value and advances the array
* index.
* <p>
- * {@see ArrayReader}
+ * The array writer represents a Drill repeated type, including repeated maps.
+ * The array writer also represents the Drill list and repeated list types as
+ * follows:
+ * <ul>
+ * <li>A repeated scalar type is presented as an array writer with scalar
+ * entries. As a convenience, writing to the scalar automatically advances
+ * the current array write position, since exactly one item can be written
+ * per array entry.</li>
+ * <li>A repeated map type is presented as an array writer with tuple
+ * entries. The client must advance the array write position explicitly since
+ * a tuple can have any number of entries and the array writer cannot determine
+ * when a value is complete.</li>
+ * <li>A list type is presented as an array of variant entries. The client
+ * must explicitly advance the array position.</li>
+ * <li>A repeated list type is presented as an array of arrays of variants.
+ * The client advances the array position of both lists.</li>
+ * <li>Lists of repeated lists have three levels of arrays, repeated lists
+ * of repeated lists have four levels of arrays, and so on.</li>
+ * </ul>
+ * <p>
+ * Although the list vector supports a union of any Drill type, the only sane
+ * combinations are:
+ * <ul>
+ * <li>One of a (single or repeated) (map or list), or</li>
+ * <li>One or more scalar types.</li>
+ * </ul>
+ *
+ * If a particular array has only one type (single/repeated map/list), then,
+ * for convenience, the caller can directly request a writer of that type
+ * without having to first retrieve the variant (although the indirect
+ * route is, of course, available.)
+ *
+ * @see ArrayReader
*/
-public interface ArrayWriter {
+public interface ArrayWriter extends ColumnWriter {
/**
* Number of elements written thus far to the array.
@@ -35,20 +67,22 @@ public interface ArrayWriter {
int size();
/**
- * The object type of the list entry. All entries have the same
- * type.
- * @return the object type of each entry
- */
-
- ObjectWriter entry();
-
- /**
* Return a generic object writer for the array entry.
*
* @return generic object reader
*/
ObjectType entryType();
+
+ void setNull(boolean isNull);
+
+ /**
+ * The object type of the list entry. All entries have the same
+ * type.
+ * @return the object type of each entry
+ */
+
+ ObjectWriter entry();
ScalarWriter scalar();
TupleWriter tuple();
ArrayWriter array();
@@ -60,23 +94,4 @@ public interface ArrayWriter {
*/
void save();
-
- /**
- * Write the values of an array from a list of arguments.
- * @param values values for each array element
- * @throws VectorOverflowException
- */
- void set(Object ...values);
-
- /**
- * Write the array given an array of values. The type of array must match
- * the type of element in the array. That is, if the value is an <tt>int</tt>,
- * provide an <tt>int[]</tt> array.
- *
- * @param array array of values to write
- * @throws VectorOverflowException
- */
-
- void setObject(Object array);
-// void setList(List<? extends Object> list);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
new file mode 100644
index 000000000..5d1e79fbc
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
+import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
+
+/**
+ * Generic information about a column writer including:
+ * <ul>
+ * <li>Metadata</li>
+ * <li>Write position information about a writer needed by a vector overflow
+ * implementation. Hides the details of implementation and the writer class
+ * hierarchy, exposing just the required write position information.</li>
+ * <li>Generic methods for writing to the object, primarily used for
+ * testing.</li></ul>
+ */
+
+public interface ColumnWriter extends WriterPosition {
+
+ interface TupleListenable {
+
+ /**
+ * Bind a listener to the underlying map or map array column. Not valid if the
+ * underlying writer is a scalar or scalar array.
+ *
+ * @param listener
+ * the tuple listener to bind
+ */
+
+ void bindListener(TupleWriterListener listener);
+ }
+
+ interface ScalarListenable {
+ /**
+ * Bind a listener to the underlying scalar column, or array of scalar
+ * columns. Not valid if the underlying writer is a map or array of maps.
+ *
+ * @param listener
+ * the column listener to bind
+ */
+
+ void bindListener(ColumnWriterListener listener);
+ }
+
+ /**
+ * Return the object (structure) type of this writer.
+ *
+ * @return type indicating if this is a scalar, tuple or array
+ */
+
+ ObjectType type();
+
+ /**
+ * Whether this writer allows nulls. This is not as simple as checking
+ * for the {@link DataMode#OPTIONAL} type in the schema. List entries
+ * are nullable, if they are primitive, but not if they are maps or lists.
+ * Unions are nullable, regardless of cardinality.
+ *
+ * @return true if a call to {@link #setNull()} is supported, false
+ * if not
+ */
+
+ boolean nullable();
+
+ /**
+ * Returns the schema of the column associated with this writer.
+ *
+ * @return schema for this writer's column
+ */
+
+ ColumnMetadata schema();
+
+ /**
+ * Set the current value to null. Support depends on the underlying
+ * implementation: only nullable types support this operation.
+ *
+ * @throws IllegalStateException if called on a non-nullable value.
+ */
+
+ void setNull();
+
+ /**
+ * Generic technique to write data as a generic Java object. The
+ * type of the object must match the target writer.
+ * Primarily for testing.
+ * <ul>
+ * <li>Scalar: The type of the Java object must match the type of
+ * the target vector. <tt>String</tt> or <tt>byte[]</tt> can be
+ * used for Varchar vectors.</li>
+ * <li>Array: Write the array given an array of values. The object
+ * must be a Java array. The type of the array must match the type of
+ * element in the repeated vector. That is, if the vector is
+ * a <tt>Repeated Int</tt>, provide an <tt>int[]</tt> array.</li>
+ * <li>Tuple (map or row): The Java object must be an array of objects
+ * in which the members of the array have a 1:1 correspondence with the
+ * members of the tuple in the order defined by the writer metadata.
+ * That is, if the map is (Int, Varchar), provide a <tt>Object[]</tt>
+ * array like this: <tt>{10, "fred"}</tt>.</li>
+ * <li>Union: Uses the Java object type to determine the type of the
+ * backing vector. Creates a vector
+ * of the required type if needed.</li></ul>
+ *
+ * @param value value to write to the vector. The Java type of the
+ * object indicates the Drill storage type
+ * @throws IllegalArgumentException if the type of the Java object
+ * cannot be mapped to the type of the underlying vector or
+ * vector structure
+ */
+
+ void setObject(Object value);
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
index 113778f5a..81d4bac40 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
@@ -17,10 +17,6 @@
*/
package org.apache.drill.exec.vector.accessor;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
-import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
-
/**
* Represents a column within a tuple. A column can be an array, a scalar or a
* tuple. Each has an associated column metadata (schema) and a writer. The
@@ -44,58 +40,11 @@ import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
* {@see ObjectReader}
*/
-public interface ObjectWriter {
-
- /**
- * Returns the schema of the column associated with this writer.
- *
- * @return schema for this writer's column
- */
-
- ColumnMetadata schema();
-
- /**
- * Bind a listener to the underlying scalar column, or array of scalar
- * columns. Not valid if the underlying writer is a map or array of maps.
- *
- * @param listener
- * the column listener to bind
- */
-
- void bindListener(ColumnWriterListener listener);
-
- /**
- * Bind a listener to the underlying map or map array column. Not valid if the
- * underlying writer is a scalar or scalar array.
- *
- * @param listener
- * the tuple listener to bind
- */
-
- void bindListener(TupleWriterListener listener);
-
- /**
- * Return the object (structure) type of this writer.
- *
- * @return type indicating if this is a scalar, tuple or array
- */
-
- ObjectType type();
+public interface ObjectWriter extends ColumnWriter {
ScalarWriter scalar();
TupleWriter tuple();
ArrayWriter array();
-
- /**
- * For debugging, set the object to the proper form of Java object as defined
- * by the underlying writer type.
- *
- * @param value
- * Java object value to write
- * @throws VectorOverflowException
- */
-
- void set(Object value);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
index 776dc9cf1..87c798890 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
@@ -21,6 +21,8 @@ import java.math.BigDecimal;
import org.joda.time.Period;
+import org.apache.drill.exec.vector.accessor.ColumnWriter.ScalarListenable;
+
/**
* Represents a scalar value: a required column, a nullable column,
* or one element within an array of scalars.
@@ -42,7 +44,7 @@ import org.joda.time.Period;
* {@see ScalarElementReader}
*/
-public interface ScalarWriter {
+public interface ScalarWriter extends ColumnWriter, ScalarListenable {
/**
* Listener (callback) for vector overflow events. To be optionally
@@ -78,8 +80,6 @@ public interface ScalarWriter {
boolean canExpand(ScalarWriter writer, int delta);
}
- void bindListener(ColumnWriterListener listener);
-
/**
* Describe the type of the value. This is a compression of the
* value vector type: it describes which method will return the
@@ -89,7 +89,6 @@ public interface ScalarWriter {
*/
ValueType valueType();
- void setNull();
void setInt(int value);
void setLong(long value);
void setDouble(double value);
@@ -97,6 +96,4 @@ public interface ScalarWriter {
void setBytes(byte[] value, int len);
void setDecimal(BigDecimal value);
void setPeriod(Period value);
-
- void setObject(Object value);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
index 0a522833b..331df2a01 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
@@ -19,7 +19,9 @@ package org.apache.drill.exec.vector.accessor;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.ProjectionType;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ColumnWriter.TupleListenable;
/**
* Writer for a tuple. A tuple is composed of columns with a fixed order and
@@ -50,7 +52,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
* @see {@link SingleMapWriter}, the class which this class replaces
*/
-public interface TupleWriter {
+public interface TupleWriter extends ColumnWriter, TupleListenable {
/**
* Listener (callback) to handle requests to add a new column to a tuple (row
@@ -59,10 +61,13 @@ public interface TupleWriter {
* throws an exception.
*/
- public interface TupleWriterListener {
+ interface TupleWriterListener {
+
ObjectWriter addColumn(TupleWriter tuple, ColumnMetadata column);
ObjectWriter addColumn(TupleWriter tuple, MaterializedField field);
+
+ ProjectionType projectionType(String columnName);
}
/**
@@ -75,13 +80,26 @@ public interface TupleWriter {
*/
@SuppressWarnings("serial")
- public static class UndefinedColumnException extends RuntimeException {
+ class UndefinedColumnException extends RuntimeException {
public UndefinedColumnException(String colName) {
super("Undefined column: " + colName);
}
}
- void bindListener(TupleWriterListener listener);
+
+ /**
+ * Allows a client to "sniff" the projection set to determine if a
+ * field is projected. Some clients can omit steps if they know that
+ * a field is not needed. Others will simply create the column, allowing
+ * the implementation to create a dummy writer if the column is not
+ * projected.
+ *
+ * @param columnName name of an existing or new column
+ * @return whether the column is projected, and, if so, the implied
+ * type of the projected column
+ */
+
+ ProjectionType projectionType(String columnName);
/**
* Add a column to the tuple (row or map) that backs this writer. Support for
@@ -100,7 +118,7 @@ public interface TupleWriter {
int addColumn(MaterializedField schema);
- TupleMetadata schema();
+ TupleMetadata tupleSchema();
int size();
@@ -142,28 +160,4 @@ public interface TupleWriter {
*/
void set(int colIndex, Object value);
-
- /**
- * Write a row or map of values, given by Java objects. Object type must match
- * expected column type.
- * <p>
- * Note that a single-column tuple is ambiguous if that column is an array. To
- * avoid ambiguity, use <tt>set(0, value)</tt> in this case.
- *
- * @param values
- * variable-length argument list of column values
- * @return true if the row was written, false if any column caused vector
- * overflow.
- */
-
- void setTuple(Object... values);
-
- /**
- * Set the tuple from an array of objects. Primarily for use in test tools.
- *
- * @param value
- * the object to set, which must be a generic <tt>Object</tt> array
- */
-
- void setObject(Object value);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/WriterPosition.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/WriterPosition.java
new file mode 100644
index 000000000..609545a8e
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/WriterPosition.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+/**
+ * Position information about a writer used during vector overflow.
+ */
+
+public interface WriterPosition {
+
+ /**
+ * Position within the vector of the first value for the current row.
+ * Note that this is always the first value for the row, even for a
+ * writer deeply nested within a hierarchy of arrays. (The first
+ * position for the current array is not exposed in this API.)
+ *
+ * @return the vector offset of the first value for the current
+ * row
+ */
+
+ int rowStartIndex();
+
+ /**
+ * Return the last write position in the vector. This may be the
+ * same as the writer index position (if the vector was written at
+ * that point), or an earlier point. In either case, this value
+ * points to the last valid value in the vector.
+ *
+ * @return index of the last valid value in the vector
+ */
+
+ int lastWriteIndex();
+
+ /**
+ * Current write index for the writer. This is the global
+ * array location for arrays, same as the row index for top-level
+ * columns.
+ *
+ * @return current write index
+ */
+
+ int writeIndex();
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
index 58cda5755..2a2e3e10d 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
@@ -24,9 +24,7 @@ import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
import org.apache.drill.exec.vector.accessor.TupleWriter;
-import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
/**
@@ -98,36 +96,17 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
private AbstractArrayWriter arrayWriter;
- public ArrayObjectWriter(ColumnMetadata schema, AbstractArrayWriter arrayWriter) {
- super(schema);
+ public ArrayObjectWriter(AbstractArrayWriter arrayWriter) {
this.arrayWriter = arrayWriter;
}
@Override
- public ObjectType type() { return ObjectType.ARRAY; }
-
- @Override
- public void set(Object value) {
- arrayWriter.setObject(value);
- }
-
- @Override
public ArrayWriter array() { return arrayWriter; }
@Override
public WriterEvents events() { return arrayWriter; }
@Override
- public void bindListener(ColumnWriterListener listener) {
- arrayWriter.bindListener(listener);
- }
-
- @Override
- public void bindListener(TupleWriterListener listener) {
- arrayWriter.bindListener(listener);
- }
-
- @Override
public void dump(HierarchicalFormatter format) {
format
.startObject(this)
@@ -137,62 +116,57 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
}
}
- public static abstract class BaseArrayWriter extends AbstractArrayWriter {
-
- /**
- * Index into the vector of elements for a repeated vector.
- * Keeps track of the current offset in terms of value positions.
- * Forwards overflow events to the base index.
- */
+ /**
+ * Index into the vector of elements for a repeated vector.
+ * Keeps track of the current offset in terms of value positions.
+ * Forwards overflow events to the base index.
+ */
- public class ArrayElementWriterIndex implements ColumnWriterIndex {
+ public class ArrayElementWriterIndex implements ColumnWriterIndex {
- private int elementIndex;
+ private int elementIndex;
- public void reset() { elementIndex = 0; }
+ public void reset() { elementIndex = 0; }
- @Override
- public int vectorIndex() { return elementIndex + offsetsWriter.nextOffset(); }
+ @Override
+ public int vectorIndex() { return elementIndex + offsetsWriter.nextOffset(); }
- @Override
- public int rowStartIndex() { return offsetsWriter.rowStartOffset(); }
+ @Override
+ public int rowStartIndex() { return offsetsWriter.rowStartOffset(); }
- public int arraySize() { return elementIndex; }
+ public int arraySize() { return elementIndex; }
- @Override
- public void nextElement() { }
+ @Override
+ public void nextElement() { }
- public final void next() { elementIndex++; }
+ public void next() { elementIndex++; }
- public int valueStartOffset() { return offsetsWriter.nextOffset(); }
+ public int valueStartOffset() { return offsetsWriter.nextOffset(); }
- @Override
- public void rollover() { }
+ @Override
+ public void rollover() { }
- @Override
- public ColumnWriterIndex outerIndex() {
- return outerIndex;
- }
+ @Override
+ public ColumnWriterIndex outerIndex() {
+ return outerIndex;
+ }
- @Override
- public String toString() {
- return new StringBuilder()
- .append("[")
- .append(getClass().getSimpleName())
- .append(" elementIndex = ")
- .append(elementIndex)
- .append("]")
- .toString();
- }
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("[")
+ .append(getClass().getSimpleName())
+ .append(" elementIndex = ")
+ .append(elementIndex)
+ .append("]")
+ .toString();
}
+ }
- private final OffsetVectorWriter offsetsWriter;
- private ColumnWriterIndex outerIndex;
- protected ArrayElementWriterIndex elementIndex;
+ public static abstract class BaseArrayWriter extends AbstractArrayWriter {
- public BaseArrayWriter(UInt4Vector offsetVector, AbstractObjectWriter elementObjWriter) {
- super(elementObjWriter);
- offsetsWriter = new OffsetVectorWriter(offsetVector);
+ public BaseArrayWriter(ColumnMetadata schema, UInt4Vector offsetVector, AbstractObjectWriter elementObjWriter) {
+ super(schema, elementObjWriter, new OffsetVectorWriterImpl(offsetVector));
}
@Override
@@ -204,12 +178,6 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
}
@Override
- public ColumnWriterIndex writerIndex() { return outerIndex; }
-
- @Override
- public int size() { return elementIndex.arraySize(); }
-
- @Override
public void startWrite() {
elementIndex.reset();
offsetsWriter.startWrite();
@@ -248,12 +216,6 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
}
@Override
- public void endWrite() {
- offsetsWriter.endWrite();
- elementObjWriter.events().endWrite();
- }
-
- @Override
public void preRollover() {
elementObjWriter.events().preRollover();
offsetsWriter.preRollover();
@@ -271,28 +233,13 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
}
@Override
- public int lastWriteIndex() { return outerIndex.vectorIndex(); }
-
- /**
- * Return the writer for the offset vector for this array. Primarily used
- * to handle overflow; other clients should not attempt to muck about with
- * the offset vector directly.
- *
- * @return the writer for the offset vector associated with this array
- */
-
- @Override
- public OffsetVectorWriter offsetWriter() { return offsetsWriter; }
-
- @Override
- public void bindListener(ColumnWriterListener listener) {
- elementObjWriter.bindListener(listener);
+ public void endWrite() {
+ offsetsWriter.endWrite();
+ elementObjWriter.events().endWrite();
}
@Override
- public void bindListener(TupleWriterListener listener) {
- elementObjWriter.bindListener(listener);
- }
+ public int lastWriteIndex() { return outerIndex.vectorIndex(); }
@Override
public void dump(HierarchicalFormatter format) {
@@ -305,18 +252,30 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
}
}
- protected final AbstractObjectWriter elementObjWriter;
+ protected final ColumnMetadata schema;
+ protected AbstractObjectWriter elementObjWriter;
+ protected final OffsetVectorWriter offsetsWriter;
+ protected ColumnWriterIndex outerIndex;
+ protected ArrayElementWriterIndex elementIndex;
- public AbstractArrayWriter(AbstractObjectWriter elementObjWriter) {
+ public AbstractArrayWriter(ColumnMetadata schema, AbstractObjectWriter elementObjWriter, OffsetVectorWriter offsetVectorWriter) {
+ this.schema = schema;
this.elementObjWriter = elementObjWriter;
+ this.offsetsWriter = offsetVectorWriter;
}
@Override
+ public ObjectType type() { return ObjectType.ARRAY; }
+
+ @Override
public ObjectType entryType() {
return elementObjWriter.type();
}
@Override
+ public ColumnMetadata schema() { return schema; }
+
+ @Override
public ObjectWriter entry() { return elementObjWriter; }
@Override
@@ -334,9 +293,48 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
return elementObjWriter.array();
}
- public abstract void bindListener(ColumnWriterListener listener);
- public abstract void bindListener(TupleWriterListener listener);
- public abstract OffsetVectorWriter offsetWriter();
+ @Override
+ public int size() { return elementIndex.arraySize(); }
+
+ @Override
+ public boolean nullable() { return false; }
+
+ @Override
+ public void setNull() {
+ throw new IllegalStateException("Not nullable");
+ }
+
+ @Override
+ public int rowStartIndex() {
+ return outerIndex.rowStartIndex();
+ }
+
+ @Override
+ public int lastWriteIndex() {
+ return offsetsWriter.lastWriteIndex();
+ }
+
+ @Override
+ public int writeIndex() {
+ return outerIndex.vectorIndex();
+ }
+
+ @Override
+ public void setNull(boolean isNull) {
+ if (isNull == true) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Return the writer for the offset vector for this array. Primarily used
+ * to handle overflow; other clients should not attempt to muck about with
+ * the offset vector directly.
+ *
+ * @return the writer for the offset vector associated with this array
+ */
+
+ public OffsetVectorWriter offsetWriter() { return offsetsWriter; }
public void dump(HierarchicalFormatter format) {
format
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
index 1107216a3..921cb002e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
@@ -46,14 +46,14 @@ public abstract class AbstractFixedWidthWriter extends BaseScalarWriter {
* The current vector buffer, and buffer address, will change in
* this method when a vector grows or overflows. So, don't use this
* method in inline calls of the form<br><code>
- * vector.getBuffer().doSomething(writeIndex());</code></br>
+ * vector.getBuffer().doSomething(prepareWrite());</code>
* The buffer obtained by <tt>getBuffer()</tt> can be different than
- * the current buffer after <tt>writeIndex()</tt>.
+ * the current buffer after <tt>prepareWrite()</tt>.
*
* @return the index at which to write the current value
*/
- protected final int writeIndex() {
+ protected final int prepareWrite() {
// "Fast path" for the normal case of no fills, no overflow.
// This is the only bounds check we want to do for the entire
@@ -191,6 +191,18 @@ public abstract class AbstractFixedWidthWriter extends BaseScalarWriter {
@Override
public int lastWriteIndex() { return lastWriteIndex; }
+ /**
+ * For internal use only to update the write position on those
+ * very rare occasions in which the vector is written to outside
+ * of this writer framework. Not to be called by application code!
+ *
+ * @param index new last write index
+ */
+
+ public void setLastWriteIndex(int index) {
+ lastWriteIndex = index;
+ }
+
@Override
public void skipNulls() {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
index 15807cb01..61c201bdb 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
@@ -19,11 +19,11 @@ package org.apache.drill.exec.vector.accessor.writer;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
import org.apache.drill.exec.vector.accessor.TupleWriter;
-import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
/**
@@ -36,14 +36,8 @@ import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
public abstract class AbstractObjectWriter implements ObjectWriter {
- private ColumnMetadata schema;
-
- public AbstractObjectWriter(ColumnMetadata schema) {
- this.schema = schema;
- }
-
@Override
- public ColumnMetadata schema() { return schema; }
+ public ColumnMetadata schema() { return baseWriter().schema(); }
@Override
public ScalarWriter scalar() {
@@ -62,11 +56,40 @@ public abstract class AbstractObjectWriter implements ObjectWriter {
public abstract WriterEvents events();
+ public ColumnWriter baseWriter() {
+ return (ColumnWriter) events();
+ }
+
@Override
- public void bindListener(ColumnWriterListener listener) { }
+ public ObjectType type() { return baseWriter().type(); }
+
+ @Override
+ public boolean nullable() { return baseWriter().nullable(); }
+
+ @Override
+ public void setNull() {
+ baseWriter().setNull();
+ }
@Override
- public void bindListener(TupleWriterListener listener) { }
+ public void setObject(Object value) {
+ baseWriter().setObject(value);
+ }
public abstract void dump(HierarchicalFormatter format);
+
+ @Override
+ public int rowStartIndex() {
+ return baseWriter().rowStartIndex();
+ }
+
+ @Override
+ public int lastWriteIndex() {
+ return baseWriter().lastWriteIndex();
+ }
+
+ @Override
+ public int writeIndex() {
+ return baseWriter().writeIndex();
+ }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
index b6a85d7ff..08e4ac390 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
@@ -21,8 +21,10 @@ import java.math.BigDecimal;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.joda.time.Period;
@@ -39,29 +41,17 @@ public abstract class AbstractScalarWriter implements ScalarWriter, WriterEvents
private AbstractScalarWriter scalarWriter;
- public ScalarObjectWriter(ColumnMetadata schema, AbstractScalarWriter scalarWriter) {
- super(schema);
+ public ScalarObjectWriter(AbstractScalarWriter scalarWriter) {
this.scalarWriter = scalarWriter;
}
@Override
- public ObjectType type() { return ObjectType.SCALAR; }
-
- @Override
- public void set(Object value) { scalarWriter.setObject(value); }
-
- @Override
public ScalarWriter scalar() { return scalarWriter; }
@Override
public WriterEvents events() { return scalarWriter; }
@Override
- public void bindListener(ColumnWriterListener listener) {
- scalarWriter.bindListener(listener);
- }
-
- @Override
public void dump(HierarchicalFormatter format) {
format
.startObject(this)
@@ -71,6 +61,42 @@ public abstract class AbstractScalarWriter implements ScalarWriter, WriterEvents
}
}
+ protected ColumnMetadata schema;
+
+ /**
+ * Indicates the position in the vector to write. Set via an object so that
+ * all writers (within the same subtree) can agree on the write position.
+ * For example, all top-level, simple columns see the same row index.
+ * All columns within a repeated map see the same (inner) index, etc.
+ */
+
+ protected ColumnWriterIndex vectorIndex;
+
+ @Override
+ public ObjectType type() { return ObjectType.SCALAR; }
+
+ public void bindSchema(ColumnMetadata schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public void bindIndex(ColumnWriterIndex vectorIndex) {
+ this.vectorIndex = vectorIndex;
+ }
+
+ @Override
+ public int rowStartIndex() {
+ return vectorIndex.rowStartIndex();
+ }
+
+ @Override
+ public int writeIndex() {
+ return vectorIndex.vectorIndex();
+ }
+
+ @Override
+ public ColumnMetadata schema() { return schema; }
+
public abstract BaseDataValueVector vector();
@Override
@@ -85,6 +111,10 @@ public abstract class AbstractScalarWriter implements ScalarWriter, WriterEvents
@Override
public void saveRow() { }
+ protected UnsupportedConversionError conversionError(String javaType) {
+ return UnsupportedConversionError.writeError(schema(), javaType);
+ }
+
@Override
public void setObject(Object value) {
if (value == null) {
@@ -111,8 +141,7 @@ public abstract class AbstractScalarWriter implements ScalarWriter, WriterEvents
} else if (value instanceof Float) {
setDouble((Float) value);
} else {
- throw new IllegalArgumentException("Unsupported type " +
- value.getClass().getSimpleName());
+ throw conversionError(value.getClass().getSimpleName());
}
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index 938f867a2..640f5d36b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.ProjectionType;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
@@ -104,29 +105,17 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
private AbstractTupleWriter tupleWriter;
- public TupleObjectWriter(ColumnMetadata schema, AbstractTupleWriter tupleWriter) {
- super(schema);
+ public TupleObjectWriter(AbstractTupleWriter tupleWriter) {
this.tupleWriter = tupleWriter;
}
@Override
- public ObjectType type() { return ObjectType.TUPLE; }
-
- @Override
- public void set(Object value) { tupleWriter.setObject(value); }
-
- @Override
public TupleWriter tuple() { return tupleWriter; }
@Override
public WriterEvents events() { return tupleWriter; }
@Override
- public void bindListener(TupleWriterListener listener) {
- tupleWriter.bindListener(listener);
- }
-
- @Override
public void dump(HierarchicalFormatter format) {
format
.startObject(this)
@@ -136,38 +125,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
}
}
- /**
- * Tracks the write state of the tuple to allow applying the correct
- * operations to newly-added columns to synchronize them with the rest
- * of the tuple.
- */
-
- public enum State {
- /**
- * No write is in progress. Nothing need be done to newly-added
- * writers.
- */
- IDLE,
-
- /**
- * <tt>startWrite()</tt> has been called to start a write operation
- * (start a batch), but <tt>startValue()</tt> has not yet been called
- * to start a row (or value within an array). <tt>startWrite()</tt> must
- * be called on newly added columns.
- */
-
- IN_WRITE,
-
- /**
- * Both <tt>startWrite()</tt> and <tt>startValue()</tt> has been called on
- * the tuple to prepare for writing values, and both must be called on
- * newly-added vectors.
- */
-
- IN_ROW
- }
-
- protected final TupleMetadata schema;
+ protected final TupleMetadata tupleSchema;
protected final List<AbstractObjectWriter> writers;
protected ColumnWriterIndex vectorIndex;
protected ColumnWriterIndex childIndex;
@@ -175,7 +133,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
protected State state = State.IDLE;
protected AbstractTupleWriter(TupleMetadata schema, List<AbstractObjectWriter> writers) {
- this.schema = schema;
+ this.tupleSchema = schema;
this.writers = writers;
}
@@ -183,6 +141,9 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
this(schema, new ArrayList<AbstractObjectWriter>());
}
+ @Override
+ public ObjectType type() { return ObjectType.TUPLE; }
+
protected void bindIndex(ColumnWriterIndex index, ColumnWriterIndex childIndex) {
vectorIndex = index;
this.childIndex = childIndex;
@@ -198,7 +159,9 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
}
@Override
- public ColumnWriterIndex writerIndex() { return vectorIndex; }
+ public int rowStartIndex() {
+ return vectorIndex.rowStartIndex();
+ }
/**
* Add a column writer to an existing tuple writer. Used for implementations
@@ -209,8 +172,8 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
*/
public int addColumnWriter(AbstractObjectWriter colWriter) {
- assert writers.size() == schema.size();
- int colIndex = schema.addColumn(colWriter.schema());
+ assert writers.size() == tupleSchema.size();
+ int colIndex = tupleSchema.addColumn(colWriter.schema());
writers.add(colWriter);
colWriter.events().bindIndex(childIndex);
if (state != State.IDLE) {
@@ -223,6 +186,12 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
}
@Override
+ public ProjectionType projectionType(String columnName) {
+ return listener == null ? ProjectionType.UNSPECIFIED
+ : listener.projectionType(columnName);
+ }
+
+ @Override
public int addColumn(ColumnMetadata column) {
if (listener == null) {
throw new UnsupportedOperationException("addColumn");
@@ -241,10 +210,18 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
}
@Override
- public TupleMetadata schema() { return schema; }
+ public TupleMetadata tupleSchema() { return tupleSchema; }
@Override
- public int size() { return schema().size(); }
+ public int size() { return tupleSchema().size(); }
+
+ @Override
+ public boolean nullable() { return false; }
+
+ @Override
+ public void setNull() {
+ throw new IllegalStateException("Not nullable");
+ }
@Override
public void startWrite() {
@@ -339,7 +316,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
@Override
public ObjectWriter column(String colName) {
- int index = schema.index(colName);
+ int index = tupleSchema.index(colName);
if (index == -1) {
throw new UndefinedColumnException(colName);
}
@@ -348,35 +325,18 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
@Override
public void set(int colIndex, Object value) {
- ObjectWriter colWriter = column(colIndex);
- switch (colWriter.type()) {
- case ARRAY:
- colWriter.array().setObject(value);
- break;
- case SCALAR:
- colWriter.scalar().setObject(value);
- break;
- case TUPLE:
- colWriter.tuple().setObject(value);
- break;
- default:
- throw new IllegalStateException("Unexpected object type: " + colWriter.type());
- }
- }
-
- @Override
- public void setTuple(Object ...values) {
- setObject(values);
+ column(colIndex).setObject(value);
}
@Override
public void setObject(Object value) {
Object values[] = (Object[]) value;
- if (values.length != schema.size()) {
+ if (values.length != tupleSchema.size()) {
throw new IllegalArgumentException(
- "Map has " + schema.size() +
- " columns, but value array has " +
- values.length + " values.");
+ String.format("Map %s has %d columns, but value array has " +
+ " %d values.",
+ schema().name(),
+ tupleSchema.size(), values.length));
}
for (int i = 0; i < values.length; i++) {
set(i, values[i]);
@@ -429,6 +389,11 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
}
@Override
+ public int writeIndex() {
+ return vectorIndex.vectorIndex();
+ }
+
+ @Override
public void bindListener(TupleWriterListener listener) {
this.listener = listener;
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
index 4793277b7..10a2cf3b5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.vector.accessor.writer;
import java.math.BigDecimal;
-import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.joda.time.Period;
@@ -134,15 +134,6 @@ public abstract class BaseScalarWriter extends AbstractScalarWriter {
public static final int MIN_BUFFER_SIZE = 256;
/**
- * Indicates the position in the vector to write. Set via an object so that
- * all writers (within the same subtree) can agree on the write position.
- * For example, all top-level, simple columns see the same row index.
- * All columns within a repeated map see the same (inner) index, etc.
- */
-
- protected ColumnWriterIndex vectorIndex;
-
- /**
* Listener invoked if the vector overflows. If not provided, then the writer
* does not support vector overflow.
*/
@@ -160,14 +151,6 @@ public abstract class BaseScalarWriter extends AbstractScalarWriter {
protected int capacity;
@Override
- public void bindIndex(ColumnWriterIndex vectorIndex) {
- this.vectorIndex = vectorIndex;
- }
-
- @Override
- public ColumnWriterIndex writerIndex() { return vectorIndex; }
-
- @Override
public void bindListener(ColumnWriterListener listener) {
this.listener = listener;
}
@@ -176,7 +159,7 @@ public abstract class BaseScalarWriter extends AbstractScalarWriter {
* All change of buffer comes through this function to allow capturing
* the buffer address and capacity. Only two ways to set the buffer:
* by binding to a vector in bindVector(), or by resizing the vector
- * in writeIndex().
+ * in prepareWrite().
*/
protected abstract void setBuffer();
@@ -220,43 +203,46 @@ public abstract class BaseScalarWriter extends AbstractScalarWriter {
public abstract void skipNulls();
@Override
+ public boolean nullable() { return false; }
+
+ @Override
public void setNull() {
- throw new UnsupportedOperationException("Vector is not nullable");
+ throw UnsupportedConversionError.nullError(schema());
}
@Override
public void setInt(int value) {
- throw new UnsupportedOperationException();
+ throw conversionError("int");
}
@Override
public void setLong(long value) {
- throw new UnsupportedOperationException();
+ throw conversionError("long");
}
@Override
public void setDouble(double value) {
- throw new UnsupportedOperationException();
+ throw conversionError("double");
}
@Override
public void setString(String value) {
- throw new UnsupportedOperationException();
+ throw conversionError("String");
}
@Override
public void setBytes(byte[] value, int len) {
- throw new UnsupportedOperationException();
+ throw conversionError("bytes");
}
@Override
public void setDecimal(BigDecimal value) {
- throw new UnsupportedOperationException();
+ throw conversionError("Decimal");
}
@Override
public void setPeriod(Period value) {
- throw new UnsupportedOperationException();
+ throw conversionError("Period");
}
@Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
index e54625ef4..9db767d1f 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
@@ -36,10 +36,10 @@ import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
*/
public abstract class BaseVarWidthWriter extends BaseScalarWriter {
- protected final OffsetVectorWriter offsetsWriter;
+ protected final OffsetVectorWriterImpl offsetsWriter;
public BaseVarWidthWriter(UInt4Vector offsetVector) {
- offsetsWriter = new OffsetVectorWriter(offsetVector);
+ offsetsWriter = new OffsetVectorWriterImpl(offsetVector);
}
@Override
@@ -57,7 +57,7 @@ public abstract class BaseVarWidthWriter extends BaseScalarWriter {
@Override
public void startRow() { offsetsWriter.startRow(); }
- protected final int writeIndex(final int width) {
+ protected final int prepareWrite(final int width) {
// This is performance critical code; every operation counts.
// Please be thoughtful when changing the code.
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
index 4a668c4ed..3f0cae3c4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
@@ -19,34 +19,28 @@ package org.apache.drill.exec.vector.accessor.writer;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.NullableVector;
-import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ColumnAccessorUtils;
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter.ScalarObjectWriter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter.TupleObjectWriter;
-import org.apache.drill.exec.vector.accessor.writer.MapWriter.ArrayMapWriter;
-import org.apache.drill.exec.vector.accessor.writer.MapWriter.DummyArrayMapWriter;
-import org.apache.drill.exec.vector.accessor.writer.MapWriter.DummyMapWriter;
-import org.apache.drill.exec.vector.accessor.writer.MapWriter.SingleMapWriter;
import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
import org.apache.drill.exec.vector.accessor.writer.dummy.DummyScalarWriter;
-import org.apache.drill.exec.vector.complex.AbstractMapVector;
-import org.apache.drill.exec.vector.complex.MapVector;
-import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
/**
* Gather generated writer classes into a set of class tables to allow rapid
* run-time creation of writers. Builds the writer and its object writer
* wrapper which binds the vector to the writer.
+ * <p>
+ * Compared to the reader factory, the writer factory is a bit more complex
+ * as it must handle both the projected ("real" writer) and unprojected
+ * ("dummy" writer) cases. Because of the way the various classes interact,
+ * it is cleaner to put the factory methods here rather than in the various
+ * writers, as is done in the case of the readers.
*/
@SuppressWarnings("unchecked")
@@ -79,21 +73,38 @@ public class ColumnWriterFactory {
default:
switch (schema.mode()) {
case OPTIONAL:
- NullableVector nullableVector = (NullableVector) vector;
- return NullableScalarWriter.build(schema, nullableVector,
- newWriter(nullableVector.getValuesVector()));
+ return nullableScalarWriter(schema, (NullableVector) vector);
case REQUIRED:
- return new ScalarObjectWriter(schema, newWriter(vector));
+ return requiredScalarWriter(schema, vector);
case REPEATED:
- RepeatedValueVector repeatedVector = (RepeatedValueVector) vector;
- return ScalarArrayWriter.build(schema, repeatedVector,
- newWriter(repeatedVector.getDataVector()));
+ return repeatedScalarWriter(schema, (RepeatedValueVector) vector);
default:
throw new UnsupportedOperationException(schema.mode().toString());
}
}
}
+ private static ScalarObjectWriter requiredScalarWriter(
+ ColumnMetadata schema, ValueVector vector) {
+ BaseScalarWriter baseWriter = newWriter(vector);
+ baseWriter.bindSchema(schema);
+ return new ScalarObjectWriter(baseWriter);
+ }
+
+ private static ScalarObjectWriter nullableScalarWriter(
+ ColumnMetadata schema, NullableVector vector) {
+ BaseScalarWriter baseWriter = newWriter(vector.getValuesVector());
+ baseWriter.bindSchema(schema);
+ return NullableScalarWriter.build(schema, vector, baseWriter);
+ }
+
+ private static AbstractObjectWriter repeatedScalarWriter(
+ ColumnMetadata schema, RepeatedValueVector vector) {
+ BaseScalarWriter baseWriter = newWriter(vector.getDataVector());
+ baseWriter.bindSchema(schema);
+ return ScalarArrayWriter.build(schema, vector, baseWriter);
+ }
+
/**
* Build a writer for a non-projected column.
* @param schema schema of the column
@@ -104,20 +115,19 @@ public class ColumnWriterFactory {
switch (schema.type()) {
case GENERIC_OBJECT:
case LATE:
- case NULL:
case LIST:
case MAP:
throw new UnsupportedOperationException(schema.type().toString());
default:
- ScalarObjectWriter scalarWriter = new ScalarObjectWriter(schema,
- new DummyScalarWriter());
+ ScalarObjectWriter scalarWriter = new ScalarObjectWriter(
+ new DummyScalarWriter(schema));
switch (schema.mode()) {
case OPTIONAL:
case REQUIRED:
return scalarWriter;
case REPEATED:
- return new ArrayObjectWriter(schema,
- new DummyArrayWriter(
+ return new ArrayObjectWriter(
+ new DummyArrayWriter(schema,
scalarWriter));
default:
throw new UnsupportedOperationException(schema.mode().toString());
@@ -125,59 +135,6 @@ public class ColumnWriterFactory {
}
}
- public static TupleObjectWriter buildMap(ColumnMetadata schema, MapVector vector,
- List<AbstractObjectWriter> writers) {
- MapWriter mapWriter;
- if (schema.isProjected()) {
- mapWriter = new SingleMapWriter(schema, vector, writers);
- } else {
- mapWriter = new DummyMapWriter(schema, writers);
- }
- return new TupleObjectWriter(schema, mapWriter);
- }
-
- public static ArrayObjectWriter buildMapArray(ColumnMetadata schema,
- UInt4Vector offsetVector,
- List<AbstractObjectWriter> writers) {
- MapWriter mapWriter;
- if (schema.isProjected()) {
- mapWriter = new ArrayMapWriter(schema, writers);
- } else {
- mapWriter = new DummyArrayMapWriter(schema, writers);
- }
- TupleObjectWriter mapArray = new TupleObjectWriter(schema, mapWriter);
- AbstractArrayWriter arrayWriter;
- if (schema.isProjected()) {
- arrayWriter = new ObjectArrayWriter(
- offsetVector,
- mapArray);
- } else {
- arrayWriter = new DummyArrayWriter(mapArray);
- }
- return new ArrayObjectWriter(schema, arrayWriter);
- }
-
- public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema,
- AbstractMapVector vector,
- List<AbstractObjectWriter> writers) {
- assert (vector != null) == schema.isProjected();
- if (! schema.isArray()) {
- return buildMap(schema, (MapVector) vector, writers);
- } else if (vector == null) {
- return buildMapArray(schema,
- null, writers);
- } else {
- return buildMapArray(schema,
- ((RepeatedMapVector) vector).getOffsetVector(),
- writers);
- }
- }
-
- public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema, AbstractMapVector vector) {
- assert schema.mapSchema().size() == 0;
- return buildMapWriter(schema, vector, new ArrayList<AbstractObjectWriter>());
- }
-
public static BaseScalarWriter newWriter(ValueVector vector) {
MajorType major = vector.getField().getType();
MinorType type = major.getMinorType();
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
index af8daba58..915c9941d 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
@@ -17,11 +17,18 @@
*/
package org.apache.drill.exec.vector.accessor.writer;
+import java.util.ArrayList;
import java.util.List;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
/**
* Writer for a Drill Map type. Maps are actually tuples, just like rows.
@@ -46,6 +53,7 @@ public abstract class MapWriter extends AbstractTupleWriter {
@Override public int vectorIndex() { return baseIndex.vectorIndex(); }
@Override public void nextElement() { }
@Override public void rollover() { }
+
@Override public ColumnWriterIndex outerIndex() {
return baseIndex.outerIndex();
}
@@ -80,6 +88,8 @@ public abstract class MapWriter extends AbstractTupleWriter {
public void endWrite() {
super.endWrite();
+ // A non repeated map has a field that holds the value count.
+ // Update it. (A repeated map uses the offset vector's value count.)
// Special form of set value count: used only for
// this class to avoid setting the value count of children.
// Setting these counts was already done. Doing it again
@@ -87,13 +97,14 @@ public abstract class MapWriter extends AbstractTupleWriter {
// set the "lastSet" field of nullable vector accessors,
// and the initial value of -1 will cause all values to
// be overwritten.
- //
- // Note that the map vector can be null if there is no actual
- // map vector represented by this writer.
- if (mapVector != null) {
- mapVector.setMapValueCount(vectorIndex.vectorIndex());
- }
+ mapVector.setMapValueCount(vectorIndex.vectorIndex());
+ }
+
+ @Override
+ public void preRollover() {
+ super.preRollover();
+ mapVector.setMapValueCount(vectorIndex.rowStartIndex());
}
}
@@ -122,12 +133,6 @@ public abstract class MapWriter extends AbstractTupleWriter {
bindIndex(index, new MemberWriterIndex(index));
}
-
- // In endWrite(), do not call setValueCount on the map vector.
- // Doing so will zero-fill the composite vectors because
- // the internal map state does not track the writer state.
- // Instead, the code in this structure has set the value
- // count for each composite vector individually.
}
protected static class DummyMapWriter extends MapWriter {
@@ -136,6 +141,9 @@ public abstract class MapWriter extends AbstractTupleWriter {
List<AbstractObjectWriter> writers) {
super(schema, writers);
}
+
+ @Override
+ public ProjectionType projectionType(String columnName) { return ProjectionType.UNPROJECTED; }
}
protected static class DummyArrayMapWriter extends MapWriter {
@@ -144,6 +152,9 @@ public abstract class MapWriter extends AbstractTupleWriter {
List<AbstractObjectWriter> writers) {
super(schema, writers);
}
+
+ @Override
+ public ProjectionType projectionType(String columnName) { return ProjectionType.UNPROJECTED; }
}
protected final ColumnMetadata mapColumnSchema;
@@ -152,4 +163,61 @@ public abstract class MapWriter extends AbstractTupleWriter {
super(schema.mapSchema(), writers);
mapColumnSchema = schema;
}
+
+ public static TupleObjectWriter buildMap(ColumnMetadata schema, MapVector vector,
+ List<AbstractObjectWriter> writers) {
+ MapWriter mapWriter;
+ if (schema.isProjected()) {
+
+ // Vector is not required for a map writer; the map's columns
+ // are written, but not the (non-array) map.
+
+ mapWriter = new SingleMapWriter(schema, vector, writers);
+ } else {
+ assert vector == null;
+ mapWriter = new DummyMapWriter(schema, writers);
+ }
+ return new TupleObjectWriter(mapWriter);
+ }
+
+ public static ArrayObjectWriter buildMapArray(ColumnMetadata schema,
+ UInt4Vector offsetVector,
+ List<AbstractObjectWriter> writers) {
+ MapWriter mapWriter;
+ if (schema.isProjected()) {
+ mapWriter = new ArrayMapWriter(schema, writers);
+ } else {
+ mapWriter = new DummyArrayMapWriter(schema, writers);
+ }
+ TupleObjectWriter mapArray = new TupleObjectWriter(mapWriter);
+ AbstractArrayWriter arrayWriter;
+ if (schema.isProjected()) {
+ arrayWriter = new ObjectArrayWriter(schema,
+ offsetVector,
+ mapArray);
+ } else {
+ arrayWriter = new DummyArrayWriter(schema, mapArray);
+ }
+ return new ArrayObjectWriter(arrayWriter);
+ }
+
+ public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema,
+ AbstractMapVector vector,
+ List<AbstractObjectWriter> writers) {
+ if (schema.isArray()) {
+ return MapWriter.buildMapArray(schema,
+ vector == null ? null :
+ ((RepeatedMapVector) vector).getOffsetVector(), writers);
+ } else {
+ return MapWriter.buildMap(schema, (MapVector) vector, writers);
+ }
+ }
+
+ public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema, AbstractMapVector vector) {
+ assert schema.mapSchema().size() == 0;
+ return buildMapWriter(schema, vector, new ArrayList<AbstractObjectWriter>());
+ }
+
+ @Override
+ public ColumnMetadata schema() { return mapColumnSchema; }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
index c5f798278..75ef57fc4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
@@ -70,15 +70,16 @@ public class NullableScalarWriter extends AbstractScalarWriter {
private final BaseScalarWriter baseWriter;
private ColumnWriterIndex writerIndex;
- public NullableScalarWriter(NullableVector nullableVector, BaseScalarWriter baseWriter) {
+ public NullableScalarWriter(ColumnMetadata schema, NullableVector nullableVector, BaseScalarWriter baseWriter) {
+ this.schema = schema;
isSetWriter = new UInt1ColumnWriter(nullableVector.getBitsVector());
this.baseWriter = baseWriter;
}
public static ScalarObjectWriter build(ColumnMetadata schema,
NullableVector nullableVector, BaseScalarWriter baseWriter) {
- return new ScalarObjectWriter(schema,
- new NullableScalarWriter(nullableVector, baseWriter));
+ return new ScalarObjectWriter(
+ new NullableScalarWriter(schema, nullableVector, baseWriter));
}
public BaseScalarWriter bitsWriter() { return isSetWriter; }
@@ -98,7 +99,9 @@ public class NullableScalarWriter extends AbstractScalarWriter {
}
@Override
- public ColumnWriterIndex writerIndex() { return baseWriter.writerIndex(); }
+ public int rowStartIndex() {
+ return baseWriter.rowStartIndex();
+ }
@Override
public ValueType valueType() {
@@ -112,6 +115,9 @@ public class NullableScalarWriter extends AbstractScalarWriter {
}
@Override
+ public boolean nullable() { return true; }
+
+ @Override
public void setNull() {
isSetWriter.setInt(0);
baseWriter.skipNulls();
@@ -212,7 +218,7 @@ public class NullableScalarWriter extends AbstractScalarWriter {
public void endArrayValue() {
// Skip calls for performance: they do nothing for
// scalar writers -- the only kind supported here.
-// isSetWriter.saveValue();
+// isSetWriter.endArrayValue();
baseWriter.endArrayValue();
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java
index 3554a3be6..e2a1fc394 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectArrayWriter.java
@@ -17,8 +17,10 @@
*/
package org.apache.drill.exec.vector.accessor.writer;
+import java.lang.reflect.Array;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.UInt4Vector;
-import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArrayWriter;
/**
@@ -39,7 +41,7 @@ import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArra
* with a map, then we have a single offset vector pointing into a group of
* arrays. Consider the simple case of a map of three scalars. Here, we have
* a hybrid of the states discussed for the {@link BaseScalarWriter} and those
- * discussed for {@link OffsetVectorWriter}. That is, the offset vector
+ * discussed for {@link OffsetVectorWriterImpl}. That is, the offset vector
* points into one map element. The individual elements can we Behind,
* Written or Unwritten, depending on the specific actions taken by the
* client.
@@ -100,19 +102,15 @@ import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArra
* The key reason to understand this flow is to understand what happens
* in vector overflow: unlike an array of scalars, in which the data
* vector can never be in the Behind state, when we have an array of
- * maps then each vector can be in an of the scalar writer state.
+ * maps then each vector can be in any of the scalar writer states.
*/
public class ObjectArrayWriter extends BaseArrayWriter {
- protected ObjectArrayWriter(UInt4Vector offsetVector, AbstractObjectWriter elementWriter) {
- super(offsetVector, elementWriter);
- }
-
- @Override
- public void bindIndex(ColumnWriterIndex index) {
+ protected ObjectArrayWriter(ColumnMetadata schema,
+ UInt4Vector offsetVector, AbstractObjectWriter elementWriter) {
+ super(schema, offsetVector, elementWriter);
elementIndex = new ArrayElementWriterIndex();
- super.bindIndex(index);
}
@Override
@@ -122,22 +120,20 @@ public class ObjectArrayWriter extends BaseArrayWriter {
}
@Override
- public void set(Object... values) {
- setObject(values);
- }
-
- @Override
public void setObject(Object array) {
- Object values[] = (Object[]) array;
- for (int i = 0; i < values.length; i++) {
- elementObjWriter.set(values[i]);
+
+ // Null array = 0-length array
+
+ if (array == null) {
+ return;
+ }
+ int size = Array.getLength(array);
+ for (int i = 0; i < size; i++) {
+ Object value = Array.get(array, i);
+ if (value != null) {
+ elementObjWriter.setObject(value);
+ }
save();
}
}
-
- @Override
- public int lastWriteIndex() {
- // Undefined for arrays
- return 0;
- }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
index 49d16a3e5..d733b4f49 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
@@ -17,279 +17,16 @@
*/
package org.apache.drill.exec.vector.accessor.writer;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.UInt4Vector;
-import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
/**
- * Specialized column writer for the (hidden) offset vector used
- * with variable-length or repeated vectors. See comments in the
- * <tt>ColumnAccessors.java</tt> template file for more details.
- * <p>
- * Note that the <tt>lastWriteIndex</tt> tracked here corresponds
- * to the data values; it is one less than the actual offset vector
- * last write index due to the nature of offset vector layouts. The selection
- * of last write index basis makes roll-over processing easier as only this
- * writer need know about the +1 translation required for writing.
- * <p>
- * The states illustrated in the base class apply here as well,
- * remembering that the end offset for a row (or array position)
- * is written one ahead of the vector index.
- * <p>
- * The vector index does create an interesting dynamic for the child
- * writers. From the child writer's perspective, the states described in
- * the super class are the only states of interest. Here we want to
- * take the perspective of the parent.
- * <p>
- * The offset vector is an implementation of a repeat level. A repeat
- * level can occur for a single array, or for a collection of columns
- * within a repeated map. (A repeat level also occurs for variable-width
- * fields, but this is a bit harder to see, so let's ignore that for
- * now.)
- * <p>
- * The key point to realize is that each repeat level introduces an
- * isolation level in terms of indexing. That is, empty values in the
- * outer level have no affect on indexing in the inner level. In fact,
- * the nature of a repeated outer level means that there are no empties
- * in the inner level.
- * <p>
- * To illustrate:<pre><code>
- * Offset Vector Data Vector Indexes
- * lw, v > | 10 | - - - - - > | X | 10
- * | 12 | - - + | X | < lw' 11
- * | | + - - > | | < v' 12
- * </code></pre>
- * In the above, the client has just written an array of two elements
- * at the current write position. The data starts at offset 10 in
- * the data vector, and the next write will be at 12. The end offset
- * is written one ahead of the vector index.
- * <p>
- * From the data vector's perspective, its last-write (lw') reflects
- * the last element written. If this is an array of scalars, then the
- * write index is automatically incremented, as illustrated by v'.
- * (For map arrays, the index must be incremented by calling
- * <tt>save()</tt> on the map array writer.)
- * <p>
- * Suppose the client now skips some arrays:<pre><code>
- * Offset Vector Data Vector
- * lw > | 10 | - - - - - > | X | 10
- * | 12 | - - + | X | < lw' 11
- * | | + - - > | | < v' 12
- * | | | | 13
- * v > | | | | 14
- * </code></pre>
- * The last write position does not move and there are gaps in the
- * offset vector. The vector index points to the current row. Note
- * that the data vector last write and vector indexes do not change,
- * this reflects the fact that the the data vector's vector index
- * (v') matches the tail offset
- * <p>
- * The
- * client now writes a three-element vector:<pre><code>
- * Offset Vector Data Vector
- * | 10 | - - - - - > | X | 10
- * | 12 | - - + | X | 11
- * | 12 | - - + - - > | Y | 12
- * | 12 | - - + | Y | 13
- * lw, v > | 12 | - - + | Y | < lw' 14
- * | 15 | - - - - - > | | < v' 15
- * </code></pre>
- * Quite a bit just happened. The empty offset slots were back-filled
- * with the last write offset in the data vector. The client wrote
- * three values, which advanced the last write and vector indexes
- * in the data vector. And, the last write index in the offset
- * vector also moved to reflect the update of the offset vector.
- * Note that as a result, multiple positions in the offset vector
- * point to the same location in the data vector. This is fine; we
- * compute the number of entries as the difference between two successive
- * offset vector positions, so the empty positions have become 0-length
- * arrays.
- * <p>
- * Note that, for an array of scalars, when overflow occurs,
- * we need only worry about two
- * states in the data vector. Either data has been written for the
- * row (as in the third example above), and so must be moved to the
- * roll-over vector, or no data has been written and no move is
- * needed. We never have to worry about missing values because the
- * cannot occur in the data vector.
- * <p>
- * See {@link ObjectArrayWriter} for information about arrays of
- * maps (arrays of multiple columns.)
+ * Interface for specialized operations on an offset vector.
*/
-public class OffsetVectorWriter extends AbstractFixedWidthWriter {
-
- private static final int VALUE_WIDTH = UInt4Vector.VALUE_WIDTH;
-
- private UInt4Vector vector;
-
- /**
- * Offset of the first value for the current row. Used during
- * overflow or if the row is restarted.
- */
-
- private int rowStartOffset;
-
- /**
- * Cached value of the end offset for the current value. Used
- * primarily for variable-width columns to allow the column to be
- * rewritten multiple times within the same row. The start offset
- * value is updated with the end offset only when the value is
- * committed in {@link @endValue()}.
- */
-
- private int nextOffset;
-
- public OffsetVectorWriter(UInt4Vector vector) {
- this.vector = vector;
- }
-
- @Override public BaseDataValueVector vector() { return vector; }
- @Override public int width() { return VALUE_WIDTH; }
-
- @Override
- protected void realloc(int size) {
- vector.reallocRaw(size);
- setBuffer();
- }
-
- @Override
- public ValueType valueType() { return ValueType.INTEGER; }
-
- @Override
- public void startWrite() {
- super.startWrite();
- nextOffset = 0;
- rowStartOffset = 0;
-
- // Special handling for first value. Alloc vector if needed.
- // Offset vectors require a 0 at position 0. The (end) offset
- // for row 0 starts at position 1, which is handled in
- // writeOffset() below.
-
- if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) {
- realloc(MIN_BUFFER_SIZE);
- }
- vector.getBuffer().setInt(0, 0);
- }
-
- public int nextOffset() { return nextOffset; }
- public int rowStartOffset() { return rowStartOffset; }
-
- @Override
- public void startRow() { rowStartOffset = nextOffset; }
-
- /**
- * Return the write offset, which is one greater than the index reported
- * by the vector index.
- *
- * @return the offset in which to write the current offset of the end
- * of the current data value
- */
-
- protected final int writeIndex() {
-
- // "Fast path" for the normal case of no fills, no overflow.
- // This is the only bounds check we want to do for the entire
- // set operation.
-
- // This is performance critical code; every operation counts.
- // Please be thoughtful when changing the code.
-
- final int valueIndex = vectorIndex.vectorIndex();
- int writeIndex = valueIndex + 1;
- if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) {
- writeIndex = prepareWrite(writeIndex);
- }
-
- // Track the last write location for zero-fill use next time around.
- // Recall, it is the value index, which is one less than the (end)
- // offset index.
-
- lastWriteIndex = valueIndex;
- return writeIndex;
- }
-
- protected int prepareWrite(int writeIndex) {
-
- // Either empties must be filled or the vector is full.
-
- resize(writeIndex);
-
- // Call to resize may cause rollover, so reset write index
- // afterwards.
-
- final int valueIndex = vectorIndex.vectorIndex();
-
- // Fill empties to the write position.
- // Fill empties works off the row index, not the write
- // index. The write index is one past the row index.
- // (Yes, this is complex...)
-
- fillEmpties(valueIndex);
- return valueIndex + 1;
- }
-
- @Override
- protected final void fillEmpties(final int valueIndex) {
- while (lastWriteIndex < valueIndex - 1) {
- drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset);
- }
- }
-
- public final void setNextOffset(final int newOffset) {
- final int writeIndex = writeIndex();
- drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
- nextOffset = newOffset;
- }
-
- @Override
- public void skipNulls() {
-
- // Nothing to do. Fill empties logic will fill in missing
- // offsets.
- }
-
- @Override
- public void restartRow() {
- nextOffset = rowStartOffset;
- super.restartRow();
- }
-
- @Override
- public void preRollover() {
- setValueCount(vectorIndex.rowStartIndex() + 1);
- }
-
- @Override
- public void postRollover() {
- final int newNext = nextOffset - rowStartOffset;
- super.postRollover();
- nextOffset = newNext;
- }
-
- @Override
- public void setValueCount(int valueCount) {
-
- // Slightly different version of the fixed-width writer
- // code. Offset counts are one greater than the value count.
- // But for all other purposes, we track the value (row)
- // position, not the offset position.
-
- mandatoryResize(valueCount);
- fillEmpties(valueCount);
- vector().getBuffer().writerIndex((valueCount + 1) * width());
- lastWriteIndex = Math.max(lastWriteIndex, valueCount - 1);
- }
-
- @Override
- public void dump(HierarchicalFormatter format) {
- format.extend();
- super.dump(format);
- format
- .attribute("lastWriteIndex", lastWriteIndex)
- .attribute("nextOffset", nextOffset)
- .endObject();
- }
+public interface OffsetVectorWriter extends ScalarWriter, WriterEvents {
+ int rowStartOffset();
+ int nextOffset();
+ void setNextOffset(int vectorIndex);
+ void dump(HierarchicalFormatter format);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
new file mode 100644
index 000000000..edcb9f584
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer;
+
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+
+/**
+ * Specialized column writer for the (hidden) offset vector used
+ * with variable-length or repeated vectors. See comments in the
+ * <tt>ColumnAccessors.java</tt> template file for more details.
+ * <p>
+ * Note that the <tt>lastWriteIndex</tt> tracked here corresponds
+ * to the data values; it is one less than the actual offset vector
+ * last write index due to the nature of offset vector layouts. The selection
+ * of last write index basis makes roll-over processing easier as only this
+ * writer need know about the +1 translation required for writing.
+ * <p>
+ * The states illustrated in the base class apply here as well,
+ * remembering that the end offset for a row (or array position)
+ * is written one ahead of the vector index.
+ * <p>
+ * The vector index does create an interesting dynamic for the child
+ * writers. From the child writer's perspective, the states described in
+ * the super class are the only states of interest. Here we want to
+ * take the perspective of the parent.
+ * <p>
+ * The offset vector is an implementation of a repeat level. A repeat
+ * level can occur for a single array, or for a collection of columns
+ * within a repeated map. (A repeat level also occurs for variable-width
+ * fields, but this is a bit harder to see, so let's ignore that for
+ * now.)
+ * <p>
+ * The key point to realize is that each repeat level introduces an
+ * isolation level in terms of indexing. That is, empty values in the
+ * outer level have no effect on indexing in the inner level. In fact,
+ * the nature of a repeated outer level means that there are no empties
+ * in the inner level.
+ * <p>
+ * To illustrate:<pre><code>
+ * Offset Vector Data Vector Indexes
+ * lw, v > | 10 | - - - - - > | X | 10
+ * | 12 | - - + | X | < lw' 11
+ * | | + - - > | | < v' 12
+ * </code></pre>
+ * In the above, the client has just written an array of two elements
+ * at the current write position. The data starts at offset 10 in
+ * the data vector, and the next write will be at 12. The end offset
+ * is written one ahead of the vector index.
+ * <p>
+ * From the data vector's perspective, its last-write (lw') reflects
+ * the last element written. If this is an array of scalars, then the
+ * write index is automatically incremented, as illustrated by v'.
+ * (For map arrays, the index must be incremented by calling
+ * <tt>save()</tt> on the map array writer.)
+ * <p>
+ * Suppose the client now skips some arrays:<pre><code>
+ * Offset Vector Data Vector
+ * lw > | 10 | - - - - - > | X | 10
+ * | 12 | - - + | X | < lw' 11
+ * | | + - - > | | < v' 12
+ * | | | | 13
+ * v > | | | | 14
+ * </code></pre>
+ * The last write position does not move and there are gaps in the
+ * offset vector. The vector index points to the current row. Note
+ * that the data vector last write and vector indexes do not change,
+ * this reflects the fact that the data vector's vector index
+ * (v') matches the tail offset.
+ * <p>
+ * The
+ * client now writes a three-element vector:<pre><code>
+ * Offset Vector Data Vector
+ * | 10 | - - - - - > | X | 10
+ * | 12 | - - + | X | 11
+ * | 12 | - - + - - > | Y | 12
+ * | 12 | - - + | Y | 13
+ * lw, v > | 12 | - - + | Y | < lw' 14
+ * | 15 | - - - - - > | | < v' 15
+ * </code></pre>
+ * Quite a bit just happened. The empty offset slots were back-filled
+ * with the last write offset in the data vector. The client wrote
+ * three values, which advanced the last write and vector indexes
+ * in the data vector. And, the last write index in the offset
+ * vector also moved to reflect the update of the offset vector.
+ * Note that as a result, multiple positions in the offset vector
+ * point to the same location in the data vector. This is fine; we
+ * compute the number of entries as the difference between two successive
+ * offset vector positions, so the empty positions have become 0-length
+ * arrays.
+ * <p>
+ * Note that, for an array of scalars, when overflow occurs,
+ * we need only worry about two
+ * states in the data vector. Either data has been written for the
+ * row (as in the third example above), and so must be moved to the
+ * roll-over vector, or no data has been written and no move is
+ * needed. We never have to worry about missing values because they
+ * cannot occur in the data vector.
+ * <p>
+ * See {@link ObjectArrayWriter} for information about arrays of
+ * maps (arrays of multiple columns.)
+ */
+
+public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements OffsetVectorWriter {
+
+ private static final int VALUE_WIDTH = UInt4Vector.VALUE_WIDTH;
+
+ private final UInt4Vector vector;
+
+ /**
+ * Offset of the first value for the current row. Used during
+ * overflow or if the row is restarted.
+ */
+
+ private int rowStartOffset;
+
+ /**
+ * Cached value of the end offset for the current value. Used
+ * primarily for variable-width columns to allow the column to be
+ * rewritten multiple times within the same row. The start offset
+ * value is updated with the end offset only when the value is
+ * committed in {@code endValue()}.
+ */
+
+ private int nextOffset;
+
+ public OffsetVectorWriterImpl(UInt4Vector vector) {
+ this.vector = vector;
+ }
+
+ @Override public BaseDataValueVector vector() { return vector; }
+ @Override public int width() { return VALUE_WIDTH; }
+
+ @Override
+ protected void realloc(int size) {
+ vector.reallocRaw(size);
+ setBuffer();
+ }
+
+ @Override
+ public ValueType valueType() { return ValueType.INTEGER; }
+
+ @Override
+ public void startWrite() {
+ super.startWrite();
+ nextOffset = 0;
+ rowStartOffset = 0;
+
+ // Special handling for first value. Alloc vector if needed.
+ // Offset vectors require a 0 at position 0. The (end) offset
+ // for row 0 starts at position 1, which is handled in
+ // writeOffset() below.
+
+ if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) {
+ realloc(MIN_BUFFER_SIZE);
+ }
+ drillBuf.setInt(0, 0);
+ }
+
+ @Override
+ public int nextOffset() { return nextOffset; }
+
+ @Override
+ public int rowStartOffset() { return rowStartOffset; }
+
+ @Override
+ public void startRow() { rowStartOffset = nextOffset; }
+
+ /**
+ * Return the write offset, which is one greater than the index reported
+ * by the vector index.
+ *
+ * @return the offset in which to write the current offset of the end
+ * of the current data value
+ */
+
+ protected final int prepareWrite() {
+
+ // "Fast path" for the normal case of no fills, no overflow.
+ // This is the only bounds check we want to do for the entire
+ // set operation.
+
+ // This is performance critical code; every operation counts.
+ // Please be thoughtful when changing the code.
+
+ final int valueIndex = vectorIndex.vectorIndex();
+ int writeIndex = valueIndex + 1;
+ if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) {
+ writeIndex = prepareWrite(writeIndex);
+ }
+
+ // Track the last write location for zero-fill use next time around.
+
+ lastWriteIndex = valueIndex;
+ return writeIndex;
+ }
+
+ protected int prepareWrite(int writeIndex) {
+
+ // Either empties must be filled or the vector is full.
+
+ resize(writeIndex);
+
+ // Call to resize may cause rollover, so reset write index
+ // afterwards.
+
+ final int valueIndex = vectorIndex.vectorIndex();
+
+ // Fill empties to the write position.
+ // Fill empties works off the row index, not the write
+ // index. (Yes, this is complex...)
+
+ fillEmpties(valueIndex);
+ return valueIndex + 1;
+ }
+
+ @Override
+ protected final void fillEmpties(final int valueIndex) {
+ while (lastWriteIndex < valueIndex - 1) {
+ drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset);
+ }
+ }
+
+ @Override
+ public final void setNextOffset(final int newOffset) {
+ final int writeIndex = prepareWrite();
+ drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
+ nextOffset = newOffset;
+ }
+
+ @Override
+ public void skipNulls() {
+
+ // Nothing to do. Fill empties logic will fill in missing
+ // offsets.
+ }
+
+ @Override
+ public void restartRow() {
+ nextOffset = rowStartOffset;
+ super.restartRow();
+ }
+
+ @Override
+ public void preRollover() {
+
+ // Rollover is occurring. This means the current row is not complete.
+ // We want to keep 0..(row index - 1) which gives us (row index)
+ // rows. But, this being an offset vector, we add one to account
+ // for the extra 0 value at the start.
+
+ setValueCount(vectorIndex.rowStartIndex() + 1);
+ }
+
+ @Override
+ public void postRollover() {
+ final int newNext = nextOffset - rowStartOffset;
+ super.postRollover();
+ nextOffset = newNext;
+ }
+
+ @Override
+ public void setValueCount(int valueCount) {
+ mandatoryResize(valueCount);
+
+ // Value counts are in offset vector positions. Fill empties
+ // works in row positions.
+
+ fillEmpties(valueCount);
+ vector().getBuffer().writerIndex((valueCount + 1) * VALUE_WIDTH);
+ }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format.extend();
+ super.dump(format);
+ format
+ .attribute("lastWriteIndex", lastWriteIndex)
+ .attribute("nextOffset", nextOffset)
+ .endObject();
+ }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
index 8382ad17a..2ac7d45b5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
@@ -17,11 +17,11 @@
*/
package org.apache.drill.exec.vector.accessor.writer;
+import java.lang.reflect.Array;
import java.math.BigDecimal;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArrayWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter.ScalarObjectWriter;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -64,14 +64,14 @@ public class ScalarArrayWriter extends BaseArrayWriter {
public ScalarArrayWriter(ColumnMetadata schema,
RepeatedValueVector vector, BaseScalarWriter elementWriter) {
- super(vector.getOffsetVector(),
- new ScalarObjectWriter(schema, elementWriter));
+ super(schema, vector.getOffsetVector(),
+ new ScalarObjectWriter(elementWriter));
this.elementWriter = elementWriter;
}
public static ArrayObjectWriter build(ColumnMetadata schema,
RepeatedValueVector repeatedVector, BaseScalarWriter elementWriter) {
- return new ArrayObjectWriter(schema,
+ return new ArrayObjectWriter(
new ScalarArrayWriter(schema, repeatedVector, elementWriter));
}
@@ -83,25 +83,28 @@ public class ScalarArrayWriter extends BaseArrayWriter {
}
@Override
- public void bindListener(ColumnWriterListener listener) {
- elementWriter.bindListener(listener);
- }
-
- @Override
public void save() {
// No-op: done when writing each scalar value
+ // May be called, however, by code that also supports
+ // lists as a list does require an explicit save.
}
- @Override
- public void set(Object... values) {
- for (Object value : values) {
- entry().set(value);
- }
- }
+ /**
+ * Set a repeated vector based on a Java array of the proper
+ * type. This function involves parsing the array type and so is
+ * suitable only for test code. The array can be either a primitive
+ * (<tt>int [], say</tt>) or a typed array of boxed values
+ * (<tt>Integer[], say</tt>).
+ */
@Override
public void setObject(Object array) {
- if (array == null) {
+
+ // Accept an empty array (of any type) to mean
+ // an empty array of the type of this writer.
+
+ if (array == null || Array.getLength(array) == 0) {
+
// Assume null means a 0-element array since Drill does
// not support null for the whole array.
@@ -109,7 +112,9 @@ public class ScalarArrayWriter extends BaseArrayWriter {
}
String objClass = array.getClass().getName();
if (! objClass.startsWith("[")) {
- throw new IllegalArgumentException("Argument must be an array");
+ throw new IllegalArgumentException(
+ String.format("Argument must be an array. Column `%s`, value = %s",
+ schema.name(), array.toString()));
}
// Figure out type
@@ -125,39 +130,50 @@ public class ScalarArrayWriter extends BaseArrayWriter {
setBytesArray((byte[][]) array);
break;
default:
- throw new IllegalArgumentException( "Unknown Java array type: " + objClass );
+ throw new IllegalArgumentException(
+ String.format("Unknown Java array type: %s, for column `%s`",
+ objClass, schema.name()));
}
break;
+ case 'B':
+ setByteArray((byte[]) array);
+ break;
case 'S':
- setShortArray((short[]) array );
+ setShortArray((short[]) array);
break;
case 'I':
- setIntArray((int[]) array );
+ setIntArray((int[]) array);
break;
case 'J':
- setLongArray((long[]) array );
+ setLongArray((long[]) array);
break;
case 'F':
- setFloatArray((float[]) array );
+ setFloatArray((float[]) array);
break;
case 'D':
- setDoubleArray((double[]) array );
+ setDoubleArray((double[]) array);
break;
case 'Z':
- setBooleanArray((boolean[]) array );
+ setBooleanArray((boolean[]) array);
break;
case 'L':
int posn = objClass.indexOf(';');
// If the array is of type Object, then we have no type info.
- String memberClassName = objClass.substring( 2, posn );
+ String memberClassName = objClass.substring(2, posn);
if (memberClassName.equals(String.class.getName())) {
- setStringArray((String[]) array );
+ setStringArray((String[]) array);
} else if (memberClassName.equals(Period.class.getName())) {
- setPeriodArray((Period[]) array );
+ setPeriodArray((Period[]) array);
} else if (memberClassName.equals(BigDecimal.class.getName())) {
- setBigDecimalArray((BigDecimal[]) array );
+ setBigDecimalArray((BigDecimal[]) array);
+ } else if (memberClassName.equals(Integer.class.getName())) {
+ setIntObjectArray((Integer[]) array);
+ } else if (memberClassName.equals(Long.class.getName())) {
+ setLongObjectArray((Long[]) array);
+ } else if (memberClassName.equals(Double.class.getName())) {
+ setDoubleObjectArray((Double[]) array);
} else {
throw new IllegalArgumentException( "Unknown Java array type: " + memberClassName );
}
@@ -179,6 +195,12 @@ public class ScalarArrayWriter extends BaseArrayWriter {
}
}
+ public void setByteArray(byte[] value) {
+ for (int i = 0; i < value.length; i++) {
+ elementWriter.setInt(value[i]);
+ }
+ }
+
public void setShortArray(short[] value) {
for (int i = 0; i < value.length; i++) {
elementWriter.setInt(value[i]);
@@ -191,12 +213,34 @@ public class ScalarArrayWriter extends BaseArrayWriter {
}
}
+ public void setIntObjectArray(Integer[] value) {
+ for (int i = 0; i < value.length; i++) {
+ Integer element = value[i];
+ if (element == null) {
+ elementWriter.setNull();
+ } else {
+ elementWriter.setInt(element);
+ }
+ }
+ }
+
public void setLongArray(long[] value) {
for (int i = 0; i < value.length; i++) {
elementWriter.setLong(value[i]);
}
}
+ public void setLongObjectArray(Long[] value) {
+ for (int i = 0; i < value.length; i++) {
+ Long element = value[i];
+ if (element == null) {
+ elementWriter.setNull();
+ } else {
+ elementWriter.setLong(element);
+ }
+ }
+ }
+
public void setFloatArray(float[] value) {
for (int i = 0; i < value.length; i++) {
elementWriter.setDouble(value[i]);
@@ -209,6 +253,17 @@ public class ScalarArrayWriter extends BaseArrayWriter {
}
}
+ public void setDoubleObjectArray(Double[] value) {
+ for (int i = 0; i < value.length; i++) {
+ Double element = value[i];
+ if (element == null) {
+ elementWriter.setNull();
+ } else {
+ elementWriter.setDouble(element);
+ }
+ }
+ }
+
public void setStringArray(String[] value) {
for (int i = 0; i < value.length; i++) {
elementWriter.setString(value[i]);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
index 7566f2861..c8f9f4896 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
@@ -40,6 +40,38 @@ import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
public interface WriterEvents {
/**
+ * Tracks the write state of a tuple or variant to allow applying the correct
+ * operations to newly-added columns to synchronize them with the rest
+ * of the writers.
+ */
+
+ enum State {
+
+ /**
+ * No write is in progress. Nothing need be done to newly-added
+ * writers.
+ */
+ IDLE,
+
+ /**
+ * <tt>startWrite()</tt> has been called to start a write operation
+ * (start a batch), but <tt>startValue()</tt> has not yet been called
+ * to start a row (or value within an array). <tt>startWrite()</tt> must
+ * be called on newly added columns.
+ */
+
+ IN_WRITE,
+
+ /**
+ * Both <tt>startWrite()</tt> and <tt>startValue()</tt> have been called on
+ * the tuple to prepare for writing values, and both must be called on
+ * newly-added vectors.
+ */
+
+ IN_ROW
+ }
+
+ /**
* Bind the writer to a writer index.
*
* @param index the writer index (top level or nested for
@@ -48,8 +80,6 @@ public interface WriterEvents {
void bindIndex(ColumnWriterIndex index);
- ColumnWriterIndex writerIndex();
-
/**
* Start a write (batch) operation. Performs any vector initialization
* required at the start of a batch (especially for offset vectors.)
@@ -113,15 +143,4 @@ public interface WriterEvents {
*/
void postRollover();
-
- /**
- * Return the last write position in the vector. This may be the
- * same as the writer index position (if the vector was written at
- * that point), or an earlier point. In either case, this value
- * points to the last valid value in the vector.
- *
- * @return index of the last valid value in the vector
- */
-
- int lastWriteIndex();
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
index 7c9f8ba80..3d64bb6b7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
@@ -17,9 +17,8 @@
*/
package org.apache.drill.exec.vector.accessor.writer.dummy;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
-import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter;
@@ -35,30 +34,37 @@ import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter;
*/
public class DummyArrayWriter extends AbstractArrayWriter {
+ public static class DummyOffsetVectorWriter extends DummyScalarWriter implements OffsetVectorWriter {
+
+ public DummyOffsetVectorWriter() {
+ super(null);
+ }
+
+ @Override
+ public int rowStartOffset() { return 0; }
+
+ @Override
+ public int nextOffset() { return 0; }
+
+ @Override
+ public void setNextOffset(int vectorIndex) { }
+ }
+
+ public static final DummyOffsetVectorWriter offsetVectorWriter = new DummyOffsetVectorWriter();
+
public DummyArrayWriter(
+ ColumnMetadata schema,
AbstractObjectWriter elementWriter) {
- super(elementWriter);
+ super(schema, elementWriter, offsetVectorWriter);
}
@Override
- public int size() { return 0; }
-
- @Override
public void save() { }
@Override
- public void set(Object... values) { }
-
- @Override
public void setObject(Object array) { }
@Override
- public void bindIndex(ColumnWriterIndex index) { }
-
- @Override
- public ColumnWriterIndex writerIndex() { return null; }
-
- @Override
public void startWrite() { }
@Override
@@ -86,11 +92,5 @@ public class DummyArrayWriter extends AbstractArrayWriter {
public int lastWriteIndex() { return 0; }
@Override
- public void bindListener(ColumnWriterListener listener) { }
-
- @Override
- public void bindListener(TupleWriterListener listener) { }
-
- @Override
- public OffsetVectorWriter offsetWriter() { return null; }
+ public void bindIndex(ColumnWriterIndex index) { }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
index e8272d62c..9cfe56e22 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.vector.accessor.writer.dummy;
import java.math.BigDecimal;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ValueType;
@@ -33,11 +34,18 @@ import org.joda.time.Period;
public class DummyScalarWriter extends AbstractScalarWriter {
+ public DummyScalarWriter(ColumnMetadata schema) {
+ this.schema = schema;
+ }
+
@Override
public void bindListener(ColumnWriterListener listener) { }
@Override
- public ValueType valueType() { return null; }
+ public ValueType valueType() { return ValueType.NULL; }
+
+ @Override
+ public boolean nullable() { return true; }
@Override
public void setNull() { }
@@ -67,9 +75,6 @@ public class DummyScalarWriter extends AbstractScalarWriter {
public void bindIndex(ColumnWriterIndex index) { }
@Override
- public ColumnWriterIndex writerIndex() { return null; }
-
- @Override
public void restartRow() { }
@Override
@@ -86,4 +91,7 @@ public class DummyScalarWriter extends AbstractScalarWriter {
@Override
public BaseDataValueVector vector() { return null; }
+
+ @Override
+ public int rowStartIndex() { return 0; }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java
index 5e827faa6..9efc59cb5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/package-info.java
@@ -104,11 +104,18 @@
* </ul>
* <p>
* This data model is similar to; but has important differences from, the prior,
- * generated, readers and writers.
+ * generated, readers and writers. This version is based on the concept of
+ * minimizing the number of writer classes, and leveraging Java primitives to
+ * keep the number of get/set methods to a reasonable size. This version also
+ * automates vector allocation, vector overflow and so on.
* <p>
* The object layer is new: it is the simplest way to model the three "object
* types." An app using this code would use just the leaf scalar readers and
* writers.
+ * <p>
+ * Similarly, the {@link ColumnWriter} interface provides a uniform way to
+ * access services common to all writer types, while allowing each JSON-like
+ * writer to provide type-specific ways to access data.
*
* <h4>Writer Performance</h4>
*
@@ -145,6 +152,94 @@
* vector and works the same for repeated scalars, repeated maps and
* (eventually) lists and repeated lists.</li>
* </ul>
+ *
+ * <h4>Possible Improvements</h4>
+ *
+ * The code here works and has extensive unit tests. But, many improvements
+ * are possible:
+ * <ul>
+ * <li>Drill has four "container" vector types (Map, Union, List, Repeated
+ * List), each with its own quirky semantics. The code could be far simpler
+ * if the container semantics were unified.</li>
+ * <li>Similarly, the corresponding writers implement some very awkward
+ * logic to handle union and list containers. But, these vectors are not
+ * fully supported in Drill. This means that the code implements (and tests)
+ * many odd cases which no one may ever use. Better to limit the data types
+ * that Drill supports, implement those well, and deprecate the obscure
+ * cases.</li>
+ * <li>The same schema-parsing logic appears over and over in different
+ * guises. Some parse vectors, some parse batch schemas, others parse the
+ * column metadata (which provides information not available in the
+ * materialized field) and so on. Would be great to find some common way
+ * to do this work, perhaps in the form of a visitor. An earlier version of
+ * this code tried using visitors. But, since each representation has its
+ * own quirks, that approach didn't work out. A first step would be to come
+ * up with a standard schema description which can be used in all cases,
+ * then build a visitor on that.</li>
+ * <li>Many tests exist. But, the careful reader will note that Drill's
+ * vectors define a potentially very complex recursive structure (most
+ * variations of which are never used.) Additional testing should cover
+ * all cases, such as repeated lists that contain unions, or unions that
+ * contain repeated lists of tuples.</li>
+ * <li>Experience with the code may show additional redundancies that can
+ * be removed. Each fresh set of eyes may see things that prior folks
+ * missed.</li>
+ * </ul>
+ *
+ * <h4>Caveats</h4>
+ *
+ * The column accessors are divided into two packages: <tt>vector</tt> and
+ * <tt>java-exec</tt>. It is easy to add functionality in the wrong place,
+ * breaking abstraction and encapsulation. Here are some general guidelines:
+ * <ul>
+ * <li>The core reader and writer logic is implemented in this <tt>vector</tt>
+ * package. This package provides low-level tools to build accessors, but
+ * not the construction logic itself. (That is found in the <tt>java-exec</tt>
+ * layer.)</li>
+ * <li>The vector layer is designed to support both the simple "row set" and
+ * the more complex "result set loader" implementations.</li>
+ * <li>The "row set" layer wraps the accessors in tools that work on one batch
+ * (row set) at a time, without regard for schema versions, schema changes
+ * and the like. The row set layer is primarily for testing: building an input
+ * batch for some operator, and verifying an output batch. It also serves as a
+ * simple test framework for the accessors, without the complexity of other
+ * layers.</li>
+ * <li>The "result set loader" layer handles the complexity of dynamic schema
+ * evolution, schema versioning, vector overflow and projection. It provides
+ * an "industrial strength" version of the accessor mechanism intended for
+ * use in the scan operator (but which can be generalized for other operators.)
+ * </li>
+ * <li>The "listener" pattern is used to allow higher levels to "plug in"
+ * functionality to the accessor layer. This is especially true for schema
+ * evolution: listeners notify the higher layer to add a column, delegating
+ * the actual work to that higher layer.</li>
+ * </ul>
+ *
+ * Given all this, plan carefully where to make any improvement. If your change
+ * violates the dependencies below, perhaps reconsider another way to do the
+ * change.
+ * <code><pre>
+ * +------------+
+ * +-------------------------- | Result Set |
+ * v | Loader |
+ * +----------------+ +---------+ +------------+
+ * | Metadata | <-- | Row Set | |
+ * | Implementation | | Tools | |
+ * +----------------+ +---------+ |
+ * java-exec | | |
+ * ------------------- | ------------------- | ------ | ------------
+ * vector v v v
+ * +------------+ +-----------+
+ * | Metadata | <--------- | Column |
+ * | Interfaces | | Accessors |
+ * +------------+ +-----------+
+ * |
+ * v
+ * +---------+
+ * | Value |
+ * | Vectors |
+ * +---------+
+ * </pre></code>
*/
package org.apache.drill.exec.vector.accessor.writer;