/* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayAllocator; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; /** * This service manages the node allocation of a cluster. For this reason the * {@link AllocationService} keeps {@link AllocationDeciders} to choose nodes * for shard allocation. This class also manages new nodes joining the cluster * and rerouting of shards. */ public class AllocationService extends AbstractComponent { private final AllocationDeciders allocationDeciders; private final GatewayAllocator gatewayAllocator; private final ShardsAllocator shardsAllocator; private final ClusterInfoService clusterInfoService; @Inject public AllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator, ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) { super(settings); this.allocationDeciders = allocationDeciders; this.gatewayAllocator = gatewayAllocator; this.shardsAllocator = shardsAllocator; this.clusterInfoService = clusterInfoService; } /** * Applies the started shards. Note, shards can be called several times within this method. *
* If the same instance of the routing table is returned, then no change has been made.
*/ public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List extends ShardRouting> startedShards) { return applyStartedShards(clusterState, startedShards, true); } public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List extends ShardRouting> startedShards, boolean withReroute) { RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards routingNodes.unassigned().shuffle(); StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), startedShards, clusterInfoService.getClusterInfo()); boolean changed = applyStartedShards(routingNodes, startedShards); if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } gatewayAllocator.applyStartedShards(allocation); if (withReroute) { reroute(allocation); } final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString()); logClusterHealthStateChange( new ClusterStateHealth(clusterState), new ClusterStateHealth(clusterState.metaData(), result.routingTable()), "shards started [" + startedShardsAsString + "] ..." ); return result; } protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) { return buildChangedResult(metaData, routingNodes, new RoutingExplanations()); } protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) { final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build(); MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable); return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations); } /** * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. * * @param currentMetaData {@link MetaData} object from before the routing table was changed. * @param newRoutingTable new {@link RoutingTable} created by the allocation change * @return adapted {@link MetaData}, potentially the original one if no change was needed. */ static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) { // make sure index meta data and routing tables are in sync w.r.t active allocation ids MetaData.Builder metaDataBuilder = null; for (IndexRoutingTable indexRoutingTable : newRoutingTable) { final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex()); if (indexMetaData == null) { throw new IllegalStateException("no metadata found for index " + indexRoutingTable.getIndex().getName()); } IndexMetaData.Builder indexMetaDataBuilder = null; for (IndexShardRoutingTable shardRoutings : indexRoutingTable) { Set* If the same instance of the routing table is returned, then no change has been made.
*/ public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List* If the same instance of the routing table is returned, then no change has been made. */ public RoutingAllocation.Result reroute(ClusterState clusterState, String reason) { return reroute(clusterState, reason, false); } /** * Reroutes the routing table based on the live nodes. *
* If the same instance of the routing table is returned, then no change has been made.
*/
protected RoutingAllocation.Result reroute(ClusterState clusterState, String reason, boolean debug) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime());
allocation.debugDecision(debug);
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
reason
);
return result;
}
private void logClusterHealthStateChange(ClusterStateHealth previousStateHealth, ClusterStateHealth newStateHealth, String reason) {
ClusterHealthStatus previousHealth = previousStateHealth.getStatus();
ClusterHealthStatus currentHealth = newStateHealth.getStatus();
if (!previousHealth.equals(currentHealth)) {
logger.info("Cluster health status changed from [{}] to [{}] (reason: [{}]).", previousHealth, currentHealth, reason);
}
}
private boolean reroute(RoutingAllocation allocation) {
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead
changed |= deassociateDeadNodes(allocation);
// create a sorted list of from nodes with least number of shards to the maximum ones
applyNewNodes(allocation);
// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().unassigned().size() > 0) {
updateLeftDelayOfUnassignedShards(allocation, settings);
changed |= gatewayAllocator.allocateUnassigned(allocation);
}
changed |= shardsAllocator.allocate(allocation);
assert RoutingNodes.assertShardStats(allocation.routingNodes());
return changed;
}
// public for testing
public static void updateLeftDelayOfUnassignedShards(RoutingAllocation allocation, Settings settings) {
for (ShardRouting shardRouting : allocation.routingNodes().unassigned()) {
final MetaData metaData = allocation.metaData();
final IndexMetaData indexMetaData = metaData.getIndexSafe(shardRouting.index());
shardRouting.unassignedInfo().updateDelay(allocation.getCurrentNanoTime(), settings, indexMetaData.getSettings());
}
}
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
if (routingNodes.unassigned().getNumPrimaries() == 0) {
// move out if we don't have unassigned primaries
return changed;
}
// now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
// routingNodes.hasUnassignedPrimaries() will potentially be false
for (ShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
// remove dangling replicas that are initializing for primary shards
changed |= failReplicasForUnassignedPrimary(allocation, shardEntry);
ShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
if (candidate != null) {
routingNodes.swapPrimaryFlag(shardEntry, candidate);
if (candidate.relocatingNodeId() != null) {
changed = true;
// its also relocating, make sure to move the other routing to primary
RoutingNode node = routingNodes.node(candidate.relocatingNodeId());
if (node != null) {
for (ShardRouting shardRouting : node) {
if (shardRouting.shardId().equals(candidate.shardId()) && !shardRouting.primary()) {
routingNodes.swapPrimaryFlag(shardRouting);
break;
}
}
}
}
IndexMetaData index = allocation.metaData().getIndexSafe(candidate.index());
if (IndexMetaData.isIndexUsingShadowReplicas(index.getSettings())) {
routingNodes.reinitShadowPrimary(candidate);
changed = true;
}
}
}
}
return changed;
}
/**
* Applies the new nodes to the routing nodes and returns them (just the
* new nodes);
*/
private void applyNewNodes(RoutingAllocation allocation) {
final RoutingNodes routingNodes = allocation.routingNodes();
for (ObjectCursor