summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java27
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java148
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java120
3 files changed, 284 insertions, 11 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
index e93d071b0c..8268b98f34 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
@@ -320,14 +320,23 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/**
* Returns one active replica shard for the given shard id or <code>null</code> if
* no active replica is found.
+ *
+ * Since replicas could possibly be on nodes with a older version of ES than
+ * the primary is, this will return replicas on the highest version of ES.
+ *
*/
- public ShardRouting activeReplica(ShardId shardId) {
- for (ShardRouting shardRouting : assignedShards(shardId)) {
- if (!shardRouting.primary() && shardRouting.active()) {
- return shardRouting;
- }
- }
- return null;
+ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
+ // It's possible for replicaNodeVersion to be null, when deassociating dead nodes
+ // that have been removed, the shards are failed, and part of the shard failing
+ // calls this method with an out-of-date RoutingNodes, where the version might not
+ // be accessible. Therefore, we need to protect against the version being null
+ // (meaning the node will be going away).
+ return assignedShards(shardId).stream()
+ .filter(shr -> !shr.primary() && shr.active())
+ .filter(shr -> node(shr.currentNodeId()) != null)
+ .max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
+ Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
+ .orElse(null);
}
/**
@@ -567,7 +576,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
if (failedShard.relocatingNodeId() == null) {
if (failedShard.primary()) {
// promote active replica to primary if active replica exists (only the case for shadow replicas)
- ShardRouting activeReplica = activeReplica(failedShard.shardId());
+ ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
@@ -596,7 +605,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
assert failedShard.active();
if (failedShard.primary()) {
// promote active replica to primary if active replica exists
- ShardRouting activeReplica = activeReplica(failedShard.shardId());
+ ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java
index 61a28897d5..3b551e9129 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java
@@ -19,20 +19,50 @@
package org.elasticsearch.cluster.routing.allocation;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
-
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.indices.cluster.AbstractIndicesClusterStateServiceTestCase;
+import org.elasticsearch.indices.cluster.ClusterStateChanges;
+import org.elasticsearch.indices.cluster.IndicesClusterStateService;
+import org.elasticsearch.test.VersionUtils;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.equalTo;
@@ -91,4 +121,120 @@ public class FailedNodeRoutingTests extends ESAllocationTestCase {
assertThat(routingNode.numberOfShardsWithState(INITIALIZING), equalTo(1));
}
}
+
+ public void testRandomClusterPromotesNewestReplica() throws InterruptedException {
+
+ ThreadPool threadPool = new TestThreadPool(getClass().getName());
+ ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
+ ClusterState state = randomInitialClusterState();
+
+ // randomly add nodes of mixed versions
+ logger.info("--> adding random nodes");
+ for (int i = 0; i < randomIntBetween(4, 8); i++) {
+ DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes())
+ .add(createNode()).build();
+ state = ClusterState.builder(state).nodes(newNodes).build();
+ state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after adding node
+ }
+
+ // Log the node versions (for debugging if necessary)
+ for (ObjectCursor<DiscoveryNode> cursor : state.nodes().getDataNodes().values()) {
+ Version nodeVer = cursor.value.getVersion();
+ logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer);
+ }
+
+ // randomly create some indices
+ logger.info("--> creating some indices");
+ for (int i = 0; i < randomIntBetween(2, 5); i++) {
+ String name = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
+ Settings.Builder settingsBuilder = Settings.builder()
+ .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4))
+ .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(2, 4));
+ CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
+ state = cluster.createIndex(state, request);
+ assertTrue(state.metaData().hasIndex(name));
+ }
+
+ ClusterState previousState = state;
+
+ logger.info("--> starting shards");
+ state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
+ logger.info("--> starting replicas a random number of times");
+ for (int i = 0; i < randomIntBetween(1,10); i++) {
+ state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
+ }
+
+ boolean keepGoing = true;
+ while (keepGoing) {
+ List<ShardRouting> primaries = state.getRoutingNodes().shardsWithState(STARTED)
+ .stream().filter(ShardRouting::primary).collect(Collectors.toList());
+
+ // Pick a random subset of primaries to fail
+ List<FailedShard> shardsToFail = new ArrayList<>();
+ List<ShardRouting> failedPrimaries = randomSubsetOf(primaries);
+ failedPrimaries.stream().forEach(sr -> {
+ shardsToFail.add(new FailedShard(randomFrom(sr), "failed primary", new Exception()));
+ });
+
+ logger.info("--> state before failing shards: {}", state);
+ state = cluster.applyFailedShards(state, shardsToFail);
+
+ final ClusterState compareState = state;
+ failedPrimaries.forEach(shardRouting -> {
+ logger.info("--> verifying version for {}", shardRouting);
+
+ ShardRouting newPrimary = compareState.routingTable().index(shardRouting.index())
+ .shard(shardRouting.id()).primaryShard();
+ Version newPrimaryVersion = getNodeVersion(newPrimary, compareState);
+
+ logger.info("--> new primary is on version {}: {}", newPrimaryVersion, newPrimary);
+ compareState.routingTable().shardRoutingTable(newPrimary.shardId()).shardsWithState(STARTED)
+ .stream()
+ .forEach(sr -> {
+ Version candidateVer = getNodeVersion(sr, compareState);
+ if (candidateVer != null) {
+ logger.info("--> candidate on {} node; shard routing: {}", candidateVer, sr);
+ assertTrue("candidate was not on the newest version, new primary is on " +
+ newPrimaryVersion + " and there is a candidate on " + candidateVer,
+ candidateVer.onOrBefore(newPrimaryVersion));
+ }
+ });
+ });
+
+ keepGoing = randomBoolean();
+ }
+ terminate(threadPool);
+ }
+
+ private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) {
+ return Optional.ofNullable(state.getNodes().get(shardRouting.currentNodeId())).map(DiscoveryNode::getVersion).orElse(null);
+ }
+
+ private static final AtomicInteger nodeIdGenerator = new AtomicInteger();
+
+ public ClusterState randomInitialClusterState() {
+ List<DiscoveryNode> allNodes = new ArrayList<>();
+ DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
+ allNodes.add(localNode);
+ // at least two nodes that have the data role so that we can allocate shards
+ allNodes.add(createNode(DiscoveryNode.Role.DATA));
+ allNodes.add(createNode(DiscoveryNode.Role.DATA));
+ for (int i = 0; i < randomIntBetween(2, 5); i++) {
+ allNodes.add(createNode());
+ }
+ ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()]));
+ return state;
+ }
+
+
+ protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
+ Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
+ for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) {
+ roles.add(mustHaveRole);
+ }
+ final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
+ return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
+ VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null));
+ }
+
}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
index 6063faba15..2eedeba63f 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java
@@ -19,12 +19,14 @@
package org.elasticsearch.cluster.routing.allocation;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -35,6 +37,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllo
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.VersionUtils;
import java.util.ArrayList;
import java.util.Collections;
@@ -499,7 +502,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
Collections.singletonList(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0)));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
- ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplica(shardId);
+ ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
// fail the primary shard, check replicas get removed as well...
@@ -556,4 +559,119 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
}
+
+ public void testReplicaOnNewestVersionIsPromoted() {
+ AllocationService allocation = createAllocationService(Settings.builder().build());
+
+ MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test")
+ .settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)) .build();
+
+ RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();
+
+ ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+ .metaData(metaData).routingTable(initialRoutingTable).build();
+
+ ShardId shardId = new ShardId(metaData.index("test").getIndex(), 0);
+
+ // add a single node
+ clusterState = ClusterState.builder(clusterState).nodes(
+ DiscoveryNodes.builder()
+ .add(newNode("node1-5.x", Version.V_5_6_0)))
+ .build();
+ clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build();
+ assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));
+
+ // start primary shard
+ clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));
+
+ // add another 5.6 node
+ clusterState = ClusterState.builder(clusterState).nodes(
+ DiscoveryNodes.builder(clusterState.nodes())
+ .add(newNode("node2-5.x", Version.V_5_6_0)))
+ .build();
+
+ // start the shards, should have 1 primary and 1 replica available
+ clusterState = allocation.reroute(clusterState, "reroute");
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
+ clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
+
+ clusterState = ClusterState.builder(clusterState).nodes(
+ DiscoveryNodes.builder(clusterState.nodes())
+ .add(newNode("node3-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null)))
+ .add(newNode("node4-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null))))
+ .build();
+
+ // start all the replicas
+ clusterState = allocation.reroute(clusterState, "reroute");
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
+ clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
+ assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
+
+ ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
+ logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica);
+
+ // fail the primary shard again and make sure the correct replica is promoted
+ ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
+ ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
+ assertThat(newState, not(equalTo(clusterState)));
+ clusterState = newState;
+ // the primary gets allocated on another node
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3));
+
+ ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
+ assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
+ assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
+
+ Version replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
+ assertNotNull(replicaNodeVersion);
+ logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);
+
+ for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
+ if ("node1".equals(cursor.value.getId())) {
+ // Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check
+ continue;
+ }
+ Version nodeVer = cursor.value.getVersion();
+ assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
+ replicaNodeVersion.onOrAfter(nodeVer));
+ }
+
+ startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
+ logger.info("--> failing primary shard a second time, should select: {}", startedReplica);
+
+ // fail the primary shard again, and ensure the same thing happens
+ ShardRouting secondPrimaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
+ newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail);
+ assertThat(newState, not(equalTo(clusterState)));
+ clusterState = newState;
+ // the primary gets allocated on another node
+ assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+
+ newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
+ assertThat(newPrimaryShard, not(equalTo(secondPrimaryShardToFail)));
+ assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
+
+ replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
+ assertNotNull(replicaNodeVersion);
+ logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);
+
+ for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
+ if (primaryShardToFail.currentNodeId().equals(cursor.value.getId()) ||
+ secondPrimaryShardToFail.currentNodeId().equals(cursor.value.getId())) {
+ // Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check
+ continue;
+ }
+ Version nodeVer = cursor.value.getVersion();
+ assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
+ replicaNodeVersion.onOrAfter(nodeVer));
+ }
+ }
}