diff options
author | Jacques Nadeau <jacques@apache.org> | 2015-01-06 21:52:40 -0800 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2015-01-09 08:15:53 -0800 |
commit | 7638dbb82606c9644d6ad02210fbfd5d8f6ae090 (patch) | |
tree | 1f60e9e4fd1efb63ae634264cf11da3a4aa82be4 /exec/java-exec | |
parent | 1552c96f4ee511af851cb241d0eb4bf3ec4ac4c7 (diff) |
DRILL-1947: Cache PStore/EStore instances rather than recreating on each need. As part of this, make sure that PStoreConfig doesn't use identity equality.
Diffstat (limited to 'exec/java-exec')
16 files changed, 257 insertions, 35 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 4b9b20d2c..67342c4d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.server.rest.DrillRestServer; import org.apache.drill.exec.service.ServiceEngine; +import org.apache.drill.exec.store.sys.CachingStoreProvider; import org.apache.drill.exec.store.sys.PStoreProvider; import org.apache.drill.exec.store.sys.PStoreRegistry; import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; @@ -106,7 +107,7 @@ public class Drillbit implements Closeable{ if(serviceSet != null) { this.coord = serviceSet.getCoordinator(); - this.storeProvider = new LocalPStoreProvider(config); + this.storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config)); } else { Runtime.getRuntime().addShutdownHook(new ShutdownThread(config)); this.coord = new ZKClusterCoordinator(config); @@ -175,7 +176,11 @@ public class Drillbit implements Closeable{ logger.warn("Failure while shutting down embedded jetty server."); } Closeables.closeQuietly(engine); - Closeables.closeQuietly(storeProvider); + try{ + storeProvider.close(); + }catch(Exception e){ + logger.warn("Failure while closing store provider.", e); + } Closeables.closeQuietly(coord); Closeables.closeQuietly(manager); Closeables.closeQuietly(context); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java new file mode 100644 index 000000000..68440cbcd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys; + +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; + +import org.apache.drill.exec.store.sys.PStoreConfig.Mode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +public class CachingStoreProvider implements PStoreProvider, AutoCloseable { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingStoreProvider.class); + + private final ConcurrentMap<PStoreConfig<?>, PStore<?>> storeCache = Maps.newConcurrentMap(); + private final PStoreProvider provider; + + public CachingStoreProvider(PStoreProvider provider) { + super(); + this.provider = provider; + } + + @SuppressWarnings("unchecked") + public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException { + PStore<?> s = storeCache.get(config); + if(s == null){ + PStore<?> newStore = provider.getStore(config); + s = storeCache.putIfAbsent(config, newStore); + if(s == null){ + s = newStore; + }else{ + newStore.close(); + } + } + + return (PStore<V>) s; + + } + + @Override + public void start() throws IOException { + provider.start(); + } + + @Override + public void close() throws Exception { + for(PStore<?> store : storeCache.values()){ + store.close(); + } + storeCache.clear(); + provider.close(); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java index b09c5b4c3..4c79a28ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java @@ -18,12 +18,10 @@ package org.apache.drill.exec.store.sys; -import java.io.IOException; /** * Interface to define the provider which return EStore. */ -public interface EStoreProvider { - public <V> PStore<V> getStore(PStoreConfig<V> table) throws IOException; +public interface EStoreProvider extends PStoreProvider { } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java index 26c00ea3e..b6296453f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.store.sys; import java.util.Map; + /** * Interface for reading and writing values to a persistent storage provider. Iterators are guaranteed to be returned in key order. * @param <V> @@ -28,4 +29,5 @@ public interface PStore<V> extends Iterable<Map.Entry<String, V>> { public void put(String key, V value); public boolean putIfAbsent(String key, V value); public void delete(String key); + public void close(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java index 83c22430f..bd9d97739 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java @@ -116,4 +116,51 @@ public class PStoreConfig<V> { } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + maxIteratorSize; + result = prime * result + ((mode == null) ? 0 : mode.hashCode()); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((valueSerializer == null) ? 0 : valueSerializer.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + PStoreConfig other = (PStoreConfig) obj; + if (maxIteratorSize != other.maxIteratorSize) { + return false; + } + if (mode != other.mode) { + return false; + } + if (name == null) { + if (other.name != null) { + return false; + } + } else if (!name.equals(other.name)) { + return false; + } + if (valueSerializer == null) { + if (other.valueSerializer != null) { + return false; + } + } else if (!valueSerializer.equals(other.valueSerializer)) { + return false; + } + return true; + } + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java index 6371dfabe..efa223eb5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java @@ -17,11 +17,9 @@ */ package org.apache.drill.exec.store.sys; -import java.io.Closeable; import java.io.IOException; -public interface PStoreProvider extends AutoCloseable, Closeable{ - +public interface PStoreProvider extends AutoCloseable { public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException; public void start() throws IOException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java index 580d20ad6..532e6bec3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java @@ -53,7 +53,7 @@ public class PStoreRegistry { logger.info("Using the configured PStoreProvider class: '{}'.", storeProviderClassName); Class<? extends PStoreProvider> storeProviderClass = (Class<? extends PStoreProvider>) Class.forName(storeProviderClassName); Constructor<? extends PStoreProvider> c = storeProviderClass.getConstructor(PStoreRegistry.class); - return c.newInstance(this); + return new CachingStoreProvider(c.newInstance(this)); } catch (ConfigException.Missing | ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { logger.error(e.getMessage(), e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java index 416a21a9f..40f25e770 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java @@ -228,4 +228,8 @@ public class FilePStore<V> implements PStore<V> { } } + @Override + public void close() { + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java index 094d093a3..e7c2f9487 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.store.sys.local; import java.io.IOException; -import java.util.concurrent.ConcurrentMap; import org.apache.drill.exec.store.sys.EStore; import org.apache.drill.exec.store.sys.EStoreProvider; @@ -27,24 +26,22 @@ import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreConfig.Mode; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; public class LocalEStoreProvider implements EStoreProvider{ - private ConcurrentMap<PStoreConfig<?>, EStore<?>> estores = Maps.newConcurrentMap(); @Override public <V> EStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException { Preconditions.checkArgument(storeConfig.getMode() == Mode.EPHEMERAL, "Estore configurations must be set ephemeral."); - if (! (estores.containsKey(storeConfig)) ) { - EStore<V> p = new MapEStore<V>(); - EStore<?> p2 = estores.putIfAbsent(storeConfig, p); - if(p2 != null) { - return (EStore<V>) p2; - } - return p; - } else { - return (EStore<V>) estores.get(storeConfig); - } + return new MapEStore<V>(); } + + @Override + public void start() throws IOException { + } + + @Override + public void close() { + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java index 272391618..96e51e6f7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java @@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; * Implementation of EStore using ConcurrentHashMap. * @param <V> */ -public class MapEStore <V> implements EStore<V> { +public class MapEStore<V> implements EStore<V> { ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>(); @Override @@ -57,4 +57,8 @@ public class MapEStore <V> implements EStore<V> { V out = store.putIfAbsent(key, value); return out == null; } + + @Override + public void close() { + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java index 71a41f00c..c67561895 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java @@ -62,4 +62,8 @@ public class NoWriteLocalPStore<V> implements PStore<V>{ blobMap.remove(key); } + @Override + public void close() { + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java index b8a5cdd9b..53452f30c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java @@ -43,4 +43,44 @@ public class JacksonSerializer<X> implements PClassSerializer<X> { public X deserialize(byte[] bytes) throws IOException { return reader.readValue(bytes); } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((reader == null) ? 0 : reader.hashCode()); + result = prime * result + ((writer == null) ? 0 : writer.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + JacksonSerializer other = (JacksonSerializer) obj; + if (reader == null) { + if (other.reader != null) { + return false; + } + } else if (!reader.equals(other.reader)) { + return false; + } + if (writer == null) { + if (other.writer != null) { + return false; + } + } else if (!writer.equals(other.writer)) { + return false; + } + return true; + } + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java index 1ea714e9a..52df7a447 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java @@ -51,5 +51,43 @@ public class ProtoSerializer<X, B extends Message.Builder> implements PClassSeri return (X) b.build(); } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((readSchema == null) ? 0 : readSchema.hashCode()); + result = prime * result + ((writeSchema == null) ? 0 : writeSchema.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + ProtoSerializer other = (ProtoSerializer) obj; + if (readSchema == null) { + if (other.readSchema != null) { + return false; + } + } else if (!readSchema.equals(other.readSchema)) { + return false; + } + if (writeSchema == null) { + if (other.writeSchema != null) { + return false; + } + } else if (!writeSchema.equals(other.writeSchema)) { + return false; + } + return true; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java index d61f3b450..01059a4bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java @@ -27,9 +27,8 @@ import java.util.Map.Entry; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; +import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.zookeeper.CreateMode; @@ -40,7 +39,8 @@ import com.google.common.collect.Lists; * This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store) * @param <V> */ -public abstract class ZkAbstractStore<V> { +public abstract class ZkAbstractStore<V> implements AutoCloseable { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class); protected CuratorFramework framework; protected PStoreConfig<V> config; @@ -206,4 +206,15 @@ public abstract class ZkAbstractStore<V> { } } + + @Override + public void close() { + try{ + childrenCache.close(); + }catch(IOException e){ + logger.warn("Failure while closing out abstract store.", e); + } + } + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java index 1c2c3fd72..7d7d47565 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java @@ -40,4 +40,12 @@ public class ZkEStoreProvider implements EStoreProvider{ Preconditions.checkArgument(store.getMode() == Mode.EPHEMERAL); return new ZkEStore<V>(curator,store); } + + @Override + public void start() throws IOException { + } + + @Override + public void close() throws Exception { + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java index 03d244190..f8fa2bca2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.sys.zk; -import java.io.File; import java.io.IOException; import org.apache.curator.framework.CuratorFramework; @@ -26,7 +25,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; -import org.apache.drill.exec.store.sys.EStore; +import org.apache.drill.exec.store.sys.EStoreProvider; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreProvider; @@ -45,7 +44,7 @@ public class ZkPStoreProvider implements PStoreProvider { private final DrillFileSystem fs; private final Path blobRoot; - private final ZkEStoreProvider zkEStoreProvider; + private final EStoreProvider zkEStoreProvider; public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException { ClusterCoordinator coord = registry.getClusterCoordinator(); @@ -66,7 +65,6 @@ public class ZkPStoreProvider implements PStoreProvider { throw new DrillbitStartupException("Failure while attempting to set up blob store.", e); } - this.zkEStoreProvider = new ZkEStoreProvider(curator); } @@ -79,11 +77,6 @@ public class ZkPStoreProvider implements PStoreProvider { } @Override - public void close() { - } - - - @Override public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException { switch(config.getMode()){ case BLOB_PERSISTENT: @@ -101,4 +94,7 @@ public class ZkPStoreProvider implements PStoreProvider { public void start() { } + @Override + public void close() { + } } |