diff options
13 files changed, 308 insertions, 65 deletions
diff --git a/exec/interpreter/src/test/java/org/apache/drill/exec/expr/ExpressionInterpreterTest.java b/exec/interpreter/src/test/java/org/apache/drill/exec/expr/ExpressionInterpreterTest.java index a94ef9419..9df7472aa 100644 --- a/exec/interpreter/src/test/java/org/apache/drill/exec/expr/ExpressionInterpreterTest.java +++ b/exec/interpreter/src/test/java/org/apache/drill/exec/expr/ExpressionInterpreterTest.java @@ -32,7 +32,9 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.common.util.DrillStringUtils; import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator; +import org.apache.drill.exec.expr.holders.TimeStampTZHolder; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.QueryDateTimeInfo; import org.apache.drill.exec.pop.PopUnitTestBase; import org.apache.drill.exec.proto.BitControl; import org.apache.drill.exec.record.MaterializedField; @@ -43,8 +45,10 @@ import org.apache.drill.exec.store.mock.MockGroupScanPOP; import org.apache.drill.exec.store.mock.MockScanBatchCreator; import org.apache.drill.exec.store.mock.MockSubScanPOP; import org.apache.drill.exec.vector.ValueVector; +import org.joda.time.DateTime; import org.junit.Test; +import java.nio.ByteBuffer; import java.util.List; import static org.junit.Assert.assertEquals; @@ -94,8 +98,42 @@ public class ExpressionInterpreterTest extends PopUnitTestBase { doTest(expressionStr, colNames, colTypes, expectedFirstTwoValues); } + @Test + public void interpreterDateTest() throws Exception { + String[] colNames = {"col1"}; + TypeProtos.MajorType[] colTypes = {Types.optional(TypeProtos.MinorType.INT)}; + String expressionStr = "now()"; + BitControl.PlanFragment planFragment = BitControl.PlanFragment.getDefaultInstance(); + QueryDateTimeInfo dateTime = new QueryDateTimeInfo(planFragment.getQueryStartTime(), planFragment.getTimeZone()); + int timeZoneIndex = dateTime.getRootFragmentTimeZone(); + org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); + org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone); + + long queryStartDate = now.getMillis(); + int timezoneIndex = org.apache.drill.exec.expr.fn.impl.DateUtility.getIndex(now.getZone().toString()); + + TimeStampTZHolder out = new TimeStampTZHolder(); + + out.value = queryStartDate; + out.index = timezoneIndex; + + ByteBuffer buffer = ByteBuffer.allocate(12); + buffer.putLong(out.value); + buffer.putInt(out.index); + long l = buffer.getLong(0); + DateTime t = new DateTime(l); + + String[] expectedFirstTwoValues = {t.toString(), t.toString()}; + + doTest(expressionStr, colNames, colTypes, expectedFirstTwoValues, planFragment); + } + protected void doTest(String expressionStr, String[] colNames, TypeProtos.MajorType[] colTypes, String[] expectFirstTwoValues) throws Exception { + doTest(expressionStr, colNames, colTypes, expectFirstTwoValues, BitControl.PlanFragment.getDefaultInstance()); + } + + protected void doTest(String expressionStr, String[] colNames, TypeProtos.MajorType[] colTypes, String[] expectFirstTwoValues, BitControl.PlanFragment planFragment) throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit1 = new Drillbit(CONFIG, serviceSet); @@ -114,7 +152,7 @@ public class ExpressionInterpreterTest extends PopUnitTestBase { MockGroupScanPOP.MockScanEntry entry = new MockGroupScanPOP.MockScanEntry(10, columns); MockSubScanPOP scanPOP = new MockSubScanPOP("testTable", java.util.Collections.singletonList(entry)); - RecordBatch batch = createMockScanBatch(bit1, scanPOP); + RecordBatch batch = createMockScanBatch(bit1, scanPOP, planFragment); batch.next(); @@ -134,12 +172,12 @@ public class ExpressionInterpreterTest extends PopUnitTestBase { } - private RecordBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP) { + private RecordBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) { List<RecordBatch> children = Lists.newArrayList(); MockScanBatchCreator creator = new MockScanBatchCreator(); try { - FragmentContext context = new FragmentContext(bit.getContext(), BitControl.PlanFragment.getDefaultInstance(), null, bit.getContext().getFunctionImplementationRegistry()); + FragmentContext context = new FragmentContext(bit.getContext(), planFragment, null, bit.getContext().getFunctionImplementationRegistry()); return creator.getBatch(context,scanPOP, children); } catch (Exception ex) { throw new DrillRuntimeException("Error when setup fragment context" + ex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java index 279c428c4..1b1348ec7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java @@ -21,6 +21,8 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FunctionHolderExpression; import org.apache.drill.common.expression.LogicalExpression; @@ -39,6 +41,8 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionCostCategory; import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; +import org.apache.drill.exec.ops.QueryDateTimeInfo; +import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.vector.complex.reader.FieldReader; import com.google.common.base.Preconditions; @@ -132,7 +136,16 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder { } if (ref.isInject()) { - g.getBlock(BlockType.SETUP).assign(workspaceJVars[i], g.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer")); + if (UdfUtilities.INJECTABLE_GETTER_METHODS.get(ref.getType()) != null) { + g.getBlock(BlockType.SETUP).assign( + workspaceJVars[i], + g.getMappingSet().getIncoming().invoke("getContext").invoke( + UdfUtilities.INJECTABLE_GETTER_METHODS.get(ref.getType()) + )); + } else { + // Invalid injectable type provided, this should have been caught in FunctionConverter + throw new DrillRuntimeException("Invalid injectable type requested in UDF: " + ref.getType().getSimpleName()); + } } else { //g.getBlock(BlockType.SETUP).assign(workspaceJVars[i], JExpr._new(jtype)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java index 0127e6e5a..60e789327 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.expr.fn; +import com.google.common.base.Joiner; import io.netty.buffer.DrillBuf; import java.io.IOException; @@ -24,6 +25,7 @@ import java.io.InputStream; import java.io.StringReader; import java.lang.reflect.Field; import java.net.URL; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -31,7 +33,6 @@ import javax.inject.Inject; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.util.FileUtils; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.expr.DrillFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; @@ -42,6 +43,8 @@ import org.apache.drill.exec.expr.fn.DrillFuncHolder.ValueReference; import org.apache.drill.exec.expr.fn.DrillFuncHolder.WorkspaceReference; import org.apache.drill.exec.expr.fn.interpreter.InterpreterGenerator; import org.apache.drill.exec.expr.holders.ValueHolder; +import org.apache.drill.exec.ops.QueryDateTimeInfo; +import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import org.codehaus.commons.compiler.CompileException; @@ -189,8 +192,9 @@ public class FunctionConverter { } else { // workspace work. boolean isInject = inject != null; - if (isInject && !field.getType().equals(DrillBuf.class)) { - return failure(String.format("Only DrillBuf is allowed to be injected. You attempted to inject %s.", field.getType()), clazz, field); + if (isInject && UdfUtilities.INJECTABLE_GETTER_METHODS.get(field.getType()) == null) { + return failure(String.format("A %s cannot be injected into a %s, available injectable classes are: %s.", + field.getType(), DrillFunc.class.getSimpleName(), Joiner.on(",").join(UdfUtilities.INJECTABLE_GETTER_METHODS.keySet())), clazz, field); } WorkspaceReference wsReference = new WorkspaceReference(field.getType(), field.getName(), isInject); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/DateTypeFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/DateTypeFunctions.java index d43ba2ad2..a628afc2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/DateTypeFunctions.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/DateTypeFunctions.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.expr.holders.TimeHolder; import org.apache.drill.exec.expr.holders.TimeStampHolder; import org.apache.drill.exec.expr.holders.TimeStampTZHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.ops.QueryDateTimeInfo; public class DateTypeFunctions { @@ -213,14 +214,15 @@ public class DateTypeFunctions { public static class CurrentDate implements DrillSimpleFunc { @Workspace long queryStartDate; @Output DateHolder out; + @Inject QueryDateTimeInfo dateTime; public void setup() { -// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone(); -// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); -// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone); -// queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)). -// withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis(); + int timeZoneIndex = dateTime.getRootFragmentTimeZone(); + org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); + org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone); + queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)). + withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis(); } public void eval() { @@ -234,14 +236,15 @@ public class DateTypeFunctions { @Workspace long queryStartDate; @Workspace int timezoneIndex; @Output TimeStampTZHolder out; + @Inject QueryDateTimeInfo dateTime; public void setup() { -// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone(); -// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); -// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone); -// queryStartDate = now.getMillis(); -// timezoneIndex = org.apache.drill.exec.expr.fn.impl.DateUtility.getIndex(now.getZone().toString()); + int timeZoneIndex = dateTime.getRootFragmentTimeZone(); + org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); + org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone); + queryStartDate = now.getMillis(); + timezoneIndex = org.apache.drill.exec.expr.fn.impl.DateUtility.getIndex(now.getZone().toString()); } public void eval() { @@ -290,11 +293,12 @@ public class DateTypeFunctions { public static class LocalTimeStamp implements DrillSimpleFunc { @Workspace long queryStartDate; @Output TimeStampHolder out; + @Inject QueryDateTimeInfo dateTime; public void setup() { -// org.joda.time.DateTime now = (new org.joda.time.DateTime(incoming.getContext().getQueryStartTime())).withZoneRetainFields(org.joda.time.DateTimeZone.UTC); -// queryStartDate = now.getMillis(); + org.joda.time.DateTime now = (new org.joda.time.DateTime(dateTime)).withZoneRetainFields(org.joda.time.DateTimeZone.UTC); + queryStartDate = now.getMillis(); } public void eval() { @@ -306,16 +310,17 @@ public class DateTypeFunctions { public static class CurrentTime implements DrillSimpleFunc { @Workspace int queryStartTime; @Output TimeHolder out; + @Inject QueryDateTimeInfo dateTime; public void setup() { -// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone(); -// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); -// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone); -// queryStartTime= (int) ((now.getHourOfDay() * org.apache.drill.exec.expr.fn.impl.DateUtility.hoursToMillis) + -// (now.getMinuteOfHour() * org.apache.drill.exec.expr.fn.impl.DateUtility.minutesToMillis) + -// (now.getSecondOfMinute() * org.apache.drill.exec.expr.fn.impl.DateUtility.secondsToMillis) + -// (now.getMillisOfSecond())); + int timeZoneIndex = dateTime.getRootFragmentTimeZone(); + org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); + org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone); + queryStartTime= (int) ((now.getHourOfDay() * org.apache.drill.exec.expr.fn.impl.DateUtility.hoursToMillis) + + (now.getMinuteOfHour() * org.apache.drill.exec.expr.fn.impl.DateUtility.minutesToMillis) + + (now.getSecondOfMinute() * org.apache.drill.exec.expr.fn.impl.DateUtility.secondsToMillis) + + (now.getMillisOfSecond())); } public void eval() { @@ -399,12 +404,13 @@ public class DateTypeFunctions { @Param TimeStampHolder right; @Workspace long queryStartDate; @Output IntervalHolder out; + @Inject QueryDateTimeInfo dateTime; public void setup() { -// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone(); -// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); -// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone); -// queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis(); + int timeZoneIndex = dateTime.getRootFragmentTimeZone(); + org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); + org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone); + queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis(); } public void eval() { @@ -441,12 +447,13 @@ public class DateTypeFunctions { @Param DateHolder right; @Workspace long queryStartDate; @Output IntervalHolder out; + @Inject QueryDateTimeInfo dateTime; public void setup() { -// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone(); -// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); -// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone); -// queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis(); + int timeZoneIndex = dateTime.getRootFragmentTimeZone(); + org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex)); + org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone); + queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis(); } public void eval() { @@ -477,10 +484,11 @@ public class DateTypeFunctions { public static class UnixTimeStamp implements DrillSimpleFunc { @Output BigIntHolder out; @Workspace long queryStartDate; + @Inject QueryDateTimeInfo dateTime; @Override public void setup() { -// queryStartDate = incoming.getContext().getQueryStartTime(); + queryStartDate = dateTime.getQueryStartTime(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java index a84a776da..a47bf87ed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java @@ -843,7 +843,7 @@ public class StringFunctions{ @Output VarCharHolder out; @Inject DrillBuf buffer; - public void setup(RecordBatch incoming) { + public void setup() { } public void eval() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/DrillSimpleFuncInterpreter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/DrillSimpleFuncInterpreter.java index e3696f05e..c6dd43b52 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/DrillSimpleFuncInterpreter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/DrillSimpleFuncInterpreter.java @@ -23,7 +23,7 @@ import org.apache.drill.exec.record.RecordBatch; public interface DrillSimpleFuncInterpreter extends DrillFuncInterpreter { - public void doSetup(ValueHolder[] args, RecordBatch incoming); + public void doSetup(ValueHolder[] args); public ValueHolder doEval(ValueHolder [] args) ; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java index 0fe36cb19..ab3ad8b5e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.expr.fn.interpreter; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import io.netty.buffer.DrillBuf; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.BooleanOperator; import org.apache.drill.common.expression.FunctionHolderExpression; @@ -34,29 +36,55 @@ import org.apache.drill.exec.expr.fn.DrillSimpleFuncHolder; import org.apache.drill.exec.expr.holders.BitHolder; import org.apache.drill.exec.expr.holders.NullableBitHolder; import org.apache.drill.exec.expr.holders.ValueHolder; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.QueryDateTimeInfo; +import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.vector.ValueHolderHelper; import org.apache.drill.exec.vector.ValueVector; +import org.reflections.Reflections; +import javax.inject.Inject; +import java.lang.reflect.Field; +import java.lang.reflect.Method; public class InterpreterEvaluator { + public static ValueHolder evaluateConstantExpr(UdfUtilities udfUtilities, LogicalExpression expr) { + InterpreterInitVisitor initVisitor = new InterpreterInitVisitor(udfUtilities); + InterEvalVisitor evalVisitor = new InterEvalVisitor(null, udfUtilities); + expr.accept(initVisitor, null); + return expr.accept(evalVisitor, -1); + } + public static void evaluate(RecordBatch incoming, ValueVector outVV, LogicalExpression expr) { + evaluate(incoming.getRecordCount(), incoming.getContext(), incoming, outVV, expr); + } - InterpreterInitVisitor initVisitor = new InterpreterInitVisitor(); - InterEvalVisitor evalVisitor = new InterEvalVisitor(incoming); + public static void evaluate(int recordCount, UdfUtilities udfUtilities, RecordBatch incoming, ValueVector outVV, LogicalExpression expr) { + + InterpreterInitVisitor initVisitor = new InterpreterInitVisitor(udfUtilities); + InterEvalVisitor evalVisitor = new InterEvalVisitor(incoming, udfUtilities); expr.accept(initVisitor, incoming); - for (int i = 0; i < incoming.getRecordCount(); i++) { + for (int i = 0; i < recordCount; i++) { ValueHolder out = expr.accept(evalVisitor, i); TypeHelper.setValueSafe(outVV, i, out); } - outVV.getMutator().setValueCount(incoming.getRecordCount()); + outVV.getMutator().setValueCount(recordCount); } public static class InterpreterInitVisitor extends AbstractExprVisitor<LogicalExpression, RecordBatch, RuntimeException> { + + private UdfUtilities udfUtilities; + + protected InterpreterInitVisitor(UdfUtilities udfUtilities) { + super(); + this.udfUtilities = udfUtilities; + } @Override public LogicalExpression visitFunctionHolderExpression(FunctionHolderExpression holderExpr, RecordBatch incoming) { if (! (holderExpr.getHolder() instanceof DrillSimpleFuncHolder)) { @@ -71,9 +99,26 @@ public class InterpreterEvaluator { try { DrillSimpleFuncInterpreter interpreter = holder.createInterpreter(); + Field[] fields = interpreter.getClass().getDeclaredFields(); + for (Field f : fields) { + // the current interpreter strips off annotations, so we just need to assume + // the type available as injectable are only used as injectable +// if ( f.getAnnotation(Inject.class) != null ) { + f.setAccessible(true); + Class fieldType = f.getType(); + if (UdfUtilities.INJECTABLE_GETTER_METHODS.get(fieldType) != null) { + Method method = udfUtilities.getClass().getMethod(UdfUtilities.INJECTABLE_GETTER_METHODS.get(fieldType)); + f.set(interpreter, method.invoke(udfUtilities)); + } else { + // Invalid injectable type provided, this should have been caught in FunctionConverter + throw new DrillRuntimeException("Invalid injectable type requested in UDF: " + fieldType.getSimpleName()); + } +// } else { // do nothing with non-inject fields here +// continue; +// } + } ((DrillFuncHolderExpr) holderExpr).setInterpreter(interpreter); - return holderExpr; } catch (Exception ex) { @@ -94,12 +139,23 @@ public class InterpreterEvaluator { public static class InterEvalVisitor extends AbstractExprVisitor<ValueHolder, Integer, RuntimeException> { private RecordBatch incoming; + private UdfUtilities udfUtilities; - protected InterEvalVisitor(RecordBatch incoming) { + protected InterEvalVisitor(RecordBatch incoming, UdfUtilities udfUtilities) { super(); this.incoming = incoming; + this.udfUtilities = udfUtilities; } + public DrillBuf getManagedBufferIfAvailable() { + if (incoming != null) { + return incoming.getContext().getManagedBuffer(); + } else { + return udfUtilities.getManagedBuffer(); + } + } + + @Override public ValueHolder visitFunctionHolderExpression(FunctionHolderExpression holderExpr, Integer inIndex) { if (! (holderExpr.getHolder() instanceof DrillSimpleFuncHolder)) { @@ -134,7 +190,7 @@ public class InterpreterEvaluator { Preconditions.checkArgument(interpreter != null, "interpreter could not be null when use interpreted model to evaluate function " + holder.getRegisteredNames()[0]); - interpreter.doSetup(args, incoming); + interpreter.doSetup(args); ValueHolder out = interpreter.doEval(args); if (TypeHelper.getValueHolderType(out).getMode() == TypeProtos.DataMode.OPTIONAL && @@ -148,7 +204,7 @@ public class InterpreterEvaluator { } } catch (Exception ex) { - throw new RuntimeException("Error in evaluating function of " + holderExpr.getName() + ": " + ex); + throw new RuntimeException("Error in evaluating function of " + holderExpr.getName(), ex); } } @@ -206,7 +262,7 @@ public class InterpreterEvaluator { @Override public ValueHolder visitQuotedStringConstant(ValueExpressions.QuotedString e, Integer value) throws RuntimeException { - return ValueHolderHelper.getVarCharHolder((incoming).getContext().getManagedBuffer(), e.value); + return ValueHolderHelper.getVarCharHolder(getManagedBufferIfAvailable(), e.value); } @@ -234,7 +290,7 @@ public class InterpreterEvaluator { holder = TypeHelper.getValue(vv, inIndex.intValue()); return holder; default: - throw new UnsupportedOperationException("Type of " + type + " is not supported yet!"); + throw new UnsupportedOperationException("Type of " + type + " is not supported yet in interpreted expression evaluation!"); } } catch (Exception ex){ throw new DrillRuntimeException("Error when evaluate a ValueVectorReadExpression: " + ex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterGenerator.java index 6cede3376..25b5ff224 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterGenerator.java @@ -95,18 +95,11 @@ public class InterpreterGenerator { JMethod doSetupMethod = clazz.method(JMod.PUBLIC, Void.TYPE, SETUP_METHOD); doSetupMethod.param(valueholderClass.array(), ARG_NAME); - JVar incomingJVar = doSetupMethod.param(model.ref(RecordBatch.class), "incoming"); if (holder.getSetupBody()!=null && ! holder.getSetupBody().trim().equals("{}")) { declareAssignParm(model, doSetupMethod.body(), holder, ARG_NAME, true); } - for (DrillFuncHolder.WorkspaceReference ws : holder.getWorkspaceVars()) { - if (ws.isInject()) { - doSetupMethod.body().assign(wsFieldVars.get(ws), incomingJVar.invoke("getContext").invoke("getManagedBuffer")); - } - } - doSetupMethod.body().directStatement(holder.getSetupBody()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ValueHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ValueHolder.java index 5c2adc612..2431dcb08 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ValueHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ValueHolder.java @@ -17,5 +17,15 @@ */ package org.apache.drill.exec.expr.holders; +/** + * Wrapper object for an individual value in Drill. + * + * ValueHolders are designed to be mutable wrapper objects for defining clean + * APIs that access data in Drill. For performance, object creation is avoided + * at all costs throughout execution. For this reason, ValueHolders are + * disallowed from implementing any methods, this allows for them to be + * replaced by their java primitive inner members during optimization of + * run-time generated code. + */ public interface ValueHolder { } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/package-info.java new file mode 100644 index 000000000..a8095e4f5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/package-info.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Drill expression materialization and evaluation facilities. + * + * Drill exposes an interface for defining custom scalar and aggregate functions. + * These functions are found by scanning the classpath at runtime and allow users + * to add their own functions without rebuilding Drill or changing cluster + * configuration. + * + * The classes that define these functions are actually decomposed at the source + * level, copied into generated code blocks to evaluate an entire expression + * tree. This generated source is built at run-time as schema is discovered. + * + * This package contains the {@link DrillSimpleFunc} and {@link DrillAggFunc} + * interfaces that can be implemented by users to define their own aggregate + * and scalar functions. + */ +package org.apache.drill.exec.expr; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 108f5bb68..aa1dffdd0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -56,7 +56,7 @@ import com.google.common.collect.Maps; /** * Contextual objects required for execution of a particular fragment. */ -public class FragmentContext implements Closeable { +public class FragmentContext implements Closeable, UdfUtilities { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); @@ -70,9 +70,8 @@ public class FragmentContext implements Closeable { private final BufferAllocator allocator; private final PlanFragment fragment; private List<Thread> daemonThreads = Lists.newLinkedList(); + private QueryDateTimeInfo queryDateTimeInfo; private IncomingBuffers buffers; - private final long queryStartTime; - private final int rootFragmentTimeZone; private final OptionManager fragmentOptions; private final UserCredentials credentials; private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>(); @@ -93,9 +92,8 @@ public class FragmentContext implements Closeable { this.connection = connection; this.fragment = fragment; this.funcRegistry = funcRegistry; - this.queryStartTime = fragment.getQueryStartTime(); - this.rootFragmentTimeZone = fragment.getTimeZone(); this.credentials = fragment.getCredentials(); + this.queryDateTimeInfo = new QueryDateTimeInfo(fragment.getQueryStartTime(), fragment.getTimeZone()); logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); logger.debug("Fragment max allocation: {}", fragment.getMemMax()); try { @@ -181,12 +179,8 @@ public class FragmentContext implements Closeable { return this.stats; } - public long getQueryStartTime() { - return this.queryStartTime; - } - - public int getRootFragmentTimeZone() { - return this.rootFragmentTimeZone; + public QueryDateTimeInfo getQueryDateTimeInfo(){ + return this.queryDateTimeInfo; } public DrillbitEndpoint getForemanEndpoint() {return fragment.getForeman();} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryDateTimeInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryDateTimeInfo.java new file mode 100644 index 000000000..f3cc6664c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryDateTimeInfo.java @@ -0,0 +1,36 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.ops; + +public class QueryDateTimeInfo { + private final long queryStartTime; + private final int rootFragmentTimeZone; + + public QueryDateTimeInfo(long queryStartTime, int rootFragmentTimeZone) { + this.queryStartTime = queryStartTime; + this.rootFragmentTimeZone = rootFragmentTimeZone; + } + + public long getQueryStartTime() { + return this.queryStartTime; + } + + public int getRootFragmentTimeZone() { + return this.rootFragmentTimeZone; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java new file mode 100644 index 000000000..f7a1a047c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java @@ -0,0 +1,57 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.ops; + +import com.google.common.collect.ImmutableMap; +import io.netty.buffer.DrillBuf; + +/** + * Defines the query state and shared resources available to UDFs through + * injectables. For use in a function, include a {@link javax.inject.Inject} + * annotation on a UDF class member with any of the types available through + * this interface. + */ +public interface UdfUtilities { + + // Map between injectable classes and their respective getter methods + // used for code generation + public static final ImmutableMap<Class, String> INJECTABLE_GETTER_METHODS = + new ImmutableMap.Builder<Class, String>() + .put(DrillBuf.class, "getManagedBuffer") + .put(QueryDateTimeInfo.class, "getQueryDateTimeInfo") + .build(); + + /** + * Get the query start time and timezone recorded by the head node during + * planning. This allows for SQL functions like now() to return a stable + * result within the context of a distributed query. + * + * @return - object wrapping the raw time and timezone values + */ + QueryDateTimeInfo getQueryDateTimeInfo(); + + /** + * For UDFs to allocate general purpose intermediate buffers we provide the + * DrillBuf type as an injectable, which provides access to an off-heap + * buffer that can be tracked by Drill and re-allocated as needed. + * + * @return - a buffer managed by Drill, connected to the fragment allocator + * for memory management + */ + DrillBuf getManagedBuffer(); +} |