summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java48
-rw-r--r--core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java4
-rw-r--r--core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java20
-rw-r--r--core/src/main/java/org/elasticsearch/node/Node.java7
-rw-r--r--core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java3
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java15
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java5
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java5
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java156
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java44
-rw-r--r--core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java6
-rw-r--r--core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java90
-rw-r--r--core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java6
-rw-r--r--core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java6
-rw-r--r--core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java14
-rw-r--r--core/src/test/java/org/elasticsearch/recovery/RelocationIT.java26
-rw-r--r--core/src/test/java/org/elasticsearch/tribe/TribeIT.java28
-rw-r--r--plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java3
-rw-r--r--plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java2
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java29
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java8
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java555
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java54
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java2
-rw-r--r--test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java162
31 files changed, 750 insertions, 560 deletions
diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
index f6870cc05b..7794c58ddd 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
@@ -19,30 +19,9 @@
package org.elasticsearch.discovery.zen;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
-import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
@@ -53,6 +32,8 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@@ -75,6 +56,25 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@@ -186,9 +186,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
@Override
- public void close() throws IOException {
+ public void close() {
ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS);
- IOUtils.close(receivedResponses.values());
+ Releasables.close(receivedResponses.values());
closed = true;
}
@@ -272,7 +272,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
}
- class SendPingsHandler implements Closeable {
+ class SendPingsHandler implements Releasable {
private final int id;
private final Set<DiscoveryNode> nodeToDisconnect = ConcurrentCollections.newConcurrentSet();
private final PingCollection pingCollection;
diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
index f9a16243e0..272a75f4e7 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -47,6 +47,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -240,6 +241,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
joinThreadControl.stop();
masterFD.stop("zen disco stop");
nodesFD.stop();
+ Releasables.close(zenPing); // stop any ongoing pinging
DiscoveryNodes nodes = nodes();
if (sendLeaveRequest) {
if (nodes.getMasterNode() == null) {
@@ -269,7 +271,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
@Override
protected void doClose() throws IOException {
- IOUtils.close(masterFD, nodesFD, zenPing);
+ IOUtils.close(masterFD, nodesFD);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
index cb2c8cb501..75ea701dc9 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
@@ -19,26 +19,26 @@
package org.elasticsearch.discovery.zen;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
-public interface ZenPing extends Closeable {
+public interface ZenPing extends Releasable {
void start(PingContextProvider contextProvider);
diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java
index 9eb7f9a037..298d6712ff 100644
--- a/core/src/main/java/org/elasticsearch/node/Node.java
+++ b/core/src/main/java/org/elasticsearch/node/Node.java
@@ -75,9 +75,6 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
-import org.elasticsearch.discovery.zen.UnicastHostsProvider;
-import org.elasticsearch.discovery.zen.UnicastZenPing;
-import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayAllocator;
@@ -655,11 +652,13 @@ public class Node implements Closeable {
injector.getInstance(SnapshotShardsService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
+ // stop discovery early so that this node no longer reacts to pings.
+ // Reacting to pings while shutting down can confuse other nodes and delay things - mostly if we're the master and we're running tests.
+ injector.getInstance(Discovery.class).stop();
// we close indices first, so operations won't be allowed on it
injector.getInstance(IndicesTTLService.class).stop();
injector.getInstance(RoutingService.class).stop();
injector.getInstance(ClusterService.class).stop();
- injector.getInstance(Discovery.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(MonitorService.class).stop();
injector.getInstance(GatewayService.class).stop();
diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java
index e289d90c7a..b5ef7a642a 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java
@@ -31,7 +31,8 @@ import java.io.IOException;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
-@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
+@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0,
+ autoMinMasterNodes = false)
public class IndicesExistsIT extends ESIntegTestCase {
public void testIndexExistsWithBlocksInPlace() throws IOException {
diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java
index 96ba5729cb..7c764ed172 100644
--- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java
+++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java
@@ -43,7 +43,7 @@ import java.util.concurrent.CyclicBarrier;
import static org.hamcrest.Matchers.equalTo;
-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
public class IndexingMasterFailoverIT extends ESIntegTestCase {
@Override
diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
index 3e58291d4a..257b80663a 100644
--- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
@@ -63,7 +63,7 @@ import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.discovery.zen:TRACE")
public class MinimumMasterNodesIT extends ESIntegTestCase {
@@ -275,12 +275,14 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
.put("discovery.initial_state_timeout", "500ms")
.build();
- logger.info("--> start 2 nodes");
- internalCluster().startNodesAsync(2, settings).get();
+ logger.info("--> start first node and wait for it to be a master");
+ internalCluster().startNode(settings);
+ ensureClusterSizeConsistency();
// wait until second node join the cluster
- ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get();
- assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+ logger.info("--> start second node and wait for it to join");
+ internalCluster().startNode(settings);
+ ensureClusterSizeConsistency();
logger.info("--> setting minimum master node to 2");
setMinimumMasterNodes(2);
@@ -298,8 +300,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
logger.info("--> bringing another node up");
internalCluster().startNode(Settings.builder().put(settings).put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2).build());
- clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get();
- assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+ ensureClusterSizeConsistency();
}
private void assertNoMasterBlockOnAllNodes() throws InterruptedException {
diff --git a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java
index f73043ce4e..0be4f7c4e5 100644
--- a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java
@@ -49,7 +49,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
public class NoMasterNodeIT extends ESIntegTestCase {
@Override
diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java
index f47134a45b..ab3f82fff7 100644
--- a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java
@@ -69,7 +69,10 @@ public class AckClusterUpdateSettingsIT extends ESIntegTestCase {
private void removePublishTimeout() {
//to test that the acknowledgement mechanism is working we better disable the wait for publish
//otherwise the operation is most likely acknowledged even if it doesn't support ack
- assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0")));
+ assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
+ .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0")
+ .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s")
+ ));
}
public void testClusterUpdateSettingsAcknowledgement() {
diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java
index 2ea4f75515..f51a4f11ae 100644
--- a/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java
@@ -36,7 +36,6 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
-import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.Index;
@@ -61,7 +60,9 @@ public class AckIT extends ESIntegTestCase {
//to test that the acknowledgement mechanism is working we better disable the wait for publish
//otherwise the operation is most likely acknowledged even if it doesn't support ack
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
- .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), 0).build();
+ .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
+ .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit to check acking
+ .build();
}
public void testUpdateSettingsAcknowledgement() {
diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java
index d98f929424..19657b0548 100644
--- a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java
@@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationD
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@@ -104,7 +103,6 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
Settings commonSettings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
- .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 3)
.put(ZenDiscovery.JOIN_TIMEOUT_SETTING.getKey(), "10s")
.build();
diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java
index 3d345f24db..f056eded34 100644
--- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java
@@ -20,32 +20,20 @@ package org.elasticsearch.cluster.service;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
-import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.component.LifecycleComponent;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.discovery.zen.ZenDiscovery;
-import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.junit.annotations.TestLogging;
-import org.elasticsearch.threadpool.ThreadPool;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -56,17 +44,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class ClusterServiceIT extends ESIntegTestCase {
- @Override
- protected Collection<Class<? extends Plugin>> nodePlugins() {
- return Arrays.asList(TestPlugin.class);
- }
-
public void testAckedUpdateTask() throws Exception {
internalCluster().startNode();
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
@@ -482,141 +463,4 @@ public class ClusterServiceIT extends ESIntegTestCase {
assertTrue(controlSources.isEmpty());
block2.countDown();
}
-
- public void testLocalNodeMasterListenerCallbacks() throws Exception {
- Settings settings = Settings.builder()
- .put("discovery.zen.minimum_master_nodes", 1)
- .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms")
- .put("discovery.initial_state_timeout", "500ms")
- .build();
-
- String node_0 = internalCluster().startNode(settings);
- ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
- MasterAwareService testService = internalCluster().getInstance(MasterAwareService.class);
-
- ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
- .setWaitForNodes("1").get();
- assertThat(clusterHealth.isTimedOut(), equalTo(false));
-
- // the first node should be a master as the minimum required is 1
- assertThat(clusterService.state().nodes().getMasterNode(), notNullValue());
- assertThat(clusterService.state().nodes().isLocalNodeElectedMaster(), is(true));
- assertThat(testService.master(), is(true));
- String node_1 = internalCluster().startNode(settings);
- final ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class, node_1);
- MasterAwareService testService1 = internalCluster().getInstance(MasterAwareService.class, node_1);
-
- clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get();
- assertThat(clusterHealth.isTimedOut(), equalTo(false));
-
- // the second node should not be the master as node1 is already the master.
- assertThat(clusterService1.state().nodes().isLocalNodeElectedMaster(), is(false));
- assertThat(testService1.master(), is(false));
-
- internalCluster().stopCurrentMasterNode();
- clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("1").get();
- assertThat(clusterHealth.isTimedOut(), equalTo(false));
-
- // now that node0 is closed, node1 should be elected as master
- assertThat(clusterService1.state().nodes().isLocalNodeElectedMaster(), is(true));
- assertThat(testService1.master(), is(true));
-
- // start another node and set min_master_node
- internalCluster().startNode(Settings.builder().put(settings));
- assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
-
- Settings transientSettings = Settings.builder()
- .put("discovery.zen.minimum_master_nodes", 2)
- .build();
- client().admin().cluster().prepareUpdateSettings().setTransientSettings(transientSettings).get();
-
- // and shutdown the second node
- internalCluster().stopRandomNonMasterNode();
-
- // there should not be any master as the minimum number of required eligible masters is not met
- awaitBusy(() -> clusterService1.state().nodes().getMasterNode() == null &&
- clusterService1.clusterServiceState().getClusterStateStatus() == ClusterStateStatus.APPLIED);
- assertThat(testService1.master(), is(false));
-
- // bring the node back up
- String node_2 = internalCluster().startNode(Settings.builder().put(settings).put(transientSettings));
- ClusterService clusterService2 = internalCluster().getInstance(ClusterService.class, node_2);
- MasterAwareService testService2 = internalCluster().getInstance(MasterAwareService.class, node_2);
-
- // make sure both nodes see each other otherwise the masternode below could be null if node 2 is master and node 1 did'r receive
- // the updated cluster state...
- assertThat(internalCluster().client(node_1).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true)
- .setWaitForNodes("2").get().isTimedOut(), is(false));
- assertThat(internalCluster().client(node_2).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true)
- .setWaitForNodes("2").get().isTimedOut(), is(false));
-
- // now that we started node1 again, a new master should be elected
- assertThat(clusterService2.state().nodes().getMasterNode(), is(notNullValue()));
- if (node_2.equals(clusterService2.state().nodes().getMasterNode().getName())) {
- assertThat(testService1.master(), is(false));
- assertThat(testService2.master(), is(true));
- } else {
- assertThat(testService1.master(), is(true));
- assertThat(testService2.master(), is(false));
- }
- }
-
- public static class TestPlugin extends Plugin {
-
- @Override
- public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
- List<Class<? extends LifecycleComponent>> services = new ArrayList<>(1);
- services.add(MasterAwareService.class);
- return services;
- }
- }
-
- @Singleton
- public static class MasterAwareService extends AbstractLifecycleComponent implements LocalNodeMasterListener {
-
- private final ClusterService clusterService;
- private volatile boolean master;
-
- @Inject
- public MasterAwareService(Settings settings, ClusterService clusterService) {
- super(settings);
- clusterService.add(this);
- this.clusterService = clusterService;
- logger.info("initialized test service");
- }
-
- @Override
- public void onMaster() {
- logger.info("on master [{}]", clusterService.localNode());
- master = true;
- }
-
- @Override
- public void offMaster() {
- logger.info("off master [{}]", clusterService.localNode());
- master = false;
- }
-
- public boolean master() {
- return master;
- }
-
- @Override
- protected void doStart() {
- }
-
- @Override
- protected void doStop() {
- }
-
- @Override
- protected void doClose() {
- }
-
- @Override
- public String executorName() {
- return ThreadPool.Names.SAME;
- }
-
- }
}
diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java
index 9fd4fc1851..bede01ed21 100644
--- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java
@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -44,6 +45,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.Discovery;
+import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
@@ -1098,6 +1100,48 @@ public class ClusterServiceTests extends ESTestCase {
timedClusterService.close();
}
+ public void testLocalNodeMasterListenerCallbacks() throws Exception {
+ TimedClusterService timedClusterService = createTimedClusterService(false);
+
+ AtomicBoolean isMaster = new AtomicBoolean();
+ timedClusterService.add(new LocalNodeMasterListener() {
+ @Override
+ public void onMaster() {
+ isMaster.set(true);
+ }
+
+ @Override
+ public void offMaster() {
+ isMaster.set(false);
+ }
+
+ @Override
+ public String executorName() {
+ return ThreadPool.Names.SAME;
+ }
+ });
+
+ ClusterState state = timedClusterService.state();
+ DiscoveryNodes nodes = state.nodes();
+ DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId());
+ state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build();
+ setState(timedClusterService, state);
+ assertThat(isMaster.get(), is(true));
+
+ nodes = state.nodes();
+ nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(null);
+ state = ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES))
+ .nodes(nodesBuilder).build();
+ setState(timedClusterService, state);
+ assertThat(isMaster.get(), is(false));
+ nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId());
+ state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build();
+ setState(timedClusterService, state);
+ assertThat(isMaster.get(), is(true));
+
+ timedClusterService.close();
+ }
+
private static class SimpleTask {
private final int id;
diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
index 22844e0588..1688157c0d 100644
--- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
+++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
@@ -122,7 +122,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java
index 3af2e32eef..b708ab4c26 100644
--- a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java
+++ b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.equalTo;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
public class ZenUnicastDiscoveryIT extends ESIntegTestCase {
private ClusterDiscoveryConfiguration discoveryConfig;
diff --git a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java
index 226b1422b4..8284388d2c 100644
--- a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java
@@ -23,7 +23,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@@ -37,7 +36,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
-@ClusterScope(numDataNodes =0, scope= Scope.TEST)
+@ClusterScope(numDataNodes = 0, scope = Scope.TEST)
public class QuorumGatewayIT extends ESIntegTestCase {
@Override
protected int numberOfReplicas() {
@@ -47,8 +46,7 @@ public class QuorumGatewayIT extends ESIntegTestCase {
public void testQuorumRecovery() throws Exception {
logger.info("--> starting 3 nodes");
// we are shutting down nodes - make sure we don't have 2 clusters if we test network
- internalCluster().startNodesAsync(3,
- Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2).build()).get();
+ internalCluster().startNodesAsync(3).get();
createIndex("test");
diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java
index 1e35bcdd46..1794aa2272 100644
--- a/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java
@@ -34,7 +34,7 @@ import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
public class RecoverAfterNodesIT extends ESIntegTestCase {
private static final TimeValue BLOCK_WAIT_TIMEOUT = TimeValue.timeValueSeconds(10);
diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
index 4e1ff3f867..052bfc00ef 100644
--- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
@@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
-import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -44,6 +43,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
+import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSIndexStore;
@@ -333,48 +333,43 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
String metaDataUuid = client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID();
assertThat(metaDataUuid, not(equalTo("_na_")));
- Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
-
logger.info("--> closing first node, and indexing more data to the second node");
- internalCluster().fullRestart(new RestartCallback() {
+ internalCluster().stopRandomDataNode();
- @Override
- public void doAfterNodes(int numNodes, Client client) throws Exception {
- if (numNodes == 1) {
- logger.info("--> one node is closed - start indexing data into the second one");
- client.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
- // TODO: remove once refresh doesn't fail immediately if there a master block:
- // https://github.com/elastic/elasticsearch/issues/9997
- client.admin().cluster().prepareHealth("test").setWaitForYellowStatus().get();
- client.admin().indices().prepareRefresh().execute().actionGet();
-
- for (int i = 0; i < 10; i++) {
- assertHitCount(client.prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3);
- }
+ logger.info("--> one node is closed - start indexing data into the second one");
+ client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
+ // TODO: remove once refresh doesn't fail immediately if there is a master block:
+ // https://github.com/elastic/elasticsearch/issues/9997
+ client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get();
+ client().admin().indices().prepareRefresh().execute().actionGet();
- logger.info("--> add some metadata, additional type and template");
- client.admin().indices().preparePutMapping("test").setType("type2")
- .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject())
- .execute().actionGet();
- client.admin().indices().preparePutTemplate("template_1")
- .setPatterns(Collections.singletonList("te*"))
- .setOrder(0)
- .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
- .startObject("field1").field("type", "text").field("store", true).endObject()
- .startObject("field2").field("type", "keyword").field("store", true).endObject()
- .endObject().endObject().endObject())
- .execute().actionGet();
- client.admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet();
- logger.info("--> starting two nodes back, verifying we got the latest version");
- }
+ for (int i = 0; i < 10; i++) {
+ assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3);
+ }
- }
+ logger.info("--> add some metadata, additional type and template");
+ client().admin().indices().preparePutMapping("test").setType("type2")
+ .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject())
+ .execute().actionGet();
+ client().admin().indices().preparePutTemplate("template_1")
+ .setTemplate("te*")
+ .setOrder(0)
+ .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
+ .startObject("field1").field("type", "text").field("store", true).endObject()
+ .startObject("field2").field("type", "keyword").field("store", true).endObject()
+ .endObject().endObject().endObject())
+ .execute().actionGet();
+ client().admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet();
+
+ logger.info("--> stopping the second node");
+ internalCluster().stopRandomDataNode();
+
+ logger.info("--> starting the two nodes back");
- });
+ internalCluster().startNodesAsync(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()).get();
logger.info("--> running cluster_health (wait for the shards to startup)");
ensureGreen();
- primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
assertThat(client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID(), equalTo(metaDataUuid));
@@ -502,27 +497,28 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
public void testRecoveryDifferentNodeOrderStartup() throws Exception {
// we need different data paths so we make sure we start the second node fresh
- final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build());
+ final Path pathNode1 = createTempDir();
+ final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode1).build());
client().prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet();
- internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build());
+ final Path pathNode2 = createTempDir();
+ final String node_2 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build());
ensureGreen();
Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
-
- internalCluster().fullRestart(new RestartCallback() {
-
- @Override
- public boolean doRestart(String nodeName) {
- return !node_1.equals(nodeName);
- }
- });
-
+ if (randomBoolean()) {
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2));
+ } else {
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1));
+ }
+ // start the second node again
+ internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build());
ensureYellow();
primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
-
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1);
}
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
index dfe9a09fb2..e0a6111833 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@@ -555,10 +555,8 @@ public class IndexRecoveryIT extends ESIntegTestCase {
// start a master node
internalCluster().startNode(nodeSettings);
- InternalTestCluster.Async<String> blueFuture = internalCluster().startNodeAsync(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
- InternalTestCluster.Async<String> redFuture = internalCluster().startNodeAsync(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
- final String blueNodeName = blueFuture.get();
- final String redNodeName = redFuture.get();
+ final String blueNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
+ final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
assertThat(response.isTimedOut(), is(false));
diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java
index c55fc51433..fe3b569755 100644
--- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java
+++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java
@@ -209,7 +209,10 @@ public class RareClusterStateIT extends ESIntegTestCase {
// but the change might not be on the node that performed the indexing
// operation yet
- Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0ms").build();
+ Settings settings = Settings.builder()
+ .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
+ .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design
+ .build();
final List<String> nodeNames = internalCluster().startNodesAsync(2, settings).get();
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
@@ -327,7 +330,6 @@ public class RareClusterStateIT extends ESIntegTestCase {
// time of indexing it
final List<String> nodeNames = internalCluster().startNodesAsync(2,
Settings.builder()
- .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design
.build()).get();
diff --git a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java
index 4a61bebd4d..dc38867705 100644
--- a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java
+++ b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java
@@ -72,16 +72,15 @@ public class FullRollingRestartIT extends ESIntegTestCase {
}
logger.info("--> now start adding nodes");
- internalCluster().startNodesAsync(2, settings).get();
+ internalCluster().startNode(settings);
+ internalCluster().startNode(settings);
// make sure the cluster state is green, and all has been recovered
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3"));
logger.info("--> add two more nodes");
- internalCluster().startNodesAsync(2, settings).get();
-
- // We now have 5 nodes
- setMinimumMasterNodes(3);
+ internalCluster().startNode(settings);
+ internalCluster().startNode(settings);
// make sure the cluster state is green, and all has been recovered
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("5"));
@@ -97,9 +96,6 @@ public class FullRollingRestartIT extends ESIntegTestCase {
// make sure the cluster state is green, and all has been recovered
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("4"));
- // going down to 3 nodes. note that the min_master_node may not be in effect when we shutdown the 4th
- // node, but that's OK as it is set to 3 before.
- setMinimumMasterNodes(2);
internalCluster().stopRandomDataNode();
// make sure the cluster state is green, and all has been recovered
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3"));
@@ -115,8 +111,6 @@ public class FullRollingRestartIT extends ESIntegTestCase {
// make sure the cluster state is green, and all has been recovered
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("2"));
- // closing the 2nd node
- setMinimumMasterNodes(1);
internalCluster().stopRandomDataNode();
// make sure the cluster state is yellow, and all has been recovered
diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
index 8a59b7460b..bd474280e4 100644
--- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
+++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
@@ -44,8 +44,8 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
+import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@@ -53,7 +53,6 @@ import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
-import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
@@ -77,6 +76,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -351,7 +351,8 @@ public class RelocationIT extends ESIntegTestCase {
client().admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get();
- internalCluster().startNodesAsync(2).get();
+ internalCluster().startNode();
+ internalCluster().startNode();
List<IndexRequestBuilder> requests = new ArrayList<>();
int numDocs = scaledRandomIntBetween(25, 250);
@@ -424,14 +425,15 @@ public class RelocationIT extends ESIntegTestCase {
public void testIndexAndRelocateConcurrently() throws ExecutionException, InterruptedException {
int halfNodes = randomIntBetween(1, 3);
- Settings blueSetting = Settings.builder().put("node.attr.color", "blue").build();
- InternalTestCluster.Async<List<String>> blueFuture = internalCluster().startNodesAsync(halfNodes, blueSetting);
- Settings redSetting = Settings.builder().put("node.attr.color", "red").build();
- InternalTestCluster.Async<java.util.List<String>> redFuture = internalCluster().startNodesAsync(halfNodes, redSetting);
- blueFuture.get();
- redFuture.get();
- logger.info("blue nodes: {}", blueFuture.get());
- logger.info("red nodes: {}", redFuture.get());
+ Settings[] nodeSettings = Stream.concat(
+ Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes),
+ Stream.generate(() -> Settings.builder().put("node.attr.color", "red").build()).limit(halfNodes)
+ ).toArray(Settings[]::new);
+ List<String> nodes = internalCluster().startNodesAsync(nodeSettings).get();
+ String[] blueNodes = nodes.subList(0, halfNodes).stream().toArray(String[]::new);
+ String[] redNodes = nodes.subList(halfNodes, nodes.size()).stream().toArray(String[]::new);
+ logger.info("blue nodes: {}", (Object)blueNodes);
+ logger.info("red nodes: {}", (Object)redNodes);
ensureStableCluster(halfNodes * 2);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
@@ -439,7 +441,7 @@ public class RelocationIT extends ESIntegTestCase {
.put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1))
));
- assertAllShardsOnNodes("test", redFuture.get().toArray(new String[2]));
+ assertAllShardsOnNodes("test", redNodes);
int numDocs = randomIntBetween(100, 150);
ArrayList<String> ids = new ArrayList<>();
logger.info(" --> indexing [{}] docs", numDocs);
diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java
index 6121b2c0c8..cf4fe03893 100644
--- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java
+++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java
@@ -19,18 +19,6 @@
package org.elasticsearch.tribe;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.client.Client;
@@ -58,6 +46,18 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -121,13 +121,13 @@ public class TribeIT extends ESIntegTestCase {
final Collection<Class<? extends Plugin>> plugins = nodePlugins();
if (cluster1 == null) {
- cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes,
+ cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes,
UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_1",
plugins, Function.identity());
}
if (cluster2 == null) {
- cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes,
+ cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes,
UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_2",
plugins, Function.identity());
}
diff --git a/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java b/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java
index 72e1f2da79..f8b105884b 100644
--- a/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java
+++ b/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java
@@ -39,7 +39,8 @@ import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE,
numDataNodes = 0,
transportClientRatio = 0.0,
- numClientNodes = 0)
+ numClientNodes = 0,
+ autoMinMasterNodes = false)
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-azure/issues/89")
public class AzureMinimumMasterNodesTests extends AbstractAzureComputeServiceTestCase {
diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java
index cdb45bfe08..41f5af48eb 100644
--- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java
+++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java
@@ -33,7 +33,7 @@ import static org.hamcrest.CoreMatchers.is;
* starting.
* This test requires AWS to run.
*/
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0, autoMinMasterNodes = false)
public class Ec2DiscoveryUpdateSettingsTests extends AbstractAwsTestCase {
public void testMinimumMasterNodesStart() {
Settings nodeSettings = Settings.builder()
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 82e7ce072e..29f77bc166 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -149,6 +149,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
@@ -178,6 +179,7 @@ import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgno
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
@@ -527,10 +529,15 @@ public abstract class ESIntegTestCase extends ESTestCase {
if (cluster() != null) {
if (currentClusterScope != Scope.TEST) {
MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData();
- assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().getAsMap(), metaData
- .persistentSettings().getAsMap().size(), equalTo(0));
- assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData
- .transientSettings().getAsMap().size(), equalTo(0));
+ final Map<String, String> persistent = metaData.persistentSettings().getAsMap();
+ assertThat("test leaves persistent cluster metadata behind: " + persistent, persistent.size(), equalTo(0));
+ final Map<String, String> transientSettings = new HashMap<>(metaData.transientSettings().getAsMap());
+ if (isInternalCluster() && internalCluster().getAutoManageMinMasterNode()) {
+ // this is set by the test infra
+ transientSettings.remove(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey());
+ }
+ assertThat("test leaves transient cluster metadata behind: " + transientSettings,
+ transientSettings.keySet(), empty());
}
ensureClusterSizeConsistency();
ensureClusterStateConsistency();
@@ -1519,6 +1526,12 @@ public abstract class ESIntegTestCase extends ESTestCase {
boolean supportsDedicatedMasters() default true;
/**
+ * The cluster automatically manages the {@link ElectMasterService#DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING} by default
+ * as nodes are started and stopped. Set this to false to manage the setting manually.
+ */
+ boolean autoMinMasterNodes() default true;
+
+ /**
* Returns the number of client nodes in the cluster. Default is {@link InternalTestCluster#DEFAULT_NUM_CLIENT_NODES}, a
* negative value means that the number of client nodes will be randomized.
*/
@@ -1615,6 +1628,11 @@ public abstract class ESIntegTestCase extends ESTestCase {
return annotation == null ? true : annotation.supportsDedicatedMasters();
}
+ private boolean getAutoMinMasterNodes() {
+ ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
+ return annotation == null ? true : annotation.autoMinMasterNodes();
+ }
+
private int getNumDataNodes() {
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null ? -1 : annotation.numDataNodes();
@@ -1753,7 +1771,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
mockPlugins = mocks;
}
- return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, minNumDataNodes, maxNumDataNodes,
+ return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, getAutoMinMasterNodes(),
+ minNumDataNodes, maxNumDataNodes,
InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(),
InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper());
}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java
index 69c5908698..1d3b24b3ce 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java
@@ -444,7 +444,6 @@ public abstract class ESTestCase extends LuceneTestCase {
return RandomPicks.randomFrom(random, array);
}
-
/** Pick a random object from the given list. */
public static <T> T randomFrom(List<T> list) {
return RandomPicks.randomFrom(random(), list);
@@ -452,7 +451,12 @@ public abstract class ESTestCase extends LuceneTestCase {
/** Pick a random object from the given collection. */
public static <T> T randomFrom(Collection<T> collection) {
- return RandomPicks.randomFrom(random(), collection);
+ return randomFrom(random(), collection);
+ }
+
+ /** Pick a random object from the given collection, using the given source of randomness. */
+ public static <T> T randomFrom(Random random, Collection<T> collection) {
+ return RandomPicks.randomFrom(random, collection);
}
public static String randomAsciiOfLengthBetween(int minCodeUnits, int maxCodeUnits) {
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index 37e3a58295..3c29b878e7 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -24,12 +24,10 @@ import com.carrotsearch.randomizedtesting.SysGlobals;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
-
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
@@ -68,7 +66,8 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.discovery.DiscoverySettings;
+import org.elasticsearch.discovery.zen.ElectMasterService;
+import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLockObtainFailedException;
@@ -133,8 +132,10 @@ import java.util.stream.Stream;
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
import static org.apache.lucene.util.LuceneTestCase.rarely;
+import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING;
import static org.elasticsearch.test.ESTestCase.assertBusy;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
+import static org.elasticsearch.test.ESTestCase.randomFrom;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -225,6 +226,8 @@ public final class InternalTestCluster extends TestCluster {
private final ExecutorService executor;
+ private final boolean autoManageMinMasterNodes;
+
private final Collection<Class<? extends Plugin>> mockPlugins;
/**
@@ -238,9 +241,10 @@ public final class InternalTestCluster extends TestCluster {
public InternalTestCluster(long clusterSeed, Path baseDir,
boolean randomlyAddDedicatedMasters,
- int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes,
+ boolean autoManageMinMasterNodes, int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes,
boolean enableHttpPipelining, String nodePrefix, Collection<Class<? extends Plugin>> mockPlugins, Function<Client, Client> clientWrapper) {
super(clusterSeed);
+ this.autoManageMinMasterNodes = autoManageMinMasterNodes;
this.clientWrapper = clientWrapper;
this.baseDir = baseDir;
this.clusterName = clusterName;
@@ -345,6 +349,11 @@ public final class InternalTestCluster extends TestCluster {
return clusterName;
}
+ /** returns true if the {@link ElectMasterService#DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING} setting is auto managed by this cluster */
+ public boolean getAutoManageMinMasterNode() {
+ return autoManageMinMasterNodes;
+ }
+
public String[] getNodeNames() {
return nodes.keySet().toArray(Strings.EMPTY_ARRAY);
}
@@ -466,7 +475,7 @@ public final class InternalTestCluster extends TestCluster {
if (randomNodeAndClient != null) {
return randomNodeAndClient;
}
- NodeAndClient buildNode = buildNode();
+ NodeAndClient buildNode = buildNode(1);
buildNode.startNode();
publishNode(buildNode);
return buildNode;
@@ -496,30 +505,20 @@ public final class InternalTestCluster extends TestCluster {
* if more nodes than <code>n</code> are present this method will not
* stop any of the running nodes.
*/
- public void ensureAtLeastNumDataNodes(int n) {
- final List<Async<String>> asyncs = new ArrayList<>();
- synchronized (this) {
- int size = numDataNodes();
- for (int i = size; i < n; i++) {
- logger.info("increasing cluster size from {} to {}", size, n);
- if (numSharedDedicatedMasterNodes > 0) {
- asyncs.add(startDataOnlyNodeAsync());
- } else {
- asyncs.add(startNodeAsync());
- }
- }
- }
- try {
- for (Async<String> async : asyncs) {
- async.get();
+ public synchronized void ensureAtLeastNumDataNodes(int n) {
+ boolean added = false;
+ int size = numDataNodes();
+ for (int i = size; i < n; i++) {
+ logger.info("increasing cluster size from {} to {}", size, n);
+ added = true;
+ if (numSharedDedicatedMasterNodes > 0) {
+ startDataOnlyNode(Settings.EMPTY);
+ } else {
+ startNode(Settings.EMPTY);
}
- } catch (Exception e) {
- throw new ElasticsearchException("failed to start nodes", e);
}
- if (!asyncs.isEmpty()) {
- synchronized (this) {
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodes.size())).get());
- }
+ if (added) {
+ validateClusterFormed();
}
}
@@ -544,28 +543,47 @@ public final class InternalTestCluster extends TestCluster {
while (values.hasNext() && numNodesAndClients++ < size - n) {
NodeAndClient next = values.next();
nodesToRemove.add(next);
- removeDisruptionSchemeFromNode(next);
- next.close();
- }
- for (NodeAndClient toRemove : nodesToRemove) {
- nodes.remove(toRemove.name);
}
+
+ stopNodesAndClients(nodesToRemove);
if (!nodesToRemove.isEmpty() && size() > 0) {
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodes.size())).get());
+ validateClusterFormed();
}
}
- private NodeAndClient buildNode(Settings settings) {
+ /**
+ * builds a new node given the settings.
+ *
+ * @param settings the settings to use
+ * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
+ */
+ private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes) {
int ord = nextNodeId.getAndIncrement();
- return buildNode(ord, random.nextLong(), settings, false);
+ return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes);
}
- private NodeAndClient buildNode() {
+ /**
+ * builds a new node with default settings
+ *
+ * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
+ */
+ private NodeAndClient buildNode(int defaultMinMasterNodes) {
int ord = nextNodeId.getAndIncrement();
- return buildNode(ord, random.nextLong(), null, false);
+ return buildNode(ord, random.nextLong(), null, false, defaultMinMasterNodes);
}
- private NodeAndClient buildNode(int nodeId, long seed, Settings settings, boolean reuseExisting) {
+ /**
+ * builds a new node
+ *
+ * @param nodeId the node internal id (see {@link NodeAndClient#nodeAndClientId()})
+ * @param seed the node's random seed
+ * @param settings the settings to use
+ * @param reuseExisting if a node with the same name is already part of {@link #nodes}, no new node will be built and
+ * the method will return the existing one
+ * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
+ */
+ private NodeAndClient buildNode(int nodeId, long seed, Settings settings,
+ boolean reuseExisting, int defaultMinMasterNodes) {
assert Thread.holdsLock(this);
ensureOpen();
settings = getSettings(nodeId, seed, settings);
@@ -577,13 +595,21 @@ public final class InternalTestCluster extends TestCluster {
assert reuseExisting == true || nodes.containsKey(name) == false :
"node name [" + name + "] already exists but not allowed to use it";
}
- Settings finalSettings = Settings.builder()
+ Settings.Builder finalSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), baseDir) // allow overriding path.home
.put(settings)
.put("node.name", name)
- .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), seed)
- .build();
- MockNode node = new MockNode(finalSettings, plugins);
+ .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), seed);
+
+ if (autoManageMinMasterNodes) {
+ assert finalSettings.get(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) == null :
+ "min master nodes may not be set when auto managed";
+ finalSettings
+ // don't wait too long, so as not to slow down tests
+ .put(ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.getKey(), "5s")
+ .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), defaultMinMasterNodes);
+ }
+ MockNode node = new MockNode(finalSettings.build(), plugins);
return new NodeAndClient(name, node, nodeId);
}
@@ -684,7 +710,7 @@ public final class InternalTestCluster extends TestCluster {
.put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false);
if (size() == 0) {
// if we are the first node - don't wait for a state
- builder.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0);
+ builder.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0);
}
return startNode(builder);
}
@@ -777,6 +803,10 @@ public final class InternalTestCluster extends TestCluster {
return nodeAndClientId;
}
+ public boolean isMasterEligible() {
+ return Node.NODE_MASTER_SETTING.get(node.settings());
+ }
+
Client client(Random random) {
if (closed.get()) {
throw new RuntimeException("already closed");
@@ -844,21 +874,40 @@ public final class InternalTestCluster extends TestCluster {
node.close();
}
- void restart(RestartCallback callback, boolean clearDataIfNeeded) throws Exception {
- assert callback != null;
- resetClient();
+ /**
+ * closes the current node if not already closed, builds a new node object using the current node settings and starts it
+ */
+ void restart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception {
if (!node.isClosed()) {
closeNode();
}
- Settings newSettings = callback.onNodeStopped(name);
- if (newSettings == null) {
- newSettings = Settings.EMPTY;
+ recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes);
+ startNode();
+ }
+
+ /**
+ * rebuilds a new node object using the current node settings; the caller is responsible for starting it
+ */
+ void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception {
+ assert callback != null;
+ Settings callbackSettings = callback.onNodeStopped(name);
+ Settings.Builder newSettings = Settings.builder();
+ if (callbackSettings != null) {
+ newSettings.put(callbackSettings);
+ }
+ if (minMasterNodes >= 0) {
+ assert ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(newSettings.build()) == false : "min master nodes is auto managed";
+ newSettings.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes).build();
}
+
+ // validation is (optionally) done in fullRestart/rollingRestart
+ newSettings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s");
if (clearDataIfNeeded) {
clearDataIfNeeded(callback);
}
- createNewNode(newSettings);
- startNode();
+ createNewNode(newSettings.build());
+ // make sure cached client points to new node
+ resetClient();
}
private void clearDataIfNeeded(RestartCallback callback) throws IOException {
@@ -948,22 +997,24 @@ public final class InternalTestCluster extends TestCluster {
if (wipeData) {
wipePendingDataDirectories();
}
+ if (nodes.size() > 0 && autoManageMinMasterNodes) {
+ updateMinMasterNodes(getMasterNodesCount());
+ }
logger.debug("Cluster hasn't changed - moving out - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize);
return;
}
logger.debug("Cluster is NOT consistent - restarting shared nodes - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize);
// trash all nodes with id >= sharedNodesSeeds.length - they are non shared
-
-
+ final List<NodeAndClient> toClose = new ArrayList<>();
for (Iterator<NodeAndClient> iterator = nodes.values().iterator(); iterator.hasNext();) {
NodeAndClient nodeAndClient = iterator.next();
if (nodeAndClient.nodeAndClientId() >= sharedNodesSeeds.length) {
logger.debug("Close Node [{}] not shared", nodeAndClient.name);
- nodeAndClient.close();
- iterator.remove();
+ toClose.add(nodeAndClient);
}
}
+ stopNodesAndClients(toClose);
// clean up what the nodes left that is unused
if (wipeData) {
@@ -972,13 +1023,19 @@ public final class InternalTestCluster extends TestCluster {
// start any missing node
assert newSize == numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes;
+ final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes;
+ final int defaultMinMasterNodes = (numberOfMasterNodes / 2) + 1;
+ final List<NodeAndClient> toStartAndPublish = new ArrayList<>(); // we want to start nodes in one go due to min master nodes
for (int i = 0; i < numSharedDedicatedMasterNodes; i++) {
final Settings.Builder settings = Settings.builder();
- settings.put(Node.NODE_MASTER_SETTING.getKey(), true).build();
- settings.put(Node.NODE_DATA_SETTING.getKey(), false).build();
- NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true);
- nodeAndClient.startNode();
- publishNode(nodeAndClient);
+ settings.put(Node.NODE_MASTER_SETTING.getKey(), true);
+ settings.put(Node.NODE_DATA_SETTING.getKey(), false);
+ if (autoManageMinMasterNodes) {
+ settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end
+ }
+
+ NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
+ toStartAndPublish.add(nodeAndClient);
}
for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) {
final Settings.Builder settings = Settings.builder();
@@ -987,32 +1044,43 @@ public final class InternalTestCluster extends TestCluster {
settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build();
settings.put(Node.NODE_DATA_SETTING.getKey(), true).build();
}
- NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true);
- nodeAndClient.startNode();
- publishNode(nodeAndClient);
+ if (autoManageMinMasterNodes) {
+ settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end
+ }
+ NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
+ toStartAndPublish.add(nodeAndClient);
}
for (int i = numSharedDedicatedMasterNodes + numSharedDataNodes;
i < numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; i++) {
final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false);
- NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true);
- nodeAndClient.startNode();
- publishNode(nodeAndClient);
+ NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
+ toStartAndPublish.add(nodeAndClient);
}
+ startAndPublishNodesAndClients(toStartAndPublish);
+
nextNodeId.set(newSize);
assert size() == newSize;
if (newSize > 0) {
- ClusterHealthResponse response = client().admin().cluster().prepareHealth()
- .setWaitForNodes(Integer.toString(newSize)).get();
- if (response.isTimedOut()) {
- logger.warn("failed to wait for a cluster of size [{}], got [{}]", newSize, response);
- throw new IllegalStateException("cluster failed to reach the expected size of [" + newSize + "]");
- }
+ validateClusterFormed();
}
logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize);
}
+ /** ensure a cluster is formed with {@link #nodes}.size() nodes. */
+ private void validateClusterFormed() {
+ final int size = nodes.size();
+ String name = randomFrom(random, getNodeNames());
+ logger.trace("validating cluster formed via [{}], expecting [{}]", name, size);
+ final Client client = client(name);
+ ClusterHealthResponse response = client.admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(size)).get();
+ if (response.isTimedOut()) {
+ logger.warn("failed to wait for a cluster of size [{}], got [{}]", size, response);
+ throw new IllegalStateException("cluster failed to reach the expected size of [" + size + "]");
+ }
+ }
+
@Override
public synchronized void afterTest() throws IOException {
wipePendingDataDirectories();
@@ -1234,9 +1302,7 @@ public final class InternalTestCluster extends TestCluster {
NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate());
if (nodeAndClient != null) {
logger.info("Closing random node [{}] ", nodeAndClient.name);
- removeDisruptionSchemeFromNode(nodeAndClient);
- nodes.remove(nodeAndClient.name);
- nodeAndClient.close();
+ stopNodesAndClient(nodeAndClient);
return true;
}
return false;
@@ -1251,9 +1317,7 @@ public final class InternalTestCluster extends TestCluster {
NodeAndClient nodeAndClient = getRandomNodeAndClient(nc -> filter.test(nc.node.settings()));
if (nodeAndClient != null) {
logger.info("Closing filtered random node [{}] ", nodeAndClient.name);
- removeDisruptionSchemeFromNode(nodeAndClient);
- nodes.remove(nodeAndClient.name);
- nodeAndClient.close();
+ stopNodesAndClient(nodeAndClient);
}
}
@@ -1266,9 +1330,7 @@ public final class InternalTestCluster extends TestCluster {
String masterNodeName = getMasterName();
assert nodes.containsKey(masterNodeName);
logger.info("Closing master node [{}] ", masterNodeName);
- removeDisruptionSchemeFromNode(nodes.get(masterNodeName));
- NodeAndClient remove = nodes.remove(masterNodeName);
- remove.close();
+ stopNodesAndClient(nodes.get(masterNodeName));
}
/**
@@ -1278,8 +1340,47 @@ public final class InternalTestCluster extends TestCluster {
NodeAndClient nodeAndClient = getRandomNodeAndClient(new MasterNodePredicate(getMasterName()).negate());
if (nodeAndClient != null) {
logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName());
+ stopNodesAndClient(nodeAndClient);
+ }
+ }
+
+ private synchronized void startAndPublishNodesAndClients(List<NodeAndClient> nodeAndClients) {
+ if (nodeAndClients.size() > 0) {
+ final int newMasters = (int) nodeAndClients.stream().filter(NodeAndClient::isMasterEligible)
+ .filter(nac -> nodes.containsKey(nac.name) == false) // filter out old masters
+ .count();
+ final int currentMasters = getMasterNodesCount();
+ if (autoManageMinMasterNodes && currentMasters > 1 && newMasters > 0) {
+ // a single master is a special case handled after the nodes are started - we can't update the min master nodes before we add more nodes.
+ updateMinMasterNodes(currentMasters + newMasters);
+ }
+ for (NodeAndClient nodeAndClient : nodeAndClients) {
+ nodeAndClient.startNode();
+ publishNode(nodeAndClient);
+ }
+ if (autoManageMinMasterNodes && currentMasters == 1 && newMasters > 0) {
+ // update once masters have joined
+ validateClusterFormed();
+ updateMinMasterNodes(currentMasters + newMasters);
+ }
+ }
+ }
+
+ private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException {
+ stopNodesAndClients(Collections.singleton(nodeAndClient));
+ }
+
+ private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndClients) throws IOException {
+ if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {
+ int masters = (int)nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();
+ if (masters > 0) {
+ updateMinMasterNodes(getMasterNodesCount() - masters);
+ }
+ }
+ for (NodeAndClient nodeAndClient: nodeAndClients) {
removeDisruptionSchemeFromNode(nodeAndClient);
- nodes.remove(nodeAndClient.name);
+ NodeAndClient previous = nodes.remove(nodeAndClient.name);
+ assert previous == nodeAndClient;
nodeAndClient.close();
}
}
@@ -1319,8 +1420,7 @@ public final class InternalTestCluster extends TestCluster {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient(predicate);
if (nodeAndClient != null) {
- logger.info("Restarting random node [{}] ", nodeAndClient.name);
- nodeAndClient.restart(callback, true);
+ restartNode(nodeAndClient, callback);
}
}
@@ -1331,93 +1431,10 @@ public final class InternalTestCluster extends TestCluster {
ensureOpen();
NodeAndClient nodeAndClient = nodes.get(nodeName);
if (nodeAndClient != null) {
- logger.info("Restarting node [{}] ", nodeAndClient.name);
- nodeAndClient.restart(callback, true);
+ restartNode(nodeAndClient, callback);
}
}
- private synchronized void restartAllNodes(boolean rollingRestart, RestartCallback callback) throws Exception {
- ensureOpen();
- List<NodeAndClient> toRemove = new ArrayList<>();
- try {
- for (NodeAndClient nodeAndClient : nodes.values()) {
- if (!callback.doRestart(nodeAndClient.name)) {
- logger.info("Closing node [{}] during restart", nodeAndClient.name);
- toRemove.add(nodeAndClient);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
- }
- nodeAndClient.close();
- }
- }
- } finally {
- for (NodeAndClient nodeAndClient : toRemove) {
- nodes.remove(nodeAndClient.name);
- }
- }
- logger.info("Restarting remaining nodes rollingRestart [{}]", rollingRestart);
- if (rollingRestart) {
- int numNodesRestarted = 0;
- for (NodeAndClient nodeAndClient : nodes.values()) {
- callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
- logger.info("Restarting node [{}] ", nodeAndClient.name);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
- }
- nodeAndClient.restart(callback, true);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
- }
- }
- } else {
- int numNodesRestarted = 0;
- Set[] nodesRoleOrder = new Set[nextNodeId.get()];
- Map<Set<Role>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
- for (NodeAndClient nodeAndClient : nodes.values()) {
- callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
- logger.info("Stopping node [{}] ", nodeAndClient.name);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
- }
- nodeAndClient.closeNode();
- // delete data folders now, before we start other nodes that may claim it
- nodeAndClient.clearDataIfNeeded(callback);
-
- DiscoveryNode discoveryNode = getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode();
- nodesRoleOrder[nodeAndClient.nodeAndClientId()] = discoveryNode.getRoles();
- nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
- }
-
- assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size();
-
- // randomize start up order, but making sure that:
- // 1) A data folder that was assigned to a data node will stay so
- // 2) Data nodes will get the same node lock ordinal range, so custom index paths (where the ordinal is used)
- // will still belong to data nodes
- for (List<NodeAndClient> sameRoleNodes : nodesByRoles.values()) {
- Collections.shuffle(sameRoleNodes, random);
- }
-
- for (Set roles : nodesRoleOrder) {
- if (roles == null) {
- // if some nodes were stopped, we want have a role for them
- continue;
- }
- NodeAndClient nodeAndClient = nodesByRoles.get(roles).remove(0);
- logger.info("Starting node [{}] ", nodeAndClient.name);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
- }
- // we already cleared data folders, before starting nodes up
- nodeAndClient.restart(callback, false);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
- }
- }
- }
- }
-
-
public static final RestartCallback EMPTY_CALLBACK = new RestartCallback() {
@Override
public Settings onNodeStopped(String node) {
@@ -1442,15 +1459,98 @@ public final class InternalTestCluster extends TestCluster {
/**
* Restarts all nodes in a rolling restart fashion ie. only restarts on node a time.
*/
- public void rollingRestart(RestartCallback function) throws Exception {
- restartAllNodes(true, function);
+ public synchronized void rollingRestart(RestartCallback callback) throws Exception {
+ int numNodesRestarted = 0;
+ for (NodeAndClient nodeAndClient : nodes.values()) {
+ callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
+ restartNode(nodeAndClient, callback);
+ }
+ }
+
+ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception {
+ logger.info("Restarting node [{}] ", nodeAndClient.name);
+ if (activeDisruptionScheme != null) {
+ activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
+ }
+ final int masterNodesCount = getMasterNodesCount();
+ // special case to allow stopping one node in a two node cluster and keep it functional
+ final boolean updateMinMaster = nodeAndClient.isMasterEligible() && masterNodesCount == 2 && autoManageMinMasterNodes;
+ if (updateMinMaster) {
+ updateMinMasterNodes(masterNodesCount - 1);
+ }
+ nodeAndClient.restart(callback, true, autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1);
+ if (activeDisruptionScheme != null) {
+ activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
+ }
+ if (callback.validateClusterForming() || updateMinMaster) {
+ // we have to validate cluster size if updateMinMaster == true, because we need the
+ // second node to join in order to increment min_master_nodes back to 2.
+ validateClusterFormed();
+ }
+ if (updateMinMaster) {
+ updateMinMasterNodes(masterNodesCount);
+ }
}
/**
* Restarts all nodes in the cluster. It first stops all nodes and then restarts all the nodes again.
*/
- public void fullRestart(RestartCallback function) throws Exception {
- restartAllNodes(false, function);
+ public synchronized void fullRestart(RestartCallback callback) throws Exception {
+ int numNodesRestarted = 0;
+ Map<Set<Role>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
+ Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
+ for (NodeAndClient nodeAndClient : nodes.values()) {
+ callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
+ logger.info("Stopping node [{}] ", nodeAndClient.name);
+ if (activeDisruptionScheme != null) {
+ activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
+ }
+ nodeAndClient.closeNode();
+ // delete data folders now, before we start other nodes that may claim it
+ nodeAndClient.clearDataIfNeeded(callback);
+ DiscoveryNode discoveryNode = getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode();
+ rolesOrderedByOriginalStartupOrder[nodeAndClient.nodeAndClientId] = discoveryNode.getRoles();
+ nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
+ }
+
+ assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size();
+
+ // randomize start up order, but making sure that:
+ // 1) A data folder that was assigned to a data node will stay so
+ // 2) Data nodes will get the same node lock ordinal range, so custom index paths (where the ordinal is used)
+ // will still belong to data nodes
+ for (List<NodeAndClient> sameRoleNodes : nodesByRoles.values()) {
+ Collections.shuffle(sameRoleNodes, random);
+ }
+ List<NodeAndClient> startUpOrder = new ArrayList<>();
+ for (Set roles : rolesOrderedByOriginalStartupOrder) {
+ if (roles == null) {
+ // if some nodes were stopped, we won't have a role for that ordinal
+ continue;
+ }
+ final List<NodeAndClient> nodesByRole = nodesByRoles.get(roles);
+ startUpOrder.add(nodesByRole.remove(0));
+ }
+ assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0;
+
+ // do two rounds to minimize pinging (the mock zen ping pings with no delay and can create a lot of logs)
+ for (NodeAndClient nodeAndClient : startUpOrder) {
+ logger.info("resetting node [{}] ", nodeAndClient.name);
+ // we already cleared data folders, before starting nodes up
+ nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1);
+ }
+
+ for (NodeAndClient nodeAndClient : startUpOrder) {
+ logger.info("starting node [{}] ", nodeAndClient.name);
+ nodeAndClient.startNode();
+ if (activeDisruptionScheme != null) {
+ activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
+ }
+ }
+
+ if (callback.validateClusterForming()) {
+ validateClusterFormed();
+ }
}
@@ -1534,19 +1634,51 @@ public final class InternalTestCluster extends TestCluster {
* Starts a node with the given settings and returns it's name.
*/
public synchronized String startNode(Settings settings) {
- NodeAndClient buildNode = buildNode(settings);
- buildNode.startNode();
- publishNode(buildNode);
+ final int defaultMinMasterNodes = getMinMasterNodes(getMasterNodesCount() + (Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0));
+ NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes);
+ startAndPublishNodesAndClients(Collections.singletonList(buildNode));
return buildNode.name;
}
+ /**
+ * updates the min master nodes setting in the current running cluster.
+ *
+ * @param eligibleMasterNodeCount the number of master eligible nodes to use as basis for the min master node setting
+ */
+ private int updateMinMasterNodes(int eligibleMasterNodeCount) {
+ assert autoManageMinMasterNodes;
+ final int minMasterNodes = getMinMasterNodes(eligibleMasterNodeCount);
+ if (getMasterNodesCount() > 0) {
+ // there should be at least one master to update
+ logger.debug("updating min_master_nodes to [{}]", minMasterNodes);
+ try {
+ assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
+ Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes)
+ ));
+ } catch (Exception e) {
+ throw new ElasticsearchException("failed to update minimum master node to [{}] (current masters [{}])", e,
+ minMasterNodes, getMasterNodesCount());
+ }
+ }
+ return minMasterNodes;
+ }
+
+ /** calculates a min master nodes value based on the given number of master nodes */
+ private int getMinMasterNodes(int eligibleMasterNodes) {
+ return eligibleMasterNodes / 2 + 1;
+ }
+
+ private int getMasterNodesCount() {
+ return (int)nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count();
+ }
+
public synchronized Async<List<String>> startMasterOnlyNodesAsync(int numNodes) {
return startMasterOnlyNodesAsync(numNodes, Settings.EMPTY);
}
public synchronized Async<List<String>> startMasterOnlyNodesAsync(int numNodes, Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build();
- return startNodesAsync(numNodes, settings1, Version.CURRENT);
+ return startNodesAsync(numNodes, settings1);
}
public synchronized Async<List<String>> startDataOnlyNodesAsync(int numNodes) {
@@ -1555,7 +1687,7 @@ public final class InternalTestCluster extends TestCluster {
public synchronized Async<List<String>> startDataOnlyNodesAsync(int numNodes, Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build();
- return startNodesAsync(numNodes, settings1, Version.CURRENT);
+ return startNodesAsync(numNodes, settings1);
}
public synchronized Async<String> startMasterOnlyNodeAsync() {
@@ -1564,7 +1696,7 @@ public final class InternalTestCluster extends TestCluster {
public synchronized Async<String> startMasterOnlyNodeAsync(Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build();
- return startNodeAsync(settings1, Version.CURRENT);
+ return startNodeAsync(settings1);
}
public synchronized String startMasterOnlyNode(Settings settings) {
@@ -1578,7 +1710,7 @@ public final class InternalTestCluster extends TestCluster {
public synchronized Async<String> startDataOnlyNodeAsync(Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build();
- return startNodeAsync(settings1, Version.CURRENT);
+ return startNodeAsync(settings1);
}
public synchronized String startDataOnlyNode(Settings settings) {
@@ -1590,21 +1722,25 @@ public final class InternalTestCluster extends TestCluster {
* Starts a node in an async manner with the given settings and returns future with its name.
*/
public synchronized Async<String> startNodeAsync() {
- return startNodeAsync(Settings.EMPTY, Version.CURRENT);
+ return startNodeAsync(Settings.EMPTY);
}
/**
* Starts a node in an async manner with the given settings and returns future with its name.
*/
public synchronized Async<String> startNodeAsync(final Settings settings) {
- return startNodeAsync(settings, Version.CURRENT);
+ final int defaultMinMasterNodes;
+ if (autoManageMinMasterNodes) {
+ int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0;
+ defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
+ } else {
+ defaultMinMasterNodes = -1;
+ }
+ return startNodeAsync(settings, defaultMinMasterNodes);
}
- /**
- * Starts a node in an async manner with the given settings and version and returns future with its name.
- */
- public synchronized Async<String> startNodeAsync(final Settings settings, final Version version) {
- final NodeAndClient buildNode = buildNode(settings);
+ private synchronized Async<String> startNodeAsync(final Settings settings, int defaultMinMasterNodes) {
+ final NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes);
final Future<String> submit = executor.submit(() -> {
buildNode.startNode();
publishNode(buildNode);
@@ -1613,27 +1749,28 @@ public final class InternalTestCluster extends TestCluster {
return () -> submit.get();
}
+
/**
* Starts multiple nodes in an async manner and returns future with its name.
*/
public synchronized Async<List<String>> startNodesAsync(final int numNodes) {
- return startNodesAsync(numNodes, Settings.EMPTY, Version.CURRENT);
+ return startNodesAsync(numNodes, Settings.EMPTY);
}
/**
* Starts multiple nodes in an async manner with the given settings and returns future with its name.
*/
- public synchronized Async<List<String>> startNodesAsync(final int numNodes, final Settings settings) {
- return startNodesAsync(numNodes, settings, Version.CURRENT);
- }
-
- /**
- * Starts multiple nodes in an async manner with the given settings and version and returns future with its name.
- */
- public synchronized Async<List<String>> startNodesAsync(final int numNodes, final Settings settings, final Version version) {
+ public synchronized Async<List<String>> startNodesAsync(final int numNodes, final Settings settings) {
+ final int defaultMinMasterNodes;
+ if (autoManageMinMasterNodes) {
+ int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? numNodes : 0;
+ defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
+ } else {
+ defaultMinMasterNodes = -1;
+ }
final List<Async<String>> asyncs = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
- asyncs.add(startNodeAsync(settings, version));
+ asyncs.add(startNodeAsync(settings, defaultMinMasterNodes));
}
return () -> {
@@ -1650,9 +1787,16 @@ public final class InternalTestCluster extends TestCluster {
* The order of the node names returned matches the order of the settings provided.
*/
public synchronized Async<List<String>> startNodesAsync(final Settings... settings) {
+ final int defaultMinMasterNodes;
+ if (autoManageMinMasterNodes) {
+ int mastersDelta = (int) Stream.of(settings).filter(Node.NODE_MASTER_SETTING::get).count();
+ defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
+ } else {
+ defaultMinMasterNodes = -1;
+ }
List<Async<String>> asyncs = new ArrayList<>();
for (Settings setting : settings) {
- asyncs.add(startNodeAsync(setting, Version.CURRENT));
+ asyncs.add(startNodeAsync(setting, defaultMinMasterNodes));
}
return () -> {
List<String> ids = new ArrayList<>();
@@ -1683,6 +1827,11 @@ public final class InternalTestCluster extends TestCluster {
return dataAndMasterNodes().size();
}
+ public synchronized int numMasterNodes() {
+ return filterNodes(nodes, NodeAndClient::isMasterEligible).size();
+ }
+
+
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
clearDisruptionScheme();
scheme.applyToCluster(this);
@@ -1887,14 +2036,8 @@ public final class InternalTestCluster extends TestCluster {
return false;
}
-
- /**
- * If this returns <code>false</code> the node with the given node name will not be restarted. It will be
- * closed and removed from the cluster. Returns <code>true</code> by default.
- */
- public boolean doRestart(String nodeName) {
- return true;
- }
+ /** returns true if the restart should also validate the cluster has reformed */
+ public boolean validateClusterForming() { return true; }
}
public Settings getDefaultSettings() {
diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java
index c544b2bad8..fe16f03411 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java
@@ -18,11 +18,6 @@
*/
package org.elasticsearch.test.discovery;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractComponent;
@@ -32,13 +27,22 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.PingContextProvider;
import org.elasticsearch.discovery.zen.ZenPing;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
* A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging
* to be immediate and can be used to speed up tests.
*/
public final class MockZenPing extends AbstractComponent implements ZenPing {
- static final Map<ClusterName, Set<MockZenPing>> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap();
+ static final Map<ClusterName, Set<MockZenPing>> activeNodesPerCluster = new HashMap<>();
+
+ /** a set of the last discovered pings. used to throttle busy spinning where MockZenPing will keep returning the same results */
+ private Set<MockZenPing> lastDiscoveredPings = null;
private volatile PingContextProvider contextProvider;
@@ -50,18 +54,34 @@ public final class MockZenPing extends AbstractComponent implements ZenPing {
public void start(PingContextProvider contextProvider) {
this.contextProvider = contextProvider;
assert contextProvider != null;
- boolean added = getActiveNodesForCurrentCluster().add(this);
- assert added;
+ synchronized (activeNodesPerCluster) {
+ boolean added = getActiveNodesForCurrentCluster().add(this);
+ assert added;
+ activeNodesPerCluster.notifyAll();
+ }
}
@Override
public void ping(PingListener listener, TimeValue timeout) {
logger.info("pinging using mock zen ping");
- List<PingResponse> responseList = getActiveNodesForCurrentCluster().stream()
- .filter(p -> p != this) // remove this as pings are not expected to return the local node
- .map(MockZenPing::getPingResponse)
- .collect(Collectors.toList());
- listener.onPing(responseList);
+ synchronized (activeNodesPerCluster) {
+ Set<MockZenPing> activeNodes = getActiveNodesForCurrentCluster();
+ if (activeNodes.equals(lastDiscoveredPings)) {
+ try {
+ logger.trace("nothing has changed since the last ping. waiting for a change");
+ activeNodesPerCluster.wait(timeout.millis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // keep the interrupt visible to the caller instead of swallowing it
+ }
+ activeNodes = getActiveNodesForCurrentCluster();
+ }
+ lastDiscoveredPings = activeNodes.stream().collect(Collectors.toSet()); // snapshot: the live set would always equal itself on the next ping
+ List<PingResponse> responseList = activeNodes.stream()
+ .filter(p -> p != this) // remove this as pings are not expected to return the local node
+ .map(MockZenPing::getPingResponse)
+ .collect(Collectors.toList());
+ listener.onPing(responseList);
+ }
}
private ClusterName getClusterName() {
@@ -74,13 +94,17 @@ public final class MockZenPing extends AbstractComponent implements ZenPing {
}
private Set<MockZenPing> getActiveNodesForCurrentCluster() {
+ assert Thread.holdsLock(activeNodesPerCluster);
return activeNodesPerCluster.computeIfAbsent(getClusterName(),
clusterName -> ConcurrentCollections.newConcurrentSet());
}
@Override
public void close() {
- boolean found = getActiveNodesForCurrentCluster().remove(this);
- assert found;
+ synchronized (activeNodesPerCluster) {
+ boolean found = getActiveNodesForCurrentCluster().remove(this);
+ assert found;
+ activeNodesPerCluster.notifyAll();
+ }
}
}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
index 8cff517316..bc560c9b0f 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
@@ -636,7 +636,7 @@ public class ElasticsearchAssertions {
* a way that sucks less.
*/
NamedWriteableRegistry registry;
- if (ESIntegTestCase.isInternalCluster()) {
+ if (ESIntegTestCase.isInternalCluster() && ESIntegTestCase.internalCluster().size() > 0) {
registry = ESIntegTestCase.internalCluster().getInstance(NamedWriteableRegistry.class);
} else {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java
index 327a49d367..36903c7c60 100644
--- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java
+++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java
@@ -19,21 +19,6 @@
*/
package org.elasticsearch.test.test;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Client;
@@ -51,14 +36,32 @@ import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.TransportSettings;
+import org.hamcrest.Matcher;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.DATA;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.INGEST;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.MASTER;
+import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileNotExists;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
/**
@@ -81,10 +84,10 @@ public class InternalTestClusterTests extends ESTestCase {
Path baseDir = createTempDir();
InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
- minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
+ randomBoolean(), minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
- minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
+ randomBoolean(), minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
// TODO: this is not ideal - we should have a way to make sure ports are initialized in the same way
assertClusters(cluster0, cluster1, false);
@@ -116,7 +119,8 @@ public class InternalTestClusterTests extends ESTestCase {
public static void assertSettings(Settings left, Settings right, boolean checkClusterUniqueSettings) {
Set<Map.Entry<String, String>> entries0 = left.getAsMap().entrySet();
Map<String, String> entries1 = right.getAsMap();
- assertThat(entries0.size(), equalTo(entries1.size()));
+ assertThat("--> left:\n" + left.toDelimitedString('\n') + "\n-->right:\n" + right.toDelimitedString('\n'),
+ entries0.size(), equalTo(entries1.size()));
for (Map.Entry<String, String> entry : entries0) {
if (clusterUniqueSettings.contains(entry.getKey()) && checkClusterUniqueSettings == false) {
continue;
@@ -125,6 +129,41 @@ public class InternalTestClusterTests extends ESTestCase {
}
}
+ private void assertMMNinNodeSetting(InternalTestCluster cluster, int masterNodes) {
+ for (final String node : cluster.getNodeNames()) {
+ assertMMNinNodeSetting(node, cluster, masterNodes);
+ }
+ }
+
+ private void assertMMNinNodeSetting(String node, InternalTestCluster cluster, int masterNodes) {
+ final int minMasterNodes = masterNodes / 2 + 1;
+ final Matcher<Map<? extends String, ? extends String>> minMasterMatcher =
+ hasEntry(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes));
+ final Matcher<Map<? extends String, ?>> noMinMasterNodesMatcher = not(hasKey(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()));
+ Settings nodeSettings = cluster.client(node).admin().cluster().prepareNodesInfo(node).get().getNodes().get(0).getSettings();
+ assertThat("node setting of node [" + node + "] has the wrong min_master_node setting: ["
+ + nodeSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]",
+ nodeSettings.getAsMap(),
+ cluster.getAutoManageMinMasterNode() ? minMasterMatcher: noMinMasterNodesMatcher);
+ }
+
+ private void assertMMNinClusterSetting(InternalTestCluster cluster, int masterNodes) {
+ final int minMasterNodes = masterNodes / 2 + 1;
+ Matcher<Map<? extends String, ? extends String>> minMasterMatcher =
+ hasEntry(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes));
+ Matcher<Map<? extends String, ?>> noMinMasterNodesMatcher = not(hasKey(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()));
+
+ for (final String node : cluster.getNodeNames()) {
+ Settings stateSettings = cluster.client(node).admin().cluster().prepareState().setLocal(true)
+ .get().getState().getMetaData().settings();
+
+ assertThat("dynamic setting for node [" + node + "] has the wrong min_master_node setting : ["
+ + stateSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]",
+ stateSettings.getAsMap(),
+ cluster.getAutoManageMinMasterNode() ? minMasterMatcher: noMinMasterNodesMatcher);
+ }
+ }
+
public void testBeforeTest() throws Exception {
long clusterSeed = randomLong();
boolean masterNodes = randomBoolean();
@@ -156,11 +195,12 @@ public class InternalTestClusterTests extends ESTestCase {
Path baseDir = createTempDir();
final List<Class<? extends Plugin>> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class);
+ final boolean autoManageMinMasterNodes = randomBoolean();
InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
- minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
+ autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, mockPlugins, Function.identity());
InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
- minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes,
+ autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, mockPlugins, Function.identity());
assertClusters(cluster0, cluster1, false);
@@ -182,6 +222,8 @@ public class InternalTestClusterTests extends ESTestCase {
assertSettings(client.settings(), other.settings(), false);
}
assertArrayEquals(cluster0.getNodeNames(), cluster1.getNodeNames());
+ assertMMNinNodeSetting(cluster0, cluster0.numMasterNodes());
+ assertMMNinNodeSetting(cluster1, cluster0.numMasterNodes());
cluster0.afterTest();
cluster1.afterTest();
} finally {
@@ -216,12 +258,15 @@ public class InternalTestClusterTests extends ESTestCase {
boolean enableHttpPipelining = randomBoolean();
String nodePrefix = "test";
Path baseDir = createTempDir();
+ final boolean autoManageMinMasterNodes = randomBoolean();
InternalTestCluster cluster = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
- minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
+ autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class),
Function.identity());
try {
cluster.beforeTest(random(), 0.0);
+ final int originalMasterCount = cluster.numMasterNodes();
+ assertMMNinNodeSetting(cluster, originalMasterCount);
final Map<String,Path[]> shardNodePaths = new HashMap<>();
for (String name: cluster.getNodeNames()) {
shardNodePaths.put(name, getNodePaths(cluster, name));
@@ -230,7 +275,15 @@ public class InternalTestClusterTests extends ESTestCase {
Path dataPath = getNodePaths(cluster, poorNode)[0];
final Path testMarker = dataPath.resolve("testMarker");
Files.createDirectories(testMarker);
+ int expectedMasterCount = originalMasterCount;
+ if (cluster.getInstance(ClusterService.class, poorNode).localNode().isMasterNode()) {
+ expectedMasterCount--;
+ }
cluster.stopRandomNode(InternalTestCluster.nameFilter(poorNode));
+ if (expectedMasterCount != originalMasterCount) {
+ // check that the min_master_nodes cluster setting was updated
+ assertMMNinClusterSetting(cluster, expectedMasterCount);
+ }
assertFileExists(testMarker); // stopping a node half way shouldn't clean data
final String stableNode = randomFrom(cluster.getNodeNames());
@@ -240,10 +293,17 @@ public class InternalTestClusterTests extends ESTestCase {
Files.createDirectories(stableTestMarker);
final String newNode1 = cluster.startNode();
+ expectedMasterCount++;
assertThat(getNodePaths(cluster, newNode1)[0], equalTo(dataPath));
assertFileExists(testMarker); // starting a node should re-use data folders and not clean it
+ if (expectedMasterCount > 1) { // if this is the first master, its cluster state settings won't be updated
+ assertMMNinClusterSetting(cluster, expectedMasterCount);
+ }
+ assertMMNinNodeSetting(newNode1, cluster, expectedMasterCount);
final String newNode2 = cluster.startNode();
+ expectedMasterCount++;
+ assertMMNinClusterSetting(cluster, expectedMasterCount);
final Path newDataPath = getNodePaths(cluster, newNode2)[0];
final Path newTestMarker = newDataPath.resolve("newTestMarker");
assertThat(newDataPath, not(dataPath));
@@ -262,6 +322,7 @@ public class InternalTestClusterTests extends ESTestCase {
assertThat("data paths for " + name + " changed", getNodePaths(cluster, name),
equalTo(shardNodePaths.get(name)));
}
+ assertMMNinNodeSetting(cluster, originalMasterCount);
} finally {
cluster.close();
@@ -280,7 +341,7 @@ public class InternalTestClusterTests extends ESTestCase {
public void testDifferentRolesMaintainPathOnRestart() throws Exception {
final Path baseDir = createTempDir();
final int numNodes = 5;
- InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, true, 0, 0, "test",
+ InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, true, true, 0, 0, "test",
new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
@@ -301,7 +362,9 @@ public class InternalTestClusterTests extends ESTestCase {
try {
Map<DiscoveryNode.Role, Set<String>> pathsPerRole = new HashMap<>();
for (int i = 0; i < numNodes; i++) {
- final DiscoveryNode.Role role = randomFrom(MASTER, DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST);
+ final DiscoveryNode.Role role = i == numNodes -1 && pathsPerRole.containsKey(MASTER) == false ?
+ MASTER : // last node and still no master for the cluster
+ randomFrom(MASTER, DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST);
final String node;
switch (role) {
case MASTER:
@@ -343,6 +406,59 @@ public class InternalTestClusterTests extends ESTestCase {
} finally {
cluster.close();
}
+ }
+ public void testTwoNodeCluster() throws Exception {
+ final boolean autoManageMinMasterNodes = randomBoolean();
+ NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
+ @Override
+ public Settings nodeSettings(int nodeOrdinal) {
+ return Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false)
+ .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2)
+ .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME)
+ .build();
+ }
+
+ @Override
+ public Settings transportClientSettings() {
+ return Settings.builder()
+ .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build();
+ }
+ };
+ boolean enableHttpPipelining = randomBoolean();
+ String nodePrefix = "test";
+ Path baseDir = createTempDir();
+ InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, false, autoManageMinMasterNodes, 2, 2,
+ "test", nodeConfigurationSource, 0, enableHttpPipelining, nodePrefix,
+ Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class), Function.identity());
+ try {
+ cluster.beforeTest(random(), 0.0);
+ assertMMNinNodeSetting(cluster, 2);
+ switch (randomInt(2)) {
+ case 0:
+ cluster.stopRandomDataNode();
+ assertMMNinClusterSetting(cluster, 1);
+ cluster.startNode();
+ assertMMNinClusterSetting(cluster, 2);
+ assertMMNinNodeSetting(cluster, 2);
+ break;
+ case 1:
+ cluster.rollingRestart(new InternalTestCluster.RestartCallback() {
+ @Override
+ public Settings onNodeStopped(String nodeName) throws Exception {
+ assertMMNinClusterSetting(cluster, 1);
+ return super.onNodeStopped(nodeName);
+ }
+ });
+ assertMMNinClusterSetting(cluster, 2);
+ break;
+ case 2:
+ cluster.fullRestart();
+ break;
+ }
+ assertMMNinNodeSetting(cluster, 2);
+ } finally {
+ cluster.close();
+ }
}
}