From 85344abd1ddb73448bdf67cdc6883cb98795a910 Mon Sep 17 00:00:00 2001 From: Vitalii Diravka Date: Thu, 19 Jul 2018 16:00:40 +0300 Subject: DRILL-6614: Allow usage of MapRDBFormatPlugin for HiveStoragePlugin --- .../drill/exec/store/mapr/db/MapRDBGroupScan.java | 6 +++--- ...vertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java | 10 ++-------- .../drill/exec/store/hive/HiveStoragePlugin.java | 21 +++++++++++++++++++++ .../drill/exec/store/AbstractStoragePlugin.java | 10 +++++++++- .../org/apache/drill/exec/store/StoragePlugin.java | 11 +++++++++++ .../drill/exec/store/StoragePluginRegistryImpl.java | 11 ++--------- .../drill/exec/store/dfs/FileSystemPlugin.java | 5 +++-- 7 files changed, 51 insertions(+), 23 deletions(-) diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java index 927bd70c3..1adbbee09 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java @@ -32,11 +32,11 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.AbstractStoragePlugin; -import org.apache.drill.exec.store.dfs.FileSystemConfig; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; @@ -249,8 +249,8 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan { } @JsonProperty("storage") - public FileSystemConfig getStorageConfig() { - return (FileSystemConfig) storagePlugin.getConfig(); + public StoragePluginConfig getStorageConfig() { + return storagePlugin.getConfig(); } @JsonIgnore diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java index 3bc33b331..9eb43a251 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java @@ -124,13 +124,6 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi Map parameters = hiveScan.getHiveReadEntry().getHiveTableWrapper().getParameters(); JsonScanSpec scanSpec = new JsonScanSpec(parameters.get(MAPRDB_TABLE_NAME), null); - MapRDBFormatPlugin mapRDBFormatPlugin = new MapRDBFormatPlugin( - "hive-maprdb", - hiveScan.getStoragePlugin().getContext(), - hiveScan.getHiveConf(), - hiveScan.getStoragePlugin().getConfig(), - new MapRDBFormatPluginConfig() - ); List hiveScanCols = hiveScanRel.getColumns().stream() .map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath)) .collect(Collectors.toList()); @@ -138,7 +131,8 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi new JsonTableGroupScan( hiveScan.getUserName(), hiveScan.getStoragePlugin(), - mapRDBFormatPlugin, + // TODO: We should use Hive format plugins here, once it will be implemented. DRILL-6621 + (MapRDBFormatPlugin) hiveScan.getStoragePlugin().getFormatPlugin(new MapRDBFormatPluginConfig()), scanSpec, hiveScanCols ); diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index adf134843..bdd230e0c 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -37,6 +37,7 @@ import org.apache.drill.common.JSONOptions; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; @@ -49,10 +50,13 @@ import org.apache.drill.exec.server.options.SessionOptionManager; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin; +import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -62,6 +66,8 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class); + public static final String HIVE_MAPRDB_FORMAT_PLUGIN_NAME = "hive-maprdb"; + private final HiveStoragePluginConfig config; private HiveSchemaFactory schemaFactory; private final HiveConf hiveConf; @@ -216,4 +222,19 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { return ruleBuilder.build(); } + @Override + public FormatPlugin getFormatPlugin(FormatPluginConfig formatConfig) { + // TODO: implement formatCreator similar to FileSystemPlugin formatCreator. DRILL-6621 + if (formatConfig instanceof MapRDBFormatPluginConfig) { + try { + return new MapRDBFormatPlugin(HIVE_MAPRDB_FORMAT_PLUGIN_NAME, context, hiveConf, config, + (MapRDBFormatPluginConfig) formatConfig); + } catch (IOException e) { + throw new DrillRuntimeException("The error is occurred while connecting to MapR-DB", e); + } + } + throw new DrillRuntimeException(String.format("Hive storage plugin doesn't support usage of %s format plugin", + formatConfig.getClass().getName())); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java index d37a0a212..9818ff368 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.calcite.plan.RelOptRule; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.planner.PlannerPhase; @@ -31,13 +32,14 @@ import org.apache.drill.exec.planner.PlannerPhase; import com.google.common.collect.ImmutableSet; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.options.SessionOptionManager; +import org.apache.drill.exec.store.dfs.FormatPlugin; /** Abstract class for StorePlugin implementations. * See StoragePlugin for description of the interface intent and its methods. */ public abstract class AbstractStoragePlugin implements StoragePlugin { - private final DrillbitContext context; + protected final DrillbitContext context; private final String name; protected AbstractStoragePlugin(DrillbitContext inContext, String inName) { @@ -130,6 +132,11 @@ public abstract class AbstractStoragePlugin implements StoragePlugin { @Override public void close() throws Exception { } + @Override + public FormatPlugin getFormatPlugin(FormatPluginConfig config) { + throw new UnsupportedOperationException(String.format("%s doesn't support format plugins", getClass().getName())); + } + public DrillbitContext getContext() { return context; } @@ -137,4 +144,5 @@ public abstract class AbstractStoragePlugin implements StoragePlugin { public String getName() { return name; } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java index 2617065ad..7bd7eaf7d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java @@ -24,10 +24,12 @@ import java.util.Set; import org.apache.calcite.plan.RelOptRule; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.server.options.SessionOptionManager; +import org.apache.drill.exec.store.dfs.FormatPlugin; /** Interface for all implementations of the storage plugins. Different implementations of the storage * formats will implement methods that indicate if Drill can write or read its tables from that format, @@ -101,4 +103,13 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable { * Initialize the storage plugin. The storage plugin will not be used until this method is called. */ void start() throws IOException; + + /** + * Allows to get the format plugin for current storage plugin based on appropriate format plugin config usage. + * + * @param config format plugin config + * @return format plugin instance + * @throws UnsupportedOperationException, if storage plugin doesn't support format plugins. + */ + FormatPlugin getFormatPlugin(FormatPluginConfig config); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java index 8e5fba449..03ce5a9b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java @@ -49,7 +49,6 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.planner.logical.StoragePlugins; import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.dfs.FileSystemPlugin; import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.ischema.InfoSchemaConfig; import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin; @@ -327,14 +326,8 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { @Override public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException { - StoragePlugin p = getPlugin(storageConfig); - if (!(p instanceof FileSystemPlugin)) { - throw new ExecutionSetupException( - String.format("You tried to request a format plugin for a storage plugin that wasn't of type " - + "FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName())); - } - FileSystemPlugin storage = (FileSystemPlugin) p; - return storage.getFormatPlugin(formatConfig); + StoragePlugin storagePlugin = getPlugin(storageConfig); + return storagePlugin.getFormatPlugin(formatConfig); } private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java index e71e7e145..b1f41a414 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java @@ -152,6 +152,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin { * @param config format plugin configuration * @return format plugin for given configuration if found, null otherwise */ + @Override public FormatPlugin getFormatPlugin(FormatPluginConfig config) { if (config instanceof NamedFormatPluginConfig) { return formatCreator.getFormatPluginByName(((NamedFormatPluginConfig) config).name); @@ -167,9 +168,9 @@ public class FileSystemPlugin extends AbstractStoragePlugin { @Override public Set getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) { Builder setBuilder = ImmutableSet.builder(); - for(FormatPlugin plugin : formatCreator.getConfiguredFormatPlugins()){ + for (FormatPlugin plugin : formatCreator.getConfiguredFormatPlugins()) { Set rules = plugin.getOptimizerRules(); - if(rules != null && rules.size() > 0){ + if (rules != null && rules.size() > 0) { setBuilder.addAll(rules); } } -- cgit v1.2.3