author    Lee Hinman <dakrone@users.noreply.github.com>    2017-06-29 08:56:34 -0600
committer GitHub <noreply@github.com>    2017-06-29 08:56:34 -0600
commit    22ff76da0c7eaaadfb4f5127770bdeba9eb0e6ac (patch)
tree      dec3b5885e701ff85059f4947eb558711df2d4f0 /core/src/test/java
parent    7f2bcf1f97c917bf7e5a5c0bf904f6092e576acb (diff)
Promote replica on the highest version node (#25277)
* Promote replica on the highest version node

This changes the replica selection to prefer replicas on the highest version
node when choosing a replacement to promote after the primary shard fails.
Consider this situation:

- A replica on a 5.6 node
- Another replica on a 6.0 node
- The primary on a 6.0 node

The primary shard sends sequence numbers to the replica on the 6.0 node and
skips sending them to the 5.6 node. Now assume the primary shard fails and
(prior to this change) the replica on the 5.6 node gets promoted to primary:
it has no knowledge of sequence numbers, while the replica on the 6.0 node
will be expecting sequence numbers but will never receive them.

Relates to #10708

* Switch from a map of node to version to retrieving the version from the node
* Remove unneeded null check
* You can pretend you're a functional language, Java, but you're not fooling me.
* Randomize node versions
* Add a test with a random cluster state with multiple versions that fails shards
* Re-add comment and remove extra import
* Remove unneeded stuff, randomly start replicas a few more times
* Move test into FailedNodeRoutingTests
* Make assertions actually test replica version promotion
* Rewrite test, taking Yannick's feedback into account
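For reference, the selection rule described above can be sketched as a minimal,
self-contained approximation. This is not the RoutingNodes.activeReplicaWithHighestVersion
implementation that the tests below exercise; the class, method, and parameter
names here are hypothetical, and node versions are reduced to plain Comparable keys.

    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    // Hypothetical sketch of the promotion rule: among the active replicas of a
    // failed primary, prefer the copy whose hosting node reports the highest
    // version. The real code works on ShardRouting/DiscoveryNode; here each
    // replica is reduced to a key in a version map.
    final class HighestVersionPromotionSketch {
        static <R, V extends Comparable<V>> Optional<R> pickReplicaToPromote(
                List<R> activeReplicas, Map<R, V> nodeVersionOf) {
            // max() over the node versions implements "promote on the highest
            // version node"; ties are broken arbitrarily by encounter order.
            return activeReplicas.stream()
                    .max(Comparator.comparing(nodeVersionOf::get));
        }
    }

In the 5.6/6.0 scenario above, the 6.0 replica wins the comparison, so the
promoted primary already tracks sequence numbers and the remaining 6.0 replica
keeps receiving them.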
Diffstat (limited to 'core/src/test/java')
-rw-r--r--  core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java   | 148
-rw-r--r--  core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java | 120
2 files changed, 266 insertions(+), 2 deletions(-)
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java
index 61a28897d5..3b551e9129 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java
@@ -19,20 +19,50 @@
package org.elasticsearch.cluster.routing.allocation;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
-
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.indices.cluster.AbstractIndicesClusterStateServiceTestCase;
+import org.elasticsearch.indices.cluster.ClusterStateChanges;
+import org.elasticsearch.indices.cluster.IndicesClusterStateService;
+import org.elasticsearch.test.VersionUtils;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.equalTo;
@@ -91,4 +121,120 @@ public class FailedNodeRoutingTests extends ESAllocationTestCase {
assertThat(routingNode.numberOfShardsWithState(INITIALIZING), equalTo(1));
}
}
+
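+ // Builds a random mixed-version cluster, creates indices, then repeatedly
+ // fails random primaries, checking that each promotion picks a replica on
+ // the newest node available.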
+ public void testRandomClusterPromotesNewestReplica() throws InterruptedException {
+
+ ThreadPool threadPool = new TestThreadPool(getClass().getName());
+ ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
+ ClusterState state = randomInitialClusterState();
+
+ // randomly add nodes of mixed versions
+ logger.info("--> adding random nodes");
+ for (int i = 0; i < randomIntBetween(4, 8); i++) {
+ DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes())
+ .add(createNode()).build();
+ state = ClusterState.builder(state).nodes(newNodes).build();
+ state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after adding node
+ }
+
+ // Log the node versions (for debugging if necessary)
+ for (ObjectCursor<DiscoveryNode> cursor : state.nodes().getDataNodes().values()) {
+ Version nodeVer = cursor.value.getVersion();
+ logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer);
+ }
+
+ // randomly create some indices
+ logger.info("--> creating some indices");
+ for (int i = 0; i < randomIntBetween(2, 5); i++) {
+ String name = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
+ Settings.Builder settingsBuilder = Settings.builder()
+ .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4))
+ .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(2, 4));
+ CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
+ state = cluster.createIndex(state, request);
+ assertTrue(state.metaData().hasIndex(name));
+ }
+
+ ClusterState previousState = state;
+
+ logger.info("--> starting shards");
+ state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
+ logger.info("--> starting replicas a random number of times");
+ for (int i = 0; i < randomIntBetween(1, 10); i++) {
+ state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
+ }
+
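+ // Repeatedly fail a random subset of started primaries and verify that each
+ // promoted replacement sits on a node at least as new as any other started
+ // copy of the same shard.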
+ boolean keepGoing = true;
+ while (keepGoing) {
+ List<ShardRouting> primaries = state.getRoutingNodes().shardsWithState(STARTED)
+ .stream().filter(ShardRouting::primary).collect(Collectors.toList());
+
+ // Pick a random subset of primaries to fail
+ List<FailedShard> shardsToFail = new ArrayList<>();
+ List<ShardRouting> failedPrimaries = randomSubsetOf(primaries);
+ failedPrimaries.stream().forEach(sr -> {
+ shardsToFail.add(new FailedShard(sr, "failed primary", new Exception()));
+ });
+
+ logger.info("--> state before failing shards: {}", state);
+ state = cluster.applyFailedShards(state, shardsToFail);
+
+ final ClusterState compareState = state;
+ failedPrimaries.forEach(shardRouting -> {
+ logger.info("--> verifying version for {}", shardRouting);
+
+ ShardRouting newPrimary = compareState.routingTable().index(shardRouting.index())
+ .shard(shardRouting.id()).primaryShard();
+ Version newPrimaryVersion = getNodeVersion(newPrimary, compareState);
+
+ logger.info("--> new primary is on version {}: {}", newPrimaryVersion, newPrimary);
+ compareState.routingTable().shardRoutingTable(newPrimary.shardId()).shardsWithState(STARTED)
+ .stream()
+ .forEach(sr -> {
+ Version candidateVer = getNodeVersion(sr, compareState);
+ if (candidateVer != null) {
+ logger.info("--> candidate on {} node; shard routing: {}", candidateVer, sr);
+ assertTrue("candidate was not on the newest version, new primary is on " +
+ newPrimaryVersion + " and there is a candidate on " + candidateVer,
+ candidateVer.onOrBefore(newPrimaryVersion));
+ }
+ });
+ });
+
+ keepGoing = randomBoolean();
+ }
+ terminate(threadPool);
+ }
+
+ private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) {
+ return Optional.ofNullable(state.getNodes().get(shardRouting.currentNodeId())).map(DiscoveryNode::getVersion).orElse(null);
+ }
+
+ private static final AtomicInteger nodeIdGenerator = new AtomicInteger();
+
+ public ClusterState randomInitialClusterState() {
+ List<DiscoveryNode> allNodes = new ArrayList<>();
+ DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
+ allNodes.add(localNode);
+ // at least two nodes that have the data role so that we can allocate shards
+ allNodes.add(createNode(DiscoveryNode.Role.DATA));
+ allNodes.add(createNode(DiscoveryNode.Role.DATA));
+ for (int i = 0; i < randomIntBetween(2, 5); i++) {
+ allNodes.add(createNode());
+ }
+ ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()]));
+ return state;
+ }
+
+ protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
+ Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
+ for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) {
+ roles.add(mustHaveRole);
+ }
+ final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
+ return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
+ VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null));
+ }
+
}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
index 6063faba15..2eedeba63f 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
@@ -19,12 +19,14 @@
package org.elasticsearch.cluster.routing.allocation;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -35,6 +37,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllo
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.VersionUtils;
import java.util.ArrayList;
import java.util.Collections;
@@ -499,7 +502,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
Collections.singletonList(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0)));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
- ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplica(shardId);
+ ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
// fail the primary shard, check replicas get removed as well...
@@ -556,4 +559,119 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
}
+
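+ // End-to-end check of the new selection rule: with replicas spread across
+ // 5.6 and 6.x nodes, failing the primary (twice) must each time promote the
+ // replica whose node runs the highest version.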
+ public void testReplicaOnNewestVersionIsPromoted() {
+ AllocationService allocation = createAllocationService(Settings.builder().build());
+
+ MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test")
+ .settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)).build();
+
+ RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();
+
+ ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+ .metaData(metaData).routingTable(initialRoutingTable).build();
+
+ ShardId shardId = new ShardId(metaData.index("test").getIndex(), 0);
+
+ // add a single node
+ clusterState = ClusterState.builder(clusterState).nodes(
+ DiscoveryNodes.builder()
+ .add(newNode("node1-5.x", Version.V_5_6_0)))
+ .build();
+ clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build();
+ assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));
+
+ // start primary shard
+ clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));
+
+ // add another 5.6 node
+ clusterState = ClusterState.builder(clusterState).nodes(
+ DiscoveryNodes.builder(clusterState.nodes())
+ .add(newNode("node2-5.x", Version.V_5_6_0)))
+ .build();
+
+ // start the shards, should have 1 primary and 1 replica available
+ clusterState = allocation.reroute(clusterState, "reroute");
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
+ clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
+
+ clusterState = ClusterState.builder(clusterState).nodes(
+ DiscoveryNodes.builder(clusterState.nodes())
+ .add(newNode("node3-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null)))
+ .add(newNode("node4-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null))))
+ .build();
+
+ // start all the replicas
+ clusterState = allocation.reroute(clusterState, "reroute");
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
+ clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
+
+ ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
+ logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica);
+
+ // fail the primary shard and make sure the replica on the highest version node is promoted
+ ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
+ ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
+ assertThat(newState, not(equalTo(clusterState)));
+ clusterState = newState;
+ // the primary gets allocated on another node
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3));
+
+ ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
+ assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
+ assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
+
+ Version replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
+ assertNotNull(replicaNodeVersion);
+ logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);
+
+ for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
+ if ("node1".equals(cursor.value.getId())) {
+ // Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check
+ continue;
+ }
+ Version nodeVer = cursor.value.getVersion();
+ assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
+ replicaNodeVersion.onOrAfter(nodeVer));
+ }
+
+ startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
+ logger.info("--> failing primary shard a second time, should select: {}", startedReplica);
+
+ // fail the primary shard again, and ensure the same thing happens
+ ShardRouting secondPrimaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
+ newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail);
+ assertThat(newState, not(equalTo(clusterState)));
+ clusterState = newState;
+ // the primary gets allocated on another node
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+
+ newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
+ assertThat(newPrimaryShard, not(equalTo(secondPrimaryShardToFail)));
+ assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
+
+ replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
+ assertNotNull(replicaNodeVersion);
+ logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);
+
+ for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
+ if (primaryShardToFail.currentNodeId().equals(cursor.value.getId()) ||
+ secondPrimaryShardToFail.currentNodeId().equals(cursor.value.getId())) {
+ // Skip the nodes the failed primaries were on; they no longer hold replicas, so they need no version check
+ continue;
+ }
+ Version nodeVer = cursor.value.getVersion();
+ assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
+ replicaNodeVersion.onOrAfter(nodeVer));
+ }
+ }
}