summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/index/shard
diff options
context:
space:
mode:
authorSimon Willnauer <simonw@apache.org>2016-01-09 21:50:17 +0100
committerSimon Willnauer <simonw@apache.org>2016-01-09 21:50:17 +0100
commit54d1e35d845bde4985d1b1557143d98f42c060c4 (patch)
tree26f32318e6c0419924347f459a0befc5dd0d6fd1 /core/src/main/java/org/elasticsearch/index/shard
parent2dbad1d65adfee7de8ed51a27d10d5be334862d4 (diff)
Cleanup IndexingOperationListeners infrastructure
This commit reduces the former ShardIndexinService to a simple stats/metrics class, moves IndexingSlowLog to the IndexService level since it can be shared across shards of an index and is now hidden behind IndexingOperationListener. IndexingOperationListener is now a first class citizen in IndexShard and is passed in from IndexService.
Diffstat (limited to 'core/src/main/java/org/elasticsearch/index/shard')
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java43
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java152
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java321
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java172
4 files changed, 670 insertions, 18 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 54bd1cd844..db5e1f7e4a 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -81,8 +81,6 @@ import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.get.ShardGetService;
-import org.elasticsearch.index.indexing.IndexingStats;
-import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MapperService;
@@ -125,6 +123,8 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -143,7 +143,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexCache indexCache;
private final Store store;
private final MergeSchedulerConfig mergeSchedulerConfig;
- private final ShardIndexingService indexingService;
+ private final InternalIndexingStats internalIndexingStats;
private final ShardSearchStats searchService;
private final ShardGetService getService;
private final ShardIndexWarmerService shardWarmerService;
@@ -167,7 +167,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexEventListener indexEventListener;
private final IndexSettings idxSettings;
private final NodeServicesProvider provider;
-
private TimeValue refreshInterval;
private volatile ScheduledFuture<?> refreshScheduledFuture;
@@ -176,6 +175,8 @@ public class IndexShard extends AbstractIndexShardComponent {
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
protected final EngineFactory engineFactory;
+ private final IndexingOperationListener indexingOperationListeners;
+
@Nullable
private RecoveryState recoveryState;
@@ -215,7 +216,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory,
- IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider) {
+ IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, IndexingOperationListener... listeners) {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
this.inactiveTime = settings.getAsTime(INDEX_SHARD_INACTIVE_TIME_SETTING, settings.getAsTime(INDICES_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)));
@@ -232,7 +233,10 @@ public class IndexShard extends AbstractIndexShardComponent {
this.threadPool = provider.getThreadPool();
this.mapperService = mapperService;
this.indexCache = indexCache;
- this.indexingService = new ShardIndexingService(shardId, indexSettings);
+ this.internalIndexingStats = new InternalIndexingStats();
+ final List<IndexingOperationListener> listenersList = new ArrayList<>(Arrays.asList(listeners));
+ listenersList.add(internalIndexingStats);
+ this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
this.getService = new ShardGetService(indexSettings, this, mapperService);
this.termVectorsService = provider.getTermVectorsService();
this.searchService = new ShardSearchStats(settings);
@@ -285,10 +289,6 @@ public class IndexShard extends AbstractIndexShardComponent {
return true;
}
- public ShardIndexingService indexingService() {
- return this.indexingService;
- }
-
public ShardGetService getService() {
return this.getService;
}
@@ -489,7 +489,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public boolean index(Engine.Index index) {
ensureWriteAllowed(index);
markLastWrite();
- index = indexingService.preIndex(index);
+ index = indexingOperationListeners.preIndex(index);
final boolean created;
try {
if (logger.isTraceEnabled()) {
@@ -503,10 +503,10 @@ public class IndexShard extends AbstractIndexShardComponent {
}
index.endTime(System.nanoTime());
} catch (Throwable ex) {
- indexingService.postIndex(index, ex);
+ indexingOperationListeners.postIndex(index, ex);
throw ex;
}
- indexingService.postIndex(index);
+ indexingOperationListeners.postIndex(index);
return created;
}
@@ -532,7 +532,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public void delete(Engine.Delete delete) {
ensureWriteAllowed(delete);
markLastWrite();
- delete = indexingService.preDelete(delete);
+ delete = indexingOperationListeners.preDelete(delete);
try {
if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text());
@@ -545,10 +545,10 @@ public class IndexShard extends AbstractIndexShardComponent {
}
delete.endTime(System.nanoTime());
} catch (Throwable ex) {
- indexingService.postDelete(delete, ex);
+ indexingOperationListeners.postDelete(delete, ex);
throw ex;
}
- indexingService.postDelete(delete);
+ indexingOperationListeners.postDelete(delete);
}
public Engine.GetResult get(Engine.Get get) {
@@ -600,7 +600,7 @@ public class IndexShard extends AbstractIndexShardComponent {
throttled = engine.isThrottled();
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
}
- return indexingService.stats(throttled, throttleTimeInMillis, types);
+ return internalIndexingStats.stats(throttled, throttleTimeInMillis, types);
}
public SearchStats searchStats(String... groups) {
@@ -1222,7 +1222,6 @@ public class IndexShard extends AbstractIndexShardComponent {
}
mergePolicyConfig.onRefreshSettings(settings);
searchService.onRefreshSettings(settings);
- indexingService.onRefreshSettings(settings);
if (change) {
getEngine().onSettingsChanged();
}
@@ -1258,6 +1257,14 @@ public class IndexShard extends AbstractIndexShardComponent {
return inactiveTime;
}
+ /**
+ * Should be called for each no-op update operation to increment relevant statistics.
+ * @param type the doc type of the update
+ */
+ public void noopUpdate(String type) {
+ internalIndexingStats.noopUpdate(type);
+ }
+
class EngineRefresher implements Runnable {
@Override
public void run() {
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java
new file mode 100644
index 0000000000..e5d3574223
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.index.engine.Engine;
+
+import java.util.List;
+
+/**
+ * An indexing listener for indexing, delete, events.
+ */
+public interface IndexingOperationListener {
+
+ /**
+ * Called before the indexing occurs.
+ */
+ default Engine.Index preIndex(Engine.Index operation) {
+ return operation;
+ }
+
+ /**
+ * Called after the indexing operation occurred.
+ */
+ default void postIndex(Engine.Index index) {}
+
+ /**
+ * Called after the indexing operation occurred with exception.
+ */
+ default void postIndex(Engine.Index index, Throwable ex) {}
+
+ /**
+ * Called before the delete occurs.
+ */
+ default Engine.Delete preDelete(Engine.Delete delete) {
+ return delete;
+ }
+
+
+ /**
+ * Called after the delete operation occurred.
+ */
+ default void postDelete(Engine.Delete delete) {}
+
+ /**
+ * Called after the delete operation occurred with exception.
+ */
+ default void postDelete(Engine.Delete delete, Throwable ex) {}
+
+ /**
+ * A Composite listener that multiplexes calls to each of the listeners methods.
+ */
+ final class CompositeListener implements IndexingOperationListener{
+ private final List<IndexingOperationListener> listeners;
+ private final ESLogger logger;
+
+ public CompositeListener(List<IndexingOperationListener> listeners, ESLogger logger) {
+ this.listeners = listeners;
+ this.logger = logger;
+ }
+
+ @Override
+ public Engine.Index preIndex(Engine.Index operation) {
+ assert operation != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.preIndex(operation);
+ } catch (Throwable t) {
+ logger.warn("preIndex listener [{}] failed", t, listener);
+ }
+ }
+ return operation;
+ }
+
+ @Override
+ public void postIndex(Engine.Index index) {
+ assert index != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.postIndex(index);
+ } catch (Throwable t) {
+ logger.warn("postIndex listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void postIndex(Engine.Index index, Throwable ex) {
+ assert index != null && ex != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.postIndex(index, ex);
+ } catch (Throwable t) {
+ logger.warn("postIndex listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public Engine.Delete preDelete(Engine.Delete delete) {
+ assert delete != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.preDelete(delete);
+ } catch (Throwable t) {
+ logger.warn("preDelete listener [{}] failed", t, listener);
+ }
+ }
+ return delete;
+ }
+
+ @Override
+ public void postDelete(Engine.Delete delete) {
+ assert delete != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.postDelete(delete);
+ } catch (Throwable t) {
+ logger.warn("postDelete listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void postDelete(Engine.Delete delete, Throwable ex) {
+ assert delete != null && ex != null;
+ for (IndexingOperationListener listener : listeners) {
+ try {
+ listener.postDelete(delete, ex);
+ } catch (Throwable t) {
+ logger.warn("postDelete listener [{}] failed", t, listener);
+ }
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java
new file mode 100644
index 0000000000..27cda2ca1c
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ */
+public class IndexingStats implements Streamable, ToXContent {
+
+ public static class Stats implements Streamable, ToXContent {
+
+ private long indexCount;
+ private long indexTimeInMillis;
+ private long indexCurrent;
+ private long indexFailedCount;
+ private long deleteCount;
+ private long deleteTimeInMillis;
+ private long deleteCurrent;
+ private long noopUpdateCount;
+ private long throttleTimeInMillis;
+ private boolean isThrottled;
+
+ Stats() {}
+
+ public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long indexFailedCount, long deleteCount, long deleteTimeInMillis, long deleteCurrent, long noopUpdateCount, boolean isThrottled, long throttleTimeInMillis) {
+ this.indexCount = indexCount;
+ this.indexTimeInMillis = indexTimeInMillis;
+ this.indexCurrent = indexCurrent;
+ this.indexFailedCount = indexFailedCount;
+ this.deleteCount = deleteCount;
+ this.deleteTimeInMillis = deleteTimeInMillis;
+ this.deleteCurrent = deleteCurrent;
+ this.noopUpdateCount = noopUpdateCount;
+ this.isThrottled = isThrottled;
+ this.throttleTimeInMillis = throttleTimeInMillis;
+ }
+
+ public void add(Stats stats) {
+ indexCount += stats.indexCount;
+ indexTimeInMillis += stats.indexTimeInMillis;
+ indexCurrent += stats.indexCurrent;
+ indexFailedCount += stats.indexFailedCount;
+
+ deleteCount += stats.deleteCount;
+ deleteTimeInMillis += stats.deleteTimeInMillis;
+ deleteCurrent += stats.deleteCurrent;
+
+ noopUpdateCount += stats.noopUpdateCount;
+ throttleTimeInMillis += stats.throttleTimeInMillis;
+ if (isThrottled != stats.isThrottled) {
+ isThrottled = true; //When combining if one is throttled set result to throttled.
+ }
+ }
+
+ /**
+ * The total number of indexing operations
+ */
+ public long getIndexCount() { return indexCount; }
+
+ /**
+ * The number of failed indexing operations
+ */
+ public long getIndexFailedCount() { return indexFailedCount; }
+
+ /**
+ * The total amount of time spend on executing index operations.
+ */
+ public TimeValue getIndexTime() { return new TimeValue(indexTimeInMillis); }
+
+ /**
+ * Returns the currently in-flight indexing operations.
+ */
+ public long getIndexCurrent() { return indexCurrent;}
+
+ /**
+ * Returns the number of delete operation executed
+ */
+ public long getDeleteCount() {
+ return deleteCount;
+ }
+
+ /**
+ * Returns if the index is under merge throttling control
+ */
+ public boolean isThrottled() { return isThrottled; }
+
+ /**
+ * Gets the amount of time in a TimeValue that the index has been under merge throttling control
+ */
+ public TimeValue getThrottleTime() { return new TimeValue(throttleTimeInMillis); }
+
+ /**
+ * The total amount of time spend on executing delete operations.
+ */
+ public TimeValue getDeleteTime() { return new TimeValue(deleteTimeInMillis); }
+
+ /**
+ * Returns the currently in-flight delete operations
+ */
+ public long getDeleteCurrent() {
+ return deleteCurrent;
+ }
+
+ public long getNoopUpdateCount() {
+ return noopUpdateCount;
+ }
+
+ public static Stats readStats(StreamInput in) throws IOException {
+ Stats stats = new Stats();
+ stats.readFrom(in);
+ return stats;
+ }
+
+ @Override
+ public void readFrom(StreamInput in) throws IOException {
+ indexCount = in.readVLong();
+ indexTimeInMillis = in.readVLong();
+ indexCurrent = in.readVLong();
+
+ if(in.getVersion().onOrAfter(Version.V_2_1_0)){
+ indexFailedCount = in.readVLong();
+ }
+
+ deleteCount = in.readVLong();
+ deleteTimeInMillis = in.readVLong();
+ deleteCurrent = in.readVLong();
+ noopUpdateCount = in.readVLong();
+ isThrottled = in.readBoolean();
+ throttleTimeInMillis = in.readLong();
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeVLong(indexCount);
+ out.writeVLong(indexTimeInMillis);
+ out.writeVLong(indexCurrent);
+
+ if(out.getVersion().onOrAfter(Version.V_2_1_0)) {
+ out.writeVLong(indexFailedCount);
+ }
+
+ out.writeVLong(deleteCount);
+ out.writeVLong(deleteTimeInMillis);
+ out.writeVLong(deleteCurrent);
+ out.writeVLong(noopUpdateCount);
+ out.writeBoolean(isThrottled);
+ out.writeLong(throttleTimeInMillis);
+
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.field(Fields.INDEX_TOTAL, indexCount);
+ builder.timeValueField(Fields.INDEX_TIME_IN_MILLIS, Fields.INDEX_TIME, indexTimeInMillis);
+ builder.field(Fields.INDEX_CURRENT, indexCurrent);
+ builder.field(Fields.INDEX_FAILED, indexFailedCount);
+
+ builder.field(Fields.DELETE_TOTAL, deleteCount);
+ builder.timeValueField(Fields.DELETE_TIME_IN_MILLIS, Fields.DELETE_TIME, deleteTimeInMillis);
+ builder.field(Fields.DELETE_CURRENT, deleteCurrent);
+
+ builder.field(Fields.NOOP_UPDATE_TOTAL, noopUpdateCount);
+
+ builder.field(Fields.IS_THROTTLED, isThrottled);
+ builder.timeValueField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, throttleTimeInMillis);
+ return builder;
+ }
+ }
+
+ private Stats totalStats;
+
+ @Nullable
+ private Map<String, Stats> typeStats;
+
+ public IndexingStats() {
+ totalStats = new Stats();
+ }
+
+ public IndexingStats(Stats totalStats, @Nullable Map<String, Stats> typeStats) {
+ this.totalStats = totalStats;
+ this.typeStats = typeStats;
+ }
+
+ public void add(IndexingStats indexingStats) {
+ add(indexingStats, true);
+ }
+
+ public void add(IndexingStats indexingStats, boolean includeTypes) {
+ if (indexingStats == null) {
+ return;
+ }
+ addTotals(indexingStats);
+ if (includeTypes && indexingStats.typeStats != null && !indexingStats.typeStats.isEmpty()) {
+ if (typeStats == null) {
+ typeStats = new HashMap<>(indexingStats.typeStats.size());
+ }
+ for (Map.Entry<String, Stats> entry : indexingStats.typeStats.entrySet()) {
+ Stats stats = typeStats.get(entry.getKey());
+ if (stats == null) {
+ typeStats.put(entry.getKey(), entry.getValue());
+ } else {
+ stats.add(entry.getValue());
+ }
+ }
+ }
+ }
+
+ public void addTotals(IndexingStats indexingStats) {
+ if (indexingStats == null) {
+ return;
+ }
+ totalStats.add(indexingStats.totalStats);
+ }
+
+ public Stats getTotal() {
+ return this.totalStats;
+ }
+
+ @Nullable
+ public Map<String, Stats> getTypeStats() {
+ return this.typeStats;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+ builder.startObject(Fields.INDEXING);
+ totalStats.toXContent(builder, params);
+ if (typeStats != null && !typeStats.isEmpty()) {
+ builder.startObject(Fields.TYPES);
+ for (Map.Entry<String, Stats> entry : typeStats.entrySet()) {
+ builder.startObject(entry.getKey(), XContentBuilder.FieldCaseConversion.NONE);
+ entry.getValue().toXContent(builder, params);
+ builder.endObject();
+ }
+ builder.endObject();
+ }
+ builder.endObject();
+ return builder;
+ }
+
+ static final class Fields {
+ static final XContentBuilderString INDEXING = new XContentBuilderString("indexing");
+ static final XContentBuilderString TYPES = new XContentBuilderString("types");
+ static final XContentBuilderString INDEX_TOTAL = new XContentBuilderString("index_total");
+ static final XContentBuilderString INDEX_TIME = new XContentBuilderString("index_time");
+ static final XContentBuilderString INDEX_TIME_IN_MILLIS = new XContentBuilderString("index_time_in_millis");
+ static final XContentBuilderString INDEX_CURRENT = new XContentBuilderString("index_current");
+ static final XContentBuilderString INDEX_FAILED = new XContentBuilderString("index_failed");
+ static final XContentBuilderString DELETE_TOTAL = new XContentBuilderString("delete_total");
+ static final XContentBuilderString DELETE_TIME = new XContentBuilderString("delete_time");
+ static final XContentBuilderString DELETE_TIME_IN_MILLIS = new XContentBuilderString("delete_time_in_millis");
+ static final XContentBuilderString DELETE_CURRENT = new XContentBuilderString("delete_current");
+ static final XContentBuilderString NOOP_UPDATE_TOTAL = new XContentBuilderString("noop_update_total");
+ static final XContentBuilderString IS_THROTTLED = new XContentBuilderString("is_throttled");
+ static final XContentBuilderString THROTTLED_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
+ static final XContentBuilderString THROTTLED_TIME = new XContentBuilderString("throttle_time");
+ }
+
+ public static IndexingStats readIndexingStats(StreamInput in) throws IOException {
+ IndexingStats indexingStats = new IndexingStats();
+ indexingStats.readFrom(in);
+ return indexingStats;
+ }
+
+ @Override
+ public void readFrom(StreamInput in) throws IOException {
+ totalStats = Stats.readStats(in);
+ if (in.readBoolean()) {
+ int size = in.readVInt();
+ typeStats = new HashMap<>(size);
+ for (int i = 0; i < size; i++) {
+ typeStats.put(in.readString(), Stats.readStats(in));
+ }
+ }
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ totalStats.writeTo(out);
+ if (typeStats == null || typeStats.isEmpty()) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeVInt(typeStats.size());
+ for (Map.Entry<String, Stats> entry : typeStats.entrySet()) {
+ out.writeString(entry.getKey());
+ entry.getValue().writeTo(out);
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java
new file mode 100644
index 0000000000..9996d705b3
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.metrics.CounterMetric;
+import org.elasticsearch.common.metrics.MeanMetric;
+import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.index.engine.Engine;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.emptyMap;
+
+/**
+ * Internal class that maintains relevant indexing statistics / metrics.
+ * @see IndexShard
+ */
+final class InternalIndexingStats implements IndexingOperationListener {
+ private final StatsHolder totalStats = new StatsHolder();
+ private volatile Map<String, StatsHolder> typesStats = emptyMap();
+
+ /**
+ * Returns the stats, including type specific stats. If the types are null/0 length, then nothing
+ * is returned for them. If they are set, then only types provided will be returned, or
+ * <tt>_all</tt> for all types.
+ */
+ IndexingStats stats(boolean isThrottled, long currentThrottleInMillis, String... types) {
+ IndexingStats.Stats total = totalStats.stats(isThrottled, currentThrottleInMillis);
+ Map<String, IndexingStats.Stats> typesSt = null;
+ if (types != null && types.length > 0) {
+ typesSt = new HashMap<>(typesStats.size());
+ if (types.length == 1 && types[0].equals("_all")) {
+ for (Map.Entry<String, StatsHolder> entry : typesStats.entrySet()) {
+ typesSt.put(entry.getKey(), entry.getValue().stats(isThrottled, currentThrottleInMillis));
+ }
+ } else {
+ for (Map.Entry<String, StatsHolder> entry : typesStats.entrySet()) {
+ if (Regex.simpleMatch(types, entry.getKey())) {
+ typesSt.put(entry.getKey(), entry.getValue().stats(isThrottled, currentThrottleInMillis));
+ }
+ }
+ }
+ }
+ return new IndexingStats(total, typesSt);
+ }
+
+
+ public Engine.Index preIndex(Engine.Index operation) {
+ totalStats.indexCurrent.inc();
+ typeStats(operation.type()).indexCurrent.inc();
+ return operation;
+ }
+
+ public void postIndex(Engine.Index index) {
+ long took = index.endTime() - index.startTime();
+ totalStats.indexMetric.inc(took);
+ totalStats.indexCurrent.dec();
+ StatsHolder typeStats = typeStats(index.type());
+ typeStats.indexMetric.inc(took);
+ typeStats.indexCurrent.dec();
+ }
+
+ public void postIndex(Engine.Index index, Throwable ex) {
+ totalStats.indexCurrent.dec();
+ typeStats(index.type()).indexCurrent.dec();
+ totalStats.indexFailed.inc();
+ typeStats(index.type()).indexFailed.inc();
+ }
+
+ public Engine.Delete preDelete(Engine.Delete delete) {
+ totalStats.deleteCurrent.inc();
+ typeStats(delete.type()).deleteCurrent.inc();
+ return delete;
+ }
+
+
+ public void postDelete(Engine.Delete delete) {
+ long took = delete.endTime() - delete.startTime();
+ totalStats.deleteMetric.inc(took);
+ totalStats.deleteCurrent.dec();
+ StatsHolder typeStats = typeStats(delete.type());
+ typeStats.deleteMetric.inc(took);
+ typeStats.deleteCurrent.dec();
+ }
+
+ public void postDelete(Engine.Delete delete, Throwable ex) {
+ totalStats.deleteCurrent.dec();
+ typeStats(delete.type()).deleteCurrent.dec();
+ }
+
+ public void noopUpdate(String type) {
+ totalStats.noopUpdates.inc();
+ typeStats(type).noopUpdates.inc();
+ }
+
+ public void clear() { // NOCOMMIT - this is unused?
+ totalStats.clear();
+ synchronized (this) {
+ if (!typesStats.isEmpty()) {
+ MapBuilder<String, StatsHolder> typesStatsBuilder = MapBuilder.newMapBuilder();
+ for (Map.Entry<String, StatsHolder> typeStats : typesStats.entrySet()) {
+ if (typeStats.getValue().totalCurrent() > 0) {
+ typeStats.getValue().clear();
+ typesStatsBuilder.put(typeStats.getKey(), typeStats.getValue());
+ }
+ }
+ typesStats = typesStatsBuilder.immutableMap();
+ }
+ }
+ }
+
+ private StatsHolder typeStats(String type) {
+ StatsHolder stats = typesStats.get(type);
+ if (stats == null) {
+ synchronized (this) {
+ stats = typesStats.get(type);
+ if (stats == null) {
+ stats = new StatsHolder();
+ typesStats = MapBuilder.newMapBuilder(typesStats).put(type, stats).immutableMap();
+ }
+ }
+ }
+ return stats;
+ }
+
+ static class StatsHolder {
+ public final MeanMetric indexMetric = new MeanMetric();
+ public final MeanMetric deleteMetric = new MeanMetric();
+ public final CounterMetric indexCurrent = new CounterMetric();
+ public final CounterMetric indexFailed = new CounterMetric();
+ public final CounterMetric deleteCurrent = new CounterMetric();
+ public final CounterMetric noopUpdates = new CounterMetric();
+
+ public IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
+ return new IndexingStats.Stats(
+ indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), indexFailed.count(),
+ deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
+ noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis));
+ }
+
+ public long totalCurrent() {
+ return indexCurrent.count() + deleteMetric.count();
+ }
+
+ public void clear() {
+ indexMetric.clear();
+ deleteMetric.clear();
+ }
+
+
+ }
+}