summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorNik Everett <nik9000@gmail.com>2016-12-23 15:51:09 -0500
committerNik Everett <nik9000@gmail.com>2017-01-06 20:03:32 -0500
commit12923ef8961f40abded8341f00881a163f7c68a9 (patch)
treef575d28d2d4c98dd4528aaf339514d7a7d3c7210 /core
parentb0c009ae769877594ada8a2b05267463caebf274 (diff)
Close and flush refresh listeners on shard close
Right now closing a shard looks like it strands refresh listeners, causing tests like `delete/50_refresh/refresh=wait_for waits until changes are visible in search` to fail. Here is a build that fails: https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+multi_cluster_search+multijob-darwin-compatibility/4/console This attempts to fix the problem by implements `Closeable` on `RefreshListeners` and rejecting listeners when closed. More importantly the act of closing the instance flushes all pending listeners so we shouldn't have any stranded listeners on close. Because it was needed for testing, this also adds the number of pending listeners to the `CommonStats` object and all API to which that flows: `_cat/nodes`, `_cat/indices`, `_cat/shards`, and `_nodes/stats`.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/elasticsearch/index/refresh/RefreshStats.java61
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java10
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java110
-rw-r--r--core/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java6
-rw-r--r--core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java3
-rw-r--r--core/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java2
-rw-r--r--core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java30
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java2
-rw-r--r--core/src/test/java/org/elasticsearch/index/refresh/RefreshStatsTests.java48
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java82
10 files changed, 283 insertions, 71 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/refresh/RefreshStats.java b/core/src/main/java/org/elasticsearch/index/refresh/RefreshStats.java
index 3a3edd10dc..8fc16ae1b1 100644
--- a/core/src/main/java/org/elasticsearch/index/refresh/RefreshStats.java
+++ b/core/src/main/java/org/elasticsearch/index/refresh/RefreshStats.java
@@ -19,6 +19,7 @@
package org.elasticsearch.index.refresh;
+import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -27,6 +28,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
+import java.util.Objects;
public class RefreshStats implements Streamable, ToXContent {
@@ -34,18 +36,19 @@ public class RefreshStats implements Streamable, ToXContent {
private long totalTimeInMillis;
+ /**
+ * Number of waiting refresh listeners.
+ */
+ private int listeners;
+
public RefreshStats() {
}
- public RefreshStats(long total, long totalTimeInMillis) {
+ public RefreshStats(long total, long totalTimeInMillis, int listeners) {
this.total = total;
this.totalTimeInMillis = totalTimeInMillis;
- }
-
- public void add(long total, long totalTimeInMillis) {
- this.total += total;
- this.totalTimeInMillis += totalTimeInMillis;
+ this.listeners = listeners;
}
public void add(RefreshStats refreshStats) {
@@ -58,6 +61,7 @@ public class RefreshStats implements Streamable, ToXContent {
}
this.total += refreshStats.total;
this.totalTimeInMillis += refreshStats.totalTimeInMillis;
+ this.listeners += refreshStats.listeners;
}
/**
@@ -81,31 +85,56 @@ public class RefreshStats implements Streamable, ToXContent {
return new TimeValue(totalTimeInMillis);
}
+ /**
+ * The number of waiting refresh listeners.
+ */
+ public int getListeners() {
+ return listeners;
+ }
+
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
- builder.startObject(Fields.REFRESH);
- builder.field(Fields.TOTAL, total);
- builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, totalTimeInMillis);
+ builder.startObject("refresh");
+ builder.field("total", total);
+ builder.timeValueField("total_time_in_millis", "total_time", totalTimeInMillis);
+ builder.field("listeners", listeners);
builder.endObject();
return builder;
}
- static final class Fields {
- static final String REFRESH = "refresh";
- static final String TOTAL = "total";
- static final String TOTAL_TIME = "total_time";
- static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
- }
-
@Override
public void readFrom(StreamInput in) throws IOException {
total = in.readVLong();
totalTimeInMillis = in.readVLong();
+ if (in.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
+ listeners = in.readVInt();
+ } else {
+ listeners = 0;
+ }
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);
out.writeVLong(totalTimeInMillis);
+ if (out.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
+ out.writeVInt(listeners);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || obj.getClass() != RefreshStats.class) {
+ return false;
+ }
+ RefreshStats rhs = (RefreshStats) obj;
+ return total == rhs.total
+ && totalTimeInMillis == rhs.totalTimeInMillis
+ && listeners == rhs.listeners;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(total, totalTimeInMillis, listeners);
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index b7a1fdd2ca..070ce17ba2 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -661,7 +661,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
public RefreshStats refreshStats() {
- return new RefreshStats(refreshMetric.count(), TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()));
+ // Null refreshListeners means this shard doesn't support them so there can't be any.
+ int listeners = refreshListeners == null ? 0 : refreshListeners.pendingCount();
+ return new RefreshStats(refreshMetric.count(), TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()), listeners);
}
public FlushStats flushStats() {
@@ -932,8 +934,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (engine != null && flushEngine) {
engine.flushAndClose();
}
- } finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times
- IOUtils.close(engine);
+ } finally {
+ // playing safe here and close the engine even if the above succeeds - close can be called multiple times
+ // Also closing refreshListeners to prevent us from accumulating any more listeners
+ IOUtils.close(engine, refreshListeners);
indexShardOperationsLock.close();
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java b/core/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java
index ca94f1ea96..f0df6e12b8 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java
@@ -24,6 +24,7 @@ import org.apache.lucene.search.ReferenceManager;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.translog.Translog;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -35,18 +36,26 @@ import static java.util.Objects.requireNonNull;
/**
* Allows for the registration of listeners that are called when a change becomes visible for search. This functionality is exposed from
- * {@link IndexShard} but kept here so it can be tested without standing up the entire thing.
+ * {@link IndexShard} but kept here so it can be tested without standing up the entire thing.
+ *
+ * When {@link Closeable#close()}d it will no longer accept listeners and flush any existing listeners.
*/
-public final class RefreshListeners implements ReferenceManager.RefreshListener {
+public final class RefreshListeners implements ReferenceManager.RefreshListener, Closeable {
private final IntSupplier getMaxRefreshListeners;
private final Runnable forceRefresh;
private final Executor listenerExecutor;
private final Logger logger;
/**
+ * Is this closed? If true then we won't add more listeners and have flushed all pending listeners.
+ */
+ private volatile boolean closed = false;
+ /**
* List of refresh listeners. Defaults to null and built on demand because most refresh cycles won't need it. Entries are never removed
* from it, rather, it is nulled and rebuilt when needed again. The (hopefully) rare entries that didn't make the current refresh cycle
* are just added back to the new list. Both the reference and the contents are always modified while synchronized on {@code this}.
+ *
+ * We never set this to non-null while closed it {@code true}.
*/
private volatile List<Tuple<Translog.Location, Consumer<Boolean>>> refreshListeners = null;
/**
@@ -80,12 +89,17 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener
return true;
}
synchronized (this) {
- if (refreshListeners == null) {
- refreshListeners = new ArrayList<>();
+ List<Tuple<Translog.Location, Consumer<Boolean>>> listeners = refreshListeners;
+ if (listeners == null) {
+ if (closed) {
+ throw new IllegalStateException("can't wait for refresh on a closed index");
+ }
+ listeners = new ArrayList<>();
+ refreshListeners = listeners;
}
- if (refreshListeners.size() < getMaxRefreshListeners.getAsInt()) {
+ if (listeners.size() < getMaxRefreshListeners.getAsInt()) {
// We have a free slot so register the listener
- refreshListeners.add(new Tuple<>(location, listener));
+ listeners.add(new Tuple<>(location, listener));
return false;
}
}
@@ -95,12 +109,34 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener
return true;
}
+ @Override
+ public void close() throws IOException {
+ List<Tuple<Translog.Location, Consumer<Boolean>>> oldListeners;
+ synchronized (this) {
+ oldListeners = refreshListeners;
+ refreshListeners = null;
+ closed = true;
+ }
+ // Fire any listeners we might have had
+ fireListeners(oldListeners);
+ }
+
/**
* Returns true if there are pending listeners.
*/
public boolean refreshNeeded() {
+ // A null list doesn't need a refresh. If we're closed we don't need a refresh either.
+ return refreshListeners != null && false == closed;
+ }
+
+ /**
+ * The number of pending listeners.
+ */
+ public int pendingCount() {
// No need to synchronize here because we're doing a single volatile read
- return refreshListeners != null;
+ List<Tuple<Translog.Location, Consumer<Boolean>>> listeners = refreshListeners;
+ // A null list means we haven't accumulated any listeners. Otherwise we need the size.
+ return listeners == null ? 0 : listeners.size();
}
/**
@@ -125,33 +161,25 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
- /*
- * We intentionally ignore didRefresh here because our timing is a little off. It'd be a useful flag if we knew everything that made
+ /* We intentionally ignore didRefresh here because our timing is a little off. It'd be a useful flag if we knew everything that made
* it into the refresh, but the way we snapshot the translog position before the refresh, things can sneak into the refresh that we
- * don't know about.
- */
+ * don't know about. */
if (null == currentRefreshLocation) {
- /*
- * The translog had an empty last write location at the start of the refresh so we can't alert anyone to anything. This
- * usually happens during recovery. The next refresh cycle out to pick up this refresh.
- */
+ /* The translog had an empty last write location at the start of the refresh so we can't alert anyone to anything. This
+ * usually happens during recovery. The next refresh cycle out to pick up this refresh. */
return;
}
- /*
- * Set the lastRefreshedLocation so listeners that come in for locations before that will just execute inline without messing
+ /* Set the lastRefreshedLocation so listeners that come in for locations before that will just execute inline without messing
* around with refreshListeners or synchronizing at all. Note that it is not safe for us to abort early if we haven't advanced the
* position here because we set and read lastRefreshedLocation outside of a synchronized block. We do that so that waiting for a
* refresh that has already passed is just a volatile read but the cost is that any check whether or not we've advanced the
* position will introduce a race between adding the listener and the position check. We could work around this by moving this
* assignment into the synchronized block below and double checking lastRefreshedLocation in addOrNotify's synchronized block but
- * that doesn't seem worth it given that we already skip this process early if there aren't any listeners to iterate.
- */
+ * that doesn't seem worth it given that we already skip this process early if there aren't any listeners to iterate. */
lastRefreshedLocation = currentRefreshLocation;
- /*
- * Grab the current refresh listeners and replace them with null while synchronized. Any listeners that come in after this won't be
+ /* Grab the current refresh listeners and replace them with null while synchronized. Any listeners that come in after this won't be
* in the list we iterate over and very likely won't be candidates for refresh anyway because we've already moved the
- * lastRefreshedLocation.
- */
+ * lastRefreshedLocation. */
List<Tuple<Translog.Location, Consumer<Boolean>>> candidates;
synchronized (this) {
candidates = refreshListeners;
@@ -162,16 +190,15 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener
refreshListeners = null;
}
// Iterate the list of listeners, copying the listeners to fire to one list and those to preserve to another list.
- List<Consumer<Boolean>> listenersToFire = null;
+ List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire = null;
List<Tuple<Translog.Location, Consumer<Boolean>>> preservedListeners = null;
for (Tuple<Translog.Location, Consumer<Boolean>> tuple : candidates) {
Translog.Location location = tuple.v1();
- Consumer<Boolean> listener = tuple.v2();
if (location.compareTo(currentRefreshLocation) <= 0) {
if (listenersToFire == null) {
listenersToFire = new ArrayList<>();
}
- listenersToFire.add(listener);
+ listenersToFire.add(tuple);
} else {
if (preservedListeners == null) {
preservedListeners = new ArrayList<>();
@@ -179,27 +206,36 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener
preservedListeners.add(tuple);
}
}
- /*
- * Now add any preserved listeners back to the running list of refresh listeners while under lock. We'll try them next time. While
- * we were iterating the list of listeners new listeners could have come in. That means that adding all of our preserved listeners
- * might push our list of listeners above the maximum number of slots allowed. This seems unlikely because we expect few listeners
- * to be preserved. And the next listener while we're full will trigger a refresh anyway.
- */
+ /* Now deal with the listeners that it isn't time yet to fire. We need to do this under lock so we don't miss a concurrent close or
+ * newly registered listener. If we're not closed we just add the listeners to the list of listeners we check next time. If we are
+ * closed we fire the listeners even though it isn't time for them. */
if (preservedListeners != null) {
synchronized (this) {
if (refreshListeners == null) {
- refreshListeners = new ArrayList<>();
+ if (closed) {
+ listenersToFire.addAll(preservedListeners);
+ } else {
+ refreshListeners = preservedListeners;
+ }
+ } else {
+ assert closed == false : "Can't be closed and have non-null refreshListeners";
+ refreshListeners.addAll(preservedListeners);
}
- refreshListeners.addAll(preservedListeners);
}
}
// Lastly, fire the listeners that are ready on the listener thread pool
+ fireListeners(listenersToFire);
+ }
+
+ /**
+ * Fire some listeners. Does nothing if the list of listeners is null.
+ */
+ private void fireListeners(List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire) {
if (listenersToFire != null) {
- final List<Consumer<Boolean>> finalListenersToFire = listenersToFire;
listenerExecutor.execute(() -> {
- for (Consumer<Boolean> listener : finalListenersToFire) {
+ for (Tuple<Translog.Location, Consumer<Boolean>> listener : listenersToFire) {
try {
- listener.accept(false);
+ listener.v2().accept(false);
} catch (Exception e) {
logger.warn("Error firing refresh listener", e);
}
diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java
index e56347f16b..02b91dd6ee 100644
--- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java
+++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java
@@ -260,6 +260,9 @@ public class RestIndicesAction extends AbstractCatAction {
table.addCell("refresh.time", "sibling:pri;alias:rti,refreshTime;default:false;text-align:right;desc:time spent in refreshes");
table.addCell("pri.refresh.time", "default:false;text-align:right;desc:time spent in refreshes");
+ table.addCell("refresh.listeners", "sibling:pri;alias:rli,refreshListeners;default:false;text-align:right;desc:number of pending refresh listeners");
+ table.addCell("pri.refresh.listeners", "default:false;text-align:right;desc:number of pending refresh listeners");
+
table.addCell("search.fetch_current", "sibling:pri;alias:sfc,searchFetchCurrent;default:false;text-align:right;desc:current fetch phase ops");
table.addCell("pri.search.fetch_current", "default:false;text-align:right;desc:current fetch phase ops");
@@ -475,6 +478,9 @@ public class RestIndicesAction extends AbstractCatAction {
table.addCell(indexStats == null ? null : indexStats.getTotal().getRefresh().getTotalTime());
table.addCell(indexStats == null ? null : indexStats.getPrimaries().getRefresh().getTotalTime());
+ table.addCell(indexStats == null ? null : indexStats.getTotal().getRefresh().getListeners());
+ table.addCell(indexStats == null ? null : indexStats.getPrimaries().getRefresh().getListeners());
+
table.addCell(indexStats == null ? null : indexStats.getTotal().getSearch().getTotal().getFetchCurrent());
table.addCell(indexStats == null ? null : indexStats.getPrimaries().getSearch().getTotal().getFetchCurrent());
diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java
index b632448192..0c5766cdee 100644
--- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java
+++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java
@@ -193,6 +193,8 @@ public class RestNodesAction extends AbstractCatAction {
table.addCell("refresh.total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes");
table.addCell("refresh.time", "alias:rti,refreshTime;default:false;text-align:right;desc:time spent in refreshes");
+ table.addCell("refresh.listeners", "alias:rli,refreshListeners;default:false;text-align:right;"
+ + "desc:number of pending refresh listeners");
table.addCell("script.compilations", "alias:scrcc,scriptCompilations;default:false;text-align:right;desc:script compilations");
table.addCell("script.cache_evictions",
@@ -346,6 +348,7 @@ public class RestNodesAction extends AbstractCatAction {
RefreshStats refreshStats = indicesStats == null ? null : indicesStats.getRefresh();
table.addCell(refreshStats == null ? null : refreshStats.getTotal());
table.addCell(refreshStats == null ? null : refreshStats.getTotalTime());
+ table.addCell(refreshStats == null ? null : refreshStats.getListeners());
ScriptStats scriptStats = stats == null ? null : stats.getScriptStats();
table.addCell(scriptStats == null ? null : scriptStats.getCompilations());
diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java
index f8337fd25c..8944c0827e 100644
--- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java
+++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java
@@ -144,6 +144,7 @@ public class RestShardsAction extends AbstractCatAction {
table.addCell("refresh.total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes");
table.addCell("refresh.time", "alias:rti,refreshTime;default:false;text-align:right;desc:time spent in refreshes");
+ table.addCell("refresh.listeners", "alias:rli,refreshListeners;default:false;text-align:right;desc:number of pending refresh listeners");
table.addCell("search.fetch_current", "alias:sfc,searchFetchCurrent;default:false;text-align:right;desc:current fetch phase ops");
table.addCell("search.fetch_time", "alias:sfti,searchFetchTime;default:false;text-align:right;desc:time spent in fetch phase");
@@ -290,6 +291,7 @@ public class RestShardsAction extends AbstractCatAction {
table.addCell(commonStats == null ? null : commonStats.getRefresh().getTotal());
table.addCell(commonStats == null ? null : commonStats.getRefresh().getTotalTime());
+ table.addCell(commonStats == null ? null : commonStats.getRefresh().getListeners());
table.addCell(commonStats == null ? null : commonStats.getSearch().getTotal().getFetchCurrent());
table.addCell(commonStats == null ? null : commonStats.getSearch().getTotal().getFetchTime());
diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java
index dfc10169e7..859e9f6e25 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java
@@ -19,7 +19,11 @@
package org.elasticsearch.action.admin.indices.stats;
+import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ShardOperationFailedException;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.CommitStats;
@@ -111,6 +115,32 @@ public class IndicesStatsTests extends ESSingleNodeTestCase {
}
}
+ public void testRefreshListeners() throws Exception {
+ // Create an index without automatic refreshes
+ createIndex("test", Settings.builder().put("refresh_interval", -1).build());
+
+ // Index a document asynchronously so the request will only return when document is refreshed
+ ListenableActionFuture<IndexResponse> index = client().prepareIndex("test", "test", "test").setSource("test", "test")
+ .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL).execute();
+
+ // Wait for the refresh listener to appear in the stats
+ assertBusy(() -> {
+ IndicesStatsResponse stats = client().admin().indices().prepareStats("test").clear().setRefresh(true).get();
+ CommonStats common = stats.getIndices().get("test").getTotal();
+ assertEquals(1, common.refresh.getListeners());
+ });
+
+ // Refresh the index and wait for the request to come back
+ client().admin().indices().prepareRefresh("test").get();
+ index.get();
+
+ // The document should appear in the statistics and the refresh listener should be gone
+ IndicesStatsResponse stats = client().admin().indices().prepareStats("test").clear().setRefresh(true).setDocs(true).get();
+ CommonStats common = stats.getIndices().get("test").getTotal();
+ assertEquals(1, common.docs.getCount());
+ assertEquals(0, common.refresh.getListeners());
+ }
+
/**
* Gives access to package private IndicesStatsResponse constructor for test purpose.
**/
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
index cd71418f0e..e27d3a5b04 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
@@ -116,7 +116,7 @@ public class TransportWriteActionTests extends ESTestCase {
Result result = action.apply(new TestAction(), request, indexShard);
CapturingActionListener<Response> listener = new CapturingActionListener<>();
responder.accept(result, listener);
- assertNull(listener.response); // Haven't reallresponded yet
+ assertNull(listener.response); // Haven't responded yet
@SuppressWarnings({ "unchecked", "rawtypes" })
ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class);
diff --git a/core/src/test/java/org/elasticsearch/index/refresh/RefreshStatsTests.java b/core/src/test/java/org/elasticsearch/index/refresh/RefreshStatsTests.java
new file mode 100644
index 0000000000..91ac42628e
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/index/refresh/RefreshStatsTests.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.refresh;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.test.AbstractStreamableTestCase;
+
+import java.io.IOException;
+
+public class RefreshStatsTests extends AbstractStreamableTestCase<RefreshStats> {
+ @Override
+ protected RefreshStats createTestInstance() {
+ return new RefreshStats(randomNonNegativeLong(), randomNonNegativeLong(), between(0, Integer.MAX_VALUE));
+ }
+
+ @Override
+ protected RefreshStats createBlankInstance() {
+ return new RefreshStats();
+ }
+
+ public void testPre5Dot2() throws IOException {
+ // We can drop the compatibility once the assertion just below this list fails
+ assertTrue(Version.CURRENT.minimumCompatibilityVersion().before(Version.V_5_2_0_UNRELEASED));
+
+ RefreshStats instance = createTestInstance();
+ RefreshStats copied = copyInstance(instance, Version.V_5_1_1_UNRELEASED);
+ assertEquals(instance.getTotal(), copied.getTotal());
+ assertEquals(instance.getTotalTimeInMillis(), copied.getTotalTimeInMillis());
+ assertEquals(0, copied.getListeners());
+ }
+}
diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
index 358c3006e7..c1e2605ec2 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
@@ -132,7 +132,38 @@ public class RefreshListenersTests extends ESTestCase {
terminate(threadPool);
}
+ public void testBeforeRefresh() throws Exception {
+ assertEquals(0, listeners.pendingCount());
+ Engine.IndexResult index = index("1");
+ DummyRefreshListener listener = new DummyRefreshListener();
+ assertFalse(listeners.addOrNotify(index.getTranslogLocation(), listener));
+ assertNull(listener.forcedRefresh.get());
+ assertEquals(1, listeners.pendingCount());
+ engine.refresh("I said so");
+ assertFalse(listener.forcedRefresh.get());
+ listener.assertNoError();
+ assertEquals(0, listeners.pendingCount());
+ }
+
+ public void testAfterRefresh() throws Exception {
+ assertEquals(0, listeners.pendingCount());
+ Engine.IndexResult index = index("1");
+ engine.refresh("I said so");
+ if (randomBoolean()) {
+ index(randomFrom("1" /* same document */, "2" /* different document */));
+ if (randomBoolean()) {
+ engine.refresh("I said so");
+ }
+ }
+ DummyRefreshListener listener = new DummyRefreshListener();
+ assertTrue(listeners.addOrNotify(index.getTranslogLocation(), listener));
+ assertFalse(listener.forcedRefresh.get());
+ listener.assertNoError();
+ assertEquals(0, listeners.pendingCount());
+ }
+
public void testTooMany() throws Exception {
+ assertEquals(0, listeners.pendingCount());
assertFalse(listeners.refreshNeeded());
Engine.IndexResult index = index("1");
@@ -149,6 +180,7 @@ public class RefreshListenersTests extends ESTestCase {
for (DummyRefreshListener listener : nonForcedListeners) {
assertNull("Called listener too early!", listener.forcedRefresh.get());
}
+ assertEquals(maxListeners, listeners.pendingCount());
// Add one more listener which should cause a refresh.
DummyRefreshListener forcingListener = new DummyRefreshListener();
@@ -162,22 +194,45 @@ public class RefreshListenersTests extends ESTestCase {
listener.assertNoError();
}
assertFalse(listeners.refreshNeeded());
+ assertEquals(0, listeners.pendingCount());
}
- public void testAfterRefresh() throws Exception {
- Engine.IndexResult index = index("1");
+ public void testClose() throws Exception {
+ assertEquals(0, listeners.pendingCount());
+ Engine.IndexResult refreshedOperation = index("1");
engine.refresh("I said so");
- if (randomBoolean()) {
- index(randomFrom("1" /* same document */, "2" /* different document */));
- if (randomBoolean()) {
- engine.refresh("I said so");
- }
+ Engine.IndexResult unrefreshedOperation = index("1");
+ {
+ /* Closing flushed pending listeners as though they were refreshed. Since this can only happen when the index is closed and no
+ * longer useful there doesn't seem much point in sending the listener some kind of "I'm closed now, go away" enum value. */
+ DummyRefreshListener listener = new DummyRefreshListener();
+ assertFalse(listeners.addOrNotify(unrefreshedOperation.getTranslogLocation(), listener));
+ assertNull(listener.forcedRefresh.get());
+ listeners.close();
+ assertFalse(listener.forcedRefresh.get());
+ listener.assertNoError();
+ assertFalse(listeners.refreshNeeded());
+ assertEquals(0, listeners.pendingCount());
+ }
+ {
+ // If you add a listener for an already refreshed location then it'll just fire even if closed
+ DummyRefreshListener listener = new DummyRefreshListener();
+ assertTrue(listeners.addOrNotify(refreshedOperation.getTranslogLocation(), listener));
+ assertFalse(listener.forcedRefresh.get());
+ listener.assertNoError();
+ assertFalse(listeners.refreshNeeded());
+ assertEquals(0, listeners.pendingCount());
+ }
+ {
+ // But adding a listener to a non-refreshed location will fail
+ DummyRefreshListener listener = new DummyRefreshListener();
+ Exception e = expectThrows(IllegalStateException.class, () ->
+ listeners.addOrNotify(unrefreshedOperation.getTranslogLocation(), listener));
+ assertEquals("can't wait for refresh on a closed index", e.getMessage());
+ assertNull(listener.forcedRefresh.get());
+ assertFalse(listeners.refreshNeeded());
+ assertEquals(0, listeners.pendingCount());
}
-
- DummyRefreshListener listener = new DummyRefreshListener();
- assertTrue(listeners.addOrNotify(index.getTranslogLocation(), listener));
- assertFalse(listener.forcedRefresh.get());
- listener.assertNoError();
}
/**
@@ -291,13 +346,12 @@ public class RefreshListenersTests extends ESTestCase {
/**
* When the listener is called this captures it's only argument.
*/
- AtomicReference<Boolean> forcedRefresh = new AtomicReference<>();
+ final AtomicReference<Boolean> forcedRefresh = new AtomicReference<>();
private volatile Exception error;
@Override
public void accept(Boolean forcedRefresh) {
try {
- assertNotNull(forcedRefresh);
Boolean oldValue = this.forcedRefresh.getAndSet(forcedRefresh);
assertNull("Listener called twice", oldValue);
} catch (Exception e) {