aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec
diff options
context:
space:
mode:
authorvkorukanti <venki.korukanti@gmail.com>2015-01-09 01:41:53 -0800
committervkorukanti <venki.korukanti@gmail.com>2015-01-16 14:03:32 -0800
commitbd4d669d1b836d6990eb3701e81c56f3d109db18 (patch)
tree58fc70fe8071973df432369e8ca26fd9a734abd8 /exec/java-exec
parent8cdab2ed7d4c0d57ea668f6387281acbc26f4890 (diff)
DRILL-1993: Fix allocation issues in HashTable and HashAgg to reduce memory waste
Diffstat (limited to 'exec/java-exec')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java40
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java28
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java6
5 files changed, 63 insertions, 15 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index dc2b05ccd..2fd5ce159 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -44,7 +44,10 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.ObjectVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
import javax.inject.Named;
@@ -128,7 +131,24 @@ public abstract class HashAggTemplate implements HashAggregator {
MaterializedField outputField = materializedValueFields[i];
// Create a type-specific ValueVector for this value
vector = TypeHelper.getNewVector(outputField, allocator);
- vector.allocateNew();
+
+ // Try to allocate space to store BATCH_SIZE records. Key stored at index i in HashTable has its workspace
+ // variables (such as count, sum etc) stored at index i in HashAgg. HashTable and HashAgg both have
+ // BatchHolders. Whenever a BatchHolder in HashAgg reaches its capacity, a new BatchHolder is added to
+ // HashTable. If HashAgg can't store BATCH_SIZE records in a BatchHolder, it leaves empty slots in current
+ // BatchHolder in HashTable, causing the HashTable to be space inefficient. So it is better to allocate space
+ // to fit as close to as BATCH_SIZE records.
+ if (vector instanceof FixedWidthVector) {
+ ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE);
+ } else if (vector instanceof VariableWidthVector) {
+ ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE,
+ HashTable.BATCH_SIZE);
+ } else if (vector instanceof ObjectVector) {
+ ((ObjectVector)vector).allocateNew(HashTable.BATCH_SIZE);
+ } else {
+ vector.allocateNew();
+ }
+
capacity = Math.min(capacity, vector.getValueCapacity());
aggrValuesContainer.add(vector);
@@ -149,10 +169,14 @@ public abstract class HashAggTemplate implements HashAggregator {
outStartIdxHolder.value = batchOutputCount;
outNumRecordsHolder.value = 0;
boolean status = true;
- for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
- if (outputRecordValues(i, batchOutputCount)) {
+
+ // Output records starting from 'batchOutputCount' in current batch until there are no more records
+ // or output vectors have no space left. In destination vectors, start filling records from 0th position.
+ while(batchOutputCount <= maxOccupiedIdx) {
+ if (outputRecordValues(batchOutputCount, outNumRecordsHolder.value)) {
if (EXTRA_DEBUG_2) {
- logger.debug("Outputting values to output index: {}", batchOutputCount);
+ logger.debug("Outputting values from input index {} to output index: {}",
+ batchOutputCount, outNumRecordsHolder.value);
}
batchOutputCount++;
outNumRecordsHolder.value++;
@@ -256,7 +280,7 @@ public abstract class HashAggTemplate implements HashAggregator {
numGroupByOutFields = groupByOutFieldIds.length;
batchHolders = new ArrayList<BatchHolder>();
- addBatchHolder();
+ // First BatchHolder is created when the first put request is received.
doSetup(incoming);
}
@@ -494,7 +518,11 @@ public abstract class HashAggTemplate implements HashAggregator {
logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
lastBatchOutputCount = numOutputRecords;
- outBatchIndex++;
+ // If there are no more records to output, go to the next batch. If there are any records left refer to the
+ // same BatchHolder. Remaining records will be outputted in next outputCurrentBatch() call(s).
+ if (batchHolders.get(outBatchIndex).getNumPendingOutput() == 0) {
+ outBatchIndex++;
+ }
if (outBatchIndex == batchHolders.size()) {
allFlushed = true;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 322fd1f90..b5cfdca45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -163,7 +163,6 @@ public class ChainedHashTable {
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
// create a type-specific ValueVector for this key
ValueVector vv = TypeHelper.getNewVector(outputField, allocator);
- vv.allocateNew();
htKeyFieldIds[i] = htContainerOrig.add(vv);
i++;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 6966ba1fd..1ec74bfef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -51,6 +51,9 @@ public interface HashTable {
static final public int BATCH_SIZE = Character.MAX_VALUE + 1;
static final public int BATCH_MASK = 0x0000FFFF;
+ /** Variable width vector size in bytes */
+ public static final int VARIABLE_WIDTH_VECTOR_SIZE = 50 * BATCH_SIZE;
+
public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
RecordBatch incomingBuild, RecordBatch incomingProbe,
RecordBatch outgoing, VectorContainer htContainerOrig);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index c80e97af9..ba980d72b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -30,8 +30,10 @@ import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
import javax.inject.Named;
import java.util.ArrayList;
@@ -116,15 +118,24 @@ public abstract class HashTableTemplate implements HashTable {
private BatchHolder(int idx) {
this.batchIndex = idx;
- if (idx == 0) { // first batch holder can use the original htContainer
- htContainer = htContainerOrig;
- } else { // otherwise create a new one using the original's fields
- htContainer = new VectorContainer();
- for (VectorWrapper<?> w : htContainerOrig) {
- ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
+ htContainer = new VectorContainer();
+ for (VectorWrapper<?> w : htContainerOrig) {
+ ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
+
+ // Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. It is better to allocate space for
+ // "key" vectors to store as close to as BATCH_SIZE records. A new BatchHolder is created when either BATCH_SIZE
+ // records are inserted or "key" vectors ran out of space. Allocating too less space for "key" vectors will
+ // result in unused space in "hashValues" and "links" vectors in the BatchHolder. Also for each new
+ // BatchHolder we create a SV4 vector of BATCH_SIZE in HashJoinHelper.
+ if (vv instanceof FixedWidthVector) {
+ ((FixedWidthVector) vv).allocateNew(BATCH_SIZE);
+ } else if (vv instanceof VariableWidthVector) {
+ ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE);
+ } else {
vv.allocateNew();
- htContainer.add(vv);
}
+
+ htContainer.add(vv);
}
links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT);
@@ -454,7 +465,7 @@ public abstract class HashTableTemplate implements HashTable {
// Create the first batch holder
batchHolders = new ArrayList<BatchHolder>();
- addBatchHolder();
+ // First BatchHolder is created when the first put request is received.
doSetup(incomingBuild, incomingProbe);
@@ -753,6 +764,7 @@ public abstract class HashTableTemplate implements HashTable {
public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) {
assert batchIdx < batchHolders.size();
+
if (!batchHolders.get(batchIdx).outputKeys(outContainer, outStartIndex, numRecords)) {
return false;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index b68f08941..3c15db314 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -97,6 +97,12 @@ public class ObjectVector extends BaseValueVector{
addNewArray();
}
+ public void allocateNew(int valueCount) throws OutOfMemoryRuntimeException {
+ while (maxCount < valueCount) {
+ addNewArray();
+ }
+ }
+
@Override
public boolean allocateNewSafe() {
allocateNew();