aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJinfeng Ni <jni@maprtech.com>2014-06-09 13:02:52 -0700
committerJacques Nadeau <jacques@apache.org>2014-06-16 07:58:08 -0700
commitc789f1c8ace3e12d81ccae05e79e4539cf9c829c (patch)
treebfed836cffcd30f1e1572366d15e7faf180682a2
parentaaaca6a7829733a120018e1388c68672d035820b (diff)
DRILL-935: Run-time code generation support for function which decodes string/varbinary into complex JSON object.
-rw-r--r--common/src/main/java/org/apache/drill/common/types/Types.java2
-rw-r--r--exec/java-exec/src/main/codegen/templates/BaseWriter.java4
-rw-r--r--exec/java-exec/src/main/codegen/templates/ListWriters.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java107
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java30
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java101
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java105
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java23
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java30
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java62
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java30
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeWriter.java113
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java3
24 files changed, 614 insertions, 79 deletions
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index 10d84e1d1..9280d03ee 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -349,7 +349,7 @@ public class Types {
case "binary":
return withMode(MinorType.VARBINARY, mode);
case "json":
- return withMode(MinorType.VARBINARY, mode);
+ return withMode(MinorType.LATE, mode);
default:
throw new UnsupportedOperationException("Could not determine type: " + typeName);
}
diff --git a/exec/java-exec/src/main/codegen/templates/BaseWriter.java b/exec/java-exec/src/main/codegen/templates/BaseWriter.java
index 0c241b21d..e281bc7e7 100644
--- a/exec/java-exec/src/main/codegen/templates/BaseWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/BaseWriter.java
@@ -74,6 +74,10 @@ public interface BaseWriter extends Positionable{
MapWriter rootAsMap();
ListWriter rootAsList();
boolean ok();
+
+ public void setPosition(int index);
+ public void setValueCount(int count);
+ public void reset();
}
}
diff --git a/exec/java-exec/src/main/codegen/templates/ListWriters.java b/exec/java-exec/src/main/codegen/templates/ListWriters.java
index 64d92e218..278fddcd3 100644
--- a/exec/java-exec/src/main/codegen/templates/ListWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ListWriters.java
@@ -121,7 +121,8 @@ public class ${mode}ListWriter extends AbstractFieldWriter{
switch(mode){
case INIT:
int vectorCount = container.size();
- Repeated${capName}Vector vector = container.addOrGet(name, ${upperName}_TYPE, Repeated${capName}Vector.class);
+ Repeated${capName}Vector vector = container.addOrGet(name, ${upperName}_TYPE, Repeated${capName}Vector.class);
+ innerVector = vector;
writer = new Repeated${capName}WriterImpl(vector, this);
if(vectorCount != container.size()) writer.allocate();
writer.setPosition(${index});
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index 4a1814907..a5b65ecc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -86,6 +86,7 @@ public class PrintingResultsListener implements UserResultsListener {
if (isLastChunk) {
allocator.close();
latch.countDown();
+ System.out.println("Total rows returned : " + count.get());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
index e6370d804..f164bd84a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
@@ -22,12 +22,13 @@ import java.util.List;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.visitors.ExprVisitor;
import org.apache.drill.common.types.TypeProtos.MajorType;
-
+import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder;
import org.apache.drill.exec.expr.fn.DrillFuncHolder;
public class DrillFuncHolderExpr extends FunctionHolderExpression implements Iterable<LogicalExpression>{
@@ -66,4 +67,8 @@ public class DrillFuncHolderExpr extends FunctionHolderExpression implements Ite
public boolean argConstantOnly(int i) {
return holder.isConstant(i);
}
+
+ public boolean isComplexWriterFuncHolder() {
+ return holder instanceof DrillComplexWriterFuncHolder;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 99d423f45..9b80dc08a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -25,6 +25,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ConvertExpression;
import org.apache.drill.common.expression.ErrorCollector;
@@ -63,6 +64,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder;
import org.apache.drill.exec.expr.fn.DrillFuncHolder;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.fn.HiveFuncHolder;
@@ -81,7 +83,12 @@ public class ExpressionTreeMaterializer {
};
public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionImplementationRegistry registry) {
- LogicalExpression out = expr.accept(new MaterializeVisitor(batch, errorCollector), registry);
+ return ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, registry, false);
+ }
+
+ public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionImplementationRegistry registry,
+ boolean allowComplexWriterExpr) {
+ LogicalExpression out = expr.accept(new MaterializeVisitor(batch, errorCollector, allowComplexWriterExpr), registry);
if(out instanceof NullExpression){
return new TypedNullConstant(Types.optional(MinorType.INT));
}else{
@@ -93,10 +100,12 @@ public class ExpressionTreeMaterializer {
private ExpressionValidator validator = new ExpressionValidator();
private final ErrorCollector errorCollector;
private final VectorAccessible batch;
+ private final boolean allowComplexWriter;
- public MaterializeVisitor(VectorAccessible batch, ErrorCollector errorCollector) {
+ public MaterializeVisitor(VectorAccessible batch, ErrorCollector errorCollector, boolean allowComplexWriter) {
this.batch = batch;
this.errorCollector = errorCollector;
+ this.allowComplexWriter = allowComplexWriter;
}
private LogicalExpression validateNewExpr(LogicalExpression newExpr) {
@@ -135,6 +144,10 @@ public class ExpressionTreeMaterializer {
DrillFuncHolder matchedFuncHolder =
resolver.getBestMatch(registry.getDrillRegistry().getMethods().get(call.getName()), call);
+ if (matchedFuncHolder instanceof DrillComplexWriterFuncHolder && ! allowComplexWriter) {
+ errorCollector.addGeneralError(call.getPosition(), "Only ProjectRecordBatch could have complex writer function. You are using complex writer function " + call.getName() + " in a non-project operation!");
+ }
+
//new arg lists, possible with implicit cast inserted.
List<LogicalExpression> argsWithCast = Lists.newArrayList();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
new file mode 100644
index 000000000..0efee6332
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.expr.fn;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
+import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
+import com.sun.codemodel.JBlock;
+import com.sun.codemodel.JClass;
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JInvocation;
+import com.sun.codemodel.JMod;
+import com.sun.codemodel.JVar;
+
+public class DrillComplexWriterFuncHolder extends DrillSimpleFuncHolder{
+
+ private FieldReference ref;
+
+ public DrillComplexWriterFuncHolder(FunctionScope scope, NullHandling nullHandling, boolean isBinaryCommutative, boolean isRandom,
+ String[] registeredNames, ValueReference[] parameters, ValueReference returnValue, WorkspaceReference[] workspaceVars,
+ Map<String, String> methods, List<String> imports) {
+ super(scope, nullHandling, isBinaryCommutative, isRandom, registeredNames, parameters, returnValue, workspaceVars, methods, imports);
+ }
+
+ public void setReference(FieldReference ref) {
+ this.ref = ref;
+ }
+
+ @Override
+ protected HoldingContainer generateEvalBody(ClassGenerator<?> g, HoldingContainer[] inputVariables, String body, JVar[] workspaceJVars) {
+
+ g.getEvalBlock().directStatement(String.format("//---- start of eval portion of %s function. ----//", registeredNames[0]));
+
+ JBlock sub = new JBlock(true, true);
+ JBlock topSub = sub;
+
+ JVar complexWriter = g.declareClassField("complexWriter", g.getModel()._ref(ComplexWriter.class));
+
+ JClass cwClass = g.getModel().ref(ComplexWriterImpl.class);
+
+ JInvocation container = g.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
+
+ //Default name is "col", if not passed in a reference name for the output vector.
+ String refName = ref == null? "col" : ref.getRootSegment().getPath();
+
+ g.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));
+
+ JClass projBatchClass = g.getModel().ref(ProjectRecordBatch.class);
+ JExpression projBatch = JExpr.cast(projBatchClass, g.getMappingSet().getOutgoing());
+
+ g.getSetupBlock().add(projBatch.invoke("addComplexWriter").arg(complexWriter));
+
+
+ g.getEvalBlock().add(complexWriter.invoke("setPosition").arg(g.getMappingSet().getValueWriteIndex()));
+
+ sub.decl(g.getModel()._ref(ComplexWriter.class), returnValue.name, complexWriter);
+
+ // add the subblock after the out declaration.
+ g.getEvalBlock().add(topSub);
+
+ addProtectedBlock(g, sub, body, inputVariables, workspaceJVars, false);
+
+
+ JConditional jc = g.getEvalBlock()._if(complexWriter.invoke("ok").not());
+
+ jc._then().add(complexWriter.invoke("reset"));
+ //jc._then().directStatement("System.out.println(\"debug : write ok fail!, inIndex = \" + inIndex);");
+ jc._then()._return(JExpr.FALSE);
+
+ //jc._else().directStatement("System.out.println(\"debug : write successful, inIndex = \" + inIndex);");
+
+ g.getEvalBlock().directStatement(String.format("//---- end of eval portion of %s function. ----//", registeredNames[0]));
+
+ return null;
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index 5d5acf0bc..bbe76de5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -236,8 +236,9 @@ public abstract class DrillFuncHolder {
public static class ValueReference {
MajorType type;
String name;
- boolean isConstant;
- boolean isFieldReader;
+ boolean isConstant = false;
+ boolean isFieldReader = false;
+ boolean isComplexWriter = false;
public ValueReference(MajorType type, String name) {
super();
@@ -245,8 +246,6 @@ public abstract class DrillFuncHolder {
Preconditions.checkNotNull(name);
this.type = type;
this.name = name;
- isConstant = false;
- this.isFieldReader = false;
}
public void setConstant(boolean isConstant) {
@@ -265,6 +264,15 @@ public abstract class DrillFuncHolder {
return ref;
}
+
+ public static ValueReference createComplexWriterRef(String name) {
+ MajorType type = Types.required(MinorType.LATE);
+ ValueReference ref = new ValueReference(type, name);
+ ref.isComplexWriter = true;
+
+ return ref;
+ }
+
}
public static class WorkspaceReference {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
index 44dd49d3d..75c3cbb11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.expr.fn.DrillFuncHolder.ValueReference;
import org.apache.drill.exec.expr.fn.DrillFuncHolder.WorkspaceReference;
import org.apache.drill.exec.expr.holders.ValueHolder;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.janino.Java.CompilationUnit;
import org.codehaus.janino.Parser;
@@ -129,6 +130,17 @@ public class FunctionConverter {
continue;
}
+ // Special processing for @Output ComplexWriter
+ if (output != null && ComplexWriter.class.isAssignableFrom(field.getType())) {
+
+ if(outputField != null){
+ return failure("You've declared more than one @Output field. You must declare one and only @Output field per Function class.", clazz, field);
+ }else{
+ outputField = ValueReference.createComplexWriterRef(field.getName());
+ }
+ continue;
+ }
+
// check that param and output are value holders.
if(!ValueHolder.class.isAssignableFrom(field.getType())){
return failure(String.format("The field doesn't holds value of type %s which does not implement the ValueHolder interface. All fields of type @Param or @Output must extend this interface..", field.getType()), clazz, field);
@@ -154,15 +166,11 @@ public class FunctionConverter {
return failure("You've declared more than one @Output field. You must declare one and only @Output field per Function class.", clazz, field);
}else{
outputField = p;
-
}
-
}
-
+
}else{
// workspace work.
-// logger.debug("Found workspace field {}:{}", field.getType(), field.getName());
- //workspaceFields.add(new WorkspaceReference(field.getType(), field.getName()));
WorkspaceReference wsReference = new WorkspaceReference(field.getType(), field.getName());
if (template.scope() == FunctionScope.POINT_AGGREGATE && !ValueHolder.class.isAssignableFrom(field.getType()) ) {
@@ -217,8 +225,16 @@ public class FunctionConverter {
return new DrillDecimalAggFuncHolder(template.scope(), template.nulls(), template.isBinaryCommutative(),
template.isRandom(), registeredNames, ps, outputField, works, methods, imports);
case SIMPLE:
- return new DrillSimpleFuncHolder(template.scope(), template.nulls(), template.isBinaryCommutative(),
- template.isRandom(), registeredNames, ps, outputField, works, methods, imports);
+ if (outputField.isComplexWriter)
+ return new DrillComplexWriterFuncHolder(template.scope(), template.nulls(),
+ template.isBinaryCommutative(),
+ template.isRandom(), registeredNames,
+ ps, outputField, works, methods, imports);
+ else
+ return new DrillSimpleFuncHolder(template.scope(), template.nulls(),
+ template.isBinaryCommutative(),
+ template.isRandom(), registeredNames,
+ ps, outputField, works, methods, imports);
case DECIMAL_MAX_SCALE:
return new DrillDecimalMaxScaleFuncHolder(template.scope(), template.nulls(), template.isBinaryCommutative(),
template.isRandom(), registeredNames, ps, outputField, works, methods, imports);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
new file mode 100644
index 000000000..d8df54333
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.expr.fn.impl.conv;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.StringReader;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.vector.complex.fn.JsonWriter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
+import com.google.common.base.Charsets;
+
+public class JsonConvertFrom {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonConvertFrom.class);
+
+ private JsonConvertFrom(){}
+
+ @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL, isRandom = true)
+ public static class ConvertFromJson implements DrillSimpleFunc{
+
+ @Param VarBinaryHolder in;
+ @Output ComplexWriter writer;
+
+ public void setup(RecordBatch incoming){
+ }
+
+ public void eval(){
+
+ byte[] buf = new byte[in.end - in.start];
+ in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
+ String input = new String(buf, com.google.common.base.Charsets.UTF_8);
+
+ try {
+ org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader();
+
+ jsonReader.write(new java.io.StringReader(input), writer);
+
+ } catch (Exception e) {
+ System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace());
+ }
+ }
+ }
+
+ @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL, isRandom = true)
+ public static class ConvertFromJsonVarchar implements DrillSimpleFunc{
+
+ @Param VarCharHolder in;
+ @Output ComplexWriter writer;
+
+ public void setup(RecordBatch incoming){
+ }
+
+ public void eval(){
+
+ byte[] buf = new byte[in.end - in.start];
+ in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
+ String input = new String(buf, com.google.common.base.Charsets.UTF_8);
+
+ try {
+ org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader();
+
+ jsonReader.write(new java.io.StringReader(input), writer);
+
+ } catch (Exception e) {
+ System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace());
+ }
+ }
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 2914b671a..d142ff847 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -284,4 +284,9 @@ public class ScanBatch implements RecordBatch {
oContext.close();
}
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 51962792f..c9cd2dd69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -363,6 +363,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
public Iterator<VectorWrapper<?>> iterator() {
return container.iterator();
}
+
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index fb681ffc0..c7fc8135e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -147,4 +147,9 @@ public class WireRecordBatch implements RecordBatch {
fragProvider.cleanup();
}
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 61c256b7e..f5a4444f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -37,10 +37,12 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.DrillFuncHolderExpr;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
@@ -50,8 +52,10 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import com.carrotsearch.hppc.IntOpenHashSet;
import com.google.common.base.Preconditions;
@@ -63,6 +67,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private Projector projector;
private List<ValueVector> allocationVectors;
+ private List<ComplexWriter> complexWriters;
private boolean hasRemainder = false;
private int remainderIndex = 0;
private int recordCount;
@@ -85,55 +90,47 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
return super.innerNext();
}
+ public VectorContainer getOutgoingContainer() {
+ return this.container;
+ }
+
@Override
protected void doWork() {
// VectorUtil.showVectorAccessibleContent(incoming, ",");
int incomingRecordCount = incoming.getRecordCount();
- for(ValueVector v : this.allocationVectors){
-// AllocationHelper.allocate(v, incomingRecordCount, 250);
-// v.allocateNew();
- v.allocateNewSafe();
- }
+
+ doAlloc();
+
int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
if (outputRecords < incomingRecordCount) {
- for(ValueVector v : allocationVectors){
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(outputRecords);
- }
+ setValueCount(outputRecords);
hasRemainder = true;
remainderIndex = outputRecords;
this.recordCount = remainderIndex;
} else {
- for(ValueVector v : allocationVectors){
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(incomingRecordCount);
- }
+ setValueCount(incomingRecordCount);
for(VectorWrapper<?> v: incoming) {
v.clear();
}
this.recordCount = outputRecords;
}
+ // In case of complex writer expression, vectors would be added to batch run-time.
+ // We have to re-build the schema.
+ if (complexWriters != null) {
+ container.buildSchema(SelectionVectorMode.NONE);
+ }
}
private void handleRemainder() {
int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
- for(ValueVector v : this.allocationVectors){
- //AllocationHelper.allocate(v, remainingRecordCount, 250);
- v.allocateNewSafe();
- }
+ doAlloc();
int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0);
if (projRecords < remainingRecordCount) {
- for(ValueVector v : allocationVectors){
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(projRecords);
- }
+ setValueCount(projRecords);
this.recordCount = projRecords;
remainderIndex += projRecords;
} else {
- for(ValueVector v : allocationVectors){
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(remainingRecordCount);
- }
+ setValueCount(remainingRecordCount);
hasRemainder = false;
remainderIndex = 0;
for(VectorWrapper<?> v: incoming) {
@@ -141,8 +138,48 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
this.recordCount = remainingRecordCount;
}
+ // In case of complex writer expression, vectors would be added to batch run-time.
+ // We have to re-build the schema.
+ if (complexWriters != null) {
+ container.buildSchema(SelectionVectorMode.NONE);
+ }
}
+ public void addComplexWriter(ComplexWriter writer) {
+ complexWriters.add(writer);
+ }
+
+ private boolean doAlloc() {
+ //Allocate vv in the allocationVectors.
+ for(ValueVector v : this.allocationVectors){
+ //AllocationHelper.allocate(v, remainingRecordCount, 250);
+ if (!v.allocateNewSafe())
+ return false;
+ }
+
+ //Allocate vv for complexWriters.
+ if (complexWriters == null)
+ return true;
+
+ for (ComplexWriter writer : complexWriters)
+ writer.allocate();
+
+ return true;
+ }
+
+ private void setValueCount(int count) {
+ for(ValueVector v : allocationVectors){
+ ValueVector.Mutator m = v.getMutator();
+ m.setValueCount(count);
+ }
+
+ if (complexWriters == null)
+ return;
+
+ for (ComplexWriter writer : complexWriters)
+ writer.setValueCount(count);
+ }
+
/** hack to make ref and full work together... need to figure out if this is still necessary. **/
private FieldReference getRef(NamedExpression e){
FieldReference ref = e.getRef();
@@ -200,13 +237,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}else{
for(int i = 0; i < exprs.size(); i++){
final NamedExpression namedExpression = exprs.get(i);
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry());
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType());
if(collector.hasErrors()){
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
-
// add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
&& !((ValueVectorReadExpression) expr).hasReadPath()
@@ -223,7 +259,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
container.add(tp.getTo());
transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
// logger.debug("Added transfer.");
- }else{
+ } else if (expr instanceof DrillFuncHolderExpr &&
+ ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) {
+ // Need to process ComplexWriter function evaluation.
+ // Lazy initialization of the list of complex writers, if not done yet.
+ if (complexWriters == null)
+ complexWriters = Lists.newArrayList();
+
+ // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
+ ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
+ cg.addExpr(expr);
+ } else{
// need to do evaluation.
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
allocationVectors.add(vector);
@@ -231,13 +277,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
HoldingContainer hc = cg.addExpr(write);
-
cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
logger.debug("Added eval.");
}
}
-
-
}
cg.rotateBlock();
cg.getEvalBlock()._return(JExpr.TRUE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 1f73192b2..19f649745 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -137,4 +138,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
public void cleanup() {
incoming.cleanup();
}
+
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 7b7b708db..d71b8112f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
- protected final VectorContainer container = new VectorContainer();
+ protected final VectorContainer container; //= new VectorContainer();
protected final T popConfig;
protected final FragmentContext context;
protected final OperatorContext oContext;
@@ -43,6 +43,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
this.popConfig = popConfig;
this.oContext = new OperatorContext(popConfig, context);
this.stats = oContext.getStats();
+ this.container = new VectorContainer(this.oContext);
}
@Override
@@ -135,4 +136,10 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
return batch;
}
+
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 75b9b0ce0..7617d9185 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -85,6 +85,9 @@ public interface RecordBatch extends VectorAccessible {
public abstract SelectionVector4 getSelectionVector4();
+
+ public VectorContainer getOutgoingContainer();
+
/**
* Get the value vector type and id for the given schema path. The TypedFieldId should store a fieldId which is the
* same as the ordinal position of the field within the Iterator provided this classes implementation of
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 3c674662f..1f2c33a4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -26,6 +26,8 @@ import java.util.List;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
@@ -39,8 +41,14 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
private BatchSchema schema;
private int recordCount = -1;
-
+ private final OperatorContext oContext;
+
public VectorContainer() {
+ this.oContext = null;
+ }
+
+ public VectorContainer( OperatorContext oContext) {
+ this.oContext = oContext;
}
// public VectorContainer(List<ValueVector> vectors, List<ValueVector[]> hyperVectors) {
@@ -66,8 +74,17 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
add(vv, releasable);
}
- public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){
- return null;
+ public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){
+ MaterializedField field = MaterializedField.create(name, type);
+ ValueVector v = TypeHelper.getNewVector(field, this.oContext.getAllocator());
+
+ add(v);
+
+ if(clazz.isAssignableFrom(v.getClass())){
+ return (T) v;
+ }else{
+ throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s]. Drill doesn't yet support hetergenous types.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+ }
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
index d3168305e..028a7d647 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState;
import org.apache.drill.exec.vector.complex.fn.JsonRecordSplitter;
import org.apache.drill.exec.vector.complex.fn.UTF8JsonRecordSplitter;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
@@ -46,7 +47,7 @@ public class JSONRecordReader2 implements RecordReader{
private Path hadoopPath;
private FileSystem fileSystem;
private InputStream stream;
- private JsonReader jsonReader;
+ private JsonReaderWithState jsonReader;
public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
List<SchemaPath> columns) throws OutOfMemoryException {
@@ -61,7 +62,7 @@ public class JSONRecordReader2 implements RecordReader{
JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(stream);
this.writer = new VectorContainerWriter(output);
this.mutator = output;
- jsonReader = new JsonReader(splitter);
+ jsonReader = new JsonReaderWithState(splitter);
}catch(IOException e){
throw new ExecutionSetupException("Failure reading JSON file.", e);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index e5d4b5be0..485d13492 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -43,30 +43,16 @@ import com.google.common.base.Charsets;
public class JsonReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class);
- public static enum WriteState {
- WRITE_SUCCEED, WRITE_FAILED, NO_MORE
- }
private final JsonFactory factory = new JsonFactory();
- private ByteBufInputStream stream;
- private long byteOffset;
- private JsonRecordSplitter splitter;
- private Reader reader;
private JsonParser parser;
- public JsonReader(JsonRecordSplitter splitter) throws JsonParseException, IOException {
- this.splitter = splitter;
+ public JsonReader() throws JsonParseException, IOException {
factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
factory.configure(Feature.ALLOW_COMMENTS, true);
- reader = splitter.getNextReader();
}
- public WriteState write(ComplexWriter writer) throws JsonParseException, IOException {
- if(reader == null){
- reader = splitter.getNextReader();
- if(reader == null) return WriteState.NO_MORE;
-
- }
+ public boolean write(Reader reader, ComplexWriter writer) throws JsonParseException, IOException {
parser = factory.createJsonParser(reader);
reader.mark(1024*128);
@@ -82,7 +68,7 @@ public class JsonReader {
writeData(writer.rootAsList());
break;
case NOT_AVAILABLE:
- return WriteState.NO_MORE;
+ return false;
default:
throw new JsonParseException(
String.format("Failure while parsing JSON. Found token of [%s] Drill currently only supports parsing "
@@ -91,13 +77,7 @@ public class JsonReader {
parser.getCurrentLocation());
}
- if (!writer.ok()) {
- reader.reset();
- return WriteState.WRITE_FAILED;
- } else {
- reader = null;
- return WriteState.WRITE_SUCCEED;
- }
+ return true;
}
@@ -227,5 +207,7 @@ public class JsonReader {
}
}
list.end();
+
+
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
new file mode 100644
index 000000000..cf885a437
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.IOException;
+import java.io.Reader;
+
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
+import com.fasterxml.jackson.core.JsonParseException;
+
+public class JsonReaderWithState {
+
+ public static enum WriteState {
+ WRITE_SUCCEED, WRITE_FAILED, NO_MORE
+ }
+
+ private Reader reader;
+ private JsonRecordSplitter splitter;
+ private JsonReader jsonReader;
+
+ public JsonReaderWithState(JsonRecordSplitter splitter) throws IOException{
+ this.splitter = splitter;
+ reader = splitter.getNextReader();
+ jsonReader = new JsonReader();
+ }
+
+ public WriteState write(ComplexWriter writer) throws JsonParseException, IOException {
+ if (reader == null) {
+ reader = splitter.getNextReader();
+ if (reader == null)
+ return WriteState.NO_MORE;
+
+ }
+
+ jsonReader.write(reader, writer);
+
+ if (!writer.ok()) {
+ reader.reset();
+ return WriteState.WRITE_FAILED;
+ } else {
+ reader = null;
+ return WriteState.WRITE_SUCCEED;
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
index 0c7378a46..2fa72f755 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
@@ -17,11 +17,15 @@
*/
package org.apache.drill.exec.vector.complex.impl;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.StateTool;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import com.google.common.base.Preconditions;
@@ -160,5 +164,29 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri
return listRoot;
}
+
+ private static class VectorAccessibleFacade extends MapVector {
+
+ private final VectorContainer vc;
+
+ public VectorAccessibleFacade(VectorContainer vc) {
+ super("", null);
+ this.vc = vc;
+ }
+
+ @Override
+ public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+ ValueVector v = vc.addOrGet(name, type, clazz);
+ this.put(name, v);
+ return this.typeify(v, clazz);
+
+ }
+
+ }
+
+ public static ComplexWriter getWriter(String name, VectorContainer container){
+ VectorAccessibleFacade vc = new VectorAccessibleFacade(container);
+ return new ComplexWriterImpl(name, vc);
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeWriter.java
new file mode 100644
index 000000000..9011ff322
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeWriter.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.vector.complex.writer;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestComplexTypeWriter extends BaseTestQuery{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestComplexTypeReader.class);
+
+ @Test
+ //basic case. convert varchar into json.
+ public void testA0() throws Exception{
+ test(" select convert_from('{x:100, y:215.6}' ,'JSON') as mycol from cp.`tpch/nation.parquet`;");
+ }
+
+ @Test
+ //map contains int, float, repeated list, repeated map, nested repeated map, etc.
+ public void testA1() throws Exception{
+ test(" select convert_from('{x:100, y:215.6, z: [1, 2, 3], s : [[5, 6, 7], [8, 9]], " +
+ " t : [{a : 100, b: 200}, {a:300, b: 400}], " +
+ " nrmp: [ { x: [{ id: 123}], y: { y : \"SQL\"} }] }' ,'JSON') " +
+ " as mycol from cp.`tpch/nation.parquet`;");
+ }
+
+ @Test
+ //two convert functions.
+ public void testA2() throws Exception{
+ test(" select convert_from('{x:100, y:215.6}' ,'JSON') as mycol1, convert_from('{x:100, y:215.6}' ,'JSON') as mycol2 from cp.`tpch/nation.parquet`;");
+ }
+
+ @Test
+ //two convert functions. One convert's input comes from a string concat function.
+ public void testA3() throws Exception{
+ test(" select convert_from(concat('{x:100,', 'y:215.6}') ,'JSON') as mycol1, convert_from('{x:100, y:215.6}' ,'JSON') as mycol2 from cp.`tpch/nation.parquet`;");
+ }
+
+ @Test
+ //two convert functions. One's input is an empty map.
+ public void testA4() throws Exception{
+ test(" select convert_from('{}' ,'JSON') as mycol1, convert_from('{x:100, y:215.6}' ,'JSON') as mycol2 from cp.`tpch/nation.parquet`;");
+ }
+
+ @Test
+ //two convert functions. One's input is an empty list (is it ok to have null in the result?)
+ public void testA5() throws Exception{
+ test(" select convert_from('[]' ,'JSON') as mycol1, convert_from('{x:100, y:215.6}' ,'JSON') as mycol2 from cp.`tpch/nation.parquet`;");
+ }
+
+ @Test
+ //input is a list of BigInt. Output will be a repeated list vector.
+ public void testA6() throws Exception{
+ test(" select convert_from('[1, 2, 3]' ,'JSON') as mycol1 from cp.`tpch/nation.parquet`;");
+ }
+
+ @Test
+ //input is a list of float. Output will be a repeated list vector.
+ public void testA7() throws Exception{
+ test(" select convert_from('[1.2, 2.3, 3.5]' ,'JSON') as mycol1 from cp.`tpch/nation.parquet`;");
+ }
+
+ @Test
+ //input is a list of list of big int. Output will be a repeated list vector.
+ public void testA8() throws Exception{
+ test(" select convert_from('[ [1, 2], [3, 4], [5]]' ,'JSON') as mycol1 from cp.`tpch/nation.parquet`;");
+ }
+
+ @Test
+ //input is a list of map. Output will be a repeated list vector.
+ public void testA9() throws Exception{
+ test(" select convert_from('[{a : 100, b: 200}, {a:300, b: 400}]' ,'JSON') as mycol1 from cp.`tpch/nation.parquet`;");
+ }
+
+ @Test
+ //two convert functions, one regular nest functions, used with Filter op.
+ public void testA10() throws Exception{
+ test(" select convert_from('{x:100, y:215.6}' ,'JSON') as mycol1, " +
+ " convert_from('{x:200, y:678.9}' ,'JSON') as mycol2, " +
+ " 1 + 2 * 3 as numvalue " +
+ " from cp.`tpch/nation.parquet` where n_nationkey > 5;");
+ }
+
+ @Test
+ //convert from string constructed from columns in parquet file.
+ public void testA11() throws Exception{
+ test(" select convert_from(concat(concat('{ NationName: \"', N_NAME) , '\"}'), 'JSON')" +
+ " from cp.`tpch/nation.parquet` where n_nationkey > 5;");
+ }
+
+ @Test
+ //Test multiple batch creation (requires multiple allocations for the complex writer during Project).
+ public void testA100() throws Exception{
+ test(" select convert_from(concat(concat('{ Price : ', L_EXTENDEDPRICE) , '}') , 'JSON') " +
+ " from cp.`tpch/lineitem.parquet` limit 10; ");
+ }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 1f3b54059..8b31958a5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState;
import org.apache.drill.exec.vector.complex.fn.JsonWriter;
import org.apache.drill.exec.vector.complex.fn.ReaderJSONRecordSplitter;
import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
@@ -87,7 +88,7 @@ public class TestJsonReader {
writer.allocate();
- JsonReader jsonReader = new JsonReader(new ReaderJSONRecordSplitter(compound));
+ JsonReaderWithState jsonReader = new JsonReaderWithState(new ReaderJSONRecordSplitter(compound));
int i =0;
List<Integer> batchSizes = Lists.newArrayList();