summaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorLuca Cavanna <javanna@users.noreply.github.com>2017-05-03 11:20:53 +0200
committerGitHub <noreply@github.com>2017-05-03 11:20:53 +0200
commit92bfd16c5824885ca673c5bb99acac9148c18693 (patch)
treee78cded3ebf92d44d5fef49228ea7703f7590a6e /core/src
parentbe19ccef573b4e9a839159157d7e6c2645261897 (diff)
Java api: ActionRequestBuilder#execute to return a PlainActionFuture (#24415)
This change makes the request builder code-path same as `Client#execute`. The request builder used to return a `ListenableActionFuture` when calling execute, which allows to associate listeners with the returned future. For async execution though it is recommended to use the `execute` method that accepts an `ActionListener`, like users would do when using `Client#execute`. Relates to #24412 Relates to #9201
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java23
-rw-r--r--core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java87
-rw-r--r--core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java4
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java4
-rw-r--r--core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java34
-rw-r--r--core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java4
-rw-r--r--core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java12
-rw-r--r--core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java4
-rw-r--r--core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java10
-rw-r--r--core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java15
10 files changed, 89 insertions, 108 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java
index 076d4ae67f..964568fc47 100644
--- a/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java
+++ b/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java
@@ -19,18 +19,16 @@
package org.elasticsearch.action;
-import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.threadpool.ThreadPool;
import java.util.Objects;
-public abstract class ActionRequestBuilder<Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> {
+public abstract class ActionRequestBuilder<Request extends ActionRequest, Response extends ActionResponse,
+ RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> {
protected final Action<Request, Response, RequestBuilder> action;
protected final Request request;
- private final ThreadPool threadPool;
protected final ElasticsearchClient client;
protected ActionRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) {
@@ -38,18 +36,14 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon
this.action = action;
this.request = request;
this.client = client;
- threadPool = client.threadPool();
}
-
public Request request() {
return this.request;
}
- public ListenableActionFuture<Response> execute() {
- PlainListenableActionFuture<Response> future = new PlainListenableActionFuture<>(threadPool);
- execute(future);
- return future;
+ public ActionFuture<Response> execute() {
+ return client.execute(action, request);
}
/**
@@ -74,13 +68,6 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon
}
public void execute(ActionListener<Response> listener) {
- client.execute(action, beforeExecute(request), listener);
- }
-
- /**
- * A callback to additionally process the request before its executed
- */
- protected Request beforeExecute(Request request) {
- return request;
+ client.execute(action, request, listener);
}
}
diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java
index 48df23ea3a..a2cab6b85a 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java
@@ -21,9 +21,9 @@ package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
-import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
@@ -90,7 +90,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyCollectionOf;
@@ -466,8 +465,7 @@ public class TasksIT extends ESIntegTestCase {
public void testTasksCancellation() throws Exception {
// Start blocking test task
// Get real client (the plugin is not registered on transport nodes)
- ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
- .execute();
+ ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute();
logger.info("--> started test tasks");
// Wait for the task to start on all nodes
@@ -488,8 +486,7 @@ public class TasksIT extends ESIntegTestCase {
public void testTasksUnblocking() throws Exception {
// Start blocking test task
- ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
- .execute();
+ ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute();
// Wait for the task to start on all nodes
assertBusy(() -> assertEquals(internalCluster().size(),
client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
@@ -502,42 +499,45 @@ public class TasksIT extends ESIntegTestCase {
}
public void testListTasksWaitForCompletion() throws Exception {
- waitForCompletionTestCase(randomBoolean(), id -> {
- return client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME)
- .setWaitForCompletion(true).execute();
- }, response -> {
- assertThat(response.getNodeFailures(), empty());
- assertThat(response.getTaskFailures(), empty());
- assertThat(response.getTasks(), hasSize(1));
- TaskInfo task = response.getTasks().get(0);
- assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.getAction());
- });
+ waitForCompletionTestCase(randomBoolean(),
+ id -> client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME)
+ .setWaitForCompletion(true).execute(),
+ response -> {
+ assertThat(response.getNodeFailures(), empty());
+ assertThat(response.getTaskFailures(), empty());
+ assertThat(response.getTasks(), hasSize(1));
+ TaskInfo task = response.getTasks().get(0);
+ assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.getAction());
+ }
+ );
}
public void testGetTaskWaitForCompletionWithoutStoringResult() throws Exception {
- waitForCompletionTestCase(false, id -> {
- return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
- }, response -> {
- assertTrue(response.getTask().isCompleted());
- // We didn't store the result so it won't come back when we wait
- assertNull(response.getTask().getResponse());
- // But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete.
- assertNotNull(response.getTask().getTask());
- assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
- });
+ waitForCompletionTestCase(false,
+ id -> client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(),
+ response -> {
+ assertTrue(response.getTask().isCompleted());
+ //We didn't store the result so it won't come back when we wait
+ assertNull(response.getTask().getResponse());
+ //But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete
+ assertNotNull(response.getTask().getTask());
+ assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
+ }
+ );
}
public void testGetTaskWaitForCompletionWithStoringResult() throws Exception {
- waitForCompletionTestCase(true, id -> {
- return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
- }, response -> {
- assertTrue(response.getTask().isCompleted());
- // We stored the task so we should get its results
- assertEquals(0, response.getTask().getResponseAsMap().get("failure_count"));
- // The task's details should also be there
- assertNotNull(response.getTask().getTask());
- assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
- });
+ waitForCompletionTestCase(true,
+ id -> client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(),
+ response -> {
+ assertTrue(response.getTask().isCompleted());
+ // We stored the task so we should get its results
+ assertEquals(0, response.getTask().getResponseAsMap().get("failure_count"));
+ // The task's details should also be there
+ assertNotNull(response.getTask().getTask());
+ assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
+ }
+ );
}
/**
@@ -546,13 +546,13 @@ public class TasksIT extends ESIntegTestCase {
* @param wait start waiting for a task. Accepts that id of the task to wait for and returns a future waiting for it.
* @param validator validate the response and return the task ids that were found
*/
- private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId, ListenableActionFuture<T>> wait, Consumer<T> validator)
+ private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId, ActionFuture<T>> wait, Consumer<T> validator)
throws Exception {
// Start blocking test task
- ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
+ ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.setShouldStoreResult(storeResult).execute();
- ListenableActionFuture<T> waitResponseFuture;
+ ActionFuture<T> waitResponseFuture;
TaskId taskId;
try {
taskId = waitForTestTaskStartOnAllNodes();
@@ -623,8 +623,7 @@ public class TasksIT extends ESIntegTestCase {
*/
private void waitForTimeoutTestCase(Function<TaskId, ? extends Iterable<? extends Throwable>> wait) throws Exception {
// Start blocking test task
- ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
- .execute();
+ ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute();
try {
TaskId taskId = waitForTestTaskStartOnAllNodes();
@@ -662,7 +661,7 @@ public class TasksIT extends ESIntegTestCase {
public void testTasksListWaitForNoTask() throws Exception {
// Spin up a request to wait for no matching tasks
- ListenableActionFuture<ListTasksResponse> waitResponseFuture = client().admin().cluster().prepareListTasks()
+ ActionFuture<ListTasksResponse> waitResponseFuture = client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(10))
.execute();
@@ -672,12 +671,12 @@ public class TasksIT extends ESIntegTestCase {
public void testTasksGetWaitForNoTask() throws Exception {
// Spin up a request to wait for no matching tasks
- ListenableActionFuture<GetTaskResponse> waitResponseFuture = client().admin().cluster().prepareGetTask("notfound:1")
+ ActionFuture<GetTaskResponse> waitResponseFuture = client().admin().cluster().prepareGetTask("notfound:1")
.setWaitForCompletion(true).setTimeout(timeValueMillis(10))
.execute();
// It should finish quickly and without complaint
- expectNotFound(() -> waitResponseFuture.get());
+ expectNotFound(waitResponseFuture::get);
}
public void testTasksWaitForAllTask() throws Exception {
diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java
index 58bf897bd3..6c1f44ea69 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java
@@ -19,7 +19,7 @@
package org.elasticsearch.action.admin.indices.stats;
-import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
@@ -124,7 +124,7 @@ public class IndicesStatsTests extends ESSingleNodeTestCase {
createIndex("test", Settings.builder().put("refresh_interval", -1).build());
// Index a document asynchronously so the request will only return when document is refreshed
- ListenableActionFuture<IndexResponse> index = client().prepareIndex("test", "test", "test").setSource("test", "test")
+ ActionFuture<IndexResponse> index = client().prepareIndex("test", "test", "test").setSource("test", "test")
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL).execute();
// Wait for the refresh listener to appear in the stats
diff --git a/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java
index 5f52293d7e..f3611663b4 100644
--- a/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java
+++ b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java
@@ -19,7 +19,7 @@
package org.elasticsearch.action.support;
-import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
@@ -140,7 +140,7 @@ public class ActiveShardsObserverIT extends ESIntegTestCase {
.build();
logger.info("--> start the index creation process");
- ListenableActionFuture<CreateIndexResponse> responseListener =
+ ActionFuture<CreateIndexResponse> responseListener =
prepareCreate(indexName)
.setSettings(settings)
.setWaitForActiveShards(ActiveShardCount.ALL)
diff --git a/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java b/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java
index 43d866a47b..67318b7b21 100644
--- a/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java
+++ b/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java
@@ -25,6 +25,7 @@ import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
+import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
@@ -32,7 +33,6 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.index.IndexAction;
-import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -101,22 +101,22 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
// validation in the settings??? - ugly and conceptually wrong)
// choosing arbitrary top level actions to test
- client.prepareGet("idx", "type", "id").execute().addListener(new AssertingActionListener<>(GetAction.NAME, client.threadPool()));
- client.prepareSearch().execute().addListener(new AssertingActionListener<>(SearchAction.NAME, client.threadPool()));
- client.prepareDelete("idx", "type", "id").execute().addListener(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool()));
- client.admin().cluster().prepareDeleteStoredScript("lang", "id").execute().addListener(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool()));
- client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON).execute().addListener(new AssertingActionListener<>(IndexAction.NAME, client.threadPool()));
+ client.prepareGet("idx", "type", "id").execute(new AssertingActionListener<>(GetAction.NAME, client.threadPool()));
+ client.prepareSearch().execute(new AssertingActionListener<>(SearchAction.NAME, client.threadPool()));
+ client.prepareDelete("idx", "type", "id").execute(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool()));
+ client.admin().cluster().prepareDeleteStoredScript("lang", "id").execute(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool()));
+ client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON).execute(new AssertingActionListener<>(IndexAction.NAME, client.threadPool()));
// choosing arbitrary cluster admin actions to test
- client.admin().cluster().prepareClusterStats().execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool()));
- client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute().addListener(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool()));
- client.admin().cluster().prepareReroute().execute().addListener(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool()));
+ client.admin().cluster().prepareClusterStats().execute(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool()));
+ client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool()));
+ client.admin().cluster().prepareReroute().execute(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool()));
// choosing arbitrary indices admin actions to test
- client.admin().indices().prepareCreate("idx").execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool()));
- client.admin().indices().prepareStats().execute().addListener(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool()));
- client.admin().indices().prepareClearCache("idx1", "idx2").execute().addListener(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool()));
- client.admin().indices().prepareFlush().execute().addListener(new AssertingActionListener<>(FlushAction.NAME, client.threadPool()));
+ client.admin().indices().prepareCreate("idx").execute(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool()));
+ client.admin().indices().prepareStats().execute(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool()));
+ client.admin().indices().prepareClearCache("idx1", "idx2").execute(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool()));
+ client.admin().indices().prepareFlush().execute(new AssertingActionListener<>(FlushAction.NAME, client.threadPool()));
}
public void testOverrideHeader() throws Exception {
@@ -126,13 +126,13 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
expected.put("key2", "val 2");
client.threadPool().getThreadContext().putHeader("key1", key1Val);
client.prepareGet("idx", "type", "id")
- .execute().addListener(new AssertingActionListener<>(GetAction.NAME, expected, client.threadPool()));
+ .execute(new AssertingActionListener<>(GetAction.NAME, expected, client.threadPool()));
client.admin().cluster().prepareClusterStats()
- .execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, expected, client.threadPool()));
+ .execute(new AssertingActionListener<>(ClusterStatsAction.NAME, expected, client.threadPool()));
client.admin().indices().prepareCreate("idx")
- .execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, expected, client.threadPool()));
+ .execute(new AssertingActionListener<>(CreateIndexAction.NAME, expected, client.threadPool()));
}
protected static void assertHeaders(Map<String, String> headers, Map<String, String> expected) {
@@ -205,7 +205,5 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
}
return result;
}
-
}
-
}
diff --git a/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java b/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java
index ca089e6eb8..cad590b3c8 100644
--- a/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java
+++ b/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java
@@ -19,8 +19,8 @@
package org.elasticsearch.index;
+import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@@ -149,7 +149,7 @@ public class WaitUntilRefreshIT extends ESIntegTestCase {
*/
public void testNoRefreshInterval() throws InterruptedException, ExecutionException {
client().admin().indices().prepareUpdateSettings("test").setSettings(singletonMap("index.refresh_interval", -1)).get();
- ListenableActionFuture<IndexResponse> index = client().prepareIndex("test", "index", "1").setSource("foo", "bar")
+ ActionFuture<IndexResponse> index = client().prepareIndex("test", "index", "1").setSource("foo", "bar")
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL).execute();
while (false == index.isDone()) {
client().admin().indices().prepareRefresh("test").get();
diff --git a/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java b/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java
index 7366834391..02607f0c1f 100644
--- a/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java
+++ b/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java
@@ -19,7 +19,7 @@
package org.elasticsearch.search;
-import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -128,7 +128,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
assertThat(cancelTasksResponse.getTasks().get(0).getTaskId(), equalTo(searchTask.getTaskId()));
}
- private SearchResponse ensureSearchWasCancelled(ListenableActionFuture<SearchResponse> searchResponse) {
+ private SearchResponse ensureSearchWasCancelled(ActionFuture<SearchResponse> searchResponse) {
try {
SearchResponse response = searchResponse.actionGet();
logger.info("Search response {}", response);
@@ -146,7 +146,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
indexTestData();
logger.info("Executing search");
- ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test").setQuery(
+ ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test").setQuery(
scriptQuery(new Script(
ScriptType.INLINE, "native", NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, Collections.emptyMap())))
.execute();
@@ -164,7 +164,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
indexTestData();
logger.info("Executing search");
- ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
+ ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.addScriptField("test_field",
new Script(ScriptType.INLINE, "native", NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, Collections.emptyMap())
).execute();
@@ -182,7 +182,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
indexTestData();
logger.info("Executing search");
- ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
+ ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setScroll(TimeValue.timeValueSeconds(10))
.setSize(5)
.setQuery(
@@ -230,7 +230,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
String scrollId = searchResponse.getScrollId();
logger.info("Executing scroll with id {}", scrollId);
- ListenableActionFuture<SearchResponse> scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId())
+ ActionFuture<SearchResponse> scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(keepAlive).execute();
awaitForBlock(plugins);
diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
index 2503d6e157..6c34cdf2e4 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
@@ -21,7 +21,7 @@ package org.elasticsearch.snapshots;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
-import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
@@ -412,7 +412,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> execution was blocked on node [{}], aborting snapshot", blockedNode);
- ListenableActionFuture<DeleteSnapshotResponse> deleteSnapshotResponseFuture = internalCluster().client(nodes.get(0)).admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").execute();
+ ActionFuture<DeleteSnapshotResponse> deleteSnapshotResponseFuture = internalCluster().client(nodes.get(0)).admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").execute();
// Make sure that abort makes some progress
Thread.sleep(100);
unblockNode("test-repo", blockedNode);
diff --git a/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java
index 680bfea6b3..29657c5fb8 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java
@@ -19,7 +19,7 @@
package org.elasticsearch.snapshots;
-import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -83,7 +83,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of first snapshot");
- ListenableActionFuture<DeleteSnapshotResponse> future =
+ ActionFuture<DeleteSnapshotResponse> future =
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
@@ -129,8 +129,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of snapshot");
- ListenableActionFuture<DeleteSnapshotResponse> future =
- client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute();
+ ActionFuture<DeleteSnapshotResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
@@ -185,8 +184,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of snapshot");
- ListenableActionFuture<DeleteSnapshotResponse> future =
- client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
+ ActionFuture<DeleteSnapshotResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
index 11194a689d..67911f3e5b 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
@@ -22,7 +22,7 @@ package org.elasticsearch.snapshots;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
-import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
@@ -56,7 +56,6 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@@ -155,7 +154,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertHitCount(client.prepareSearch("test-idx-2").setSize(0).get(), 100L);
assertHitCount(client.prepareSearch("test-idx-3").setSize(0).get(), 100L);
- ListenableActionFuture<FlushResponse> flushResponseFuture = null;
+ ActionFuture<FlushResponse> flushResponseFuture = null;
if (randomBoolean()) {
ArrayList<String> indicesToFlush = new ArrayList<>();
for (int i = 1; i < 4; i++) {
@@ -888,7 +887,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> delete index");
cluster().wipeIndices("test-idx");
logger.info("--> restore index after deletion");
- ListenableActionFuture<RestoreSnapshotResponse> restoreSnapshotResponseFuture =
+ ActionFuture<RestoreSnapshotResponse> restoreSnapshotResponseFuture =
client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute();
logger.info("--> wait for the index to appear");
@@ -2014,7 +2013,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
logger.info("--> snapshot allow partial {}", allowPartial);
- ListenableActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+ ActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute();
logger.info("--> wait for block to kick in");
if (initBlocking) {
@@ -2109,7 +2108,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
blockAllDataNodes("test-repo");
logger.info("--> execution will be blocked on all data nodes");
- final ListenableActionFuture<RestoreSnapshotResponse> restoreFut;
+ final ActionFuture<RestoreSnapshotResponse> restoreFut;
try {
logger.info("--> start restore");
restoreFut = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
@@ -2174,7 +2173,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> execution will be blocked on all data nodes");
blockAllDataNodes(repoName);
- final ListenableActionFuture<RestoreSnapshotResponse> restoreFut;
+ final ActionFuture<RestoreSnapshotResponse> restoreFut;
try {
logger.info("--> start restore");
restoreFut = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
@@ -2461,7 +2460,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
// take initial snapshot with a block, making sure we only get 1 in-progress snapshot returned
// block a node so the create snapshot operation can remain in progress
final String initialBlockedNode = blockNodeWithIndex(repositoryName, indexName);
- ListenableActionFuture<CreateSnapshotResponse> responseListener =
+ ActionFuture<CreateSnapshotResponse> responseListener =
client.admin().cluster().prepareCreateSnapshot(repositoryName, "snap-on-empty-repo")
.setWaitForCompletion(false)
.setIndices(indexName)