summaryrefslogtreecommitdiff
path: root/core/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org')
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java27
1 files changed, 18 insertions, 9 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
index e93d071b0c..8268b98f34 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
@@ -320,14 +320,23 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/**
* Returns one active replica shard for the given shard id or <code>null</code> if
* no active replica is found.
+ *
+ * Since replicas could possibly be on nodes with a older version of ES than
+ * the primary is, this will return replicas on the highest version of ES.
+ *
*/
- public ShardRouting activeReplica(ShardId shardId) {
- for (ShardRouting shardRouting : assignedShards(shardId)) {
- if (!shardRouting.primary() && shardRouting.active()) {
- return shardRouting;
- }
- }
- return null;
+ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
+ // It's possible for replicaNodeVersion to be null, when deassociating dead nodes
+ // that have been removed, the shards are failed, and part of the shard failing
+ // calls this method with an out-of-date RoutingNodes, where the version might not
+ // be accessible. Therefore, we need to protect against the version being null
+ // (meaning the node will be going away).
+ return assignedShards(shardId).stream()
+ .filter(shr -> !shr.primary() && shr.active())
+ .filter(shr -> node(shr.currentNodeId()) != null)
+ .max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
+ Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
+ .orElse(null);
}
/**
@@ -567,7 +576,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
if (failedShard.relocatingNodeId() == null) {
if (failedShard.primary()) {
// promote active replica to primary if active replica exists (only the case for shadow replicas)
- ShardRouting activeReplica = activeReplica(failedShard.shardId());
+ ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
@@ -596,7 +605,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
assert failedShard.active();
if (failedShard.primary()) {
// promote active replica to primary if active replica exists
- ShardRouting activeReplica = activeReplica(failedShard.shardId());
+ ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {