diff options
author | Boaz Leskes <b.leskes@gmail.com> | 2016-11-15 17:09:08 +0000 |
---|---|---|
committer | Boaz Leskes <b.leskes@gmail.com> | 2016-11-15 17:09:08 +0000 |
commit | 2c0338fa87fb1469cec52d6f4ef5b2e20fd3c703 (patch) | |
tree | 392cd94f4f3fa60df38fcc28922b4fcf410b4824 /core | |
parent | ee722d738aca80c66e3eede18e6b164bf1f24f1a (diff) | |
parent | 568a7ea5f197dc3f7396280a4bb0a4d5fcda7868 (diff) |
Merge remote-tracking branch 'upstream/master' into feature/seq_no
Diffstat (limited to 'core')
26 files changed, 213 insertions, 316 deletions
diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 81c102f8fb..9662292cf6 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -19,7 +19,6 @@ package org.elasticsearch.common.util.concurrent; - import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -112,8 +111,22 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); - assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" + - Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]"; + assert assertDefaultContext(r); + } + + private boolean assertDefaultContext(Runnable r) { + try { + assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" + + Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]"; + } catch (IllegalStateException ex) { + // sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks + // this must not trigger an exception here since we only assert if the default is restored and + // we don't really care if we are closed + if (contextHolder.isClosed() == false) { + throw ex; + } + } + return true; } /** diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index 18ea7e9ace..ca1d364ffd 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -253,6 +253,13 @@ public final class ThreadContext implements Closeable, Writeable { return threadLocal.get() == DEFAULT_CONTEXT; } + /** + * Returns <code>true</code> if the context is closed, otherwise <code>true</code> + */ + boolean isClosed() { + return threadLocal.closed.get(); + } + @FunctionalInterface public interface StoredContext extends AutoCloseable { @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index f6870cc05b..7794c58ddd 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -19,30 +19,9 @@ package org.elasticsearch.discovery.zen; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; - import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -53,6 +32,8 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -75,6 +56,25 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -186,9 +186,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { } @Override - public void close() throws IOException { + public void close() { ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS); - IOUtils.close(receivedResponses.values()); + Releasables.close(receivedResponses.values()); closed = true; } @@ -272,7 +272,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { } } - class SendPingsHandler implements Closeable { + class SendPingsHandler implements Releasable { private final int id; private final Set<DiscoveryNode> nodeToDisconnect = ConcurrentCollections.newConcurrentSet(); private final PingCollection pingCollection; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index f9a16243e0..272a75f4e7 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -240,6 +241,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover joinThreadControl.stop(); masterFD.stop("zen disco stop"); nodesFD.stop(); + Releasables.close(zenPing); // stop any ongoing pinging DiscoveryNodes nodes = nodes(); if (sendLeaveRequest) { if (nodes.getMasterNode() == null) { @@ -269,7 +271,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover @Override protected void doClose() throws IOException { - IOUtils.close(masterFD, nodesFD, zenPing); + IOUtils.close(masterFD, nodesFD); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java index cb2c8cb501..75ea701dc9 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java @@ -19,26 +19,26 @@ package org.elasticsearch.discovery.zen; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.TimeValue; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; -public interface ZenPing extends Closeable { +public interface ZenPing extends Releasable { void start(PingContextProvider contextProvider); diff --git a/core/src/main/java/org/elasticsearch/index/mapper/Mapper.java b/core/src/main/java/org/elasticsearch/index/mapper/Mapper.java index 83a20e03ff..384331c2d9 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/Mapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/Mapper.java @@ -94,7 +94,6 @@ public abstract class Mapper implements ToXContent, Iterable<Mapper> { private final ParseFieldMatcher parseFieldMatcher; private final Supplier<QueryShardContext> queryShardContextSupplier; - private QueryShardContext queryShardContext; public ParserContext(String type, IndexAnalyzers indexAnalyzers, Function<String, SimilarityProvider> similarityLookupService, MapperService mapperService, Function<String, TypeParser> typeParsers, @@ -138,12 +137,8 @@ public abstract class Mapper implements ToXContent, Iterable<Mapper> { return parseFieldMatcher; } - public QueryShardContext queryShardContext() { - // No need for synchronization, this class must be used in a single thread - if (queryShardContext == null) { - queryShardContext = queryShardContextSupplier.get(); - } - return queryShardContext; + public Supplier<QueryShardContext> queryShardContextSupplier() { + return queryShardContextSupplier; } public boolean isWithinMultiField() { return false; } @@ -161,7 +156,7 @@ public abstract class Mapper implements ToXContent, Iterable<Mapper> { static class MultiFieldParserContext extends ParserContext { MultiFieldParserContext(ParserContext in) { - super(in.type(), in.indexAnalyzers, in.similarityLookupService(), in.mapperService(), in.typeParsers(), in.indexVersionCreated(), in.parseFieldMatcher(), in::queryShardContext); + super(in.type(), in.indexAnalyzers, in.similarityLookupService(), in.mapperService(), in.typeParsers(), in.indexVersionCreated(), in.parseFieldMatcher(), in.queryShardContextSupplier()); } } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 9eb7f9a037..298d6712ff 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -75,9 +75,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.UnicastHostsProvider; -import org.elasticsearch.discovery.zen.UnicastZenPing; -import org.elasticsearch.discovery.zen.ZenPing; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.GatewayAllocator; @@ -655,11 +652,13 @@ public class Node implements Closeable { injector.getInstance(SnapshotShardsService.class).stop(); // stop any changes happening as a result of cluster state changes injector.getInstance(IndicesClusterStateService.class).stop(); + // close discovery early to not react to pings anymore. + // This can confuse other nodes and delay things - mostly if we're the master and we're running tests. + injector.getInstance(Discovery.class).stop(); // we close indices first, so operations won't be allowed on it injector.getInstance(IndicesTTLService.class).stop(); injector.getInstance(RoutingService.class).stop(); injector.getInstance(ClusterService.class).stop(); - injector.getInstance(Discovery.class).stop(); injector.getInstance(NodeConnectionsService.class).stop(); injector.getInstance(MonitorService.class).stop(); injector.getInstance(GatewayService.class).stop(); diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java index e289d90c7a..b5ef7a642a 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java @@ -31,7 +31,8 @@ import java.io.IOException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; -@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) +@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0, + autoMinMasterNodes = false) public class IndicesExistsIT extends ESIntegTestCase { public void testIndexExistsWithBlocksInPlace() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index 96ba5729cb..7c764ed172 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -43,7 +43,7 @@ import java.util.concurrent.CyclicBarrier; import static org.hamcrest.Matchers.equalTo; -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) public class IndexingMasterFailoverIT extends ESIntegTestCase { @Override diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 3e58291d4a..257b80663a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -63,7 +63,7 @@ import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) @TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.discovery.zen:TRACE") public class MinimumMasterNodesIT extends ESIntegTestCase { @@ -275,12 +275,14 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { .put("discovery.initial_state_timeout", "500ms") .build(); - logger.info("--> start 2 nodes"); - internalCluster().startNodesAsync(2, settings).get(); + logger.info("--> start first node and wait for it to be a master"); + internalCluster().startNode(settings); + ensureClusterSizeConsistency(); // wait until second node join the cluster - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + logger.info("--> start second node and wait for it to join"); + internalCluster().startNode(settings); + ensureClusterSizeConsistency(); logger.info("--> setting minimum master node to 2"); setMinimumMasterNodes(2); @@ -298,8 +300,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { logger.info("--> bringing another node up"); internalCluster().startNode(Settings.builder().put(settings).put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2).build()); - clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + ensureClusterSizeConsistency(); } private void assertNoMasterBlockOnAllNodes() throws InterruptedException { diff --git a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java index f73043ce4e..0be4f7c4e5 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java @@ -49,7 +49,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) public class NoMasterNodeIT extends ESIntegTestCase { @Override diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java index f47134a45b..ab3f82fff7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java @@ -69,7 +69,10 @@ public class AckClusterUpdateSettingsIT extends ESIntegTestCase { private void removePublishTimeout() { //to test that the acknowledgement mechanism is working we better disable the wait for publish //otherwise the operation is most likely acknowledged even if it doesn't support ack - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0"))); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0") + .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") + )); } public void testClusterUpdateSettingsAcknowledgement() { diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java index 2ea4f75515..f51a4f11ae 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java @@ -36,7 +36,6 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; -import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.index.Index; @@ -61,7 +60,9 @@ public class AckIT extends ESIntegTestCase { //to test that the acknowledgement mechanism is working we better disable the wait for publish //otherwise the operation is most likely acknowledged even if it doesn't support ack return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), 0).build(); + .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout + .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit to check acking + .build(); } public void testUpdateSettingsAcknowledgement() { diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java index d98f929424..19657b0548 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationD import org.elasticsearch.common.Priority; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; @@ -104,7 +103,6 @@ public class AwarenessAllocationIT extends ESIntegTestCase { Settings commonSettings = Settings.builder() .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b") .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") - .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 3) .put(ZenDiscovery.JOIN_TIMEOUT_SETTING.getKey(), "10s") .build(); diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index 3d345f24db..f056eded34 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -20,32 +20,20 @@ package org.elasticsearch.cluster.service; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Priority; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -56,17 +44,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class ClusterServiceIT extends ESIntegTestCase { - @Override - protected Collection<Class<? extends Plugin>> nodePlugins() { - return Arrays.asList(TestPlugin.class); - } - public void testAckedUpdateTask() throws Exception { internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); @@ -482,141 +463,4 @@ public class ClusterServiceIT extends ESIntegTestCase { assertTrue(controlSources.isEmpty()); block2.countDown(); } - - public void testLocalNodeMasterListenerCallbacks() throws Exception { - Settings settings = Settings.builder() - .put("discovery.zen.minimum_master_nodes", 1) - .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms") - .put("discovery.initial_state_timeout", "500ms") - .build(); - - String node_0 = internalCluster().startNode(settings); - ClusterService clusterService = internalCluster().getInstance(ClusterService.class); - MasterAwareService testService = internalCluster().getInstance(MasterAwareService.class); - - ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("1").get(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - - // the first node should be a master as the minimum required is 1 - assertThat(clusterService.state().nodes().getMasterNode(), notNullValue()); - assertThat(clusterService.state().nodes().isLocalNodeElectedMaster(), is(true)); - assertThat(testService.master(), is(true)); - String node_1 = internalCluster().startNode(settings); - final ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class, node_1); - MasterAwareService testService1 = internalCluster().getInstance(MasterAwareService.class, node_1); - - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - - // the second node should not be the master as node1 is already the master. - assertThat(clusterService1.state().nodes().isLocalNodeElectedMaster(), is(false)); - assertThat(testService1.master(), is(false)); - - internalCluster().stopCurrentMasterNode(); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("1").get(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - - // now that node0 is closed, node1 should be elected as master - assertThat(clusterService1.state().nodes().isLocalNodeElectedMaster(), is(true)); - assertThat(testService1.master(), is(true)); - - // start another node and set min_master_node - internalCluster().startNode(Settings.builder().put(settings)); - assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); - - Settings transientSettings = Settings.builder() - .put("discovery.zen.minimum_master_nodes", 2) - .build(); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(transientSettings).get(); - - // and shutdown the second node - internalCluster().stopRandomNonMasterNode(); - - // there should not be any master as the minimum number of required eligible masters is not met - awaitBusy(() -> clusterService1.state().nodes().getMasterNode() == null && - clusterService1.clusterServiceState().getClusterStateStatus() == ClusterStateStatus.APPLIED); - assertThat(testService1.master(), is(false)); - - // bring the node back up - String node_2 = internalCluster().startNode(Settings.builder().put(settings).put(transientSettings)); - ClusterService clusterService2 = internalCluster().getInstance(ClusterService.class, node_2); - MasterAwareService testService2 = internalCluster().getInstance(MasterAwareService.class, node_2); - - // make sure both nodes see each other otherwise the masternode below could be null if node 2 is master and node 1 did'r receive - // the updated cluster state... - assertThat(internalCluster().client(node_1).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true) - .setWaitForNodes("2").get().isTimedOut(), is(false)); - assertThat(internalCluster().client(node_2).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true) - .setWaitForNodes("2").get().isTimedOut(), is(false)); - - // now that we started node1 again, a new master should be elected - assertThat(clusterService2.state().nodes().getMasterNode(), is(notNullValue())); - if (node_2.equals(clusterService2.state().nodes().getMasterNode().getName())) { - assertThat(testService1.master(), is(false)); - assertThat(testService2.master(), is(true)); - } else { - assertThat(testService1.master(), is(true)); - assertThat(testService2.master(), is(false)); - } - } - - public static class TestPlugin extends Plugin { - - @Override - public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() { - List<Class<? extends LifecycleComponent>> services = new ArrayList<>(1); - services.add(MasterAwareService.class); - return services; - } - } - - @Singleton - public static class MasterAwareService extends AbstractLifecycleComponent implements LocalNodeMasterListener { - - private final ClusterService clusterService; - private volatile boolean master; - - @Inject - public MasterAwareService(Settings settings, ClusterService clusterService) { - super(settings); - clusterService.add(this); - this.clusterService = clusterService; - logger.info("initialized test service"); - } - - @Override - public void onMaster() { - logger.info("on master [{}]", clusterService.localNode()); - master = true; - } - - @Override - public void offMaster() { - logger.info("off master [{}]", clusterService.localNode()); - master = false; - } - - public boolean master() { - return master; - } - - @Override - protected void doStart() { - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { - } - - @Override - public String executorName() { - return ThreadPool.Names.SAME; - } - - } } diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java index 9fd4fc1851..bede01ed21 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -44,6 +45,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -1098,6 +1100,48 @@ public class ClusterServiceTests extends ESTestCase { timedClusterService.close(); } + public void testLocalNodeMasterListenerCallbacks() throws Exception { + TimedClusterService timedClusterService = createTimedClusterService(false); + + AtomicBoolean isMaster = new AtomicBoolean(); + timedClusterService.add(new LocalNodeMasterListener() { + @Override + public void onMaster() { + isMaster.set(true); + } + + @Override + public void offMaster() { + isMaster.set(false); + } + + @Override + public String executorName() { + return ThreadPool.Names.SAME; + } + }); + + ClusterState state = timedClusterService.state(); + DiscoveryNodes nodes = state.nodes(); + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId()); + state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build(); + setState(timedClusterService, state); + assertThat(isMaster.get(), is(true)); + + nodes = state.nodes(); + nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(null); + state = ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES)) + .nodes(nodesBuilder).build(); + setState(timedClusterService, state); + assertThat(isMaster.get(), is(false)); + nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId()); + state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build(); + setState(timedClusterService, state); + assertThat(isMaster.get(), is(true)); + + timedClusterService.close(); + } + private static class SimpleTask { private final int id; diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 22844e0588..1688157c0d 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -122,7 +122,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false) @TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE") public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java index 3af2e32eef..b708ab4c26 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java @@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.equalTo; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) public class ZenUnicastDiscoveryIT extends ESIntegTestCase { private ClusterDiscoveryConfiguration discoveryConfig; diff --git a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java index 226b1422b4..8284388d2c 100644 --- a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; @@ -37,7 +36,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; -@ClusterScope(numDataNodes =0, scope= Scope.TEST) +@ClusterScope(numDataNodes = 0, scope = Scope.TEST) public class QuorumGatewayIT extends ESIntegTestCase { @Override protected int numberOfReplicas() { @@ -47,8 +46,7 @@ public class QuorumGatewayIT extends ESIntegTestCase { public void testQuorumRecovery() throws Exception { logger.info("--> starting 3 nodes"); // we are shutting down nodes - make sure we don't have 2 clusters if we test network - internalCluster().startNodesAsync(3, - Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2).build()).get(); + internalCluster().startNodesAsync(3).get(); createIndex("test"); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java index 1e35bcdd46..1794aa2272 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java @@ -34,7 +34,7 @@ import java.util.Set; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) public class RecoverAfterNodesIT extends ESIntegTestCase { private static final TimeValue BLOCK_WAIT_TIMEOUT = TimeValue.timeValueSeconds(10); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 4e1ff3f867..052bfc00ef 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -44,6 +43,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster.RestartCallback; import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.store.MockFSIndexStore; @@ -333,48 +333,43 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { String metaDataUuid = client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID(); assertThat(metaDataUuid, not(equalTo("_na_"))); - Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null); - logger.info("--> closing first node, and indexing more data to the second node"); - internalCluster().fullRestart(new RestartCallback() { + internalCluster().stopRandomDataNode(); - @Override - public void doAfterNodes(int numNodes, Client client) throws Exception { - if (numNodes == 1) { - logger.info("--> one node is closed - start indexing data into the second one"); - client.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet(); - // TODO: remove once refresh doesn't fail immediately if there a master block: - // https://github.com/elastic/elasticsearch/issues/9997 - client.admin().cluster().prepareHealth("test").setWaitForYellowStatus().get(); - client.admin().indices().prepareRefresh().execute().actionGet(); - - for (int i = 0; i < 10; i++) { - assertHitCount(client.prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3); - } + logger.info("--> one node is closed - start indexing data into the second one"); + client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet(); + // TODO: remove once refresh doesn't fail immediately if there a master block: + // https://github.com/elastic/elasticsearch/issues/9997 + client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get(); + client().admin().indices().prepareRefresh().execute().actionGet(); - logger.info("--> add some metadata, additional type and template"); - client.admin().indices().preparePutMapping("test").setType("type2") - .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject()) - .execute().actionGet(); - client.admin().indices().preparePutTemplate("template_1") - .setPatterns(Collections.singletonList("te*")) - .setOrder(0) - .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties") - .startObject("field1").field("type", "text").field("store", true).endObject() - .startObject("field2").field("type", "keyword").field("store", true).endObject() - .endObject().endObject().endObject()) - .execute().actionGet(); - client.admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet(); - logger.info("--> starting two nodes back, verifying we got the latest version"); - } + for (int i = 0; i < 10; i++) { + assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3); + } - } + logger.info("--> add some metadata, additional type and template"); + client().admin().indices().preparePutMapping("test").setType("type2") + .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject()) + .execute().actionGet(); + client().admin().indices().preparePutTemplate("template_1") + .setTemplate("te*") + .setOrder(0) + .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties") + .startObject("field1").field("type", "text").field("store", true).endObject() + .startObject("field2").field("type", "keyword").field("store", true).endObject() + .endObject().endObject().endObject()) + .execute().actionGet(); + client().admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet(); + + logger.info("--> stopping the second node"); + internalCluster().stopRandomDataNode(); + + logger.info("--> starting the two nodes back"); - }); + internalCluster().startNodesAsync(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()).get(); logger.info("--> running cluster_health (wait for the shards to startup)"); ensureGreen(); - primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); assertThat(client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID(), equalTo(metaDataUuid)); @@ -502,27 +497,28 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { public void testRecoveryDifferentNodeOrderStartup() throws Exception { // we need different data paths so we make sure we start the second node fresh - final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build()); + final Path pathNode1 = createTempDir(); + final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode1).build()); client().prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet(); - internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build()); + final Path pathNode2 = createTempDir(); + final String node_2 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build()); ensureGreen(); Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null); - - internalCluster().fullRestart(new RestartCallback() { - - @Override - public boolean doRestart(String nodeName) { - return !node_1.equals(nodeName); - } - }); - + if (randomBoolean()) { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2)); + } else { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1)); + } + // start the second node again + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build()); ensureYellow(); primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); - assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index dfe9a09fb2..e0a6111833 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -555,10 +555,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { // start a master node internalCluster().startNode(nodeSettings); - InternalTestCluster.Async<String> blueFuture = internalCluster().startNodeAsync(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); - InternalTestCluster.Async<String> redFuture = internalCluster().startNodeAsync(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); - final String blueNodeName = blueFuture.get(); - final String redNodeName = redFuture.get(); + final String blueNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); + final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get(); assertThat(response.isTimedOut(), is(false)); diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index c55fc51433..fe3b569755 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -209,7 +209,10 @@ public class RareClusterStateIT extends ESIntegTestCase { // but the change might not be on the node that performed the indexing // operation yet - Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0ms").build(); + Settings settings = Settings.builder() + .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout + .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design + .build(); final List<String> nodeNames = internalCluster().startNodesAsync(2, settings).get(); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); @@ -327,7 +330,6 @@ public class RareClusterStateIT extends ESIntegTestCase { // time of indexing it final List<String> nodeNames = internalCluster().startNodesAsync(2, Settings.builder() - .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2) .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design .build()).get(); diff --git a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java index 4a61bebd4d..dc38867705 100644 --- a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java @@ -72,16 +72,15 @@ public class FullRollingRestartIT extends ESIntegTestCase { } logger.info("--> now start adding nodes"); - internalCluster().startNodesAsync(2, settings).get(); + internalCluster().startNode(settings); + internalCluster().startNode(settings); // make sure the cluster state is green, and all has been recovered assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3")); logger.info("--> add two more nodes"); - internalCluster().startNodesAsync(2, settings).get(); - - // We now have 5 nodes - setMinimumMasterNodes(3); + internalCluster().startNode(settings); + internalCluster().startNode(settings); // make sure the cluster state is green, and all has been recovered assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("5")); @@ -97,9 +96,6 @@ public class FullRollingRestartIT extends ESIntegTestCase { // make sure the cluster state is green, and all has been recovered assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("4")); - // going down to 3 nodes. note that the min_master_node may not be in effect when we shutdown the 4th - // node, but that's OK as it is set to 3 before. - setMinimumMasterNodes(2); internalCluster().stopRandomDataNode(); // make sure the cluster state is green, and all has been recovered assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3")); @@ -115,8 +111,6 @@ public class FullRollingRestartIT extends ESIntegTestCase { // make sure the cluster state is green, and all has been recovered assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("2")); - // closing the 2nd node - setMinimumMasterNodes(1); internalCluster().stopRandomDataNode(); // make sure the cluster state is yellow, and all has been recovered diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 205f2bbfad..cd93a9fef2 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -60,7 +60,6 @@ import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; -import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockIndexEventListener; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; @@ -399,7 +398,8 @@ public class RelocationIT extends ESIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) ).get(); - internalCluster().startNodesAsync(2).get(); + internalCluster().startNode(); + internalCluster().startNode(); List<IndexRequestBuilder> requests = new ArrayList<>(); int numDocs = scaledRandomIntBetween(25, 250); @@ -466,14 +466,15 @@ public class RelocationIT extends ESIntegTestCase { public void testIndexAndRelocateConcurrently() throws ExecutionException, InterruptedException { int halfNodes = randomIntBetween(1, 3); - Settings blueSetting = Settings.builder().put("node.attr.color", "blue").build(); - InternalTestCluster.Async<List<String>> blueFuture = internalCluster().startNodesAsync(halfNodes, blueSetting); - Settings redSetting = Settings.builder().put("node.attr.color", "red").build(); - InternalTestCluster.Async<java.util.List<String>> redFuture = internalCluster().startNodesAsync(halfNodes, redSetting); - blueFuture.get(); - redFuture.get(); - logger.info("blue nodes: {}", blueFuture.get()); - logger.info("red nodes: {}", redFuture.get()); + Settings[] nodeSettings = Stream.concat( + Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes), + Stream.generate(() -> Settings.builder().put("node.attr.color", "red").build()).limit(halfNodes) + ).toArray(Settings[]::new); + List<String> nodes = internalCluster().startNodesAsync(nodeSettings).get(); + String[] blueNodes = nodes.subList(0, halfNodes).stream().toArray(String[]::new); + String[] redNodes = nodes.subList(halfNodes, nodes.size()).stream().toArray(String[]::new); + logger.info("blue nodes: {}", (Object)blueNodes); + logger.info("red nodes: {}", (Object)redNodes); ensureStableCluster(halfNodes * 2); assertAcked(prepareCreate("test", Settings.builder() @@ -481,7 +482,7 @@ public class RelocationIT extends ESIntegTestCase { .put(indexSettings()) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)) )); - assertAllShardsOnNodes("test", redFuture.get().toArray(new String[2])); + assertAllShardsOnNodes("test", redNodes); int numDocs = randomIntBetween(100, 150); ArrayList<String> ids = new ArrayList<>(); logger.info(" --> indexing [{}] docs", numDocs); diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 6121b2c0c8..cf4fe03893 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -19,18 +19,6 @@ package org.elasticsearch.tribe; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.client.Client; @@ -58,6 +46,18 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + import static java.util.stream.Collectors.toSet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -121,13 +121,13 @@ public class TribeIT extends ESIntegTestCase { final Collection<Class<? extends Plugin>> plugins = nodePlugins(); if (cluster1 == null) { - cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes, + cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes, UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_1", plugins, Function.identity()); } if (cluster2 == null) { - cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes, + cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes, UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_2", plugins, Function.identity()); } |