diff options
-rw-r--r-- | contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java index 59a31256f..594b9f875 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java @@ -28,6 +28,7 @@ import java.util.NoSuchElementException; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; +import org.apache.drill.exec.store.sys.PStoreConfig.Mode; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; @@ -147,12 +148,13 @@ public class HBasePStore<V> implements PStore<V> { private Result current = null; private Result last = null; private boolean done = false; + private int rowsRead = 0; Iter() { try { Scan scan = new Scan(tableNameStartKey, tableNameStopKey); scan.addColumn(FAMILY, QUALIFIER); - scan.setCaching(100); + scan.setCaching(config.getMaxIteratorSize() > 100 ? 100 : config.getMaxIteratorSize()); scanner = table.getScanner(scan); } catch (IOException e) { throw new DrillRuntimeException("Caught error while creating HBase scanner for table:" + Bytes.toString(table.getTableName()), e); @@ -161,16 +163,22 @@ public class HBasePStore<V> implements PStore<V> { @Override public boolean hasNext() { - if (!done && current == null) { + if (config.getMode() == Mode.BLOB_PERSISTENT + && rowsRead >= config.getMaxIteratorSize()) { + done = true; + } else if (!done && current == null) { try { if ((current = scanner.next()) == null) { done = true; - scanner.close(); } } catch (IOException e) { throw new DrillRuntimeException("Caught error while fetching rows from for table:" + Bytes.toString(table.getTableName()), e); } } + + if (done && scanner != null) { + scanner.close(); + } return (current != null); } @@ -181,6 +189,7 @@ public class HBasePStore<V> implements PStore<V> { } last = current; current = null; + rowsRead++; return new DeferredEntry(last); } |