diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/cluster/routing/allocation')
18 files changed, 343 insertions, 125 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 54f9b6855a..da0fea69c6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.index.shard.ShardId; import java.util.ArrayList; import java.util.Collections; @@ -98,7 +99,7 @@ public class AllocationService extends AbstractComponent { if (withReroute) { reroute(allocation); } - final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); + final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes); String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString()); logClusterHealthStateChange( @@ -107,37 +108,44 @@ public class AllocationService extends AbstractComponent { "shards started [" + startedShardsAsString + "] ..." ); return result; - } + } - protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) { - return buildChangedResult(metaData, routingNodes, new RoutingExplanations()); + protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes) { + return buildChangedResult(oldMetaData, oldRoutingTable, newRoutingNodes, new RoutingExplanations()); } - protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) { - final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build(); - MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable); - return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations); + + protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes, + RoutingExplanations explanations) { + final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(newRoutingNodes).build(); + MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable); + return new RoutingAllocation.Result(true, newRoutingTable.validateRaiseException(newMetaData), newMetaData, explanations); } /** - * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. + * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. Specifically + * we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on + * the changes made during this allocation. * - * @param currentMetaData {@link MetaData} object from before the routing table was changed. + * @param oldMetaData {@link MetaData} object from before the routing table was changed. + * @param oldRoutingTable {@link RoutingTable} from before the change. * @param newRoutingTable new {@link RoutingTable} created by the allocation change * @return adapted {@link MetaData}, potentially the original one if no change was needed. */ - static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) { - // make sure index meta data and routing tables are in sync w.r.t active allocation ids + static MetaData updateMetaDataWithRoutingTable(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingTable newRoutingTable) { MetaData.Builder metaDataBuilder = null; - for (IndexRoutingTable indexRoutingTable : newRoutingTable) { - final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex()); - if (indexMetaData == null) { - throw new IllegalStateException("no metadata found for index " + indexRoutingTable.getIndex().getName()); + for (IndexRoutingTable newIndexTable : newRoutingTable) { + final IndexMetaData oldIndexMetaData = oldMetaData.index(newIndexTable.getIndex()); + if (oldIndexMetaData == null) { + throw new IllegalStateException("no metadata found for index " + newIndexTable.getIndex().getName()); } IndexMetaData.Builder indexMetaDataBuilder = null; - for (IndexShardRoutingTable shardRoutings : indexRoutingTable) { - Set<String> activeAllocationIds = shardRoutings.activeShards().stream() + for (IndexShardRoutingTable newShardTable : newIndexTable) { + final ShardId shardId = newShardTable.shardId(); + + // update activeAllocationIds + Set<String> activeAllocationIds = newShardTable.activeShards().stream() .map(ShardRouting::allocationId) .filter(Objects::nonNull) .map(AllocationId::getId) @@ -145,19 +153,44 @@ public class AllocationService extends AbstractComponent { // only update active allocation ids if there is an active shard if (activeAllocationIds.isEmpty() == false) { // get currently stored allocation ids - Set<String> storedAllocationIds = indexMetaData.activeAllocationIds(shardRoutings.shardId().id()); + Set<String> storedAllocationIds = oldIndexMetaData.activeAllocationIds(shardId.id()); if (activeAllocationIds.equals(storedAllocationIds) == false) { if (indexMetaDataBuilder == null) { - indexMetaDataBuilder = IndexMetaData.builder(indexMetaData); + indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData); } + indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds); + } + } - indexMetaDataBuilder.putActiveAllocationIds(shardRoutings.shardId().id(), activeAllocationIds); + // update primary terms + final ShardRouting newPrimary = newShardTable.primaryShard(); + if (newPrimary == null) { + throw new IllegalStateException("missing primary shard for " + newShardTable.shardId()); + } + final ShardRouting oldPrimary = oldRoutingTable.shardRoutingTable(shardId).primaryShard(); + if (oldPrimary == null) { + throw new IllegalStateException("missing primary shard for " + newShardTable.shardId()); + } + // we update the primary term on initial assignment or when a replica is promoted. Most notably we do *not* + // update them when a primary relocates + if (newPrimary.unassigned() || + newPrimary.isSameAllocation(oldPrimary) || + // we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to + // be initializing. However, when the target shard is activated, we still want the primary term to staty + // the same + (oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.buildTargetRelocatingShard()))) { + // do nothing + } else { + // incrementing the primary term + if (indexMetaDataBuilder == null) { + indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData); } + indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1); } } if (indexMetaDataBuilder != null) { if (metaDataBuilder == null) { - metaDataBuilder = MetaData.builder(currentMetaData); + metaDataBuilder = MetaData.builder(oldMetaData); } metaDataBuilder.put(indexMetaDataBuilder); } @@ -165,7 +198,7 @@ public class AllocationService extends AbstractComponent { if (metaDataBuilder != null) { return metaDataBuilder.build(); } else { - return currentMetaData; + return oldMetaData; } } @@ -196,7 +229,7 @@ public class AllocationService extends AbstractComponent { } gatewayAllocator.applyFailedShards(allocation); reroute(allocation); - final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); + final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes); String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString()); logClusterHealthStateChange( new ClusterStateHealth(clusterState), @@ -243,7 +276,7 @@ public class AllocationService extends AbstractComponent { // the assumption is that commands will move / act on shards (or fail through exceptions) // so, there will always be shard "movements", so no need to check on reroute reroute(allocation); - RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes, explanations); + RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes, explanations); logClusterHealthStateChange( new ClusterStateHealth(clusterState), new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()), @@ -252,6 +285,7 @@ public class AllocationService extends AbstractComponent { return result; } + /** * Reroutes the routing table based on the live nodes. * <p> @@ -275,7 +309,7 @@ public class AllocationService extends AbstractComponent { if (!reroute(allocation)) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } - RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); + RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes); logClusterHealthStateChange( new ClusterStateHealth(clusterState), new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()), @@ -412,8 +446,8 @@ public class AllocationService extends AbstractComponent { boolean changed = false; for (ShardRouting routing : replicas) { changed |= applyFailedShard(allocation, routing, false, - new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing", - null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); + new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing", + null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); } return changed; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 4e6ba0fb5a..536806c083 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -44,7 +44,7 @@ import static java.util.Collections.unmodifiableSet; public class RoutingAllocation { /** - * this class is used to describe results of a {@link RoutingAllocation} + * this class is used to describe results of a {@link RoutingAllocation} */ public static class Result { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 8102f20679..97a07169d2 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -23,6 +23,7 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IntroSorter; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; @@ -101,6 +102,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } @Override + public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) { + final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); + return balancer.weighShard(shard); + } + + @Override public boolean allocate(RoutingAllocation allocation) { if (allocation.routingNodes().size() == 0) { /* with no nodes this is pointless */ @@ -298,6 +305,29 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards return balanceByWeights(); } + public Map<DiscoveryNode, Float> weighShard(ShardRouting shard) { + final NodeSorter sorter = newNodeSorter(); + final ModelNode[] modelNodes = sorter.modelNodes; + final float[] weights = sorter.weights; + + buildWeightOrderedIndices(sorter); + Map<DiscoveryNode, Float> nodes = new HashMap<>(modelNodes.length); + float currentNodeWeight = 0.0f; + for (int i = 0; i < modelNodes.length; i++) { + if (modelNodes[i].getNodeId().equals(shard.currentNodeId())) { + // If a node was found with the shard, use that weight instead of 0.0 + currentNodeWeight = weights[i]; + break; + } + } + + for (int i = 0; i < modelNodes.length; i++) { + final float delta = currentNodeWeight - weights[i]; + nodes.put(modelNodes[i].getRoutingNode().node(), delta); + } + return nodes; + } + /** * Balances the nodes on the cluster model according to the weight * function. The configured threshold is the minimum delta between the diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java index 0bf07e8cba..aa59e7788f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -19,8 +19,11 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import java.util.Map; /** * <p> * A {@link ShardsAllocator} is the main entry point for shard allocation on nodes in the cluster. @@ -40,4 +43,15 @@ public interface ShardsAllocator { * @return <code>true</code> if the allocation has changed, otherwise <code>false</code> */ boolean allocate(RoutingAllocation allocation); + + /** + * Returns a map of node to a float "weight" of where the allocator would like to place the shard. + * Higher weights signify greater desire to place the shard on that node. + * Does not modify the allocation at all. + * + * @param allocation current node allocation + * @param shard shard to weigh + * @return map of nodes to float weights + */ + Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index 227ec27746..baa0a3b1c0 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -110,7 +110,8 @@ public class AwarenessAllocationDecider extends AllocationDecider { this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes); setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings)); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, + this::setForcedAwarenessAttributes); } private void setForcedAwarenessAttributes(Settings forceSettings) { @@ -150,7 +151,7 @@ public class AwarenessAllocationDecider extends AllocationDecider { private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) { if (awarenessAttributes.length == 0) { - return allocation.decision(Decision.YES, NAME, "no allocation awareness enabled"); + return allocation.decision(Decision.YES, NAME, "allocation awareness is not enabled"); } IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); @@ -158,7 +159,7 @@ public class AwarenessAllocationDecider extends AllocationDecider { for (String awarenessAttribute : awarenessAttributes) { // the node the shard exists on must be associated with an awareness attribute if (!node.node().attributes().containsKey(awarenessAttribute)) { - return allocation.decision(Decision.NO, NAME, "node does not contain awareness attribute: [%s]", awarenessAttribute); + return allocation.decision(Decision.NO, NAME, "node does not contain the awareness attribute: [%s]", awarenessAttribute); } // build attr_value -> nodes map @@ -180,7 +181,8 @@ public class AwarenessAllocationDecider extends AllocationDecider { String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); if (!node.nodeId().equals(nodeId)) { // we work on different nodes, move counts around - shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute), 0, -1); + shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute), + 0, -1); shardPerAttribute.addTo(node.node().attributes().get(awarenessAttribute), 1); } } else { @@ -215,8 +217,15 @@ public class AwarenessAllocationDecider extends AllocationDecider { // if we are above with leftover, then we know we are not good, even with mod if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) { return allocation.decision(Decision.NO, NAME, - "too many shards on node for attribute: [%s], required per attribute: [%d], node count: [%d], leftover: [%d]", - awarenessAttribute, requiredCountPerAttribute, currentNodeCount, leftoverPerAttribute); + "there are too many shards on the node for attribute [%s], there are [%d] total shards for the index " + + " and [%d] total attributes values, expected the node count [%d] to be lower or equal to the required " + + "number of shards per attribute [%d] plus leftover [%d]", + awarenessAttribute, + shardCount, + numberOfAttributes, + currentNodeCount, + requiredCountPerAttribute, + leftoverPerAttribute); } // all is well, we are below or same as average if (currentNodeCount <= requiredCountPerAttribute) { @@ -224,6 +233,6 @@ public class AwarenessAllocationDecider extends AllocationDecider { } } - return allocation.decision(Decision.YES, NAME, "node meets awareness requirements"); + return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements"); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java index 84e974aceb..740c99016d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java @@ -78,7 +78,8 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider { } else if ("indices_all_active".equalsIgnoreCase(typeString) || "indicesAllActive".equalsIgnoreCase(typeString)) { return ClusterRebalanceType.INDICES_ALL_ACTIVE; } - throw new IllegalArgumentException("Illegal value for " + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString); + throw new IllegalArgumentException("Illegal value for " + + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString); } } @@ -90,10 +91,13 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider { try { type = CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.get(settings); } catch (IllegalStateException e) { - logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings)); + logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'", + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings)); type = ClusterRebalanceType.INDICES_ALL_ACTIVE; } - logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), type.toString().toLowerCase(Locale.ROOT)); + logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), + type.toString().toLowerCase(Locale.ROOT)); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, this::setType); } @@ -112,11 +116,13 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider { if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) { // check if there are unassigned primaries. if ( allocation.routingNodes().hasUnassignedPrimaries() ) { - return allocation.decision(Decision.NO, NAME, "cluster has unassigned primary shards"); + return allocation.decision(Decision.NO, NAME, + "the cluster has unassigned primary shards and rebalance type is set to [%s]", type); } // check if there are initializing primaries that don't have a relocatingNodeId entry. if ( allocation.routingNodes().hasInactivePrimaries() ) { - return allocation.decision(Decision.NO, NAME, "cluster has inactive primary shards"); + return allocation.decision(Decision.NO, NAME, + "the cluster has inactive primary shards and rebalance type is set to [%s]", type); } return allocation.decision(Decision.YES, NAME, "all primary shards are active"); @@ -124,15 +130,17 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider { if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) { // check if there are unassigned shards. if (allocation.routingNodes().hasUnassignedShards() ) { - return allocation.decision(Decision.NO, NAME, "cluster has unassigned shards"); + return allocation.decision(Decision.NO, NAME, + "the cluster has unassigned shards and rebalance type is set to [%s]", type); } // in case all indices are assigned, are there initializing shards which // are not relocating? if ( allocation.routingNodes().hasInactiveShards() ) { - return allocation.decision(Decision.NO, NAME, "cluster has inactive shards"); + return allocation.decision(Decision.NO, NAME, + "the cluster has inactive shards and rebalance type is set to [%s]", type); } } // type == Type.ALWAYS - return allocation.decision(Decision.YES, NAME, "all shards are active"); + return allocation.decision(Decision.YES, NAME, "all shards are active, rebalance type is [%s]", type); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index fe6bf918dc..2c46f7bd54 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -53,7 +53,8 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { super(settings); this.clusterConcurrentRebalance = CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.get(settings); logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, this::setClusterConcurrentRebalance); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, + this::setClusterConcurrentRebalance); } private void setClusterConcurrentRebalance(int concurrentRebalance) { @@ -63,12 +64,16 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { @Override public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { if (clusterConcurrentRebalance == -1) { - return allocation.decision(Decision.YES, NAME, "all concurrent rebalances are allowed"); + return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); } - if (allocation.routingNodes().getRelocatingShardCount() >= clusterConcurrentRebalance) { - return allocation.decision(Decision.NO, NAME, "too many concurrent rebalances [%d], limit: [%d]", - allocation.routingNodes().getRelocatingShardCount(), clusterConcurrentRebalance); + int relocatingShards = allocation.routingNodes().getRelocatingShardCount(); + if (relocatingShards >= clusterConcurrentRebalance) { + return allocation.decision(Decision.NO, NAME, + "too many shards are concurrently rebalancing [%d], limit: [%d]", + relocatingShards, clusterConcurrentRebalance); } - return allocation.decision(Decision.YES, NAME, "below threshold [%d] for concurrent rebalances", clusterConcurrentRebalance); + return allocation.decision(Decision.YES, NAME, + "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", + clusterConcurrentRebalance, relocatingShards); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java index 02fc2fef94..ebf9230290 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; @@ -146,6 +147,11 @@ public abstract class Decision implements ToXContent { public abstract String label(); /** + * Return the list of all decisions that make up this decision + */ + public abstract List<Decision> getDecisions(); + + /** * Simple class representing a single decision */ public static class Single extends Decision { @@ -191,6 +197,11 @@ public abstract class Decision implements ToXContent { return this.label; } + @Override + public List<Decision> getDecisions() { + return Collections.singletonList(this); + } + /** * Returns the explanation string, fully formatted. Only formats the string once */ @@ -202,11 +213,35 @@ public abstract class Decision implements ToXContent { } @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + + if (object == null || getClass() != object.getClass()) { + return false; + } + + Decision.Single s = (Decision.Single) object; + return this.type == s.type && + this.label.equals(s.label) && + this.getExplanation().equals(s.getExplanation()); + } + + @Override + public int hashCode() { + int result = this.type.hashCode(); + result = 31 * result + this.label.hashCode(); + result = 31 * result + this.getExplanation().hashCode(); + return result; + } + + @Override public String toString() { - if (explanation == null) { - return type + "()"; + if (explanationString != null || explanation != null) { + return type + "(" + getExplanation() + ")"; } - return type + "(" + getExplanation() + ")"; + return type + "()"; } @Override @@ -259,6 +294,31 @@ public abstract class Decision implements ToXContent { } @Override + public List<Decision> getDecisions() { + return Collections.unmodifiableList(this.decisions); + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + + if (object == null || getClass() != object.getClass()) { + return false; + } + + final Decision.Multi m = (Decision.Multi) object; + + return this.decisions.equals(m.decisions); + } + + @Override + public int hashCode() { + return 31 * decisions.hashCode(); + } + + @Override public String toString() { StringBuilder sb = new StringBuilder(); for (Decision decision : decisions) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index e2124558f2..890bbd3c31 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -164,7 +164,8 @@ public class DiskThresholdDecider extends AllocationDecider { reroute = true; explanation = "high disk watermark exceeded on one or more nodes"; } else { - logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute", + logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " + + "in the last [{}], skipping reroute", node, DiskThresholdDecider.this.rerouteInterval); } nodeHasPassedWatermark.add(node); @@ -183,7 +184,8 @@ public class DiskThresholdDecider extends AllocationDecider { explanation = "one or more nodes has gone under the high or low watermark"; nodeHasPassedWatermark.remove(node); } else { - logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred in the last [{}], skipping reroute", + logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " + + "in the last [{}], skipping reroute", node, DiskThresholdDecider.this.rerouteInterval); } } @@ -238,13 +240,15 @@ public class DiskThresholdDecider extends AllocationDecider { private void setLowWatermark(String lowWatermark) { // Watermark is expressed in terms of used data, but we need "free" data watermark this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark); - this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); + this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark, + CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); } private void setHighWatermark(String highWatermark) { // Watermark is expressed in terms of used data, but we need "free" data watermark this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark); - this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); + this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, + CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); } // For Testing @@ -299,7 +303,8 @@ public class DiskThresholdDecider extends AllocationDecider { * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size * of all shards */ - public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway, String dataPath) { + public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, + boolean subtractShardsMovingAway, String dataPath) { long totalSize = 0; for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) { String actualPath = clusterInfo.getDataPath(routing); @@ -353,7 +358,8 @@ public class DiskThresholdDecider extends AllocationDecider { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation", freeBytesThresholdLow, freeBytes, node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]", + return allocation.decision(Decision.NO, NAME, + "the node is above the low watermark and has less than required [%s] free, free: [%s]", freeBytesThresholdLow, new ByteSizeValue(freeBytes)); } else if (freeBytes > freeBytesThresholdHigh.bytes()) { // Allow the shard to be allocated because it is primary that @@ -363,7 +369,8 @@ public class DiskThresholdDecider extends AllocationDecider { "but allowing allocation because primary has never been allocated", freeBytesThresholdLow, freeBytes, node.nodeId()); } - return allocation.decision(Decision.YES, NAME, "primary has never been allocated before"); + return allocation.decision(Decision.YES, NAME, + "the node is above the low watermark, but this primary shard has never been allocated before"); } else { // Even though the primary has never been allocated, the node is // above the high watermark, so don't allow allocating the shard @@ -372,7 +379,9 @@ public class DiskThresholdDecider extends AllocationDecider { "preventing allocation even though primary has never been allocated", freeBytesThresholdHigh, freeBytes, node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]", + return allocation.decision(Decision.NO, NAME, + "the node is above the high watermark even though this shard has never been allocated " + + "and has less than required [%s] free on node, free: [%s]", freeBytesThresholdHigh, new ByteSizeValue(freeBytes)); } } @@ -386,7 +395,8 @@ public class DiskThresholdDecider extends AllocationDecider { Strings.format1Decimals(usedDiskThresholdLow, "%"), Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "more than allowed [%s%%] used disk on node, free: [%s%%]", + return allocation.decision(Decision.NO, NAME, + "the node is above the low watermark and has more than allowed [%s%%] used disk, free: [%s%%]", usedDiskThresholdLow, freeDiskPercentage); } else if (freeDiskPercentage > freeDiskThresholdHigh) { // Allow the shard to be allocated because it is primary that @@ -397,7 +407,8 @@ public class DiskThresholdDecider extends AllocationDecider { Strings.format1Decimals(usedDiskThresholdLow, "%"), Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId()); } - return allocation.decision(Decision.YES, NAME, "primary has never been allocated before"); + return allocation.decision(Decision.YES, NAME, + "the node is above the low watermark, but this primary shard has never been allocated before"); } else { // Even though the primary has never been allocated, the node is // above the high watermark, so don't allow allocating the shard @@ -407,7 +418,9 @@ public class DiskThresholdDecider extends AllocationDecider { Strings.format1Decimals(freeDiskThresholdHigh, "%"), Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "more than allowed [%s%%] used disk on node, free: [%s%%]", + return allocation.decision(Decision.NO, NAME, + "the node is above the high watermark even though this shard has never been allocated " + + "and has more than allowed [%s%%] used disk, free: [%s%%]", usedDiskThresholdHigh, freeDiskPercentage); } } @@ -417,19 +430,29 @@ public class DiskThresholdDecider extends AllocationDecider { double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize); long freeBytesAfterShard = freeBytes - shardSize; if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) { - logger.warn("after allocating, node [{}] would have less than the required {} free bytes threshold ({} bytes free), preventing allocation", + logger.warn("after allocating, node [{}] would have less than the required " + + "{} free bytes threshold ({} bytes free), preventing allocation", node.nodeId(), freeBytesThresholdHigh, freeBytesAfterShard); - return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]", + return allocation.decision(Decision.NO, NAME, + "after allocating the shard to this node, it would be above the high watermark " + + "and have less than required [%s] free, free: [%s]", freeBytesThresholdLow, new ByteSizeValue(freeBytesAfterShard)); } if (freeSpaceAfterShard < freeDiskThresholdHigh) { - logger.warn("after allocating, node [{}] would have more than the allowed {} free disk threshold ({} free), preventing allocation", + logger.warn("after allocating, node [{}] would have more than the allowed " + + "{} free disk threshold ({} free), preventing allocation", node.nodeId(), Strings.format1Decimals(freeDiskThresholdHigh, "%"), Strings.format1Decimals(freeSpaceAfterShard, "%")); - return allocation.decision(Decision.NO, NAME, "after allocation more than allowed [%s%%] used disk on node, free: [%s%%]", + return allocation.decision(Decision.NO, NAME, + "after allocating the shard to this node, it would be above the high watermark " + + "and have more than allowed [%s%%] used disk, free: [%s%%]", usedDiskThresholdLow, freeSpaceAfterShard); } - return allocation.decision(Decision.YES, NAME, "enough disk for shard on node, free: [%s]", new ByteSizeValue(freeBytes)); + return allocation.decision(Decision.YES, NAME, + "enough disk for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]", + new ByteSizeValue(freeBytes), + new ByteSizeValue(shardSize), + new ByteSizeValue(freeBytesAfterShard)); } @Override @@ -453,14 +476,17 @@ public class DiskThresholdDecider extends AllocationDecider { logger.trace("node [{}] has {}% free disk ({} bytes)", node.nodeId(), freeDiskPercentage, freeBytes); } if (dataPath == null || usage.getPath().equals(dataPath) == false) { - return allocation.decision(Decision.YES, NAME, "shard is not allocated on the most utilized disk"); + return allocation.decision(Decision.YES, NAME, + "this shard is not allocated on the most utilized disk and can remain"); } if (freeBytes < freeBytesThresholdHigh.bytes()) { if (logger.isDebugEnabled()) { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain", freeBytesThresholdHigh, freeBytes, node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]", + return allocation.decision(Decision.NO, NAME, + "after allocating this shard this node would be above the high watermark " + + "and there would be less than required [%s] free on node, free: [%s]", freeBytesThresholdHigh, new ByteSizeValue(freeBytes)); } if (freeDiskPercentage < freeDiskThresholdHigh) { @@ -468,11 +494,14 @@ public class DiskThresholdDecider extends AllocationDecider { logger.debug("less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain", freeDiskThresholdHigh, freeDiskPercentage, node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s%%] free disk on node, free: [%s%%]", + return allocation.decision(Decision.NO, NAME, + "after allocating this shard this node would be above the high watermark " + + "and there would be less than required [%s%%] free disk on node, free: [%s%%]", freeDiskThresholdHigh, freeDiskPercentage); } - return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes)); + return allocation.decision(Decision.YES, NAME, + "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes)); } private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) { @@ -543,7 +572,8 @@ public class DiskThresholdDecider extends AllocationDecider { try { return RatioValue.parseRatioValue(watermark).getAsPercent(); } catch (ElasticsearchParseException ex) { - // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two cases separately + // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two + // cases separately return 100.0; } } @@ -556,7 +586,8 @@ public class DiskThresholdDecider extends AllocationDecider { try { return ByteSizeValue.parseBytesSizeValue(watermark, settingName); } catch (ElasticsearchParseException ex) { - // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two cases separately + // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two + // cases separately return ByteSizeValue.parseBytesSizeValue("0b", settingName); } } @@ -583,7 +614,7 @@ public class DiskThresholdDecider extends AllocationDecider { private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) { // Always allow allocation if the decider is disabled if (!enabled) { - return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled"); + return allocation.decision(Decision.YES, NAME, "the disk threshold decider is disabled"); } // Allow allocation regardless if only a single data node is available @@ -591,7 +622,7 @@ public class DiskThresholdDecider extends AllocationDecider { if (logger.isTraceEnabled()) { logger.trace("only a single data node is present, allowing allocation"); } - return allocation.decision(Decision.YES, NAME, "only a single data node is present"); + return allocation.decision(Decision.YES, NAME, "there is only a single data node present"); } // Fail open there is no info available @@ -600,7 +631,7 @@ public class DiskThresholdDecider extends AllocationDecider { if (logger.isTraceEnabled()) { logger.trace("cluster info unavailable for disk threshold decider, allowing allocation."); } - return allocation.decision(Decision.YES, NAME, "cluster info unavailable"); + return allocation.decision(Decision.YES, NAME, "the cluster info is unavailable"); } // Fail open if there are no disk usages available @@ -608,7 +639,7 @@ public class DiskThresholdDecider extends AllocationDecider { if (logger.isTraceEnabled()) { logger.trace("unable to determine disk usages for disk-aware allocation, allowing allocation"); } - return allocation.decision(Decision.YES, NAME, "disk usages unavailable"); + return allocation.decision(Decision.YES, NAME, "disk usages are unavailable"); } return null; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java index 0b69ba2a19..38a2a39fc7 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java @@ -32,8 +32,9 @@ import org.elasticsearch.common.settings.Settings; import java.util.Locale; /** - * This allocation decider allows shard allocations / rebalancing via the cluster wide settings {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / - * {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting {@link #INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #INDEX_ROUTING_REBALANCE_ENABLE_SETTING}. + * This allocation decider allows shard allocations / rebalancing via the cluster wide settings + * {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting + * {@link #INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #INDEX_ROUTING_REBALANCE_ENABLE_SETTING}. * The per index settings overrides the cluster wide setting. * * <p> @@ -98,7 +99,7 @@ public class EnableAllocationDecider extends AllocationDecider { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (allocation.ignoreDisable()) { - return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored"); + return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of allocation"); } final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); @@ -133,7 +134,7 @@ public class EnableAllocationDecider extends AllocationDecider { @Override public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { if (allocation.ignoreDisable()) { - return allocation.decision(Decision.YES, NAME, "rebalance disabling is ignored"); + return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of relocation"); } Settings indexSettings = allocation.routingNodes().metaData().getIndexSafe(shardRouting.index()).getSettings(); @@ -167,7 +168,8 @@ public class EnableAllocationDecider extends AllocationDecider { /** * Allocation values or rather their string representation to be used used with - * {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} + * {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / + * {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} * via cluster / index settings. */ public enum Allocation { @@ -193,7 +195,8 @@ public class EnableAllocationDecider extends AllocationDecider { /** * Rebalance values or rather their string representation to be used used with - * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} / {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING} + * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} / + * {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING} * via cluster / index settings. */ public enum Rebalance { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index d1aa0d8b58..eb59c26121 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -50,11 +50,14 @@ import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR; * would disallow the allocation. Filters are applied in the following order: * <ol> * <li><tt>required</tt> - filters required allocations. - * If any <tt>required</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>required</tt> to allocate on the filtered node</li> + * If any <tt>required</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>required</tt> to allocate + * on the filtered node</li> * <li><tt>include</tt> - filters "allowed" allocations. - * If any <tt>include</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>include</tt> filters for the filtered node</li> + * If any <tt>include</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>include</tt> filters for + * the filtered node</li> * <li><tt>exclude</tt> - filters "prohibited" allocations. - * If any <tt>exclude</tt> filters are set the allocation is denied if the index is in the set of <tt>exclude</tt> filters for the filtered node</li> + * If any <tt>exclude</tt> filters are set the allocation is denied if the index is in the set of <tt>exclude</tt> filters for the + * filtered node</li> * </ol> */ public class FilterAllocationDecider extends AllocationDecider { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java index eb9c5cf8ee..95540d89a6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java @@ -52,7 +52,7 @@ public class NodeVersionAllocationDecider extends AllocationDecider { return isVersionCompatible(shardRouting.restoreSource(), node, allocation); } else { // fresh primary, we can allocate wherever - return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere"); + return allocation.decision(Decision.YES, NAME, "the primary shard is new and can be allocated anywhere"); } } else { // relocating primary, only migrate to newer host @@ -70,16 +70,17 @@ public class NodeVersionAllocationDecider extends AllocationDecider { } } - private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) { + private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, + RoutingAllocation allocation) { final RoutingNode source = routingNodes.node(sourceNodeId); if (target.node().version().onOrAfter(source.node().version())) { /* we can allocate if we can recover from a node that is younger or on the same version * if the primary is already running on a newer version that won't work due to possible * differences in the lucene index format etc.*/ - return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than source node version [%s]", + return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than source node version [%s]", target.node().version(), source.node().version()); } else { - return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than source node version [%s]", + return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the source node version [%s]", target.node().version(), source.node().version()); } } @@ -87,10 +88,10 @@ public class NodeVersionAllocationDecider extends AllocationDecider { private Decision isVersionCompatible(RestoreSource restoreSource, final RoutingNode target, RoutingAllocation allocation) { if (target.node().version().onOrAfter(restoreSource.version())) { /* we can allocate if we can restore from a snapshot that is older or on the same version */ - return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than snapshot version [%s]", + return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than snapshot version [%s]", target.node().version(), restoreSource.version()); } else { - return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than snapshot version [%s]", + return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the snapshot version [%s]", target.node().version(), restoreSource.version()); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java index 039abd8749..869c631306 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java @@ -41,8 +41,8 @@ public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider // its ok to check for active here, since in relocation, a shard is split into two in routing // nodes, once relocating, and one initializing if (!allocation.routingNodes().allReplicasActive(shardRouting)) { - return allocation.decision(Decision.NO, NAME, "not all replicas are active in cluster"); + return allocation.decision(Decision.NO, NAME, "rebalancing can not occur if not all replicas are active in the cluster"); } - return allocation.decision(Decision.YES, NAME, "all replicas are active in cluster"); + return allocation.decision(Decision.YES, NAME, "all replicas are active in the cluster, rebalancing can occur"); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java index 1c5a3f93b7..59ab67c309 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java @@ -45,12 +45,12 @@ public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecide @Override public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { if (shardRouting.primary()) { - return allocation.decision(Decision.YES, NAME, "shard is primary"); + return allocation.decision(Decision.YES, NAME, "shard is primary and can be allocated"); } ShardRouting primary = allocation.routingNodes().activePrimary(shardRouting); if (primary == null) { - return allocation.decision(Decision.NO, NAME, "primary shard is not yet active"); + return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active"); } - return allocation.decision(Decision.YES, NAME, "primary is already active"); + return allocation.decision(Decision.YES, NAME, "primary shard for this replica is already active"); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java index 44eb7d0e2f..f0b4fdf35c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java @@ -61,7 +61,8 @@ public class SameShardAllocationDecider extends AllocationDecider { Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting); for (ShardRouting assignedShard : assignedShards) { if (node.nodeId().equals(assignedShard.currentNodeId())) { - return allocation.decision(Decision.NO, NAME, "shard cannot be allocated on same node [%s] it already exists on", node.nodeId()); + return allocation.decision(Decision.NO, NAME, + "the shard cannot be allocated on the same node id [%s] on which it already exists", node.nodeId()); } } if (sameHost) { @@ -85,7 +86,7 @@ public class SameShardAllocationDecider extends AllocationDecider { for (ShardRouting assignedShard : assignedShards) { if (checkNode.nodeId().equals(assignedShard.currentNodeId())) { return allocation.decision(Decision.NO, NAME, - "shard cannot be allocated on same host [%s] it already exists on", node.nodeId()); + "shard cannot be allocated on the same host [%s] on which it already exists", node.nodeId()); } } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java index 04247525f1..eb25651635 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java @@ -93,7 +93,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { final int clusterShardLimit = this.clusterShardLimit; if (indexShardLimit <= 0 && clusterShardLimit <= 0) { - return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0", + return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0", indexShardLimit, clusterShardLimit); } @@ -110,14 +110,16 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { } } if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]", + return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]", nodeShardCount, clusterShardLimit); } if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]", + return allocation.decision(Decision.NO, NAME, + "too many shards for this index [%s] on node [%d], index-level limit per node: [%d]", shardRouting.index(), indexShardCount, indexShardLimit); } - return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node", + return allocation.decision(Decision.YES, NAME, + "the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node", indexShardLimit, clusterShardLimit); } @@ -130,7 +132,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { final int clusterShardLimit = this.clusterShardLimit; if (indexShardLimit <= 0 && clusterShardLimit <= 0) { - return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0", + return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0", indexShardLimit, clusterShardLimit); } @@ -149,14 +151,16 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { // Subtle difference between the `canAllocate` and `canRemain` is that // this checks > while canAllocate checks >= if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]", + return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]", nodeShardCount, clusterShardLimit); } if (indexShardLimit > 0 && indexShardCount > indexShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]", + return allocation.decision(Decision.NO, NAME, + "too many shards for this index [%s] on node [%d], index-level limit per node: [%d]", shardRouting.index(), indexShardCount, indexShardLimit); } - return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node", + return allocation.decision(Decision.YES, NAME, + "the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node", indexShardLimit, clusterShardLimit); } @@ -168,7 +172,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { final int clusterShardLimit = this.clusterShardLimit; if (clusterShardLimit <= 0) { - return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [cluster: %d] <= 0", + return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [cluster: %d] <= 0", clusterShardLimit); } @@ -181,10 +185,10 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { nodeShardCount++; } if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]", + return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]", nodeShardCount, clusterShardLimit); } - return allocation.decision(Decision.YES, NAME, "shard count under node limit [%d] of total shards per node", + return allocation.decision(Decision.YES, NAME, "the shard count is under node limit [%d] of total shards per node", clusterShardLimit); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java index d656afc803..54cfb6407d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java @@ -54,7 +54,8 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider { } /** - * Creates a new {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider} instance from given settings + * Creates a new {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider} instance from + * given settings * * @param settings {@link org.elasticsearch.common.settings.Settings} to use */ @@ -66,7 +67,8 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider { public SnapshotInProgressAllocationDecider(Settings settings, ClusterSettings clusterSettings) { super(settings); enableRelocation = CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING, this::setEnableRelocation); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING, + this::setEnableRelocation); } private void setEnableRelocation(boolean enableRelocation) { @@ -104,14 +106,18 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider { for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId()); - if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) { - logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", shardRouting.shardId(), shardSnapshotStatus.nodeId()); + if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && + shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) { + if (logger.isTraceEnabled()) { + logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", + shardRouting.shardId(), shardSnapshotStatus.nodeId()); + } return allocation.decision(Decision.NO, NAME, "snapshot for shard [%s] is currently running on node [%s]", shardRouting.shardId(), shardSnapshotStatus.nodeId()); } } } - return allocation.decision(Decision.YES, NAME, "shard not primary or relocation disabled"); + return allocation.decision(Decision.YES, NAME, "the shard is not primary or relocation is disabled"); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index ca6b312da4..6eb44351c7 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -84,11 +84,16 @@ public class ThrottlingAllocationDecider extends AllocationDecider { concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.get(settings); concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, this::setPrimariesInitialRecoveries); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, this::setConcurrentIncomingRecoverries); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, this::setConcurrentOutgoingRecoverries); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, + this::setPrimariesInitialRecoveries); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, + this::setConcurrentIncomingRecoverries); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, + this::setConcurrentOutgoingRecoverries); - logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries); + logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], " + + "node_initial_primaries_recoveries [{}]", + concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries); } private void setConcurrentIncomingRecoverries(int concurrentIncomingRecoveries) { @@ -118,7 +123,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider { } } if (primariesInRecovery >= primariesInitialRecoveries) { - return allocation.decision(Decision.THROTTLE, NAME, "too many primaries currently recovering [%d], limit: [%d]", + return allocation.decision(Decision.THROTTLE, NAME, "too many primaries are currently recovering [%d], limit: [%d]", primariesInRecovery, primariesInitialRecoveries); } else { return allocation.decision(Decision.YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries); @@ -137,13 +142,17 @@ public class ThrottlingAllocationDecider extends AllocationDecider { int currentOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(node.nodeId()); int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId()); if (currentOutRecoveries >= concurrentOutgoingRecoveries) { - return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards currently recovering [%d], limit: [%d]", + return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards are currently recovering [%d], limit: [%d]", currentOutRecoveries, concurrentOutgoingRecoveries); } else if (currentInRecoveries >= concurrentIncomingRecoveries) { - return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards currently recovering [%d], limit: [%d]", + return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards are currently recovering [%d], limit: [%d]", currentInRecoveries, concurrentIncomingRecoveries); } else { - return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d] incoming: [%d]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries); + return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]", + currentOutRecoveries, + concurrentOutgoingRecoveries, + currentInRecoveries, + concurrentIncomingRecoveries); } } } |