summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBoaz Leskes <b.leskes@gmail.com>2016-11-15 13:42:26 +0000
committerGitHub <noreply@github.com>2016-11-15 13:42:26 +0000
commitd6c2b4f7c578a6013dfd57806e011030d1647482 (patch)
treebae4cf532012205873e25166ea7377da7565e8c4
parent66fbb0dbc21f56e074981d1cfaaa9e7cf4676336 (diff)
Adapt InternalTestCluster to auto adjust `minimum_master_nodes` (#21458)
#20960 removed `LocalDiscovery` and we now use `ZenDiscovery` in all our tests. To keep cluster forming fast, we are using a `MockZenPing` implementation which uses static maps to return instant results making master election fast. Currently, we don't set `minimum_master_nodes` causing the occasional split brain when starting multiple nodes concurrently and their pinging is so fast that it misses the fact that one of the node has elected it self master. To solve this, `InternalTestCluster` is modified to behave like a true cluster and manage and set `minimum_master_nodes` correctly with every change to the number of nodes. Tests that want to manage the settings themselves can opt out using a new `autoMinMasterNodes` parameter to the `ClusterScope` annotation. Having `min_master_nodes` set means the started node may need to wait for other nodes to be started as well. To combat this, we set `discovery.initial_state_timeout` to `0` and wait for the cluster to form once all node have been started. Also, because a node may wait and ping while other nodes are started, `MockZenPing` is adapted to wait rather than busy-ping.
-rw-r--r--core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java48
-rw-r--r--core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java4
-rw-r--r--core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java20
-rw-r--r--core/src/main/java/org/elasticsearch/node/Node.java7
-rw-r--r--core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java3
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java15
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java5
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java5
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java156
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java44
-rw-r--r--core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java6
-rw-r--r--core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java2
-rw-r--r--core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java90
-rw-r--r--core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java6
-rw-r--r--core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java6
-rw-r--r--core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java14
-rw-r--r--core/src/test/java/org/elasticsearch/recovery/RelocationIT.java26
-rw-r--r--core/src/test/java/org/elasticsearch/tribe/TribeIT.java28
-rw-r--r--plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java3
-rw-r--r--plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java2
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java29
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java8
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java555
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java54
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java2
-rw-r--r--test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java162
31 files changed, 750 insertions, 560 deletions
diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
index f6870cc05b..7794c58ddd 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
@@ -19,30 +19,9 @@
package org.elasticsearch.discovery.zen;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
-import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
@@ -53,6 +32,8 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@@ -75,6 +56,25 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@@ -186,9 +186,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
@Override
- public void close() throws IOException {
+ public void close() {
ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS);
- IOUtils.close(receivedResponses.values());
+ Releasables.close(receivedResponses.values());
closed = true;
}
@@ -272,7 +272,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
}
- class SendPingsHandler implements Closeable {
+ class SendPingsHandler implements Releasable {
private final int id;
private final Set<DiscoveryNode> nodeToDisconnect = ConcurrentCollections.newConcurrentSet();
private final PingCollection pingCollection;
diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
index f9a16243e0..272a75f4e7 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -47,6 +47,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -240,6 +241,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
joinThreadControl.stop();
masterFD.stop("zen disco stop");
nodesFD.stop();
+ Releasables.close(zenPing); // stop any ongoing pinging
DiscoveryNodes nodes = nodes();
if (sendLeaveRequest) {
if (nodes.getMasterNode() == null) {
@@ -269,7 +271,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
@Override
protected void doClose() throws IOException {
- IOUtils.close(masterFD, nodesFD, zenPing);
+ IOUtils.close(masterFD, nodesFD);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
index cb2c8cb501..75ea701dc9 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
@@ -19,26 +19,26 @@
package org.elasticsearch.discovery.zen;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
-public interface ZenPing extends Closeable {
+public interface ZenPing extends Releasable {
void start(PingContextProvider contextProvider);
diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java
index 9eb7f9a037..298d6712ff 100644
--- a/core/src/main/java/org/elasticsearch/node/Node.java
+++ b/core/src/main/java/org/elasticsearch/node/Node.java
@@ -75,9 +75,6 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
-import org.elasticsearch.discovery.zen.UnicastHostsProvider;
-import org.elasticsearch.discovery.zen.UnicastZenPing;
-import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayAllocator;
@@ -655,11 +652,13 @@ public class Node implements Closeable {
injector.getInstance(SnapshotShardsService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
+ // close discovery early to not react to pings anymore.
+ // This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
+ injector.getInstance(Discovery.class).stop();
// we close indices first, so operations won't be allowed on it
injector.getInstance(IndicesTTLService.class).stop();
injector.getInstance(RoutingService.class).stop();
injector.getInstance(ClusterService.class).stop();
- injector.getInstance(Discovery.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(MonitorService.class).stop();
injector.getInstance(GatewayService.class).stop();
diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java
index e289d90c7a..b5ef7a642a 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java
@@ -31,7 +31,8 @@ import java.io.IOException;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
-@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
+@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0,
+ autoMinMasterNodes = false)
public class IndicesExistsIT extends ESIntegTestCase {
public void testIndexExistsWithBlocksInPlace() throws IOException {
diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java
index 96ba5729cb..7c764ed172 100644
--- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java
+++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java
@@ -43,7 +43,7 @@ import java.util.concurrent.CyclicBarrier;
import static org.hamcrest.Matchers.equalTo;
-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
public class IndexingMasterFailoverIT extends ESIntegTestCase {
@Override
diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
index 3e58291d4a..257b80663a 100644
--- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
@@ -63,7 +63,7 @@ import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.discovery.zen:TRACE")
public class MinimumMasterNodesIT extends ESIntegTestCase {
@@ -275,12 +275,14 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
.put("discovery.initial_state_timeout", "500ms")
.build();
- logger.info("--> start 2 nodes");
- internalCluster().startNodesAsync(2, settings).get();
+ logger.info("--> start first node and wait for it to be a master");
+ internalCluster().startNode(settings);
+ ensureClusterSizeConsistency();
// wait until second node join the cluster
- ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get();
- assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+ logger.info("--> start second node and wait for it to join");
+ internalCluster().startNode(settings);
+ ensureClusterSizeConsistency();
logger.info("--> setting minimum master node to 2");
setMinimumMasterNodes(2);
@@ -298,8 +300,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
logger.info("--> bringing another node up");
internalCluster().startNode(Settings.builder().put(settings).put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2).build());
- clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get();
- assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+ ensureClusterSizeConsistency();
}
private void assertNoMasterBlockOnAllNodes() throws InterruptedException {
diff --git a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java
index f73043ce4e..0be4f7c4e5 100644
--- a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java
@@ -49,7 +49,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
public class NoMasterNodeIT extends ESIntegTestCase {
@Override
diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java
index f47134a45b..ab3f82fff7 100644
--- a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java
@@ -69,7 +69,10 @@ public class AckClusterUpdateSettingsIT extends ESIntegTestCase {
private void removePublishTimeout() {
//to test that the acknowledgement mechanism is working we better disable the wait for publish
//otherwise the operation is most likely acknowledged even if it doesn't support ack
- assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0")));
+ assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
+ .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0")
+ .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s")
+ ));
}
public void testClusterUpdateSettingsAcknowledgement() {
diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java
index 2ea4f75515..f51a4f11ae 100644
--- a/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java
@@ -36,7 +36,6 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
-import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.Index;
@@ -61,7 +60,9 @@ public class AckIT extends ESIntegTestCase {
//to test that the acknowledgement mechanism is working we better disable the wait for publish
//otherwise the operation is most likely acknowledged even if it doesn't support ack
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
- .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), 0).build();
+ .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
+ .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit to check acking
+ .build();
}
public void testUpdateSettingsAcknowledgement() {
diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java
index d98f929424..19657b0548 100644
--- a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java
@@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationD
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@@ -104,7 +103,6 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
Settings commonSettings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
- .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 3)
.put(ZenDiscovery.JOIN_TIMEOUT_SETTING.getKey(), "10s")
.build();
diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java
index 3d345f24db..f056eded34 100644
--- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java
@@ -20,32 +20,20 @@ package org.elasticsearch.cluster.service;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
-import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.component.LifecycleComponent;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.discovery.zen.ZenDiscovery;
-import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.junit.annotations.TestLogging;
-import org.elasticsearch.threadpool.ThreadPool;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -56,17 +44,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class ClusterServiceIT extends ESIntegTestCase {
- @Override
- protected Collection<Class<? extends Plugin>> nodePlugins() {
- return Arrays.asList(TestPlugin.class);
- }
-
public void testAckedUpdateTask() throws Exception {
internalCluster().startNode();
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
@@ -482,141 +463,4 @@ public class ClusterServiceIT extends ESIntegTestCase {
assertTrue(controlSources.isEmpty());
block2.countDown();
}
-
- public void testLocalNodeMasterListenerCallbacks() throws Exception {
- Settings settings = Settings.builder()
- .put("discovery.zen.minimum_master_nodes", 1)
- .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms")
- .put("discovery.initial_state_timeout", "500ms")
- .build();
-
- String node_0 = internalCluster().startNode(settings);
- ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
- MasterAwareService testService = internalCluster().getInstance(MasterAwareService.class);
-
- ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
- .setWaitForNodes("1").get();
- assertThat(clusterHealth.isTimedOut(), equalTo(false));
-
- // the first node should be a master as the minimum required is 1
- assertThat(clusterService.state().nodes().getMasterNode(), notNullValue());
- assertThat(clusterService.state().nodes().isLocalNodeElectedMaster(), is(true));
- assertThat(testService.master(), is(true));
- String node_1 = internalCluster().startNode(settings);
- final ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class, node_1);
- MasterAwareService testService1 = internalCluster().getInstance(MasterAwareService.class, node_1);
-
- clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get();
- assertThat(clusterHealth.isTimedOut(), equalTo(false));
-
- // the second node should not be the master as node1 is already the master.
- assertThat(clusterService1.state().nodes().isLocalNodeElectedMaster(), is(false));
- assertThat(testService1.master(), is(false));
-
- internalCluster().stopCurrentMasterNode();
- clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("1").get();
- assertThat(clusterHealth.isTimedOut(), equalTo(false));
-
- // now that node0 is closed, node1 should be elected as master
- assertThat(clusterService1.state().nodes().isLocalNodeElectedMaster(), is(true));
- assertThat(testService1.master(), is(true));
-
- // start another node and set min_master_node
- internalCluster().startNode(Settings.builder().put(settings));
- assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
-
- Settings transientSettings = Settings.builder()
- .put("discovery.zen.minimum_master_nodes", 2)
- .build();
- client().admin().cluster().prepareUpdateSettings().setTransientSettings(transientSettings).get();
-
- // and shutdown the second node
- internalCluster().stopRandomNonMasterNode();
-
- // there should not be any master as the minimum number of required eligible masters is not met
- awaitBusy(() -> clusterService1.state().nodes().getMasterNode() == null &&
- clusterService1.clusterServiceState().getClusterStateStatus() == ClusterStateStatus.APPLIED);
- assertThat(testService1.master(), is(false));
-
- // bring the node back up
- String node_2 = internalCluster().startNode(Settings.builder().put(settings).put(transientSettings));
- ClusterService clusterService2 = internalCluster().getInstance(ClusterService.class, node_2);
- MasterAwareService testService2 = internalCluster().getInstance(MasterAwareService.class, node_2);
-
- // make sure both nodes see each other otherwise the masternode below could be null if node 2 is master and node 1 did'r receive
- // the updated cluster state...
- assertThat(internalCluster().client(node_1).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true)
- .setWaitForNodes("2").get().isTimedOut(), is(false));
- assertThat(internalCluster().client(node_2).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true)
- .setWaitForNodes("2").get().isTimedOut(), is(false));
-
- // now that we started node1 again, a new master should be elected
- assertThat(clusterService2.state().nodes().getMasterNode(), is(notNullValue()));
- if (node_2.equals(clusterService2.state().nodes().getMasterNode().getName())) {
- assertThat(testService1.master(), is(false));
- assertThat(testService2.master(), is(true));
- } else {
- assertThat(testService1.master(), is(true));
- assertThat(testService2.master(), is(false));
- }
- }
-
- public static class TestPlugin extends Plugin {
-
- @Override
- public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
- List<Class<? extends LifecycleComponent>> services = new ArrayList<>(1);
- services.add(MasterAwareService.class);
- return services;
- }
- }
-
- @Singleton
- public static class MasterAwareService extends AbstractLifecycleComponent implements LocalNodeMasterListener {
-
- private final ClusterService clusterService;
- private volatile boolean master;
-
- @Inject
- public MasterAwareService(Settings settings, ClusterService clusterService) {
- super(settings);
- clusterService.add(this);
- this.clusterService = clusterService;
- logger.info("initialized test service");
- }
-
- @Override
- public void onMaster() {
- logger.info("on master [{}]", clusterService.localNode());
- master = true;
- }
-
- @Override
- public void offMaster() {
- logger.info("off master [{}]", clusterService.localNode());
- master = false;
- }
-
- public boolean master() {
- return master;
- }
-
- @Override
- protected void doStart() {
- }
-
- @Override
- protected void doStop() {
- }
-
- @Override
- protected void doClose() {
- }
-
- @Override
- public String executorName() {
- return ThreadPool.Names.SAME;
- }
-
- }
}
diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java
index 9fd4fc1851..bede01ed21 100644
--- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java
@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -44,6 +45,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.Discovery;
+import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
@@ -1098,6 +1100,48 @@ public class ClusterServiceTests extends ESTestCase {
timedClusterService.close();
}
+ public void testLocalNodeMasterListenerCallbacks() throws Exception {
+ TimedClusterService timedClusterService = createTimedClusterService(false);
+
+ AtomicBoolean isMaster = new AtomicBoolean();
+ timedClusterService.add(new LocalNodeMasterListener() {
+ @Override
+ public void onMaster() {
+ isMaster.set(true);
+ }
+
+ @Override
+ public void offMaster() {
+ isMaster.set(false);
+ }
+
+ @Override
+ public String executorName() {
+ return ThreadPool.Names.SAME;
+ }
+ });
+
+ ClusterState state = timedClusterService.state();
+ DiscoveryNodes nodes = state.nodes();
+ DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId());
+ state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build();
+ setState(timedClusterService, state);
+ assertThat(isMaster.get(), is(true));
+
+ nodes = state.nodes();
+ nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(null);
+ state = ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES))
+ .nodes(nodesBuilder).build();
+ setState(timedClusterService, state);
+ assertThat(isMaster.get(), is(false));
+ nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId());
+ state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build();
+ setState(timedClusterService, state);
+ assertThat(isMaster.get(), is(true));
+
+ timedClusterService.close();
+ }
+
private static class SimpleTask {
private final int id;
diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
index 22844e0588..1688157c0d 100644
--- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
+++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
@@ -122,7 +122,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java
index 3af2e32eef..b708ab4c26 100644
--- a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java
+++ b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.equalTo;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
public class ZenUnicastDiscoveryIT extends ESIntegTestCase {
private ClusterDiscoveryConfiguration discoveryConfig;
diff --git a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java
index 226b1422b4..8284388d2c 100644
--- a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java
@@ -23,7 +23,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@@ -37,7 +36,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
-@ClusterScope(numDataNodes =0, scope= Scope.TEST)
+@ClusterScope(numDataNodes = 0, scope = Scope.TEST)
public class QuorumGatewayIT extends ESIntegTestCase {
@Override
protected int numberOfReplicas() {
@@ -47,8 +46,7 @@ public class QuorumGatewayIT extends ESIntegTestCase {
public void testQuorumRecovery() throws Exception {
logger.info("--> starting 3 nodes");
// we are shutting down nodes - make sure we don't have 2 clusters if we test network
- internalCluster().startNodesAsync(3,
- Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2).build()).get();
+ internalCluster().startNodesAsync(3).get();
createIndex("test");
diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java
index 1e35bcdd46..1794aa2272 100644
--- a/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java
@@ -34,7 +34,7 @@ import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
public class RecoverAfterNodesIT extends ESIntegTestCase {
private static final TimeValue BLOCK_WAIT_TIMEOUT = TimeValue.timeValueSeconds(10);
diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
index 4e1ff3f867..052bfc00ef 100644
--- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
@@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
-import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -44,6 +43,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
+import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSIndexStore;
@@ -333,48 +333,43 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
String metaDataUuid = client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID();
assertThat(metaDataUuid, not(equalTo("_na_")));
- Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
-
logger.info("--> closing first node, and indexing more data to the second node");
- internalCluster().fullRestart(new RestartCallback() {
+ internalCluster().stopRandomDataNode();
- @Override
- public void doAfterNodes(int numNodes, Client client) throws Exception {
- if (numNodes == 1) {
- logger.info("--> one node is closed - start indexing data into the second one");
- client.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
- // TODO: remove once refresh doesn't fail immediately if there a master block:
- // https://github.com/elastic/elasticsearch/issues/9997
- client.admin().cluster().prepareHealth("test").setWaitForYellowStatus().get();
- client.admin().indices().prepareRefresh().execute().actionGet();
-
- for (int i = 0; i < 10; i++) {
- assertHitCount(client.prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3);
- }
+ logger.info("--> one node is closed - start indexing data into the second one");
+ client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
+ // TODO: remove once refresh doesn't fail immediately if there a master block:
+ // https://github.com/elastic/elasticsearch/issues/9997
+ client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get();
+ client().admin().indices().prepareRefresh().execute().actionGet();
- logger.info("--> add some metadata, additional type and template");
- client.admin().indices().preparePutMapping("test").setType("type2")
- .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject())
- .execute().actionGet();
- client.admin().indices().preparePutTemplate("template_1")
- .setPatterns(Collections.singletonList("te*"))
- .setOrder(0)
- .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
- .startObject("field1").field("type", "text").field("store", true).endObject()
- .startObject("field2").field("type", "keyword").field("store", true).endObject()
- .endObject().endObject().endObject())
- .execute().actionGet();
- client.admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet();
- logger.info("--> starting two nodes back, verifying we got the latest version");
- }
+ for (int i = 0; i < 10; i++) {
+ assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3);
+ }
- }
+ logger.info("--> add some metadata, additional type and template");
+ client().admin().indices().preparePutMapping("test").setType("type2")
+ .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject())
+ .execute().actionGet();
+ client().admin().indices().preparePutTemplate("template_1")
+ .setTemplate("te*")
+ .setOrder(0)
+ .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
+ .startObject("field1").field("type", "text").field("store", true).endObject()
+ .startObject("field2").field("type", "keyword").field("store", true).endObject()
+ .endObject().endObject().endObject())
+ .execute().actionGet();
+ client().admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet();
+
+ logger.info("--> stopping the second node");
+ internalCluster().stopRandomDataNode();
+
+ logger.info("--> starting the two nodes back");
- });
+ internalCluster().startNodesAsync(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()).get();
logger.info("--> running cluster_health (wait for the shards to startup)");
ensureGreen();
- primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
assertThat(client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID(), equalTo(metaDataUuid));
@@ -502,27 +497,28 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
public void testRecoveryDifferentNodeOrderStartup() throws Exception {
// we need different data paths so we make sure we start the second node fresh
- final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build());
+ final Path pathNode1 = createTempDir();
+ final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode1).build());
client().prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet();
- internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build());
+ final Path pathNode2 = createTempDir();
+ final String node_2 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build());
ensureGreen();
Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
-
- internalCluster().fullRestart(new RestartCallback() {
-
- @Override
- public boolean doRestart(String nodeName) {
- return !node_1.equals(nodeName);
- }
- });
-
+ if (randomBoolean()) {
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2));
+ } else {
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1));
+ }
+ // start the second node again
+ internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build());
ensureYellow();
primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
-
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1);
}
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
index dfe9a09fb2..e0a6111833 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@@ -555,10 +555,8 @@ public class IndexRecoveryIT extends ESIntegTestCase {
// start a master node
internalCluster().startNode(nodeSettings);
- InternalTestCluster.Async<String> blueFuture = internalCluster().startNodeAsync(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
- InternalTestCluster.Async<String> redFuture = internalCluster().startNodeAsync(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
- final String blueNodeName = blueFuture.get();
- final String redNodeName = redFuture.get();
+ final String blueNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
+ final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
assertThat(response.isTimedOut(), is(false));
diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java
index c55fc51433..fe3b569755 100644
--- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java
+++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java
@@ -209,7 +209,10 @@ public class RareClusterStateIT extends ESIntegTestCase {
// but the change might not be on the node that performed the indexing
// operation yet
- Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0ms").build();
+ Settings settings = Settings.builder()
+ .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
+ .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design
+ .build();
final List<String> nodeNames = internalCluster().startNodesAsync(2, settings).get();
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
@@ -327,7 +330,6 @@ public class RareClusterStateIT extends ESIntegTestCase {
// time of indexing it
final List<String> nodeNames = internalCluster().startNodesAsync(2,
Settings.builder()
- .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design
.build()).get();
diff --git a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java
index 4a61bebd4d..dc38867705 100644
--- a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java
+++ b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java
@@ -72,16 +72,15 @@ public class FullRollingRestartIT extends ESIntegTestCase {
}
logger.info("--> now start adding nodes");
- internalCluster().startNodesAsync(2, settings).get();
+ internalCluster().startNode(settings);
+ internalCluster().startNode(settings);
// make sure the cluster state is green, and all has been recovered
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3"));
logger.info("--> add two more nodes");
- internalCluster().startNodesAsync(2, settings).get();
-
- // We now have 5 nodes
- setMinimumMasterNodes(3);
+ internalCluster().startNode(settings);
+ internalCluster().startNode(settings);
// make sure the cluster state is green, and all has been recovered
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("5"));
@@ -97,9 +96,6 @@ public class FullRollingRestartIT extends ESIntegTestCase {
// make sure the cluster state is green, and all has been recovered
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("4"));
- // going down to 3 nodes. note that the min_master_node may not be in effect when we shutdown the 4th
- // node, but that's OK as it is set to 3 before.
- setMinimumMasterNodes(2);
internalCluster().stopRandomDataNode();
// make sure the cluster state is green, and all has been recovered
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3"));
@@ -115,8 +111,6 @@ public class FullRollingRestartIT extends ESIntegTestCase {
// make sure the cluster state is green, and all has been recovered
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("2"));
- // closing the 2nd node
- setMinimumMasterNodes(1);
internalCluster().stopRandomDataNode();
// make sure the cluster state is yellow, and all has been recovered
diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
index 8a59b7460b..bd474280e4 100644
--- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
+++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
@@ -44,8 +44,8 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
+import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@@ -53,7 +53,6 @@ import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
-import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
@@ -77,6 +76,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -351,7 +351,8 @@ public class RelocationIT extends ESIntegTestCase {
client().admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get();
- internalCluster().startNodesAsync(2).get();
+ internalCluster().startNode();
+ internalCluster().startNode();
List<IndexRequestBuilder> requests = new ArrayList<>();
int numDocs = scaledRandomIntBetween(25, 250);
@@ -424,14 +425,15 @@ public class RelocationIT extends ESIntegTestCase {
public void testIndexAndRelocateConcurrently() throws ExecutionException, InterruptedException {
int halfNodes = randomIntBetween(1, 3);
- Settings blueSetting = Settings.builder().put("node.attr.color", "blue").build();
- InternalTestCluster.Async<List<String>> blueFuture = internalCluster().startNodesAsync(halfNodes, blueSetting);
- Settings redSetting = Settings.builder().put("node.attr.color", "red").build();
- InternalTestCluster.Async<java.util.List<String>> redFuture = internalCluster().startNodesAsync(halfNodes, redSetting);
- blueFuture.get();
- redFuture.get();
- logger.info("blue nodes: {}", blueFuture.get());
- logger.info("red nodes: {}", redFuture.get());
+ Settings[] nodeSettings = Stream.concat(
+ Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes),
+ Stream.generate(() -> Settings.builder().put("node.attr.color", "red").build()).limit(halfNodes)
+ ).toArray(Settings[]::new);
+ List<String> nodes = internalCluster().startNodesAsync(nodeSettings).get();
+ String[] blueNodes = nodes.subList(0, halfNodes).stream().toArray(String[]::new);
+ String[] redNodes = nodes.subList(halfNodes, nodes.size()).stream().toArray(String[]::new);
+ logger.info("blue nodes: {}", (Object)blueNodes);
+ logger.info("red nodes: {}", (Object)redNodes);
ensureStableCluster(halfNodes * 2);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
@@ -439,7 +441,7 @@ public class RelocationIT extends ESIntegTestCase {
.put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1))
));
- assertAllShardsOnNodes("test", redFuture.get().toArray(new String[2]));
+ assertAllShardsOnNodes("test", redNodes);
int numDocs = randomIntBetween(100, 150);
ArrayList<String> ids = new ArrayList<>();
logger.info(" --> indexing [{}] docs", numDocs);
diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java
index 6121b2c0c8..cf4fe03893 100644
--- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java
+++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java
@@ -19,18 +19,6 @@
package org.elasticsearch.tribe;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.client.Client;
@@ -58,6 +46,18 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -121,13 +121,13 @@ public class TribeIT extends ESIntegTestCase {
final Collection<Class<? extends Plugin>> plugins = nodePlugins();
if (cluster1 == null) {
- cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes,
+ cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes,
UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_1",
plugins, Function.identity());
}
if (cluster2 == null) {
- cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes,
+ cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes,
UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_2",
plugins, Function.identity());
}
diff --git a/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java b/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java
index 72e1f2da79..f8b105884b 100644
--- a/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java
+++ b/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java
@@ -39,7 +39,8 @@ import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE,
numDataNodes = 0,
transportClientRatio = 0.0,
- numClientNodes = 0)
+ numClientNodes = 0,
+ autoMinMasterNodes = false)
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-azure/issues/89")
public class AzureMinimumMasterNodesTests extends AbstractAzureComputeServiceTestCase {
diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java
index cdb45bfe08..41f5af48eb 100644
--- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java
+++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java
@@ -33,7 +33,7 @@ import static org.hamcrest.CoreMatchers.is;
* starting.
* This test requires AWS to run.
*/
-@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0, autoMinMasterNodes = false)
public class Ec2DiscoveryUpdateSettingsTests extends AbstractAwsTestCase {
public void testMinimumMasterNodesStart() {
Settings nodeSettings = Settings.builder()
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 82e7ce072e..29f77bc166 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -149,6 +149,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
@@ -178,6 +179,7 @@ import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgno
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
@@ -527,10 +529,15 @@ public abstract class ESIntegTestCase extends ESTestCase {
if (cluster() != null) {
if (currentClusterScope != Scope.TEST) {
MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData();
- assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().getAsMap(), metaData
- .persistentSettings().getAsMap().size(), equalTo(0));
- assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData
- .transientSettings().getAsMap().size(), equalTo(0));
+ final Map<String, String> persistent = metaData.persistentSettings().getAsMap();
+ assertThat("test leaves persistent cluster metadata behind: " + persistent, persistent.size(), equalTo(0));
+ final Map<String, String> transientSettings = new HashMap<>(metaData.transientSettings().getAsMap());
+ if (isInternalCluster() && internalCluster().getAutoManageMinMasterNode()) {
+ // this is set by the test infra
+ transientSettings.remove(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey());
+ }
+ assertThat("test leaves transient cluster metadata behind: " + transientSettings,
+ transientSettings.keySet(), empty());
}
ensureClusterSizeConsistency();
ensureClusterStateConsistency();
@@ -1519,6 +1526,12 @@ public abstract class ESIntegTestCase extends ESTestCase {
boolean supportsDedicatedMasters() default true;
/**
+ * The cluster automatically manages the {@link ElectMasterService#DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING} by default
+ * as nodes are started and stopped. Set this to false to manage the setting manually.
+ */
+ boolean autoMinMasterNodes() default true;
+
+ /**
* Returns the number of client nodes in the cluster. Default is {@link InternalTestCluster#DEFAULT_NUM_CLIENT_NODES}, a
* negative value means that the number of client nodes will be randomized.
*/
@@ -1615,6 +1628,11 @@ public abstract class ESIntegTestCase extends ESTestCase {
return annotation == null ? true : annotation.supportsDedicatedMasters();
}
+ private boolean getAutoMinMasterNodes() {
+ ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
+ return annotation == null ? true : annotation.autoMinMasterNodes();
+ }
+
private int getNumDataNodes() {
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null ? -1 : annotation.numDataNodes();
@@ -1753,7 +1771,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
mockPlugins = mocks;
}
- return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, minNumDataNodes, maxNumDataNodes,
+ return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, getAutoMinMasterNodes(),
+ minNumDataNodes, maxNumDataNodes,
InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(),
InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper());
}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java
index 69c5908698..1d3b24b3ce 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java
@@ -444,7 +444,6 @@ public abstract class ESTestCase extends LuceneTestCase {
return RandomPicks.randomFrom(random, array);
}
-
/** Pick a random object from the given list. */
public static <T> T randomFrom(List<T> list) {
return RandomPicks.randomFrom(random(), list);
@@ -452,7 +451,12 @@ public abstract class ESTestCase extends LuceneTestCase {
/** Pick a random object from the given collection. */
public static <T> T randomFrom(Collection<T> collection) {
- return RandomPicks.randomFrom(random(), collection);
+ return randomFrom(random(), collection);
+ }
+
+ /** Pick a random object from the given collection. */
+ public static <T> T randomFrom(Random random, Collection<T> collection) {
+ return RandomPicks.randomFrom(random, collection);
}
public static String randomAsciiOfLengthBetween(int minCodeUnits, int maxCodeUnits) {
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index 37e3a58295..3c29b878e7 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -24,12 +24,10 @@ import com.carrotsearch.randomizedtesting.SysGlobals;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
-
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
@@ -68,7 +66,8 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.discovery.DiscoverySettings;
+import org.elasticsearch.discovery.zen.ElectMasterService;
+import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLockObtainFailedException;
@@ -133,8 +132,10 @@ import java.util.stream.Stream;
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
import static org.apache.lucene.util.LuceneTestCase.rarely;
+import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING;
import static org.elasticsearch.test.ESTestCase.assertBusy;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
+import static org.elasticsearch.test.ESTestCase.randomFrom;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -225,6 +226,8 @@ public final class InternalTestCluster extends TestCluster {
private final ExecutorService executor;
+ private final boolean autoManageMinMasterNodes;
+
private final Collection<Class<? extends Plugin>> mockPlugins;
/**
@@ -238,9 +241,10 @@ public final class InternalTestCluster extends TestCluster {
public InternalTestCluster(long clusterSeed, Path baseDir,
boolean randomlyAddDedicatedMasters,
- int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes,
+ boolean autoManageMinMasterNodes, int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes,
boolean enableHttpPipelining, String nodePrefix, Collection<Class<? extends Plugin>> mockPlugins, Function<Client, Client> clientWrapper) {
super(clusterSeed);
+ this.autoManageMinMasterNodes = autoManageMinMasterNodes;
this.clientWrapper = clientWrapper;
this.baseDir = baseDir;
this.clusterName = clusterName;
@@ -345,6 +349,11 @@ public final class InternalTestCluster extends TestCluster {
return clusterName;
}
+ /** returns true if the {@link ElectMasterService#DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING} setting is auto managed by this cluster */
+ public boolean getAutoManageMinMasterNode() {
+ return autoManageMinMasterNodes;
+ }
+
public String[] getNodeNames() {
return nodes.keySet().toArray(Strings.EMPTY_ARRAY);
}
@@ -466,7 +475,7 @@ public final class InternalTestCluster extends TestCluster {
if (randomNodeAndClient != null) {
return randomNodeAndClient;
}
- NodeAndClient buildNode = buildNode();
+ NodeAndClient buildNode = buildNode(1);
buildNode.startNode();
publishNode(buildNode);
return buildNode;
@@ -496,30 +505,20 @@ public final class InternalTestCluster extends TestCluster {
* if more nodes than <code>n</code> are present this method will not
* stop any of the running nodes.
*/
- public void ensureAtLeastNumDataNodes(int n) {
- final List<Async<String>> asyncs = new ArrayList<>();
- synchronized (this) {
- int size = numDataNodes();
- for (int i = size; i < n; i++) {
- logger.info("increasing cluster size from {} to {}", size, n);
- if (numSharedDedicatedMasterNodes > 0) {
- asyncs.add(startDataOnlyNodeAsync());
- } else {
- asyncs.add(startNodeAsync());
- }
- }
- }
- try {
- for (Async<String> async : asyncs) {
- async.get();
+ public synchronized void ensureAtLeastNumDataNodes(int n) {
+ boolean added = false;
+ int size = numDataNodes();
+ for (int i = size; i < n; i++) {
+ logger.info("increasing cluster size from {} to {}", size, n);
+ added = true;
+ if (numSharedDedicatedMasterNodes > 0) {
+ startDataOnlyNode(Settings.EMPTY);
+ } else {
+ startNode(Settings.EMPTY);
}
- } catch (Exception e) {
- throw new ElasticsearchException("failed to start nodes", e);
}
- if (!asyncs.isEmpty()) {
- synchronized (this) {
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodes.size())).get());
- }
+ if (added) {
+ validateClusterFormed();
}
}
@@ -544,28 +543,47 @@ public final class InternalTestCluster extends TestCluster {
while (values.hasNext() && numNodesAndClients++ < size - n) {
NodeAndClient next = values.next();
nodesToRemove.add(next);
- removeDisruptionSchemeFromNode(next);
- next.close();
- }
- for (NodeAndClient toRemove : nodesToRemove) {
- nodes.remove(toRemove.name);
}
+
+ stopNodesAndClients(nodesToRemove);
if (!nodesToRemove.isEmpty() && size() > 0) {
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodes.size())).get());
+ validateClusterFormed();
}
}
- private NodeAndClient buildNode(Settings settings) {
+ /**
+ * builds a new node given the settings.
+ *
+ * @param settings the settings to use
+ * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
+ */
+ private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes) {
int ord = nextNodeId.getAndIncrement();
- return buildNode(ord, random.nextLong(), settings, false);
+ return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes);
}
- private NodeAndClient buildNode() {
+ /**
+ * builds a new node with default settings
+ *
+ * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
+ */
+ private NodeAndClient buildNode(int defaultMinMasterNodes) {
int ord = nextNodeId.getAndIncrement();
- return buildNode(ord, random.nextLong(), null, false);
+ return buildNode(ord, random.nextLong(), null, false, defaultMinMasterNodes);
}
- private NodeAndClient buildNode(int nodeId, long seed, Settings settings, boolean reuseExisting) {
+ /**
+ * builds a new node
+ *
+ * @param nodeId the node internal id (see {@link NodeAndClient#nodeAndClientId()}
+ * @param seed the node's random seed
+ * @param settings the settings to use
+ * @param reuseExisting if a node with the same name is already part of {@link #nodes}, no new node will be built and
+ * the method will return the existing one
+ * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
+ */
+ private NodeAndClient buildNode(int nodeId, long seed, Settings settings,
+ boolean reuseExisting, int defaultMinMasterNodes) {
assert Thread.holdsLock(this);
ensureOpen();
settings = getSettings(nodeId, seed, settings);
@@ -577,13 +595,21 @@ public final class InternalTestCluster extends TestCluster {
assert reuseExisting == true || nodes.containsKey(name) == false :
"node name [" + name + "] already exists but not allowed to use it";
}
- Settings finalSettings = Settings.builder()
+ Settings.Builder finalSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), baseDir) // allow overriding path.home
.put(settings)
.put("node.name", name)
- .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), seed)
- .build();
- MockNode node = new MockNode(finalSettings, plugins);
+ .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), seed);
+
+ if (autoManageMinMasterNodes) {
+ assert finalSettings.get(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) == null :
+ "min master nodes may not be set when auto managed";
+ finalSettings
+ // don't wait too long not to slow down tests
+ .put(ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.getKey(), "5s")
+ .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), defaultMinMasterNodes);
+ }
+ MockNode node = new MockNode(finalSettings.build(), plugins);
return new NodeAndClient(name, node, nodeId);
}
@@ -684,7 +710,7 @@ public final class InternalTestCluster extends TestCluster {
.put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false);
if (size() == 0) {
// if we are the first node - don't wait for a state
- builder.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0);
+ builder.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0);
}
return startNode(builder);
}
@@ -777,6 +803,10 @@ public final class InternalTestCluster extends TestCluster {
return nodeAndClientId;
}
+ public boolean isMasterEligible() {
+ return Node.NODE_MASTER_SETTING.get(node.settings());
+ }
+
Client client(Random random) {
if (closed.get()) {
throw new RuntimeException("already closed");
@@ -844,21 +874,40 @@ public final class InternalTestCluster extends TestCluster {
node.close();
}
- void restart(RestartCallback callback, boolean clearDataIfNeeded) throws Exception {
- assert callback != null;
- resetClient();
+ /**
+ * closes the current node if not already closed, builds a new node object using the current node settings and starts it
+ */
+ void restart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception {
if (!node.isClosed()) {
closeNode();
}
- Settings newSettings = callback.onNodeStopped(name);
- if (newSettings == null) {
- newSettings = Settings.EMPTY;
+ recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes);
+ startNode();
+ }
+
+ /**
+ * rebuilds a new node object using the current node settings and starts it
+ */
+ void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception {
+ assert callback != null;
+ Settings callbackSettings = callback.onNodeStopped(name);
+ Settings.Builder newSettings = Settings.builder();
+ if (callbackSettings != null) {
+ newSettings.put(callbackSettings);
+ }
+ if (minMasterNodes >= 0) {
+ assert ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(newSettings.build()) == false : "min master nodes is auto managed";
+ newSettings.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes).build();
}
+
+ // validation is (optionally) done in fullRestart/rollingRestart
+ newSettings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s");
if (clearDataIfNeeded) {
clearDataIfNeeded(callback);
}
- createNewNode(newSettings);
- startNode();
+ createNewNode(newSettings.build());
+ // make sure cached client points to new node
+ resetClient();
}
private void clearDataIfNeeded(RestartCallback callback) throws IOException {
@@ -948,22 +997,24 @@ public final class InternalTestCluster extends TestCluster {
if (wipeData) {
wipePendingDataDirectories();
}
+ if (nodes.size() > 0 && autoManageMinMasterNodes) {
+ updateMinMasterNodes(getMasterNodesCount());
+ }
logger.debug("Cluster hasn't changed - moving out - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize);
return;
}
logger.debug("Cluster is NOT consistent - restarting shared nodes - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize);
// trash all nodes with id >= sharedNodesSeeds.length - they are non shared
-
-
+ final List<NodeAndClient> toClose = new ArrayList<>();
for (Iterator<NodeAndClient> iterator = nodes.values().iterator(); iterator.hasNext();) {
NodeAndClient nodeAndClient = iterator.next();
if (nodeAndClient.nodeAndClientId() >= sharedNodesSeeds.length) {
logger.debug("Close Node [{}] not shared", nodeAndClient.name);
- nodeAndClient.close();
- iterator.remove();
+ toClose.add(nodeAndClient);
}
}
+ stopNodesAndClients(toClose);
// clean up what the nodes left that is unused
if (wipeData) {
@@ -972,13 +1023,19 @@ public final class InternalTestCluster extends TestCluster {
// start any missing node
assert newSize == numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes;
+ final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes;
+ final int defaultMinMasterNodes = (numberOfMasterNodes / 2) + 1;
+ final List<NodeAndClient> toStartAndPublish = new ArrayList<>(); // we want to start nodes in one go due to min master nodes
for (int i = 0; i < numSharedDedicatedMasterNodes; i++) {
final Settings.Builder settings = Settings.builder();
- settings.put(Node.NODE_MASTER_SETTING.getKey(), true).build();
- settings.put(Node.NODE_DATA_SETTING.getKey(), false).build();
- NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true);
- nodeAndClient.startNode();
- publishNode(nodeAndClient);
+ settings.put(Node.NODE_MASTER_SETTING.getKey(), true);
+ settings.put(Node.NODE_DATA_SETTING.getKey(), false);
+ if (autoManageMinMasterNodes) {
+ settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end
+ }
+
+ NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
+ toStartAndPublish.add(nodeAndClient);
}
for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) {
final Settings.Builder settings = Settings.builder();
@@ -987,32 +1044,43 @@ public final class InternalTestCluster extends TestCluster {
settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build();
settings.put(Node.NODE_DATA_SETTING.getKey(), true).build();
}
- NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true);
- nodeAndClient.startNode();
- publishNode(nodeAndClient);
+ if (autoManageMinMasterNodes) {
+ settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end
+ }
+ NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
+ toStartAndPublish.add(nodeAndClient);
}
for (int i = numSharedDedicatedMasterNodes + numSharedDataNodes;
i < numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; i++) {
final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false);
- NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true);
- nodeAndClient.startNode();
- publishNode(nodeAndClient);
+ NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
+ toStartAndPublish.add(nodeAndClient);
}
+ startAndPublishNodesAndClients(toStartAndPublish);
+
nextNodeId.set(newSize);
assert size() == newSize;
if (newSize > 0) {
- ClusterHealthResponse response = client().admin().cluster().prepareHealth()
- .setWaitForNodes(Integer.toString(newSize)).get();
- if (response.isTimedOut()) {
- logger.warn("failed to wait for a cluster of size [{}], got [{}]", newSize, response);
- throw new IllegalStateException("cluster failed to reach the expected size of [" + newSize + "]");
- }
+ validateClusterFormed();
}
logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize);
}
+ /** ensure a cluster is form with {@link #nodes}.size() nodes. */
+ private void validateClusterFormed() {
+ final int size = nodes.size();
+ String name = randomFrom(random, getNodeNames());
+ logger.trace("validating cluster formed via [{}], expecting [{}]", name, size);
+ final Client client = client(name);
+ ClusterHealthResponse response = client.admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(size)).get();
+ if (response.isTimedOut()) {
+ logger.warn("failed to wait for a cluster of size [{}], got [{}]", size, response);
+ throw new IllegalStateException("cluster failed to reach the expected size of [" + size + "]");
+ }
+ }
+
@Override
public synchronized void afterTest() throws IOException {
wipePendingDataDirectories();
@@ -1234,9 +1302,7 @@ public final class InternalTestCluster extends TestCluster {
NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate());
if (nodeAndClient != null) {
logger.info("Closing random node [{}] ", nodeAndClient.name);
- removeDisruptionSchemeFromNode(nodeAndClient);
- nodes.remove(nodeAndClient.name);
- nodeAndClient.close();
+ stopNodesAndClient(nodeAndClient);
return true;
}
return false;
@@ -1251,9 +1317,7 @@ public final class InternalTestCluster extends TestCluster {
NodeAndClient nodeAndClient = getRandomNodeAndClient(nc -> filter.test(nc.node.settings()));
if (nodeAndClient != null) {
logger.info("Closing filtered random node [{}] ", nodeAndClient.name);
- removeDisruptionSchemeFromNode(nodeAndClient);
- nodes.remove(nodeAndClient.name);
- nodeAndClient.close();
+ stopNodesAndClient(nodeAndClient);
}
}
@@ -1266,9 +1330,7 @@ public final class InternalTestCluster extends TestCluster {
String masterNodeName = getMasterName();
assert nodes.containsKey(masterNodeName);
logger.info("Closing master node [{}] ", masterNodeName);
- removeDisruptionSchemeFromNode(nodes.get(masterNodeName));
- NodeAndClient remove = nodes.remove(masterNodeName);
- remove.close();
+ stopNodesAndClient(nodes.get(masterNodeName));
}
/**
@@ -1278,8 +1340,47 @@ public final class InternalTestCluster extends TestCluster {
NodeAndClient nodeAndClient = getRandomNodeAndClient(new MasterNodePredicate(getMasterName()).negate());
if (nodeAndClient != null) {
logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName());
+ stopNodesAndClient(nodeAndClient);
+ }
+ }
+
+ private synchronized void startAndPublishNodesAndClients(List<NodeAndClient> nodeAndClients) {
+ if (nodeAndClients.size() > 0) {
+ final int newMasters = (int) nodeAndClients.stream().filter(NodeAndClient::isMasterEligible)
+ .filter(nac -> nodes.containsKey(nac.name) == false) // filter out old masters
+ .count();
+ final int currentMasters = getMasterNodesCount();
+ if (autoManageMinMasterNodes && currentMasters > 1 && newMasters > 0) {
+ // special case for 1 node master - we can't update the min master nodes before we add more nodes.
+ updateMinMasterNodes(currentMasters + newMasters);
+ }
+ for (NodeAndClient nodeAndClient : nodeAndClients) {
+ nodeAndClient.startNode();
+ publishNode(nodeAndClient);
+ }
+ if (autoManageMinMasterNodes && currentMasters == 1 && newMasters > 0) {
+ // update once masters have joined
+ validateClusterFormed();
+ updateMinMasterNodes(currentMasters + newMasters);
+ }
+ }
+ }
+
+ private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException {
+ stopNodesAndClients(Collections.singleton(nodeAndClient));
+ }
+
+ private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndClients) throws IOException {
+ if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {
+ int masters = (int)nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();
+ if (masters > 0) {
+ updateMinMasterNodes(getMasterNodesCount() - masters);
+ }
+ }
+ for (NodeAndClient nodeAndClient: nodeAndClients) {
removeDisruptionSchemeFromNode(nodeAndClient);
- nodes.remove(nodeAndClient.name);
+ NodeAndClient previous = nodes.remove(nodeAndClient.name);
+ assert previous == nodeAndClient;
nodeAndClient.close();
}
}
@@ -1319,8 +1420,7 @@ public final class InternalTestCluster extends TestCluster {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient(predicate);
if (nodeAndClient != null) {
- logger.info("Restarting random node [{}] ", nodeAndClient.name);
- nodeAndClient.restart(callback, true);
+ restartNode(nodeAndClient, callback);
}
}
@@ -1331,93 +1431,10 @@ public final class InternalTestCluster extends TestCluster {
ensureOpen();
NodeAndClient nodeAndClient = nodes.get(nodeName);
if (nodeAndClient != null) {
- logger.info("Restarting node [{}] ", nodeAndClient.name);
- nodeAndClient.restart(callback, true);
+ restartNode(nodeAndClient, callback);
}
}
- private synchronized void restartAllNodes(boolean rollingRestart, RestartCallback callback) throws Exception {
- ensureOpen();
- List<NodeAndClient> toRemove = new ArrayList<>();
- try {
- for (NodeAndClient nodeAndClient : nodes.values()) {
- if (!callback.doRestart(nodeAndClient.name)) {
- logger.info("Closing node [{}] during restart", nodeAndClient.name);
- toRemove.add(nodeAndClient);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
- }
- nodeAndClient.close();
- }
- }
- } finally {
- for (NodeAndClient nodeAndClient : toRemove) {
- nodes.remove(nodeAndClient.name);
- }
- }
- logger.info("Restarting remaining nodes rollingRestart [{}]", rollingRestart);
- if (rollingRestart) {
- int numNodesRestarted = 0;
- for (NodeAndClient nodeAndClient : nodes.values()) {
- callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
- logger.info("Restarting node [{}] ", nodeAndClient.name);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
- }
- nodeAndClient.restart(callback, true);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
- }
- }
- } else {
- int numNodesRestarted = 0;
- Set[] nodesRoleOrder = new Set[nextNodeId.get()];
- Map<Set<Role>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
- for (NodeAndClient nodeAndClient : nodes.values()) {
- callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
- logger.info("Stopping node [{}] ", nodeAndClient.name);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
- }
- nodeAndClient.closeNode();
- // delete data folders now, before we start other nodes that may claim it
- nodeAndClient.clearDataIfNeeded(callback);
-
- DiscoveryNode discoveryNode = getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode();
- nodesRoleOrder[nodeAndClient.nodeAndClientId()] = discoveryNode.getRoles();
- nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
- }
-
- assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size();
-
- // randomize start up order, but making sure that:
- // 1) A data folder that was assigned to a data node will stay so
- // 2) Data nodes will get the same node lock ordinal range, so custom index paths (where the ordinal is used)
- // will still belong to data nodes
- for (List<NodeAndClient> sameRoleNodes : nodesByRoles.values()) {
- Collections.shuffle(sameRoleNodes, random);
- }
-
- for (Set roles : nodesRoleOrder) {
- if (roles == null) {
- // if some nodes were stopped, we want have a role for them
- continue;
- }
- NodeAndClient nodeAndClient = nodesByRoles.get(roles).remove(0);
- logger.info("Starting node [{}] ", nodeAndClient.name);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
- }
- // we already cleared data folders, before starting nodes up
- nodeAndClient.restart(callback, false);
- if (activeDisruptionScheme != null) {
- activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
- }
- }
- }
- }
-
-
public static final RestartCallback EMPTY_CALLBACK = new RestartCallback() {
@Override
public Settings onNodeStopped(String node) {
@@ -1442,15 +1459,98 @@ public final class InternalTestCluster extends TestCluster {
/**
* Restarts all nodes in a rolling restart fashion ie. only restarts on node a time.
*/
- public void rollingRestart(RestartCallback function) throws Exception {
- restartAllNodes(true, function);
+ public synchronized void rollingRestart(RestartCallback callback) throws Exception {
+ int numNodesRestarted = 0;
+ for (NodeAndClient nodeAndClient : nodes.values()) {
+ callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
+ restartNode(nodeAndClient, callback);
+ }
+ }
+
+ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception {
+ logger.info("Restarting node [{}] ", nodeAndClient.name);
+ if (activeDisruptionScheme != null) {
+ activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
+ }
+ final int masterNodesCount = getMasterNodesCount();
+ // special case to allow stopping one node in a two node cluster and keep it functional
+ final boolean updateMinMaster = nodeAndClient.isMasterEligible() && masterNodesCount == 2 && autoManageMinMasterNodes;
+ if (updateMinMaster) {
+ updateMinMasterNodes(masterNodesCount - 1);
+ }
+ nodeAndClient.restart(callback, true, autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1);
+ if (activeDisruptionScheme != null) {
+ activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
+ }
+ if (callback.validateClusterForming() || updateMinMaster) {
+ // we have to validate cluster size if updateMinMaster == true, because we need the
+ // second node to join in order to increment min_master_nodes back to 2.
+ validateClusterFormed();
+ }
+ if (updateMinMaster) {
+ updateMinMasterNodes(masterNodesCount);
+ }
}
/**
* Restarts all nodes in the cluster. It first stops all nodes and then restarts all the nodes again.
*/
- public void fullRestart(RestartCallback function) throws Exception {
- restartAllNodes(false, function);
+ public synchronized void fullRestart(RestartCallback callback) throws Exception {
+ int numNodesRestarted = 0;
+ Map<Set<Role>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
+ Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
+ for (NodeAndClient nodeAndClient : nodes.values()) {
+ callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
+ logger.info("Stopping node [{}] ", nodeAndClient.name);
+ if (activeDisruptionScheme != null) {
+ activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
+ }
+ nodeAndClient.closeNode();
+ // delete data folders now, before we start other nodes that may claim it
+ nodeAndClient.clearDataIfNeeded(callback);
+ DiscoveryNode discoveryNode = getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode();
+ rolesOrderedByOriginalStartupOrder[nodeAndClient.nodeAndClientId] = discoveryNode.getRoles();
+ nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
+ }
+
+ assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size();
+
+ // randomize start up order, but making sure that:
+ // 1) A data folder that was assigned to a data node will stay so
+ // 2) Data nodes will get the same node lock ordinal range, so custom index paths (where the ordinal is used)
+ // will still belong to data nodes
+ for (List<NodeAndClient> sameRoleNodes : nodesByRoles.values()) {
+ Collections.shuffle(sameRoleNodes, random);
+ }
+ List<NodeAndClient> startUpOrder = new ArrayList<>();
+ for (Set roles : rolesOrderedByOriginalStartupOrder) {
+ if (roles == null) {
+ // if some nodes were stopped, we want have a role for that ordinal
+ continue;
+ }
+ final List<NodeAndClient> nodesByRole = nodesByRoles.get(roles);
+ startUpOrder.add(nodesByRole.remove(0));
+ }
+ assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0;
+
+ // do two rounds to minimize pinging (mock zen pings pings with no delay and can create a lot of logs)
+ for (NodeAndClient nodeAndClient : startUpOrder) {
+ logger.info("resetting node [{}] ", nodeAndClient.name);
+ // we already cleared data folders, before starting nodes up
+ nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1);
+ }
+
+ for (NodeAndClient nodeAndClient : startUpOrder) {
+ logger.info("starting node [{}] ", nodeAndClient.name);
+ nodeAndClient.startNode();
+ if (activeDisruptionScheme != null) {
+ activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
+ }
+ }
+
+ if (callback.validateClusterForming()) {
+ validateClusterFormed();
+ }
}
@@ -1534,19 +1634,51 @@ public final class InternalTestCluster extends TestCluster {
* Starts a node with the given settings and returns it's name.
*/
public synchronized String startNode(Settings settings) {
- NodeAndClient buildNode = buildNode(settings);
- buildNode.startNode();
- publishNode(buildNode);
+ final int defaultMinMasterNodes = getMinMasterNodes(getMasterNodesCount() + (Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0));
+ NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes);
+ startAndPublishNodesAndClients(Collections.singletonList(buildNode));
return buildNode.name;
}
+ /**
+ * updates the min master nodes setting in the current running cluster.
+ *
+ * @param eligibleMasterNodeCount the number of master eligible nodes to use as basis for the min master node setting
+ */
+ private int updateMinMasterNodes(int eligibleMasterNodeCount) {
+ assert autoManageMinMasterNodes;
+ final int minMasterNodes = getMinMasterNodes(eligibleMasterNodeCount);
+ if (getMasterNodesCount() > 0) {
+ // there should be at least one master to update
+ logger.debug("updating min_master_nodes to [{}]", minMasterNodes);
+ try {
+ assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
+ Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes)
+ ));
+ } catch (Exception e) {
+ throw new ElasticsearchException("failed to update minimum master node to [{}] (current masters [{}])", e,
+ minMasterNodes, getMasterNodesCount());
+ }
+ }
+ return minMasterNodes;
+ }
+
+ /** calculates a min master nodes value based on the given number of master nodes */
+ private int getMinMasterNodes(int eligibleMasterNodes) {
+ return eligibleMasterNodes / 2 + 1;
+ }
+
+ private int getMasterNodesCount() {
+ return (int)nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count();
+ }
+
public synchronized Async<List<String>> startMasterOnlyNodesAsync(int numNodes) {
return startMasterOnlyNodesAsync(numNodes, Settings.EMPTY);
}
public synchronized Async<List<String>> startMasterOnlyNodesAsync(int numNodes, Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build();
- return startNodesAsync(numNodes, settings1, Version.CURRENT);
+ return startNodesAsync(numNodes, settings1);
}
public synchronized Async<List<String>> startDataOnlyNodesAsync(int numNodes) {
@@ -1555,7 +1687,7 @@ public final class InternalTestCluster extends TestCluster {
public synchronized Async<List<String>> startDataOnlyNodesAsync(int numNodes, Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build();
- return startNodesAsync(numNodes, settings1, Version.CURRENT);
+ return startNodesAsync(numNodes, settings1);
}
public synchronized Async<String> startMasterOnlyNodeAsync() {
@@ -1564,7 +1696,7 @@ public final class InternalTestCluster extends TestCluster {
public synchronized Async<String> startMasterOnlyNodeAsync(Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build();
- return startNodeAsync(settings1, Version.CURRENT);
+ return startNodeAsync(settings1);
}
public synchronized String startMasterOnlyNode(Settings settings) {
@@ -1578,7 +1710,7 @@ public final class InternalTestCluster extends TestCluster {
public synchronized Async<String> startDataOnlyNodeAsync(Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build();
- return startNodeAsync(settings1, Version.CURRENT);
+ return startNodeAsync(settings1);
}
public synchronized String startDataOnlyNode(Settings settings) {
@@ -1590,21 +1722,25 @@ public final class InternalTestCluster extends TestCluster {
* Starts a node in an async manner with the given settings and returns future with its name.
*/
public synchronized Async<String> startNodeAsync() {
- return startNodeAsync(Settings.EMPTY, Version.CURRENT);
+ return startNodeAsync(Settings.EMPTY);
}
/**
* Starts a node in an async manner with the given settings and returns future with its name.
*/
public synchronized Async<String> startNodeAsync(final Settings settings) {
- return startNodeAsync(settings, Version.CURRENT);
+ final int defaultMinMasterNodes;
+ if (autoManageMinMasterNodes) {
+ int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0;
+ defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
+ } else {
+ defaultMinMasterNodes = -1;
+ }
+ return startNodeAsync(settings, defaultMinMasterNodes);
}
- /**
- * Starts a node in an async manner with the given settings and version and returns future with its name.
- */
- public synchronized Async<String> startNodeAsync(final Settings settings, final Version version) {
- final NodeAndClient buildNode = buildNode(settings);
+ private synchronized Async<String> startNodeAsync(final Settings settings, int defaultMinMasterNodes) {
+ final NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes);
final Future<String> submit = executor.submit(() -> {
buildNode.startNode();
publishNode(buildNode);
@@ -1613,27 +1749,28 @@ public final class InternalTestCluster extends TestCluster {
return () -> submit.get();
}
+
/**
* Starts multiple nodes in an async manner and returns future with its name.
*/
public synchronized Async<List<String>> startNodesAsync(final int numNodes) {
- return startNodesAsync(numNodes, Settings.EMPTY, Version.CURRENT);
+ return startNodesAsync(numNodes, Settings.EMPTY);
}
/**
* Starts multiple nodes in an async manner with the given settings and returns future with its name.
*/
- public synchronized Async<List<String>> startNodesAsync(final int numNodes, final Settings settings) {
- return startNodesAsync(numNodes, settings, Version.CURRENT);
- }
-
- /**
- * Starts multiple nodes in an async manner with the given settings and version and returns future with its name.
- */
- public synchronized Async<List<String>> startNodesAsync(final int numNodes, final Settings settings, final Version version) {
+ public synchronized Async<List<String>> startNodesAsync(final int numNodes, final Settings settings) {
+ final int defaultMinMasterNodes;
+ if (autoManageMinMasterNodes) {
+ int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? numNodes : 0;
+ defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
+ } else {
+ defaultMinMasterNodes = -1;
+ }
final List<Async<String>> asyncs = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
- asyncs.add(startNodeAsync(settings, version));
+ asyncs.add(startNodeAsync(settings, defaultMinMasterNodes));
}
return () -> {
@@ -1650,9 +1787,16 @@ public final class InternalTestCluster extends TestCluster {
* The order of the node names returned matches the order of the settings provided.
*/
public synchronized Async<List<String>> startNodesAsync(final Settings... settings) {
+ final int defaultMinMasterNodes;
+ if (autoManageMinMasterNodes) {
+ int mastersDelta = (int) Stream.of(settings).filter(Node.NODE_MASTER_SETTING::get).count();
+ defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
+ } else {
+ defaultMinMasterNodes = -1;
+ }
List<Async<String>> asyncs = new ArrayList<>();
for (Settings setting : settings) {
- asyncs.add(startNodeAsync(setting, Version.CURRENT));
+ asyncs.add(startNodeAsync(setting, defaultMinMasterNodes));
}
return () -> {
List<String> ids = new ArrayList<>();
@@ -1683,6 +1827,11 @@ public final class InternalTestCluster extends TestCluster {
return dataAndMasterNodes().size();
}
+ public synchronized int numMasterNodes() {
+ return filterNodes(nodes, NodeAndClient::isMasterEligible).size();
+ }
+
+
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
clearDisruptionScheme();
scheme.applyToCluster(this);
@@ -1887,14 +2036,8 @@ public final class InternalTestCluster extends TestCluster {
return false;
}
-
- /**
- * If this returns <code>false</code> the node with the given node name will not be restarted. It will be
- * closed and removed from the cluster. Returns <code>true</code> by default.
- */
- public boolean doRestart(String nodeName) {
- return true;
- }
+ /** returns true if the restart should also validate the cluster has reformed */
+ public boolean validateClusterForming() { return true; }
}
public Settings getDefaultSettings() {
diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java
index c544b2bad8..fe16f03411 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java
@@ -18,11 +18,6 @@
*/
package org.elasticsearch.test.discovery;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractComponent;
@@ -32,13 +27,22 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.PingContextProvider;
import org.elasticsearch.discovery.zen.ZenPing;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
* A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging
* to be immediate and can be used to speed up tests.
*/
public final class MockZenPing extends AbstractComponent implements ZenPing {
- static final Map<ClusterName, Set<MockZenPing>> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap();
+ static final Map<ClusterName, Set<MockZenPing>> activeNodesPerCluster = new HashMap<>();
+
+ /** a set of the last discovered pings. used to throttle busy spinning where MockZenPing will keep returning the same results */
+ private Set<MockZenPing> lastDiscoveredPings = null;
private volatile PingContextProvider contextProvider;
@@ -50,18 +54,34 @@ public final class MockZenPing extends AbstractComponent implements ZenPing {
public void start(PingContextProvider contextProvider) {
this.contextProvider = contextProvider;
assert contextProvider != null;
- boolean added = getActiveNodesForCurrentCluster().add(this);
- assert added;
+ synchronized (activeNodesPerCluster) {
+ boolean added = getActiveNodesForCurrentCluster().add(this);
+ assert added;
+ activeNodesPerCluster.notifyAll();
+ }
}
@Override
public void ping(PingListener listener, TimeValue timeout) {
logger.info("pinging using mock zen ping");
- List<PingResponse> responseList = getActiveNodesForCurrentCluster().stream()
- .filter(p -> p != this) // remove this as pings are not expected to return the local node
- .map(MockZenPing::getPingResponse)
- .collect(Collectors.toList());
- listener.onPing(responseList);
+ synchronized (activeNodesPerCluster) {
+ Set<MockZenPing> activeNodes = getActiveNodesForCurrentCluster();
+ if (activeNodes.equals(lastDiscoveredPings)) {
+ try {
+ logger.trace("nothing has changed since the last ping. waiting for a change");
+ activeNodesPerCluster.wait(timeout.millis());
+ } catch (InterruptedException e) {
+
+ }
+ activeNodes = getActiveNodesForCurrentCluster();
+ }
+ lastDiscoveredPings = activeNodes;
+ List<PingResponse> responseList = activeNodes.stream()
+ .filter(p -> p != this) // remove this as pings are not expected to return the local node
+ .map(MockZenPing::getPingResponse)
+ .collect(Collectors.toList());
+ listener.onPing(responseList);
+ }
}
private ClusterName getClusterName() {
@@ -74,13 +94,17 @@ public final class MockZenPing extends AbstractComponent implements ZenPing {
}
private Set<MockZenPing> getActiveNodesForCurrentCluster() {
+ assert Thread.holdsLock(activeNodesPerCluster);
return activeNodesPerCluster.computeIfAbsent(getClusterName(),
clusterName -> ConcurrentCollections.newConcurrentSet());
}
@Override
public void close() {
- boolean found = getActiveNodesForCurrentCluster().remove(this);
- assert found;
+ synchronized (activeNodesPerCluster) {
+ boolean found = getActiveNodesForCurrentCluster().remove(this);
+ assert found;
+ activeNodesPerCluster.notifyAll();
+ }
}
}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
index 8cff517316..bc560c9b0f 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
@@ -636,7 +636,7 @@ public class ElasticsearchAssertions {
* a way that sucks less.
*/
NamedWriteableRegistry registry;
- if (ESIntegTestCase.isInternalCluster()) {
+ if (ESIntegTestCase.isInternalCluster() && ESIntegTestCase.internalCluster().size() > 0) {
registry = ESIntegTestCase.internalCluster().getInstance(NamedWriteableRegistry.class);
} else {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java
index 327a49d367..36903c7c60 100644
--- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java
+++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java
@@ -19,21 +19,6 @@
*/
package org.elasticsearch.test.test;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Client;
@@ -51,14 +36,32 @@ import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.TransportSettings;
+import org.hamcrest.Matcher;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.DATA;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.INGEST;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.MASTER;
+import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileNotExists;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
/**
@@ -81,10 +84,10 @@ public class InternalTestClusterTests extends ESTestCase {
Path baseDir = createTempDir();
InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
- minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
+ randomBoolean(), minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
- minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
+ randomBoolean(), minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
// TODO: this is not ideal - we should have a way to make sure ports are initialized in the same way
assertClusters(cluster0, cluster1, false);
@@ -116,7 +119,8 @@ public class InternalTestClusterTests extends ESTestCase {
public static void assertSettings(Settings left, Settings right, boolean checkClusterUniqueSettings) {
Set<Map.Entry<String, String>> entries0 = left.getAsMap().entrySet();
Map<String, String> entries1 = right.getAsMap();
- assertThat(entries0.size(), equalTo(entries1.size()));
+ assertThat("--> left:\n" + left.toDelimitedString('\n') + "\n-->right:\n" + right.toDelimitedString('\n'),
+ entries0.size(), equalTo(entries1.size()));
for (Map.Entry<String, String> entry : entries0) {
if (clusterUniqueSettings.contains(entry.getKey()) && checkClusterUniqueSettings == false) {
continue;
@@ -125,6 +129,41 @@ public class InternalTestClusterTests extends ESTestCase {
}
}
+ private void assertMMNinNodeSetting(InternalTestCluster cluster, int masterNodes) {
+ for (final String node : cluster.getNodeNames()) {
+ assertMMNinNodeSetting(node, cluster, masterNodes);
+ }
+ }
+
+ private void assertMMNinNodeSetting(String node, InternalTestCluster cluster, int masterNodes) {
+ final int minMasterNodes = masterNodes / 2 + 1;
+ final Matcher<Map<? extends String, ? extends String>> minMasterMatcher =
+ hasEntry(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes));
+ final Matcher<Map<? extends String, ?>> noMinMasterNodesMatcher = not(hasKey(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()));
+ Settings nodeSettings = cluster.client(node).admin().cluster().prepareNodesInfo(node).get().getNodes().get(0).getSettings();
+ assertThat("node setting of node [" + node + "] has the wrong min_master_node setting: ["
+ + nodeSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]",
+ nodeSettings.getAsMap(),
+ cluster.getAutoManageMinMasterNode() ? minMasterMatcher: noMinMasterNodesMatcher);
+ }
+
+ private void assertMMNinClusterSetting(InternalTestCluster cluster, int masterNodes) {
+ final int minMasterNodes = masterNodes / 2 + 1;
+ Matcher<Map<? extends String, ? extends String>> minMasterMatcher =
+ hasEntry(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes));
+ Matcher<Map<? extends String, ?>> noMinMasterNodesMatcher = not(hasKey(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()));
+
+ for (final String node : cluster.getNodeNames()) {
+ Settings stateSettings = cluster.client(node).admin().cluster().prepareState().setLocal(true)
+ .get().getState().getMetaData().settings();
+
+ assertThat("dynamic setting for node [" + node + "] has the wrong min_master_node setting : ["
+ + stateSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]",
+ stateSettings.getAsMap(),
+ cluster.getAutoManageMinMasterNode() ? minMasterMatcher: noMinMasterNodesMatcher);
+ }
+ }
+
public void testBeforeTest() throws Exception {
long clusterSeed = randomLong();
boolean masterNodes = randomBoolean();
@@ -156,11 +195,12 @@ public class InternalTestClusterTests extends ESTestCase {
Path baseDir = createTempDir();
final List<Class<? extends Plugin>> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class);
+ final boolean autoManageMinMasterNodes = randomBoolean();
InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
- minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
+ autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, mockPlugins, Function.identity());
InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
- minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes,
+ autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, mockPlugins, Function.identity());
assertClusters(cluster0, cluster1, false);
@@ -182,6 +222,8 @@ public class InternalTestClusterTests extends ESTestCase {
assertSettings(client.settings(), other.settings(), false);
}
assertArrayEquals(cluster0.getNodeNames(), cluster1.getNodeNames());
+ assertMMNinNodeSetting(cluster0, cluster0.numMasterNodes());
+ assertMMNinNodeSetting(cluster1, cluster0.numMasterNodes());
cluster0.afterTest();
cluster1.afterTest();
} finally {
@@ -216,12 +258,15 @@ public class InternalTestClusterTests extends ESTestCase {
boolean enableHttpPipelining = randomBoolean();
String nodePrefix = "test";
Path baseDir = createTempDir();
+ final boolean autoManageMinMasterNodes = randomBoolean();
InternalTestCluster cluster = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
- minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
+ autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class),
Function.identity());
try {
cluster.beforeTest(random(), 0.0);
+ final int originalMasterCount = cluster.numMasterNodes();
+ assertMMNinNodeSetting(cluster, originalMasterCount);
final Map<String,Path[]> shardNodePaths = new HashMap<>();
for (String name: cluster.getNodeNames()) {
shardNodePaths.put(name, getNodePaths(cluster, name));
@@ -230,7 +275,15 @@ public class InternalTestClusterTests extends ESTestCase {
Path dataPath = getNodePaths(cluster, poorNode)[0];
final Path testMarker = dataPath.resolve("testMarker");
Files.createDirectories(testMarker);
+ int expectedMasterCount = originalMasterCount;
+ if (cluster.getInstance(ClusterService.class, poorNode).localNode().isMasterNode()) {
+ expectedMasterCount--;
+ }
cluster.stopRandomNode(InternalTestCluster.nameFilter(poorNode));
+ if (expectedMasterCount != originalMasterCount) {
+ // check for updated
+ assertMMNinClusterSetting(cluster, expectedMasterCount);
+ }
assertFileExists(testMarker); // stopping a node half way shouldn't clean data
final String stableNode = randomFrom(cluster.getNodeNames());
@@ -240,10 +293,17 @@ public class InternalTestClusterTests extends ESTestCase {
Files.createDirectories(stableTestMarker);
final String newNode1 = cluster.startNode();
+ expectedMasterCount++;
assertThat(getNodePaths(cluster, newNode1)[0], equalTo(dataPath));
assertFileExists(testMarker); // starting a node should re-use data folders and not clean it
+ if (expectedMasterCount > 1) { // this is the first master, it's in cluster state settings won't be updated
+ assertMMNinClusterSetting(cluster, expectedMasterCount);
+ }
+ assertMMNinNodeSetting(newNode1, cluster, expectedMasterCount);
final String newNode2 = cluster.startNode();
+ expectedMasterCount++;
+ assertMMNinClusterSetting(cluster, expectedMasterCount);
final Path newDataPath = getNodePaths(cluster, newNode2)[0];
final Path newTestMarker = newDataPath.resolve("newTestMarker");
assertThat(newDataPath, not(dataPath));
@@ -262,6 +322,7 @@ public class InternalTestClusterTests extends ESTestCase {
assertThat("data paths for " + name + " changed", getNodePaths(cluster, name),
equalTo(shardNodePaths.get(name)));
}
+ assertMMNinNodeSetting(cluster, originalMasterCount);
} finally {
cluster.close();
@@ -280,7 +341,7 @@ public class InternalTestClusterTests extends ESTestCase {
public void testDifferentRolesMaintainPathOnRestart() throws Exception {
final Path baseDir = createTempDir();
final int numNodes = 5;
- InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, true, 0, 0, "test",
+ InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, true, true, 0, 0, "test",
new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
@@ -301,7 +362,9 @@ public class InternalTestClusterTests extends ESTestCase {
try {
Map<DiscoveryNode.Role, Set<String>> pathsPerRole = new HashMap<>();
for (int i = 0; i < numNodes; i++) {
- final DiscoveryNode.Role role = randomFrom(MASTER, DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST);
+ final DiscoveryNode.Role role = i == numNodes -1 && pathsPerRole.containsKey(MASTER) == false ?
+ MASTER : // last noe and still no master ofr the cluster
+ randomFrom(MASTER, DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST);
final String node;
switch (role) {
case MASTER:
@@ -343,6 +406,59 @@ public class InternalTestClusterTests extends ESTestCase {
} finally {
cluster.close();
}
+ }
+ public void testTwoNodeCluster() throws Exception {
+ final boolean autoManageMinMasterNodes = randomBoolean();
+ NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
+ @Override
+ public Settings nodeSettings(int nodeOrdinal) {
+ return Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false)
+ .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2)
+ .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME)
+ .build();
+ }
+
+ @Override
+ public Settings transportClientSettings() {
+ return Settings.builder()
+ .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build();
+ }
+ };
+ boolean enableHttpPipelining = randomBoolean();
+ String nodePrefix = "test";
+ Path baseDir = createTempDir();
+ InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, false, autoManageMinMasterNodes, 2, 2,
+ "test", nodeConfigurationSource, 0, enableHttpPipelining, nodePrefix,
+ Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class), Function.identity());
+ try {
+ cluster.beforeTest(random(), 0.0);
+ assertMMNinNodeSetting(cluster, 2);
+ switch (randomInt(2)) {
+ case 0:
+ cluster.stopRandomDataNode();
+ assertMMNinClusterSetting(cluster, 1);
+ cluster.startNode();
+ assertMMNinClusterSetting(cluster, 2);
+ assertMMNinNodeSetting(cluster, 2);
+ break;
+ case 1:
+ cluster.rollingRestart(new InternalTestCluster.RestartCallback() {
+ @Override
+ public Settings onNodeStopped(String nodeName) throws Exception {
+ assertMMNinClusterSetting(cluster, 1);
+ return super.onNodeStopped(nodeName);
+ }
+ });
+ assertMMNinClusterSetting(cluster, 2);
+ break;
+ case 2:
+ cluster.fullRestart();
+ break;
+ }
+ assertMMNinNodeSetting(cluster, 2);
+ } finally {
+ cluster.close();
+ }
}
}