aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--exec/interpreter/src/test/java/org/apache/drill/exec/expr/ExpressionInterpreterTest.java44
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/DateTypeFunctions.java64
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/DrillSimpleFuncInterpreter.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java76
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterGenerator.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ValueHolder.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/package-info.java34
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryDateTimeInfo.java36
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java57
13 files changed, 308 insertions, 65 deletions
diff --git a/exec/interpreter/src/test/java/org/apache/drill/exec/expr/ExpressionInterpreterTest.java b/exec/interpreter/src/test/java/org/apache/drill/exec/expr/ExpressionInterpreterTest.java
index a94ef9419..9df7472aa 100644
--- a/exec/interpreter/src/test/java/org/apache/drill/exec/expr/ExpressionInterpreterTest.java
+++ b/exec/interpreter/src/test/java/org/apache/drill/exec/expr/ExpressionInterpreterTest.java
@@ -32,7 +32,9 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
+import org.apache.drill.exec.expr.holders.TimeStampTZHolder;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryDateTimeInfo;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.record.MaterializedField;
@@ -43,8 +45,10 @@ import org.apache.drill.exec.store.mock.MockGroupScanPOP;
import org.apache.drill.exec.store.mock.MockScanBatchCreator;
import org.apache.drill.exec.store.mock.MockSubScanPOP;
import org.apache.drill.exec.vector.ValueVector;
+import org.joda.time.DateTime;
import org.junit.Test;
+import java.nio.ByteBuffer;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -94,8 +98,42 @@ public class ExpressionInterpreterTest extends PopUnitTestBase {
doTest(expressionStr, colNames, colTypes, expectedFirstTwoValues);
}
+ @Test
+ public void interpreterDateTest() throws Exception {
+ String[] colNames = {"col1"};
+ TypeProtos.MajorType[] colTypes = {Types.optional(TypeProtos.MinorType.INT)};
+ String expressionStr = "now()";
+ BitControl.PlanFragment planFragment = BitControl.PlanFragment.getDefaultInstance();
+ QueryDateTimeInfo dateTime = new QueryDateTimeInfo(planFragment.getQueryStartTime(), planFragment.getTimeZone());
+ int timeZoneIndex = dateTime.getRootFragmentTimeZone();
+ org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
+ org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
+
+ long queryStartDate = now.getMillis();
+ int timezoneIndex = org.apache.drill.exec.expr.fn.impl.DateUtility.getIndex(now.getZone().toString());
+
+ TimeStampTZHolder out = new TimeStampTZHolder();
+
+ out.value = queryStartDate;
+ out.index = timezoneIndex;
+
+ ByteBuffer buffer = ByteBuffer.allocate(12);
+ buffer.putLong(out.value);
+ buffer.putInt(out.index);
+ long l = buffer.getLong(0);
+ DateTime t = new DateTime(l);
+
+ String[] expectedFirstTwoValues = {t.toString(), t.toString()};
+
+ doTest(expressionStr, colNames, colTypes, expectedFirstTwoValues, planFragment);
+ }
+
protected void doTest(String expressionStr, String[] colNames, TypeProtos.MajorType[] colTypes, String[] expectFirstTwoValues) throws Exception {
+ doTest(expressionStr, colNames, colTypes, expectFirstTwoValues, BitControl.PlanFragment.getDefaultInstance());
+ }
+
+ protected void doTest(String expressionStr, String[] colNames, TypeProtos.MajorType[] colTypes, String[] expectFirstTwoValues, BitControl.PlanFragment planFragment) throws Exception {
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
@@ -114,7 +152,7 @@ public class ExpressionInterpreterTest extends PopUnitTestBase {
MockGroupScanPOP.MockScanEntry entry = new MockGroupScanPOP.MockScanEntry(10, columns);
MockSubScanPOP scanPOP = new MockSubScanPOP("testTable", java.util.Collections.singletonList(entry));
- RecordBatch batch = createMockScanBatch(bit1, scanPOP);
+ RecordBatch batch = createMockScanBatch(bit1, scanPOP, planFragment);
batch.next();
@@ -134,12 +172,12 @@ public class ExpressionInterpreterTest extends PopUnitTestBase {
}
- private RecordBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP) {
+ private RecordBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) {
List<RecordBatch> children = Lists.newArrayList();
MockScanBatchCreator creator = new MockScanBatchCreator();
try {
- FragmentContext context = new FragmentContext(bit.getContext(), BitControl.PlanFragment.getDefaultInstance(), null, bit.getContext().getFunctionImplementationRegistry());
+ FragmentContext context = new FragmentContext(bit.getContext(), planFragment, null, bit.getContext().getFunctionImplementationRegistry());
return creator.getBatch(context,scanPOP, children);
} catch (Exception ex) {
throw new DrillRuntimeException("Error when setup fragment context" + ex);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index 279c428c4..1b1348ec7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -21,6 +21,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.expression.LogicalExpression;
@@ -39,6 +41,8 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionCostCategory;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.ops.QueryDateTimeInfo;
+import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import com.google.common.base.Preconditions;
@@ -132,7 +136,16 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
}
if (ref.isInject()) {
- g.getBlock(BlockType.SETUP).assign(workspaceJVars[i], g.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer"));
+ if (UdfUtilities.INJECTABLE_GETTER_METHODS.get(ref.getType()) != null) {
+ g.getBlock(BlockType.SETUP).assign(
+ workspaceJVars[i],
+ g.getMappingSet().getIncoming().invoke("getContext").invoke(
+ UdfUtilities.INJECTABLE_GETTER_METHODS.get(ref.getType())
+ ));
+ } else {
+ // Invalid injectable type provided, this should have been caught in FunctionConverter
+ throw new DrillRuntimeException("Invalid injectable type requested in UDF: " + ref.getType().getSimpleName());
+ }
} else {
//g.getBlock(BlockType.SETUP).assign(workspaceJVars[i], JExpr._new(jtype));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
index 0127e6e5a..60e789327 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.expr.fn;
+import com.google.common.base.Joiner;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
@@ -24,6 +25,7 @@ import java.io.InputStream;
import java.io.StringReader;
import java.lang.reflect.Field;
import java.net.URL;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -31,7 +33,6 @@ import javax.inject.Inject;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.util.FileUtils;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.DrillFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
@@ -42,6 +43,8 @@ import org.apache.drill.exec.expr.fn.DrillFuncHolder.ValueReference;
import org.apache.drill.exec.expr.fn.DrillFuncHolder.WorkspaceReference;
import org.apache.drill.exec.expr.fn.interpreter.InterpreterGenerator;
import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.ops.QueryDateTimeInfo;
+import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import org.codehaus.commons.compiler.CompileException;
@@ -189,8 +192,9 @@ public class FunctionConverter {
} else {
// workspace work.
boolean isInject = inject != null;
- if (isInject && !field.getType().equals(DrillBuf.class)) {
- return failure(String.format("Only DrillBuf is allowed to be injected. You attempted to inject %s.", field.getType()), clazz, field);
+ if (isInject && UdfUtilities.INJECTABLE_GETTER_METHODS.get(field.getType()) == null) {
+ return failure(String.format("A %s cannot be injected into a %s, available injectable classes are: %s.",
+ field.getType(), DrillFunc.class.getSimpleName(), Joiner.on(",").join(UdfUtilities.INJECTABLE_GETTER_METHODS.keySet())), clazz, field);
}
WorkspaceReference wsReference = new WorkspaceReference(field.getType(), field.getName(), isInject);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/DateTypeFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/DateTypeFunctions.java
index d43ba2ad2..a628afc2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/DateTypeFunctions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/DateTypeFunctions.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.expr.holders.TimeHolder;
import org.apache.drill.exec.expr.holders.TimeStampHolder;
import org.apache.drill.exec.expr.holders.TimeStampTZHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.ops.QueryDateTimeInfo;
public class DateTypeFunctions {
@@ -213,14 +214,15 @@ public class DateTypeFunctions {
public static class CurrentDate implements DrillSimpleFunc {
@Workspace long queryStartDate;
@Output DateHolder out;
+ @Inject QueryDateTimeInfo dateTime;
public void setup() {
-// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone();
-// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
-// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone);
-// queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).
-// withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis();
+ int timeZoneIndex = dateTime.getRootFragmentTimeZone();
+ org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
+ org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
+ queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).
+ withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis();
}
public void eval() {
@@ -234,14 +236,15 @@ public class DateTypeFunctions {
@Workspace long queryStartDate;
@Workspace int timezoneIndex;
@Output TimeStampTZHolder out;
+ @Inject QueryDateTimeInfo dateTime;
public void setup() {
-// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone();
-// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
-// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone);
-// queryStartDate = now.getMillis();
-// timezoneIndex = org.apache.drill.exec.expr.fn.impl.DateUtility.getIndex(now.getZone().toString());
+ int timeZoneIndex = dateTime.getRootFragmentTimeZone();
+ org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
+ org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
+ queryStartDate = now.getMillis();
+ timezoneIndex = org.apache.drill.exec.expr.fn.impl.DateUtility.getIndex(now.getZone().toString());
}
public void eval() {
@@ -290,11 +293,12 @@ public class DateTypeFunctions {
public static class LocalTimeStamp implements DrillSimpleFunc {
@Workspace long queryStartDate;
@Output TimeStampHolder out;
+ @Inject QueryDateTimeInfo dateTime;
public void setup() {
-// org.joda.time.DateTime now = (new org.joda.time.DateTime(incoming.getContext().getQueryStartTime())).withZoneRetainFields(org.joda.time.DateTimeZone.UTC);
-// queryStartDate = now.getMillis();
+ org.joda.time.DateTime now = (new org.joda.time.DateTime(dateTime)).withZoneRetainFields(org.joda.time.DateTimeZone.UTC);
+ queryStartDate = now.getMillis();
}
public void eval() {
@@ -306,16 +310,17 @@ public class DateTypeFunctions {
public static class CurrentTime implements DrillSimpleFunc {
@Workspace int queryStartTime;
@Output TimeHolder out;
+ @Inject QueryDateTimeInfo dateTime;
public void setup() {
-// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone();
-// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
-// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone);
-// queryStartTime= (int) ((now.getHourOfDay() * org.apache.drill.exec.expr.fn.impl.DateUtility.hoursToMillis) +
-// (now.getMinuteOfHour() * org.apache.drill.exec.expr.fn.impl.DateUtility.minutesToMillis) +
-// (now.getSecondOfMinute() * org.apache.drill.exec.expr.fn.impl.DateUtility.secondsToMillis) +
-// (now.getMillisOfSecond()));
+ int timeZoneIndex = dateTime.getRootFragmentTimeZone();
+ org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
+ org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
+ queryStartTime= (int) ((now.getHourOfDay() * org.apache.drill.exec.expr.fn.impl.DateUtility.hoursToMillis) +
+ (now.getMinuteOfHour() * org.apache.drill.exec.expr.fn.impl.DateUtility.minutesToMillis) +
+ (now.getSecondOfMinute() * org.apache.drill.exec.expr.fn.impl.DateUtility.secondsToMillis) +
+ (now.getMillisOfSecond()));
}
public void eval() {
@@ -399,12 +404,13 @@ public class DateTypeFunctions {
@Param TimeStampHolder right;
@Workspace long queryStartDate;
@Output IntervalHolder out;
+ @Inject QueryDateTimeInfo dateTime;
public void setup() {
-// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone();
-// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
-// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone);
-// queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis();
+ int timeZoneIndex = dateTime.getRootFragmentTimeZone();
+ org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
+ org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
+ queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis();
}
public void eval() {
@@ -441,12 +447,13 @@ public class DateTypeFunctions {
@Param DateHolder right;
@Workspace long queryStartDate;
@Output IntervalHolder out;
+ @Inject QueryDateTimeInfo dateTime;
public void setup() {
-// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone();
-// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
-// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone);
-// queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis();
+ int timeZoneIndex = dateTime.getRootFragmentTimeZone();
+ org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
+ org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
+ queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis();
}
public void eval() {
@@ -477,10 +484,11 @@ public class DateTypeFunctions {
public static class UnixTimeStamp implements DrillSimpleFunc {
@Output BigIntHolder out;
@Workspace long queryStartDate;
+ @Inject QueryDateTimeInfo dateTime;
@Override
public void setup() {
-// queryStartDate = incoming.getContext().getQueryStartTime();
+ queryStartDate = dateTime.getQueryStartTime();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
index a84a776da..a47bf87ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
@@ -843,7 +843,7 @@ public class StringFunctions{
@Output VarCharHolder out;
@Inject DrillBuf buffer;
- public void setup(RecordBatch incoming) {
+ public void setup() {
}
public void eval() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/DrillSimpleFuncInterpreter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/DrillSimpleFuncInterpreter.java
index e3696f05e..c6dd43b52 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/DrillSimpleFuncInterpreter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/DrillSimpleFuncInterpreter.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.record.RecordBatch;
public interface DrillSimpleFuncInterpreter extends DrillFuncInterpreter {
- public void doSetup(ValueHolder[] args, RecordBatch incoming);
+ public void doSetup(ValueHolder[] args);
public ValueHolder doEval(ValueHolder [] args) ;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
index 0fe36cb19..ab3ad8b5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.expr.fn.interpreter;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.FunctionHolderExpression;
@@ -34,29 +36,55 @@ import org.apache.drill.exec.expr.fn.DrillSimpleFuncHolder;
import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.NullableBitHolder;
import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.QueryDateTimeInfo;
+import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.vector.ValueHolderHelper;
import org.apache.drill.exec.vector.ValueVector;
+import org.reflections.Reflections;
+import javax.inject.Inject;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
public class InterpreterEvaluator {
+ public static ValueHolder evaluateConstantExpr(UdfUtilities udfUtilities, LogicalExpression expr) {
+ InterpreterInitVisitor initVisitor = new InterpreterInitVisitor(udfUtilities);
+ InterEvalVisitor evalVisitor = new InterEvalVisitor(null, udfUtilities);
+ expr.accept(initVisitor, null);
+ return expr.accept(evalVisitor, -1);
+ }
+
public static void evaluate(RecordBatch incoming, ValueVector outVV, LogicalExpression expr) {
+ evaluate(incoming.getRecordCount(), incoming.getContext(), incoming, outVV, expr);
+ }
- InterpreterInitVisitor initVisitor = new InterpreterInitVisitor();
- InterEvalVisitor evalVisitor = new InterEvalVisitor(incoming);
+ public static void evaluate(int recordCount, UdfUtilities udfUtilities, RecordBatch incoming, ValueVector outVV, LogicalExpression expr) {
+
+ InterpreterInitVisitor initVisitor = new InterpreterInitVisitor(udfUtilities);
+ InterEvalVisitor evalVisitor = new InterEvalVisitor(incoming, udfUtilities);
expr.accept(initVisitor, incoming);
- for (int i = 0; i < incoming.getRecordCount(); i++) {
+ for (int i = 0; i < recordCount; i++) {
ValueHolder out = expr.accept(evalVisitor, i);
TypeHelper.setValueSafe(outVV, i, out);
}
- outVV.getMutator().setValueCount(incoming.getRecordCount());
+ outVV.getMutator().setValueCount(recordCount);
}
public static class InterpreterInitVisitor extends AbstractExprVisitor<LogicalExpression, RecordBatch, RuntimeException> {
+
+ private UdfUtilities udfUtilities;
+
+ protected InterpreterInitVisitor(UdfUtilities udfUtilities) {
+ super();
+ this.udfUtilities = udfUtilities;
+ }
@Override
public LogicalExpression visitFunctionHolderExpression(FunctionHolderExpression holderExpr, RecordBatch incoming) {
if (! (holderExpr.getHolder() instanceof DrillSimpleFuncHolder)) {
@@ -71,9 +99,26 @@ public class InterpreterEvaluator {
try {
DrillSimpleFuncInterpreter interpreter = holder.createInterpreter();
+ Field[] fields = interpreter.getClass().getDeclaredFields();
+ for (Field f : fields) {
+ // the current interpreter strips off annotations, so we just need to assume
+ // the type available as injectable are only used as injectable
+// if ( f.getAnnotation(Inject.class) != null ) {
+ f.setAccessible(true);
+ Class fieldType = f.getType();
+ if (UdfUtilities.INJECTABLE_GETTER_METHODS.get(fieldType) != null) {
+ Method method = udfUtilities.getClass().getMethod(UdfUtilities.INJECTABLE_GETTER_METHODS.get(fieldType));
+ f.set(interpreter, method.invoke(udfUtilities));
+ } else {
+ // Invalid injectable type provided, this should have been caught in FunctionConverter
+ throw new DrillRuntimeException("Invalid injectable type requested in UDF: " + fieldType.getSimpleName());
+ }
+// } else { // do nothing with non-inject fields here
+// continue;
+// }
+ }
((DrillFuncHolderExpr) holderExpr).setInterpreter(interpreter);
-
return holderExpr;
} catch (Exception ex) {
@@ -94,12 +139,23 @@ public class InterpreterEvaluator {
public static class InterEvalVisitor extends AbstractExprVisitor<ValueHolder, Integer, RuntimeException> {
private RecordBatch incoming;
+ private UdfUtilities udfUtilities;
- protected InterEvalVisitor(RecordBatch incoming) {
+ protected InterEvalVisitor(RecordBatch incoming, UdfUtilities udfUtilities) {
super();
this.incoming = incoming;
+ this.udfUtilities = udfUtilities;
}
+ public DrillBuf getManagedBufferIfAvailable() {
+ if (incoming != null) {
+ return incoming.getContext().getManagedBuffer();
+ } else {
+ return udfUtilities.getManagedBuffer();
+ }
+ }
+
+
@Override
public ValueHolder visitFunctionHolderExpression(FunctionHolderExpression holderExpr, Integer inIndex) {
if (! (holderExpr.getHolder() instanceof DrillSimpleFuncHolder)) {
@@ -134,7 +190,7 @@ public class InterpreterEvaluator {
Preconditions.checkArgument(interpreter != null, "interpreter could not be null when use interpreted model to evaluate function " + holder.getRegisteredNames()[0]);
- interpreter.doSetup(args, incoming);
+ interpreter.doSetup(args);
ValueHolder out = interpreter.doEval(args);
if (TypeHelper.getValueHolderType(out).getMode() == TypeProtos.DataMode.OPTIONAL &&
@@ -148,7 +204,7 @@ public class InterpreterEvaluator {
}
} catch (Exception ex) {
- throw new RuntimeException("Error in evaluating function of " + holderExpr.getName() + ": " + ex);
+ throw new RuntimeException("Error in evaluating function of " + holderExpr.getName(), ex);
}
}
@@ -206,7 +262,7 @@ public class InterpreterEvaluator {
@Override
public ValueHolder visitQuotedStringConstant(ValueExpressions.QuotedString e, Integer value) throws RuntimeException {
- return ValueHolderHelper.getVarCharHolder((incoming).getContext().getManagedBuffer(), e.value);
+ return ValueHolderHelper.getVarCharHolder(getManagedBufferIfAvailable(), e.value);
}
@@ -234,7 +290,7 @@ public class InterpreterEvaluator {
holder = TypeHelper.getValue(vv, inIndex.intValue());
return holder;
default:
- throw new UnsupportedOperationException("Type of " + type + " is not supported yet!");
+ throw new UnsupportedOperationException("Type of " + type + " is not supported yet in interpreted expression evaluation!");
}
} catch (Exception ex){
throw new DrillRuntimeException("Error when evaluate a ValueVectorReadExpression: " + ex);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterGenerator.java
index 6cede3376..25b5ff224 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterGenerator.java
@@ -95,18 +95,11 @@ public class InterpreterGenerator {
JMethod doSetupMethod = clazz.method(JMod.PUBLIC, Void.TYPE, SETUP_METHOD);
doSetupMethod.param(valueholderClass.array(), ARG_NAME);
- JVar incomingJVar = doSetupMethod.param(model.ref(RecordBatch.class), "incoming");
if (holder.getSetupBody()!=null && ! holder.getSetupBody().trim().equals("{}")) {
declareAssignParm(model, doSetupMethod.body(), holder, ARG_NAME, true);
}
- for (DrillFuncHolder.WorkspaceReference ws : holder.getWorkspaceVars()) {
- if (ws.isInject()) {
- doSetupMethod.body().assign(wsFieldVars.get(ws), incomingJVar.invoke("getContext").invoke("getManagedBuffer"));
- }
- }
-
doSetupMethod.body().directStatement(holder.getSetupBody());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ValueHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ValueHolder.java
index 5c2adc612..2431dcb08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ValueHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ValueHolder.java
@@ -17,5 +17,15 @@
*/
package org.apache.drill.exec.expr.holders;
+/**
+ * Wrapper object for an individual value in Drill.
+ *
+ * ValueHolders are designed to be mutable wrapper objects for defining clean
+ * APIs that access data in Drill. For performance, object creation is avoided
+ * at all costs throughout execution. For this reason, ValueHolders are
+ * disallowed from implementing any methods, this allows for them to be
+ * replaced by their java primitive inner members during optimization of
+ * run-time generated code.
+ */
public interface ValueHolder {
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/package-info.java
new file mode 100644
index 000000000..a8095e4f5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/package-info.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Drill expression materialization and evaluation facilities.
+ *
+ * Drill exposes an interface for defining custom scalar and aggregate functions.
+ * These functions are found by scanning the classpath at runtime and allow users
+ * to add their own functions without rebuilding Drill or changing cluster
+ * configuration.
+ *
+ * The classes that define these functions are actually decomposed at the source
+ * level, copied into generated code blocks to evaluate an entire expression
+ * tree. This generated source is built at run-time as schema is discovered.
+ *
+ * This package contains the {@link DrillSimpleFunc} and {@link DrillAggFunc}
+ * interfaces that can be implemented by users to define their own aggregate
+ * and scalar functions.
+ */
+package org.apache.drill.exec.expr;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 108f5bb68..aa1dffdd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -56,7 +56,7 @@ import com.google.common.collect.Maps;
/**
* Contextual objects required for execution of a particular fragment.
*/
-public class FragmentContext implements Closeable {
+public class FragmentContext implements Closeable, UdfUtilities {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
@@ -70,9 +70,8 @@ public class FragmentContext implements Closeable {
private final BufferAllocator allocator;
private final PlanFragment fragment;
private List<Thread> daemonThreads = Lists.newLinkedList();
+ private QueryDateTimeInfo queryDateTimeInfo;
private IncomingBuffers buffers;
- private final long queryStartTime;
- private final int rootFragmentTimeZone;
private final OptionManager fragmentOptions;
private final UserCredentials credentials;
private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
@@ -93,9 +92,8 @@ public class FragmentContext implements Closeable {
this.connection = connection;
this.fragment = fragment;
this.funcRegistry = funcRegistry;
- this.queryStartTime = fragment.getQueryStartTime();
- this.rootFragmentTimeZone = fragment.getTimeZone();
this.credentials = fragment.getCredentials();
+ this.queryDateTimeInfo = new QueryDateTimeInfo(fragment.getQueryStartTime(), fragment.getTimeZone());
logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
logger.debug("Fragment max allocation: {}", fragment.getMemMax());
try {
@@ -181,12 +179,8 @@ public class FragmentContext implements Closeable {
return this.stats;
}
- public long getQueryStartTime() {
- return this.queryStartTime;
- }
-
- public int getRootFragmentTimeZone() {
- return this.rootFragmentTimeZone;
+ public QueryDateTimeInfo getQueryDateTimeInfo(){
+ return this.queryDateTimeInfo;
}
public DrillbitEndpoint getForemanEndpoint() {return fragment.getForeman();}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryDateTimeInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryDateTimeInfo.java
new file mode 100644
index 000000000..f3cc6664c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryDateTimeInfo.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops;
+
+public class QueryDateTimeInfo {
+ private final long queryStartTime;
+ private final int rootFragmentTimeZone;
+
+ public QueryDateTimeInfo(long queryStartTime, int rootFragmentTimeZone) {
+ this.queryStartTime = queryStartTime;
+ this.rootFragmentTimeZone = rootFragmentTimeZone;
+ }
+
+ public long getQueryStartTime() {
+ return this.queryStartTime;
+ }
+
+ public int getRootFragmentTimeZone() {
+ return this.rootFragmentTimeZone;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
new file mode 100644
index 000000000..f7a1a047c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
@@ -0,0 +1,57 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops;
+
+import com.google.common.collect.ImmutableMap;
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Defines the query state and shared resources available to UDFs through
+ * injectables. For use in a function, include a {@link javax.inject.Inject}
+ * annotation on a UDF class member with any of the types available through
+ * this interface.
+ */
+public interface UdfUtilities {
+
+ // Map between injectable classes and their respective getter methods
+ // used for code generation
+ public static final ImmutableMap<Class, String> INJECTABLE_GETTER_METHODS =
+ new ImmutableMap.Builder<Class, String>()
+ .put(DrillBuf.class, "getManagedBuffer")
+ .put(QueryDateTimeInfo.class, "getQueryDateTimeInfo")
+ .build();
+
+ /**
+ * Get the query start time and timezone recorded by the head node during
+ * planning. This allows for SQL functions like now() to return a stable
+ * result within the context of a distributed query.
+ *
+ * @return - object wrapping the raw time and timezone values
+ */
+ QueryDateTimeInfo getQueryDateTimeInfo();
+
+ /**
+ * For UDFs to allocate general purpose intermediate buffers we provide the
+ * DrillBuf type as an injectable, which provides access to an off-heap
+ * buffer that can be tracked by Drill and re-allocated as needed.
+ *
+ * @return - a buffer managed by Drill, connected to the fragment allocator
+ * for memory management
+ */
+ DrillBuf getManagedBuffer();
+}