diff options
author | Yuliya Feldman <yfeldman@maprtech.com> | 2015-01-12 14:11:45 -0800 |
---|---|---|
committer | Parth Chandra <pchandra@maprtech.com> | 2015-01-13 14:29:28 -0800 |
commit | 69db15ebbdc3a8f4a038e6f47a0675c32d14cdf4 (patch) | |
tree | 2b3cd96d12c1ceac9946fea571298688c4b4c072 | |
parent | 2a4107753870aa408447fe6fb0becabcbb0691ef (diff) |
DRILL-1859 Issue with killing/stopping operator processing - limit is one of the users of this logic
3 files changed, 65 insertions, 20 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java index a7535c3c5..141c434fe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java @@ -22,6 +22,7 @@ import java.util.Queue; import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.rpc.data.DataRpcConfig; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; public class ResponseSenderQueue { @@ -55,4 +56,8 @@ public class ResponseSenderQueue { return i; } + @VisibleForTesting + boolean isEmpty() { + return q.isEmpty(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index 35aec9350..895918c93 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -26,6 +26,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.record.RawFragmentBatch; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ @@ -63,12 +64,12 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ @Override public void enqueue(RawFragmentBatch batch) throws IOException { - if (state == BufferState.KILLED) { - batch.release(); - } if (isFinished()) { if (state == BufferState.KILLED) { + // do not even enqueue just release and send ack back batch.release(); + batch.sendOk(); + return; } else { throw new IOException("Attempted to enqueue batch after finished"); } @@ -107,29 +108,29 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ if (!context.isFailed() && !context.isCancelled()) { context.fail(new IllegalStateException("Batches still in queue during cleanup")); logger.error("{} Batches in queue.", buffer.size()); - RawFragmentBatch batch; - while ((batch = buffer.poll()) != null) { - logger.error("Batch left in queue: {}", batch); - } - } - RawFragmentBatch batch; - while ((batch = buffer.poll()) != null) { - if (batch.getBody() != null) { - batch.getBody().release(); - } } + clearBufferWithBody(); } } @Override public void kill(FragmentContext context) { state = BufferState.KILLED; + clearBufferWithBody(); + } + + /** + * Helper method to clear buffer with request bodies release + * also flushes ack queue - in case there are still responses pending + */ + private void clearBufferWithBody() { while (!buffer.isEmpty()) { RawFragmentBatch batch = buffer.poll(); if (batch.getBody() != null) { batch.getBody().release(); } } + readController.flushResponses(); } @Override @@ -205,4 +206,13 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ return (state == BufferState.KILLED || state == BufferState.FINISHED); } + @VisibleForTesting + ResponseSenderQueue getReadController() { + return readController; + } + + @VisibleForTesting + boolean isBufferEmpty() { + return buffer.isEmpty(); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java index 15ee3f31e..a710d2104 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; @@ -29,6 +30,7 @@ import org.apache.drill.exec.record.RawFragmentBatch; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.rpc.data.DataRpcConfig; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -47,6 +49,11 @@ public class TestUnlimitedBatchBuffer extends ExecTest { private static int FRAGMENT_COUNT = 5; private DrillConfig dc = DrillConfig.create(); + private MySender mySender; + private UnlimitedRawBatchBuffer rawBuffer; + private RawFragmentBatch batch; + private FragmentContext context; + private int softLimit; private static class MySender implements ResponseSender { @@ -66,17 +73,17 @@ public class TestUnlimitedBatchBuffer extends ExecTest { sendCount = 0; } } - @Test - public void testBackPressure() throws Exception { - final MySender mySender = new MySender(); - FragmentContext context = Mockito.mock(FragmentContext.class); + @Before + public void setUp() { + mySender = new MySender(); + context = Mockito.mock(FragmentContext.class); Mockito.when(context.getConfig()).thenReturn(dc); - UnlimitedRawBatchBuffer rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT); + rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT); - RawFragmentBatch batch = Mockito.mock(RawFragmentBatch.class); + batch = Mockito.mock(RawFragmentBatch.class); Mockito.when(batch.getSender()).thenReturn(mySender); Mockito.doAnswer(new Answer<Void>() { @@ -91,8 +98,11 @@ public class TestUnlimitedBatchBuffer extends ExecTest { /// start the real test int incomingBufferSize = dc.getInt(ExecConstants.INCOMING_BUFFER_SIZE); - int softLimit = incomingBufferSize * FRAGMENT_COUNT; + softLimit = incomingBufferSize * FRAGMENT_COUNT; + } + @Test + public void testBackPressure() throws Exception { // No back pressure should be kicked in for ( int i = 0; i < softLimit-1; i++) { rawBuffer.enqueue(batch); @@ -131,4 +141,24 @@ public class TestUnlimitedBatchBuffer extends ExecTest { } } + @Test + public void testAcksWithKill() throws Exception { + // Back pressure should be kicked in + for ( int i = 0; i < 2*softLimit; i++) { + rawBuffer.enqueue(batch); + } + assertEquals(softLimit - 1, mySender.getSendCount()); + assertTrue(!rawBuffer.getReadController().isEmpty()); + + rawBuffer.kill(context); + + // UnlimitedBatchBuffer queue should be cleared + assertTrue(rawBuffer.isBufferEmpty()); + + // acks queue should be cleared as well + assertTrue(rawBuffer.getReadController().isEmpty()); + + // all acks should be sent + assertEquals(2*softLimit, mySender.getSendCount()); + } } |