summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartijn van Groningen <martijn.v.groningen@gmail.com>2017-02-23 21:51:33 +0100
committerMartijn van Groningen <martijn.v.groningen@gmail.com>2017-02-24 08:52:27 +0100
commit211d50f7b88f682320556f4fcd0d443c7f5d50a9 (patch)
tree6ff8183933d942e55e73d972f2311d569e8cd10a
parent57b5d1d29b60dd1842546f723e013fe2c21716d2 (diff)
[INGEST] Lazy load the geoip databases.
Load the geoip database the first time a pipeline gets created that has a geoip processor. This saves memory (measured ~150MB for the city db) in cases when the plugin is installed, but not used.
-rw-r--r--plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java62
-rw-r--r--plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java35
-rw-r--r--plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java35
-rw-r--r--plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java38
4 files changed, 134 insertions, 36 deletions
diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java
new file mode 100644
index 0000000000..f73d2ca13c
--- /dev/null
+++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.ingest.geoip;
+
+import com.maxmind.geoip2.DatabaseReader;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.common.CheckedSupplier;
+import org.elasticsearch.common.logging.Loggers;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used,
+ * no memory is being wasted on the database reader.
+ */
+final class DatabaseReaderLazyLoader implements Closeable {
+
+ private static final Logger LOGGER = Loggers.getLogger(DatabaseReaderLazyLoader.class);
+
+ private final String databaseFileName;
+ private final CheckedSupplier<DatabaseReader, IOException> loader;
+ // package protected for testing only:
+ final SetOnce<DatabaseReader> databaseReader;
+
+ DatabaseReaderLazyLoader(String databaseFileName, CheckedSupplier<DatabaseReader, IOException> loader) {
+ this.databaseFileName = databaseFileName;
+ this.loader = loader;
+ this.databaseReader = new SetOnce<>();
+ }
+
+ synchronized DatabaseReader get() throws IOException {
+ if (databaseReader.get() == null) {
+ databaseReader.set(loader.get());
+ LOGGER.debug("Loaded [{}] geoip database", databaseFileName);
+ }
+ return databaseReader.get();
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ IOUtils.close(databaseReader.get());
+ }
+}
diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java
index 3d1418dc94..2cbaa7a3bb 100644
--- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java
+++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java
@@ -19,19 +19,6 @@
package org.elasticsearch.ingest.geoip;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.CityResponse;
@@ -49,6 +36,19 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList;
@@ -264,9 +264,9 @@ public final class GeoIpProcessor extends AbstractProcessor {
);
static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = EnumSet.of(Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE);
- private final Map<String, DatabaseReader> databaseReaders;
+ private final Map<String, DatabaseReaderLazyLoader> databaseReaders;
- public Factory(Map<String, DatabaseReader> databaseReaders) {
+ public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders) {
this.databaseReaders = databaseReaders;
}
@@ -279,12 +279,13 @@ public final class GeoIpProcessor extends AbstractProcessor {
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
- DatabaseReader databaseReader = databaseReaders.get(databaseFile);
- if (databaseReader == null) {
+ DatabaseReaderLazyLoader lazyLoader = databaseReaders.get(databaseFile);
+ if (lazyLoader == null) {
throw newConfigurationException(TYPE, processorTag,
"database_file", "database file [" + databaseFile + "] doesn't exist");
}
+ DatabaseReader databaseReader = lazyLoader.get();
String databaseType = databaseReader.getMetadata().getDatabaseType();
final Set<Property> properties;
diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java
index 4e5cc5c023..1571bc99ea 100644
--- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java
+++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java
@@ -19,6 +19,15 @@
package org.elasticsearch.ingest.geoip;
+import com.maxmind.db.NoCache;
+import com.maxmind.db.NodeCache;
+import com.maxmind.geoip2.DatabaseReader;
+import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.plugins.IngestPlugin;
+import org.elasticsearch.plugins.Plugin;
+
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -35,20 +44,11 @@ import java.util.Map;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
-import com.maxmind.db.NoCache;
-import com.maxmind.db.NodeCache;
-import com.maxmind.geoip2.DatabaseReader;
-import org.apache.lucene.util.IOUtils;
-import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.ingest.Processor;
-import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.Plugin;
-
public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
public static final Setting<Long> CACHE_SIZE =
Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);
- private Map<String, DatabaseReader> databaseReaders;
+ private Map<String, DatabaseReaderLazyLoader> databaseReaders;
@Override
public List<Setting<?>> getSettings() {
@@ -76,12 +76,12 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders));
}
- static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException {
+ static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException {
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
}
- Map<String, DatabaseReader> databaseReaders = new HashMap<>();
+ Map<String, DatabaseReaderLazyLoader> databaseReaders = new HashMap<>();
try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb.gz");
// Use iterator instead of forEach otherwise IOException needs to be caught twice...
@@ -89,10 +89,13 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
while (iterator.hasNext()) {
Path databasePath = iterator.next();
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
- try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) {
- databaseReaders.put(databasePath.getFileName().toString(),
- new DatabaseReader.Builder(inputStream).withCache(cache).build());
- }
+ String databaseFileName = databasePath.getFileName().toString();
+ DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName, () -> {
+ try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) {
+ return new DatabaseReader.Builder(inputStream).withCache(cache).build();
+ }
+ });
+ databaseReaders.put(databaseFileName, holder);
}
}
}
diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java
index 0c80bcc71f..8db0d15f79 100644
--- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java
+++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java
@@ -22,7 +22,6 @@ package org.elasticsearch.ingest.geoip;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.maxmind.db.NoCache;
import com.maxmind.db.NodeCache;
-import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.test.ESTestCase;
@@ -48,7 +47,7 @@ import static org.hamcrest.Matchers.sameInstance;
public class GeoIpProcessorFactoryTests extends ESTestCase {
- private static Map<String, DatabaseReader> databaseReaders;
+ private static Map<String, DatabaseReaderLazyLoader> databaseReaders;
@BeforeClass
public static void loadDatabaseReaders() throws IOException {
@@ -66,7 +65,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
@AfterClass
public static void closeDatabaseReaders() throws IOException {
- for (DatabaseReader reader : databaseReaders.values()) {
+ for (DatabaseReaderLazyLoader reader : databaseReaders.values()) {
reader.close();
}
databaseReaders = null;
@@ -222,4 +221,37 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]"));
}
}
+
+ public void testLazyLoading() throws Exception {
+ Path configDir = createTempDir();
+ Path geoIpConfigDir = configDir.resolve("ingest-geoip");
+ Files.createDirectories(geoIpConfigDir);
+ Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb.gz")),
+ geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz"));
+ Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")),
+ geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz"));
+
+ // Loading another database reader instances, because otherwise we can't test lazy loading as the the
+ // database readers used at class level are reused between tests. (we want to keep that otherwise running this
+ // test will take roughly 4 times more time)
+ Map<String, DatabaseReaderLazyLoader> databaseReaders =
+ IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, NoCache.getInstance());
+ GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+ for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) {
+ assertNull(lazyLoader.databaseReader.get());
+ }
+
+ Map<String, Object> config = new HashMap<>();
+ config.put("field", "_field");
+ config.put("database_file", "GeoLite2-City.mmdb.gz");
+ factory.create(null, "_tag", config);
+ config = new HashMap<>();
+ config.put("field", "_field");
+ config.put("database_file", "GeoLite2-Country.mmdb.gz");
+ factory.create(null, "_tag", config);
+
+ for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) {
+ assertNotNull(lazyLoader.databaseReader.get());
+ }
+ }
}