aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYuliya Feldman <yfeldman@maprtech.com>2015-01-12 14:11:45 -0800
committerParth Chandra <pchandra@maprtech.com>2015-01-13 14:29:28 -0800
commit69db15ebbdc3a8f4a038e6f47a0675c32d14cdf4 (patch)
tree2b3cd96d12c1ceac9946fea571298688c4b4c072
parent2a4107753870aa408447fe6fb0becabcbb0691ef (diff)
DRILL-1859 Issue with killing/stopping operator processing - limit is one of the users of this logic
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java36
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java44
3 files changed, 65 insertions, 20 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
index a7535c3c5..141c434fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
@@ -22,6 +22,7 @@ import java.util.Queue;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
public class ResponseSenderQueue {
@@ -55,4 +56,8 @@ public class ResponseSenderQueue {
return i;
}
+ @VisibleForTesting
+ boolean isEmpty() {
+ return q.isEmpty();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 35aec9350..895918c93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.record.RawFragmentBatch;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
@@ -63,12 +64,12 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
@Override
public void enqueue(RawFragmentBatch batch) throws IOException {
- if (state == BufferState.KILLED) {
- batch.release();
- }
if (isFinished()) {
if (state == BufferState.KILLED) {
+ // do not even enqueue just release and send ack back
batch.release();
+ batch.sendOk();
+ return;
} else {
throw new IOException("Attempted to enqueue batch after finished");
}
@@ -107,29 +108,29 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
if (!context.isFailed() && !context.isCancelled()) {
context.fail(new IllegalStateException("Batches still in queue during cleanup"));
logger.error("{} Batches in queue.", buffer.size());
- RawFragmentBatch batch;
- while ((batch = buffer.poll()) != null) {
- logger.error("Batch left in queue: {}", batch);
- }
- }
- RawFragmentBatch batch;
- while ((batch = buffer.poll()) != null) {
- if (batch.getBody() != null) {
- batch.getBody().release();
- }
}
+ clearBufferWithBody();
}
}
@Override
public void kill(FragmentContext context) {
state = BufferState.KILLED;
+ clearBufferWithBody();
+ }
+
+ /**
+ * Helper method to clear buffer with request bodies release
+ * also flushes ack queue - in case there are still responses pending
+ */
+ private void clearBufferWithBody() {
while (!buffer.isEmpty()) {
RawFragmentBatch batch = buffer.poll();
if (batch.getBody() != null) {
batch.getBody().release();
}
}
+ readController.flushResponses();
}
@Override
@@ -205,4 +206,13 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
return (state == BufferState.KILLED || state == BufferState.FINISHED);
}
+ @VisibleForTesting
+ ResponseSenderQueue getReadController() {
+ return readController;
+ }
+
+ @VisibleForTesting
+ boolean isBufferEmpty() {
+ return buffer.isEmpty();
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
index 15ee3f31e..a710d2104 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
@@ -29,6 +30,7 @@ import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -47,6 +49,11 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
private static int FRAGMENT_COUNT = 5;
private DrillConfig dc = DrillConfig.create();
+ private MySender mySender;
+ private UnlimitedRawBatchBuffer rawBuffer;
+ private RawFragmentBatch batch;
+ private FragmentContext context;
+ private int softLimit;
private static class MySender implements ResponseSender {
@@ -66,17 +73,17 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
sendCount = 0;
}
}
- @Test
- public void testBackPressure() throws Exception {
- final MySender mySender = new MySender();
- FragmentContext context = Mockito.mock(FragmentContext.class);
+ @Before
+ public void setUp() {
+ mySender = new MySender();
+ context = Mockito.mock(FragmentContext.class);
Mockito.when(context.getConfig()).thenReturn(dc);
- UnlimitedRawBatchBuffer rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
+ rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
- RawFragmentBatch batch = Mockito.mock(RawFragmentBatch.class);
+ batch = Mockito.mock(RawFragmentBatch.class);
Mockito.when(batch.getSender()).thenReturn(mySender);
Mockito.doAnswer(new Answer<Void>() {
@@ -91,8 +98,11 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
/// start the real test
int incomingBufferSize = dc.getInt(ExecConstants.INCOMING_BUFFER_SIZE);
- int softLimit = incomingBufferSize * FRAGMENT_COUNT;
+ softLimit = incomingBufferSize * FRAGMENT_COUNT;
+ }
+ @Test
+ public void testBackPressure() throws Exception {
// No back pressure should be kicked in
for ( int i = 0; i < softLimit-1; i++) {
rawBuffer.enqueue(batch);
@@ -131,4 +141,24 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
}
}
+ @Test
+ public void testAcksWithKill() throws Exception {
+ // Back pressure should be kicked in
+ for ( int i = 0; i < 2*softLimit; i++) {
+ rawBuffer.enqueue(batch);
+ }
+ assertEquals(softLimit - 1, mySender.getSendCount());
+ assertTrue(!rawBuffer.getReadController().isEmpty());
+
+ rawBuffer.kill(context);
+
+ // UnlimitedBatchBuffer queue should be cleared
+ assertTrue(rawBuffer.isBufferEmpty());
+
+ // acks queue should be cleared as well
+ assertTrue(rawBuffer.getReadController().isEmpty());
+
+ // all acks should be sent
+ assertEquals(2*softLimit, mySender.getSendCount());
+ }
}