summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch
diff options
context:
space:
mode:
authorNik Everett <nik9000@gmail.com>2016-12-23 15:51:09 -0500
committerNik Everett <nik9000@gmail.com>2017-01-06 20:03:32 -0500
commit12923ef8961f40abded8341f00881a163f7c68a9 (patch)
treef575d28d2d4c98dd4528aaf339514d7a7d3c7210 /core/src/test/java/org/elasticsearch
parentb0c009ae769877594ada8a2b05267463caebf274 (diff)
Close and flush refresh listeners on shard close
Right now closing a shard looks like it strands refresh listeners, causing tests like `delete/50_refresh/refresh=wait_for waits until changes are visible in search` to fail. Here is a build that fails: https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+multi_cluster_search+multijob-darwin-compatibility/4/console This attempts to fix the problem by implements `Closeable` on `RefreshListeners` and rejecting listeners when closed. More importantly the act of closing the instance flushes all pending listeners so we shouldn't have any stranded listeners on close. Because it was needed for testing, this also adds the number of pending listeners to the `CommonStats` object and all API to which that flows: `_cat/nodes`, `_cat/indices`, `_cat/shards`, and `_nodes/stats`.
Diffstat (limited to 'core/src/test/java/org/elasticsearch')
-rw-r--r--core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java30
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java2
-rw-r--r--core/src/test/java/org/elasticsearch/index/refresh/RefreshStatsTests.java48
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java82
4 files changed, 147 insertions, 15 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java
index dfc10169e7..859e9f6e25 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java
@@ -19,7 +19,11 @@
package org.elasticsearch.action.admin.indices.stats;
+import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ShardOperationFailedException;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.CommitStats;
@@ -111,6 +115,32 @@ public class IndicesStatsTests extends ESSingleNodeTestCase {
}
}
+ public void testRefreshListeners() throws Exception {
+ // Create an index without automatic refreshes
+ createIndex("test", Settings.builder().put("refresh_interval", -1).build());
+
+ // Index a document asynchronously so the request will only return when document is refreshed
+ ListenableActionFuture<IndexResponse> index = client().prepareIndex("test", "test", "test").setSource("test", "test")
+ .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL).execute();
+
+ // Wait for the refresh listener to appear in the stats
+ assertBusy(() -> {
+ IndicesStatsResponse stats = client().admin().indices().prepareStats("test").clear().setRefresh(true).get();
+ CommonStats common = stats.getIndices().get("test").getTotal();
+ assertEquals(1, common.refresh.getListeners());
+ });
+
+ // Refresh the index and wait for the request to come back
+ client().admin().indices().prepareRefresh("test").get();
+ index.get();
+
+ // The document should appear in the statistics and the refresh listener should be gone
+ IndicesStatsResponse stats = client().admin().indices().prepareStats("test").clear().setRefresh(true).setDocs(true).get();
+ CommonStats common = stats.getIndices().get("test").getTotal();
+ assertEquals(1, common.docs.getCount());
+ assertEquals(0, common.refresh.getListeners());
+ }
+
/**
* Gives access to package private IndicesStatsResponse constructor for test purpose.
**/
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
index cd71418f0e..e27d3a5b04 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
@@ -116,7 +116,7 @@ public class TransportWriteActionTests extends ESTestCase {
Result result = action.apply(new TestAction(), request, indexShard);
CapturingActionListener<Response> listener = new CapturingActionListener<>();
responder.accept(result, listener);
- assertNull(listener.response); // Haven't reallresponded yet
+ assertNull(listener.response); // Haven't responded yet
@SuppressWarnings({ "unchecked", "rawtypes" })
ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class);
diff --git a/core/src/test/java/org/elasticsearch/index/refresh/RefreshStatsTests.java b/core/src/test/java/org/elasticsearch/index/refresh/RefreshStatsTests.java
new file mode 100644
index 0000000000..91ac42628e
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/index/refresh/RefreshStatsTests.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.refresh;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.test.AbstractStreamableTestCase;
+
+import java.io.IOException;
+
+public class RefreshStatsTests extends AbstractStreamableTestCase<RefreshStats> {
+ @Override
+ protected RefreshStats createTestInstance() {
+ return new RefreshStats(randomNonNegativeLong(), randomNonNegativeLong(), between(0, Integer.MAX_VALUE));
+ }
+
+ @Override
+ protected RefreshStats createBlankInstance() {
+ return new RefreshStats();
+ }
+
+ public void testPre5Dot2() throws IOException {
+ // We can drop the compatibility once the assertion just below this list fails
+ assertTrue(Version.CURRENT.minimumCompatibilityVersion().before(Version.V_5_2_0_UNRELEASED));
+
+ RefreshStats instance = createTestInstance();
+ RefreshStats copied = copyInstance(instance, Version.V_5_1_1_UNRELEASED);
+ assertEquals(instance.getTotal(), copied.getTotal());
+ assertEquals(instance.getTotalTimeInMillis(), copied.getTotalTimeInMillis());
+ assertEquals(0, copied.getListeners());
+ }
+}
diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
index 358c3006e7..c1e2605ec2 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
@@ -132,7 +132,38 @@ public class RefreshListenersTests extends ESTestCase {
terminate(threadPool);
}
+ public void testBeforeRefresh() throws Exception {
+ assertEquals(0, listeners.pendingCount());
+ Engine.IndexResult index = index("1");
+ DummyRefreshListener listener = new DummyRefreshListener();
+ assertFalse(listeners.addOrNotify(index.getTranslogLocation(), listener));
+ assertNull(listener.forcedRefresh.get());
+ assertEquals(1, listeners.pendingCount());
+ engine.refresh("I said so");
+ assertFalse(listener.forcedRefresh.get());
+ listener.assertNoError();
+ assertEquals(0, listeners.pendingCount());
+ }
+
+ public void testAfterRefresh() throws Exception {
+ assertEquals(0, listeners.pendingCount());
+ Engine.IndexResult index = index("1");
+ engine.refresh("I said so");
+ if (randomBoolean()) {
+ index(randomFrom("1" /* same document */, "2" /* different document */));
+ if (randomBoolean()) {
+ engine.refresh("I said so");
+ }
+ }
+ DummyRefreshListener listener = new DummyRefreshListener();
+ assertTrue(listeners.addOrNotify(index.getTranslogLocation(), listener));
+ assertFalse(listener.forcedRefresh.get());
+ listener.assertNoError();
+ assertEquals(0, listeners.pendingCount());
+ }
+
public void testTooMany() throws Exception {
+ assertEquals(0, listeners.pendingCount());
assertFalse(listeners.refreshNeeded());
Engine.IndexResult index = index("1");
@@ -149,6 +180,7 @@ public class RefreshListenersTests extends ESTestCase {
for (DummyRefreshListener listener : nonForcedListeners) {
assertNull("Called listener too early!", listener.forcedRefresh.get());
}
+ assertEquals(maxListeners, listeners.pendingCount());
// Add one more listener which should cause a refresh.
DummyRefreshListener forcingListener = new DummyRefreshListener();
@@ -162,22 +194,45 @@ public class RefreshListenersTests extends ESTestCase {
listener.assertNoError();
}
assertFalse(listeners.refreshNeeded());
+ assertEquals(0, listeners.pendingCount());
}
- public void testAfterRefresh() throws Exception {
- Engine.IndexResult index = index("1");
+ public void testClose() throws Exception {
+ assertEquals(0, listeners.pendingCount());
+ Engine.IndexResult refreshedOperation = index("1");
engine.refresh("I said so");
- if (randomBoolean()) {
- index(randomFrom("1" /* same document */, "2" /* different document */));
- if (randomBoolean()) {
- engine.refresh("I said so");
- }
+ Engine.IndexResult unrefreshedOperation = index("1");
+ {
+ /* Closing flushed pending listeners as though they were refreshed. Since this can only happen when the index is closed and no
+ * longer useful there doesn't seem much point in sending the listener some kind of "I'm closed now, go away" enum value. */
+ DummyRefreshListener listener = new DummyRefreshListener();
+ assertFalse(listeners.addOrNotify(unrefreshedOperation.getTranslogLocation(), listener));
+ assertNull(listener.forcedRefresh.get());
+ listeners.close();
+ assertFalse(listener.forcedRefresh.get());
+ listener.assertNoError();
+ assertFalse(listeners.refreshNeeded());
+ assertEquals(0, listeners.pendingCount());
+ }
+ {
+ // If you add a listener for an already refreshed location then it'll just fire even if closed
+ DummyRefreshListener listener = new DummyRefreshListener();
+ assertTrue(listeners.addOrNotify(refreshedOperation.getTranslogLocation(), listener));
+ assertFalse(listener.forcedRefresh.get());
+ listener.assertNoError();
+ assertFalse(listeners.refreshNeeded());
+ assertEquals(0, listeners.pendingCount());
+ }
+ {
+ // But adding a listener to a non-refreshed location will fail
+ DummyRefreshListener listener = new DummyRefreshListener();
+ Exception e = expectThrows(IllegalStateException.class, () ->
+ listeners.addOrNotify(unrefreshedOperation.getTranslogLocation(), listener));
+ assertEquals("can't wait for refresh on a closed index", e.getMessage());
+ assertNull(listener.forcedRefresh.get());
+ assertFalse(listeners.refreshNeeded());
+ assertEquals(0, listeners.pendingCount());
}
-
- DummyRefreshListener listener = new DummyRefreshListener();
- assertTrue(listeners.addOrNotify(index.getTranslogLocation(), listener));
- assertFalse(listener.forcedRefresh.get());
- listener.assertNoError();
}
/**
@@ -291,13 +346,12 @@ public class RefreshListenersTests extends ESTestCase {
/**
* When the listener is called this captures it's only argument.
*/
- AtomicReference<Boolean> forcedRefresh = new AtomicReference<>();
+ final AtomicReference<Boolean> forcedRefresh = new AtomicReference<>();
private volatile Exception error;
@Override
public void accept(Boolean forcedRefresh) {
try {
- assertNotNull(forcedRefresh);
Boolean oldValue = this.forcedRefresh.getAndSet(forcedRefresh);
assertNull("Listener called twice", oldValue);
} catch (Exception e) {