diff options
author | Luca Cavanna <javanna@users.noreply.github.com> | 2017-05-03 11:20:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-03 11:20:53 +0200 |
commit | 92bfd16c5824885ca673c5bb99acac9148c18693 (patch) | |
tree | e78cded3ebf92d44d5fef49228ea7703f7590a6e /core/src | |
parent | be19ccef573b4e9a839159157d7e6c2645261897 (diff) |
Java api: ActionRequestBuilder#execute to return a PlainActionFuture (#24415)
This change makes the request builder code-path same as `Client#execute`. The request builder used to return a `ListenableActionFuture` when calling execute, which allows to associate listeners with the returned future. For async execution though it is recommended to use the `execute` method that accepts an `ActionListener`, like users would do when using `Client#execute`.
Relates to #24412
Relates to #9201
Diffstat (limited to 'core/src')
10 files changed, 89 insertions, 108 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java index 076d4ae67f..964568fc47 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java @@ -19,18 +19,16 @@ package org.elasticsearch.action; -import org.elasticsearch.action.support.PlainListenableActionFuture; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Objects; -public abstract class ActionRequestBuilder<Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> { +public abstract class ActionRequestBuilder<Request extends ActionRequest, Response extends ActionResponse, + RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> { protected final Action<Request, Response, RequestBuilder> action; protected final Request request; - private final ThreadPool threadPool; protected final ElasticsearchClient client; protected ActionRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) { @@ -38,18 +36,14 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon this.action = action; this.request = request; this.client = client; - threadPool = client.threadPool(); } - public Request request() { return this.request; } - public ListenableActionFuture<Response> execute() { - PlainListenableActionFuture<Response> future = new PlainListenableActionFuture<>(threadPool); - execute(future); - return future; + public ActionFuture<Response> execute() { + return client.execute(action, request); } /** @@ -74,13 +68,6 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon } public void execute(ActionListener<Response> listener) { - client.execute(action, beforeExecute(request), listener); - } - - /** - * A callback to additionally process the request before its executed - */ - protected Request beforeExecute(Request request) { - return request; + client.execute(action, request, listener); } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 48df23ea3a..a2cab6b85a 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -21,9 +21,9 @@ package org.elasticsearch.action.admin.cluster.node.tasks; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; @@ -90,7 +90,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.emptyCollectionOf; @@ -466,8 +465,7 @@ public class TasksIT extends ESIntegTestCase { public void testTasksCancellation() throws Exception { // Start blocking test task // Get real client (the plugin is not registered on transport nodes) - ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) - .execute(); + ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute(); logger.info("--> started test tasks"); // Wait for the task to start on all nodes @@ -488,8 +486,7 @@ public class TasksIT extends ESIntegTestCase { public void testTasksUnblocking() throws Exception { // Start blocking test task - ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) - .execute(); + ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute(); // Wait for the task to start on all nodes assertBusy(() -> assertEquals(internalCluster().size(), client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size())); @@ -502,42 +499,45 @@ public class TasksIT extends ESIntegTestCase { } public void testListTasksWaitForCompletion() throws Exception { - waitForCompletionTestCase(randomBoolean(), id -> { - return client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME) - .setWaitForCompletion(true).execute(); - }, response -> { - assertThat(response.getNodeFailures(), empty()); - assertThat(response.getTaskFailures(), empty()); - assertThat(response.getTasks(), hasSize(1)); - TaskInfo task = response.getTasks().get(0); - assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.getAction()); - }); + waitForCompletionTestCase(randomBoolean(), + id -> client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME) + .setWaitForCompletion(true).execute(), + response -> { + assertThat(response.getNodeFailures(), empty()); + assertThat(response.getTaskFailures(), empty()); + assertThat(response.getTasks(), hasSize(1)); + TaskInfo task = response.getTasks().get(0); + assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.getAction()); + } + ); } public void testGetTaskWaitForCompletionWithoutStoringResult() throws Exception { - waitForCompletionTestCase(false, id -> { - return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(); - }, response -> { - assertTrue(response.getTask().isCompleted()); - // We didn't store the result so it won't come back when we wait - assertNull(response.getTask().getResponse()); - // But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete. - assertNotNull(response.getTask().getTask()); - assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction()); - }); + waitForCompletionTestCase(false, + id -> client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(), + response -> { + assertTrue(response.getTask().isCompleted()); + //We didn't store the result so it won't come back when we wait + assertNull(response.getTask().getResponse()); + //But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete + assertNotNull(response.getTask().getTask()); + assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction()); + } + ); } public void testGetTaskWaitForCompletionWithStoringResult() throws Exception { - waitForCompletionTestCase(true, id -> { - return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(); - }, response -> { - assertTrue(response.getTask().isCompleted()); - // We stored the task so we should get its results - assertEquals(0, response.getTask().getResponseAsMap().get("failure_count")); - // The task's details should also be there - assertNotNull(response.getTask().getTask()); - assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction()); - }); + waitForCompletionTestCase(true, + id -> client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(), + response -> { + assertTrue(response.getTask().isCompleted()); + // We stored the task so we should get its results + assertEquals(0, response.getTask().getResponseAsMap().get("failure_count")); + // The task's details should also be there + assertNotNull(response.getTask().getTask()); + assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction()); + } + ); } /** @@ -546,13 +546,13 @@ public class TasksIT extends ESIntegTestCase { * @param wait start waiting for a task. Accepts that id of the task to wait for and returns a future waiting for it. * @param validator validate the response and return the task ids that were found */ - private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId, ListenableActionFuture<T>> wait, Consumer<T> validator) + private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId, ActionFuture<T>> wait, Consumer<T> validator) throws Exception { // Start blocking test task - ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) + ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) .setShouldStoreResult(storeResult).execute(); - ListenableActionFuture<T> waitResponseFuture; + ActionFuture<T> waitResponseFuture; TaskId taskId; try { taskId = waitForTestTaskStartOnAllNodes(); @@ -623,8 +623,7 @@ public class TasksIT extends ESIntegTestCase { */ private void waitForTimeoutTestCase(Function<TaskId, ? extends Iterable<? extends Throwable>> wait) throws Exception { // Start blocking test task - ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) - .execute(); + ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute(); try { TaskId taskId = waitForTestTaskStartOnAllNodes(); @@ -662,7 +661,7 @@ public class TasksIT extends ESIntegTestCase { public void testTasksListWaitForNoTask() throws Exception { // Spin up a request to wait for no matching tasks - ListenableActionFuture<ListTasksResponse> waitResponseFuture = client().admin().cluster().prepareListTasks() + ActionFuture<ListTasksResponse> waitResponseFuture = client().admin().cluster().prepareListTasks() .setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(10)) .execute(); @@ -672,12 +671,12 @@ public class TasksIT extends ESIntegTestCase { public void testTasksGetWaitForNoTask() throws Exception { // Spin up a request to wait for no matching tasks - ListenableActionFuture<GetTaskResponse> waitResponseFuture = client().admin().cluster().prepareGetTask("notfound:1") + ActionFuture<GetTaskResponse> waitResponseFuture = client().admin().cluster().prepareGetTask("notfound:1") .setWaitForCompletion(true).setTimeout(timeValueMillis(10)) .execute(); // It should finish quickly and without complaint - expectNotFound(() -> waitResponseFuture.get()); + expectNotFound(waitResponseFuture::get); } public void testTasksWaitForAllTask() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java index 58bf897bd3..6c1f44ea69 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.stats; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; @@ -124,7 +124,7 @@ public class IndicesStatsTests extends ESSingleNodeTestCase { createIndex("test", Settings.builder().put("refresh_interval", -1).build()); // Index a document asynchronously so the request will only return when document is refreshed - ListenableActionFuture<IndexResponse> index = client().prepareIndex("test", "test", "test").setSource("test", "test") + ActionFuture<IndexResponse> index = client().prepareIndex("test", "test", "test").setSource("test", "test") .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL).execute(); // Wait for the refresh listener to appear in the stats diff --git a/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java index 5f52293d7e..f3611663b4 100644 --- a/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.support; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; @@ -140,7 +140,7 @@ public class ActiveShardsObserverIT extends ESIntegTestCase { .build(); logger.info("--> start the index creation process"); - ListenableActionFuture<CreateIndexResponse> responseListener = + ActionFuture<CreateIndexResponse> responseListener = prepareCreate(indexName) .setSettings(settings) .setWaitForActiveShards(ActiveShardCount.ALL) diff --git a/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java b/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java index 43d866a47b..67318b7b21 100644 --- a/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java +++ b/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction; +import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction; import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.flush.FlushAction; @@ -32,7 +33,6 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.delete.DeleteAction; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.index.IndexAction; -import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -101,22 +101,22 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase { // validation in the settings??? - ugly and conceptually wrong) // choosing arbitrary top level actions to test - client.prepareGet("idx", "type", "id").execute().addListener(new AssertingActionListener<>(GetAction.NAME, client.threadPool())); - client.prepareSearch().execute().addListener(new AssertingActionListener<>(SearchAction.NAME, client.threadPool())); - client.prepareDelete("idx", "type", "id").execute().addListener(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool())); - client.admin().cluster().prepareDeleteStoredScript("lang", "id").execute().addListener(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool())); - client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON).execute().addListener(new AssertingActionListener<>(IndexAction.NAME, client.threadPool())); + client.prepareGet("idx", "type", "id").execute(new AssertingActionListener<>(GetAction.NAME, client.threadPool())); + client.prepareSearch().execute(new AssertingActionListener<>(SearchAction.NAME, client.threadPool())); + client.prepareDelete("idx", "type", "id").execute(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool())); + client.admin().cluster().prepareDeleteStoredScript("lang", "id").execute(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool())); + client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON).execute(new AssertingActionListener<>(IndexAction.NAME, client.threadPool())); // choosing arbitrary cluster admin actions to test - client.admin().cluster().prepareClusterStats().execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool())); - client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute().addListener(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool())); - client.admin().cluster().prepareReroute().execute().addListener(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool())); + client.admin().cluster().prepareClusterStats().execute(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool())); + client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool())); + client.admin().cluster().prepareReroute().execute(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool())); // choosing arbitrary indices admin actions to test - client.admin().indices().prepareCreate("idx").execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool())); - client.admin().indices().prepareStats().execute().addListener(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool())); - client.admin().indices().prepareClearCache("idx1", "idx2").execute().addListener(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool())); - client.admin().indices().prepareFlush().execute().addListener(new AssertingActionListener<>(FlushAction.NAME, client.threadPool())); + client.admin().indices().prepareCreate("idx").execute(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool())); + client.admin().indices().prepareStats().execute(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool())); + client.admin().indices().prepareClearCache("idx1", "idx2").execute(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool())); + client.admin().indices().prepareFlush().execute(new AssertingActionListener<>(FlushAction.NAME, client.threadPool())); } public void testOverrideHeader() throws Exception { @@ -126,13 +126,13 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase { expected.put("key2", "val 2"); client.threadPool().getThreadContext().putHeader("key1", key1Val); client.prepareGet("idx", "type", "id") - .execute().addListener(new AssertingActionListener<>(GetAction.NAME, expected, client.threadPool())); + .execute(new AssertingActionListener<>(GetAction.NAME, expected, client.threadPool())); client.admin().cluster().prepareClusterStats() - .execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, expected, client.threadPool())); + .execute(new AssertingActionListener<>(ClusterStatsAction.NAME, expected, client.threadPool())); client.admin().indices().prepareCreate("idx") - .execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, expected, client.threadPool())); + .execute(new AssertingActionListener<>(CreateIndexAction.NAME, expected, client.threadPool())); } protected static void assertHeaders(Map<String, String> headers, Map<String, String> expected) { @@ -205,7 +205,5 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase { } return result; } - } - } diff --git a/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java b/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java index ca089e6eb8..cad590b3c8 100644 --- a/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java +++ b/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java @@ -19,8 +19,8 @@ package org.elasticsearch.index; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -149,7 +149,7 @@ public class WaitUntilRefreshIT extends ESIntegTestCase { */ public void testNoRefreshInterval() throws InterruptedException, ExecutionException { client().admin().indices().prepareUpdateSettings("test").setSettings(singletonMap("index.refresh_interval", -1)).get(); - ListenableActionFuture<IndexResponse> index = client().prepareIndex("test", "index", "1").setSource("foo", "bar") + ActionFuture<IndexResponse> index = client().prepareIndex("test", "index", "1").setSource("foo", "bar") .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL).execute(); while (false == index.isDone()) { client().admin().indices().prepareRefresh("test").get(); diff --git a/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java b/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java index 7366834391..02607f0c1f 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java @@ -19,7 +19,7 @@ package org.elasticsearch.search; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -128,7 +128,7 @@ public class SearchCancellationIT extends ESIntegTestCase { assertThat(cancelTasksResponse.getTasks().get(0).getTaskId(), equalTo(searchTask.getTaskId())); } - private SearchResponse ensureSearchWasCancelled(ListenableActionFuture<SearchResponse> searchResponse) { + private SearchResponse ensureSearchWasCancelled(ActionFuture<SearchResponse> searchResponse) { try { SearchResponse response = searchResponse.actionGet(); logger.info("Search response {}", response); @@ -146,7 +146,7 @@ public class SearchCancellationIT extends ESIntegTestCase { indexTestData(); logger.info("Executing search"); - ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test").setQuery( + ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test").setQuery( scriptQuery(new Script( ScriptType.INLINE, "native", NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, Collections.emptyMap()))) .execute(); @@ -164,7 +164,7 @@ public class SearchCancellationIT extends ESIntegTestCase { indexTestData(); logger.info("Executing search"); - ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test") + ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test") .addScriptField("test_field", new Script(ScriptType.INLINE, "native", NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, Collections.emptyMap()) ).execute(); @@ -182,7 +182,7 @@ public class SearchCancellationIT extends ESIntegTestCase { indexTestData(); logger.info("Executing search"); - ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test") + ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test") .setScroll(TimeValue.timeValueSeconds(10)) .setSize(5) .setQuery( @@ -230,7 +230,7 @@ public class SearchCancellationIT extends ESIntegTestCase { String scrollId = searchResponse.getScrollId(); logger.info("Executing scroll with id {}", scrollId); - ListenableActionFuture<SearchResponse> scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId()) + ActionFuture<SearchResponse> scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId()) .setScroll(keepAlive).execute(); awaitForBlock(plugins); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 2503d6e157..6c34cdf2e4 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -21,7 +21,7 @@ package org.elasticsearch.snapshots; import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse; @@ -412,7 +412,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest logger.info("--> execution was blocked on node [{}], aborting snapshot", blockedNode); - ListenableActionFuture<DeleteSnapshotResponse> deleteSnapshotResponseFuture = internalCluster().client(nodes.get(0)).admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").execute(); + ActionFuture<DeleteSnapshotResponse> deleteSnapshotResponseFuture = internalCluster().client(nodes.get(0)).admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").execute(); // Make sure that abort makes some progress Thread.sleep(100); unblockNode("test-repo", blockedNode); diff --git a/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java index 680bfea6b3..29657c5fb8 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java @@ -19,7 +19,7 @@ package org.elasticsearch.snapshots; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -83,7 +83,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { String blockedNode = internalCluster().getMasterName(); ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); logger.info("--> start deletion of first snapshot"); - ListenableActionFuture<DeleteSnapshotResponse> future = + ActionFuture<DeleteSnapshotResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); logger.info("--> waiting for block to kick in on node [{}]", blockedNode); waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); @@ -129,8 +129,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { String blockedNode = internalCluster().getMasterName(); ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); logger.info("--> start deletion of snapshot"); - ListenableActionFuture<DeleteSnapshotResponse> future = - client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute(); + ActionFuture<DeleteSnapshotResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute(); logger.info("--> waiting for block to kick in on node [{}]", blockedNode); waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); @@ -185,8 +184,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { String blockedNode = internalCluster().getMasterName(); ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); logger.info("--> start deletion of snapshot"); - ListenableActionFuture<DeleteSnapshotResponse> future = - client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); + ActionFuture<DeleteSnapshotResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); logger.info("--> waiting for block to kick in on node [{}]", blockedNode); waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 11194a689d..67911f3e5b 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -22,7 +22,7 @@ package org.elasticsearch.snapshots; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; @@ -56,7 +56,6 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -155,7 +154,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertHitCount(client.prepareSearch("test-idx-2").setSize(0).get(), 100L); assertHitCount(client.prepareSearch("test-idx-3").setSize(0).get(), 100L); - ListenableActionFuture<FlushResponse> flushResponseFuture = null; + ActionFuture<FlushResponse> flushResponseFuture = null; if (randomBoolean()) { ArrayList<String> indicesToFlush = new ArrayList<>(); for (int i = 1; i < 4; i++) { @@ -888,7 +887,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> delete index"); cluster().wipeIndices("test-idx"); logger.info("--> restore index after deletion"); - ListenableActionFuture<RestoreSnapshotResponse> restoreSnapshotResponseFuture = + ActionFuture<RestoreSnapshotResponse> restoreSnapshotResponseFuture = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute(); logger.info("--> wait for the index to appear"); @@ -2014,7 +2013,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(100L)); logger.info("--> snapshot allow partial {}", allowPartial); - ListenableActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + ActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute(); logger.info("--> wait for block to kick in"); if (initBlocking) { @@ -2109,7 +2108,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas blockAllDataNodes("test-repo"); logger.info("--> execution will be blocked on all data nodes"); - final ListenableActionFuture<RestoreSnapshotResponse> restoreFut; + final ActionFuture<RestoreSnapshotResponse> restoreFut; try { logger.info("--> start restore"); restoreFut = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap") @@ -2174,7 +2173,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> execution will be blocked on all data nodes"); blockAllDataNodes(repoName); - final ListenableActionFuture<RestoreSnapshotResponse> restoreFut; + final ActionFuture<RestoreSnapshotResponse> restoreFut; try { logger.info("--> start restore"); restoreFut = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName) @@ -2461,7 +2460,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas // take initial snapshot with a block, making sure we only get 1 in-progress snapshot returned // block a node so the create snapshot operation can remain in progress final String initialBlockedNode = blockNodeWithIndex(repositoryName, indexName); - ListenableActionFuture<CreateSnapshotResponse> responseListener = + ActionFuture<CreateSnapshotResponse> responseListener = client.admin().cluster().prepareCreateSnapshot(repositoryName, "snap-on-empty-repo") .setWaitForCompletion(false) .setIndices(indexName) |