aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java15
1 files changed, 12 insertions, 3 deletions
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
index 59a31256f..594b9f875 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
@@ -28,6 +28,7 @@ import java.util.NoSuchElementException;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -147,12 +148,13 @@ public class HBasePStore<V> implements PStore<V> {
private Result current = null;
private Result last = null;
private boolean done = false;
+ private int rowsRead = 0;
Iter() {
try {
Scan scan = new Scan(tableNameStartKey, tableNameStopKey);
scan.addColumn(FAMILY, QUALIFIER);
- scan.setCaching(100);
+ scan.setCaching(config.getMaxIteratorSize() > 100 ? 100 : config.getMaxIteratorSize());
scanner = table.getScanner(scan);
} catch (IOException e) {
throw new DrillRuntimeException("Caught error while creating HBase scanner for table:" + Bytes.toString(table.getTableName()), e);
@@ -161,16 +163,22 @@ public class HBasePStore<V> implements PStore<V> {
@Override
public boolean hasNext() {
- if (!done && current == null) {
+ if (config.getMode() == Mode.BLOB_PERSISTENT
+ && rowsRead >= config.getMaxIteratorSize()) {
+ done = true;
+ } else if (!done && current == null) {
try {
if ((current = scanner.next()) == null) {
done = true;
- scanner.close();
}
} catch (IOException e) {
throw new DrillRuntimeException("Caught error while fetching rows from for table:" + Bytes.toString(table.getTableName()), e);
}
}
+
+ if (done && scanner != null) {
+ scanner.close();
+ }
return (current != null);
}
@@ -181,6 +189,7 @@ public class HBasePStore<V> implements PStore<V> {
}
last = current;
current = null;
+ rowsRead++;
return new DeferredEntry(last);
}