summaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorSimon Willnauer <simonw@apache.org>2016-01-09 21:50:17 +0100
committerSimon Willnauer <simonw@apache.org>2016-01-09 21:50:17 +0100
commit54d1e35d845bde4985d1b1557143d98f42c060c4 (patch)
tree26f32318e6c0419924347f459a0befc5dd0d6fd1 /core/src
parent2dbad1d65adfee7de8ed51a27d10d5be334862d4 (diff)
Cleanup IndexingOperationListeners infrastructure
This commit reduces the former ShardIndexinService to a simple stats/metrics class, moves IndexingSlowLog to the IndexService level since it can be shared across shards of an index and is now hidden behind IndexingOperationListener. IndexingOperationListener is now a first class citizen in IndexShard and is passed in from IndexService.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java2
-rw-r--r--core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java2
-rw-r--r--core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java2
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/ClusterModule.java2
-rw-r--r--core/src/main/java/org/elasticsearch/index/IndexService.java13
-rw-r--r--core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java (renamed from core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java)12
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java1
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java2
-rw-r--r--core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java70
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java43
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java152
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java (renamed from core/src/main/java/org/elasticsearch/index/indexing/IndexingStats.java)2
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java (renamed from core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java)79
-rw-r--r--core/src/main/java/org/elasticsearch/indices/IndicesService.java2
-rw-r--r--core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java2
-rw-r--r--core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java2
-rw-r--r--core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java (renamed from core/src/test/java/org/elasticsearch/index/indexing/IndexingSlowLogTests.java)4
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java128
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java162
19 files changed, 448 insertions, 234 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java
index ebdc8b72c7..85644e8523 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java
@@ -32,7 +32,7 @@ import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
-import org.elasticsearch.index.indexing.IndexingStats;
+import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.recovery.RecoveryStats;
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 2597695a1e..d7d40426a6 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -410,7 +410,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
case NONE:
UpdateResponse updateResponse = translate.action();
- indexShard.indexingService().noopUpdate(updateRequest.type());
+ indexShard.noopUpdate(updateRequest.type());
return new UpdateResult(translate, updateResponse);
default:
throw new IllegalStateException("Illegal update operation " + translate.operation());
diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
index ad1ea759d2..9ba1f2d1ea 100644
--- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
+++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
@@ -269,7 +269,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
if (indexServiceOrNull != null) {
IndexShard shard = indexService.getShardOrNull(request.shardId());
if (shard != null) {
- shard.indexingService().noopUpdate(request.type());
+ shard.noopUpdate(request.type());
}
}
listener.onResponse(update);
diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
index c0370248ad..5dce6d5757 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
@@ -67,7 +67,7 @@ import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineConfig;
-import org.elasticsearch.index.indexing.IndexingSlowLog;
+import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.search.stats.SearchSlowLog;
import org.elasticsearch.index.settings.IndexDynamicSettings;
import org.elasticsearch.index.shard.IndexShard;
diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java
index 853a1234bd..3d1a9f8ed7 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexService.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -45,7 +45,6 @@ import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
-import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.query.QueryShardContext;
@@ -68,6 +67,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -101,6 +101,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false);
private final IndexSettings indexSettings;
+ private final IndexingSlowLog slowLog;
public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
SimilarityService similarityService,
@@ -130,6 +131,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
this.engineFactory = engineFactory;
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
+ this.slowLog = new IndexingSlowLog(indexSettings.getSettings());
}
public int numberOfShards() {
@@ -292,9 +294,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> nodeServicesProvider.getIndicesQueryCache().onClose(shardId)));
if (useShadowEngine(primary, indexSettings)) {
- indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider);
+ indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); // no indexing listeners - shadow engines don't index
} else {
- indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider);
+ indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, slowLog);
}
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
@@ -552,6 +554,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
} catch (Exception e) {
logger.warn("failed to refresh index store settings", e);
}
+ try {
+ slowLog.onRefreshSettings(settings); // this will be refactored soon anyway so duplication is ok here
+ } catch (Exception e) {
+ logger.warn("failed to refresh slowlog settings", e);
+ }
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java b/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java
index 292c2a16e9..5cd3685b2f 100644
--- a/core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.elasticsearch.index.indexing;
+package org.elasticsearch.index;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
@@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.shard.IndexingOperationListener;
import java.io.IOException;
import java.util.Locale;
@@ -35,7 +36,7 @@ import java.util.concurrent.TimeUnit;
/**
*/
-public final class IndexingSlowLog {
+public final class IndexingSlowLog implements IndexingOperationListener {
private boolean reformat;
@@ -124,8 +125,9 @@ public final class IndexingSlowLog {
}
}
- void postIndex(Engine.Index index, long tookInNanos) {
- postIndexing(index.parsedDoc(), tookInNanos);
+ public void postIndex(Engine.Index index) {
+ final long took = index.endTime() - index.startTime();
+ postIndexing(index.parsedDoc(), took);
}
/**
@@ -192,4 +194,4 @@ public final class IndexingSlowLog {
return sb.toString();
}
}
-} \ No newline at end of file
+}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
index 35f1e066d3..4086bebfa8 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -30,7 +30,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 1fcbbee541..d93ce36686 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -55,12 +55,10 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
-import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
diff --git a/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java
deleted file mode 100644
index 39f3dd602f..0000000000
--- a/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.index.indexing;
-
-import org.elasticsearch.index.engine.Engine;
-
-/**
- * An indexing listener for indexing, delete, events.
- */
-public abstract class IndexingOperationListener {
-
- /**
- * Called before the indexing occurs.
- */
- public Engine.Index preIndex(Engine.Index operation) {
- return operation;
- }
-
- /**
- * Called after the indexing operation occurred.
- */
- public void postIndex(Engine.Index index) {
-
- }
-
- /**
- * Called after the indexing operation occurred with exception.
- */
- public void postIndex(Engine.Index index, Throwable ex) {
-
- }
-
- /**
- * Called before the delete occurs.
- */
- public Engine.Delete preDelete(Engine.Delete delete) {
- return delete;
- }
-
-
- /**
- * Called after the delete operation occurred.
- */
- public void postDelete(Engine.Delete delete) {
-
- }
-
- /**
- * Called after the delete operation occurred with exception.
- */
- public void postDelete(Engine.Delete delete, Throwable ex) {
-
- }
-}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 54bd1cd844..db5e1f7e4a 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -81,8 +81,6 @@ import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.get.ShardGetService;
-import org.elasticsearch.index.indexing.IndexingStats;
-import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MapperService;
@@ -125,6 +123,8 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -143,7 +143,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexCache indexCache;
private final Store store;
private final MergeSchedulerConfig mergeSchedulerConfig;
- private final ShardIndexingService indexingService;
+ private final InternalIndexingStats internalIndexingStats;
private final ShardSearchStats searchService;
private final ShardGetService getService;
private final ShardIndexWarmerService shardWarmerService;
@@ -167,7 +167,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexEventListener indexEventListener;
private final IndexSettings idxSettings;
private final NodeServicesProvider provider;
-
private TimeValue refreshInterval;
private volatile ScheduledFuture<?> refreshScheduledFuture;
@@ -176,6 +175,8 @@ public class IndexShard extends AbstractIndexShardComponent {
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
protected final EngineFactory engineFactory;
+ private final IndexingOperationListener indexingOperationListeners;
+
@Nullable
private RecoveryState recoveryState;
@@ -215,7 +216,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory,
- IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider) {
+ IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, IndexingOperationListener... listeners) {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
this.inactiveTime = settings.getAsTime(INDEX_SHARD_INACTIVE_TIME_SETTING, settings.getAsTime(INDICES_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)));
@@ -232,7 +233,10 @@ public class IndexShard extends AbstractIndexShardComponent {
this.threadPool = provider.getThreadPool();
this.mapperService = mapperService;
this.indexCache = indexCache;
- this.indexingService = new ShardIndexingService(shardId, indexSettings);
+ this.internalIndexingStats = new InternalIndexingStats();
+ final List<IndexingOperationListener> listenersList = new ArrayList<>(Arrays.asList(listeners));
+ listenersList.add(internalIndexingStats);
+ this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
this.getService = new ShardGetService(indexSettings, this, mapperService);
this.termVectorsService = provider.getTermVectorsService();
this.searchService = new ShardSearchStats(settings);
@@ -285,10 +289,6 @@ public class IndexShard extends AbstractIndexShardComponent {
return true;
}
- public ShardIndexingService indexingService() {
- return this.indexingService;
- }
-
public ShardGetService getService() {
return this.getService;
}
@@ -489,7 +489,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public boolean index(Engine.Index index) {
ensureWriteAllowed(index);
markLastWrite();
- index = indexingService.preIndex(index);
+ index = indexingOperationListeners.preIndex(index);
final boolean created;
try {
if (logger.isTraceEnabled()) {
@@ -503,10 +503,10 @@ public class IndexShard extends AbstractIndexShardComponent {
}
index.endTime(System.nanoTime());
} catch (Throwable ex) {
- indexingService.postIndex(index, ex);
+ indexingOperationListeners.postIndex(index, ex);
throw ex;
}
- indexingService.postIndex(index);
+ indexingOperationListeners.postIndex(index);
return created;
}
@@ -532,7 +532,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public void delete(Engine.Delete delete) {
ensureWriteAllowed(delete);
markLastWrite();
- delete = indexingService.preDelete(delete);
+ delete = indexingOperationListeners.preDelete(delete);
try {
if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text());
@@ -545,10 +545,10 @@ public class IndexShard extends AbstractIndexShardComponent {
}
delete.endTime(System.nanoTime());
} catch (Throwable ex) {
- indexingService.postDelete(delete, ex);
+ indexingOperationListeners.postDelete(delete, ex);
throw ex;
}
- indexingService.postDelete(delete);
+ indexingOperationListeners.postDelete(delete);
}
public Engine.GetResult get(Engine.Get get) {
@@ -600,7 +600,7 @@ public class IndexShard extends AbstractIndexShardComponent {
throttled = engine.isThrottled();
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
}
- return indexingService.stats(throttled, throttleTimeInMillis, types);
+ return internalIndexingStats.stats(throttled, throttleTimeInMillis, types);
}
public SearchStats searchStats(String... groups) {
@@ -1222,7 +1222,6 @@ public class IndexShard extends AbstractIndexShardComponent {
}
mergePolicyConfig.onRefreshSettings(settings);
searchService.onRefreshSettings(settings);
- indexingService.onRefreshSettings(settings);
if (change) {
getEngine().onSettingsChanged();
}
@@ -1258,6 +1257,14 @@ public class IndexShard extends AbstractIndexShardComponent {
return inactiveTime;
}
+ /**
+ * Should be called for each no-op update operation to increment relevant statistics.
+ * @param type the doc type of the update
+ */
+ public void noopUpdate(String type) {
+ internalIndexingStats.noopUpdate(type);
+ }
+
class EngineRefresher implements Runnable {
@Override
public void run() {
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java
new file mode 100644
index 0000000000..e5d3574223
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.index.engine.Engine;
+
+import java.util.List;
+
+/**
+ * An indexing listener for indexing, delete, events.
+ */
+public interface IndexingOperationListener {
+
+ /**
+ * Called before the indexing occurs.
+ */
+ default Engine.Index preIndex(Engine.Index operation) {
+ return operation;
+ }
+
+ /**
+ * Called after the indexing operation occurred.
+ */
+ default void postIndex(Engine.Index index) {}
+
+ /**
+ * Called after the indexing operation occurred with exception.
+ */
+ default void postIndex(Engine.Index index, Throwable ex) {}
+
+ /**
+ * Called before the delete occurs.
+ */
+ default Engine.Delete preDelete(Engine.Delete delete) {
+ return delete;
+ }
+
+
+ /**
+ * Called after the delete operation occurred.
+ */
+ default void postDelete(Engine.Delete delete) {}
+
+ /**
+ * Called after the delete operation occurred with exception.
+ */
+ default void postDelete(Engine.Delete delete, Throwable ex) {}
+
+ /**
+ * A Composite listener that multiplexes calls to each of the listeners methods.
+ */
+ final class CompositeListener implements IndexingOperationListener{
+ private final List<IndexingOperationListener> listeners;
+ private final ESLogger logger;
+
+ public CompositeListener(List<IndexingOperationListener> listeners, ESLogger logger) {
+ this.listeners = listeners;
+ this.logger = logger;
+ }
+
+ @Override
+ public Engine.Index preIndex(Engine.Index operation) {
+ assert operation != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.preIndex(operation);
+ } catch (Throwable t) {
+ logger.warn("preIndex listener [{}] failed", t, listener);
+ }
+ }
+ return operation;
+ }
+
+ @Override
+ public void postIndex(Engine.Index index) {
+ assert index != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.postIndex(index);
+ } catch (Throwable t) {
+ logger.warn("postIndex listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void postIndex(Engine.Index index, Throwable ex) {
+ assert index != null && ex != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.postIndex(index, ex);
+ } catch (Throwable t) {
+ logger.warn("postIndex listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public Engine.Delete preDelete(Engine.Delete delete) {
+ assert delete != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.preDelete(delete);
+ } catch (Throwable t) {
+ logger.warn("preDelete listener [{}] failed", t, listener);
+ }
+ }
+ return delete;
+ }
+
+ @Override
+ public void postDelete(Engine.Delete delete) {
+ assert delete != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.postDelete(delete);
+ } catch (Throwable t) {
+ logger.warn("postDelete listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void postDelete(Engine.Delete delete, Throwable ex) {
+ assert delete != null && ex != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.postDelete(delete, ex);
+ } catch (Throwable t) {
+ logger.warn("postDelete listener [{}] failed", t, listener);
+ }
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/index/indexing/IndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java
index 07ca8af17e..27cda2ca1c 100644
--- a/core/src/main/java/org/elasticsearch/index/indexing/IndexingStats.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.elasticsearch.index.indexing;
+package org.elasticsearch.index.shard;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
diff --git a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java
index f7175c02c5..9996d705b3 100644
--- a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java
@@ -17,49 +17,34 @@
* under the License.
*/
-package org.elasticsearch.index.indexing;
+package org.elasticsearch.index.shard;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.shard.AbstractIndexShardComponent;
-import org.elasticsearch.index.shard.ShardId;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyMap;
/**
+ * Internal class that maintains relevant indexing statistics / metrics.
+ * @see IndexShard
*/
-public class ShardIndexingService extends AbstractIndexShardComponent {
-
- private final IndexingSlowLog slowLog;
-
+final class InternalIndexingStats implements IndexingOperationListener {
private final StatsHolder totalStats = new StatsHolder();
-
- private final CopyOnWriteArrayList<IndexingOperationListener> listeners = new CopyOnWriteArrayList<>();
-
private volatile Map<String, StatsHolder> typesStats = emptyMap();
- public ShardIndexingService(ShardId shardId, IndexSettings indexSettings) {
- super(shardId, indexSettings);
- this.slowLog = new IndexingSlowLog(this.indexSettings.getSettings());
- }
-
/**
* Returns the stats, including type specific stats. If the types are null/0 length, then nothing
* is returned for them. If they are set, then only types provided will be returned, or
* <tt>_all</tt> for all types.
*/
- public IndexingStats stats(boolean isThrottled, long currentThrottleInMillis, String... types) {
+ IndexingStats stats(boolean isThrottled, long currentThrottleInMillis, String... types) {
IndexingStats.Stats total = totalStats.stats(isThrottled, currentThrottleInMillis);
Map<String, IndexingStats.Stats> typesSt = null;
if (types != null && types.length > 0) {
@@ -79,20 +64,10 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
return new IndexingStats(total, typesSt);
}
- public void addListener(IndexingOperationListener listener) {
- listeners.add(listener);
- }
-
- public void removeListener(IndexingOperationListener listener) {
- listeners.remove(listener);
- }
public Engine.Index preIndex(Engine.Index operation) {
totalStats.indexCurrent.inc();
typeStats(operation.type()).indexCurrent.inc();
- for (IndexingOperationListener listener : listeners) {
- operation = listener.preIndex(operation);
- }
return operation;
}
@@ -103,14 +78,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
StatsHolder typeStats = typeStats(index.type());
typeStats.indexMetric.inc(took);
typeStats.indexCurrent.dec();
- slowLog.postIndex(index, took);
- for (IndexingOperationListener listener : listeners) {
- try {
- listener.postIndex(index);
- } catch (Exception e) {
- logger.warn("postIndex listener [{}] failed", e, listener);
- }
- }
}
public void postIndex(Engine.Index index, Throwable ex) {
@@ -118,21 +85,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
typeStats(index.type()).indexCurrent.dec();
totalStats.indexFailed.inc();
typeStats(index.type()).indexFailed.inc();
- for (IndexingOperationListener listener : listeners) {
- try {
- listener.postIndex(index, ex);
- } catch (Throwable t) {
- logger.warn("postIndex listener [{}] failed", t, listener);
- }
- }
}
public Engine.Delete preDelete(Engine.Delete delete) {
totalStats.deleteCurrent.inc();
typeStats(delete.type()).deleteCurrent.inc();
- for (IndexingOperationListener listener : listeners) {
- delete = listener.preDelete(delete);
- }
return delete;
}
@@ -144,25 +101,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
StatsHolder typeStats = typeStats(delete.type());
typeStats.deleteMetric.inc(took);
typeStats.deleteCurrent.dec();
- for (IndexingOperationListener listener : listeners) {
- try {
- listener.postDelete(delete);
- } catch (Exception e) {
- logger.warn("postDelete listener [{}] failed", e, listener);
- }
- }
}
public void postDelete(Engine.Delete delete, Throwable ex) {
totalStats.deleteCurrent.dec();
typeStats(delete.type()).deleteCurrent.dec();
- for (IndexingOperationListener listener : listeners) {
- try {
- listener. postDelete(delete, ex);
- } catch (Throwable t) {
- logger.warn("postDelete listener [{}] failed", t, listener);
- }
- }
}
public void noopUpdate(String type) {
@@ -170,7 +113,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
typeStats(type).noopUpdates.inc();
}
- public void clear() {
+ public void clear() { // NOCOMMIT - this is unused?
totalStats.clear();
synchronized (this) {
if (!typesStats.isEmpty()) {
@@ -200,10 +143,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
return stats;
}
- public void onRefreshSettings(Settings settings) {
- slowLog.onRefreshSettings(settings);
- }
-
static class StatsHolder {
public final MeanMetric indexMetric = new MeanMetric();
public final MeanMetric deleteMetric = new MeanMetric();
@@ -214,9 +153,9 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
public IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
return new IndexingStats.Stats(
- indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), indexFailed.count(),
- deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
- noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis));
+ indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), indexFailed.count(),
+ deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
+ noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis));
}
public long totalCurrent() {
diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java
index 36ed70ae65..392a58686e 100644
--- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -52,7 +52,7 @@ import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
-import org.elasticsearch.index.indexing.IndexingStats;
+import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
diff --git a/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java
index c8142f3d37..0a036cbd80 100644
--- a/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java
+++ b/core/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java
@@ -36,7 +36,7 @@ import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
-import org.elasticsearch.index.indexing.IndexingStats;
+import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.recovery.RecoveryStats;
diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java
index e86132a909..110aa90047 100644
--- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java
+++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java
@@ -41,7 +41,7 @@ import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
-import org.elasticsearch.index.indexing.IndexingStats;
+import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.refresh.RefreshStats;
diff --git a/core/src/test/java/org/elasticsearch/index/indexing/IndexingSlowLogTests.java b/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java
index ccbef6837c..e39c0a805f 100644
--- a/core/src/test/java/org/elasticsearch/index/indexing/IndexingSlowLogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java
@@ -17,14 +17,14 @@
* under the License.
*/
-package org.elasticsearch.index.indexing;
+package org.elasticsearch.index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.StringField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.json.JsonXContent;
-import org.elasticsearch.index.indexing.IndexingSlowLog.SlowLogParsedDocumentPrinter;
+import org.elasticsearch.index.IndexingSlowLog.SlowLogParsedDocumentPrinter;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.test.ESTestCase;
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 66e0a0655d..d18e279636 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -76,8 +76,6 @@ import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.flush.FlushStats;
-import org.elasticsearch.index.indexing.IndexingOperationListener;
-import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
@@ -109,6 +107,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@@ -612,76 +611,76 @@ public class IndexShardTests extends ESSingleNodeTestCase {
return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl, Arrays.asList(document), source, mappingUpdate);
}
- public void testPreIndex() throws IOException {
- createIndex("testpreindex");
+ public void testIndexingOperationsListeners() throws IOException {
+ createIndex("test_iol");
ensureGreen();
+ client().prepareIndex("test_iol", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
- IndexService test = indicesService.indexService("testpreindex");
+ IndexService test = indicesService.indexService("test_iol");
IndexShard shard = test.getShardOrNull(0);
- ShardIndexingService shardIndexingService = shard.indexingService();
- final AtomicBoolean preIndexCalled = new AtomicBoolean(false);
-
- shardIndexingService.addListener(new IndexingOperationListener() {
+ AtomicInteger preIndex = new AtomicInteger();
+ AtomicInteger postIndex = new AtomicInteger();
+ AtomicInteger postIndexException = new AtomicInteger();
+ AtomicInteger preDelete = new AtomicInteger();
+ AtomicInteger postDelete = new AtomicInteger();
+ AtomicInteger postDeleteException = new AtomicInteger();
+ shard = reinitWithWrapper(test, shard, null, new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
- preIndexCalled.set(true);
- return super.preIndex(operation);
+ preIndex.incrementAndGet();
+ return operation;
}
- });
-
- ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
- Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
- shard.index(index);
- assertTrue(preIndexCalled.get());
- }
-
- public void testPostIndex() throws IOException {
- createIndex("testpostindex");
- ensureGreen();
- IndicesService indicesService = getInstanceFromNode(IndicesService.class);
- IndexService test = indicesService.indexService("testpostindex");
- IndexShard shard = test.getShardOrNull(0);
- ShardIndexingService shardIndexingService = shard.indexingService();
- final AtomicBoolean postIndexCalled = new AtomicBoolean(false);
- shardIndexingService.addListener(new IndexingOperationListener() {
@Override
public void postIndex(Engine.Index index) {
- postIndexCalled.set(true);
- super.postIndex(index);
+ postIndex.incrementAndGet();
}
- });
-
- ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
- Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
- shard.index(index);
- assertTrue(postIndexCalled.get());
- }
- public void testPostIndexWithException() throws IOException {
- createIndex("testpostindexwithexception");
- ensureGreen();
- IndicesService indicesService = getInstanceFromNode(IndicesService.class);
- IndexService test = indicesService.indexService("testpostindexwithexception");
- IndexShard shard = test.getShardOrNull(0);
- ShardIndexingService shardIndexingService = shard.indexingService();
+ @Override
+ public void postIndex(Engine.Index index, Throwable ex) {
+ postIndexException.incrementAndGet();
+ }
- shard.close("Unexpected close", true);
- shard.state = IndexShardState.STARTED; // It will generate exception
+ @Override
+ public Engine.Delete preDelete(Engine.Delete delete) {
+ preDelete.incrementAndGet();
+ return delete;
+ }
- final AtomicBoolean postIndexWithExceptionCalled = new AtomicBoolean(false);
+ @Override
+ public void postDelete(Engine.Delete delete) {
+ postDelete.incrementAndGet();
+ }
- shardIndexingService.addListener(new IndexingOperationListener() {
@Override
- public void postIndex(Engine.Index index, Throwable ex) {
- assertNotNull(ex);
- postIndexWithExceptionCalled.set(true);
- super.postIndex(index, ex);
+ public void postDelete(Engine.Delete delete, Throwable ex) {
+ postDeleteException.incrementAndGet();
+
}
});
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
+ shard.index(index);
+ assertEquals(1, preIndex.get());
+ assertEquals(1, postIndex.get());
+ assertEquals(0, postIndexException.get());
+ assertEquals(0, preDelete.get());
+ assertEquals(0, postDelete.get());
+ assertEquals(0, postDeleteException.get());
+
+ Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
+ shard.delete(delete);
+
+ assertEquals(1, preIndex.get());
+ assertEquals(1, postIndex.get());
+ assertEquals(0, postIndexException.get());
+ assertEquals(1, preDelete.get());
+ assertEquals(1, postDelete.get());
+ assertEquals(0, postDeleteException.get());
+
+ shard.close("Unexpected close", true);
+ shard.state = IndexShardState.STARTED; // It will generate exception
try {
shard.index(index);
@@ -690,7 +689,26 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
- assertTrue(postIndexWithExceptionCalled.get());
+ assertEquals(2, preIndex.get());
+ assertEquals(1, postIndex.get());
+ assertEquals(1, postIndexException.get());
+ assertEquals(1, preDelete.get());
+ assertEquals(1, postDelete.get());
+ assertEquals(0, postDeleteException.get());
+ try {
+ shard.delete(delete);
+ fail();
+ }catch (IllegalIndexShardStateException e){
+
+ }
+
+ assertEquals(2, preIndex.get());
+ assertEquals(1, postIndex.get());
+ assertEquals(1, postIndexException.get());
+ assertEquals(2, preDelete.get());
+ assertEquals(1, postDelete.get());
+ assertEquals(1, postDeleteException.get());
+
}
public void testMaybeFlush() throws Exception {
@@ -1041,11 +1059,11 @@ public class IndexShardTests extends ESSingleNodeTestCase {
// test will fail due to unclosed searchers if the searcher is not released
}
- private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper) throws IOException {
+ private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true);
NodeServicesProvider indexServices = indexService.getIndexServices();
- IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices);
+ IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, listeners);
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java
new file mode 100644
index 0000000000..92bbf06a7b
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.apache.lucene.index.Term;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class IndexingOperationListenerTests extends ESTestCase{
+
+ // this test also tests if calls are correct if one or more listeners throw exceptions
+ public void testListenersAreExecuted() {
+ AtomicInteger preIndex = new AtomicInteger();
+ AtomicInteger postIndex = new AtomicInteger();
+ AtomicInteger postIndexException = new AtomicInteger();
+ AtomicInteger preDelete = new AtomicInteger();
+ AtomicInteger postDelete = new AtomicInteger();
+ AtomicInteger postDeleteException = new AtomicInteger();
+ IndexingOperationListener listener = new IndexingOperationListener() {
+ @Override
+ public Engine.Index preIndex(Engine.Index operation) {
+ preIndex.incrementAndGet();
+ return operation;
+ }
+
+ @Override
+ public void postIndex(Engine.Index index) {
+ postIndex.incrementAndGet();
+ }
+
+ @Override
+ public void postIndex(Engine.Index index, Throwable ex) {
+ postIndexException.incrementAndGet();
+ }
+
+ @Override
+ public Engine.Delete preDelete(Engine.Delete delete) {
+ preDelete.incrementAndGet();
+ return delete;
+ }
+
+ @Override
+ public void postDelete(Engine.Delete delete) {
+ postDelete.incrementAndGet();
+ }
+
+ @Override
+ public void postDelete(Engine.Delete delete, Throwable ex) {
+ postDeleteException.incrementAndGet();
+ }
+ };
+
+ IndexingOperationListener throwingListener = new IndexingOperationListener() {
+ @Override
+ public Engine.Index preIndex(Engine.Index operation) {
+ throw new RuntimeException();
+ }
+
+ @Override
+ public void postIndex(Engine.Index index) {
+ throw new RuntimeException(); }
+
+ @Override
+ public void postIndex(Engine.Index index, Throwable ex) {
+ throw new RuntimeException(); }
+
+ @Override
+ public Engine.Delete preDelete(Engine.Delete delete) {
+ throw new RuntimeException();
+ }
+
+ @Override
+ public void postDelete(Engine.Delete delete) {
+ throw new RuntimeException(); }
+
+ @Override
+ public void postDelete(Engine.Delete delete, Throwable ex) {
+ throw new RuntimeException();
+ }
+ };
+ final List<IndexingOperationListener> indexingOperationListeners = new ArrayList<>(Arrays.asList(listener, listener));
+ if (randomBoolean()) {
+ indexingOperationListeners.add(throwingListener);
+ if (randomBoolean()) {
+ indexingOperationListeners.add(throwingListener);
+ }
+ }
+ Collections.shuffle(indexingOperationListeners, random());
+ IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
+ Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
+ Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
+ compositeListener.postDelete(delete);
+ assertEquals(0, preIndex.get());
+ assertEquals(0, postIndex.get());
+ assertEquals(0, postIndexException.get());
+ assertEquals(0, preDelete.get());
+ assertEquals(2, postDelete.get());
+ assertEquals(0, postDeleteException.get());
+
+ compositeListener.postDelete(delete, new RuntimeException());
+ assertEquals(0, preIndex.get());
+ assertEquals(0, postIndex.get());
+ assertEquals(0, postIndexException.get());
+ assertEquals(0, preDelete.get());
+ assertEquals(2, postDelete.get());
+ assertEquals(2, postDeleteException.get());
+
+ compositeListener.preDelete(delete);
+ assertEquals(0, preIndex.get());
+ assertEquals(0, postIndex.get());
+ assertEquals(0, postIndexException.get());
+ assertEquals(2, preDelete.get());
+ assertEquals(2, postDelete.get());
+ assertEquals(2, postDeleteException.get());
+
+ compositeListener.postIndex(index);
+ assertEquals(0, preIndex.get());
+ assertEquals(2, postIndex.get());
+ assertEquals(0, postIndexException.get());
+ assertEquals(2, preDelete.get());
+ assertEquals(2, postDelete.get());
+ assertEquals(2, postDeleteException.get());
+
+ compositeListener.postIndex(index, new RuntimeException());
+ assertEquals(0, preIndex.get());
+ assertEquals(2, postIndex.get());
+ assertEquals(2, postIndexException.get());
+ assertEquals(2, preDelete.get());
+ assertEquals(2, postDelete.get());
+ assertEquals(2, postDeleteException.get());
+
+ compositeListener.preIndex(index);
+ assertEquals(2, preIndex.get());
+ assertEquals(2, postIndex.get());
+ assertEquals(2, postIndexException.get());
+ assertEquals(2, preDelete.get());
+ assertEquals(2, postDelete.get());
+ assertEquals(2, postDeleteException.get());
+ }
+}