summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/cluster/routing
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/cluster/routing')
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java4
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java2
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java90
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java2
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java30
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java14
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java23
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java24
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java17
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java66
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java83
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java15
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java9
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java13
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java4
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java6
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java5
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java26
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java16
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java25
20 files changed, 344 insertions, 130 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
index a34405c09e..c27e0a9beb 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
@@ -586,10 +586,6 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
if (indicesRouting == null) {
throw new IllegalStateException("once build is called the builder cannot be reused");
}
- // normalize the versions right before we build it...
- for (ObjectCursor<IndexRoutingTable> indexRoutingTable : indicesRouting.values()) {
- indicesRouting.put(indexRoutingTable.value.getIndex().getName(), indexRoutingTable.value);
- }
RoutingTable table = new RoutingTable(version, indicesRouting.build());
indicesRouting = null;
return table;
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
index be7d90a1fe..b92fecf0f7 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
@@ -139,7 +139,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
}
- UnassignedInfo(StreamInput in) throws IOException {
+ public UnassignedInfo(StreamInput in) throws IOException {
this.reason = Reason.values()[(int) in.readByte()];
this.unassignedTimeMillis = in.readLong();
// As System.nanoTime() cannot be compared across different JVMs, reset it to now.
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
index 54f9b6855a..da0fea69c6 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
@@ -42,6 +42,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
+import org.elasticsearch.index.shard.ShardId;
import java.util.ArrayList;
import java.util.Collections;
@@ -98,7 +99,7 @@ public class AllocationService extends AbstractComponent {
if (withReroute) {
reroute(allocation);
}
- final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+ final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString());
logClusterHealthStateChange(
@@ -107,37 +108,44 @@ public class AllocationService extends AbstractComponent {
"shards started [" + startedShardsAsString + "] ..."
);
return result;
- }
+ }
- protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) {
- return buildChangedResult(metaData, routingNodes, new RoutingExplanations());
+ protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes) {
+ return buildChangedResult(oldMetaData, oldRoutingTable, newRoutingNodes, new RoutingExplanations());
}
- protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) {
- final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build();
- MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable);
- return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations);
+
+ protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes,
+ RoutingExplanations explanations) {
+ final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(newRoutingNodes).build();
+ MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable);
+ return new RoutingAllocation.Result(true, newRoutingTable.validateRaiseException(newMetaData), newMetaData, explanations);
}
/**
- * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}.
+ * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. Specifically
+ * we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on
+ * the changes made during this allocation.
*
- * @param currentMetaData {@link MetaData} object from before the routing table was changed.
+ * @param oldMetaData {@link MetaData} object from before the routing table was changed.
+ * @param oldRoutingTable {@link RoutingTable} from before the change.
* @param newRoutingTable new {@link RoutingTable} created by the allocation change
* @return adapted {@link MetaData}, potentially the original one if no change was needed.
*/
- static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) {
- // make sure index meta data and routing tables are in sync w.r.t active allocation ids
+ static MetaData updateMetaDataWithRoutingTable(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingTable newRoutingTable) {
MetaData.Builder metaDataBuilder = null;
- for (IndexRoutingTable indexRoutingTable : newRoutingTable) {
- final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex());
- if (indexMetaData == null) {
- throw new IllegalStateException("no metadata found for index " + indexRoutingTable.getIndex().getName());
+ for (IndexRoutingTable newIndexTable : newRoutingTable) {
+ final IndexMetaData oldIndexMetaData = oldMetaData.index(newIndexTable.getIndex());
+ if (oldIndexMetaData == null) {
+ throw new IllegalStateException("no metadata found for index " + newIndexTable.getIndex().getName());
}
IndexMetaData.Builder indexMetaDataBuilder = null;
- for (IndexShardRoutingTable shardRoutings : indexRoutingTable) {
- Set<String> activeAllocationIds = shardRoutings.activeShards().stream()
+ for (IndexShardRoutingTable newShardTable : newIndexTable) {
+ final ShardId shardId = newShardTable.shardId();
+
+ // update activeAllocationIds
+ Set<String> activeAllocationIds = newShardTable.activeShards().stream()
.map(ShardRouting::allocationId)
.filter(Objects::nonNull)
.map(AllocationId::getId)
@@ -145,19 +153,44 @@ public class AllocationService extends AbstractComponent {
// only update active allocation ids if there is an active shard
if (activeAllocationIds.isEmpty() == false) {
// get currently stored allocation ids
- Set<String> storedAllocationIds = indexMetaData.activeAllocationIds(shardRoutings.shardId().id());
+ Set<String> storedAllocationIds = oldIndexMetaData.activeAllocationIds(shardId.id());
if (activeAllocationIds.equals(storedAllocationIds) == false) {
if (indexMetaDataBuilder == null) {
- indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
+ indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
+ indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds);
+ }
+ }
- indexMetaDataBuilder.putActiveAllocationIds(shardRoutings.shardId().id(), activeAllocationIds);
+ // update primary terms
+ final ShardRouting newPrimary = newShardTable.primaryShard();
+ if (newPrimary == null) {
+ throw new IllegalStateException("missing primary shard for " + newShardTable.shardId());
+ }
+ final ShardRouting oldPrimary = oldRoutingTable.shardRoutingTable(shardId).primaryShard();
+ if (oldPrimary == null) {
+ throw new IllegalStateException("missing primary shard for " + newShardTable.shardId());
+ }
+ // we update the primary term on initial assignment or when a replica is promoted. Most notably we do *not*
+ // update them when a primary relocates
+ if (newPrimary.unassigned() ||
+ newPrimary.isSameAllocation(oldPrimary) ||
+ // we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to
+ // be initializing. However, when the target shard is activated, we still want the primary term to staty
+ // the same
+ (oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.buildTargetRelocatingShard()))) {
+ // do nothing
+ } else {
+ // incrementing the primary term
+ if (indexMetaDataBuilder == null) {
+ indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
+ indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1);
}
}
if (indexMetaDataBuilder != null) {
if (metaDataBuilder == null) {
- metaDataBuilder = MetaData.builder(currentMetaData);
+ metaDataBuilder = MetaData.builder(oldMetaData);
}
metaDataBuilder.put(indexMetaDataBuilder);
}
@@ -165,7 +198,7 @@ public class AllocationService extends AbstractComponent {
if (metaDataBuilder != null) {
return metaDataBuilder.build();
} else {
- return currentMetaData;
+ return oldMetaData;
}
}
@@ -196,7 +229,7 @@ public class AllocationService extends AbstractComponent {
}
gatewayAllocator.applyFailedShards(allocation);
reroute(allocation);
- final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+ final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
@@ -243,7 +276,7 @@ public class AllocationService extends AbstractComponent {
// the assumption is that commands will move / act on shards (or fail through exceptions)
// so, there will always be shard "movements", so no need to check on reroute
reroute(allocation);
- RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes, explanations);
+ RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes, explanations);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
@@ -252,6 +285,7 @@ public class AllocationService extends AbstractComponent {
return result;
}
+
/**
* Reroutes the routing table based on the live nodes.
* <p>
@@ -275,7 +309,7 @@ public class AllocationService extends AbstractComponent {
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
- RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+ RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
@@ -412,8 +446,8 @@ public class AllocationService extends AbstractComponent {
boolean changed = false;
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
- new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
- null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
+ new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
+ null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}
return changed;
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
index 4e6ba0fb5a..536806c083 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
@@ -44,7 +44,7 @@ import static java.util.Collections.unmodifiableSet;
public class RoutingAllocation {
/**
- * this class is used to describe results of a {@link RoutingAllocation}
+ * this class is used to describe results of a {@link RoutingAllocation}
*/
public static class Result {
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
index 8102f20679..97a07169d2 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -23,6 +23,7 @@ import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -101,6 +102,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
@Override
+ public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) {
+ final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
+ return balancer.weighShard(shard);
+ }
+
+ @Override
public boolean allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
/* with no nodes this is pointless */
@@ -298,6 +305,29 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return balanceByWeights();
}
+ public Map<DiscoveryNode, Float> weighShard(ShardRouting shard) {
+ final NodeSorter sorter = newNodeSorter();
+ final ModelNode[] modelNodes = sorter.modelNodes;
+ final float[] weights = sorter.weights;
+
+ buildWeightOrderedIndices(sorter);
+ Map<DiscoveryNode, Float> nodes = new HashMap<>(modelNodes.length);
+ float currentNodeWeight = 0.0f;
+ for (int i = 0; i < modelNodes.length; i++) {
+ if (modelNodes[i].getNodeId().equals(shard.currentNodeId())) {
+ // If a node was found with the shard, use that weight instead of 0.0
+ currentNodeWeight = weights[i];
+ break;
+ }
+ }
+
+ for (int i = 0; i < modelNodes.length; i++) {
+ final float delta = currentNodeWeight - weights[i];
+ nodes.put(modelNodes[i].getRoutingNode().node(), delta);
+ }
+ return nodes;
+ }
+
/**
* Balances the nodes on the cluster model according to the weight
* function. The configured threshold is the minimum delta between the
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java
index 0bf07e8cba..aa59e7788f 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java
@@ -19,8 +19,11 @@
package org.elasticsearch.cluster.routing.allocation.allocator;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import java.util.Map;
/**
* <p>
* A {@link ShardsAllocator} is the main entry point for shard allocation on nodes in the cluster.
@@ -40,4 +43,15 @@ public interface ShardsAllocator {
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
boolean allocate(RoutingAllocation allocation);
+
+ /**
+ * Returns a map of node to a float "weight" of where the allocator would like to place the shard.
+ * Higher weights signify greater desire to place the shard on that node.
+ * Does not modify the allocation at all.
+ *
+ * @param allocation current node allocation
+ * @param shard shard to weigh
+ * @return map of nodes to float weights
+ */
+ Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard);
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java
index 227ec27746..baa0a3b1c0 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java
@@ -110,7 +110,8 @@ public class AwarenessAllocationDecider extends AllocationDecider {
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes);
setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings));
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
+ this::setForcedAwarenessAttributes);
}
private void setForcedAwarenessAttributes(Settings forceSettings) {
@@ -150,7 +151,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.length == 0) {
- return allocation.decision(Decision.YES, NAME, "no allocation awareness enabled");
+ return allocation.decision(Decision.YES, NAME, "allocation awareness is not enabled");
}
IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
@@ -158,7 +159,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (!node.node().attributes().containsKey(awarenessAttribute)) {
- return allocation.decision(Decision.NO, NAME, "node does not contain awareness attribute: [%s]", awarenessAttribute);
+ return allocation.decision(Decision.NO, NAME, "node does not contain the awareness attribute: [%s]", awarenessAttribute);
}
// build attr_value -> nodes map
@@ -180,7 +181,8 @@ public class AwarenessAllocationDecider extends AllocationDecider {
String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId();
if (!node.nodeId().equals(nodeId)) {
// we work on different nodes, move counts around
- shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute), 0, -1);
+ shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute),
+ 0, -1);
shardPerAttribute.addTo(node.node().attributes().get(awarenessAttribute), 1);
}
} else {
@@ -215,8 +217,15 @@ public class AwarenessAllocationDecider extends AllocationDecider {
// if we are above with leftover, then we know we are not good, even with mod
if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) {
return allocation.decision(Decision.NO, NAME,
- "too many shards on node for attribute: [%s], required per attribute: [%d], node count: [%d], leftover: [%d]",
- awarenessAttribute, requiredCountPerAttribute, currentNodeCount, leftoverPerAttribute);
+ "there are too many shards on the node for attribute [%s], there are [%d] total shards for the index " +
+ " and [%d] total attributes values, expected the node count [%d] to be lower or equal to the required " +
+ "number of shards per attribute [%d] plus leftover [%d]",
+ awarenessAttribute,
+ shardCount,
+ numberOfAttributes,
+ currentNodeCount,
+ requiredCountPerAttribute,
+ leftoverPerAttribute);
}
// all is well, we are below or same as average
if (currentNodeCount <= requiredCountPerAttribute) {
@@ -224,6 +233,6 @@ public class AwarenessAllocationDecider extends AllocationDecider {
}
}
- return allocation.decision(Decision.YES, NAME, "node meets awareness requirements");
+ return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements");
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java
index 84e974aceb..740c99016d 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java
@@ -78,7 +78,8 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
} else if ("indices_all_active".equalsIgnoreCase(typeString) || "indicesAllActive".equalsIgnoreCase(typeString)) {
return ClusterRebalanceType.INDICES_ALL_ACTIVE;
}
- throw new IllegalArgumentException("Illegal value for " + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString);
+ throw new IllegalArgumentException("Illegal value for " +
+ CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString);
}
}
@@ -90,10 +91,13 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
try {
type = CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.get(settings);
} catch (IllegalStateException e) {
- logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings));
+ logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'",
+ CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING,
+ CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings));
type = ClusterRebalanceType.INDICES_ALL_ACTIVE;
}
- logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), type.toString().toLowerCase(Locale.ROOT));
+ logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(),
+ type.toString().toLowerCase(Locale.ROOT));
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, this::setType);
}
@@ -112,11 +116,13 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) {
// check if there are unassigned primaries.
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
- return allocation.decision(Decision.NO, NAME, "cluster has unassigned primary shards");
+ return allocation.decision(Decision.NO, NAME,
+ "the cluster has unassigned primary shards and rebalance type is set to [%s]", type);
}
// check if there are initializing primaries that don't have a relocatingNodeId entry.
if ( allocation.routingNodes().hasInactivePrimaries() ) {
- return allocation.decision(Decision.NO, NAME, "cluster has inactive primary shards");
+ return allocation.decision(Decision.NO, NAME,
+ "the cluster has inactive primary shards and rebalance type is set to [%s]", type);
}
return allocation.decision(Decision.YES, NAME, "all primary shards are active");
@@ -124,15 +130,17 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
// check if there are unassigned shards.
if (allocation.routingNodes().hasUnassignedShards() ) {
- return allocation.decision(Decision.NO, NAME, "cluster has unassigned shards");
+ return allocation.decision(Decision.NO, NAME,
+ "the cluster has unassigned shards and rebalance type is set to [%s]", type);
}
// in case all indices are assigned, are there initializing shards which
// are not relocating?
if ( allocation.routingNodes().hasInactiveShards() ) {
- return allocation.decision(Decision.NO, NAME, "cluster has inactive shards");
+ return allocation.decision(Decision.NO, NAME,
+ "the cluster has inactive shards and rebalance type is set to [%s]", type);
}
}
// type == Type.ALWAYS
- return allocation.decision(Decision.YES, NAME, "all shards are active");
+ return allocation.decision(Decision.YES, NAME, "all shards are active, rebalance type is [%s]", type);
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java
index fe6bf918dc..2c46f7bd54 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java
@@ -53,7 +53,8 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
super(settings);
this.clusterConcurrentRebalance = CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.get(settings);
logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance);
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, this::setClusterConcurrentRebalance);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
+ this::setClusterConcurrentRebalance);
}
private void setClusterConcurrentRebalance(int concurrentRebalance) {
@@ -63,12 +64,16 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (clusterConcurrentRebalance == -1) {
- return allocation.decision(Decision.YES, NAME, "all concurrent rebalances are allowed");
+ return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed");
}
- if (allocation.routingNodes().getRelocatingShardCount() >= clusterConcurrentRebalance) {
- return allocation.decision(Decision.NO, NAME, "too many concurrent rebalances [%d], limit: [%d]",
- allocation.routingNodes().getRelocatingShardCount(), clusterConcurrentRebalance);
+ int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
+ if (relocatingShards >= clusterConcurrentRebalance) {
+ return allocation.decision(Decision.NO, NAME,
+ "too many shards are concurrently rebalancing [%d], limit: [%d]",
+ relocatingShards, clusterConcurrentRebalance);
}
- return allocation.decision(Decision.YES, NAME, "below threshold [%d] for concurrent rebalances", clusterConcurrentRebalance);
+ return allocation.decision(Decision.YES, NAME,
+ "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
+ clusterConcurrentRebalance, relocatingShards);
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java
index 02fc2fef94..ebf9230290 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java
@@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -146,6 +147,11 @@ public abstract class Decision implements ToXContent {
public abstract String label();
/**
+ * Return the list of all decisions that make up this decision
+ */
+ public abstract List<Decision> getDecisions();
+
+ /**
* Simple class representing a single decision
*/
public static class Single extends Decision {
@@ -191,6 +197,11 @@ public abstract class Decision implements ToXContent {
return this.label;
}
+ @Override
+ public List<Decision> getDecisions() {
+ return Collections.singletonList(this);
+ }
+
/**
* Returns the explanation string, fully formatted. Only formats the string once
*/
@@ -202,11 +213,35 @@ public abstract class Decision implements ToXContent {
}
@Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+
+ Decision.Single s = (Decision.Single) object;
+ return this.type == s.type &&
+ this.label.equals(s.label) &&
+ this.getExplanation().equals(s.getExplanation());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = this.type.hashCode();
+ result = 31 * result + this.label.hashCode();
+ result = 31 * result + this.getExplanation().hashCode();
+ return result;
+ }
+
+ @Override
public String toString() {
- if (explanation == null) {
- return type + "()";
+ if (explanationString != null || explanation != null) {
+ return type + "(" + getExplanation() + ")";
}
- return type + "(" + getExplanation() + ")";
+ return type + "()";
}
@Override
@@ -259,6 +294,31 @@ public abstract class Decision implements ToXContent {
}
@Override
+ public List<Decision> getDecisions() {
+ return Collections.unmodifiableList(this.decisions);
+ }
+
+ @Override
+ public boolean equals(final Object object) {
+ if (this == object) {
+ return true;
+ }
+
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+
+ final Decision.Multi m = (Decision.Multi) object;
+
+ return this.decisions.equals(m.decisions);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * decisions.hashCode();
+ }
+
+ @Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Decision decision : decisions) {
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
index e2124558f2..890bbd3c31 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
@@ -164,7 +164,8 @@ public class DiskThresholdDecider extends AllocationDecider {
reroute = true;
explanation = "high disk watermark exceeded on one or more nodes";
} else {
- logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute",
+ logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " +
+ "in the last [{}], skipping reroute",
node, DiskThresholdDecider.this.rerouteInterval);
}
nodeHasPassedWatermark.add(node);
@@ -183,7 +184,8 @@ public class DiskThresholdDecider extends AllocationDecider {
explanation = "one or more nodes has gone under the high or low watermark";
nodeHasPassedWatermark.remove(node);
} else {
- logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred in the last [{}], skipping reroute",
+ logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " +
+ "in the last [{}], skipping reroute",
node, DiskThresholdDecider.this.rerouteInterval);
}
}
@@ -238,13 +240,15 @@ public class DiskThresholdDecider extends AllocationDecider {
private void setLowWatermark(String lowWatermark) {
// Watermark is expressed in terms of used data, but we need "free" data watermark
this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark);
- this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
+ this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark,
+ CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
}
private void setHighWatermark(String highWatermark) {
// Watermark is expressed in terms of used data, but we need "free" data watermark
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
- this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
+ this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
+ CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
}
// For Testing
@@ -299,7 +303,8 @@ public class DiskThresholdDecider extends AllocationDecider {
* If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
* of all shards
*/
- public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway, String dataPath) {
+ public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo,
+ boolean subtractShardsMovingAway, String dataPath) {
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
String actualPath = clusterInfo.getDataPath(routing);
@@ -353,7 +358,8 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]",
+ return allocation.decision(Decision.NO, NAME,
+ "the node is above the low watermark and has less than required [%s] free, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytes));
} else if (freeBytes > freeBytesThresholdHigh.bytes()) {
// Allow the shard to be allocated because it is primary that
@@ -363,7 +369,8 @@ public class DiskThresholdDecider extends AllocationDecider {
"but allowing allocation because primary has never been allocated",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
- return allocation.decision(Decision.YES, NAME, "primary has never been allocated before");
+ return allocation.decision(Decision.YES, NAME,
+ "the node is above the low watermark, but this primary shard has never been allocated before");
} else {
// Even though the primary has never been allocated, the node is
// above the high watermark, so don't allow allocating the shard
@@ -372,7 +379,9 @@ public class DiskThresholdDecider extends AllocationDecider {
"preventing allocation even though primary has never been allocated",
freeBytesThresholdHigh, freeBytes, node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]",
+ return allocation.decision(Decision.NO, NAME,
+ "the node is above the high watermark even though this shard has never been allocated " +
+ "and has less than required [%s] free on node, free: [%s]",
freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
}
}
@@ -386,7 +395,8 @@ public class DiskThresholdDecider extends AllocationDecider {
Strings.format1Decimals(usedDiskThresholdLow, "%"),
Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "more than allowed [%s%%] used disk on node, free: [%s%%]",
+ return allocation.decision(Decision.NO, NAME,
+ "the node is above the low watermark and has more than allowed [%s%%] used disk, free: [%s%%]",
usedDiskThresholdLow, freeDiskPercentage);
} else if (freeDiskPercentage > freeDiskThresholdHigh) {
// Allow the shard to be allocated because it is primary that
@@ -397,7 +407,8 @@ public class DiskThresholdDecider extends AllocationDecider {
Strings.format1Decimals(usedDiskThresholdLow, "%"),
Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId());
}
- return allocation.decision(Decision.YES, NAME, "primary has never been allocated before");
+ return allocation.decision(Decision.YES, NAME,
+ "the node is above the low watermark, but this primary shard has never been allocated before");
} else {
// Even though the primary has never been allocated, the node is
// above the high watermark, so don't allow allocating the shard
@@ -407,7 +418,9 @@ public class DiskThresholdDecider extends AllocationDecider {
Strings.format1Decimals(freeDiskThresholdHigh, "%"),
Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "more than allowed [%s%%] used disk on node, free: [%s%%]",
+ return allocation.decision(Decision.NO, NAME,
+ "the node is above the high watermark even though this shard has never been allocated " +
+ "and has more than allowed [%s%%] used disk, free: [%s%%]",
usedDiskThresholdHigh, freeDiskPercentage);
}
}
@@ -417,19 +430,29 @@ public class DiskThresholdDecider extends AllocationDecider {
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize;
if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) {
- logger.warn("after allocating, node [{}] would have less than the required {} free bytes threshold ({} bytes free), preventing allocation",
+ logger.warn("after allocating, node [{}] would have less than the required " +
+ "{} free bytes threshold ({} bytes free), preventing allocation",
node.nodeId(), freeBytesThresholdHigh, freeBytesAfterShard);
- return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]",
+ return allocation.decision(Decision.NO, NAME,
+ "after allocating the shard to this node, it would be above the high watermark " +
+ "and have less than required [%s] free, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytesAfterShard));
}
if (freeSpaceAfterShard < freeDiskThresholdHigh) {
- logger.warn("after allocating, node [{}] would have more than the allowed {} free disk threshold ({} free), preventing allocation",
+ logger.warn("after allocating, node [{}] would have more than the allowed " +
+ "{} free disk threshold ({} free), preventing allocation",
node.nodeId(), Strings.format1Decimals(freeDiskThresholdHigh, "%"), Strings.format1Decimals(freeSpaceAfterShard, "%"));
- return allocation.decision(Decision.NO, NAME, "after allocation more than allowed [%s%%] used disk on node, free: [%s%%]",
+ return allocation.decision(Decision.NO, NAME,
+ "after allocating the shard to this node, it would be above the high watermark " +
+ "and have more than allowed [%s%%] used disk, free: [%s%%]",
usedDiskThresholdLow, freeSpaceAfterShard);
}
- return allocation.decision(Decision.YES, NAME, "enough disk for shard on node, free: [%s]", new ByteSizeValue(freeBytes));
+ return allocation.decision(Decision.YES, NAME,
+ "enough disk for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]",
+ new ByteSizeValue(freeBytes),
+ new ByteSizeValue(shardSize),
+ new ByteSizeValue(freeBytesAfterShard));
}
@Override
@@ -453,14 +476,17 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.trace("node [{}] has {}% free disk ({} bytes)", node.nodeId(), freeDiskPercentage, freeBytes);
}
if (dataPath == null || usage.getPath().equals(dataPath) == false) {
- return allocation.decision(Decision.YES, NAME, "shard is not allocated on the most utilized disk");
+ return allocation.decision(Decision.YES, NAME,
+ "this shard is not allocated on the most utilized disk and can remain");
}
if (freeBytes < freeBytesThresholdHigh.bytes()) {
if (logger.isDebugEnabled()) {
logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
freeBytesThresholdHigh, freeBytes, node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]",
+ return allocation.decision(Decision.NO, NAME,
+ "after allocating this shard this node would be above the high watermark " +
+ "and there would be less than required [%s] free on node, free: [%s]",
freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
}
if (freeDiskPercentage < freeDiskThresholdHigh) {
@@ -468,11 +494,14 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain",
freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s%%] free disk on node, free: [%s%%]",
+ return allocation.decision(Decision.NO, NAME,
+ "after allocating this shard this node would be above the high watermark " +
+ "and there would be less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdHigh, freeDiskPercentage);
}
- return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes));
+ return allocation.decision(Decision.YES, NAME,
+ "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes));
}
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
@@ -543,7 +572,8 @@ public class DiskThresholdDecider extends AllocationDecider {
try {
return RatioValue.parseRatioValue(watermark).getAsPercent();
} catch (ElasticsearchParseException ex) {
- // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two cases separately
+ // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
+ // cases separately
return 100.0;
}
}
@@ -556,7 +586,8 @@ public class DiskThresholdDecider extends AllocationDecider {
try {
return ByteSizeValue.parseBytesSizeValue(watermark, settingName);
} catch (ElasticsearchParseException ex) {
- // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two cases separately
+ // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
+ // cases separately
return ByteSizeValue.parseBytesSizeValue("0b", settingName);
}
}
@@ -583,7 +614,7 @@ public class DiskThresholdDecider extends AllocationDecider {
private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
// Always allow allocation if the decider is disabled
if (!enabled) {
- return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
+ return allocation.decision(Decision.YES, NAME, "the disk threshold decider is disabled");
}
// Allow allocation regardless if only a single data node is available
@@ -591,7 +622,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("only a single data node is present, allowing allocation");
}
- return allocation.decision(Decision.YES, NAME, "only a single data node is present");
+ return allocation.decision(Decision.YES, NAME, "there is only a single data node present");
}
// Fail open there is no info available
@@ -600,7 +631,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("cluster info unavailable for disk threshold decider, allowing allocation.");
}
- return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
+ return allocation.decision(Decision.YES, NAME, "the cluster info is unavailable");
}
// Fail open if there are no disk usages available
@@ -608,7 +639,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("unable to determine disk usages for disk-aware allocation, allowing allocation");
}
- return allocation.decision(Decision.YES, NAME, "disk usages unavailable");
+ return allocation.decision(Decision.YES, NAME, "disk usages are unavailable");
}
return null;
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
index 0b69ba2a19..38a2a39fc7 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
@@ -32,8 +32,9 @@ import org.elasticsearch.common.settings.Settings;
import java.util.Locale;
/**
- * This allocation decider allows shard allocations / rebalancing via the cluster wide settings {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} /
- * {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting {@link #INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #INDEX_ROUTING_REBALANCE_ENABLE_SETTING}.
+ * This allocation decider allows shard allocations / rebalancing via the cluster wide settings
+ * {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting
+ * {@link #INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #INDEX_ROUTING_REBALANCE_ENABLE_SETTING}.
* The per index settings overrides the cluster wide setting.
*
* <p>
@@ -98,7 +99,7 @@ public class EnableAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
- return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored");
+ return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of allocation");
}
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
@@ -133,7 +134,7 @@ public class EnableAllocationDecider extends AllocationDecider {
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
- return allocation.decision(Decision.YES, NAME, "rebalance disabling is ignored");
+ return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of relocation");
}
Settings indexSettings = allocation.routingNodes().metaData().getIndexSafe(shardRouting.index()).getSettings();
@@ -167,7 +168,8 @@ public class EnableAllocationDecider extends AllocationDecider {
/**
* Allocation values or rather their string representation to be used used with
- * {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE_SETTING}
+ * {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} /
+ * {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE_SETTING}
* via cluster / index settings.
*/
public enum Allocation {
@@ -193,7 +195,8 @@ public class EnableAllocationDecider extends AllocationDecider {
/**
* Rebalance values or rather their string representation to be used used with
- * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} / {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING}
+ * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} /
+ * {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING}
* via cluster / index settings.
*/
public enum Rebalance {
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java
index d1aa0d8b58..eb59c26121 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java
@@ -50,11 +50,14 @@ import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
* would disallow the allocation. Filters are applied in the following order:
* <ol>
* <li><tt>required</tt> - filters required allocations.
- * If any <tt>required</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>required</tt> to allocate on the filtered node</li>
+ * If any <tt>required</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>required</tt> to allocate
+ * on the filtered node</li>
* <li><tt>include</tt> - filters "allowed" allocations.
- * If any <tt>include</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>include</tt> filters for the filtered node</li>
+ * If any <tt>include</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>include</tt> filters for
+ * the filtered node</li>
* <li><tt>exclude</tt> - filters "prohibited" allocations.
- * If any <tt>exclude</tt> filters are set the allocation is denied if the index is in the set of <tt>exclude</tt> filters for the filtered node</li>
+ * If any <tt>exclude</tt> filters are set the allocation is denied if the index is in the set of <tt>exclude</tt> filters for the
+ * filtered node</li>
* </ol>
*/
public class FilterAllocationDecider extends AllocationDecider {
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java
index eb9c5cf8ee..95540d89a6 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java
@@ -52,7 +52,7 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
return isVersionCompatible(shardRouting.restoreSource(), node, allocation);
} else {
// fresh primary, we can allocate wherever
- return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere");
+ return allocation.decision(Decision.YES, NAME, "the primary shard is new and can be allocated anywhere");
}
} else {
// relocating primary, only migrate to newer host
@@ -70,16 +70,17 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
}
}
- private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) {
+ private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target,
+ RoutingAllocation allocation) {
final RoutingNode source = routingNodes.node(sourceNodeId);
if (target.node().version().onOrAfter(source.node().version())) {
/* we can allocate if we can recover from a node that is younger or on the same version
* if the primary is already running on a newer version that won't work due to possible
* differences in the lucene index format etc.*/
- return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than source node version [%s]",
+ return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than source node version [%s]",
target.node().version(), source.node().version());
} else {
- return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than source node version [%s]",
+ return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the source node version [%s]",
target.node().version(), source.node().version());
}
}
@@ -87,10 +88,10 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
private Decision isVersionCompatible(RestoreSource restoreSource, final RoutingNode target, RoutingAllocation allocation) {
if (target.node().version().onOrAfter(restoreSource.version())) {
/* we can allocate if we can restore from a snapshot that is older or on the same version */
- return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than snapshot version [%s]",
+ return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than snapshot version [%s]",
target.node().version(), restoreSource.version());
} else {
- return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than snapshot version [%s]",
+ return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the snapshot version [%s]",
target.node().version(), restoreSource.version());
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java
index 039abd8749..869c631306 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java
@@ -41,8 +41,8 @@ public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider
// its ok to check for active here, since in relocation, a shard is split into two in routing
// nodes, once relocating, and one initializing
if (!allocation.routingNodes().allReplicasActive(shardRouting)) {
- return allocation.decision(Decision.NO, NAME, "not all replicas are active in cluster");
+ return allocation.decision(Decision.NO, NAME, "rebalancing can not occur if not all replicas are active in the cluster");
}
- return allocation.decision(Decision.YES, NAME, "all replicas are active in cluster");
+ return allocation.decision(Decision.YES, NAME, "all replicas are active in the cluster, rebalancing can occur");
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java
index 1c5a3f93b7..59ab67c309 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java
@@ -45,12 +45,12 @@ public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecide
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
- return allocation.decision(Decision.YES, NAME, "shard is primary");
+ return allocation.decision(Decision.YES, NAME, "shard is primary and can be allocated");
}
ShardRouting primary = allocation.routingNodes().activePrimary(shardRouting);
if (primary == null) {
- return allocation.decision(Decision.NO, NAME, "primary shard is not yet active");
+ return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active");
}
- return allocation.decision(Decision.YES, NAME, "primary is already active");
+ return allocation.decision(Decision.YES, NAME, "primary shard for this replica is already active");
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java
index 44eb7d0e2f..f0b4fdf35c 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java
@@ -61,7 +61,8 @@ public class SameShardAllocationDecider extends AllocationDecider {
Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting);
for (ShardRouting assignedShard : assignedShards) {
if (node.nodeId().equals(assignedShard.currentNodeId())) {
- return allocation.decision(Decision.NO, NAME, "shard cannot be allocated on same node [%s] it already exists on", node.nodeId());
+ return allocation.decision(Decision.NO, NAME,
+ "the shard cannot be allocated on the same node id [%s] on which it already exists", node.nodeId());
}
}
if (sameHost) {
@@ -85,7 +86,7 @@ public class SameShardAllocationDecider extends AllocationDecider {
for (ShardRouting assignedShard : assignedShards) {
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
return allocation.decision(Decision.NO, NAME,
- "shard cannot be allocated on same host [%s] it already exists on", node.nodeId());
+ "shard cannot be allocated on the same host [%s] on which it already exists", node.nodeId());
}
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java
index 04247525f1..eb25651635 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java
@@ -93,7 +93,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
final int clusterShardLimit = this.clusterShardLimit;
if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
- return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0",
+ return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0",
indexShardLimit, clusterShardLimit);
}
@@ -110,14 +110,16 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
}
}
if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) {
- return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+ return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
nodeShardCount, clusterShardLimit);
}
if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) {
- return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]",
+ return allocation.decision(Decision.NO, NAME,
+ "too many shards for this index [%s] on node [%d], index-level limit per node: [%d]",
shardRouting.index(), indexShardCount, indexShardLimit);
}
- return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node",
+ return allocation.decision(Decision.YES, NAME,
+ "the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node",
indexShardLimit, clusterShardLimit);
}
@@ -130,7 +132,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
final int clusterShardLimit = this.clusterShardLimit;
if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
- return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0",
+ return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0",
indexShardLimit, clusterShardLimit);
}
@@ -149,14 +151,16 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
// Subtle difference between the `canAllocate` and `canRemain` is that
// this checks > while canAllocate checks >=
if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) {
- return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+ return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
nodeShardCount, clusterShardLimit);
}
if (indexShardLimit > 0 && indexShardCount > indexShardLimit) {
- return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]",
+ return allocation.decision(Decision.NO, NAME,
+ "too many shards for this index [%s] on node [%d], index-level limit per node: [%d]",
shardRouting.index(), indexShardCount, indexShardLimit);
}
- return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node",
+ return allocation.decision(Decision.YES, NAME,
+ "the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node",
indexShardLimit, clusterShardLimit);
}
@@ -168,7 +172,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
final int clusterShardLimit = this.clusterShardLimit;
if (clusterShardLimit <= 0) {
- return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [cluster: %d] <= 0",
+ return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [cluster: %d] <= 0",
clusterShardLimit);
}
@@ -181,10 +185,10 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
nodeShardCount++;
}
if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) {
- return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+ return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
nodeShardCount, clusterShardLimit);
}
- return allocation.decision(Decision.YES, NAME, "shard count under node limit [%d] of total shards per node",
+ return allocation.decision(Decision.YES, NAME, "the shard count is under node limit [%d] of total shards per node",
clusterShardLimit);
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java
index d656afc803..54cfb6407d 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java
@@ -54,7 +54,8 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
}
/**
- * Creates a new {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider} instance from given settings
+ * Creates a new {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider} instance from
+ * given settings
*
* @param settings {@link org.elasticsearch.common.settings.Settings} to use
*/
@@ -66,7 +67,8 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
public SnapshotInProgressAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
super(settings);
enableRelocation = CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING.get(settings);
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING, this::setEnableRelocation);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING,
+ this::setEnableRelocation);
}
private void setEnableRelocation(boolean enableRelocation) {
@@ -104,14 +106,18 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId());
- if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
- logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", shardRouting.shardId(), shardSnapshotStatus.nodeId());
+ if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null &&
+ shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]",
+ shardRouting.shardId(), shardSnapshotStatus.nodeId());
+ }
return allocation.decision(Decision.NO, NAME, "snapshot for shard [%s] is currently running on node [%s]",
shardRouting.shardId(), shardSnapshotStatus.nodeId());
}
}
}
- return allocation.decision(Decision.YES, NAME, "shard not primary or relocation disabled");
+ return allocation.decision(Decision.YES, NAME, "the shard is not primary or relocation is disabled");
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java
index ca6b312da4..6eb44351c7 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java
@@ -84,11 +84,16 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.get(settings);
concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.get(settings);
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, this::setPrimariesInitialRecoveries);
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, this::setConcurrentIncomingRecoverries);
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, this::setConcurrentOutgoingRecoverries);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
+ this::setPrimariesInitialRecoveries);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
+ this::setConcurrentIncomingRecoverries);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
+ this::setConcurrentOutgoingRecoverries);
- logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries);
+ logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], " +
+ "node_initial_primaries_recoveries [{}]",
+ concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries);
}
private void setConcurrentIncomingRecoverries(int concurrentIncomingRecoveries) {
@@ -118,7 +123,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
}
}
if (primariesInRecovery >= primariesInitialRecoveries) {
- return allocation.decision(Decision.THROTTLE, NAME, "too many primaries currently recovering [%d], limit: [%d]",
+ return allocation.decision(Decision.THROTTLE, NAME, "too many primaries are currently recovering [%d], limit: [%d]",
primariesInRecovery, primariesInitialRecoveries);
} else {
return allocation.decision(Decision.YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
@@ -137,13 +142,17 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
int currentOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(node.nodeId());
int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
if (currentOutRecoveries >= concurrentOutgoingRecoveries) {
- return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards currently recovering [%d], limit: [%d]",
+ return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards are currently recovering [%d], limit: [%d]",
currentOutRecoveries, concurrentOutgoingRecoveries);
} else if (currentInRecoveries >= concurrentIncomingRecoveries) {
- return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards currently recovering [%d], limit: [%d]",
+ return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards are currently recovering [%d], limit: [%d]",
currentInRecoveries, concurrentIncomingRecoveries);
} else {
- return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d] incoming: [%d]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries);
+ return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]",
+ currentOutRecoveries,
+ concurrentOutgoingRecoveries,
+ currentInRecoveries,
+ concurrentIncomingRecoveries);
}
}
}