diff options
author | Sorabh Hamirwasia <sorabh@apache.org> | 2019-01-23 17:38:44 -0800 |
---|---|---|
committer | Volodymyr Vysotskyi <vvovyk@gmail.com> | 2019-01-25 17:34:29 +0200 |
commit | b557b796dc1ca7796b0db956e39df1d52f212f12 (patch) | |
tree | da4eed950b5d094340da2fec7529730ee06be5df | |
parent | 5026cd12e317e71c4dac1561c347991a36c94e4d (diff) |
DRILL-7000: Queries failing with 'Failed to aggregate or route the RFW' do not complete
closes #1621
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java | 16 |
1 files changed, 14 insertions, 2 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java index f69a44ef7..c0eceae28 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java @@ -99,6 +99,10 @@ public class RuntimeFilterSink implements Closeable joinMjId2Stopwatch.put(joinMjId, stopwatch); } synchronized (rfQueue) { + if (!running.get()) { + runtimeFilterWritable.close(); + return; + } rfQueue.add(runtimeFilterWritable); rfQueue.notify(); } @@ -246,14 +250,22 @@ public class RuntimeFilterSink implements Closeable aggregate(toAggregate); } catch (Exception ex) { logger.error("Failed to aggregate or route the RFW", ex); + + // Set running to false and cleanup pending RFW in queue. This will make sure producer + // thread is also indicated to stop and queue is cleaned up properly in failure cases + synchronized (rfQueue) { + running.set(false); + } + cleanupQueue(); throw new DrillRuntimeException(ex); } finally { - if (toAggregate != null) { toAggregate.close(); - } } } + cleanupQueue(); + } + private void cleanupQueue() { if (!running.get()) { RuntimeFilterWritable toClose; while ((toClose = rfQueue.poll()) != null) { |