aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSorabh Hamirwasia <sorabh@apache.org>2019-01-23 17:38:44 -0800
committerVolodymyr Vysotskyi <vvovyk@gmail.com>2019-01-25 17:34:29 +0200
commitb557b796dc1ca7796b0db956e39df1d52f212f12 (patch)
treeda4eed950b5d094340da2fec7529730ee06be5df
parent5026cd12e317e71c4dac1561c347991a36c94e4d (diff)
DRILL-7000: Queries failing with 'Failed to aggregate or route the RFW' do not complete
closes #1621
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java16
1 files changed, 14 insertions, 2 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index f69a44ef7..c0eceae28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -99,6 +99,10 @@ public class RuntimeFilterSink implements Closeable
joinMjId2Stopwatch.put(joinMjId, stopwatch);
}
synchronized (rfQueue) {
+ if (!running.get()) {
+ runtimeFilterWritable.close();
+ return;
+ }
rfQueue.add(runtimeFilterWritable);
rfQueue.notify();
}
@@ -246,14 +250,22 @@ public class RuntimeFilterSink implements Closeable
aggregate(toAggregate);
} catch (Exception ex) {
logger.error("Failed to aggregate or route the RFW", ex);
+
+ // Set running to false and cleanup pending RFW in queue. This will make sure producer
+ // thread is also indicated to stop and queue is cleaned up properly in failure cases
+ synchronized (rfQueue) {
+ running.set(false);
+ }
+ cleanupQueue();
throw new DrillRuntimeException(ex);
} finally {
- if (toAggregate != null) {
toAggregate.close();
- }
}
}
+ cleanupQueue();
+ }
+ private void cleanupQueue() {
if (!running.get()) {
RuntimeFilterWritable toClose;
while ((toClose = rfQueue.poll()) != null) {