diff options
author | Jinfeng Ni <jni@maprtech.com> | 2014-06-09 13:02:52 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-06-16 07:58:08 -0700 |
commit | c789f1c8ace3e12d81ccae05e79e4539cf9c829c (patch) | |
tree | bfed836cffcd30f1e1572366d15e7faf180682a2 | |
parent | aaaca6a7829733a120018e1388c68672d035820b (diff) |
DRILL-935: Run-time code generation support for functions which decode a string/varbinary into a complex JSON object.
24 files changed, 614 insertions, 79 deletions
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java index 10d84e1d1..9280d03ee 100644 --- a/common/src/main/java/org/apache/drill/common/types/Types.java +++ b/common/src/main/java/org/apache/drill/common/types/Types.java @@ -349,7 +349,7 @@ public class Types { case "binary": return withMode(MinorType.VARBINARY, mode); case "json": - return withMode(MinorType.VARBINARY, mode); + return withMode(MinorType.LATE, mode); default: throw new UnsupportedOperationException("Could not determine type: " + typeName); } diff --git a/exec/java-exec/src/main/codegen/templates/BaseWriter.java b/exec/java-exec/src/main/codegen/templates/BaseWriter.java index 0c241b21d..e281bc7e7 100644 --- a/exec/java-exec/src/main/codegen/templates/BaseWriter.java +++ b/exec/java-exec/src/main/codegen/templates/BaseWriter.java @@ -74,6 +74,10 @@ public interface BaseWriter extends Positionable{ MapWriter rootAsMap(); ListWriter rootAsList(); boolean ok(); + + public void setPosition(int index); + public void setValueCount(int count); + public void reset(); } } diff --git a/exec/java-exec/src/main/codegen/templates/ListWriters.java b/exec/java-exec/src/main/codegen/templates/ListWriters.java index 64d92e218..278fddcd3 100644 --- a/exec/java-exec/src/main/codegen/templates/ListWriters.java +++ b/exec/java-exec/src/main/codegen/templates/ListWriters.java @@ -121,7 +121,8 @@ public class ${mode}ListWriter extends AbstractFieldWriter{ switch(mode){ case INIT: int vectorCount = container.size(); - Repeated${capName}Vector vector = container.addOrGet(name, ${upperName}_TYPE, Repeated${capName}Vector.class); + Repeated${capName}Vector vector = container.addOrGet(name, ${upperName}_TYPE, Repeated${capName}Vector.class); + innerVector = vector; writer = new Repeated${capName}WriterImpl(vector, this); if(vectorCount != container.size()) writer.allocate(); writer.setPosition(${index}); diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java index 4a1814907..a5b65ecc0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java @@ -86,6 +86,7 @@ public class PrintingResultsListener implements UserResultsListener { if (isLastChunk) { allocator.close(); latch.countDown(); + System.out.println("Total rows returned : " + count.get()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java index e6370d804..f164bd84a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java @@ -22,12 +22,13 @@ import java.util.List; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; + import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FunctionHolderExpression; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.visitors.ExprVisitor; import org.apache.drill.common.types.TypeProtos.MajorType; - +import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder; import org.apache.drill.exec.expr.fn.DrillFuncHolder; public class DrillFuncHolderExpr extends FunctionHolderExpression implements Iterable<LogicalExpression>{ @@ -66,4 +67,8 @@ public class DrillFuncHolderExpr extends FunctionHolderExpression implements Ite public boolean argConstantOnly(int i) { return holder.isConstant(i); } + + public boolean isComplexWriterFuncHolder() { + return holder instanceof DrillComplexWriterFuncHolder; + } } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java index 99d423f45..9b80dc08a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java @@ -25,6 +25,7 @@ import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; + import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.ConvertExpression; import org.apache.drill.common.expression.ErrorCollector; @@ -63,6 +64,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder; import org.apache.drill.exec.expr.fn.DrillFuncHolder; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.expr.fn.HiveFuncHolder; @@ -81,7 +83,12 @@ public class ExpressionTreeMaterializer { }; public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionImplementationRegistry registry) { - LogicalExpression out = expr.accept(new MaterializeVisitor(batch, errorCollector), registry); + return ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, registry, false); + } + + public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionImplementationRegistry registry, + boolean allowComplexWriterExpr) { + LogicalExpression out = expr.accept(new MaterializeVisitor(batch, errorCollector, allowComplexWriterExpr), 
registry); if(out instanceof NullExpression){ return new TypedNullConstant(Types.optional(MinorType.INT)); }else{ @@ -93,10 +100,12 @@ public class ExpressionTreeMaterializer { private ExpressionValidator validator = new ExpressionValidator(); private final ErrorCollector errorCollector; private final VectorAccessible batch; + private final boolean allowComplexWriter; - public MaterializeVisitor(VectorAccessible batch, ErrorCollector errorCollector) { + public MaterializeVisitor(VectorAccessible batch, ErrorCollector errorCollector, boolean allowComplexWriter) { this.batch = batch; this.errorCollector = errorCollector; + this.allowComplexWriter = allowComplexWriter; } private LogicalExpression validateNewExpr(LogicalExpression newExpr) { @@ -135,6 +144,10 @@ public class ExpressionTreeMaterializer { DrillFuncHolder matchedFuncHolder = resolver.getBestMatch(registry.getDrillRegistry().getMethods().get(call.getName()), call); + if (matchedFuncHolder instanceof DrillComplexWriterFuncHolder && ! allowComplexWriter) { + errorCollector.addGeneralError(call.getPosition(), "Only ProjectRecordBatch could have complex writer function. You are using complex writer function " + call.getName() + " in a non-project operation!"); + } + //new arg lists, possible with implicit cast inserted. List<LogicalExpression> argsWithCast = Lists.newArrayList(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java new file mode 100644 index 000000000..0efee6332 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.expr.fn; + +import java.util.List; +import java.util.Map; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; +import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch; +import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; + +import com.sun.codemodel.JBlock; +import com.sun.codemodel.JClass; +import com.sun.codemodel.JConditional; +import com.sun.codemodel.JExpr; +import com.sun.codemodel.JExpression; +import com.sun.codemodel.JInvocation; +import com.sun.codemodel.JMod; +import com.sun.codemodel.JVar; + +public class DrillComplexWriterFuncHolder extends DrillSimpleFuncHolder{ + + private FieldReference ref; + + public DrillComplexWriterFuncHolder(FunctionScope scope, NullHandling nullHandling, boolean isBinaryCommutative, boolean isRandom, + String[] registeredNames, ValueReference[] 
parameters, ValueReference returnValue, WorkspaceReference[] workspaceVars, + Map<String, String> methods, List<String> imports) { + super(scope, nullHandling, isBinaryCommutative, isRandom, registeredNames, parameters, returnValue, workspaceVars, methods, imports); + } + + public void setReference(FieldReference ref) { + this.ref = ref; + } + + @Override + protected HoldingContainer generateEvalBody(ClassGenerator<?> g, HoldingContainer[] inputVariables, String body, JVar[] workspaceJVars) { + + g.getEvalBlock().directStatement(String.format("//---- start of eval portion of %s function. ----//", registeredNames[0])); + + JBlock sub = new JBlock(true, true); + JBlock topSub = sub; + + JVar complexWriter = g.declareClassField("complexWriter", g.getModel()._ref(ComplexWriter.class)); + + JClass cwClass = g.getModel().ref(ComplexWriterImpl.class); + + JInvocation container = g.getMappingSet().getOutgoing().invoke("getOutgoingContainer"); + + //Default name is "col", if not passed in a reference name for the output vector. + String refName = ref == null? "col" : ref.getRootSegment().getPath(); + + g.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container)); + + JClass projBatchClass = g.getModel().ref(ProjectRecordBatch.class); + JExpression projBatch = JExpr.cast(projBatchClass, g.getMappingSet().getOutgoing()); + + g.getSetupBlock().add(projBatch.invoke("addComplexWriter").arg(complexWriter)); + + + g.getEvalBlock().add(complexWriter.invoke("setPosition").arg(g.getMappingSet().getValueWriteIndex())); + + sub.decl(g.getModel()._ref(ComplexWriter.class), returnValue.name, complexWriter); + + // add the subblock after the out declaration. 
+ g.getEvalBlock().add(topSub); + + addProtectedBlock(g, sub, body, inputVariables, workspaceJVars, false); + + + JConditional jc = g.getEvalBlock()._if(complexWriter.invoke("ok").not()); + + jc._then().add(complexWriter.invoke("reset")); + //jc._then().directStatement("System.out.println(\"debug : write ok fail!, inIndex = \" + inIndex);"); + jc._then()._return(JExpr.FALSE); + + //jc._else().directStatement("System.out.println(\"debug : write successful, inIndex = \" + inIndex);"); + + g.getEvalBlock().directStatement(String.format("//---- end of eval portion of %s function. ----//", registeredNames[0])); + + return null; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java index 5d5acf0bc..bbe76de5d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java @@ -236,8 +236,9 @@ public abstract class DrillFuncHolder { public static class ValueReference { MajorType type; String name; - boolean isConstant; - boolean isFieldReader; + boolean isConstant = false; + boolean isFieldReader = false; + boolean isComplexWriter = false; public ValueReference(MajorType type, String name) { super(); @@ -245,8 +246,6 @@ public abstract class DrillFuncHolder { Preconditions.checkNotNull(name); this.type = type; this.name = name; - isConstant = false; - this.isFieldReader = false; } public void setConstant(boolean isConstant) { @@ -265,6 +264,15 @@ public abstract class DrillFuncHolder { return ref; } + + public static ValueReference createComplexWriterRef(String name) { + MajorType type = Types.required(MinorType.LATE); + ValueReference ref = new ValueReference(type, name); + ref.isComplexWriter = true; + + return ref; + } + } public static class WorkspaceReference { diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java index 44dd49d3d..75c3cbb11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.expr.fn.DrillFuncHolder.ValueReference; import org.apache.drill.exec.expr.fn.DrillFuncHolder.WorkspaceReference; import org.apache.drill.exec.expr.holders.ValueHolder; import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import org.codehaus.commons.compiler.CompileException; import org.codehaus.janino.Java.CompilationUnit; import org.codehaus.janino.Parser; @@ -129,6 +130,17 @@ public class FunctionConverter { continue; } + // Special processing for @Output ComplexWriter + if (output != null && ComplexWriter.class.isAssignableFrom(field.getType())) { + + if(outputField != null){ + return failure("You've declared more than one @Output field. You must declare one and only @Output field per Function class.", clazz, field); + }else{ + outputField = ValueReference.createComplexWriterRef(field.getName()); + } + continue; + } + // check that param and output are value holders. if(!ValueHolder.class.isAssignableFrom(field.getType())){ return failure(String.format("The field doesn't holds value of type %s which does not implement the ValueHolder interface. All fields of type @Param or @Output must extend this interface..", field.getType()), clazz, field); @@ -154,15 +166,11 @@ public class FunctionConverter { return failure("You've declared more than one @Output field. You must declare one and only @Output field per Function class.", clazz, field); }else{ outputField = p; - } - } - + }else{ // workspace work. 
-// logger.debug("Found workspace field {}:{}", field.getType(), field.getName()); - //workspaceFields.add(new WorkspaceReference(field.getType(), field.getName())); WorkspaceReference wsReference = new WorkspaceReference(field.getType(), field.getName()); if (template.scope() == FunctionScope.POINT_AGGREGATE && !ValueHolder.class.isAssignableFrom(field.getType()) ) { @@ -217,8 +225,16 @@ public class FunctionConverter { return new DrillDecimalAggFuncHolder(template.scope(), template.nulls(), template.isBinaryCommutative(), template.isRandom(), registeredNames, ps, outputField, works, methods, imports); case SIMPLE: - return new DrillSimpleFuncHolder(template.scope(), template.nulls(), template.isBinaryCommutative(), - template.isRandom(), registeredNames, ps, outputField, works, methods, imports); + if (outputField.isComplexWriter) + return new DrillComplexWriterFuncHolder(template.scope(), template.nulls(), + template.isBinaryCommutative(), + template.isRandom(), registeredNames, + ps, outputField, works, methods, imports); + else + return new DrillSimpleFuncHolder(template.scope(), template.nulls(), + template.isBinaryCommutative(), + template.isRandom(), registeredNames, + ps, outputField, works, methods, imports); case DECIMAL_MAX_SCALE: return new DrillDecimalMaxScaleFuncHolder(template.scope(), template.nulls(), template.isBinaryCommutative(), template.isRandom(), registeredNames, ps, outputField, works, methods, imports); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java new file mode 100644 index 000000000..d8df54333 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.expr.fn.impl.conv; + + +import java.io.ByteArrayOutputStream; +import java.io.StringReader; + +import io.netty.buffer.ByteBuf; + +import org.apache.drill.exec.expr.DrillSimpleFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.expr.annotations.Workspace; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; +import org.apache.drill.exec.expr.holders.VarBinaryHolder; +import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.vector.complex.fn.JsonWriter; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; + +import com.google.common.base.Charsets; + +public class JsonConvertFrom { + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonConvertFrom.class); + + private JsonConvertFrom(){} + + @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, nulls = 
NullHandling.NULL_IF_NULL, isRandom = true) + public static class ConvertFromJson implements DrillSimpleFunc{ + + @Param VarBinaryHolder in; + @Output ComplexWriter writer; + + public void setup(RecordBatch incoming){ + } + + public void eval(){ + + byte[] buf = new byte[in.end - in.start]; + in.buffer.getBytes(in.start, buf, 0, in.end - in.start); + String input = new String(buf, com.google.common.base.Charsets.UTF_8); + + try { + org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(); + + jsonReader.write(new java.io.StringReader(input), writer); + + } catch (Exception e) { + System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace()); + } + } + } + + @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL, isRandom = true) + public static class ConvertFromJsonVarchar implements DrillSimpleFunc{ + + @Param VarCharHolder in; + @Output ComplexWriter writer; + + public void setup(RecordBatch incoming){ + } + + public void eval(){ + + byte[] buf = new byte[in.end - in.start]; + in.buffer.getBytes(in.start, buf, 0, in.end - in.start); + String input = new String(buf, com.google.common.base.Charsets.UTF_8); + + try { + org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(); + + jsonReader.write(new java.io.StringReader(input), writer); + + } catch (Exception e) { + System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace()); + } + } + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 2914b671a..d142ff847 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -284,4 +284,9 @@ public 
class ScanBatch implements RecordBatch { oContext.close(); } + @Override + public VectorContainer getOutgoingContainer() { + throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 51962792f..c9cd2dd69 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -363,6 +363,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { public Iterator<VectorWrapper<?>> iterator() { return container.iterator(); } + + @Override + public VectorContainer getOutgoingContainer() { + throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java index fb681ffc0..c7fc8135e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java @@ -147,4 +147,9 @@ public class WireRecordBatch implements RecordBatch { fragProvider.cleanup(); } + @Override + public VectorContainer getOutgoingContainer() { + throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 61c256b7e..f5a4444f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -37,10 +37,12 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.DrillFuncHolderExpr; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression; +import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Project; @@ -50,8 +52,10 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import com.carrotsearch.hppc.IntOpenHashSet; import com.google.common.base.Preconditions; @@ -63,6 +67,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private Projector projector; private List<ValueVector> allocationVectors; + private List<ComplexWriter> complexWriters; private boolean hasRemainder = false; private int remainderIndex = 0; private int recordCount; @@ -85,55 +90,47 @@ public class 
ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ return super.innerNext(); } + public VectorContainer getOutgoingContainer() { + return this.container; + } + @Override protected void doWork() { // VectorUtil.showVectorAccessibleContent(incoming, ","); int incomingRecordCount = incoming.getRecordCount(); - for(ValueVector v : this.allocationVectors){ -// AllocationHelper.allocate(v, incomingRecordCount, 250); -// v.allocateNew(); - v.allocateNewSafe(); - } + + doAlloc(); + int outputRecords = projector.projectRecords(0, incomingRecordCount, 0); if (outputRecords < incomingRecordCount) { - for(ValueVector v : allocationVectors){ - ValueVector.Mutator m = v.getMutator(); - m.setValueCount(outputRecords); - } + setValueCount(outputRecords); hasRemainder = true; remainderIndex = outputRecords; this.recordCount = remainderIndex; } else { - for(ValueVector v : allocationVectors){ - ValueVector.Mutator m = v.getMutator(); - m.setValueCount(incomingRecordCount); - } + setValueCount(incomingRecordCount); for(VectorWrapper<?> v: incoming) { v.clear(); } this.recordCount = outputRecords; } + // In case of complex writer expression, vectors would be added to batch run-time. + // We have to re-build the schema. 
+ if (complexWriters != null) { + container.buildSchema(SelectionVectorMode.NONE); + } } private void handleRemainder() { int remainingRecordCount = incoming.getRecordCount() - remainderIndex; - for(ValueVector v : this.allocationVectors){ - //AllocationHelper.allocate(v, remainingRecordCount, 250); - v.allocateNewSafe(); - } + doAlloc(); int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0); if (projRecords < remainingRecordCount) { - for(ValueVector v : allocationVectors){ - ValueVector.Mutator m = v.getMutator(); - m.setValueCount(projRecords); - } + setValueCount(projRecords); this.recordCount = projRecords; remainderIndex += projRecords; } else { - for(ValueVector v : allocationVectors){ - ValueVector.Mutator m = v.getMutator(); - m.setValueCount(remainingRecordCount); - } + setValueCount(remainingRecordCount); hasRemainder = false; remainderIndex = 0; for(VectorWrapper<?> v: incoming) { @@ -141,8 +138,48 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } this.recordCount = remainingRecordCount; } + // In case of complex writer expression, vectors would be added to batch run-time. + // We have to re-build the schema. + if (complexWriters != null) { + container.buildSchema(SelectionVectorMode.NONE); + } } + public void addComplexWriter(ComplexWriter writer) { + complexWriters.add(writer); + } + + private boolean doAlloc() { + //Allocate vv in the allocationVectors. + for(ValueVector v : this.allocationVectors){ + //AllocationHelper.allocate(v, remainingRecordCount, 250); + if (!v.allocateNewSafe()) + return false; + } + + //Allocate vv for complexWriters. 
+ if (complexWriters == null) + return true; + + for (ComplexWriter writer : complexWriters) + writer.allocate(); + + return true; + } + + private void setValueCount(int count) { + for(ValueVector v : allocationVectors){ + ValueVector.Mutator m = v.getMutator(); + m.setValueCount(count); + } + + if (complexWriters == null) + return; + + for (ComplexWriter writer : complexWriters) + writer.setValueCount(count); + } + /** hack to make ref and full work together... need to figure out if this is still necessary. **/ private FieldReference getRef(NamedExpression e){ FieldReference ref = e.getRef(); @@ -200,13 +237,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ }else{ for(int i = 0; i < exprs.size(); i++){ final NamedExpression namedExpression = exprs.get(i); - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry()); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType()); if(collector.hasErrors()){ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } - // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack. 
if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE && !((ValueVectorReadExpression) expr).hasReadPath() @@ -223,7 +259,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ container.add(tp.getTo()); transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]); // logger.debug("Added transfer."); - }else{ + } else if (expr instanceof DrillFuncHolderExpr && + ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) { + // Need to process ComplexWriter function evaluation. + // Lazy initialization of the list of complex writers, if not done yet. + if (complexWriters == null) + complexWriters = Lists.newArrayList(); + + // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. + ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef()); + cg.addExpr(expr); + } else{ // need to do evaluation. 
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); allocationVectors.add(vector); @@ -231,13 +277,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); HoldingContainer hc = cg.addExpr(write); - cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); logger.debug("Added eval."); } } - - } cg.rotateBlock(); cg.getEvalBlock()._return(JExpr.TRUE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 1f73192b2..19f649745 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -24,6 +24,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; @@ -137,4 +138,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch { public void cleanup() { incoming.cleanup(); } + + @Override + public VectorContainer getOutgoingContainer() { + throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java 
index 7b7b708db..d71b8112f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -31,7 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{ final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass()); - protected final VectorContainer container = new VectorContainer(); + protected final VectorContainer container; //= new VectorContainer(); protected final T popConfig; protected final FragmentContext context; protected final OperatorContext oContext; @@ -43,6 +43,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements this.popConfig = popConfig; this.oContext = new OperatorContext(popConfig, context); this.stats = oContext.getStats(); + this.container = new VectorContainer(this.oContext); } @Override @@ -135,4 +136,10 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements return batch; } + + @Override + public VectorContainer getOutgoingContainer() { + throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 75b9b0ce0..7617d9185 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -85,6 +85,9 @@ public interface RecordBatch extends VectorAccessible { public abstract SelectionVector4 getSelectionVector4(); + + public VectorContainer getOutgoingContainer(); + /** * Get the value vector type and id for the given schema path. 
The TypedFieldId should store a fieldId which is the * same as the ordinal position of the field within the Iterator provided this classes implementation of diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 3c674662f..1f2c33a4f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -26,6 +26,8 @@ import java.util.List; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractMapVector; @@ -39,8 +41,14 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList(); private BatchSchema schema; private int recordCount = -1; - + private final OperatorContext oContext; + public VectorContainer() { + this.oContext = null; + } + + public VectorContainer( OperatorContext oContext) { + this.oContext = oContext; } // public VectorContainer(List<ValueVector> vectors, List<ValueVector[]> hyperVectors) { @@ -66,8 +74,17 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto add(vv, releasable); } - public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){ - return null; + public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){ + MaterializedField field = MaterializedField.create(name, type); + ValueVector v = TypeHelper.getNewVector(field, this.oContext.getAllocator()); + + add(v); + + if(clazz.isAssignableFrom(v.getClass())){ + 
return (T) v; + }else{ + throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s]. Drill doesn't yet support hetergenous types.", clazz.getSimpleName(), v.getClass().getSimpleName())); + } } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java index d3168305e..028a7d647 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.complex.fn.JsonReader; +import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState; import org.apache.drill.exec.vector.complex.fn.JsonRecordSplitter; import org.apache.drill.exec.vector.complex.fn.UTF8JsonRecordSplitter; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; @@ -46,7 +47,7 @@ public class JSONRecordReader2 implements RecordReader{ private Path hadoopPath; private FileSystem fileSystem; private InputStream stream; - private JsonReader jsonReader; + private JsonReaderWithState jsonReader; public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, List<SchemaPath> columns) throws OutOfMemoryException { @@ -61,7 +62,7 @@ public class JSONRecordReader2 implements RecordReader{ JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(stream); this.writer = new VectorContainerWriter(output); this.mutator = output; - jsonReader = new JsonReader(splitter); + jsonReader = new JsonReaderWithState(splitter); }catch(IOException e){ throw new ExecutionSetupException("Failure reading JSON file.", e); } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java index e5d4b5be0..485d13492 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -43,30 +43,16 @@ import com.google.common.base.Charsets; public class JsonReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class); - public static enum WriteState { - WRITE_SUCCEED, WRITE_FAILED, NO_MORE - } private final JsonFactory factory = new JsonFactory(); - private ByteBufInputStream stream; - private long byteOffset; - private JsonRecordSplitter splitter; - private Reader reader; private JsonParser parser; - public JsonReader(JsonRecordSplitter splitter) throws JsonParseException, IOException { - this.splitter = splitter; + public JsonReader() throws JsonParseException, IOException { factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); factory.configure(Feature.ALLOW_COMMENTS, true); - reader = splitter.getNextReader(); } - public WriteState write(ComplexWriter writer) throws JsonParseException, IOException { - if(reader == null){ - reader = splitter.getNextReader(); - if(reader == null) return WriteState.NO_MORE; - - } + public boolean write(Reader reader, ComplexWriter writer) throws JsonParseException, IOException { parser = factory.createJsonParser(reader); reader.mark(1024*128); @@ -82,7 +68,7 @@ public class JsonReader { writeData(writer.rootAsList()); break; case NOT_AVAILABLE: - return WriteState.NO_MORE; + return false; default: throw new JsonParseException( String.format("Failure while parsing JSON. 
Found token of [%s] Drill currently only supports parsing " @@ -91,13 +77,7 @@ public class JsonReader { parser.getCurrentLocation()); } - if (!writer.ok()) { - reader.reset(); - return WriteState.WRITE_FAILED; - } else { - reader = null; - return WriteState.WRITE_SUCCEED; - } + return true; } @@ -227,5 +207,7 @@ public class JsonReader { } } list.end(); + + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java new file mode 100644 index 000000000..cf885a437 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.vector.complex.fn; + +import java.io.IOException; +import java.io.Reader; + +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; + +import com.fasterxml.jackson.core.JsonParseException; + +public class JsonReaderWithState { + + public static enum WriteState { + WRITE_SUCCEED, WRITE_FAILED, NO_MORE + } + + private Reader reader; + private JsonRecordSplitter splitter; + private JsonReader jsonReader; + + public JsonReaderWithState(JsonRecordSplitter splitter) throws IOException{ + this.splitter = splitter; + reader = splitter.getNextReader(); + jsonReader = new JsonReader(); + } + + public WriteState write(ComplexWriter writer) throws JsonParseException, IOException { + if (reader == null) { + reader = splitter.getNextReader(); + if (reader == null) + return WriteState.NO_MORE; + + } + + jsonReader.write(reader, writer); + + if (!writer.ok()) { + reader.reset(); + return WriteState.WRITE_FAILED; + } else { + reader = null; + return WriteState.WRITE_SUCCEED; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java index 0c7378a46..2fa72f755 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java @@ -17,11 +17,15 @@ */ package org.apache.drill.exec.vector.complex.impl; +import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.MapVector; 
import org.apache.drill.exec.vector.complex.StateTool; -import org.apache.drill.exec.vector.complex.writer.BaseWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import com.google.common.base.Preconditions; @@ -160,5 +164,29 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri return listRoot; } + + private static class VectorAccessibleFacade extends MapVector { + + private final VectorContainer vc; + + public VectorAccessibleFacade(VectorContainer vc) { + super("", null); + this.vc = vc; + } + + @Override + public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) { + ValueVector v = vc.addOrGet(name, type, clazz); + this.put(name, v); + return this.typeify(v, clazz); + + } + + } + + public static ComplexWriter getWriter(String name, VectorContainer container){ + VectorAccessibleFacade vc = new VectorAccessibleFacade(container); + return new ComplexWriterImpl(name, vc); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeWriter.java new file mode 100644 index 000000000..9011ff322 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeWriter.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.vector.complex.writer; + +import org.apache.drill.BaseTestQuery; +import org.junit.Test; + +public class TestComplexTypeWriter extends BaseTestQuery{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestComplexTypeReader.class); + + @Test + //basic case. convert varchar into json. + public void testA0() throws Exception{ + test(" select convert_from('{x:100, y:215.6}' ,'JSON') as mycol from cp.`tpch/nation.parquet`;"); + } + + @Test + //map contains int, float, repeated list , repeated map, nested repeated map, etc. + public void testA1() throws Exception{ + test(" select convert_from('{x:100, y:215.6, z: [1, 2, 3], s : [[5, 6, 7], [8, 9]], " + + " t : [{a : 100, b: 200}, {a:300, b: 400}], " + + " nrmp: [ { x: [{ id: 123}], y: { y : \"SQL\"} }] }' ,'JSON') " + + " as mycol from cp.`tpch/nation.parquet`;"); + } + + @Test + //two convert functions. + public void testA2() throws Exception{ + test(" select convert_from('{x:100, y:215.6}' ,'JSON') as mycol1, convert_from('{x:100, y:215.6}' ,'JSON') as mycol2 from cp.`tpch/nation.parquet`;"); + } + + @Test + //two convert functions. One convert's input comes from a string concat function. + public void testA3() throws Exception{ + test(" select convert_from(concat('{x:100,', 'y:215.6}') ,'JSON') as mycol1, convert_from('{x:100, y:215.6}' ,'JSON') as mycol2 from cp.`tpch/nation.parquet`;"); + } + + @Test + //two convert functions. One's input is an empty map. 
+ public void testA4() throws Exception{ + test(" select convert_from('{}' ,'JSON') as mycol1, convert_from('{x:100, y:215.6}' ,'JSON') as mycol2 from cp.`tpch/nation.parquet`;"); + } + + @Test + //two convert functions. One's input is an empty list ( ok to have null in the result?) + public void testA5() throws Exception{ + test(" select convert_from('[]' ,'JSON') as mycol1, convert_from('{x:100, y:215.6}' ,'JSON') as mycol2 from cp.`tpch/nation.parquet`;"); + } + + @Test + //input is a list of BigInt. Output will be a repeated list vector. + public void testA6() throws Exception{ + test(" select convert_from('[1, 2, 3]' ,'JSON') as mycol1 from cp.`tpch/nation.parquet`;"); + } + + @Test + //input is a list of float. Output will be a repeated list vector. + public void testA7() throws Exception{ + test(" select convert_from('[1.2, 2.3, 3.5]' ,'JSON') as mycol1 from cp.`tpch/nation.parquet`;"); + } + + @Test + //input is a list of list of big int. Output will be a repeated list vector. + public void testA8() throws Exception{ + test(" select convert_from('[ [1, 2], [3, 4], [5]]' ,'JSON') as mycol1 from cp.`tpch/nation.parquet`;"); + } + + @Test + //input is a list of map. Output will be a repeated list vector. + public void testA9() throws Exception{ + test(" select convert_from('[{a : 100, b: 200}, {a:300, b: 400}]' ,'JSON') as mycol1 from cp.`tpch/nation.parquet`;"); + } + + @Test + //two convert functions, one regular nest functions, used with Filter op. + public void testA10() throws Exception{ + test(" select convert_from('{x:100, y:215.6}' ,'JSON') as mycol1, " + + " convert_from('{x:200, y:678.9}' ,'JSON') as mycol2, " + + " 1 + 2 * 3 as numvalue " + + " from cp.`tpch/nation.parquet` where n_nationkey > 5;"); + } + + @Test + //convert from string constructed from columns in parquet file. 
+ public void testA11() throws Exception{ + test(" select convert_from(concat(concat('{ NationName: \"', N_NAME) , '\"}'), 'JSON')" + + " from cp.`tpch/nation.parquet` where n_nationkey > 5;"); + } + + @Test + //Test multiple batches creation ( require multiple alloc for complex writer during Project ). + public void testA100() throws Exception{ + test(" select convert_from(concat(concat('{ Price : ', L_EXTENDEDPRICE) , '}') , 'JSON') " + + " from cp.`tpch/lineitem.parquet` limit 10; "); + } + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java index 1f3b54059..8b31958a5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java @@ -26,6 +26,7 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.exec.vector.complex.fn.JsonReader; +import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState; import org.apache.drill.exec.vector.complex.fn.JsonWriter; import org.apache.drill.exec.vector.complex.fn.ReaderJSONRecordSplitter; import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl; @@ -87,7 +88,7 @@ public class TestJsonReader { writer.allocate(); - JsonReader jsonReader = new JsonReader(new ReaderJSONRecordSplitter(compound)); + JsonReaderWithState jsonReader = new JsonReaderWithState(new ReaderJSONRecordSplitter(compound)); int i =0; List<Integer> batchSizes = Lists.newArrayList(); |