aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoradeneche <adeneche@gmail.com>2014-12-29 10:15:42 -0800
committerTimothy Chen <tnachen@gmail.com>2014-12-29 10:15:42 -0800
commit309e1bede2cc68bc5ac67067e3de425d0192fbb2 (patch)
tree3c79ca523b753da0f5d42598c00109726cac9a53
parentb6b139deaade3153a2fb1b9a1b6850fe6d9ee800 (diff)
DRILL-1845: window function throws an exception
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java1
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java137
2 files changed, 71 insertions, 67 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
index 38c6884ad..a3e79404a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
@@ -242,6 +242,7 @@ public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<W
StreamingWindowFramer.AggOutcome out = framer.doWork();
while (out == StreamingWindowFramer.AggOutcome.UPDATE_AGGREGATOR) {
+ framer.cleanup();
framer = null;
try {
setupNewSchema();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 25128a5ab..7c044776c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.window;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
-import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.util.FileUtils;
@@ -34,7 +33,6 @@ import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
@@ -45,7 +43,6 @@ import static org.junit.Assert.assertTrue;
public class TestWindowFrame extends PopUnitTestBase {
@Test
- @Ignore // Fast schema problems
public void testWindowFrameWithOneKeyCount() throws Throwable {
try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
Drillbit bit = new Drillbit(CONFIG, serviceSet);
@@ -66,34 +63,36 @@ public class TestWindowFrame extends PopUnitTestBase {
RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
int recordCount = 0;
- assertEquals(3, results.size());
+ for (QueryResultBatch batch : results) {
+ if (!batch.hasData()) {
+ continue;
+ }
- QueryResultBatch batch = results.get(1);
- assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
- batchLoader.load(batch.getHeader().getDef(), batch.getData());
+ assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+ batchLoader.load(batch.getHeader().getDef(), batch.getData());
- for (int r = 0; r < batchLoader.getRecordCount(); r++) {
- recordCount++;
- VectorWrapper<?> wrapper = batchLoader.getValueAccessorById(
+ for (int r = 0; r < batchLoader.getRecordCount(); r++) {
+ recordCount++;
+ VectorWrapper<?> wrapper = batchLoader.getValueAccessorById(
BigIntVector.class,
batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0]
- );
- assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r));
- wrapper = batchLoader.getValueAccessorById(
+ );
+ assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r));
+ wrapper = batchLoader.getValueAccessorById(
NullableBigIntVector.class,
batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0]
- );
- assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r));
+ );
+ assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r));
+ }
+ batchLoader.clear();
+ batch.release();
}
- batchLoader.clear();
- batch.release();
assertEquals(4, recordCount);
}
}
@Test
- @Ignore
public void testWindowFrameWithOneKeyMultipleBatches() throws Throwable {
try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
Drillbit bit = new Drillbit(CONFIG, serviceSet);
@@ -110,54 +109,56 @@ public class TestWindowFrame extends PopUnitTestBase {
RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
int recordCount = 0;
- assertEquals(3, results.size());
+ for (QueryResultBatch batch : results) {
+ if (!batch.hasData()) {
+ continue;
+ }
- QueryResultBatch batch = results.get(0);
- assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
- batchLoader.load(batch.getHeader().getDef(), batch.getData());
- ValueVector.Accessor output = batchLoader.getValueAccessorById(NullableBigIntVector.class,
+ assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+ batchLoader.load(batch.getHeader().getDef(), batch.getData());
+ ValueVector.Accessor output = batchLoader.getValueAccessorById(NullableBigIntVector.class,
batchLoader.getValueVectorId(
- new SchemaPath(new PathSegment.NameSegment("output"))).getFieldIds()[0]
- ).getValueVector().getAccessor();
- ValueVector.Accessor sum = batchLoader.getValueAccessorById(
+ new SchemaPath(new PathSegment.NameSegment("output"))).getFieldIds()[0]
+ ).getValueVector().getAccessor();
+ ValueVector.Accessor sum = batchLoader.getValueAccessorById(
BigIntVector.class,
batchLoader.getValueVectorId(
- new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0]
- ).getValueVector().getAccessor();
- ValueVector.Accessor cnt = batchLoader.getValueAccessorById(
+ new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0]
+ ).getValueVector().getAccessor();
+ ValueVector.Accessor cnt = batchLoader.getValueAccessorById(
BigIntVector.class,
batchLoader.getValueVectorId(
- new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0]
- ).getValueVector().getAccessor();
- int lastGroup = -1;
- long groupCounter = 0;
- long s = 0;
- for (int r = 1; r <= batchLoader.getRecordCount(); r++) {
- recordCount++;
- int group = r / 4;
- if(lastGroup != group) {
- lastGroup = group;
- groupCounter = 1;
- s = 0;
- } else {
- groupCounter++;
+ new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0]
+ ).getValueVector().getAccessor();
+ int lastGroup = -1;
+ long groupCounter = 0;
+ long s = 0;
+ for (int r = 1; r <= batchLoader.getRecordCount(); r++) {
+ recordCount++;
+ int group = r / 4;
+ if (lastGroup != group) {
+ lastGroup = group;
+ groupCounter = 1;
+ s = 0;
+ } else {
+ groupCounter++;
+ }
+
+ s += group * 8 + r % 4;
+
+ assertEquals("Count, Row " + r, groupCounter, cnt.getObject(r - 1));
+ assertEquals("Sum, Row " + r, s, sum.getObject(r - 1));
+ assertEquals("Output, Row " + r, s, output.getObject(r - 1));
}
-
- s += group * 8 + r % 4;
-
- assertEquals("Count, Row " + r, groupCounter, cnt.getObject(r - 1));
- assertEquals("Sum, Row " + r, s, sum.getObject(r - 1));
- assertEquals("Output, Row " + r, s, output.getObject(r - 1));
+ batchLoader.clear();
+ batch.release();
}
- batchLoader.clear();
- batch.release();
assertEquals(1000, recordCount);
}
}
@Test
- @Ignore
public void testWindowFrameWithTwoKeys() throws Throwable {
try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
Drillbit bit = new Drillbit(CONFIG, serviceSet);
@@ -178,28 +179,30 @@ public class TestWindowFrame extends PopUnitTestBase {
RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
int recordCount = 0;
- assertEquals(3, results.size());
+ for (QueryResultBatch batch : results) {
+ if (!batch.hasData()) {
+ continue;
+ }
- QueryResultBatch batch = results.get(0);
- assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
- batchLoader.load(batch.getHeader().getDef(), batch.getData());
+ assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+ batchLoader.load(batch.getHeader().getDef(), batch.getData());
- for (int r = 0; r < batchLoader.getRecordCount(); r++) {
- recordCount++;
- VectorWrapper<?> wrapper = batchLoader.getValueAccessorById(
+ for (int r = 0; r < batchLoader.getRecordCount(); r++) {
+ recordCount++;
+ VectorWrapper<?> wrapper = batchLoader.getValueAccessorById(
BigIntVector.class,
batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0]
- );
- assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r));
- wrapper = batchLoader.getValueAccessorById(
+ );
+ assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r));
+ wrapper = batchLoader.getValueAccessorById(
NullableBigIntVector.class,
batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0]
- );
- assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r));
+ );
+ assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r));
+ }
+ batchLoader.clear();
+ batch.release();
}
- batchLoader.clear();
- batch.release();
-
assertEquals(8, recordCount);
}
}