diff options
author | adeneche <adeneche@gmail.com> | 2014-12-29 10:15:42 -0800 |
---|---|---|
committer | Timothy Chen <tnachen@gmail.com> | 2014-12-29 10:15:42 -0800 |
commit | 309e1bede2cc68bc5ac67067e3de425d0192fbb2 (patch) | |
tree | 3c79ca523b753da0f5d42598c00109726cac9a53 | |
parent | b6b139deaade3153a2fb1b9a1b6850fe6d9ee800 (diff) |
DRILL-1845: window function throws an exception
2 files changed, 71 insertions, 67 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java index 38c6884ad..a3e79404a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java @@ -242,6 +242,7 @@ public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<W StreamingWindowFramer.AggOutcome out = framer.doWork(); while (out == StreamingWindowFramer.AggOutcome.UPDATE_AGGREGATOR) { + framer.cleanup(); framer = null; try { setupNewSchema(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java index 25128a5ab..7c044776c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.window; import com.google.common.base.Charsets; import com.google.common.io.Files; -import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.util.FileUtils; @@ -34,7 +33,6 @@ import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.NullableBigIntVector; import org.apache.drill.exec.vector.ValueVector; -import org.junit.Ignore; import org.junit.Test; import java.util.List; @@ -45,7 +43,6 @@ import static org.junit.Assert.assertTrue; public class TestWindowFrame extends PopUnitTestBase { @Test - @Ignore // Fast schema problems public void testWindowFrameWithOneKeyCount() throws Throwable { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); @@ -66,34 +63,36 @@ public class TestWindowFrame extends PopUnitTestBase { RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator()); int recordCount = 0; - assertEquals(3, results.size()); + for (QueryResultBatch batch : results) { + if (!batch.hasData()) { + continue; + } - QueryResultBatch batch = results.get(1); - assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); - batchLoader.load(batch.getHeader().getDef(), batch.getData()); + assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); + batchLoader.load(batch.getHeader().getDef(), batch.getData()); - for (int r = 0; r < batchLoader.getRecordCount(); r++) { - recordCount++; - VectorWrapper<?> wrapper = batchLoader.getValueAccessorById( + for (int r = 0; r < batchLoader.getRecordCount(); r++) { + recordCount++; + VectorWrapper<?> wrapper = batchLoader.getValueAccessorById( BigIntVector.class, batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0] - ); - assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r)); - wrapper = batchLoader.getValueAccessorById( + ); + assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r)); + wrapper = batchLoader.getValueAccessorById( NullableBigIntVector.class, batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0] - ); - assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r)); + ); + assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r)); + } + batchLoader.clear(); + batch.release(); } - batchLoader.clear(); - batch.release(); assertEquals(4, recordCount); } } @Test - @Ignore public void testWindowFrameWithOneKeyMultipleBatches() throws Throwable { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); @@ -110,54 +109,56 @@ public class TestWindowFrame extends PopUnitTestBase { RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator()); int recordCount = 0; - assertEquals(3, results.size()); + for (QueryResultBatch batch : results) { + if (!batch.hasData()) { + continue; + } - QueryResultBatch batch = results.get(0); - assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); - batchLoader.load(batch.getHeader().getDef(), batch.getData()); - ValueVector.Accessor output = batchLoader.getValueAccessorById(NullableBigIntVector.class, + assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); + batchLoader.load(batch.getHeader().getDef(), batch.getData()); + ValueVector.Accessor output = batchLoader.getValueAccessorById(NullableBigIntVector.class, batchLoader.getValueVectorId( - new SchemaPath(new PathSegment.NameSegment("output"))).getFieldIds()[0] - ).getValueVector().getAccessor(); - ValueVector.Accessor sum = batchLoader.getValueAccessorById( + new SchemaPath(new PathSegment.NameSegment("output"))).getFieldIds()[0] + ).getValueVector().getAccessor(); + ValueVector.Accessor sum = batchLoader.getValueAccessorById( BigIntVector.class, batchLoader.getValueVectorId( - new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0] - ).getValueVector().getAccessor(); - ValueVector.Accessor cnt = batchLoader.getValueAccessorById( + new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0] + ).getValueVector().getAccessor(); + ValueVector.Accessor cnt = batchLoader.getValueAccessorById( BigIntVector.class, batchLoader.getValueVectorId( - new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0] - ).getValueVector().getAccessor(); - int lastGroup = -1; - long groupCounter = 0; - long s = 0; - for (int r = 1; r <= batchLoader.getRecordCount(); r++) { - recordCount++; - int group = r / 4; - if(lastGroup != group) { - lastGroup = group; - groupCounter = 1; - s = 0; - } else { - groupCounter++; + new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0] + ).getValueVector().getAccessor(); + int lastGroup = -1; + long groupCounter = 0; + long s = 0; + for (int r = 1; r <= batchLoader.getRecordCount(); r++) { + recordCount++; + int group = r / 4; + if (lastGroup != group) { + lastGroup = group; + groupCounter = 1; + s = 0; + } else { + groupCounter++; + } + + s += group * 8 + r % 4; + + assertEquals("Count, Row " + r, groupCounter, cnt.getObject(r - 1)); + assertEquals("Sum, Row " + r, s, sum.getObject(r - 1)); + assertEquals("Output, Row " + r, s, output.getObject(r - 1)); } - - s += group * 8 + r % 4; - - assertEquals("Count, Row " + r, groupCounter, cnt.getObject(r - 1)); - assertEquals("Sum, Row " + r, s, sum.getObject(r - 1)); - assertEquals("Output, Row " + r, s, output.getObject(r - 1)); + batchLoader.clear(); + batch.release(); } - batchLoader.clear(); - batch.release(); assertEquals(1000, recordCount); } } @Test - @Ignore public void testWindowFrameWithTwoKeys() throws Throwable { try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); Drillbit bit = new Drillbit(CONFIG, serviceSet); @@ -178,28 +179,30 @@ public class TestWindowFrame extends PopUnitTestBase { RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator()); int recordCount = 0; - assertEquals(3, results.size()); + for (QueryResultBatch batch : results) { + if (!batch.hasData()) { + continue; + } - QueryResultBatch batch = results.get(0); - assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); - batchLoader.load(batch.getHeader().getDef(), batch.getData()); + assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); + batchLoader.load(batch.getHeader().getDef(), batch.getData()); - for (int r = 0; r < batchLoader.getRecordCount(); r++) { - recordCount++; - VectorWrapper<?> wrapper = batchLoader.getValueAccessorById( + for (int r = 0; r < batchLoader.getRecordCount(); r++) { + recordCount++; + VectorWrapper<?> wrapper = batchLoader.getValueAccessorById( BigIntVector.class, batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0] - ); - assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r)); - wrapper = batchLoader.getValueAccessorById( + ); + assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r)); + wrapper = batchLoader.getValueAccessorById( NullableBigIntVector.class, batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0] - ); - assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r)); + ); + assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r)); + } + batchLoader.clear(); + batch.release(); } - batchLoader.clear(); - batch.release(); - assertEquals(8, recordCount); } } |