aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/src/main/java/org/apache/drill/common/AutoCloseables.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java35
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java59
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java47
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java39
6 files changed, 108 insertions, 76 deletions
diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
index 39c5d7897..fa1eb92b0 100644
--- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java
+++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
@@ -39,7 +39,7 @@ public class AutoCloseables {
try {
ac.close();
} catch(Exception e) {
- logger.info("Failure on close(): " + e);
+ logger.warn("Failure on close(): " + e);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index a78deb62e..9670c7ec1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -289,7 +289,7 @@ public class TopLevelAllocator implements BufferAllocator {
if (!child.isClosed()) {
StringBuilder sb = new StringBuilder();
StackTraceElement[] elements = children.get(child);
- for (int i = 3; i < elements.length; i++) {
+ for (int i = 1; i < elements.length; i++) {
sb.append("\t\t");
sb.append(elements[i]);
sb.append("\n");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 5cea748b2..77ca0f5fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -17,12 +17,14 @@
*/
package org.apache.drill.exec.physical.impl;
+import java.io.Closeable;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
@@ -74,21 +76,30 @@ public class ImplCreator {
final ImplCreator creator = new ImplCreator();
Stopwatch watch = new Stopwatch();
watch.start();
- final RootExec rootExec = creator.getRootExec(root, context);
- // skip over this for SimpleRootExec (testing)
- if (rootExec instanceof BaseRootExec) {
- ((BaseRootExec) rootExec).setOperators(creator.getOperators());
- }
-
- logger.debug("Took {} ms to create RecordBatch tree", watch.elapsed(TimeUnit.MILLISECONDS));
- if (rootExec == null) {
- throw new ExecutionSetupException(
- "The provided fragment did not have a root node that correctly created a RootExec value.");
- }
+ boolean success = false;
+ try {
+ final RootExec rootExec = creator.getRootExec(root, context);
+ // skip over this for SimpleRootExec (testing)
+ if (rootExec instanceof BaseRootExec) {
+ ((BaseRootExec) rootExec).setOperators(creator.getOperators());
+ }
- return rootExec;
+ logger.debug("Took {} ms to create RecordBatch tree", watch.elapsed(TimeUnit.MILLISECONDS));
+ if (rootExec == null) {
+ throw new ExecutionSetupException(
+ "The provided fragment did not have a root node that correctly created a RootExec value.");
+ }
+ success = true;
+ return rootExec;
+ } finally {
+ if (!success) {
+ for(final CloseableRecordBatch crb : creator.getOperators()) {
+ AutoCloseables.close(crb, logger);
+ }
+ }
+ }
}
/** Create RootExec and its children (RecordBatches) for given FragmentRoot */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index e92de4021..8af15082d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -128,34 +128,41 @@ public abstract class HashAggTemplate implements HashAggregator {
private BatchHolder() {
aggrValuesContainer = new VectorContainer();
+ boolean success = false;
+ try {
+ ValueVector vector;
+
+ for (int i = 0; i < materializedValueFields.length; i++) {
+ MaterializedField outputField = materializedValueFields[i];
+ // Create a type-specific ValueVector for this value
+ vector = TypeHelper.getNewVector(outputField, allocator);
+
+ // Try to allocate space to store BATCH_SIZE records. Key stored at index i in HashTable has its workspace
+ // variables (such as count, sum etc) stored at index i in HashAgg. HashTable and HashAgg both have
+ // BatchHolders. Whenever a BatchHolder in HashAgg reaches its capacity, a new BatchHolder is added to
+ // HashTable. If HashAgg can't store BATCH_SIZE records in a BatchHolder, it leaves empty slots in current
+ // BatchHolder in HashTable, causing the HashTable to be space inefficient. So it is better to allocate space
+ // to fit as close to as BATCH_SIZE records.
+ if (vector instanceof FixedWidthVector) {
+ ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE);
+ } else if (vector instanceof VariableWidthVector) {
+ ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE,
+ HashTable.BATCH_SIZE);
+ } else if (vector instanceof ObjectVector) {
+ ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE);
+ } else {
+ vector.allocateNew();
+ }
- ValueVector vector;
-
- for (int i = 0; i < materializedValueFields.length; i++) {
- MaterializedField outputField = materializedValueFields[i];
- // Create a type-specific ValueVector for this value
- vector = TypeHelper.getNewVector(outputField, allocator);
-
- // Try to allocate space to store BATCH_SIZE records. Key stored at index i in HashTable has its workspace
- // variables (such as count, sum etc) stored at index i in HashAgg. HashTable and HashAgg both have
- // BatchHolders. Whenever a BatchHolder in HashAgg reaches its capacity, a new BatchHolder is added to
- // HashTable. If HashAgg can't store BATCH_SIZE records in a BatchHolder, it leaves empty slots in current
- // BatchHolder in HashTable, causing the HashTable to be space inefficient. So it is better to allocate space
- // to fit as close to as BATCH_SIZE records.
- if (vector instanceof FixedWidthVector) {
- ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE);
- } else if (vector instanceof VariableWidthVector) {
- ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE,
- HashTable.BATCH_SIZE);
- } else if (vector instanceof ObjectVector) {
- ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE);
- } else {
- vector.allocateNew();
- }
-
- capacity = Math.min(capacity, vector.getValueCapacity());
+ capacity = Math.min(capacity, vector.getValueCapacity());
- aggrValuesContainer.add(vector);
+ aggrValuesContainer.add(vector);
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ aggrValuesContainer.clear();
+ }
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 0908e50a5..e0876af79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -123,27 +123,38 @@ public abstract class HashTableTemplate implements HashTable {
this.batchIndex = idx;
htContainer = new VectorContainer();
- for (VectorWrapper<?> w : htContainerOrig) {
- ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
-
- // Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. It is better to allocate space for
- // "key" vectors to store as close to as BATCH_SIZE records. A new BatchHolder is created when either BATCH_SIZE
- // records are inserted or "key" vectors ran out of space. Allocating too less space for "key" vectors will
- // result in unused space in "hashValues" and "links" vectors in the BatchHolder. Also for each new
- // BatchHolder we create a SV4 vector of BATCH_SIZE in HashJoinHelper.
- if (vv instanceof FixedWidthVector) {
- ((FixedWidthVector) vv).allocateNew(BATCH_SIZE);
- } else if (vv instanceof VariableWidthVector) {
- ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE);
- } else {
- vv.allocateNew();
+ boolean success = false;
+ try {
+ for (VectorWrapper<?> w : htContainerOrig) {
+ ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
+
+ // Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. It is better to allocate space for
+ // "key" vectors to store as close to as BATCH_SIZE records. A new BatchHolder is created when either BATCH_SIZE
+ // records are inserted or "key" vectors ran out of space. Allocating too less space for "key" vectors will
+ // result in unused space in "hashValues" and "links" vectors in the BatchHolder. Also for each new
+ // BatchHolder we create a SV4 vector of BATCH_SIZE in HashJoinHelper.
+ if (vv instanceof FixedWidthVector) {
+ ((FixedWidthVector) vv).allocateNew(BATCH_SIZE);
+ } else if (vv instanceof VariableWidthVector) {
+ ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE);
+ } else {
+ vv.allocateNew();
+ }
+
+ htContainer.add(vv);
}
- htContainer.add(vv);
+ links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT);
+ hashValues = allocMetadataVector(HashTable.BATCH_SIZE, 0);
+ success = true;
+ } finally {
+ if (!success) {
+ htContainer.clear();
+ if (links != null) {
+ links.clear();
+ }
+ }
}
-
- links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT);
- hashValues = allocMetadataVector(HashTable.BATCH_SIZE, 0);
}
private void init(IntVector links, IntVector hashValues, int size) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 612777e92..529a6ca43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -149,27 +149,30 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
@Override
public void close() {
- if (batchGroups != null) {
- for (BatchGroup group: batchGroups) {
- try {
- group.cleanup();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ try {
+ if (batchGroups != null) {
+ for (BatchGroup group: batchGroups) {
+ try {
+ group.cleanup();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
+ } finally {
+ if (builder != null) {
+ builder.clear();
+ builder.close();
+ }
+ if (sv4 != null) {
+ sv4.clear();
+ }
+ if (copier != null) {
+ copier.cleanup();
+ }
+ copierAllocator.close();
+ super.close();
}
- if (builder != null) {
- builder.clear();
- builder.close();
- }
- if (sv4 != null) {
- sv4.clear();
- }
- if (copier != null) {
- copier.cleanup();
- }
- copierAllocator.close();
- super.close();
}
@Override