aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec
diff options
context:
space:
mode:
authorJacques Nadeau <jacques@apache.org>2015-01-06 21:52:40 -0800
committerJacques Nadeau <jacques@apache.org>2015-01-09 08:15:53 -0800
commit7638dbb82606c9644d6ad02210fbfd5d8f6ae090 (patch)
tree1f60e9e4fd1efb63ae634264cf11da3a4aa82be4 /exec/java-exec
parent1552c96f4ee511af851cb241d0eb4bf3ec4ac4c7 (diff)
DRILL-1947: Cache PStore/EStore instances rather than recreating on each need. As part of this, make sure that PStoreConfig doesn't use identity equality.
Diffstat (limited to 'exec/java-exec')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java70
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java47
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java23
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java40
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java38
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java14
16 files changed, 257 insertions, 35 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 4b9b20d2c..67342c4d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.rest.DrillRestServer;
import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.store.sys.CachingStoreProvider;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.store.sys.PStoreRegistry;
import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
@@ -106,7 +107,7 @@ public class Drillbit implements Closeable{
if(serviceSet != null) {
this.coord = serviceSet.getCoordinator();
- this.storeProvider = new LocalPStoreProvider(config);
+ this.storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config));
} else {
Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
this.coord = new ZKClusterCoordinator(config);
@@ -175,7 +176,11 @@ public class Drillbit implements Closeable{
logger.warn("Failure while shutting down embedded jetty server.");
}
Closeables.closeQuietly(engine);
- Closeables.closeQuietly(storeProvider);
+ try{
+ storeProvider.close();
+ }catch(Exception e){
+ logger.warn("Failure while closing store provider.", e);
+ }
Closeables.closeQuietly(coord);
Closeables.closeQuietly(manager);
Closeables.closeQuietly(context);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
new file mode 100644
index 000000000..68440cbcd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class CachingStoreProvider implements PStoreProvider, AutoCloseable {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingStoreProvider.class);
+
+ private final ConcurrentMap<PStoreConfig<?>, PStore<?>> storeCache = Maps.newConcurrentMap();
+ private final PStoreProvider provider;
+
+ public CachingStoreProvider(PStoreProvider provider) {
+ super();
+ this.provider = provider;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
+ PStore<?> s = storeCache.get(config);
+ if(s == null){
+ PStore<?> newStore = provider.getStore(config);
+ s = storeCache.putIfAbsent(config, newStore);
+ if(s == null){
+ s = newStore;
+ }else{
+ newStore.close();
+ }
+ }
+
+ return (PStore<V>) s;
+
+ }
+
+ @Override
+ public void start() throws IOException {
+ provider.start();
+ }
+
+ @Override
+ public void close() throws Exception {
+ for(PStore<?> store : storeCache.values()){
+ store.close();
+ }
+ storeCache.clear();
+ provider.close();
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
index b09c5b4c3..4c79a28ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
@@ -18,12 +18,10 @@
package org.apache.drill.exec.store.sys;
-import java.io.IOException;
/**
* Interface to define the provider which return EStore.
*/
-public interface EStoreProvider {
- public <V> PStore<V> getStore(PStoreConfig<V> table) throws IOException;
+public interface EStoreProvider extends PStoreProvider {
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
index 26c00ea3e..b6296453f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.sys;
import java.util.Map;
+
/**
* Interface for reading and writing values to a persistent storage provider. Iterators are guaranteed to be returned in key order.
* @param <V>
@@ -28,4 +29,5 @@ public interface PStore<V> extends Iterable<Map.Entry<String, V>> {
public void put(String key, V value);
public boolean putIfAbsent(String key, V value);
public void delete(String key);
+ public void close();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
index 83c22430f..bd9d97739 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
@@ -116,4 +116,51 @@ public class PStoreConfig<V> {
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + maxIteratorSize;
+ result = prime * result + ((mode == null) ? 0 : mode.hashCode());
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + ((valueSerializer == null) ? 0 : valueSerializer.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ PStoreConfig other = (PStoreConfig) obj;
+ if (maxIteratorSize != other.maxIteratorSize) {
+ return false;
+ }
+ if (mode != other.mode) {
+ return false;
+ }
+ if (name == null) {
+ if (other.name != null) {
+ return false;
+ }
+ } else if (!name.equals(other.name)) {
+ return false;
+ }
+ if (valueSerializer == null) {
+ if (other.valueSerializer != null) {
+ return false;
+ }
+ } else if (!valueSerializer.equals(other.valueSerializer)) {
+ return false;
+ }
+ return true;
+ }
+
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
index 6371dfabe..efa223eb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
@@ -17,11 +17,9 @@
*/
package org.apache.drill.exec.store.sys;
-import java.io.Closeable;
import java.io.IOException;
-public interface PStoreProvider extends AutoCloseable, Closeable{
-
+public interface PStoreProvider extends AutoCloseable {
public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException;
public void start() throws IOException;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
index 580d20ad6..532e6bec3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
@@ -53,7 +53,7 @@ public class PStoreRegistry {
logger.info("Using the configured PStoreProvider class: '{}'.", storeProviderClassName);
Class<? extends PStoreProvider> storeProviderClass = (Class<? extends PStoreProvider>) Class.forName(storeProviderClassName);
Constructor<? extends PStoreProvider> c = storeProviderClass.getConstructor(PStoreRegistry.class);
- return c.newInstance(this);
+ return new CachingStoreProvider(c.newInstance(this));
} catch (ConfigException.Missing | ClassNotFoundException | NoSuchMethodException | SecurityException
| InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
logger.error(e.getMessage(), e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
index 416a21a9f..40f25e770 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
@@ -228,4 +228,8 @@ public class FilePStore<V> implements PStore<V> {
}
}
+ @Override
+ public void close() {
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
index 094d093a3..e7c2f9487 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
@@ -19,7 +19,6 @@
package org.apache.drill.exec.store.sys.local;
import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
import org.apache.drill.exec.store.sys.EStore;
import org.apache.drill.exec.store.sys.EStoreProvider;
@@ -27,24 +26,22 @@ import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
public class LocalEStoreProvider implements EStoreProvider{
- private ConcurrentMap<PStoreConfig<?>, EStore<?>> estores = Maps.newConcurrentMap();
@Override
public <V> EStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException {
Preconditions.checkArgument(storeConfig.getMode() == Mode.EPHEMERAL, "Estore configurations must be set ephemeral.");
- if (! (estores.containsKey(storeConfig)) ) {
- EStore<V> p = new MapEStore<V>();
- EStore<?> p2 = estores.putIfAbsent(storeConfig, p);
- if(p2 != null) {
- return (EStore<V>) p2;
- }
- return p;
- } else {
- return (EStore<V>) estores.get(storeConfig);
- }
+ return new MapEStore<V>();
}
+
+ @Override
+ public void start() throws IOException {
+ }
+
+ @Override
+ public void close() {
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
index 272391618..96e51e6f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
* Implementation of EStore using ConcurrentHashMap.
* @param <V>
*/
-public class MapEStore <V> implements EStore<V> {
+public class MapEStore<V> implements EStore<V> {
ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>();
@Override
@@ -57,4 +57,8 @@ public class MapEStore <V> implements EStore<V> {
V out = store.putIfAbsent(key, value);
return out == null;
}
+
+ @Override
+ public void close() {
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
index 71a41f00c..c67561895 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
@@ -62,4 +62,8 @@ public class NoWriteLocalPStore<V> implements PStore<V>{
blobMap.remove(key);
}
+ @Override
+ public void close() {
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
index b8a5cdd9b..53452f30c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
@@ -43,4 +43,44 @@ public class JacksonSerializer<X> implements PClassSerializer<X> {
public X deserialize(byte[] bytes) throws IOException {
return reader.readValue(bytes);
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((reader == null) ? 0 : reader.hashCode());
+ result = prime * result + ((writer == null) ? 0 : writer.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ JacksonSerializer other = (JacksonSerializer) obj;
+ if (reader == null) {
+ if (other.reader != null) {
+ return false;
+ }
+ } else if (!reader.equals(other.reader)) {
+ return false;
+ }
+ if (writer == null) {
+ if (other.writer != null) {
+ return false;
+ }
+ } else if (!writer.equals(other.writer)) {
+ return false;
+ }
+ return true;
+ }
+
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
index 1ea714e9a..52df7a447 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
@@ -51,5 +51,43 @@ public class ProtoSerializer<X, B extends Message.Builder> implements PClassSeri
return (X) b.build();
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((readSchema == null) ? 0 : readSchema.hashCode());
+ result = prime * result + ((writeSchema == null) ? 0 : writeSchema.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ ProtoSerializer other = (ProtoSerializer) obj;
+ if (readSchema == null) {
+ if (other.readSchema != null) {
+ return false;
+ }
+ } else if (!readSchema.equals(other.readSchema)) {
+ return false;
+ }
+ if (writeSchema == null) {
+ if (other.writeSchema != null) {
+ return false;
+ }
+ } else if (!writeSchema.equals(other.writeSchema)) {
+ return false;
+ }
+ return true;
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
index d61f3b450..01059a4bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
@@ -27,9 +27,8 @@ import java.util.Map.Entry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.zookeeper.CreateMode;
@@ -40,7 +39,8 @@ import com.google.common.collect.Lists;
* This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store)
* @param <V>
*/
-public abstract class ZkAbstractStore<V> {
+public abstract class ZkAbstractStore<V> implements AutoCloseable {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class);
protected CuratorFramework framework;
protected PStoreConfig<V> config;
@@ -206,4 +206,15 @@ public abstract class ZkAbstractStore<V> {
}
}
+
+ @Override
+ public void close() {
+ try{
+ childrenCache.close();
+ }catch(IOException e){
+ logger.warn("Failure while closing out abstract store.", e);
+ }
+ }
+
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
index 1c2c3fd72..7d7d47565 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
@@ -40,4 +40,12 @@ public class ZkEStoreProvider implements EStoreProvider{
Preconditions.checkArgument(store.getMode() == Mode.EPHEMERAL);
return new ZkEStore<V>(curator,store);
}
+
+ @Override
+ public void start() throws IOException {
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
index 03d244190..f8fa2bca2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.sys.zk;
-import java.io.File;
import java.io.IOException;
import org.apache.curator.framework.CuratorFramework;
@@ -26,7 +25,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.sys.EStore;
+import org.apache.drill.exec.store.sys.EStoreProvider;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;
@@ -45,7 +44,7 @@ public class ZkPStoreProvider implements PStoreProvider {
private final DrillFileSystem fs;
private final Path blobRoot;
- private final ZkEStoreProvider zkEStoreProvider;
+ private final EStoreProvider zkEStoreProvider;
public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
ClusterCoordinator coord = registry.getClusterCoordinator();
@@ -66,7 +65,6 @@ public class ZkPStoreProvider implements PStoreProvider {
throw new DrillbitStartupException("Failure while attempting to set up blob store.", e);
}
-
this.zkEStoreProvider = new ZkEStoreProvider(curator);
}
@@ -79,11 +77,6 @@ public class ZkPStoreProvider implements PStoreProvider {
}
@Override
- public void close() {
- }
-
-
- @Override
public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
switch(config.getMode()){
case BLOB_PERSISTENT:
@@ -101,4 +94,7 @@ public class ZkPStoreProvider implements PStoreProvider {
public void start() {
}
+ @Override
+ public void close() {
+ }
}