diff options
author | Aditya Kishore <adi@apache.org> | 2016-06-12 16:28:52 -0700 |
---|---|---|
committer | Aditya Kishore <adi@apache.org> | 2016-06-14 13:37:20 -0700 |
commit | c2d9959e07f47a09a4a11c250f84f4874b7e1db4 (patch) | |
tree | 2379003988fa5cb476abeb91064b342eb2615585 /contrib/storage-hbase/src | |
parent | 6286c0a4b8e39524fe00d623152d1d38db15774f (diff) |
DRILL-4199: Add Support for HBase 1.X
Highlights of the changes:
* Replaced the old HBase APIs (HBaseAdmin/HTable) with the new HBase 1.1 APIs (Connection/Admin/Table).
* Added HBaseConnectionManager class which which manages the life-cycle of HBase connections inside a Drillbit process.
* Updated HBase dependencies version to 1.1.3 and 1.1.1-mapr-1602-m7-5.1.0 for default and "mapr" profiles respectively.
* Added `commons-logging` dependency in the `provided` scope to allow HBase test cluster to come up for Unit tests.
* Relaxed banned dependency rule for `commons-logging` library for `storage-hbase` module alone, in provided scope only.
* Removed the use of many deprecated APIs throughout the modules code.
* Added some missing test to HBase storage plugin's test suit.
* Move the GuavaPatcher code to main code execution path.
* Log a message if GuavaPatcher fails instead of exiting.
All unit tests are green.
Closes #443
Diffstat (limited to 'contrib/storage-hbase/src')
37 files changed, 673 insertions, 531 deletions
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertFrom.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertFrom.java index 3b8391d85..fdef3644e 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertFrom.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertFrom.java @@ -40,7 +40,7 @@ public class OrderedBytesBigIntConvertFrom implements DrillSimpleFunc { @Override public void setup() { bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertTo.java index d012531f6..16dfa1417 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesBigIntConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(9); bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntDescConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntDescConvertTo.java index 463483c7d..161452447 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntDescConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntDescConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesBigIntDescConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(9); bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertFrom.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertFrom.java index b2ae26871..6fbd04636 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertFrom.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertFrom.java @@ -40,7 +40,7 @@ public class OrderedBytesDoubleConvertFrom implements DrillSimpleFunc { @Override public void setup() { bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertTo.java index d90b620b7..a0276ce52 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesDoubleConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(9); bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleDescConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleDescConvertTo.java index 944b1d10a..29122c78c 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleDescConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleDescConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesDoubleDescConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(9); bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertFrom.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertFrom.java index a66e58005..11cff0f82 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertFrom.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertFrom.java @@ -40,7 +40,7 @@ public class OrderedBytesFloatConvertFrom implements DrillSimpleFunc { @Override public void setup() { bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertTo.java index e41469c0e..2aa37fff8 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesFloatConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(5); bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatDescConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatDescConvertTo.java index 5c40e795e..318c9b391 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatDescConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatDescConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesFloatDescConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(5); bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertFrom.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertFrom.java index 6c159471c..60a660a58 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertFrom.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertFrom.java @@ -40,7 +40,7 @@ public class OrderedBytesIntConvertFrom implements DrillSimpleFunc { @Override public void setup() { bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertTo.java index d703318b3..63e1570e1 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesIntConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(5); bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntDescConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntDescConvertTo.java index 6ed4fbffe..0835328d9 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntDescConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntDescConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesIntDescConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(5); bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java index 2527e8df4..09e7be703 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java @@ -40,7 +40,7 @@ import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression; import org.apache.drill.common.expression.visitors.AbstractExprVisitor; import org.apache.hadoop.hbase.util.Order; import org.apache.hadoop.hbase.util.PositionedByteRange; -import org.apache.hadoop.hbase.util.SimplePositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.filter.Filter; @@ -254,7 +254,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "DOUBLE_OBD": if (valueArg instanceof DoubleExpression) { bb = newByteBuf(9, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 9); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br, ((DoubleExpression)valueArg).getDouble(), Order.DESCENDING); @@ -269,7 +269,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "FLOAT_OBD": if (valueArg instanceof FloatExpression) { bb = newByteBuf(5, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 5); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br, ((FloatExpression)valueArg).getFloat(), Order.DESCENDING); @@ -284,7 +284,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "BIGINT_OBD": if (valueArg instanceof LongExpression) { bb = newByteBuf(9, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 9); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br, ((LongExpression)valueArg).getLong(), Order.DESCENDING); @@ -299,7 +299,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "INT_OBD": if (valueArg instanceof IntExpression) { bb = newByteBuf(5, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 5); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br, ((IntExpression)valueArg).getInt(), Order.DESCENDING); diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java index e98aed215..b916ae744 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java @@ -21,24 +21,25 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Set; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.sql.type.SqlTypeName; public class DrillHBaseTable extends DrillTable implements DrillHBaseConstants { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillHBaseTable.class); - private HTableDescriptor table; + private HTableDescriptor tableDesc; public DrillHBaseTable(String storageEngineName, HBaseStoragePlugin plugin, HBaseScanSpec scanSpec) { super(storageEngineName, plugin, scanSpec); - try(HBaseAdmin admin = new HBaseAdmin(plugin.getConfig().getHBaseConf())) { - table = admin.getTableDescriptor(HBaseUtils.getBytes(scanSpec.getTableName())); + try(Admin admin = plugin.getConnection().getAdmin()) { + tableDesc = admin.getTableDescriptor(TableName.valueOf(scanSpec.getTableName())); } catch (IOException e) { throw UserException.dataReadError() .message("Failure while loading table %s in database %s.", scanSpec.getTableName(), storageEngineName) @@ -55,7 +56,7 @@ public class DrillHBaseTable extends DrillTable implements DrillHBaseConstants { fieldNameList.add(ROW_KEY); typeList.add(typeFactory.createSqlType(SqlTypeName.ANY)); - Set<byte[]> families = table.getFamiliesKeys(); + Set<byte[]> families = tableDesc.getFamiliesKeys(); for (byte[] family : families) { fieldNameList.add(Bytes.toString(family)); typeList.add(typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), typeFactory.createSqlType(SqlTypeName.ANY))); diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java new file mode 100644 index 000000000..2dd067349 --- /dev/null +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hbase; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.store.hbase.HBaseStoragePlugin.HBaseConnectionKey; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.util.concurrent.UncheckedExecutionException; + +/** + * <p>A singleton class which manages the lifecycle of HBase connections.</p> + * <p>One connection per storage plugin instance is maintained.</p> + */ +public final class HBaseConnectionManager + extends CacheLoader<HBaseConnectionKey, Connection> implements RemovalListener<HBaseConnectionKey, Connection> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseConnectionManager.class); + + public static final HBaseConnectionManager INSTANCE = new HBaseConnectionManager(); + + private final LoadingCache<HBaseConnectionKey, Connection> connectionCache; + + private HBaseConnectionManager() { + this.connectionCache = CacheBuilder.newBuilder() + .expireAfterAccess(1, TimeUnit.HOURS) // Connections will be closed after 1 hour of inactivity + .removalListener(this) + .build(this); + } + + private boolean isValid(Connection conn) { + return conn != null + && !conn.isAborted() + && !conn.isClosed(); + } + + @Override + public Connection load(HBaseConnectionKey key) throws Exception { + Connection connection = ConnectionFactory.createConnection(key.getHBaseConf()); + logger.info("HBase connection '{}' created.", connection); + return connection; + } + + @Override + public void onRemoval(RemovalNotification<HBaseConnectionKey, Connection> notification) { + try { + Connection conn = notification.getValue(); + if (isValid(conn)) { + conn.close(); + } + logger.info("HBase connection '{}' closed.", conn); + } catch (Throwable t) { + logger.warn("Error while closing HBase connection.", t); + } + } + + public Connection getConnection(HBaseConnectionKey key) { + checkNotNull(key); + try { + Connection conn = connectionCache.get(key); + if (!isValid(conn)) { + key.lock(); // invalidate the connection with a per storage plugin lock + try { + conn = connectionCache.get(key); + if (!isValid(conn)) { + connectionCache.invalidate(key); + conn = connectionCache.get(key); + } + } finally { + key.unlock(); + } + } + return conn; + } catch (ExecutionException | UncheckedExecutionException e) { + throw UserException.dataReadError(e.getCause()).build(logger); + } + } + + public void closeConnection(HBaseConnectionKey key) { + checkNotNull(key); + connectionCache.invalidate(key); + } + +} diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java index 0e25fa660..394e12811 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.NullComparator; -import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; @@ -332,20 +331,19 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, return null; } -private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call, - CompareFunctionsProcessor processor) { + private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call, CompareFunctionsProcessor processor) { byte[] startRow = processor.getRowKeyPrefixStartRow(); byte[] stopRow = processor.getRowKeyPrefixStopRow(); Filter filter = processor.getRowKeyPrefixFilter(); if (startRow != HConstants.EMPTY_START_ROW || - stopRow != HConstants.EMPTY_END_ROW || - filter != null) { + stopRow != HConstants.EMPTY_END_ROW || + filter != null) { return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter); } // else return null; -} + } } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index ae243b329..e474c1182 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -33,7 +33,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; -import com.fasterxml.jackson.annotation.JsonCreator; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -49,11 +48,16 @@ import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; @@ -146,28 +150,29 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst private void init() { logger.debug("Getting region locations"); - try { - HTable table = new HTable(storagePluginConfig.getHBaseConf(), hbaseScanSpec.getTableName()); - this.hTableDesc = table.getTableDescriptor(); - NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations(); - statsCalculator = new TableStatsCalculator(table, hbaseScanSpec, storagePlugin.getContext().getConfig(), storagePluginConfig); + TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName()); + Connection conn = storagePlugin.getConnection(); + + try (Admin admin = conn.getAdmin(); + RegionLocator locator = conn.getRegionLocator(tableName)) { + this.hTableDesc = admin.getTableDescriptor(tableName); + List<HRegionLocation> regionLocations = locator.getAllRegionLocations(); + statsCalculator = new TableStatsCalculator(conn, hbaseScanSpec, storagePlugin.getContext().getConfig(), storagePluginConfig); boolean foundStartRegion = false; regionsToScan = new TreeMap<HRegionInfo, ServerName>(); - for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) { - HRegionInfo regionInfo = mapEntry.getKey(); + for (HRegionLocation regionLocation : regionLocations) { + HRegionInfo regionInfo = regionLocation.getRegionInfo(); if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) { continue; } foundStartRegion = true; - regionsToScan.put(regionInfo, mapEntry.getValue()); + regionsToScan.put(regionInfo, regionLocation.getServerName()); scanSizeInBytes += statsCalculator.getRegionSizeInBytes(regionInfo.getRegionName()); if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) { break; } } - - table.close(); } catch (IOException e) { throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e); } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java index a1cd9c9fa..172a54765 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java @@ -18,21 +18,20 @@ package org.apache.drill.exec.store.hbase; +import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.planner.logical.DrillOptiq; import org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.FilterPrel; -import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.store.StoragePluginOptimizerRule; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.rex.RexNode; import com.google.common.collect.ImmutableList; @@ -95,7 +94,7 @@ public abstract class HBasePushFilterIntoScan extends StoragePluginOptimizerRule } // convert the filter to one that references the child of the project - final RexNode condition = RelOptUtil.pushFilterPastProject(filter.getCondition(), project); + final RexNode condition = RelOptUtil.pushPastProject(filter.getCondition(), project); doPushFilterToScan(call, filter, project, scan, groupScan, condition); } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index 0af00098f..b3a70391c 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -43,14 +43,13 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.complex.MapVector; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import com.google.common.base.Preconditions; @@ -67,20 +66,22 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas private Map<String, MapVector> familyVectorMap; private VarBinaryVector rowKeyVector; - private HTable hTable; + private Table hTable; private ResultScanner resultScanner; - private String hbaseTableName; + private TableName hbaseTableName; private Scan hbaseScan; - private Configuration hbaseConf; private OperatorContext operatorContext; private boolean rowKeyOnly; - public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec, + private final Connection connection; + + public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context) { - hbaseConf = conf; - hbaseTableName = Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName(); + this.connection = connection; + hbaseTableName = TableName.valueOf( + Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName()); hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow()); hbaseScan .setFilter(subScanSpec.getScanFilter()) @@ -136,10 +137,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas familyVectorMap = new HashMap<String, MapVector>(); try { - logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.", - hbaseTableName, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM), - hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); - hTable = new HTable(hbaseConf, hbaseTableName); + hTable = connection.getTable(hbaseTableName); // Add top-level column-family map vectors to output in the order specified // when creating reader (order of first appearance in query). diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java index 7f779df24..3a098fc1a 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java @@ -45,7 +45,7 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{ if ((columns = subScan.getColumns())==null) { columns = GroupScan.ALL_COLUMNS; } - readers.add(new HBaseRecordReader(subScan.getStorageConfig().getHBaseConf(), scanSpec, columns, context)); + readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns, context)); } catch (Exception e1) { throw new ExecutionSetupException(e1); } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java index 866a2dcdc..56dfc10c3 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java @@ -21,15 +21,13 @@ import java.io.IOException; import java.util.Collections; import java.util.Set; -import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; - import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.SchemaFactory; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; @@ -79,11 +77,11 @@ public class HBaseSchemaFactory implements SchemaFactory { @Override public Set<String> getTableNames() { - try(HBaseAdmin admin = new HBaseAdmin(plugin.getConfig().getHBaseConf())) { + try(Admin admin = plugin.getConnection().getAdmin()) { HTableDescriptor[] tables = admin.listTables(); Set<String> tableNames = Sets.newHashSet(); for (HTableDescriptor table : tables) { - tableNames.add(new String(table.getName())); + tableNames.add(new String(table.getTableName().getNameAsString())); } return tableNames; } catch (Exception e) { diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java index 08fd8e114..81899cf60 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java @@ -19,36 +19,39 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import org.apache.calcite.schema.SchemaPlus; - import org.apache.drill.common.JSONOptions; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; public class HBaseStoragePlugin extends AbstractStoragePlugin { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePlugin.class); + private static final HBaseConnectionManager hbaseConnectionManager = HBaseConnectionManager.INSTANCE; private final DrillbitContext context; - private final HBaseStoragePluginConfig engineConfig; + private final HBaseStoragePluginConfig storeConfig; private final HBaseSchemaFactory schemaFactory; + private final HBaseConnectionKey connectionKey; - @SuppressWarnings("unused") private final String name; - public HBaseStoragePlugin(HBaseStoragePluginConfig configuration, DrillbitContext context, String name) + public HBaseStoragePlugin(HBaseStoragePluginConfig storeConfig, DrillbitContext context, String name) throws IOException { this.context = context; this.schemaFactory = new HBaseSchemaFactory(this, name); - this.engineConfig = configuration; + this.storeConfig = storeConfig; this.name = name; + this.connectionKey = new HBaseConnectionKey(); } public DrillbitContext getContext() { @@ -73,11 +76,85 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin { @Override public HBaseStoragePluginConfig getConfig() { - return engineConfig; + return storeConfig; } @Override public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) { return ImmutableSet.of(HBasePushFilterIntoScan.FILTER_ON_SCAN, HBasePushFilterIntoScan.FILTER_ON_PROJECT); } -}
\ No newline at end of file + + @Override + public void close() throws Exception { + hbaseConnectionManager.closeConnection(connectionKey); + } + + public Connection getConnection() { + return hbaseConnectionManager.getConnection(connectionKey); + } + + /** + * An internal class which serves the key in a map of {@link HBaseStoragePlugin} => {@link Connection}. + */ + class HBaseConnectionKey { + + private final ReentrantLock lock = new ReentrantLock(); + + private HBaseConnectionKey() {} + + public void lock() { + lock.lock(); + } + + public void unlock() { + lock.unlock(); + } + + public Configuration getHBaseConf() { + return storeConfig.getHBaseConf(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((storeConfig == null) ? 0 : storeConfig.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (obj == null) { + return false; + } else if (getClass() != obj.getClass()) { + return false; + } + + HBaseStoragePlugin other = ((HBaseConnectionKey) obj).getHBaseStoragePlugin(); + if (name == null) { + if (other.name != null) { + return false; + } + } else if (!name.equals(other.name)) { + return false; + } + if (storeConfig == null) { + if (other.storeConfig != null) { + return false; + } + } else if (!storeConfig.equals(other.storeConfig)) { + return false; + } + return true; + } + + private HBaseStoragePlugin getHBaseStoragePlugin() { + return HBaseStoragePlugin.this; + } + + } + +} diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java index 08ea7f12e..302ccca4e 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.hbase; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -39,7 +40,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; // Class containing information for reading a single HBase region @JsonTypeName("hbase-region-scan") @@ -111,7 +111,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan { @Override public Iterator<PhysicalOperator> iterator() { - return Iterators.emptyIterator(); + return Collections.emptyIterator(); } public static class HBaseSubScanSpec { diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java index bfd04fe35..4e0365cb9 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -29,15 +30,18 @@ import org.apache.drill.common.config.DrillConfig; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; /** @@ -64,14 +68,16 @@ public class TableStatsCalculator { /** * Computes size of each region for table. * - * @param table + * @param conn * @param hbaseScanSpec * @param config * @throws IOException */ - public TableStatsCalculator(HTable table, HBaseScanSpec hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException { - HBaseAdmin admin = new HBaseAdmin(table.getConfiguration()); - try { + public TableStatsCalculator(Connection conn, HBaseScanSpec hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException { + TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName()); + try (Admin admin = conn.getAdmin(); + Table table = conn.getTable(tableName); + RegionLocator locator = conn.getRegionLocator(tableName)) { int rowsToSample = rowsToSample(config); if (rowsToSample > 0) { Scan scan = new Scan(hbaseScanSpec.getStartRow(), hbaseScanSpec.getStopRow()); @@ -89,7 +95,7 @@ public class TableStatsCalculator { Cell[] cells = row.rawCells(); if (cells != null) { for (Cell cell : cells) { - rowSizeSum += CellUtil.estimatedSizeOf(cell); + rowSizeSum += CellUtil.estimatedSerializedSizeOf(cell); } } } @@ -105,13 +111,13 @@ public class TableStatsCalculator { return; } - logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\"."); + logger.info("Calculating region sizes for table '{}'.", tableName.getNameAsString()); //get regions for table - Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet(); + List<HRegionLocation> tableRegionInfos = locator.getAllRegionLocations(); Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); - for (HRegionInfo regionInfo : tableRegionInfos) { - tableRegions.add(regionInfo.getRegionName()); + for (HRegionLocation regionInfo : tableRegionInfos) { + tableRegions.add(regionInfo.getRegionInfo().getRegionName()); } ClusterStatus clusterStatus = null; @@ -145,8 +151,6 @@ public class TableStatsCalculator { } } logger.debug("Region sizes calculated"); - } finally { - admin.close(); } } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java index ac78eb0c1..2d329a852 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java @@ -25,36 +25,40 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.NoSuchElementException; -import com.google.common.collect.Iterators; -import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.store.sys.BasePersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreMode; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.collect.Iterators; + public class HBasePersistentStore<V> extends BasePersistentStore<V> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStore.class); private final PersistentStoreConfig<V> config; - private final HTableInterface table; + private final Table hbaseTable; + private final String hbaseTableName; + private final String tableName; private final byte[] tableNameStartKey; private final byte[] tableNameStopKey; - public HBasePersistentStore(PersistentStoreConfig<V> config, HTableInterface table) { + public HBasePersistentStore(PersistentStoreConfig<V> config, Table table) { this.tableName = config.getName() + '\0'; this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00" this.tableNameStopKey = this.tableNameStartKey.clone(); this.tableNameStopKey[tableNameStartKey.length-1] = 1; this.config = config; - this.table = table; + this.hbaseTable = table; + this.hbaseTableName = table.getName().getNameAsString(); } @Override @@ -71,13 +75,15 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { try { Get get = new Get(row(key)); get.addColumn(family, QUALIFIER); - Result r = table.get(get); + Result r = hbaseTable.get(get); if(r.isEmpty()){ return null; } return value(r); } catch (IOException e) { - throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while getting row '%s' from for table '%s'", key, hbaseTableName) + .build(logger); } } @@ -89,10 +95,12 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { protected synchronized void put(String key, byte[] family, V value) { try { Put put = new Put(row(key)); - put.add(family, QUALIFIER, bytes(value)); - table.put(put); + put.addColumn(family, QUALIFIER, bytes(value)); + hbaseTable.put(put); } catch (IOException e) { - throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName) + .build(logger); } } @@ -100,10 +108,12 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { public synchronized boolean putIfAbsent(String key, V value) { try { Put put = new Put(row(key)); - put.add(FAMILY, QUALIFIER, bytes(value)); - return table.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put); + put.addColumn(FAMILY, QUALIFIER, bytes(value)); + return hbaseTable.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put); } catch (IOException e) { - throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName) + .build(logger); } } @@ -127,7 +137,7 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { try { return config.getSerializer().serialize(value); } catch (IOException e) { - throw new DrillRuntimeException(e); + throw UserException.dataReadError(e).build(logger); } } @@ -135,17 +145,18 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { try { return config.getSerializer().deserialize(result.value()); } catch (IOException e) { - throw new DrillRuntimeException(e); + throw UserException.dataReadError(e).build(logger); } } private void delete(byte[] row) { try { Delete del = new Delete(row); - table.delete(del); + hbaseTable.delete(del); } catch (IOException e) { - throw new DrillRuntimeException("Caught error while deleting row '" + Bytes.toStringBinary(row) - + "' from for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while deleting row '%s' from for table '%s'", Bytes.toStringBinary(row), hbaseTableName) + .build(logger); } } @@ -161,9 +172,11 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { scan.addColumn(FAMILY, QUALIFIER); scan.setCaching(Math.min(take, 100)); scan.setBatch(take); // set batch size - scanner = table.getScanner(scan); + scanner = hbaseTable.getScanner(scan); } catch (IOException e) { - throw new DrillRuntimeException("Caught error while creating HBase scanner for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while creating HBase scanner for table '%s'" + hbaseTableName) + .build(logger); } } @@ -175,7 +188,9 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { done = true; } } catch (IOException e) { - throw new DrillRuntimeException("Caught error while fetching rows from for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while fetching rows from for table '%s'", hbaseTableName) + .build(logger); } } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java index 6e379c6e9..1dd44cde5 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.store.hbase.config; import java.io.IOException; import java.util.Map; -import com.google.common.annotations.VisibleForTesting; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.store.hbase.DrillHBaseConstants; @@ -33,12 +32,15 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.annotations.VisibleForTesting; + public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class); @@ -46,13 +48,13 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { static final byte[] QUALIFIER = Bytes.toBytes("d"); - private final String storeTableName; + private final TableName hbaseTableName; private Configuration hbaseConf; - private HConnection connection; + private Connection connection; - private HTableInterface table; + private Table hbaseTable; public HBasePersistentStoreProvider(PersistentStoreRegistry registry) { @SuppressWarnings("unchecked") @@ -64,13 +66,13 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { this.hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue())); } } - this.storeTableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE); + this.hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE)); } @VisibleForTesting public HBasePersistentStoreProvider(Configuration conf, String storeTableName) { this.hbaseConf = conf; - this.storeTableName = storeTableName; + this.hbaseTableName = TableName.valueOf(storeTableName); } @@ -80,7 +82,7 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { switch(config.getMode()){ case BLOB_PERSISTENT: case PERSISTENT: - return new HBasePersistentStore<>(config, this.table); + return new HBasePersistentStore<>(config, this.hbaseTable); default: throw new IllegalStateException(); @@ -89,35 +91,33 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { @Override - @SuppressWarnings("deprecation") public void start() throws IOException { - this.connection = HConnectionManager.createConnection(hbaseConf); + this.connection = ConnectionFactory.createConnection(hbaseConf); - try(HBaseAdmin admin = new HBaseAdmin(connection)) { - if (!admin.tableExists(storeTableName)) { - HTableDescriptor desc = new HTableDescriptor(storeTableName); + try(Admin admin = connection.getAdmin()) { + if (!admin.tableExists(hbaseTableName)) { + HTableDescriptor desc = new HTableDescriptor(hbaseTableName); desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1)); admin.createTable(desc); } else { - HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(storeTableName)); + HTableDescriptor desc = admin.getTableDescriptor(hbaseTableName); if (!desc.hasFamily(FAMILY)) { - throw new DrillRuntimeException("The HBase table " + storeTableName + throw new DrillRuntimeException("The HBase table " + hbaseTableName + " specified as persistent store exists but does not contain column family: " + (Bytes.toString(FAMILY))); } } } - this.table = connection.getTable(storeTableName); - this.table.setAutoFlush(true); + this.hbaseTable = connection.getTable(hbaseTableName); } @Override public synchronized void close() { - if (this.table != null) { + if (this.hbaseTable != null) { try { - this.table.close(); - this.table = null; + this.hbaseTable.close(); + this.hbaseTable = null; } catch (IOException e) { logger.warn("Caught exception while closing HBase table.", e); } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java index 0ccd8d662..5ce823c0b 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java @@ -27,6 +27,7 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.hbase.HBaseStoragePlugin; import org.apache.drill.exec.store.hbase.HBaseStoragePluginConfig; +import org.apache.drill.exec.util.GuavaPatcher; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.junit.AfterClass; @@ -38,6 +39,10 @@ import com.google.common.io.Files; public class BaseHBaseTest extends BaseTestQuery { + static { + GuavaPatcher.patch(); + } + private static final String HBASE_STORAGE_PLUGIN_NAME = "hbase"; protected static Configuration conf = HBaseConfiguration.create(); @@ -46,16 +51,13 @@ public class BaseHBaseTest extends BaseTestQuery { protected static HBaseStoragePluginConfig storagePluginConfig; - @BeforeClass public static void setupDefaultTestCluster() throws Exception { - GuavaPatcher.patch(); - /* * Change the following to HBaseTestsSuite.configure(false, true) * if you want to test against an externally running HBase cluster. */ - HBaseTestsSuite.configure(true, true); + HBaseTestsSuite.configure(true /*manageHBaseCluster*/, true /*createTables*/); HBaseTestsSuite.initCluster(); BaseTestQuery.setupDefaultTestCluster(); @@ -66,7 +68,6 @@ public class BaseHBaseTest extends BaseTestQuery { storagePluginConfig.setEnabled(true); storagePluginConfig.setZookeeperPort(HBaseTestsSuite.getZookeeperPort()); pluginRegistry.createOrUpdate(HBASE_STORAGE_PLUGIN_NAME, storagePluginConfig, true); - } @AfterClass @@ -105,9 +106,7 @@ public class BaseHBaseTest extends BaseTestQuery { } protected String canonizeHBaseSQL(String sql) { - return sql.replace("[TABLE_NAME]", HBaseTestsSuite.TEST_TABLE_1); + return sql.replace("[TABLE_NAME]", HBaseTestsSuite.TEST_TABLE_1.getNameAsString()); } - - } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/GuavaPatcher.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/GuavaPatcher.java deleted file mode 100644 index 8f24da802..000000000 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/GuavaPatcher.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.hbase; - -import java.lang.reflect.Modifier; - -import javassist.ClassPool; -import javassist.CtClass; -import javassist.CtConstructor; -import javassist.CtMethod; -import javassist.CtNewMethod; - -import org.apache.drill.common.CatastrophicFailure; - -public class GuavaPatcher { - - private static boolean patched; - - public static synchronized void patch() { - if (!patched) { - try { - patchStopwatch(); - patchCloseables(); - patched = true; - } catch (Exception e) { - CatastrophicFailure.exit(e, "Unable to patch Guava classes.", -100); - } - } - } - - /** - * Makes Guava stopwatch look like the old version for compatibility with hbase-server (for test purposes). - */ - private static void patchStopwatch() throws Exception { - - ClassPool cp = ClassPool.getDefault(); - CtClass cc = cp.get("com.google.common.base.Stopwatch"); - - // Expose the constructor for Stopwatch for old libraries who use the pattern new Stopwatch().start(). - for (CtConstructor c : cc.getConstructors()) { - if (!Modifier.isStatic(c.getModifiers())) { - c.setModifiers(Modifier.PUBLIC); - } - } - - // Add back the Stopwatch.elapsedMillis() method for old consumers. - CtMethod newmethod = CtNewMethod.make( - "public long elapsedMillis() { return elapsed(java.util.concurrent.TimeUnit.MILLISECONDS); }", cc); - cc.addMethod(newmethod); - - // Load the modified class instead of the original. - cc.toClass(); - - System.out.println("Google's Stopwatch patched for old HBase Guava version."); - } - - private static void patchCloseables() throws Exception { - - ClassPool cp = ClassPool.getDefault(); - CtClass cc = cp.get("com.google.common.io.Closeables"); - - - // Add back the Closeables.closeQuietly() method for old consumers. - CtMethod newmethod = CtNewMethod.make( - "public static void closeQuietly(java.io.Closeable closeable) { try{closeable.close();}catch(Exception e){} }", - cc); - cc.addMethod(newmethod); - - // Load the modified class instead of the original. - cc.toClass(); - - System.out.println("Google's Closeables patched for old HBase Guava version."); - } - -} diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java index 6414f8b1a..b297b418d 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java @@ -24,19 +24,19 @@ public class HBaseRecordReaderTest extends BaseHBaseTest { @Test public void testLocalDistributed() throws Exception { String planName = "/hbase/hbase_scan_screen_physical.json"; - runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 8); + runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1.getNameAsString(), 8); } @Test public void testLocalDistributedColumnSelect() throws Exception { String planName = "/hbase/hbase_scan_screen_physical_column_select.json"; - runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 3); + runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1.getNameAsString(), 3); } @Test public void testLocalDistributedFamilySelect() throws Exception { String planName = "/hbase/hbase_scan_screen_physical_family_select.json"; - runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 4); + runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1.getNameAsString(), 4); } } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java index 8f59b676c..3dd36081d 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java @@ -21,11 +21,16 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.exec.util.GuavaPatcher; +import org.apache.drill.hbase.test.Drill2130StorageHBaseHamcrestConfigurationTest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.runner.RunWith; @@ -34,38 +39,46 @@ import org.junit.runners.Suite.SuiteClasses; @RunWith(Suite.class) @SuiteClasses({ - TestHBaseQueries.class, - TestHBaseRegexParser.class, + Drill2130StorageHBaseHamcrestConfigurationTest.class, HBaseRecordReaderTest.class, + TestHBaseCFAsJSONString.class, + TestHBaseConnectionManager.class, TestHBaseFilterPushDown.class, TestHBaseProjectPushDown.class, + TestHBaseQueries.class, + TestHBaseRegexParser.class, TestHBaseRegionScanAssignments.class, TestHBaseTableProvider.class, - TestHBaseCFAsJSONString.class + TestOrderedBytesConvertFunctions.class }) public class HBaseTestsSuite { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseTestsSuite.class); + static { + GuavaPatcher.patch(); + } + private static final boolean IS_DEBUG = ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0; - protected static final String TEST_TABLE_1 = "TestTable1"; - protected static final String TEST_TABLE_3 = "TestTable3"; - protected static final String TEST_TABLE_COMPOSITE_DATE = "TestTableCompositeDate"; - protected static final String TEST_TABLE_COMPOSITE_TIME = "TestTableCompositeTime"; - protected static final String TEST_TABLE_COMPOSITE_INT = "TestTableCompositeInt"; - protected static final String TEST_TABLE_DOUBLE_OB = "TestTableDoubleOB"; - protected static final String TEST_TABLE_FLOAT_OB = "TestTableFloatOB"; - protected static final String TEST_TABLE_BIGINT_OB = "TestTableBigIntOB"; - protected static final String TEST_TABLE_INT_OB = "TestTableIntOB"; - protected static final String TEST_TABLE_DOUBLE_OB_DESC = "TestTableDoubleOBDesc"; - protected static final String TEST_TABLE_FLOAT_OB_DESC = "TestTableFloatOBDesc"; - protected static final String TEST_TABLE_BIGINT_OB_DESC = "TestTableBigIntOBDesc"; - protected static final String TEST_TABLE_INT_OB_DESC = "TestTableIntOBDesc"; - protected static final String TEST_TABLE_NULL_STR = "TestTableNullStr"; + protected static final TableName TEST_TABLE_1 = TableName.valueOf("TestTable1"); + protected static final TableName TEST_TABLE_3 = TableName.valueOf("TestTable3"); + protected static final TableName TEST_TABLE_COMPOSITE_DATE = TableName.valueOf("TestTableCompositeDate"); + protected static final TableName TEST_TABLE_COMPOSITE_TIME = TableName.valueOf("TestTableCompositeTime"); + protected static final TableName TEST_TABLE_COMPOSITE_INT = TableName.valueOf("TestTableCompositeInt"); + protected static final TableName TEST_TABLE_DOUBLE_OB = TableName.valueOf("TestTableDoubleOB"); + protected static final TableName TEST_TABLE_FLOAT_OB = TableName.valueOf("TestTableFloatOB"); + protected static final TableName TEST_TABLE_BIGINT_OB = TableName.valueOf("TestTableBigIntOB"); + protected static final TableName TEST_TABLE_INT_OB = TableName.valueOf("TestTableIntOB"); + protected static final TableName TEST_TABLE_DOUBLE_OB_DESC = TableName.valueOf("TestTableDoubleOBDesc"); + protected static final TableName TEST_TABLE_FLOAT_OB_DESC = TableName.valueOf("TestTableFloatOBDesc"); + protected static final TableName TEST_TABLE_BIGINT_OB_DESC = TableName.valueOf("TestTableBigIntOBDesc"); + protected static final TableName TEST_TABLE_INT_OB_DESC = TableName.valueOf("TestTableIntOBDesc"); + protected static final TableName TEST_TABLE_NULL_STR = TableName.valueOf("TestTableNullStr"); private static Configuration conf; - private static HBaseAdmin admin; + private static Connection conn; + private static Admin admin; private static HBaseTestingUtility UTIL; @@ -82,15 +95,13 @@ public class HBaseTestsSuite { @BeforeClass public static void initCluster() throws Exception { - GuavaPatcher.patch(); - if (initCount.get() == 0) { synchronized (HBaseTestsSuite.class) { if (initCount.get() == 0) { conf = HBaseConfiguration.create(); conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-unit-tests-client"); if (IS_DEBUG) { - conf.set("hbase.regionserver.lease.period","10000000"); + conf.set("hbase.client.scanner.timeout.period","10000000"); } if (manageHBaseCluster) { @@ -105,7 +116,8 @@ public class HBaseTestsSuite { logger.info("HBase mini cluster started. Zookeeper port: '{}'", getZookeeperPort()); } - admin = new HBaseAdmin(conf); + conn = ConnectionFactory.createConnection(conf); + admin = conn.getAdmin(); if (createTables || !tablesExist()) { createTestTables(); @@ -172,20 +184,20 @@ public class HBaseTestsSuite { * multiple fragments. Hence the number of regions in the HBase table is set to 1. * Will revert to multiple region once the issue is resolved. */ - TestTableGenerator.generateHBaseDataset1(admin, TEST_TABLE_1, 2); - TestTableGenerator.generateHBaseDataset3(admin, TEST_TABLE_3, 1); - TestTableGenerator.generateHBaseDatasetCompositeKeyDate(admin, TEST_TABLE_COMPOSITE_DATE, 1); - TestTableGenerator.generateHBaseDatasetCompositeKeyTime(admin, TEST_TABLE_COMPOSITE_TIME, 1); - TestTableGenerator.generateHBaseDatasetCompositeKeyInt(admin, TEST_TABLE_COMPOSITE_INT, 1); - TestTableGenerator.generateHBaseDatasetDoubleOB(admin, TEST_TABLE_DOUBLE_OB, 1); - TestTableGenerator.generateHBaseDatasetFloatOB(admin, TEST_TABLE_FLOAT_OB, 1); - TestTableGenerator.generateHBaseDatasetBigIntOB(admin, TEST_TABLE_BIGINT_OB, 1); - TestTableGenerator.generateHBaseDatasetIntOB(admin, TEST_TABLE_INT_OB, 1); - TestTableGenerator.generateHBaseDatasetDoubleOBDesc(admin, TEST_TABLE_DOUBLE_OB_DESC, 1); - TestTableGenerator.generateHBaseDatasetFloatOBDesc(admin, TEST_TABLE_FLOAT_OB_DESC, 1); - TestTableGenerator.generateHBaseDatasetBigIntOBDesc(admin, TEST_TABLE_BIGINT_OB_DESC, 1); - TestTableGenerator.generateHBaseDatasetIntOBDesc(admin, TEST_TABLE_INT_OB_DESC, 1); - TestTableGenerator.generateHBaseDatasetNullStr(admin, TEST_TABLE_NULL_STR, 1); + TestTableGenerator.generateHBaseDataset1(conn, admin, TEST_TABLE_1, 2); + TestTableGenerator.generateHBaseDataset3(conn, admin, TEST_TABLE_3, 1); + TestTableGenerator.generateHBaseDatasetCompositeKeyDate(conn, admin, TEST_TABLE_COMPOSITE_DATE, 1); + TestTableGenerator.generateHBaseDatasetCompositeKeyTime(conn, admin, TEST_TABLE_COMPOSITE_TIME, 1); + TestTableGenerator.generateHBaseDatasetCompositeKeyInt(conn, admin, TEST_TABLE_COMPOSITE_INT, 1); + TestTableGenerator.generateHBaseDatasetDoubleOB(conn, admin, TEST_TABLE_DOUBLE_OB, 1); + TestTableGenerator.generateHBaseDatasetFloatOB(conn, admin, TEST_TABLE_FLOAT_OB, 1); + TestTableGenerator.generateHBaseDatasetBigIntOB(conn, admin, TEST_TABLE_BIGINT_OB, 1); + TestTableGenerator.generateHBaseDatasetIntOB(conn, admin, TEST_TABLE_INT_OB, 1); + TestTableGenerator.generateHBaseDatasetDoubleOBDesc(conn, admin, TEST_TABLE_DOUBLE_OB_DESC, 1); + TestTableGenerator.generateHBaseDatasetFloatOBDesc(conn, admin, TEST_TABLE_FLOAT_OB_DESC, 1); + TestTableGenerator.generateHBaseDatasetBigIntOBDesc(conn, admin, TEST_TABLE_BIGINT_OB_DESC, 1); + TestTableGenerator.generateHBaseDatasetIntOBDesc(conn, admin, TEST_TABLE_INT_OB_DESC, 1); + TestTableGenerator.generateHBaseDatasetNullStr(conn, admin, TEST_TABLE_NULL_STR, 1); } private static void cleanupTestTables() throws IOException { @@ -228,8 +240,12 @@ public class HBaseTestsSuite { HBaseTestsSuite.createTables = createTables; } - public static HBaseAdmin getAdmin() { + public static Admin getAdmin() { return admin; } + public static Connection getConnection() { + return conn; + } + } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java new file mode 100644 index 000000000..9b3daf036 --- /dev/null +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.hbase; + +import org.junit.Test; + +public class TestHBaseConnectionManager extends BaseHBaseTest { + + @Test + public void testHBaseConnectionManager() throws Exception{ + setColumnWidth(8); + runHBaseSQLVerifyCount("SELECT\n" + + "row_key\n" + + "FROM\n" + + " hbase.`[TABLE_NAME]` tableName" + , 8); + + /* + * Simulate HBase connection close and ensure that the connection + * will be reestablished automatically. + */ + storagePlugin.getConnection().close(); + runHBaseSQLVerifyCount("SELECT\n" + + "row_key\n" + + "FROM\n" + + " hbase.`[TABLE_NAME]` tableName" + , 8); + + /* + * Simulate HBase cluster restart and ensure that running query against + * HBase does not require Drill cluster restart. + */ + HBaseTestsSuite.getHBaseTestingUtility().shutdownMiniHBaseCluster(); + HBaseTestsSuite.getHBaseTestingUtility().restartHBaseCluster(1); + runHBaseSQLVerifyCount("SELECT\n" + + "row_key\n" + + "FROM\n" + + " hbase.`[TABLE_NAME]` tableName" + , 8); + + } + +} diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java index 7ef795451..56386a06d 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java @@ -18,7 +18,6 @@ package org.apache.drill.hbase; import org.apache.drill.PlanTestBase; -import org.junit.Ignore; import org.junit.Test; public class TestHBaseFilterPushDown extends BaseHBaseTest { diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java index ce7d585f8..b054bfa25 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java @@ -23,39 +23,36 @@ import java.util.List; import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; import org.junit.Test; public class TestHBaseQueries extends BaseHBaseTest { @Test public void testWithEmptyFirstAndLastRegion() throws Exception { - HBaseAdmin admin = HBaseTestsSuite.getAdmin(); - String tableName = "drill_ut_empty_regions"; - HTable table = null; + HBaseAdmin admin = (HBaseAdmin) HBaseTestsSuite.getAdmin(); + TableName tableName = TableName.valueOf("drill_ut_empty_regions"); - try { - HTableDescriptor desc = new HTableDescriptor(tableName); - desc.addFamily(new HColumnDescriptor("f")); - admin.createTable(desc, Arrays.copyOfRange(TestTableGenerator.SPLIT_KEYS, 0, 2)); + try (Table table = HBaseTestsSuite.getConnection().getTable(tableName);) { + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor("f")); + admin.createTable(desc, Arrays.copyOfRange(TestTableGenerator.SPLIT_KEYS, 0, 2)); - table = new HTable(admin.getConfiguration(), tableName); - Put p = new Put("b".getBytes()); - p.add("f".getBytes(), "c".getBytes(), "1".getBytes()); - table.put(p); + Put p = new Put("b".getBytes()); + p.addColumn("f".getBytes(), "c".getBytes(), "1".getBytes()); + table.put(p); - setColumnWidths(new int[] {8, 15}); - runHBaseSQLVerifyCount("SELECT *\n" - + "FROM\n" - + " hbase.`" + tableName + "` tableName\n" - , 1); + setColumnWidths(new int[] {8, 15}); + runHBaseSQLVerifyCount("SELECT *\n" + + "FROM\n" + + " hbase.`" + tableName + "` tableName\n" + , 1); } finally { try { - if (table != null) { - table.close(); - } admin.disableTable(tableName); admin.deleteTable(tableName); } catch (Exception e) { } // ignore @@ -63,20 +60,16 @@ public class TestHBaseQueries extends BaseHBaseTest { } - @Test public void testWithEmptyTable() throws Exception { - HBaseAdmin admin = HBaseTestsSuite.getAdmin(); - String tableName = "drill_ut_empty_table"; - HTable table = null; + Admin admin = HBaseTestsSuite.getAdmin(); + TableName tableName = TableName.valueOf("drill_ut_empty_table"); - try { + try (Table table = HBaseTestsSuite.getConnection().getTable(tableName);) { HTableDescriptor desc = new HTableDescriptor(tableName); desc.addFamily(new HColumnDescriptor("f")); admin.createTable(desc, Arrays.copyOfRange(TestTableGenerator.SPLIT_KEYS, 0, 2)); - table = new HTable(admin.getConfiguration(), tableName); - setColumnWidths(new int[] {8, 15}); runHBaseSQLVerifyCount("SELECT row_key, count(*)\n" + "FROM\n" @@ -84,14 +77,12 @@ public class TestHBaseQueries extends BaseHBaseTest { , 0); } finally { try { - if (table != null) { - table.close(); - } admin.disableTable(tableName); admin.deleteTable(tableName); } catch (Exception e) { } // ignore } } + @Test public void testCastEmptyStrings() throws Exception { try { @@ -106,4 +97,5 @@ public class TestHBaseQueries extends BaseHBaseTest { test("alter system reset `drill.exec.functions.cast_empty_string_to_null`;"); } } + } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestOrderedBytesConvertFunctions.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestOrderedBytesConvertFunctions.java index 96c3668db..391a616c2 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestOrderedBytesConvertFunctions.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestOrderedBytesConvertFunctions.java @@ -17,42 +17,20 @@ */ package org.apache.drill.hbase; -import static org.apache.drill.TestBuilder.listOf; -import static org.apache.drill.TestBuilder.mapOf; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import io.netty.buffer.DrillBuf; import java.util.ArrayList; import java.util.List; -import mockit.Injectable; - import org.apache.drill.BaseTestQuery; -import org.apache.drill.TestBuilder; -import org.apache.drill.exec.compile.ClassTransformer; -import org.apache.drill.exec.compile.ClassTransformer.ScalarReplacementOption; -import org.apache.drill.exec.expr.fn.impl.DateUtility; import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.user.QueryDataBatch; -import org.apache.drill.exec.rpc.user.UserServer; -import org.apache.drill.exec.server.Drillbit; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.server.options.OptionValue; -import org.apache.drill.exec.server.options.OptionValue.OptionType; -import org.apache.drill.exec.util.ByteBufUtil.HadoopWritables; -import org.apache.drill.exec.util.VectorUtil; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarCharVector; -import org.joda.time.DateTime; -import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import com.google.common.base.Charsets; diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java index f4f3e933d..73df7e412 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java @@ -24,9 +24,15 @@ import java.util.Random; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Order; +import org.apache.hadoop.hbase.util.OrderedBytes; +import org.apache.hadoop.hbase.util.PositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange; public class TestTableGenerator { @@ -39,7 +45,7 @@ public class TestTableGenerator { static final byte[] FAMILY_F = {'f'}; static final byte[] COLUMN_C = {'c'}; - public static void generateHBaseDataset1(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDataset1(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -54,81 +60,80 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); Put p = new Put("a1".getBytes()); - p.add("f".getBytes(), "c1".getBytes(), "1".getBytes()); - p.add("f".getBytes(), "c2".getBytes(), "2".getBytes()); - p.add("f".getBytes(), "c3".getBytes(), "3".getBytes()); - p.add("f".getBytes(), "c4".getBytes(), "4".getBytes()); - p.add("f".getBytes(), "c5".getBytes(), "5".getBytes()); - p.add("f".getBytes(), "c6".getBytes(), "6".getBytes()); - table.put(p); + p.addColumn("f".getBytes(), "c1".getBytes(), "1".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "2".getBytes()); + p.addColumn("f".getBytes(), "c3".getBytes(), "3".getBytes()); + p.addColumn("f".getBytes(), "c4".getBytes(), "4".getBytes()); + p.addColumn("f".getBytes(), "c5".getBytes(), "5".getBytes()); + p.addColumn("f".getBytes(), "c6".getBytes(), "6".getBytes()); + table.mutate(p); p = new Put("a2".getBytes()); - p.add("f".getBytes(), "c1".getBytes(), "1".getBytes()); - p.add("f".getBytes(), "c2".getBytes(), "2".getBytes()); - p.add("f".getBytes(), "c3".getBytes(), "3".getBytes()); - p.add("f".getBytes(), "c4".getBytes(), "4".getBytes()); - p.add("f".getBytes(), "c5".getBytes(), "5".getBytes()); - p.add("f".getBytes(), "c6".getBytes(), "6".getBytes()); - table.put(p); + p.addColumn("f".getBytes(), "c1".getBytes(), "1".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "2".getBytes()); + p.addColumn("f".getBytes(), "c3".getBytes(), "3".getBytes()); + p.addColumn("f".getBytes(), "c4".getBytes(), "4".getBytes()); + p.addColumn("f".getBytes(), "c5".getBytes(), "5".getBytes()); + p.addColumn("f".getBytes(), "c6".getBytes(), "6".getBytes()); + table.mutate(p); p = new Put("a3".getBytes()); - p.add("f".getBytes(), "c1".getBytes(), "1".getBytes()); - p.add("f".getBytes(), "c3".getBytes(), "2".getBytes()); - p.add("f".getBytes(), "c5".getBytes(), "3".getBytes()); - p.add("f".getBytes(), "c7".getBytes(), "4".getBytes()); - p.add("f".getBytes(), "c8".getBytes(), "5".getBytes()); - p.add("f".getBytes(), "c9".getBytes(), "6".getBytes()); - table.put(p); + p.addColumn("f".getBytes(), "c1".getBytes(), "1".getBytes()); + p.addColumn("f".getBytes(), "c3".getBytes(), "2".getBytes()); + p.addColumn("f".getBytes(), "c5".getBytes(), "3".getBytes()); + p.addColumn("f".getBytes(), "c7".getBytes(), "4".getBytes()); + p.addColumn("f".getBytes(), "c8".getBytes(), "5".getBytes()); + p.addColumn("f".getBytes(), "c9".getBytes(), "6".getBytes()); + table.mutate(p); p = new Put(new byte[]{'b', '4', 0}); - p.add("f".getBytes(), "c1".getBytes(), "1".getBytes()); - p.add("f2".getBytes(), "c2".getBytes(), "2".getBytes()); - p.add("f".getBytes(), "c3".getBytes(), "3".getBytes()); - p.add("f2".getBytes(), "c4".getBytes(), "4".getBytes()); - p.add("f".getBytes(), "c5".getBytes(), "5".getBytes()); - p.add("f2".getBytes(), "c6".getBytes(), "6".getBytes()); - table.put(p); + p.addColumn("f".getBytes(), "c1".getBytes(), "1".getBytes()); + p.addColumn("f2".getBytes(), "c2".getBytes(), "2".getBytes()); + p.addColumn("f".getBytes(), "c3".getBytes(), "3".getBytes()); + p.addColumn("f2".getBytes(), "c4".getBytes(), "4".getBytes()); + p.addColumn("f".getBytes(), "c5".getBytes(), "5".getBytes()); + p.addColumn("f2".getBytes(), "c6".getBytes(), "6".getBytes()); + table.mutate(p); p = new Put("b4".getBytes()); - p.add("f".getBytes(), "c1".getBytes(), "1".getBytes()); - p.add("f2".getBytes(), "c2".getBytes(), "2".getBytes()); - p.add("f".getBytes(), "c3".getBytes(), "3".getBytes()); - p.add("f2".getBytes(), "c4".getBytes(), "4".getBytes()); - p.add("f".getBytes(), "c5".getBytes(), "5".getBytes()); - p.add("f2".getBytes(), "c6".getBytes(), "6".getBytes()); - table.put(p); + p.addColumn("f".getBytes(), "c1".getBytes(), "1".getBytes()); + p.addColumn("f2".getBytes(), "c2".getBytes(), "2".getBytes()); + p.addColumn("f".getBytes(), "c3".getBytes(), "3".getBytes()); + p.addColumn("f2".getBytes(), "c4".getBytes(), "4".getBytes()); + p.addColumn("f".getBytes(), "c5".getBytes(), "5".getBytes()); + p.addColumn("f2".getBytes(), "c6".getBytes(), "6".getBytes()); + table.mutate(p); p = new Put("b5".getBytes()); - p.add("f2".getBytes(), "c1".getBytes(), "1".getBytes()); - p.add("f".getBytes(), "c2".getBytes(), "2".getBytes()); - p.add("f2".getBytes(), "c3".getBytes(), "3".getBytes()); - p.add("f".getBytes(), "c4".getBytes(), "4".getBytes()); - p.add("f2".getBytes(), "c5".getBytes(), "5".getBytes()); - p.add("f".getBytes(), "c6".getBytes(), "6".getBytes()); - table.put(p); + p.addColumn("f2".getBytes(), "c1".getBytes(), "1".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "2".getBytes()); + p.addColumn("f2".getBytes(), "c3".getBytes(), "3".getBytes()); + p.addColumn("f".getBytes(), "c4".getBytes(), "4".getBytes()); + p.addColumn("f2".getBytes(), "c5".getBytes(), "5".getBytes()); + p.addColumn("f".getBytes(), "c6".getBytes(), "6".getBytes()); + table.mutate(p); p = new Put("b6".getBytes()); - p.add("f".getBytes(), "c1".getBytes(), "1".getBytes()); - p.add("f2".getBytes(), "c3".getBytes(), "2".getBytes()); - p.add("f".getBytes(), "c5".getBytes(), "3".getBytes()); - p.add("f2".getBytes(), "c7".getBytes(), "4".getBytes()); - p.add("f".getBytes(), "c8".getBytes(), "5".getBytes()); - p.add("f2".getBytes(), "c9".getBytes(), "6".getBytes()); - table.put(p); + p.addColumn("f".getBytes(), "c1".getBytes(), "1".getBytes()); + p.addColumn("f2".getBytes(), "c3".getBytes(), "2".getBytes()); + p.addColumn("f".getBytes(), "c5".getBytes(), "3".getBytes()); + p.addColumn("f2".getBytes(), "c7".getBytes(), "4".getBytes()); + p.addColumn("f".getBytes(), "c8".getBytes(), "5".getBytes()); + p.addColumn("f2".getBytes(), "c9".getBytes(), "6".getBytes()); + table.mutate(p); p = new Put("b7".getBytes()); - p.add("f".getBytes(), "c1".getBytes(), "1".getBytes()); - p.add("f".getBytes(), "c2".getBytes(), "2".getBytes()); - table.put(p); + p.addColumn("f".getBytes(), "c1".getBytes(), "1".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "2".getBytes()); + table.mutate(p); - table.flushCommits(); table.close(); } - public static void generateHBaseDataset2(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDataset2(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -143,7 +148,7 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); int rowCount = 0; byte[] bytes = null; @@ -156,9 +161,9 @@ public class TestTableGenerator { Put p = new Put((""+rowKeyChar+iteration).getBytes()); for (int j = 1; j <= numColumns; j++) { bytes = new byte[5000]; random.nextBytes(bytes); - p.add("f".getBytes(), ("c"+j).getBytes(), bytes); + p.addColumn("f".getBytes(), ("c"+j).getBytes(), bytes); } - table.put(p); + table.mutate(p); ++rowKeyChar; ++rowCount; @@ -166,13 +171,12 @@ public class TestTableGenerator { ++iteration; } - table.flushCommits(); table.close(); admin.flush(tableName); } - public static void generateHBaseDataset3(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDataset3(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -187,34 +191,33 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); for (int i = 0; i <= 100; ++i) { Put p = new Put((String.format("%03d", i)).getBytes()); - p.add(FAMILY_F, COLUMN_C, String.format("value %03d", i).getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, String.format("value %03d", i).getBytes()); + table.mutate(p); } for (int i = 0; i <= 1000; ++i) { Put p = new Put((String.format("%04d", i)).getBytes()); - p.add(FAMILY_F, COLUMN_C, String.format("value %04d", i).getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, String.format("value %04d", i).getBytes()); + table.mutate(p); } Put p = new Put("%_AS_PREFIX_ROW1".getBytes()); - p.add(FAMILY_F, COLUMN_C, "dummy".getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, "dummy".getBytes()); + table.mutate(p); p = new Put("%_AS_PREFIX_ROW2".getBytes()); - p.add(FAMILY_F, COLUMN_C, "dummy".getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, "dummy".getBytes()); + table.mutate(p); - table.flushCommits(); table.close(); admin.flush(tableName); } - public static void generateHBaseDatasetCompositeKeyDate(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetCompositeKeyDate(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -229,7 +232,7 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); Date startDate = new Date(1408924800000L); long startTime = startDate.getTime(); @@ -246,15 +249,14 @@ public class TestTableGenerator { } Put p = new Put(rowKey); - p.add(FAMILY_F, COLUMN_C, "dummy".getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, "dummy".getBytes()); + table.mutate(p); } - table.flushCommits(); table.close(); } - public static void generateHBaseDatasetCompositeKeyTime(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetCompositeKeyTime(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -269,7 +271,7 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); long startTime = 0; long MILLISECONDS_IN_A_SEC = (long)1000; @@ -287,8 +289,8 @@ public class TestTableGenerator { } Put p = new Put(rowKey); - p.add(FAMILY_F, COLUMN_C, "dummy".getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, "dummy".getBytes()); + table.mutate(p); if (interval == smallInterval) { interval = largeInterval; @@ -297,11 +299,10 @@ public class TestTableGenerator { } } - table.flushCommits(); table.close(); } - public static void generateHBaseDatasetCompositeKeyInt(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetCompositeKeyInt(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -316,7 +317,7 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); int startVal = 0; int stopVal = 1000; @@ -330,15 +331,14 @@ public class TestTableGenerator { } Put p = new Put(rowKey); - p.add(FAMILY_F, COLUMN_C, "dummy".getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, "dummy".getBytes()); + table.mutate(p); } - table.flushCommits(); table.close(); } - public static void generateHBaseDatasetDoubleOB(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetDoubleOB(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -353,26 +353,23 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); for (double i = 0.5; i <= 100.00; i += 0.75) { - byte[] bytes = new byte[9]; - org.apache.hadoop.hbase.util.PositionedByteRange br = - new org.apache.hadoop.hbase.util.SimplePositionedByteRange(bytes, 0, 9); - org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br, i, - org.apache.hadoop.hbase.util.Order.ASCENDING); + byte[] bytes = new byte[9]; + PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 9); + OrderedBytes.encodeFloat64(br, i, Order.ASCENDING); Put p = new Put(bytes); - p.add(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes()); + table.mutate(p); } - table.flushCommits(); table.close(); admin.flush(tableName); } - public static void generateHBaseDatasetFloatOB(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetFloatOB(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -387,60 +384,54 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); for (float i = (float)0.5; i <= 100.00; i += 0.75) { byte[] bytes = new byte[5]; - org.apache.hadoop.hbase.util.PositionedByteRange br = - new org.apache.hadoop.hbase.util.SimplePositionedByteRange(bytes, 0, 5); - org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br, i, - org.apache.hadoop.hbase.util.Order.ASCENDING); + PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 5); + OrderedBytes.encodeFloat32(br, i,Order.ASCENDING); Put p = new Put(bytes); - p.add(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes()); + table.mutate(p); } - table.flushCommits(); table.close(); admin.flush(tableName); } - public static void generateHBaseDatasetBigIntOB(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetBigIntOB(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); } - HTableDescriptor desc = new HTableDescriptor(tableName); - desc.addFamily(new HColumnDescriptor(FAMILY_F)); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(FAMILY_F)); - if (numberRegions > 1) { - admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1)); - } else { - admin.createTable(desc); - } + if (numberRegions > 1) { + admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1)); + } else { + admin.createTable(desc); + } - HTable table = new HTable(admin.getConfiguration(), tableName); - long startTime = (long)1438034423 * 1000; - for (long i = startTime; i <= startTime + 100; i ++) { - byte[] bytes = new byte[9]; - org.apache.hadoop.hbase.util.PositionedByteRange br = - new org.apache.hadoop.hbase.util.SimplePositionedByteRange(bytes, 0, 9); - org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br, i, - org.apache.hadoop.hbase.util.Order.ASCENDING); - Put p = new Put(bytes); - p.add(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes()); - table.put(p); - } + BufferedMutator table = conn.getBufferedMutator(tableName); + long startTime = (long)1438034423 * 1000; + for (long i = startTime; i <= startTime + 100; i ++) { + byte[] bytes = new byte[9]; + PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 9); + OrderedBytes.encodeInt64(br, i, Order.ASCENDING); + Put p = new Put(bytes); + p.addColumn(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes()); + table.mutate(p); + } - table.flushCommits(); - table.close(); + table.close(); - admin.flush(tableName); + admin.flush(tableName); } - public static void generateHBaseDatasetIntOB(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetIntOB(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -455,26 +446,23 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); for (int i = -49; i <= 100; i ++) { byte[] bytes = new byte[5]; - org.apache.hadoop.hbase.util.PositionedByteRange br = - new org.apache.hadoop.hbase.util.SimplePositionedByteRange(bytes, 0, 5); - org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br, i, - org.apache.hadoop.hbase.util.Order.ASCENDING); + PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 5); + OrderedBytes.encodeInt32(br, i, Order.ASCENDING); Put p = new Put(bytes); - p.add(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes()); + table.mutate(p); } - table.flushCommits(); table.close(); admin.flush(tableName); } - public static void generateHBaseDatasetDoubleOBDesc(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetDoubleOBDesc(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -489,26 +477,23 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); for (double i = 0.5; i <= 100.00; i += 0.75) { - byte[] bytes = new byte[9]; - org.apache.hadoop.hbase.util.PositionedByteRange br = - new org.apache.hadoop.hbase.util.SimplePositionedByteRange(bytes, 0, 9); - org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br, i, - org.apache.hadoop.hbase.util.Order.DESCENDING); + byte[] bytes = new byte[9]; + PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 9); + OrderedBytes.encodeFloat64(br, i, Order.DESCENDING); Put p = new Put(bytes); - p.add(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes()); + table.mutate(p); } - table.flushCommits(); table.close(); admin.flush(tableName); } - public static void generateHBaseDatasetFloatOBDesc(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetFloatOBDesc(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -523,61 +508,55 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); for (float i = (float)0.5; i <= 100.00; i += 0.75) { byte[] bytes = new byte[5]; - org.apache.hadoop.hbase.util.PositionedByteRange br = - new org.apache.hadoop.hbase.util.SimplePositionedByteRange(bytes, 0, 5); - org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br, i, - org.apache.hadoop.hbase.util.Order.DESCENDING); + PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 5); + OrderedBytes.encodeFloat32(br, i, Order.DESCENDING); Put p = new Put(bytes); - p.add(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, String.format("value %03f", i).getBytes()); + table.mutate(p); } - table.flushCommits(); table.close(); admin.flush(tableName); } - public static void generateHBaseDatasetBigIntOBDesc(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetBigIntOBDesc(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); } - HTableDescriptor desc = new HTableDescriptor(tableName); - desc.addFamily(new HColumnDescriptor(FAMILY_F)); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(FAMILY_F)); - if (numberRegions > 1) { - admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1)); - } else { - admin.createTable(desc); - } + if (numberRegions > 1) { + admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1)); + } else { + admin.createTable(desc); + } - HTable table = new HTable(admin.getConfiguration(), tableName); - long startTime = (long)1438034423 * 1000; - for (long i = startTime; i <= startTime + 100; i ++) { - byte[] bytes = new byte[9]; - org.apache.hadoop.hbase.util.PositionedByteRange br = - new org.apache.hadoop.hbase.util.SimplePositionedByteRange(bytes, 0, 9); - org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br, i, - org.apache.hadoop.hbase.util.Order.DESCENDING); - Put p = new Put(bytes); - p.add(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes()); - table.put(p); - } + BufferedMutator table = conn.getBufferedMutator(tableName); + long startTime = (long)1438034423 * 1000; + for (long i = startTime; i <= startTime + 100; i ++) { + byte[] bytes = new byte[9]; + PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 9); + OrderedBytes.encodeInt64(br, i, Order.DESCENDING); + Put p = new Put(bytes); + p.addColumn(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes()); + table.mutate(p); + } - table.flushCommits(); - table.close(); + table.close(); - admin.flush(tableName); + admin.flush(tableName); } - public static void generateHBaseDatasetIntOBDesc(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetIntOBDesc(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -592,26 +571,23 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); for (int i = -49; i <= 100; i ++) { byte[] bytes = new byte[5]; - org.apache.hadoop.hbase.util.PositionedByteRange br = - new org.apache.hadoop.hbase.util.SimplePositionedByteRange(bytes, 0, 5); - org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br, i, - org.apache.hadoop.hbase.util.Order.DESCENDING); + PositionedByteRange br = new SimplePositionedMutableByteRange(bytes, 0, 5); + OrderedBytes.encodeInt32(br, i, Order.DESCENDING); Put p = new Put(bytes); - p.add(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes()); - table.put(p); + p.addColumn(FAMILY_F, COLUMN_C, String.format("value %d", i).getBytes()); + table.mutate(p); } - table.flushCommits(); table.close(); admin.flush(tableName); } - public static void generateHBaseDatasetNullStr(HBaseAdmin admin, String tableName, int numberRegions) throws Exception { + public static void generateHBaseDatasetNullStr(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception { if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); @@ -625,16 +601,16 @@ public class TestTableGenerator { admin.createTable(desc); } - HTable table = new HTable(admin.getConfiguration(), tableName); + BufferedMutator table = conn.getBufferedMutator(tableName); Put p = new Put("a1".getBytes()); - p.add("f".getBytes(), "c1".getBytes(), "".getBytes()); - p.add("f".getBytes(), "c2".getBytes(), "".getBytes()); - p.add("f".getBytes(), "c3".getBytes(), "5".getBytes()); - p.add("f".getBytes(), "c4".getBytes(), "".getBytes()); - table.put(p); + p.addColumn("f".getBytes(), "c1".getBytes(), "".getBytes()); + p.addColumn("f".getBytes(), "c2".getBytes(), "".getBytes()); + p.addColumn("f".getBytes(), "c3".getBytes(), "5".getBytes()); + p.addColumn("f".getBytes(), "c4".getBytes(), "".getBytes()); + table.mutate(p); - table.flushCommits(); table.close(); } + } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/test/Drill2130StorageHBaseHamcrestConfigurationTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/test/Drill2130StorageHBaseHamcrestConfigurationTest.java index b52654dd2..f1a21b028 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/test/Drill2130StorageHBaseHamcrestConfigurationTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/test/Drill2130StorageHBaseHamcrestConfigurationTest.java @@ -17,13 +17,15 @@ */ package org.apache.drill.hbase.test; -import org.junit.Test; +import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import static org.hamcrest.CoreMatchers.equalTo; + +import org.junit.Test; public class Drill2130StorageHBaseHamcrestConfigurationTest { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drill2130StorageHBaseHamcrestConfigurationTest.class); @SuppressWarnings("unused") private org.hamcrest.MatcherAssert forCompileTimeCheckForNewEnoughHamcrest; @@ -38,7 +40,7 @@ public class Drill2130StorageHBaseHamcrestConfigurationTest { + " Got NoSuchMethodError; e: " + e ); } catch ( AssertionError e ) { - System.out.println( "Class path seems fine re new JUnit vs. old Hamcrest." + logger.info("Class path seems fine re new JUnit vs. old Hamcrest." + " (Got AssertionError, not NoSuchMethodError.)" ); } } diff --git a/contrib/storage-hbase/src/test/resources/logback.xml b/contrib/storage-hbase/src/test/resources/logback.xml index c0a79a88a..8d3fc81c4 100644 --- a/contrib/storage-hbase/src/test/resources/logback.xml +++ b/contrib/storage-hbase/src/test/resources/logback.xml @@ -52,9 +52,17 @@ </logger> <logger name="org.apache.hadoop" additivity="false"> - <level value="info" /> + <level value="warn" /> + <appender-ref ref="STDOUT" /> + <appender-ref ref="SOCKET" /> + <appender-ref ref="FILE" /> + </logger> + + <logger name="org.apache.hadoop.hbase.client.ConnectionManager" additivity="false"> + <level value="error" /> <appender-ref ref="STDOUT" /> <appender-ref ref="SOCKET" /> + <appender-ref ref="FILE" /> </logger> <root> |