diff options
author | Simon Willnauer <simonw@apache.org> | 2016-01-09 21:50:17 +0100 |
---|---|---|
committer | Simon Willnauer <simonw@apache.org> | 2016-01-09 21:50:17 +0100 |
commit | 54d1e35d845bde4985d1b1557143d98f42c060c4 (patch) | |
tree | 26f32318e6c0419924347f459a0befc5dd0d6fd1 /core/src/main/java/org/elasticsearch/index/shard | |
parent | 2dbad1d65adfee7de8ed51a27d10d5be334862d4 (diff) |
Cleanup IndexingOperationListeners infrastructure
This commit reduces the former ShardIndexinService to a simple stats/metrics
class, moves IndexingSlowLog to the IndexService level since it can be shared
across shards of an index and is now hidden behind IndexingOperationListener.
IndexingOperationListener is now a first class citizen in IndexShard and is passed
in from IndexService.
Diffstat (limited to 'core/src/main/java/org/elasticsearch/index/shard')
4 files changed, 670 insertions, 18 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 54bd1cd844..db5e1f7e4a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -81,8 +81,6 @@ import org.elasticsearch.index.fielddata.ShardFieldData; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.get.ShardGetService; -import org.elasticsearch.index.indexing.IndexingStats; -import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapperForType; import org.elasticsearch.index.mapper.MapperService; @@ -125,6 +123,8 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -143,7 +143,7 @@ public class IndexShard extends AbstractIndexShardComponent { private final IndexCache indexCache; private final Store store; private final MergeSchedulerConfig mergeSchedulerConfig; - private final ShardIndexingService indexingService; + private final InternalIndexingStats internalIndexingStats; private final ShardSearchStats searchService; private final ShardGetService getService; private final ShardIndexWarmerService shardWarmerService; @@ -167,7 +167,6 @@ public class IndexShard extends AbstractIndexShardComponent { private final IndexEventListener indexEventListener; private final IndexSettings idxSettings; private final NodeServicesProvider provider; - private TimeValue refreshInterval; private volatile ScheduledFuture<?> refreshScheduledFuture; @@ -176,6 +175,8 @@ public class IndexShard extends AbstractIndexShardComponent { protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>(); protected final EngineFactory engineFactory; + private final IndexingOperationListener indexingOperationListeners; + @Nullable private RecoveryState recoveryState; @@ -215,7 +216,7 @@ public class IndexShard extends AbstractIndexShardComponent { public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory, - IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider) { + IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, IndexingOperationListener... listeners) { super(shardId, indexSettings); final Settings settings = indexSettings.getSettings(); this.inactiveTime = settings.getAsTime(INDEX_SHARD_INACTIVE_TIME_SETTING, settings.getAsTime(INDICES_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5))); @@ -232,7 +233,10 @@ public class IndexShard extends AbstractIndexShardComponent { this.threadPool = provider.getThreadPool(); this.mapperService = mapperService; this.indexCache = indexCache; - this.indexingService = new ShardIndexingService(shardId, indexSettings); + this.internalIndexingStats = new InternalIndexingStats(); + final List<IndexingOperationListener> listenersList = new ArrayList<>(Arrays.asList(listeners)); + listenersList.add(internalIndexingStats); + this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger); this.getService = new ShardGetService(indexSettings, this, mapperService); this.termVectorsService = provider.getTermVectorsService(); this.searchService = new ShardSearchStats(settings); @@ -285,10 +289,6 @@ public class IndexShard extends AbstractIndexShardComponent { return true; } - public ShardIndexingService indexingService() { - return this.indexingService; - } - public ShardGetService getService() { return this.getService; } @@ -489,7 +489,7 @@ public class IndexShard extends AbstractIndexShardComponent { public boolean index(Engine.Index index) { ensureWriteAllowed(index); markLastWrite(); - index = indexingService.preIndex(index); + index = indexingOperationListeners.preIndex(index); final boolean created; try { if (logger.isTraceEnabled()) { @@ -503,10 +503,10 @@ public class IndexShard extends AbstractIndexShardComponent { } index.endTime(System.nanoTime()); } catch (Throwable ex) { - indexingService.postIndex(index, ex); + indexingOperationListeners.postIndex(index, ex); throw ex; } - indexingService.postIndex(index); + indexingOperationListeners.postIndex(index); return created; } @@ -532,7 +532,7 @@ public class IndexShard extends AbstractIndexShardComponent { public void delete(Engine.Delete delete) { ensureWriteAllowed(delete); markLastWrite(); - delete = indexingService.preDelete(delete); + delete = indexingOperationListeners.preDelete(delete); try { if (logger.isTraceEnabled()) { logger.trace("delete [{}]", delete.uid().text()); @@ -545,10 +545,10 @@ public class IndexShard extends AbstractIndexShardComponent { } delete.endTime(System.nanoTime()); } catch (Throwable ex) { - indexingService.postDelete(delete, ex); + indexingOperationListeners.postDelete(delete, ex); throw ex; } - indexingService.postDelete(delete); + indexingOperationListeners.postDelete(delete); } public Engine.GetResult get(Engine.Get get) { @@ -600,7 +600,7 @@ public class IndexShard extends AbstractIndexShardComponent { throttled = engine.isThrottled(); throttleTimeInMillis = engine.getIndexThrottleTimeInMillis(); } - return indexingService.stats(throttled, throttleTimeInMillis, types); + return internalIndexingStats.stats(throttled, throttleTimeInMillis, types); } public SearchStats searchStats(String... groups) { @@ -1222,7 +1222,6 @@ public class IndexShard extends AbstractIndexShardComponent { } mergePolicyConfig.onRefreshSettings(settings); searchService.onRefreshSettings(settings); - indexingService.onRefreshSettings(settings); if (change) { getEngine().onSettingsChanged(); } @@ -1258,6 +1257,14 @@ public class IndexShard extends AbstractIndexShardComponent { return inactiveTime; } + /** + * Should be called for each no-op update operation to increment relevant statistics. + * @param type the doc type of the update + */ + public void noopUpdate(String type) { + internalIndexingStats.noopUpdate(type); + } + class EngineRefresher implements Runnable { @Override public void run() { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java new file mode 100644 index 0000000000..e5d3574223 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java @@ -0,0 +1,152 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.index.engine.Engine; + +import java.util.List; + +/** + * An indexing listener for indexing, delete, events. + */ +public interface IndexingOperationListener { + + /** + * Called before the indexing occurs. + */ + default Engine.Index preIndex(Engine.Index operation) { + return operation; + } + + /** + * Called after the indexing operation occurred. + */ + default void postIndex(Engine.Index index) {} + + /** + * Called after the indexing operation occurred with exception. + */ + default void postIndex(Engine.Index index, Throwable ex) {} + + /** + * Called before the delete occurs. + */ + default Engine.Delete preDelete(Engine.Delete delete) { + return delete; + } + + + /** + * Called after the delete operation occurred. + */ + default void postDelete(Engine.Delete delete) {} + + /** + * Called after the delete operation occurred with exception. + */ + default void postDelete(Engine.Delete delete, Throwable ex) {} + + /** + * A Composite listener that multiplexes calls to each of the listeners methods. + */ + final class CompositeListener implements IndexingOperationListener{ + private final List<IndexingOperationListener> listeners; + private final ESLogger logger; + + public CompositeListener(List<IndexingOperationListener> listeners, ESLogger logger) { + this.listeners = listeners; + this.logger = logger; + } + + @Override + public Engine.Index preIndex(Engine.Index operation) { + assert operation != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.preIndex(operation); + } catch (Throwable t) { + logger.warn("preIndex listener [{}] failed", t, listener); + } + } + return operation; + } + + @Override + public void postIndex(Engine.Index index) { + assert index != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postIndex(index); + } catch (Throwable t) { + logger.warn("postIndex listener [{}] failed", t, listener); + } + } + } + + @Override + public void postIndex(Engine.Index index, Throwable ex) { + assert index != null && ex != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postIndex(index, ex); + } catch (Throwable t) { + logger.warn("postIndex listener [{}] failed", t, listener); + } + } + } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + assert delete != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.preDelete(delete); + } catch (Throwable t) { + logger.warn("preDelete listener [{}] failed", t, listener); + } + } + return delete; + } + + @Override + public void postDelete(Engine.Delete delete) { + assert delete != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postDelete(delete); + } catch (Throwable t) { + logger.warn("postDelete listener [{}] failed", t, listener); + } + } + } + + @Override + public void postDelete(Engine.Delete delete, Throwable ex) { + assert delete != null && ex != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postDelete(delete, ex); + } catch (Throwable t) { + logger.warn("postDelete listener [{}] failed", t, listener); + } + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java new file mode 100644 index 0000000000..27cda2ca1c --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingStats.java @@ -0,0 +1,321 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.Version; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + */ +public class IndexingStats implements Streamable, ToXContent { + + public static class Stats implements Streamable, ToXContent { + + private long indexCount; + private long indexTimeInMillis; + private long indexCurrent; + private long indexFailedCount; + private long deleteCount; + private long deleteTimeInMillis; + private long deleteCurrent; + private long noopUpdateCount; + private long throttleTimeInMillis; + private boolean isThrottled; + + Stats() {} + + public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long indexFailedCount, long deleteCount, long deleteTimeInMillis, long deleteCurrent, long noopUpdateCount, boolean isThrottled, long throttleTimeInMillis) { + this.indexCount = indexCount; + this.indexTimeInMillis = indexTimeInMillis; + this.indexCurrent = indexCurrent; + this.indexFailedCount = indexFailedCount; + this.deleteCount = deleteCount; + this.deleteTimeInMillis = deleteTimeInMillis; + this.deleteCurrent = deleteCurrent; + this.noopUpdateCount = noopUpdateCount; + this.isThrottled = isThrottled; + this.throttleTimeInMillis = throttleTimeInMillis; + } + + public void add(Stats stats) { + indexCount += stats.indexCount; + indexTimeInMillis += stats.indexTimeInMillis; + indexCurrent += stats.indexCurrent; + indexFailedCount += stats.indexFailedCount; + + deleteCount += stats.deleteCount; + deleteTimeInMillis += stats.deleteTimeInMillis; + deleteCurrent += stats.deleteCurrent; + + noopUpdateCount += stats.noopUpdateCount; + throttleTimeInMillis += stats.throttleTimeInMillis; + if (isThrottled != stats.isThrottled) { + isThrottled = true; //When combining if one is throttled set result to throttled. + } + } + + /** + * The total number of indexing operations + */ + public long getIndexCount() { return indexCount; } + + /** + * The number of failed indexing operations + */ + public long getIndexFailedCount() { return indexFailedCount; } + + /** + * The total amount of time spend on executing index operations. + */ + public TimeValue getIndexTime() { return new TimeValue(indexTimeInMillis); } + + /** + * Returns the currently in-flight indexing operations. + */ + public long getIndexCurrent() { return indexCurrent;} + + /** + * Returns the number of delete operation executed + */ + public long getDeleteCount() { + return deleteCount; + } + + /** + * Returns if the index is under merge throttling control + */ + public boolean isThrottled() { return isThrottled; } + + /** + * Gets the amount of time in a TimeValue that the index has been under merge throttling control + */ + public TimeValue getThrottleTime() { return new TimeValue(throttleTimeInMillis); } + + /** + * The total amount of time spend on executing delete operations. + */ + public TimeValue getDeleteTime() { return new TimeValue(deleteTimeInMillis); } + + /** + * Returns the currently in-flight delete operations + */ + public long getDeleteCurrent() { + return deleteCurrent; + } + + public long getNoopUpdateCount() { + return noopUpdateCount; + } + + public static Stats readStats(StreamInput in) throws IOException { + Stats stats = new Stats(); + stats.readFrom(in); + return stats; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + indexCount = in.readVLong(); + indexTimeInMillis = in.readVLong(); + indexCurrent = in.readVLong(); + + if(in.getVersion().onOrAfter(Version.V_2_1_0)){ + indexFailedCount = in.readVLong(); + } + + deleteCount = in.readVLong(); + deleteTimeInMillis = in.readVLong(); + deleteCurrent = in.readVLong(); + noopUpdateCount = in.readVLong(); + isThrottled = in.readBoolean(); + throttleTimeInMillis = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(indexCount); + out.writeVLong(indexTimeInMillis); + out.writeVLong(indexCurrent); + + if(out.getVersion().onOrAfter(Version.V_2_1_0)) { + out.writeVLong(indexFailedCount); + } + + out.writeVLong(deleteCount); + out.writeVLong(deleteTimeInMillis); + out.writeVLong(deleteCurrent); + out.writeVLong(noopUpdateCount); + out.writeBoolean(isThrottled); + out.writeLong(throttleTimeInMillis); + + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.INDEX_TOTAL, indexCount); + builder.timeValueField(Fields.INDEX_TIME_IN_MILLIS, Fields.INDEX_TIME, indexTimeInMillis); + builder.field(Fields.INDEX_CURRENT, indexCurrent); + builder.field(Fields.INDEX_FAILED, indexFailedCount); + + builder.field(Fields.DELETE_TOTAL, deleteCount); + builder.timeValueField(Fields.DELETE_TIME_IN_MILLIS, Fields.DELETE_TIME, deleteTimeInMillis); + builder.field(Fields.DELETE_CURRENT, deleteCurrent); + + builder.field(Fields.NOOP_UPDATE_TOTAL, noopUpdateCount); + + builder.field(Fields.IS_THROTTLED, isThrottled); + builder.timeValueField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, throttleTimeInMillis); + return builder; + } + } + + private Stats totalStats; + + @Nullable + private Map<String, Stats> typeStats; + + public IndexingStats() { + totalStats = new Stats(); + } + + public IndexingStats(Stats totalStats, @Nullable Map<String, Stats> typeStats) { + this.totalStats = totalStats; + this.typeStats = typeStats; + } + + public void add(IndexingStats indexingStats) { + add(indexingStats, true); + } + + public void add(IndexingStats indexingStats, boolean includeTypes) { + if (indexingStats == null) { + return; + } + addTotals(indexingStats); + if (includeTypes && indexingStats.typeStats != null && !indexingStats.typeStats.isEmpty()) { + if (typeStats == null) { + typeStats = new HashMap<>(indexingStats.typeStats.size()); + } + for (Map.Entry<String, Stats> entry : indexingStats.typeStats.entrySet()) { + Stats stats = typeStats.get(entry.getKey()); + if (stats == null) { + typeStats.put(entry.getKey(), entry.getValue()); + } else { + stats.add(entry.getValue()); + } + } + } + } + + public void addTotals(IndexingStats indexingStats) { + if (indexingStats == null) { + return; + } + totalStats.add(indexingStats.totalStats); + } + + public Stats getTotal() { + return this.totalStats; + } + + @Nullable + public Map<String, Stats> getTypeStats() { + return this.typeStats; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(Fields.INDEXING); + totalStats.toXContent(builder, params); + if (typeStats != null && !typeStats.isEmpty()) { + builder.startObject(Fields.TYPES); + for (Map.Entry<String, Stats> entry : typeStats.entrySet()) { + builder.startObject(entry.getKey(), XContentBuilder.FieldCaseConversion.NONE); + entry.getValue().toXContent(builder, params); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString INDEXING = new XContentBuilderString("indexing"); + static final XContentBuilderString TYPES = new XContentBuilderString("types"); + static final XContentBuilderString INDEX_TOTAL = new XContentBuilderString("index_total"); + static final XContentBuilderString INDEX_TIME = new XContentBuilderString("index_time"); + static final XContentBuilderString INDEX_TIME_IN_MILLIS = new XContentBuilderString("index_time_in_millis"); + static final XContentBuilderString INDEX_CURRENT = new XContentBuilderString("index_current"); + static final XContentBuilderString INDEX_FAILED = new XContentBuilderString("index_failed"); + static final XContentBuilderString DELETE_TOTAL = new XContentBuilderString("delete_total"); + static final XContentBuilderString DELETE_TIME = new XContentBuilderString("delete_time"); + static final XContentBuilderString DELETE_TIME_IN_MILLIS = new XContentBuilderString("delete_time_in_millis"); + static final XContentBuilderString DELETE_CURRENT = new XContentBuilderString("delete_current"); + static final XContentBuilderString NOOP_UPDATE_TOTAL = new XContentBuilderString("noop_update_total"); + static final XContentBuilderString IS_THROTTLED = new XContentBuilderString("is_throttled"); + static final XContentBuilderString THROTTLED_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis"); + static final XContentBuilderString THROTTLED_TIME = new XContentBuilderString("throttle_time"); + } + + public static IndexingStats readIndexingStats(StreamInput in) throws IOException { + IndexingStats indexingStats = new IndexingStats(); + indexingStats.readFrom(in); + return indexingStats; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + totalStats = Stats.readStats(in); + if (in.readBoolean()) { + int size = in.readVInt(); + typeStats = new HashMap<>(size); + for (int i = 0; i < size; i++) { + typeStats.put(in.readString(), Stats.readStats(in)); + } + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + totalStats.writeTo(out); + if (typeStats == null || typeStats.isEmpty()) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeVInt(typeStats.size()); + for (Map.Entry<String, Stats> entry : typeStats.entrySet()) { + out.writeString(entry.getKey()); + entry.getValue().writeTo(out); + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java new file mode 100644 index 0000000000..9996d705b3 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java @@ -0,0 +1,172 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.index.engine.Engine; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyMap; + +/** + * Internal class that maintains relevant indexing statistics / metrics. + * @see IndexShard + */ +final class InternalIndexingStats implements IndexingOperationListener { + private final StatsHolder totalStats = new StatsHolder(); + private volatile Map<String, StatsHolder> typesStats = emptyMap(); + + /** + * Returns the stats, including type specific stats. If the types are null/0 length, then nothing + * is returned for them. If they are set, then only types provided will be returned, or + * <tt>_all</tt> for all types. + */ + IndexingStats stats(boolean isThrottled, long currentThrottleInMillis, String... types) { + IndexingStats.Stats total = totalStats.stats(isThrottled, currentThrottleInMillis); + Map<String, IndexingStats.Stats> typesSt = null; + if (types != null && types.length > 0) { + typesSt = new HashMap<>(typesStats.size()); + if (types.length == 1 && types[0].equals("_all")) { + for (Map.Entry<String, StatsHolder> entry : typesStats.entrySet()) { + typesSt.put(entry.getKey(), entry.getValue().stats(isThrottled, currentThrottleInMillis)); + } + } else { + for (Map.Entry<String, StatsHolder> entry : typesStats.entrySet()) { + if (Regex.simpleMatch(types, entry.getKey())) { + typesSt.put(entry.getKey(), entry.getValue().stats(isThrottled, currentThrottleInMillis)); + } + } + } + } + return new IndexingStats(total, typesSt); + } + + + public Engine.Index preIndex(Engine.Index operation) { + totalStats.indexCurrent.inc(); + typeStats(operation.type()).indexCurrent.inc(); + return operation; + } + + public void postIndex(Engine.Index index) { + long took = index.endTime() - index.startTime(); + totalStats.indexMetric.inc(took); + totalStats.indexCurrent.dec(); + StatsHolder typeStats = typeStats(index.type()); + typeStats.indexMetric.inc(took); + typeStats.indexCurrent.dec(); + } + + public void postIndex(Engine.Index index, Throwable ex) { + totalStats.indexCurrent.dec(); + typeStats(index.type()).indexCurrent.dec(); + totalStats.indexFailed.inc(); + typeStats(index.type()).indexFailed.inc(); + } + + public Engine.Delete preDelete(Engine.Delete delete) { + totalStats.deleteCurrent.inc(); + typeStats(delete.type()).deleteCurrent.inc(); + return delete; + } + + + public void postDelete(Engine.Delete delete) { + long took = delete.endTime() - delete.startTime(); + totalStats.deleteMetric.inc(took); + totalStats.deleteCurrent.dec(); + StatsHolder typeStats = typeStats(delete.type()); + typeStats.deleteMetric.inc(took); + typeStats.deleteCurrent.dec(); + } + + public void postDelete(Engine.Delete delete, Throwable ex) { + totalStats.deleteCurrent.dec(); + typeStats(delete.type()).deleteCurrent.dec(); + } + + public void noopUpdate(String type) { + totalStats.noopUpdates.inc(); + typeStats(type).noopUpdates.inc(); + } + + public void clear() { // NOCOMMIT - this is unused? + totalStats.clear(); + synchronized (this) { + if (!typesStats.isEmpty()) { + MapBuilder<String, StatsHolder> typesStatsBuilder = MapBuilder.newMapBuilder(); + for (Map.Entry<String, StatsHolder> typeStats : typesStats.entrySet()) { + if (typeStats.getValue().totalCurrent() > 0) { + typeStats.getValue().clear(); + typesStatsBuilder.put(typeStats.getKey(), typeStats.getValue()); + } + } + typesStats = typesStatsBuilder.immutableMap(); + } + } + } + + private StatsHolder typeStats(String type) { + StatsHolder stats = typesStats.get(type); + if (stats == null) { + synchronized (this) { + stats = typesStats.get(type); + if (stats == null) { + stats = new StatsHolder(); + typesStats = MapBuilder.newMapBuilder(typesStats).put(type, stats).immutableMap(); + } + } + } + return stats; + } + + static class StatsHolder { + public final MeanMetric indexMetric = new MeanMetric(); + public final MeanMetric deleteMetric = new MeanMetric(); + public final CounterMetric indexCurrent = new CounterMetric(); + public final CounterMetric indexFailed = new CounterMetric(); + public final CounterMetric deleteCurrent = new CounterMetric(); + public final CounterMetric noopUpdates = new CounterMetric(); + + public IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) { + return new IndexingStats.Stats( + indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), indexFailed.count(), + deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(), + noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis)); + } + + public long totalCurrent() { + return indexCurrent.count() + deleteMetric.count(); + } + + public void clear() { + indexMetric.clear(); + deleteMetric.clear(); + } + + + } +} |