diff options
author | Martijn van Groningen <martijn.v.groningen@gmail.com> | 2017-02-23 21:51:33 +0100 |
---|---|---|
committer | Martijn van Groningen <martijn.v.groningen@gmail.com> | 2017-02-24 08:52:27 +0100 |
commit | 211d50f7b88f682320556f4fcd0d443c7f5d50a9 (patch) | |
tree | 6ff8183933d942e55e73d972f2311d569e8cd10a | |
parent | 57b5d1d29b60dd1842546f723e013fe2c21716d2 (diff) |
[INGEST] Lazy load the geoip databases.
Load the geoip database the first time a pipeline gets created that has a geoip processor.
This saves memory (measured ~150MB for the city db) in cases when the plugin is installed, but not used.
4 files changed, 134 insertions, 36 deletions
diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java new file mode 100644 index 0000000000..f73d2ca13c --- /dev/null +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.ingest.geoip; + +import com.maxmind.geoip2.DatabaseReader; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.logging.Loggers; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used, + * no memory is being wasted on the database reader. 
+ */ +final class DatabaseReaderLazyLoader implements Closeable { + + private static final Logger LOGGER = Loggers.getLogger(DatabaseReaderLazyLoader.class); + + private final String databaseFileName; + private final CheckedSupplier<DatabaseReader, IOException> loader; + // package protected for testing only: + final SetOnce<DatabaseReader> databaseReader; + + DatabaseReaderLazyLoader(String databaseFileName, CheckedSupplier<DatabaseReader, IOException> loader) { + this.databaseFileName = databaseFileName; + this.loader = loader; + this.databaseReader = new SetOnce<>(); + } + + synchronized DatabaseReader get() throws IOException { + if (databaseReader.get() == null) { + databaseReader.set(loader.get()); + LOGGER.debug("Loaded [{}] geoip database", databaseFileName); + } + return databaseReader.get(); + } + + @Override + public synchronized void close() throws IOException { + IOUtils.close(databaseReader.get()); + } +} diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 3d1418dc94..2cbaa7a3bb 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -19,19 +19,6 @@ package org.elasticsearch.ingest.geoip; -import java.io.IOException; -import java.net.InetAddress; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; - import com.maxmind.geoip2.DatabaseReader; import com.maxmind.geoip2.exception.AddressNotFoundException; import com.maxmind.geoip2.model.CityResponse; @@ -49,6 +36,19 @@ import org.elasticsearch.ingest.AbstractProcessor; import 
org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; +import java.io.IOException; +import java.net.InetAddress; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList; @@ -264,9 +264,9 @@ public final class GeoIpProcessor extends AbstractProcessor { ); static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = EnumSet.of(Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE); - private final Map<String, DatabaseReader> databaseReaders; + private final Map<String, DatabaseReaderLazyLoader> databaseReaders; - public Factory(Map<String, DatabaseReader> databaseReaders) { + public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders) { this.databaseReaders = databaseReaders; } @@ -279,12 +279,13 @@ public final class GeoIpProcessor extends AbstractProcessor { List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties"); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); - DatabaseReader databaseReader = databaseReaders.get(databaseFile); - if (databaseReader == null) { + DatabaseReaderLazyLoader lazyLoader = databaseReaders.get(databaseFile); + if (lazyLoader == null) { throw newConfigurationException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist"); } + DatabaseReader databaseReader = lazyLoader.get(); String databaseType = databaseReader.getMetadata().getDatabaseType(); final Set<Property> properties; diff --git 
a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 4e5cc5c023..1571bc99ea 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -19,6 +19,15 @@ package org.elasticsearch.ingest.geoip; +import com.maxmind.db.NoCache; +import com.maxmind.db.NodeCache; +import com.maxmind.geoip2.DatabaseReader; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.plugins.Plugin; + import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -35,20 +44,11 @@ import java.util.Map; import java.util.stream.Stream; import java.util.zip.GZIPInputStream; -import com.maxmind.db.NoCache; -import com.maxmind.db.NodeCache; -import com.maxmind.geoip2.DatabaseReader; -import org.apache.lucene.util.IOUtils; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.plugins.IngestPlugin; -import org.elasticsearch.plugins.Plugin; - public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable { public static final Setting<Long> CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope); - private Map<String, DatabaseReader> databaseReaders; + private Map<String, DatabaseReaderLazyLoader> databaseReaders; @Override public List<Setting<?>> getSettings() { @@ -76,12 +76,12 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders)); } - static Map<String, DatabaseReader> loadDatabaseReaders(Path 
geoIpConfigDirectory, NodeCache cache) throws IOException { + static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException { if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) { throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist"); } - Map<String, DatabaseReader> databaseReaders = new HashMap<>(); + Map<String, DatabaseReaderLazyLoader> databaseReaders = new HashMap<>(); try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) { PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb.gz"); // Use iterator instead of forEach otherwise IOException needs to be caught twice... @@ -89,10 +89,13 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable while (iterator.hasNext()) { Path databasePath = iterator.next(); if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { - try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) { - databaseReaders.put(databasePath.getFileName().toString(), - new DatabaseReader.Builder(inputStream).withCache(cache).build()); - } + String databaseFileName = databasePath.getFileName().toString(); + DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName, () -> { + try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) { + return new DatabaseReader.Builder(inputStream).withCache(cache).build(); + } + }); + databaseReaders.put(databaseFileName, holder); } } } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 0c80bcc71f..8db0d15f79 100644 --- 
a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.ingest.geoip; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.maxmind.db.NoCache; import com.maxmind.db.NodeCache; -import com.maxmind.geoip2.DatabaseReader; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Randomness; import org.elasticsearch.test.ESTestCase; @@ -48,7 +47,7 @@ import static org.hamcrest.Matchers.sameInstance; public class GeoIpProcessorFactoryTests extends ESTestCase { - private static Map<String, DatabaseReader> databaseReaders; + private static Map<String, DatabaseReaderLazyLoader> databaseReaders; @BeforeClass public static void loadDatabaseReaders() throws IOException { @@ -66,7 +65,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { @AfterClass public static void closeDatabaseReaders() throws IOException { - for (DatabaseReader reader : databaseReaders.values()) { + for (DatabaseReaderLazyLoader reader : databaseReaders.values()) { reader.close(); } databaseReaders = null; @@ -222,4 +221,37 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]")); } } + + public void testLazyLoading() throws Exception { + Path configDir = createTempDir(); + Path geoIpConfigDir = configDir.resolve("ingest-geoip"); + Files.createDirectories(geoIpConfigDir); + Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb.gz")), + geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz")); + Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")), + geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz")); + + // Loading another database reader instances, because 
otherwise we can't test lazy loading as the + // database readers used at class level are reused between tests. (we want to keep that otherwise running this + // test will take roughly 4 times more time) + Map<String, DatabaseReaderLazyLoader> databaseReaders = + IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, NoCache.getInstance()); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { + assertNull(lazyLoader.databaseReader.get()); + } + + Map<String, Object> config = new HashMap<>(); + config.put("field", "_field"); + config.put("database_file", "GeoLite2-City.mmdb.gz"); + factory.create(null, "_tag", config); + config = new HashMap<>(); + config.put("field", "_field"); + config.put("database_file", "GeoLite2-Country.mmdb.gz"); + factory.create(null, "_tag", config); + + for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { + assertNotNull(lazyLoader.databaseReader.get()); + } + } } |