summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java4
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java11
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java7
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java22
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/UpdateSettingsValidationIT.java5
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java10
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java4
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java4
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java11
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java6
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java262
-rw-r--r--core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java102
-rw-r--r--core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java20
-rw-r--r--core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java10
-rw-r--r--core/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java8
-rw-r--r--core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java3
-rw-r--r--core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java6
-rw-r--r--core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java39
-rw-r--r--core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java4
-rw-r--r--core/src/test/java/org/elasticsearch/indices/mapping/DedicatedMasterGetFieldMappingIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java9
-rw-r--r--core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java22
-rw-r--r--core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java9
-rw-r--r--core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/recovery/RelocationIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java2
-rw-r--r--plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureDiscoveryClusterFormationTests.java4
-rw-r--r--plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryClusterFormationTests.java2
-rw-r--r--plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java3
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java232
34 files changed, 310 insertions, 525 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java
index 102e16691d..33f683b458 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java
@@ -45,7 +45,7 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
@TestLogging("_root:DEBUG")
public void testDelayShards() throws Exception {
logger.info("--> starting 3 nodes");
- internalCluster().startNodesAsync(3).get();
+ internalCluster().startNodes(3);
// Wait for all 3 nodes to be up
logger.info("--> waiting for 3 nodes to be up");
diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java
index aa73eafb49..26a882f004 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java
@@ -162,7 +162,7 @@ public class ClusterStatsIT extends ESIntegTestCase {
}
public void testValuesSmokeScreen() throws IOException, ExecutionException, InterruptedException {
- internalCluster().startNodesAsync(randomIntBetween(1, 3)).get();
+ internalCluster().startNodes(randomIntBetween(1, 3));
index("test1", "type", "1", "f", "f");
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
@@ -202,7 +202,7 @@ public class ClusterStatsIT extends ESIntegTestCase {
public void testAllocatedProcessors() throws Exception {
// start one node with 7 processors.
- internalCluster().startNodesAsync(Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 7).build()).get();
+ internalCluster().startNode(Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 7).build());
waitForNodes(1);
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java
index 7c764ed172..b35aac5f95 100644
--- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java
+++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java
@@ -75,7 +75,7 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase {
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)
.build();
- internalCluster().startMasterOnlyNodesAsync(3, sharedSettings).get();
+ internalCluster().startMasterOnlyNodes(3, sharedSettings);
String dataNode = internalCluster().startDataOnlyNode(sharedSettings);
diff --git a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java
index 1fd71c7ae5..307b47edac 100644
--- a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java
+++ b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java
@@ -61,7 +61,6 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
-import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.OldIndexUtils;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
@@ -129,24 +128,23 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase {
}
void setupCluster() throws Exception {
- InternalTestCluster.Async<List<String>> replicas = internalCluster().startNodesAsync(1); // for replicas
+ List<String> replicas = internalCluster().startNodes(1); // for replicas
Path baseTempDir = createTempDir();
// start single data path node
Settings.Builder nodeSettings = Settings.builder()
.put(Environment.PATH_DATA_SETTING.getKey(), baseTempDir.resolve("single-path").toAbsolutePath())
.put(Node.NODE_MASTER_SETTING.getKey(), false); // workaround for dangling index loading issue when node is master
- InternalTestCluster.Async<String> singleDataPathNode = internalCluster().startNodeAsync(nodeSettings.build());
+ singleDataPathNodeName = internalCluster().startNode(nodeSettings);
// start multi data path node
nodeSettings = Settings.builder()
.put(Environment.PATH_DATA_SETTING.getKey(), baseTempDir.resolve("multi-path1").toAbsolutePath() + "," + baseTempDir
.resolve("multi-path2").toAbsolutePath())
.put(Node.NODE_MASTER_SETTING.getKey(), false); // workaround for dangling index loading issue when node is master
- InternalTestCluster.Async<String> multiDataPathNode = internalCluster().startNodeAsync(nodeSettings.build());
+ multiDataPathNodeName = internalCluster().startNode(nodeSettings);
// find single data path dir
- singleDataPathNodeName = singleDataPathNode.get();
Path[] nodePaths = internalCluster().getInstance(NodeEnvironment.class, singleDataPathNodeName).nodeDataPaths();
assertEquals(1, nodePaths.length);
singleDataPath = nodePaths[0].resolve(NodeEnvironment.INDICES_FOLDER);
@@ -155,7 +153,6 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase {
logger.info("--> Single data path: {}", singleDataPath);
// find multi data path dirs
- multiDataPathNodeName = multiDataPathNode.get();
nodePaths = internalCluster().getInstance(NodeEnvironment.class, multiDataPathNodeName).nodeDataPaths();
assertEquals(2, nodePaths.length);
multiDataPath = new Path[]{nodePaths[0].resolve(NodeEnvironment.INDICES_FOLDER),
@@ -165,8 +162,6 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase {
Files.createDirectories(multiDataPath[0]);
Files.createDirectories(multiDataPath[1]);
logger.info("--> Multi data paths: {}, {}", multiDataPath[0], multiDataPath[1]);
-
- replicas.get(); // wait for replicas
}
void upgradeIndexFolder() throws Exception {
diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
index d7a62ca7f1..3d3e87db08 100644
--- a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
@@ -126,7 +126,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
}
public void testClusterInfoServiceCollectsInformation() throws Exception {
- internalCluster().startNodesAsync(2).get();
+ internalCluster().startNodes(2);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), 0)
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE).build()));
@@ -174,10 +174,9 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
}
public void testClusterInfoServiceInformationClearOnError() throws InterruptedException, ExecutionException {
- internalCluster().startNodesAsync(2,
+ internalCluster().startNodes(2,
// manually control publishing
- Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build())
- .get();
+ Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build());
prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get();
ensureGreen("test");
InternalTestCluster internalTestCluster = internalCluster();
diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
index 257b80663a..8f76581ae3 100644
--- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
@@ -24,7 +24,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
@@ -202,22 +201,19 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
.build();
logger.info("--> start first 2 nodes");
- internalCluster().startNodesAsync(2, settings).get();
+ internalCluster().startNodes(2, settings);
ClusterState state;
- assertBusy(new Runnable() {
- @Override
- public void run() {
- for (Client client : clients()) {
- ClusterState state = client.admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
- assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
- }
+ assertBusy(() -> {
+ for (Client client : clients()) {
+ ClusterState state1 = client.admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
+ assertThat(state1.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
}
});
logger.info("--> start two more nodes");
- internalCluster().startNodesAsync(2, settings).get();
+ internalCluster().startNodes(2, settings);
ensureGreen();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet();
@@ -252,7 +248,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
assertNoMasterBlockOnAllNodes();
logger.info("--> start back the 2 nodes ");
- String[] newNodes = internalCluster().startNodesAsync(2, settings).get().toArray(Strings.EMPTY_ARRAY);
+ String[] newNodes = internalCluster().startNodes(2, settings).stream().toArray(String[]::new);
ensureGreen();
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
@@ -338,7 +334,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
logger.info("--> starting [{}] nodes. min_master_nodes set to [{}]", nodeCount, initialMinMasterNodes);
- internalCluster().startNodesAsync(nodeCount, settings.build()).get();
+ internalCluster().startNodes(nodeCount, settings.build());
logger.info("--> waiting for nodes to join");
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodeCount)).get().isTimedOut());
@@ -371,7 +367,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "100ms") // speed things up
.build();
- internalCluster().startNodesAsync(3, settings).get();
+ internalCluster().startNodes(3, settings);
ensureGreen(); // ensure cluster state is recovered before we disrupt things
final String master = internalCluster().getMasterName();
diff --git a/core/src/test/java/org/elasticsearch/cluster/UpdateSettingsValidationIT.java b/core/src/test/java/org/elasticsearch/cluster/UpdateSettingsValidationIT.java
index 9613128a00..a21f61ce8a 100644
--- a/core/src/test/java/org/elasticsearch/cluster/UpdateSettingsValidationIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/UpdateSettingsValidationIT.java
@@ -27,17 +27,16 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
-
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope= Scope.TEST, numDataNodes =0)
public class UpdateSettingsValidationIT extends ESIntegTestCase {
public void testUpdateSettingsValidation() throws Exception {
- internalCluster().startNodesAsync(
+ internalCluster().startNodes(
Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build(),
Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build(),
Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build()
- ).get();
+ );
createIndex("test");
NumShards test = getNumShards("test");
diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java
index 19657b0548..b814716cb4 100644
--- a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java
@@ -57,7 +57,7 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
logger.info("--> starting 2 nodes on the same rack");
- internalCluster().startNodesAsync(2, Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_1").build()).get();
+ internalCluster().startNodes(2, Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_1").build());
createIndex("test1");
createIndex("test2");
@@ -107,12 +107,12 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
.build();
logger.info("--> starting 4 nodes on different zones");
- List<String> nodes = internalCluster().startNodesAsync(
+ List<String> nodes = internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
- ).get();
+ );
String A_0 = nodes.get(0);
String B_0 = nodes.get(1);
String B_1 = nodes.get(2);
@@ -153,10 +153,10 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
.build();
logger.info("--> starting 2 nodes on zones 'a' & 'b'");
- List<String> nodes = internalCluster().startNodesAsync(
+ List<String> nodes = internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
- ).get();
+ );
String A_0 = nodes.get(0);
String B_0 = nodes.get(1);
client().admin().indices().prepareCreate("test")
diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java
index 9c6a4273a7..f9f4a136e1 100644
--- a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java
@@ -85,7 +85,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
}
private void rerouteWithCommands(Settings commonSettings) throws Exception {
- List<String> nodesIds = internalCluster().startNodesAsync(2, commonSettings).get();
+ List<String> nodesIds = internalCluster().startNodes(2, commonSettings);
final String node_1 = nodesIds.get(0);
final String node_2 = nodesIds.get(1);
@@ -304,7 +304,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
}
public void testClusterRerouteWithBlocks() throws Exception {
- List<String> nodesIds = internalCluster().startNodesAsync(2).get();
+ List<String> nodesIds = internalCluster().startNodes(2);
logger.info("--> create an index with 1 shard and 0 replicas");
assertAcked(prepareCreate("test-blocks").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)));
diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java
index 627fc03701..03866269ca 100644
--- a/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java
@@ -43,7 +43,7 @@ public class FilteringAllocationIT extends ESIntegTestCase {
public void testDecommissionNodeNoReplicas() throws Exception {
logger.info("--> starting 2 nodes");
- List<String> nodesIds = internalCluster().startNodesAsync(2).get();
+ List<String> nodesIds = internalCluster().startNodes(2);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
assertThat(cluster().size(), equalTo(2));
@@ -82,7 +82,7 @@ public class FilteringAllocationIT extends ESIntegTestCase {
public void testDisablingAllocationFiltering() throws Exception {
logger.info("--> starting 2 nodes");
- List<String> nodesIds = internalCluster().startNodesAsync(2).get();
+ List<String> nodesIds = internalCluster().startNodes(2);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
assertThat(cluster().size(), equalTo(2));
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java
index e432121898..853f0f6561 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java
@@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
-import org.elasticsearch.test.junit.annotations.TestLogging;
import java.util.Collections;
import java.util.List;
@@ -42,7 +41,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
* get allocated to a free node when the node hosting it leaves the cluster.
*/
public void testNoDelayedTimeout() throws Exception {
- internalCluster().startNodesAsync(3).get();
+ internalCluster().startNodes(3);
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
@@ -61,7 +60,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
* on it before.
*/
public void testDelayedAllocationNodeLeavesAndComesBack() throws Exception {
- internalCluster().startNodesAsync(3).get();
+ internalCluster().startNodes(3);
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
@@ -85,7 +84,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
* though the node hosting the shard is not coming back.
*/
public void testDelayedAllocationTimesOut() throws Exception {
- internalCluster().startNodesAsync(3).get();
+ internalCluster().startNodes(3);
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
@@ -107,7 +106,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
* even though the node it was hosted on will not come back.
*/
public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception {
- internalCluster().startNodesAsync(3).get();
+ internalCluster().startNodes(3);
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
@@ -133,7 +132,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
* even though the node it was hosted on will not come back.
*/
public void testDelayedAllocationChangeWithSettingTo0() throws Exception {
- internalCluster().startNodesAsync(3).get();
+ internalCluster().startNodes(3);
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
index 93326e54db..86dd2dfe18 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
@@ -71,7 +71,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
private void createStaleReplicaScenario() throws Exception {
logger.info("--> starting 3 nodes, 1 master, 2 data");
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
- internalCluster().startDataOnlyNodesAsync(2).get();
+ internalCluster().startDataOnlyNodes(2);
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get());
@@ -267,7 +267,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
public void testNotWaitForQuorumCopies() throws Exception {
logger.info("--> starting 3 nodes");
- internalCluster().startNodesAsync(3).get();
+ internalCluster().startNodes(3);
logger.info("--> creating index with 1 primary and 2 replicas");
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", randomIntBetween(1, 3)).put("index.number_of_replicas", 2)).get());
@@ -289,7 +289,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
*/
public void testForceAllocatePrimaryOnNoDecision() throws Exception {
logger.info("--> starting 1 node");
- final String node = internalCluster().startNodeAsync().get();
+ final String node = internalCluster().startNode();
logger.info("--> creating index with 1 primary and 0 replicas");
final String indexName = "test-idx";
assertAcked(client().admin().indices()
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java
index 68e8fc9c94..51ddc0f3fd 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java
@@ -54,7 +54,7 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
}
public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
- List<String> nodes = internalCluster().startNodesAsync(3).get();
+ List<String> nodes = internalCluster().startNodes(3);
// Wait for all 3 nodes to be up
assertBusy(new Runnable() {
diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
index c5fa05d663..99a7f04e74 100644
--- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
+++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
@@ -43,8 +43,8 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.cluster.service.ClusterStateStatus;
import org.elasticsearch.cluster.service.ClusterServiceState;
+import org.elasticsearch.cluster.service.ClusterStateStatus;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
@@ -187,13 +187,13 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
}
private List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws
- ExecutionException, InterruptedException {
+ ExecutionException, InterruptedException {
configureCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
- List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
+ List<String> nodes = internalCluster().startNodes(numberOfNodes);
ensureStableCluster(numberOfNodes);
// TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results
- ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing();
+ ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalResponses();
}
@@ -201,16 +201,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
}
static final Settings DEFAULT_SETTINGS = Settings.builder()
- .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
- .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
- .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
- .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
- .put(TcpTransport.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this
- // value and the time of disruption and does not recover immediately
- // when disruption is stop. We should make sure we recover faster
- // then the default of 30s, causing ensureGreen and friends to time out
+ .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
+ .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
+ .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
+ .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
+ .put(TcpTransport.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this
+ // value and the time of disruption and does not recover immediately
+ // when disruption is stop. We should make sure we recover faster
+ // then the default of 30s, causing ensureGreen and friends to time out
- .build();
+ .build();
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -237,10 +237,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.info("---> configured unicast");
// TODO: Rarely use default settings form some of these
Settings nodeSettings = Settings.builder()
- .put(settings)
- .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
- .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
- .build();
+ .put(settings)
+ .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
+ .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
+ .build();
if (discoveryConfig == null) {
if (unicastHostsOrdinals == null) {
@@ -306,8 +306,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.info("--> reducing min master nodes to 2");
assertAcked(client().admin().cluster().prepareUpdateSettings()
- .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2))
- .get());
+ .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2))
+ .get());
String master = internalCluster().getMasterName();
String nonMaster = null;
@@ -334,8 +334,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
// Makes sure that the get request can be executed on each node locally:
assertAcked(prepareCreate("test").setSettings(Settings.builder()
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
));
// Everything is stable now, it is now time to simulate evil...
@@ -376,7 +376,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
}
if (!success) {
fail("node [" + node + "] has no master or has blocks, despite of being on the right side of the partition. State dump:\n"
- + nodeState);
+ + nodeState);
}
}
@@ -388,8 +388,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.info("Verify no master block with {} set to {}", DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all");
client().admin().cluster().prepareUpdateSettings()
- .setTransientSettings(Settings.builder().put(DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all"))
- .get();
+ .setTransientSettings(Settings.builder().put(DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all"))
+ .get();
networkDisruption.startDisrupting();
@@ -416,10 +416,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
final List<String> nodes = startCluster(3);
assertAcked(prepareCreate("test")
- .setSettings(Settings.builder()
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
- ));
+ .setSettings(Settings.builder()
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
+ ));
ensureGreen();
String isolatedNode = internalCluster().getMasterName();
@@ -440,7 +440,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
for (String node : nodes) {
ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()),
- true, node);
+ true, node);
}
logger.info("issue a reroute");
@@ -468,8 +468,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
}
} catch (AssertionError t) {
fail("failed comparing cluster state: " + t.getMessage() + "\n" +
- "--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state +
- "\n--- cluster state [" + node + "]: ---\n" + nodeState);
+ "--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state +
+ "\n--- cluster state [" + node + "]: ---\n" + nodeState);
}
}
@@ -482,7 +482,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
* This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates
*/
@TestLogging("_root:DEBUG,org.elasticsearch.action.index:TRACE,org.elasticsearch.action.get:TRACE,discovery:TRACE,org.elasticsearch.cluster.service:TRACE,"
- + "org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.indices.cluster:TRACE")
+ + "org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.indices.cluster:TRACE")
public void testAckedIndexing() throws Exception {
final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5;
@@ -491,10 +491,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
final List<String> nodes = startCluster(rarely() ? 5 : 3);
assertAcked(prepareCreate("test")
- .setSettings(Settings.builder()
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
- ));
+ .setSettings(Settings.builder()
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
+ ));
ensureGreen();
ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme();
@@ -530,7 +530,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
IndexResponse response =
- client.prepareIndex("test", "type", id).setSource("{}").setTimeout(timeout).get(timeout);
+ client.prepareIndex("test", "type", id).setSource("{}").setTimeout(timeout).get(timeout);
assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
ackedDocs.put(id, node);
logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node);
@@ -584,7 +584,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
disruptionScheme.stopDisrupting();
for (String node : internalCluster().getNodeNames()) {
ensureStableCluster(nodes.size(), TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() +
- DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
+ DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
}
ensureGreen("test");
@@ -594,7 +594,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size());
for (String id : ackedDocs.keySet()) {
assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
- client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
+ client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
}
} catch (AssertionError e) {
throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e);
@@ -684,7 +684,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
// Keeps track of the previous and current master when a master node transition took place on each node on the majority side:
final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<String, List<Tuple<String,
- String>>>());
+ String>>>());
for (final String node : majoritySide) {
masters.put(node, new ArrayList<Tuple<String, String>>());
internalCluster().getInstance(ClusterService.class, node).add(new ClusterStateListener() {
@@ -694,7 +694,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
DiscoveryNode currentMaster = event.state().nodes().getMasterNode();
if (!Objects.equals(previousMaster, currentMaster)) {
logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(),
- event.previousState());
+ event.previousState());
String previousMasterNodeName = previousMaster != null ? previousMaster.getName() : null;
String currentMasterNodeName = currentMaster != null ? currentMaster.getName() : null;
masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName));
@@ -739,17 +739,17 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
// The old master node will send this update + the cluster state where he is flagged as master to the other
// nodes that follow the new master. These nodes should ignore this update.
internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new
- ClusterStateUpdateTask(Priority.IMMEDIATE) {
- @Override
- public ClusterState execute(ClusterState currentState) throws Exception {
- return ClusterState.builder(currentState).build();
- }
+ ClusterStateUpdateTask(Priority.IMMEDIATE) {
+ @Override
+ public ClusterState execute(ClusterState currentState) throws Exception {
+ return ClusterState.builder(currentState).build();
+ }
- @Override
- public void onFailure(String source, Exception e) {
- logger.warn((Supplier<?>) () -> new ParameterizedMessage("failure [{}]", source), e);
- }
- });
+ @Override
+ public void onFailure(String source, Exception e) {
+ logger.warn((Supplier<?>) () -> new ParameterizedMessage("failure [{}]", source), e);
+ }
+ });
// Save the new elected master node
final String newMasterNode = internalCluster().getMasterName(majoritySide.get(0));
@@ -769,15 +769,15 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
String nodeName = entry.getKey();
List<Tuple<String, String>> recordedMasterTransition = entry.getValue();
assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(),
- equalTo(2));
+ equalTo(2));
assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(),
- equalTo(oldMasterNode));
+ equalTo(oldMasterNode));
assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition
- .get(0).v2(), nullValue());
+ .get(0).v2(), nullValue());
assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(),
- nullValue());
+ nullValue());
assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]",
- recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
+ recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
}
}
@@ -789,11 +789,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
List<String> nodes = startCluster(3);
assertAcked(prepareCreate("test")
- .setSettings(Settings.builder()
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
- )
- .get());
+ .setSettings(Settings.builder()
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
+ )
+ .get());
ensureGreen("test");
nodes = new ArrayList<>(nodes);
@@ -809,13 +809,13 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value")
- .get();
+ .get();
assertThat(indexResponse.getVersion(), equalTo(1L));
logger.info("Verifying if document exists via node[{}]", notIsolatedNode);
GetResponse getResponse = internalCluster().client(notIsolatedNode).prepareGet("test", "type", indexResponse.getId())
- .setPreference("_local")
- .get();
+ .setPreference("_local")
+ .get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(1L));
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
@@ -828,8 +828,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
for (String node : nodes) {
logger.info("Verifying if document exists after isolating node[{}] via node[{}]", isolatedNode, node);
getResponse = internalCluster().client(node).prepareGet("test", "type", indexResponse.getId())
- .setPreference("_local")
- .get();
+ .setPreference("_local")
+ .get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(1L));
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
@@ -853,7 +853,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
// includes all the other nodes that have pinged it and the issue doesn't manifest
- ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing();
+ ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalResponses();
}
@@ -890,7 +890,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
// includes all the other nodes that have pinged it and the issue doesn't manifest
- ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing();
+ ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalResponses();
}
@@ -928,11 +928,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();
TransportService masterTranspotService =
- internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName());
+ internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName());
logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode);
MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
- nonMasterNode);
+ nonMasterNode);
nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService);
assertNoMaster(nonMasterNode);
@@ -951,10 +951,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
final CountDownLatch countDownLatch = new CountDownLatch(2);
nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService
- .original()) {
+ .original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions
- options) throws IOException, TransportException {
+ options) throws IOException, TransportException {
if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) {
countDownLatch.countDown();
}
@@ -982,16 +982,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
String nonMasterNode = randomFrom(nonMasterNodes);
assertAcked(prepareCreate("test")
- .setSettings(Settings.builder()
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
- ));
+ .setSettings(Settings.builder()
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
+ ));
ensureGreen();
String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId();
// fail a random shard
ShardRouting failedShard =
- randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
+ randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode);
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean success = new AtomicBoolean();
@@ -1006,20 +1006,20 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
networkDisruption.startDisrupting();
service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new
- ShardStateAction.Listener() {
- @Override
- public void onSuccess() {
- success.set(true);
- latch.countDown();
- }
+ ShardStateAction.Listener() {
+ @Override
+ public void onSuccess() {
+ success.set(true);
+ latch.countDown();
+ }
- @Override
- public void onFailure(Exception e) {
- success.set(false);
- latch.countDown();
- assert false;
- }
- });
+ @Override
+ public void onFailure(Exception e) {
+ success.set(false);
+ latch.countDown();
+ assert false;
+ }
+ });
if (isolatedNode.equals(nonMasterNode)) {
assertNoMaster(nonMasterNode);
@@ -1051,11 +1051,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);
// don't wait for initial state, wat want to add the disruption while the cluster is forming..
- internalCluster().startNodesAsync(3,
- Settings.builder()
- .put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "1ms")
- .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "3s")
- .build()).get();
+ internalCluster().startNodes(3, Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "3s").build());
logger.info("applying disruption while cluster is forming ...");
@@ -1084,7 +1080,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode);
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
- masterNode);
+ masterNode);
if (randomBoolean()) {
masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
} else {
@@ -1110,21 +1106,18 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
// don't use DEFAULT settings (which can cause node disconnects on a slow CI machine)
configureCluster(Settings.EMPTY, 3, null, 1);
- InternalTestCluster.Async<String> masterNodeFuture = internalCluster().startMasterOnlyNodeAsync();
- InternalTestCluster.Async<String> node_1Future = internalCluster().startDataOnlyNodeAsync();
+ final String masterNode = internalCluster().startMasterOnlyNode();
+ final String node_1 = internalCluster().startDataOnlyNode();
- final String node_1 = node_1Future.get();
- final String masterNode = masterNodeFuture.get();
logger.info("--> creating index [test] with one shard and on replica");
assertAcked(prepareCreate("test").setSettings(
- Settings.builder().put(indexSettings())
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
+ Settings.builder().put(indexSettings())
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
);
ensureGreen("test");
- InternalTestCluster.Async<String> node_2Future = internalCluster().startDataOnlyNodeAsync();
- final String node_2 = node_2Future.get();
+ final String node_2 = internalCluster().startDataOnlyNode();
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("doc").setSource("{\"int_field\":1}"));
@@ -1137,7 +1130,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
CountDownLatch beginRelocationLatch = new CountDownLatch(1);
CountDownLatch endRelocationLatch = new CountDownLatch(1);
transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch,
- endRelocationLatch));
+ endRelocationLatch));
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_2)).get();
// wait for relocation to start
beginRelocationLatch.await();
@@ -1176,21 +1169,19 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
*/
public void testIndicesDeleted() throws Exception {
final Settings settings = Settings.builder()
- .put(DEFAULT_SETTINGS)
- .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node
- .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
- .build();
+ .put(DEFAULT_SETTINGS)
+ .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node
+ .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
+ .build();
final String idxName = "test";
configureCluster(settings, 3, null, 2);
- InternalTestCluster.Async<List<String>> masterNodes = internalCluster().startMasterOnlyNodesAsync(2);
- InternalTestCluster.Async<String> dataNode = internalCluster().startDataOnlyNodeAsync();
- dataNode.get();
- final List<String> allMasterEligibleNodes = masterNodes.get();
+ final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2);
+ final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(3);
assertAcked(prepareCreate("test"));
final String masterNode1 = internalCluster().getMasterName();
- NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(masterNode1, dataNode.get()),
+ NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(masterNode1, dataNode),
new NetworkUnresponsive());
internalCluster().setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();
@@ -1202,7 +1193,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
for (String masterNode : allMasterEligibleNodes) {
final ClusterServiceState masterState = internalCluster().clusterService(masterNode).clusterServiceState();
assertTrue("index not deleted on " + masterNode, masterState.getClusterState().metaData().hasIndex(idxName) == false &&
- masterState.getClusterStateStatus() == ClusterStateStatus.APPLIED);
+ masterState.getClusterStateStatus() == ClusterStateStatus.APPLIED);
}
});
internalCluster().restartNode(masterNode1, InternalTestCluster.EMPTY_CALLBACK);
@@ -1212,21 +1203,21 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
public void testElectMasterWithLatestVersion() throws Exception {
configureCluster(3, null, 2);
- final Set<String> nodes = new HashSet<>(internalCluster().startNodesAsync(3).get());
+ final Set<String> nodes = new HashSet<>(internalCluster().startNodes(3));
ensureStableCluster(3);
ServiceDisruptionScheme isolateAllNodes = new NetworkDisruption(new NetworkDisruption.IsolateAllNodes(nodes), new NetworkDisconnect());
internalCluster().setDisruptionScheme(isolateAllNodes);
logger.info("--> forcing a complete election to make sure \"preferred\" master is elected");
isolateAllNodes.startDisrupting();
- for (String node: nodes) {
+ for (String node : nodes) {
assertNoMaster(node);
}
isolateAllNodes.stopDisrupting();
ensureStableCluster(3);
final String preferredMasterName = internalCluster().getMasterName();
final DiscoveryNode preferredMaster = internalCluster().clusterService(preferredMasterName).localNode();
- for (String node: nodes) {
+ for (String node : nodes) {
DiscoveryNode discoveryNode = internalCluster().clusterService(node).localNode();
assertThat(discoveryNode.getId(), greaterThanOrEqualTo(preferredMaster.getId()));
}
@@ -1252,7 +1243,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.info("--> forcing a complete election again");
isolateAllNodes.startDisrupting();
- for (String node: nodes) {
+ for (String node : nodes) {
assertNoMaster(node);
}
@@ -1298,10 +1289,17 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
}
final NetworkLinkDisruptionType disruptionType;
switch (randomInt(2)) {
- case 0: disruptionType = new NetworkUnresponsive(); break;
- case 1: disruptionType = new NetworkDisconnect(); break;
- case 2: disruptionType = NetworkDelay.random(random()); break;
- default: throw new IllegalArgumentException();
+ case 0:
+ disruptionType = new NetworkUnresponsive();
+ break;
+ case 1:
+ disruptionType = new NetworkDisconnect();
+ break;
+ case 2:
+ disruptionType = NetworkDelay.random(random());
+ break;
+ default:
+ throw new IllegalArgumentException();
}
final ServiceDisruptionScheme scheme;
if (rarely()) {
@@ -1334,7 +1332,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
if (expectedBlocks != null) {
for (ClusterBlockLevel level : expectedBlocks.levels()) {
assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
- (level));
+ (level));
}
}
}
@@ -1352,7 +1350,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
}
logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
- oldMasterNode, not(equalTo(masterNode)));
+ oldMasterNode, not(equalTo(masterNode)));
}
}, 10, TimeUnit.SECONDS);
}
@@ -1372,12 +1370,12 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
private void assertDiscoveryCompleted(List<String> nodes) throws InterruptedException {
for (final String node : nodes) {
assertTrue(
- "node [" + node + "] is still joining master",
- awaitBusy(
- () -> !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster(),
- 30,
- TimeUnit.SECONDS
- )
+ "node [" + node + "] is still joining master",
+ awaitBusy(
+ () -> !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster(),
+ 30,
+ TimeUnit.SECONDS
+ )
);
}
}
diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java
deleted file mode 100644
index b708ab4c26..0000000000
--- a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.discovery;
-
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
-import org.elasticsearch.test.ESIntegTestCase.Scope;
-import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
-import org.junit.Before;
-
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import static org.hamcrest.Matchers.equalTo;
-
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
-public class ZenUnicastDiscoveryIT extends ESIntegTestCase {
-
- private ClusterDiscoveryConfiguration discoveryConfig;
-
- @Override
- protected Settings nodeSettings(int nodeOrdinal) {
- return discoveryConfig.nodeSettings(nodeOrdinal);
- }
-
- @Before
- public void clearConfig() {
- discoveryConfig = null;
- }
-
- public void testNormalClusterForming() throws ExecutionException, InterruptedException {
- int currentNumNodes = randomIntBetween(3, 5);
-
- // use explicit unicast hosts so we can start those first
- int[] unicastHostOrdinals = new int[randomIntBetween(1, currentNumNodes)];
- for (int i = 0; i < unicastHostOrdinals.length; i++) {
- unicastHostOrdinals[i] = i;
- }
- discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, unicastHostOrdinals);
-
- // start the unicast hosts
- internalCluster().startNodesAsync(unicastHostOrdinals.length).get();
-
- // start the rest of the cluster
- internalCluster().startNodesAsync(currentNumNodes - unicastHostOrdinals.length).get();
-
- if (client().admin().cluster().prepareHealth().setWaitForNodes("" + currentNumNodes).get().isTimedOut()) {
- logger.info("cluster forming timed out, cluster state:\n{}", client().admin().cluster().prepareState().get().getState());
- fail("timed out waiting for cluster to form with [" + currentNumNodes + "] nodes");
- }
- }
-
- // Without the 'include temporalResponses responses to nodesToConnect' improvement in UnicastZenPing#sendPings this
- // test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=N
- // can't be satisfied.
- public void testMinimumMasterNodes() throws Exception {
- int currentNumNodes = randomIntBetween(3, 5);
- final int min_master_nodes = currentNumNodes / 2 + 1;
- int currentNumOfUnicastHosts = randomIntBetween(min_master_nodes, currentNumNodes);
- final Settings settings = Settings.builder()
- .put("discovery.zen.join_timeout", TimeValue.timeValueSeconds(10))
- .put("discovery.zen.minimum_master_nodes", min_master_nodes)
- .build();
- discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, currentNumOfUnicastHosts, settings);
-
- List<String> nodes = internalCluster().startNodesAsync(currentNumNodes).get();
-
- ensureStableCluster(currentNumNodes);
-
- DiscoveryNode masterDiscoNode = null;
- for (String node : nodes) {
- ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
- assertThat(state.nodes().getSize(), equalTo(currentNumNodes));
- if (masterDiscoNode == null) {
- masterDiscoNode = state.nodes().getMasterNode();
- } else {
- assertThat(masterDiscoNode.equals(state.nodes().getMasterNode()), equalTo(true));
- }
- }
- }
-}
diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java
index 6856d05365..27fb48f764 100644
--- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java
+++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java
@@ -80,12 +80,12 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put(defaultSettings)
.build();
- internalCluster().startNodesAsync(2, masterNodeSettings).get();
+ internalCluster().startNodes(2, masterNodeSettings);
Settings dateNodeSettings = Settings.builder()
.put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(defaultSettings)
.build();
- internalCluster().startNodesAsync(2, dateNodeSettings).get();
+ internalCluster().startNodes(2, dateNodeSettings);
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("4")
@@ -100,13 +100,10 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
final String oldMaster = internalCluster().getMasterName();
internalCluster().stopCurrentMasterNode();
- assertBusy(new Runnable() {
- @Override
- public void run() {
- String current = internalCluster().getMasterName();
- assertThat(current, notNullValue());
- assertThat(current, not(equalTo(oldMaster)));
- }
+ assertBusy(() -> {
+ String current = internalCluster().getMasterName();
+ assertThat(current, notNullValue());
+ assertThat(current, not(equalTo(oldMaster)));
});
ensureSearchable("test");
@@ -130,7 +127,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
.put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(defaultSettings)
.build();
- internalCluster().startNodesAsync(2, dateNodeSettings).get();
+ internalCluster().startNodes(2, dateNodeSettings);
client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, master);
@@ -155,8 +152,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
}
public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception {
- List<String> nodeNames = internalCluster().startNodesAsync(2).get();
- client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
+ List<String> nodeNames = internalCluster().startNodes(2);
List<String> nonMasterNodes = new ArrayList<>(nodeNames);
nonMasterNodes.remove(internalCluster().getMasterName());
diff --git a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java
index bed21193ac..22f06b9098 100644
--- a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java
@@ -94,7 +94,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
public void testSimpleOpenClose() throws Exception {
logger.info("--> starting 2 nodes");
- internalCluster().startNodesAsync(2).get();
+ internalCluster().startNodes(2);
logger.info("--> creating test index");
createIndex("test");
@@ -237,7 +237,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
logger.info("--> cleaning nodes");
logger.info("--> starting 2 nodes");
- internalCluster().startNodesAsync(2).get();
+ internalCluster().startNodes(2);
logger.info("--> indexing a simple document");
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefreshPolicy(IMMEDIATE).get();
@@ -277,7 +277,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
public void testDanglingIndices() throws Exception {
logger.info("--> starting two nodes");
- final String node_1 = internalCluster().startNodesAsync(2).get().get(0);
+ final String node_1 = internalCluster().startNodes(2).get(0);
logger.info("--> indexing a simple document");
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefreshPolicy(IMMEDIATE).get();
@@ -331,7 +331,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
if (randomBoolean()) {
// test with a regular index
logger.info("--> starting a cluster with " + numNodes + " nodes");
- nodes = internalCluster().startNodesAsync(numNodes).get();
+ nodes = internalCluster().startNodes(numNodes);
logger.info("--> create an index");
createIndex(indexName);
} else {
@@ -344,7 +344,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), dataPath.toString())
.put("index.store.fs.fs_lock", randomFrom("native", "simple"))
.build();
- nodes = internalCluster().startNodesAsync(numNodes, nodeSettings).get();
+ nodes = internalCluster().startNodes(numNodes, nodeSettings);
logger.info("--> create a shadow replica index");
createShadowReplicaIndex(indexName, dataPath, numNodes - 1);
}
diff --git a/core/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java b/core/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java
index 3c2917f38e..3dfeca3053 100644
--- a/core/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java
@@ -36,6 +36,7 @@ import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
+import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -57,10 +58,9 @@ public class MetaDataWriteDataNodesIT extends ESIntegTestCase {
public void testMetaIsRemovedIfAllShardsFromIndexRemoved() throws Exception {
// this test checks that the index state is removed from a data only node once all shards have been allocated away from it
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
- InternalTestCluster.Async<String> nodeName1 = internalCluster().startDataOnlyNodeAsync();
- InternalTestCluster.Async<String> nodeName2 = internalCluster().startDataOnlyNodeAsync();
- String node1 = nodeName1.get();
- String node2 = nodeName2.get();
+ List<String> nodeNames= internalCluster().startDataOnlyNodes(2);
+ String node1 = nodeNames.get(0);
+ String node2 = nodeNames.get(1);
String index = "index";
assertAcked(prepareCreate(index).setSettings(Settings.builder().put("index.number_of_replicas", 0).put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node1)));
diff --git a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java
index 8284388d2c..3abaff3295 100644
--- a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java
@@ -46,8 +46,7 @@ public class QuorumGatewayIT extends ESIntegTestCase {
public void testQuorumRecovery() throws Exception {
logger.info("--> starting 3 nodes");
// we are shutting down nodes - make sure we don't have 2 clusters if we test network
- internalCluster().startNodesAsync(3).get();
-
+ internalCluster().startNodes(3);
createIndex("test");
ensureGreen();
diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
index 052bfc00ef..431b592fac 100644
--- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
@@ -316,7 +316,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
public void testLatestVersionLoaded() throws Exception {
// clean two nodes
- internalCluster().startNodesAsync(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()).get();
+ internalCluster().startNodes(2, Settings.builder().put("gateway.recover_after_nodes", 2).build());
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
@@ -366,7 +366,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
logger.info("--> starting the two nodes back");
- internalCluster().startNodesAsync(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()).get();
+ internalCluster().startNodes(2, Settings.builder().put("gateway.recover_after_nodes", 2).build());
logger.info("--> running cluster_health (wait for the shards to startup)");
ensureGreen();
@@ -392,7 +392,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4)
.put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build();
- internalCluster().startNodesAsync(4, settings).get();
+ internalCluster().startNodes(4, settings);
// prevent any rebalance actions during the peer recovery
// if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
// we reuse the files on disk after full restarts for replicas.
diff --git a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java
index a335a42edb..d5a003003a 100644
--- a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java
+++ b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java
@@ -110,7 +110,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
public void testCannotCreateWithBadPath() throws Exception {
Settings nodeSettings = nodeSettings("/badpath");
- internalCluster().startNodesAsync(1, nodeSettings).get();
+ internalCluster().startNodes(1, nodeSettings);
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, "/etc/foo")
@@ -132,7 +132,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
final Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
- internalCluster().startNodesAsync(3, nodeSettings).get();
+ internalCluster().startNodes(3, nodeSettings);
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).build();
@@ -189,7 +189,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
final Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
- internalCluster().startNodesAsync(3, nodeSettings).get();
+ internalCluster().startNodes(3, nodeSettings);
final String IDX = "test";
Settings idxSettings = Settings.builder()
@@ -552,7 +552,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
final int nodeCount = randomIntBetween(2, 5);
logger.info("--> starting {} nodes", nodeCount);
- final List<String> nodes = internalCluster().startNodesAsync(nodeCount, nodeSettings).get();
+ final List<String> nodes = internalCluster().startNodes(nodeCount, nodeSettings);
final String IDX = "test";
final Tuple<Integer, Integer> numPrimariesAndReplicas = randomPrimariesAndReplicas(nodeCount);
final int numPrimaries = numPrimariesAndReplicas.v1();
@@ -605,7 +605,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
- final List<String> nodes = internalCluster().startNodesAsync(2, nodeSettings).get();
+ final List<String> nodes = internalCluster().startNodes(2, nodeSettings);
String IDX = "test";
Settings idxSettings = Settings.builder()
@@ -661,7 +661,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
- internalCluster().startNodesAsync(3, nodeSettings).get();
+ internalCluster().startNodes(3, nodeSettings);
String IDX = "test";
Settings idxSettings = Settings.builder()
@@ -731,10 +731,9 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
Settings fooSettings = Settings.builder().put(nodeSettings).put("node.attr.affinity", "foo").build();
Settings barSettings = Settings.builder().put(nodeSettings).put("node.attr.affinity", "bar").build();
- final InternalTestCluster.Async<List<String>> fooNodes = internalCluster().startNodesAsync(2, fooSettings);
- final InternalTestCluster.Async<List<String>> barNodes = internalCluster().startNodesAsync(2, barSettings);
- fooNodes.get();
- barNodes.get();
+ List<String> allNodes = internalCluster().startNodes(fooSettings, fooSettings, barSettings, barSettings);
+ List<String> fooNodes = allNodes.subList(0, 2);
+ List<String> barNodes = allNodes.subList(2, 4);
String IDX = "test";
Settings includeFoo = Settings.builder()
@@ -768,27 +767,27 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
client().admin().indices().prepareUpdateSettings(IDX).setSettings(includeBar).get();
// wait for the shards to move from "foo" nodes to "bar" nodes
- assertNoShardsOn(fooNodes.get());
+ assertNoShardsOn(fooNodes);
// put shards back on "foo"
client().admin().indices().prepareUpdateSettings(IDX).setSettings(includeFoo).get();
// wait for the shards to move from "bar" nodes to "foo" nodes
- assertNoShardsOn(barNodes.get());
+ assertNoShardsOn(barNodes);
// Stop a foo node
logger.info("--> stopping first 'foo' node");
- internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get().get(0)));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get(0)));
// Ensure that the other foo node has all the shards now
- assertShardCountOn(fooNodes.get().get(1), 5);
+ assertShardCountOn(fooNodes.get(1), 5);
// Assert no shards on the "bar" nodes
- assertNoShardsOn(barNodes.get());
+ assertNoShardsOn(barNodes);
// Stop the second "foo" node
logger.info("--> stopping second 'foo' node");
- internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get().get(1)));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get(1)));
// The index should still be able to be allocated (on the "bar" nodes),
// all the "foo" nodes are gone
@@ -799,7 +798,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
String newFooNode = internalCluster().startNode(fooSettings);
assertShardCountOn(newFooNode, 5);
- assertNoShardsOn(barNodes.get());
+ assertNoShardsOn(barNodes);
}
public void testDeletingClosedIndexRemovesFiles() throws Exception {
@@ -808,7 +807,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
final int numNodes = randomIntBetween(2, 5);
logger.info("--> starting {} nodes", numNodes);
- final List<String> nodes = internalCluster().startNodesAsync(numNodes, nodeSettings).get();
+ final List<String> nodes = internalCluster().startNodes(numNodes, nodeSettings);
final String IDX = "test";
final Tuple<Integer, Integer> numPrimariesAndReplicas = randomPrimariesAndReplicas(numNodes);
final int numPrimaries = numPrimariesAndReplicas.v1();
@@ -851,7 +850,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
- internalCluster().startNodesAsync(2, nodeSettings).get();
+ internalCluster().startNodes(2, nodeSettings);
String IDX = "test";
Settings idxSettings = Settings.builder()
@@ -868,7 +867,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
flushAndRefresh(IDX);
- internalCluster().startNodesAsync(1).get();
+ internalCluster().startNodes(1);
ensureYellow(IDX);
final ClusterHealthResponse clusterHealth = client().admin().cluster()
diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java
index 7d658a2a59..1fe6be5346 100644
--- a/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java
+++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java
@@ -72,7 +72,7 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
}
public void testCorruptTranslogFiles() throws Exception {
- internalCluster().startNodesAsync(1, Settings.EMPTY).get();
+ internalCluster().startNodes(1, Settings.EMPTY);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1)
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java
index ff4de23a06..bb8943f19d 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java
@@ -28,7 +28,6 @@ import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory;
-import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
@@ -47,7 +46,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MockEngineFactoryPlugin;
-import org.elasticsearch.index.translog.TruncateTranslogCommand;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@@ -85,7 +83,7 @@ public class TruncateTranslogIT extends ESIntegTestCase {
}
public void testCorruptTranslogTruncation() throws Exception {
- internalCluster().startNodesAsync(1, Settings.EMPTY).get();
+ internalCluster().startNodes(1, Settings.EMPTY);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1)
diff --git a/core/src/test/java/org/elasticsearch/indices/mapping/DedicatedMasterGetFieldMappingIT.java b/core/src/test/java/org/elasticsearch/indices/mapping/DedicatedMasterGetFieldMappingIT.java
index 81c9d12fbb..f8fe05bc97 100644
--- a/core/src/test/java/org/elasticsearch/indices/mapping/DedicatedMasterGetFieldMappingIT.java
+++ b/core/src/test/java/org/elasticsearch/indices/mapping/DedicatedMasterGetFieldMappingIT.java
@@ -34,7 +34,7 @@ public class DedicatedMasterGetFieldMappingIT extends SimpleGetFieldMappingsIT {
Settings settings = Settings.builder()
.put(Node.NODE_DATA_SETTING.getKey(), false)
.build();
- internalCluster().startNodesAsync(settings, Settings.EMPTY).get();
+ internalCluster().startNodes(settings, Settings.EMPTY);
}
}
diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java
index fe3b569755..c38c20e0c2 100644
--- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java
+++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java
@@ -43,7 +43,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoverySettings;
-import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
@@ -174,7 +173,7 @@ public class RareClusterStateIT extends ESIntegTestCase {
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/14932")
public void testDeleteCreateInOneBulk() throws Exception {
- internalCluster().startNodesAsync(2).get();
+ internalCluster().startNodes(2);
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
prepareCreate("test").setSettings(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, true).addMapping("type").get();
ensureGreen("test");
@@ -213,7 +212,7 @@ public class RareClusterStateIT extends ESIntegTestCase {
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design
.build();
- final List<String> nodeNames = internalCluster().startNodesAsync(2, settings).get();
+ final List<String> nodeNames = internalCluster().startNodes(2, settings);
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
final String master = internalCluster().getMasterName();
@@ -328,11 +327,11 @@ public class RareClusterStateIT extends ESIntegTestCase {
// Here we want to test that everything goes well if the mappings that
// are needed for a document are not available on the replica at the
// time of indexing it
- final List<String> nodeNames = internalCluster().startNodesAsync(2,
+ final List<String> nodeNames = internalCluster().startNodes(2,
Settings.builder()
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design
- .build()).get();
+ .build());
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
final String master = internalCluster().getMasterName();
diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java
index a6420034c4..d8e7e7c4ac 100644
--- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java
+++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java
@@ -292,17 +292,14 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
}
public void testShardActiveElsewhereDoesNotDeleteAnother() throws Exception {
- InternalTestCluster.Async<String> masterFuture = internalCluster().startNodeAsync(
- Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), true, Node.NODE_DATA_SETTING.getKey(), false).build());
- InternalTestCluster.Async<List<String>> nodesFutures = internalCluster().startNodesAsync(4,
- Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false, Node.NODE_DATA_SETTING.getKey(), true).build());
-
- final String masterNode = masterFuture.get();
- final String node1 = nodesFutures.get().get(0);
- final String node2 = nodesFutures.get().get(1);
- final String node3 = nodesFutures.get().get(2);
+ final String masterNode = internalCluster().startMasterOnlyNode();
+ final List<String> nodes = internalCluster().startDataOnlyNodes(4);
+
+ final String node1 = nodes.get(0);
+ final String node2 = nodes.get(1);
+ final String node3 = nodes.get(2);
// we will use this later on, handy to start now to make sure it has a different data folder that node 1,2 &3
- final String node4 = nodesFutures.get().get(3);
+ final String node4 = nodes.get(3);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(indexSettings())
@@ -356,8 +353,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
logger.debug("--> starting the two old nodes back");
- internalCluster().startNodesAsync(2,
- Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false, Node.NODE_DATA_SETTING.getKey(), true).build());
+ internalCluster().startDataOnlyNodes(2);
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("5").get().isTimedOut());
@@ -372,7 +368,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
}
public void testShardActiveElseWhere() throws Exception {
- List<String> nodes = internalCluster().startNodesAsync(2).get();
+ List<String> nodes = internalCluster().startNodes(2);
final String masterNode = internalCluster().getMasterName();
final String nonMasterNode = nodes.get(0).equals(masterNode) ? nodes.get(1) : nodes.get(0);
diff --git a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java
index 2147cea696..f1dba4e58c 100644
--- a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java
+++ b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java
@@ -40,7 +40,7 @@ import static org.hamcrest.Matchers.notNullValue;
public class SimpleNodesInfoIT extends ESIntegTestCase {
public void testNodesInfos() throws Exception {
- List<String> nodesIds = internalCluster().startNodesAsync(2).get();
+ List<String> nodesIds = internalCluster().startNodes(2);
final String node_1 = nodesIds.get(0);
final String node_2 = nodesIds.get(1);
@@ -79,7 +79,7 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
}
public void testNodesInfosTotalIndexingBuffer() throws Exception {
- List<String> nodesIds = internalCluster().startNodesAsync(2).get();
+ List<String> nodesIds = internalCluster().startNodes(2);
final String node_1 = nodesIds.get(0);
final String node_2 = nodesIds.get(1);
@@ -113,11 +113,10 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
}
public void testAllocatedProcessors() throws Exception {
- List<String> nodesIds = internalCluster().
- startNodesAsync(
+ List<String> nodesIds = internalCluster().startNodes(
Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 3).build(),
Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 6).build()
- ).get();
+ );
final String node_1 = nodesIds.get(0);
final String node_2 = nodesIds.get(1);
diff --git a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java
index dc38867705..50035e1027 100644
--- a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java
+++ b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java
@@ -126,7 +126,7 @@ public class FullRollingRestartIT extends ESIntegTestCase {
public void testNoRebalanceOnRollingRestart() throws Exception {
// see https://github.com/elastic/elasticsearch/issues/14387
internalCluster().startMasterOnlyNode(Settings.EMPTY);
- internalCluster().startDataOnlyNodesAsync(3).get();
+ internalCluster().startDataOnlyNodes(3);
/**
* We start 3 nodes and a dedicated master. Restart on of the data-nodes and ensure that we got no relocations.
* Yet we have 6 shards 0 replica so that means if the restarting node comes back both other nodes are subject
diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
index cd93a9fef2..2017b2796b 100644
--- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
+++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
@@ -470,7 +470,7 @@ public class RelocationIT extends ESIntegTestCase {
Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes),
Stream.generate(() -> Settings.builder().put("node.attr.color", "red").build()).limit(halfNodes)
).toArray(Settings[]::new);
- List<String> nodes = internalCluster().startNodesAsync(nodeSettings).get();
+ List<String> nodes = internalCluster().startNodes(nodeSettings);
String[] blueNodes = nodes.subList(0, halfNodes).stream().toArray(String[]::new);
String[] redNodes = nodes.subList(halfNodes, nodes.size()).stream().toArray(String[]::new);
logger.info("blue nodes: {}", (Object)blueNodes);
diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
index 2375b7519c..b1c46b35b6 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
@@ -621,7 +621,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
public void testThatSensitiveRepositorySettingsAreNotExposed() throws Exception {
Settings nodeSettings = Settings.builder().put().build();
logger.info("--> start two nodes");
- internalCluster().startNodesAsync(2, nodeSettings).get();
+ internalCluster().startNodes(2, nodeSettings);
// Register mock repositories
client().admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
diff --git a/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureDiscoveryClusterFormationTests.java b/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureDiscoveryClusterFormationTests.java
index 9e17ca2186..7f977592e8 100644
--- a/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureDiscoveryClusterFormationTests.java
+++ b/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureDiscoveryClusterFormationTests.java
@@ -64,8 +64,6 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
-
@ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0)
@SuppressForbidden(reason = "use http server")
// TODO this should be a IT but currently all ITs in this project run against a real cluster
@@ -269,7 +267,7 @@ public class AzureDiscoveryClusterFormationTests extends ESIntegTestCase {
// only wait for the cluster to form
ensureClusterSizeConsistency();
// add one more node and wait for it to join
- internalCluster().startDataOnlyNodeAsync().get();
+ internalCluster().startDataOnlyNode();
ensureClusterSizeConsistency();
}
}
diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryClusterFormationTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryClusterFormationTests.java
index 693e765ac2..b4a1f55a3c 100644
--- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryClusterFormationTests.java
+++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryClusterFormationTests.java
@@ -243,7 +243,7 @@ public class Ec2DiscoveryClusterFormationTests extends ESIntegTestCase {
// only wait for the cluster to form
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get());
// add one more node and wait for it to join
- internalCluster().startDataOnlyNodeAsync().get();
+ internalCluster().startDataOnlyNode();
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get());
}
}
diff --git a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java
index 1512da2429..76d7c6408d 100644
--- a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java
+++ b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java
@@ -40,7 +40,6 @@ import org.junit.BeforeClass;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -200,7 +199,7 @@ public class GceDiscoverTests extends ESIntegTestCase {
// only wait for the cluster to form
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get());
// add one more node and wait for it to join
- internalCluster().startDataOnlyNodeAsync().get();
+ internalCluster().startDataOnlyNode();
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get());
}
}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index 0b121b4aa6..2321690bee 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -130,7 +130,6 @@ import java.util.stream.Stream;
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
import static org.apache.lucene.util.LuceneTestCase.rarely;
-import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -696,10 +695,6 @@ public final class InternalTestCluster extends TestCluster {
ensureOpen(); // currently unused
Builder builder = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false);
- if (size() == 0) {
- // if we are the first node - don't wait for a state
- builder.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0);
- }
return startNode(builder);
}
@@ -791,6 +786,10 @@ public final class InternalTestCluster extends TestCluster {
return nodeAndClientId;
}
+ public String getName() {
+ return name;
+ }
+
public boolean isMasterEligible() {
return Node.NODE_MASTER_SETTING.get(node.settings());
}
@@ -887,9 +886,6 @@ public final class InternalTestCluster extends TestCluster {
assert ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(newSettings.build()) == false : "min master nodes is auto managed";
newSettings.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes).build();
}
-
- // validation is (optionally) done in fullRestart/rollingRestart
- newSettings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s");
if (clearDataIfNeeded) {
clearDataIfNeeded(callback);
}
@@ -1018,10 +1014,6 @@ public final class InternalTestCluster extends TestCluster {
final Settings.Builder settings = Settings.builder();
settings.put(Node.NODE_MASTER_SETTING.getKey(), true);
settings.put(Node.NODE_DATA_SETTING.getKey(), false);
- if (autoManageMinMasterNodes) {
- settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end
- }
-
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
toStartAndPublish.add(nodeAndClient);
}
@@ -1032,9 +1024,6 @@ public final class InternalTestCluster extends TestCluster {
settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build();
settings.put(Node.NODE_DATA_SETTING.getKey(), true).build();
}
- if (autoManageMinMasterNodes) {
- settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end
- }
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
toStartAndPublish.add(nodeAndClient);
}
@@ -1347,10 +1336,18 @@ public final class InternalTestCluster extends TestCluster {
// special case for 1 node master - we can't update the min master nodes before we add more nodes.
updateMinMasterNodes(currentMasters + newMasters);
}
- for (NodeAndClient nodeAndClient : nodeAndClients) {
- nodeAndClient.startNode();
- publishNode(nodeAndClient);
+ List<Future<?>> futures = nodeAndClients.stream().map(node -> executor.submit(node::startNode)).collect(Collectors.toList());
+ try {
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ } catch (InterruptedException e) {
+ throw new AssertionError("interrupted while starting nodes", e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("failed to start nodes", e);
}
+ nodeAndClients.forEach(this::publishNode);
+
if (autoManageMinMasterNodes && currentMasters == 1 && newMasters > 0) {
// update once masters have joined
validateClusterFormed();
@@ -1535,13 +1532,7 @@ public final class InternalTestCluster extends TestCluster {
nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1);
}
- for (NodeAndClient nodeAndClient : startUpOrder) {
- logger.info("starting node [{}] ", nodeAndClient.name);
- nodeAndClient.startNode();
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
- }
- }
+ startAndPublishNodesAndClients(startUpOrder);
if (callback.validateClusterForming()) {
validateClusterFormed();
@@ -1636,6 +1627,61 @@ public final class InternalTestCluster extends TestCluster {
}
/**
+ * Starts multiple nodes with default settings and returns their names
+ */
+ public synchronized List<String> startNodes(int numOfNodes) {
+ return startNodes(numOfNodes, Settings.EMPTY);
+ }
+
+ /**
+ * Starts multiple nodes with the given settings and returns their names
+ */
+ public synchronized List<String> startNodes(int numOfNodes, Settings settings) {
+ return startNodes(Collections.nCopies(numOfNodes, settings).stream().toArray(Settings[]::new));
+ }
+
+ /**
+ * Starts multiple nodes with the given settings and returns their names
+ */
+ public synchronized List<String> startNodes(Settings... settings) {
+ final int defaultMinMasterNodes;
+ if (autoManageMinMasterNodes) {
+ int mastersDelta = (int) Stream.of(settings).filter(Node.NODE_MASTER_SETTING::get).count();
+ defaultMinMasterNodes = getMinMasterNodes(getMasterNodesCount() + mastersDelta);
+ } else {
+ defaultMinMasterNodes = -1;
+ }
+ List<NodeAndClient> nodes = new ArrayList<>();
+ for (Settings nodeSettings: settings) {
+ nodes.add(buildNode(nodeSettings, defaultMinMasterNodes));
+ }
+ startAndPublishNodesAndClients(nodes);
+ if (autoManageMinMasterNodes) {
+ validateClusterFormed();
+ }
+
+ return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList());
+ }
+
+ public synchronized List<String> startMasterOnlyNodes(int numNodes) {
+ return startMasterOnlyNodes(numNodes, Settings.EMPTY);
+ }
+
+ public synchronized List<String> startMasterOnlyNodes(int numNodes, Settings settings) {
+ Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build();
+ return startNodes(numNodes, settings1);
+ }
+
+ public synchronized List<String> startDataOnlyNodes(int numNodes) {
+ return startDataOnlyNodes(numNodes, Settings.EMPTY);
+ }
+
+ public synchronized List<String> startDataOnlyNodes(int numNodes, Settings settings) {
+ Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build();
+ return startNodes(numNodes, settings1);
+ }
+
+ /**
* updates the min master nodes setting in the current running cluster.
*
* @param eligibleMasterNodeCount the number of master eligible nodes to use as basis for the min master node setting
@@ -1667,31 +1713,8 @@ public final class InternalTestCluster extends TestCluster {
return (int)nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count();
}
- public synchronized Async<List<String>> startMasterOnlyNodesAsync(int numNodes) {
- return startMasterOnlyNodesAsync(numNodes, Settings.EMPTY);
- }
-
- public synchronized Async<List<String>> startMasterOnlyNodesAsync(int numNodes, Settings settings) {
- Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build();
- return startNodesAsync(numNodes, settings1);
- }
-
- public synchronized Async<List<String>> startDataOnlyNodesAsync(int numNodes) {
- return startDataOnlyNodesAsync(numNodes, Settings.EMPTY);
- }
-
- public synchronized Async<List<String>> startDataOnlyNodesAsync(int numNodes, Settings settings) {
- Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build();
- return startNodesAsync(numNodes, settings1);
- }
-
- public synchronized Async<String> startMasterOnlyNodeAsync() {
- return startMasterOnlyNodeAsync(Settings.EMPTY);
- }
-
- public synchronized Async<String> startMasterOnlyNodeAsync(Settings settings) {
- Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build();
- return startNodeAsync(settings1);
+ public synchronized String startMasterOnlyNode() {
+ return startMasterOnlyNode(Settings.EMPTY);
}
public synchronized String startMasterOnlyNode(Settings settings) {
@@ -1699,109 +1722,14 @@ public final class InternalTestCluster extends TestCluster {
return startNode(settings1);
}
- public synchronized Async<String> startDataOnlyNodeAsync() {
- return startDataOnlyNodeAsync(Settings.EMPTY);
+ public synchronized String startDataOnlyNode() {
+ return startDataOnlyNode(Settings.EMPTY);
}
-
- public synchronized Async<String> startDataOnlyNodeAsync(Settings settings) {
- Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build();
- return startNodeAsync(settings1);
- }
-
public synchronized String startDataOnlyNode(Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build();
return startNode(settings1);
}
- /**
- * Starts a node in an async manner with the given settings and returns future with its name.
- */
- public synchronized Async<String> startNodeAsync() {
- return startNodeAsync(Settings.EMPTY);
- }
-
- /**
- * Starts a node in an async manner with the given settings and returns future with its name.
- */
- public synchronized Async<String> startNodeAsync(final Settings settings) {
- final int defaultMinMasterNodes;
- if (autoManageMinMasterNodes) {
- int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0;
- defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
- } else {
- defaultMinMasterNodes = -1;
- }
- return startNodeAsync(settings, defaultMinMasterNodes);
- }
-
- private synchronized Async<String> startNodeAsync(final Settings settings, int defaultMinMasterNodes) {
- final NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes);
- final Future<String> submit = executor.submit(() -> {
- buildNode.startNode();
- publishNode(buildNode);
- return buildNode.name;
- });
- return () -> submit.get();
- }
-
-
- /**
- * Starts multiple nodes in an async manner and returns future with its name.
- */
- public synchronized Async<List<String>> startNodesAsync(final int numNodes) {
- return startNodesAsync(numNodes, Settings.EMPTY);
- }
-
- /**
- * Starts multiple nodes in an async manner with the given settings and returns future with its name.
- */
- public synchronized Async<List<String>> startNodesAsync(final int numNodes, final Settings settings) {
- final int defaultMinMasterNodes;
- if (autoManageMinMasterNodes) {
- int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? numNodes : 0;
- defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
- } else {
- defaultMinMasterNodes = -1;
- }
- final List<Async<String>> asyncs = new ArrayList<>();
- for (int i = 0; i < numNodes; i++) {
- asyncs.add(startNodeAsync(settings, defaultMinMasterNodes));
- }
-
- return () -> {
- List<String> ids = new ArrayList<>();
- for (Async<String> async : asyncs) {
- ids.add(async.get());
- }
- return ids;
- };
- }
-
- /**
- * Starts multiple nodes (based on the number of settings provided) in an async manner, with explicit settings for each node.
- * The order of the node names returned matches the order of the settings provided.
- */
- public synchronized Async<List<String>> startNodesAsync(final Settings... settings) {
- final int defaultMinMasterNodes;
- if (autoManageMinMasterNodes) {
- int mastersDelta = (int) Stream.of(settings).filter(Node.NODE_MASTER_SETTING::get).count();
- defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
- } else {
- defaultMinMasterNodes = -1;
- }
- List<Async<String>> asyncs = new ArrayList<>();
- for (Settings setting : settings) {
- asyncs.add(startNodeAsync(setting, defaultMinMasterNodes));
- }
- return () -> {
- List<String> ids = new ArrayList<>();
- for (Async<String> async : asyncs) {
- ids.add(async.get());
- }
- return ids;
- };
- }
-
private synchronized void publishNode(NodeAndClient nodeAndClient) {
assert !nodeAndClient.node().isClosed();
nodes.put(nodeAndClient.name, nodeAndClient);
@@ -2121,14 +2049,4 @@ public final class InternalTestCluster extends TestCluster {
}
}
}
-
- /**
- * Simple interface that allows to wait for an async operation to finish
- *
- * @param <T> the result of the async execution
- */
- public interface Async<T> {
- T get() throws ExecutionException, InterruptedException;
- }
-
}