diff options
author | vkorukanti <venki.korukanti@gmail.com> | 2015-01-09 01:41:53 -0800 |
---|---|---|
committer | vkorukanti <venki.korukanti@gmail.com> | 2015-01-16 14:03:32 -0800 |
commit | bd4d669d1b836d6990eb3701e81c56f3d109db18 (patch) | |
tree | 58fc70fe8071973df432369e8ca26fd9a734abd8 /exec/java-exec | |
parent | 8cdab2ed7d4c0d57ea668f6387281acbc26f4890 (diff) |
DRILL-1993: Fix allocation issues in HashTable and HashAgg to reduce memory waste
Diffstat (limited to 'exec/java-exec')
5 files changed, 63 insertions, 15 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index dc2b05ccd..2fd5ce159 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -44,7 +44,10 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.FixedWidthVector; +import org.apache.drill.exec.vector.ObjectVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VariableWidthVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import javax.inject.Named; @@ -128,7 +131,24 @@ public abstract class HashAggTemplate implements HashAggregator { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator); - vector.allocateNew(); + + // Try to allocate space to store BATCH_SIZE records. Key stored at index i in HashTable has its workspace + // variables (such as count, sum etc) stored at index i in HashAgg. HashTable and HashAgg both have + // BatchHolders. Whenever a BatchHolder in HashAgg reaches its capacity, a new BatchHolder is added to + // HashTable. If HashAgg can't store BATCH_SIZE records in a BatchHolder, it leaves empty slots in current + // BatchHolder in HashTable, causing the HashTable to be space inefficient. So it is better to allocate space + // to fit as close to as BATCH_SIZE records. 
+ if (vector instanceof FixedWidthVector) { + ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE); + } else if (vector instanceof VariableWidthVector) { + ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE, + HashTable.BATCH_SIZE); + } else if (vector instanceof ObjectVector) { + ((ObjectVector)vector).allocateNew(HashTable.BATCH_SIZE); + } else { + vector.allocateNew(); + } + capacity = Math.min(capacity, vector.getValueCapacity()); aggrValuesContainer.add(vector); @@ -149,10 +169,14 @@ public abstract class HashAggTemplate implements HashAggregator { outStartIdxHolder.value = batchOutputCount; outNumRecordsHolder.value = 0; boolean status = true; - for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) { - if (outputRecordValues(i, batchOutputCount)) { + + // Output records starting from 'batchOutputCount' in current batch until there are no more records + // or output vectors have no space left. In destination vectors, start filling records from 0th position. + while(batchOutputCount <= maxOccupiedIdx) { + if (outputRecordValues(batchOutputCount, outNumRecordsHolder.value)) { if (EXTRA_DEBUG_2) { - logger.debug("Outputting values to output index: {}", batchOutputCount); + logger.debug("Outputting values from input index {} to output index: {}", + batchOutputCount, outNumRecordsHolder.value); } batchOutputCount++; outNumRecordsHolder.value++; @@ -256,7 +280,7 @@ public abstract class HashAggTemplate implements HashAggregator { numGroupByOutFields = groupByOutFieldIds.length; batchHolders = new ArrayList<BatchHolder>(); - addBatchHolder(); + // First BatchHolder is created when the first put request is received. 
doSetup(incoming); } @@ -494,7 +518,11 @@ public abstract class HashAggTemplate implements HashAggregator { logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords); lastBatchOutputCount = numOutputRecords; - outBatchIndex++; + // If there are no more records to output, go to the next batch. If there are any records left refer to the + // same BatchHolder. Remaining records will be outputted in next outputCurrentBatch() call(s). + if (batchHolders.get(outBatchIndex).getNumPendingOutput() == 0) { + outBatchIndex++; + } if (outBatchIndex == batchHolders.size()) { allFlushed = true; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 322fd1f90..b5cfdca45 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -163,7 +163,6 @@ public class ChainedHashTable { final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); // create a type-specific ValueVector for this key ValueVector vv = TypeHelper.getNewVector(outputField, allocator); - vv.allocateNew(); htKeyFieldIds[i] = htContainerOrig.add(vv); i++; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index 6966ba1fd..1ec74bfef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -51,6 +51,9 @@ public interface HashTable { static final public int BATCH_SIZE = Character.MAX_VALUE + 1; static final public int BATCH_MASK = 0x0000FFFF; + /** Variable 
width vector size in bytes */ + public static final int VARIABLE_WIDTH_VECTOR_SIZE = 50 * BATCH_SIZE; + public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index c80e97af9..ba980d72b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -30,8 +30,10 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.BigIntVector; +import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VariableWidthVector; import javax.inject.Named; import java.util.ArrayList; @@ -116,15 +118,24 @@ public abstract class HashTableTemplate implements HashTable { private BatchHolder(int idx) { this.batchIndex = idx; - if (idx == 0) { // first batch holder can use the original htContainer - htContainer = htContainerOrig; - } else { // otherwise create a new one using the original's fields - htContainer = new VectorContainer(); - for (VectorWrapper<?> w : htContainerOrig) { - ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator); + htContainer = new VectorContainer(); + for (VectorWrapper<?> w : htContainerOrig) { + ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator); + + // Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. 
It is better to allocate space for + // "key" vectors to store as close to BATCH_SIZE records as possible. A new BatchHolder is created when either BATCH_SIZE + // records are inserted or "key" vectors run out of space. Allocating too little space for "key" vectors will + // result in unused space in "hashValues" and "links" vectors in the BatchHolder. Also for each new + // BatchHolder we create an SV4 vector of BATCH_SIZE in HashJoinHelper. + if (vv instanceof FixedWidthVector) { + ((FixedWidthVector) vv).allocateNew(BATCH_SIZE); + } else if (vv instanceof VariableWidthVector) { + ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE); + } else { vv.allocateNew(); - htContainer.add(vv); } + + htContainer.add(vv); } links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT); @@ -454,7 +465,7 @@ public abstract class HashTableTemplate implements HashTable { // Create the first batch holder batchHolders = new ArrayList<BatchHolder>(); - addBatchHolder(); + // First BatchHolder is created when the first put request is received. 
doSetup(incomingBuild, incomingProbe); @@ -753,6 +764,7 @@ public abstract class HashTableTemplate implements HashTable { public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) { assert batchIdx < batchHolders.size(); + if (!batchHolders.get(batchIdx).outputKeys(outContainer, outStartIndex, numRecords)) { return false; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java index b68f08941..3c15db314 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java @@ -97,6 +97,12 @@ public class ObjectVector extends BaseValueVector{ addNewArray(); } + public void allocateNew(int valueCount) throws OutOfMemoryRuntimeException { + while (maxCount < valueCount) { + addNewArray(); + } + } + @Override public boolean allocateNewSafe() { allocateNew(); |