diff options
6 files changed, 108 insertions, 76 deletions
diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java index 39c5d7897..fa1eb92b0 100644 --- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java +++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java @@ -39,7 +39,7 @@ public class AutoCloseables { try { ac.close(); } catch(Exception e) { - logger.info("Failure on close(): " + e); + logger.warn("Failure on close(): " + e); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index a78deb62e..9670c7ec1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -289,7 +289,7 @@ public class TopLevelAllocator implements BufferAllocator { if (!child.isClosed()) { StringBuilder sb = new StringBuilder(); StackTraceElement[] elements = children.get(child); - for (int i = 3; i < elements.length; i++) { + for (int i = 1; i < elements.length; i++) { sb.append("\t\t"); sb.append(elements[i]); sb.append("\n"); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 5cea748b2..77ca0f5fa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -17,12 +17,14 @@ */ package org.apache.drill.exec.physical.impl; +import java.io.Closeable; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.FragmentRoot; @@ -74,21 +76,30 @@ public class ImplCreator { final ImplCreator creator = new ImplCreator(); Stopwatch watch = new Stopwatch(); watch.start(); - final RootExec rootExec = creator.getRootExec(root, context); - // skip over this for SimpleRootExec (testing) - if (rootExec instanceof BaseRootExec) { - ((BaseRootExec) rootExec).setOperators(creator.getOperators()); - } - - logger.debug("Took {} ms to create RecordBatch tree", watch.elapsed(TimeUnit.MILLISECONDS)); - if (rootExec == null) { - throw new ExecutionSetupException( - "The provided fragment did not have a root node that correctly created a RootExec value."); - } + boolean success = false; + try { + final RootExec rootExec = creator.getRootExec(root, context); + // skip over this for SimpleRootExec (testing) + if (rootExec instanceof BaseRootExec) { + ((BaseRootExec) rootExec).setOperators(creator.getOperators()); + } - return rootExec; + logger.debug("Took {} ms to create RecordBatch tree", watch.elapsed(TimeUnit.MILLISECONDS)); + if (rootExec == null) { + throw new ExecutionSetupException( + "The provided fragment did not have a root node that correctly created a RootExec value."); + } + success = true; + return rootExec; + } finally { + if (!success) { + for(final CloseableRecordBatch crb : creator.getOperators()) { + AutoCloseables.close(crb, logger); + } + } + } } /** Create RootExec and its children (RecordBatches) for given FragmentRoot */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index e92de4021..8af15082d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -128,34 +128,41 @@ public abstract class HashAggTemplate implements HashAggregator { private BatchHolder() { aggrValuesContainer = new VectorContainer(); + boolean success = false; + try { + ValueVector vector; + + for (int i = 0; i < materializedValueFields.length; i++) { + MaterializedField outputField = materializedValueFields[i]; + // Create a type-specific ValueVector for this value + vector = TypeHelper.getNewVector(outputField, allocator); + + // Try to allocate space to store BATCH_SIZE records. Key stored at index i in HashTable has its workspace + // variables (such as count, sum etc) stored at index i in HashAgg. HashTable and HashAgg both have + // BatchHolders. Whenever a BatchHolder in HashAgg reaches its capacity, a new BatchHolder is added to + // HashTable. If HashAgg can't store BATCH_SIZE records in a BatchHolder, it leaves empty slots in current + // BatchHolder in HashTable, causing the HashTable to be space inefficient. So it is better to allocate space + // to fit as close to as BATCH_SIZE records. + if (vector instanceof FixedWidthVector) { + ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE); + } else if (vector instanceof VariableWidthVector) { + ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE, + HashTable.BATCH_SIZE); + } else if (vector instanceof ObjectVector) { + ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE); + } else { + vector.allocateNew(); + } - ValueVector vector; - - for (int i = 0; i < materializedValueFields.length; i++) { - MaterializedField outputField = materializedValueFields[i]; - // Create a type-specific ValueVector for this value - vector = TypeHelper.getNewVector(outputField, allocator); - - // Try to allocate space to store BATCH_SIZE records. Key stored at index i in HashTable has its workspace - // variables (such as count, sum etc) stored at index i in HashAgg. HashTable and HashAgg both have - // BatchHolders. Whenever a BatchHolder in HashAgg reaches its capacity, a new BatchHolder is added to - // HashTable. If HashAgg can't store BATCH_SIZE records in a BatchHolder, it leaves empty slots in current - // BatchHolder in HashTable, causing the HashTable to be space inefficient. So it is better to allocate space - // to fit as close to as BATCH_SIZE records. - if (vector instanceof FixedWidthVector) { - ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE); - } else if (vector instanceof VariableWidthVector) { - ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE, - HashTable.BATCH_SIZE); - } else if (vector instanceof ObjectVector) { - ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE); - } else { - vector.allocateNew(); - } - - capacity = Math.min(capacity, vector.getValueCapacity()); + capacity = Math.min(capacity, vector.getValueCapacity()); - aggrValuesContainer.add(vector); + aggrValuesContainer.add(vector); + } + success = true; + } finally { + if (!success) { + aggrValuesContainer.clear(); + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index 0908e50a5..e0876af79 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -123,27 +123,38 @@ public abstract class HashTableTemplate implements HashTable { this.batchIndex = idx; htContainer = new VectorContainer(); - for (VectorWrapper<?> w : htContainerOrig) { - ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator); - - // Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. It is better to allocate space for - // "key" vectors to store as close to as BATCH_SIZE records. A new BatchHolder is created when either BATCH_SIZE - // records are inserted or "key" vectors ran out of space. Allocating too less space for "key" vectors will - // result in unused space in "hashValues" and "links" vectors in the BatchHolder. Also for each new - // BatchHolder we create a SV4 vector of BATCH_SIZE in HashJoinHelper. - if (vv instanceof FixedWidthVector) { - ((FixedWidthVector) vv).allocateNew(BATCH_SIZE); - } else if (vv instanceof VariableWidthVector) { - ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE); - } else { - vv.allocateNew(); + boolean success = false; + try { + for (VectorWrapper<?> w : htContainerOrig) { + ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator); + + // Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. It is better to allocate space for + // "key" vectors to store as close to as BATCH_SIZE records. A new BatchHolder is created when either BATCH_SIZE + // records are inserted or "key" vectors ran out of space. Allocating too less space for "key" vectors will + // result in unused space in "hashValues" and "links" vectors in the BatchHolder. Also for each new + // BatchHolder we create a SV4 vector of BATCH_SIZE in HashJoinHelper. + if (vv instanceof FixedWidthVector) { + ((FixedWidthVector) vv).allocateNew(BATCH_SIZE); + } else if (vv instanceof VariableWidthVector) { + ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE); + } else { + vv.allocateNew(); + } + + htContainer.add(vv); } - htContainer.add(vv); + links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT); + hashValues = allocMetadataVector(HashTable.BATCH_SIZE, 0); + success = true; + } finally { + if (!success) { + htContainer.clear(); + if (links != null) { + links.clear(); + } + } } - - links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT); - hashValues = allocMetadataVector(HashTable.BATCH_SIZE, 0); } private void init(IntVector links, IntVector hashValues, int size) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 612777e92..529a6ca43 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -149,27 +149,30 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { @Override public void close() { - if (batchGroups != null) { - for (BatchGroup group: batchGroups) { - try { - group.cleanup(); - } catch (IOException e) { - throw new RuntimeException(e); + try { + if (batchGroups != null) { + for (BatchGroup group: batchGroups) { + try { + group.cleanup(); + } catch (IOException e) { + throw new RuntimeException(e); + } } } + } finally { + if (builder != null) { + builder.clear(); + builder.close(); + } + if (sv4 != null) { + sv4.clear(); + } + if (copier != null) { + copier.cleanup(); + } + copierAllocator.close(); + super.close(); } - if (builder != null) { - builder.clear(); - builder.close(); - } - if (sv4 != null) { - sv4.clear(); - } - if (copier != null) { - copier.cleanup(); - } - copierAllocator.close(); - super.close(); } @Override |